Posted to reviews@spark.apache.org by xuanyuanking <gi...@git.apache.org> on 2018/12/03 13:39:48 UTC

[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

GitHub user xuanyuanking opened a pull request:

    https://github.com/apache/spark/pull/23207

    [SPARK-26193][SQL] Implement shuffle write metrics in SQL

    ## What changes were proposed in this pull request?
    
    1. Implement `SQLShuffleWriteMetricsReporter` on the SQL side as the customized `ShuffleWriteMetricsReporter`.
    2. Add shuffle write metrics to `ShuffleExchangeExec`, and use these metrics to create the corresponding `SQLShuffleWriteMetricsReporter` in the shuffle dependency.
    3. Extend the current `ShuffleWriteMetrics` in the task context to act as a proxy, and register the shuffle write metrics reporter with it when the ShuffleMapTask is created on the executor.
    
    ## How was this patch tested?
    Added a unit test in SQLMetricsSuite.
    Tested manually on a local setup; the screenshot in the document attached to the JIRA has been updated.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/xuanyuanking/spark SPARK-26193

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/23207.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #23207
    
----
commit 6b26c629439973045da77f7bcd4b852afe8ebd8b
Author: Yuanjian Li <xy...@...>
Date:   2018-12-02T12:19:55Z

    Commit for fist time success

commit a8a1225837419c99a3d9941046a2ca6b501f6dc8
Author: Yuanjian Li <xy...@...>
Date:   2018-12-03T12:06:34Z

    Simplify implement by add metrics in ShuffleExchangeExec

commit 7c8e5161904f1fd0fa4d99e6c497ef1be3542bdb
Author: Yuanjian Li <xy...@...>
Date:   2018-12-03T12:40:41Z

    code clean and comments

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239548704
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala ---
    @@ -95,3 +96,59 @@ private[spark] object SQLShuffleMetricsReporter {
         FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait time"),
         RECORDS_READ -> SQLMetrics.createMetric(sc, "records read"))
     }
    +
    +/**
    + * A shuffle write metrics reporter for SQL exchange operators. Different with
    + * [[SQLShuffleReadMetricsReporter]], we need a function of (reporter => reporter) set in
    + * shuffle dependency, so the local SQLMetric should transient and create on executor.
    + * @param metrics Shuffle write metrics in current SparkPlan.
    + * @param metricsReporter Other reporter need to be updated in this SQLShuffleWriteMetricsReporter.
    + */
    +private[spark] case class SQLShuffleWriteMetricsReporter(
    +    metrics: Map[String, SQLMetric])(metricsReporter: ShuffleWriteMetricsReporter)
    --- End diff --
    
    Reimplemented in a780b70.


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r238630981
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala ---
    @@ -92,6 +92,12 @@ private[spark] class ShuffleMapTask(
           threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
         } else 0L
     
    +    // Register the shuffle write metrics reporter to shuffleWriteMetrics.
    +    if (dep.shuffleWriteMetricsReporter.isDefined) {
    +      context.taskMetrics().shuffleWriteMetrics.registerExternalShuffleWriteReporter(
    --- End diff --
    
    This happens per task, so I think `ShuffleWriteMetrics.externalReporters` can be an `Option` instead of an array.
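
A minimal sketch of this suggestion, using hypothetical stand-in types rather than Spark's actual classes: because registration happens once per task, the proxy can hold at most one external reporter in an `Option` instead of looping over an array. `WriteReporter`, `ProxyWriteMetrics`, and `CountingReporter` are invented for illustration; only the method name `registerExternalShuffleWriteReporter` is taken from the diff.

```scala
// Hypothetical, simplified stand-in for ShuffleWriteMetricsReporter.
trait WriteReporter {
  def incRecordsWritten(v: Long): Unit
}

// Hypothetical proxy: keeps its own count and forwards each update to at
// most one external reporter, which is registered once per task.
class ProxyWriteMetrics extends WriteReporter {
  private var recordsWritten = 0L
  private var externalReporter: Option[WriteReporter] = None

  def registerExternalShuffleWriteReporter(r: WriteReporter): Unit = {
    require(externalReporter.isEmpty, "an external reporter is already registered")
    externalReporter = Some(r)
  }

  override def incRecordsWritten(v: Long): Unit = {
    recordsWritten += v
    // No-op when nothing is registered; a single call otherwise.
    externalReporter.foreach(_.incRecordsWritten(v))
  }

  def records: Long = recordsWritten
}

// Hypothetical external reporter that only accumulates what it is told.
class CountingReporter extends WriteReporter {
  var seen = 0L
  override def incRecordsWritten(v: Long): Unit = seen += v
}
```

Updates made before registration reach only the proxy's own counter; updates made afterwards reach both.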


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    **[Test build #99782 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99782/testReport)** for PR 23207 at commit [`d5ee249`](https://github.com/apache/spark/commit/d5ee2493478d11ba688172d4b27a15b18beaf559).


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    **[Test build #99825 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99825/testReport)** for PR 23207 at commit [`bc2c4f1`](https://github.com/apache/spark/commit/bc2c4f187f8037aa540b0e6bae5d90d7d6e3509d).


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    **[Test build #99722 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99722/testReport)** for PR 23207 at commit [`cf35b9f`](https://github.com/apache/spark/commit/cf35b9f948f174a5726a7feba611224c4ac495e7).


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r238732441
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala ---
    @@ -92,6 +92,12 @@ private[spark] class ShuffleMapTask(
           threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
         } else 0L
     
    +    // Register the shuffle write metrics reporter to shuffleWriteMetrics.
    +    if (dep.shuffleWriteMetricsReporter.isDefined) {
    +      context.taskMetrics().shuffleWriteMetrics.registerExternalShuffleWriteReporter(
    --- End diff --
    
    Cool! That's a cleaner implementation and keeps the read and write metrics reporters consistent; the read metrics can also extend `ShuffleReadMetricsReporter` directly. Done in ca6c407.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    @AmplabJenkins retest this please.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99676/
    Test FAILed.


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239677653
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
    @@ -333,8 +343,19 @@ object ShuffleExchangeExec {
           new ShuffleDependency[Int, InternalRow, InternalRow](
             rddWithPartitionIds,
             new PartitionIdPassthrough(part.numPartitions),
    -        serializer)
    +        serializer,
    +        shuffleWriterProcessor = createShuffleWriteProcessor(writeMetrics))
     
         dependency
       }
    +
    +  /**
    +   * Create a customized [[ShuffleWriteProcessor]] for SQL which wrap the default metrics reporter
    +   * with [[SQLShuffleWriteMetricsReporter]] as new reporter for [[ShuffleWriteProcessor]].
    +   */
    +  def createShuffleWriteProcessor(metrics: Map[String, SQLMetric]): ShuffleWriteProcessor = {
    +    (reporter: ShuffleWriteMetricsReporter) => {
    --- End diff --
    
    Does this work with Scala 2.11? Maybe we don't need to be that fancy and can just write
    ```
    new ShuffleWriteProcessor {
      xxx
    }
    ```
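
For context, a sketch of the explicit form suggested here, with hypothetical stand-in types. Scala 2.12 converts a function literal into an instance of a single-abstract-method type, while Scala 2.11 does so only under the `-Xexperimental` compiler flag, so an explicit anonymous class is the portable way to write it. `WriteReporter`, `WriteProcessor`, and `WriteProcessors.counting` are invented for illustration and are simpler than Spark's real `ShuffleWriteProcessor`.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical, simplified stand-in for ShuffleWriteMetricsReporter.
trait WriteReporter {
  def incRecordsWritten(v: Long): Unit
}

// Hypothetical single-abstract-method type standing in for the processor.
trait WriteProcessor extends Serializable {
  def createMetricsReporter(default: WriteReporter): WriteReporter
}

object WriteProcessors {
  // Explicit anonymous classes: these compile on both Scala 2.11 and 2.12
  // without relying on lambda-to-SAM conversion.
  def counting(counter: AtomicLong): WriteProcessor = new WriteProcessor {
    override def createMetricsReporter(default: WriteReporter): WriteReporter =
      new WriteReporter {
        override def incRecordsWritten(v: Long): Unit = {
          counter.addAndGet(v)         // update the extra (SQL-side) counter
          default.incRecordsWritten(v) // keep the default reporter in sync
        }
      }
  }
}
```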


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239736660
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala ---
    @@ -78,6 +80,7 @@ object SQLMetrics {
       private val SUM_METRIC = "sum"
       private val SIZE_METRIC = "size"
       private val TIMING_METRIC = "timing"
    +  private val NORMALIZE_TIMING_METRIC = "normalizeTiming"
    --- End diff --
    
    `private val NS_TIMING_METRIC = "nsTiming"`


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    **[Test build #99616 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99616/testReport)** for PR 23207 at commit [`7c8e516`](https://github.com/apache/spark/commit/7c8e5161904f1fd0fa4d99e6c497ef1be3542bdb).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    **[Test build #99805 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99805/testReport)** for PR 23207 at commit [`6378a3d`](https://github.com/apache/spark/commit/6378a3d4707b0d7559fca20220229cde71f9a64b).


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    **[Test build #99617 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99617/testReport)** for PR 23207 at commit [`7c8e516`](https://github.com/apache/spark/commit/7c8e5161904f1fd0fa4d99e6c497ef1be3542bdb).


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239308007
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -38,12 +38,18 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
       override def outputPartitioning: Partitioning = SinglePartition
       override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
       private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
    -  override lazy val metrics = SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
    +  private val writeMetrics = SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
    +  override lazy val metrics =
    --- End diff --
    
    This is somewhat confusing. I'd create a variable for the read metrics so you can pass just that into the ShuffledRDD.
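
One way to read this suggestion, sketched with plain maps: keep the read and write metrics in separate values, expose their union as the operator's `metrics`, and hand only the read map to the code that reads the shuffle. The object name, the metric keys, and the use of `Long` in place of `SQLMetric` are all assumptions made for illustration.

```scala
// Hypothetical stand-in for an operator's metric handling. The Long values
// stand in for SQLMetric instances and the keys are invented.
object LimitMetricsSketch {
  val readMetrics: Map[String, Long] = Map("recordsRead" -> 0L)
  val writeMetrics: Map[String, Long] = Map("shuffleRecordsWritten" -> 0L)

  // The operator exposes both groups together, e.g. for display in the UI.
  val metrics: Map[String, Long] = readMetrics ++ writeMetrics

  // The shuffle reader is given only the read-side metrics.
  def metricsForShuffleRead: Map[String, Long] = readMetrics
}
```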


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99677/
    Test PASSed.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    ```
    the code looks much cleaner now!
    ```
    Sorry for the rushed original code. I should, and will, pay more attention to writing clean code and to discussing alternative implementations.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    **[Test build #99643 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99643/testReport)** for PR 23207 at commit [`7c8e516`](https://github.com/apache/spark/commit/7c8e5161904f1fd0fa4d99e6c497ef1be3542bdb).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5673/
    Test PASSed.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99643/
    Test PASSed.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r238836448
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/metrics.scala ---
    @@ -50,3 +50,57 @@ private[spark] trait ShuffleWriteMetricsReporter {
       private[spark] def decBytesWritten(v: Long): Unit
       private[spark] def decRecordsWritten(v: Long): Unit
     }
    +
    +
    +/**
    + * A proxy class of ShuffleWriteMetricsReporter which proxy all metrics updating to the input
    + * reporters.
    + */
    +private[spark] class GroupedShuffleWriteMetricsReporter(
    --- End diff --
    
    I'd not create a general API here. Just put one in SQL similar to the read side that also calls the default one.
    
    It can be expensive to go through a seq for each record and bytes.
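
A rough sketch of the shape suggested here, with hypothetical stand-in types: a SQL-side reporter that holds a direct reference to the default reporter and updates both on each call, so the per-record path involves no collection traversal. `WriteReporter`, `Counter`, and `SqlWriteReporter` are invented for illustration; the method names follow Spark's `ShuffleWriteMetricsReporter`, but the trait here is deliberately reduced.

```scala
// Hypothetical, simplified stand-in for ShuffleWriteMetricsReporter.
trait WriteReporter {
  def incBytesWritten(v: Long): Unit
  def incRecordsWritten(v: Long): Unit
}

// Hypothetical stand-in for a SQL metric accumulator.
final class Counter {
  var value = 0L
  def add(v: Long): Unit = value += v
}

// Wraps exactly one default reporter: each update is two direct calls,
// with no Seq or Array to iterate per record.
class SqlWriteReporter(
    default: WriteReporter,
    bytes: Counter,
    records: Counter) extends WriteReporter {

  override def incBytesWritten(v: Long): Unit = {
    default.incBytesWritten(v)
    bytes.add(v)
  }

  override def incRecordsWritten(v: Long): Unit = {
    default.incRecordsWritten(v)
    records.add(v)
  }
}
```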


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99736/
    Test PASSed.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    **[Test build #99817 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99817/testReport)** for PR 23207 at commit [`6378a3d`](https://github.com/apache/spark/commit/6378a3d4707b0d7559fca20220229cde71f9a64b).


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239995006
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -38,13 +38,21 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
       override def outputPartitioning: Partitioning = SinglePartition
       override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
       private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
    -  override lazy val metrics = SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
    +  private lazy val writeMetrics =
    +    SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
    +  private lazy val readMetrics =
    +    SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
    --- End diff --
    
    Yes, that was done and then reverted in https://github.com/apache/spark/pull/23207/commits/7d104ebe854effb3d8ceb63cd408b9749cee1a8a; I will split it into a separate PR after this one.


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/23207


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    cc @cloud-fan @gatorsmile @rxin 


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239744840
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.shuffle
    +
    +import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, TaskContext}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.scheduler.MapStatus
    +
    +/**
    + * The interface for customizing shuffle write process. The driver create a ShuffleWriteProcessor
    + * and put it into [[ShuffleDependency]], and executors use it in each ShuffleMapTask.
    + */
    +private[spark] class ShuffleWriteProcessor extends Serializable with Logging {
    +
    +  /**
    +   * Create a [[ShuffleWriteMetricsReporter]] from the task context, always return a proxy
    +   * reporter for both local accumulator and original reporter updating. As the reporter is a
    +   * per-row operator, here need a careful consideration on performance.
    +   */
    +  def createMetricsReporter(context: TaskContext): ShuffleWriteMetricsReporter = {
    --- End diff --
    
    Yea, thanks.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5767/
    Test PASSed.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    **[Test build #99736 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99736/testReport)** for PR 23207 at commit [`76d1ca0`](https://github.com/apache/spark/commit/76d1ca0036bbb50a005e9d12f8b22bf21697af7f).


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Thanks for your reply, Wenchen. There's a sketch doc attached to the JIRA: https://docs.google.com/document/d/1DX0gLkpk_NCE5MwI1_m4gnA2rLdjDkynZ02u2VWDR-8/edit
    
    ```
    IMO shuffle write metrics is hard, as an RDD can have shuffle dependencies with multiple upstream RDDs. That said, in general the shuffle write metrics should belong to the upstream RDDs.
    ```
    That's right, and that's also what I tried to do at first: logically, the upstream operator triggers the shuffle write, and my first implementation attempt changed the SparkPlan base class to achieve this.
    
    ```
    In Spark SQL, it's a little simpler, as the ShuffledRowRDD always have only one child, so it's reasonable to say that shuffle write metrics belong to ShuffledRowRDD.
    ```
    Yes, and this may also be what Reynold suggested: since ShuffleExchangeExec has only one child, we can simplify the implementation. But because the shuffle write metrics are updated inside the task, the core module still needs some changes.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5819/
    Test PASSed.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    @xuanyuanking can you split this into separate PRs, one for renaming the read-side metrics and one for the write-side change?


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    retest this please.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    @AmplabJenkins


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239735015
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.shuffle
    +
    +import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, TaskContext}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.scheduler.MapStatus
    +
    +/**
    + * The interface for customizing shuffle write process. The driver create a ShuffleWriteProcessor
    + * and put it into [[ShuffleDependency]], and executors use it in each ShuffleMapTask.
    + */
    +private[spark] class ShuffleWriteProcessor extends Serializable with Logging {
    +
    +  /**
    +   * Create a [[ShuffleWriteMetricsReporter]] from the task context, always return a proxy
    +   * reporter for both local accumulator and original reporter updating. As the reporter is a
    +   * per-row operator, here need a careful consideration on performance.
    +   */
    +  def createMetricsReporter(context: TaskContext): ShuffleWriteMetricsReporter = {
    --- End diff --
    
    Can this be protected?


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    **[Test build #99752 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99752/testReport)** for PR 23207 at commit [`9966c2a`](https://github.com/apache/spark/commit/9966c2abc821492d5f5c6c74034407879c764573).


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    thanks, merging to master!


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239677325
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/ShuffleWriterProcessor.scala ---
    @@ -0,0 +1,82 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.shuffle
    +
    +import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, TaskContext}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.scheduler.MapStatus
    +
    +/**
    + * The interface for customizing shuffle write process. The driver create a ShuffleWriteProcessor
    + * and put it into [[ShuffleDependency]], and executors use it in each ShuffleMapTask.
    + */
    +private[spark] trait ShuffleWriteProcessor extends Serializable with Logging {
    +
    +  /**
    +   * Create a [[ShuffleWriteMetricsReporter]] from the default reporter, always return a proxy
    +   * reporter for both local accumulator and original reporter updating. As the reporter is a
    +   * per-row operator, here need a careful consideration on performance.
    +   */
    +  def createMetricsReporter(reporter: ShuffleWriteMetricsReporter): ShuffleWriteMetricsReporter
    --- End diff --
    
    how about `def createMetricsReporter(context: TaskContext)`?
    
    Then in core it's implemented as
    ```
    context.taskMetrics().shuffleWriteMetrics
    ```
    
    and in SQL
    ```
    new SQLShuffle.....Reporter(context.taskMetrics().shuffleWriteMetrics)
    ```
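    
    A minimal sketch of the shape suggested above, assuming stand-in types:
    `WriteReporter`, `TaskWriteMetrics`, `FakeTaskContext`, `WriteProcessor`,
    `SqlWriteReporter` and `SqlWriteProcessor` are hypothetical names for
    illustration and are not Spark's real classes. The core default returns
    the task's own metrics object, while the SQL override wraps it in a proxy
    that updates a local counter and forwards every update.
    
    ```scala
    // Stand-in types for illustration only; these are NOT Spark's real classes.
    trait WriteReporter {
      def incBytesWritten(v: Long): Unit
      def incRecordsWritten(v: Long): Unit
    }
    
    // Plays the role of the task-level shuffle write metrics.
    final class TaskWriteMetrics extends WriteReporter {
      var bytesWritten = 0L
      var recordsWritten = 0L
      override def incBytesWritten(v: Long): Unit = bytesWritten += v
      override def incRecordsWritten(v: Long): Unit = recordsWritten += v
    }
    
    // Plays the role of TaskContext, exposing only what the sketch needs.
    final class FakeTaskContext {
      val shuffleWriteMetrics = new TaskWriteMetrics
    }
    
    // Core default: hand back the task's own metrics object.
    class WriteProcessor extends Serializable {
      def createMetricsReporter(context: FakeTaskContext): WriteReporter =
        context.shuffleWriteMetrics
    }
    
    // SQL-side proxy: forwards to the task metrics and updates local counters.
    final class SqlWriteReporter(underlying: WriteReporter) extends WriteReporter {
      var sqlBytesWritten = 0L
      var sqlRecordsWritten = 0L
      override def incBytesWritten(v: Long): Unit = {
        underlying.incBytesWritten(v)
        sqlBytesWritten += v
      }
      override def incRecordsWritten(v: Long): Unit = {
        underlying.incRecordsWritten(v)
        sqlRecordsWritten += v
      }
    }
    
    // SQL override: wrap the task metrics in the proxy.
    class SqlWriteProcessor extends WriteProcessor {
      override def createMetricsReporter(context: FakeTaskContext): WriteReporter =
        new SqlWriteReporter(context.shuffleWriteMetrics)
    }
    ```
    
    With this shape the caller only ever sees a single reporter, so the
    per-row update path stays one virtual call plus one forward.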


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    test this please.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r238845399
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
    @@ -299,12 +312,25 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
           val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
           val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key", "value")
           // Assume the execution plan is
    -      // ... -> ShuffledHashJoin(nodeId = 1) -> Project(nodeId = 0)
    +      // Project(nodeId = 0)
    +      // +- ShuffledHashJoin(nodeId = 1)
    +      // :- Exchange(nodeId = 2)
    +      // :  +- Project(nodeId = 3)
    +      // :     +- LocalTableScan(nodeId = 4)
    +      // +- Exchange(nodeId = 5)
    +      // +- Project(nodeId = 6)
    +      // +- LocalTableScan(nodeId = 7)
           val df = df1.join(df2, "key")
           testSparkPlanMetrics(df, 1, Map(
             1L -> (("ShuffledHashJoin", Map(
               "number of output rows" -> 2L,
    -          "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))))
    +          "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))),
    +        2L -> (("Exchange", Map(
    +          "shuffle records written" -> 2L,
    +          "records read" -> 2L))),
    --- End diff --
    
    is this always going to be the same as "shuffle records written" ?


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239308197
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala ---
    @@ -95,3 +96,59 @@ private[spark] object SQLShuffleMetricsReporter {
         FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait time"),
         RECORDS_READ -> SQLMetrics.createMetric(sc, "records read"))
     }
    +
    +/**
    + * A shuffle write metrics reporter for SQL exchange operators. Different with
    + * [[SQLShuffleReadMetricsReporter]], we need a function of (reporter => reporter) set in
    + * shuffle dependency, so the local SQLMetric should transient and create on executor.
    + * @param metrics Shuffle write metrics in current SparkPlan.
    + * @param metricsReporter Other reporter need to be updated in this SQLShuffleWriteMetricsReporter.
    + */
    +private[spark] case class SQLShuffleWriteMetricsReporter(
    +    metrics: Map[String, SQLMetric])(metricsReporter: ShuffleWriteMetricsReporter)
    --- End diff --
    
    Why are there two parameter lists here?
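    
    For context, a short sketch of what a second parameter list means on a
    Scala case class in general. `Reporter` and its fields are hypothetical
    names, and this illustrates language behavior only, not necessarily the
    author's reason for the design. Only the first list takes part in the
    generated `equals`, `hashCode` and `toString`, and the curried form can be
    used as a function from the second argument to an instance, which matches
    the "(reporter => reporter)" wording in the quoted Scaladoc.
    
    ```scala
    // Hypothetical case class; names are illustrative only.
    // `delegate` is a plain constructor parameter, not a case-class field.
    case class Reporter(name: String)(delegate: String) {
      def describe: String = s"$name -> $delegate"
    }
    
    val a = Reporter("write")("task-1")
    val b = Reporter("write")("task-2")
    
    // Only the first parameter list takes part in equals and toString.
    val sameByFirstList = a == b
    val shown = a.toString
    
    // Fixing the first list yields a function of the second argument.
    val mk: String => Reporter = d => Reporter("write")(d)
    val c = mk("task-3")
    ```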


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239735425
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala ---
    @@ -78,6 +80,7 @@ object SQLMetrics {
       private val SUM_METRIC = "sum"
       private val SIZE_METRIC = "size"
       private val TIMING_METRIC = "timing"
    +  private val NORMALIZE_TIMING_METRIC = "normalizeTiming"
    --- End diff --
    
    maybe `nsToMsTiming`?


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239744767
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala ---
    @@ -78,6 +80,7 @@ object SQLMetrics {
       private val SUM_METRIC = "sum"
       private val SIZE_METRIC = "size"
       private val TIMING_METRIC = "timing"
    +  private val NORMALIZE_TIMING_METRIC = "normalizeTiming"
    --- End diff --
    
    Thanks, there's no back and forth. Thanks for your advice and help all along, Wenchen. :)


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239060606
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala ---
    @@ -95,3 +96,59 @@ private[spark] object SQLShuffleMetricsReporter {
         FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait time"),
         RECORDS_READ -> SQLMetrics.createMetric(sc, "records read"))
     }
    +
    +/**
    + * A shuffle write metrics reporter for SQL exchange operators. Different with
    + * [[SQLShuffleReadMetricsReporter]], we need a function of (reporter => reporter) set in
    + * shuffle dependency, so the local SQLMetric should transient and create on executor.
    + * @param metrics Shuffle write metrics in current SparkPlan.
    + * @param metricsReporter Other reporter need to be updated in this SQLShuffleWriteMetricsReporter.
    + */
    +private[spark] case class SQLShuffleWriteMetricsReporter(
    +    metrics: Map[String, SQLMetric])(metricsReporter: ShuffleWriteMetricsReporter)
    +  extends ShuffleWriteMetricsReporter with Serializable {
    +  @transient private[this] lazy val _bytesWritten =
    +    metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_BYTES_WRITTEN)
    +  @transient private[this] lazy val _recordsWritten =
    +    metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN)
    +  @transient private[this] lazy val _writeTime =
    +    metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)
    +
    +  override private[spark] def incBytesWritten(v: Long): Unit = {
    +    metricsReporter.incBytesWritten(v)
    +    _bytesWritten.add(v)
    +  }
    +  override private[spark] def decRecordsWritten(v: Long): Unit = {
    +    metricsReporter.decRecordsWritten(v)
    +    _recordsWritten.set(_recordsWritten.value - v)
    +  }
    +  override private[spark] def incRecordsWritten(v: Long): Unit = {
    +    metricsReporter.incRecordsWritten(v)
    +    _recordsWritten.add(v)
    +  }
    +  override private[spark] def incWriteTime(v: Long): Unit = {
    +    metricsReporter.incWriteTime(v)
    +    _writeTime.add(v)
    +  }
    +  override private[spark] def decBytesWritten(v: Long): Unit = {
    +    metricsReporter.decBytesWritten(v)
    +    _bytesWritten.set(_bytesWritten.value - v)
    +  }
    +}
    +
    +private[spark] object SQLShuffleWriteMetricsReporter {
    +  val SHUFFLE_BYTES_WRITTEN = "shuffleBytesWritten"
    +  val SHUFFLE_RECORDS_WRITTEN = "shuffleRecordsWritten"
    +  val SHUFFLE_WRITE_TIME = "shuffleWriteTime"
    --- End diff --
    
    do we have other time metrics using nanoseconds?


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r238630996
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala ---
    @@ -92,6 +92,12 @@ private[spark] class ShuffleMapTask(
           threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
         } else 0L
     
    +    // Register the shuffle write metrics reporter to shuffleWriteMetrics.
    +    if (dep.shuffleWriteMetricsReporter.isDefined) {
    +      context.taskMetrics().shuffleWriteMetrics.registerExternalShuffleWriteReporter(
    --- End diff --
    
    This happens per task, so I think `ShuffleWriteMetrics.externalReporters` can be an Option instead of an array.
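    
    A minimal sketch of that idea, assuming stand-in types: `ExternalReporter`
    and `WriteMetrics` are hypothetical and are not Spark's real classes. With
    at most one external reporter registered per task, an `Option` is enough
    and the update path needs no loop over an array.
    
    ```scala
    // Stand-in reporter type; not Spark's real class.
    trait ExternalReporter {
      def incRecordsWritten(v: Long): Unit
    }
    
    final class WriteMetrics {
      private var records = 0L
      // At most one external reporter per task, so Option suffices.
      private var external: Option[ExternalReporter] = None
    
      def registerExternalReporter(r: ExternalReporter): Unit = {
        require(external.isEmpty, "an external reporter is already registered")
        external = Some(r)
      }
    
      def incRecordsWritten(v: Long): Unit = {
        records += v
        // Forward the update only if a reporter was registered.
        external.foreach(_.incRecordsWritten(v))
      }
    
      def recordsWritten: Long = records
    }
    ```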


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    **[Test build #99677 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99677/testReport)** for PR 23207 at commit [`fcd62b3`](https://github.com/apache/spark/commit/fcd62b390ba4b5e2b1b9c6138026ac6da1b78d1f).


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239698174
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/ShuffleWriterProcessor.scala ---
    @@ -0,0 +1,82 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.shuffle
    +
    +import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, TaskContext}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.scheduler.MapStatus
    +
    +/**
    + * The interface for customizing shuffle write process. The driver create a ShuffleWriteProcessor
    + * and put it into [[ShuffleDependency]], and executors use it in each ShuffleMapTask.
    + */
    +private[spark] trait ShuffleWriteProcessor extends Serializable with Logging {
    +
    +  /**
    +   * Create a [[ShuffleWriteMetricsReporter]] from the default reporter, always return a proxy
    +   * reporter for both local accumulator and original reporter updating. As the reporter is a
    +   * per-row operator, here need a careful consideration on performance.
    +   */
    +  def createMetricsReporter(reporter: ShuffleWriteMetricsReporter): ShuffleWriteMetricsReporter
    --- End diff --
    
    Copy, the trait can be added when we need more customization for SQL shuffle. Done in 6378a3d.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    retest this please.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    **[Test build #99617 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99617/testReport)** for PR 23207 at commit [`7c8e516`](https://github.com/apache/spark/commit/7c8e5161904f1fd0fa4d99e6c497ef1be3542bdb).
     * This patch **fails from timeout after a configured wait of `400m`**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239050549
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
    @@ -170,13 +172,23 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
         val df = testData2.groupBy().agg(collect_set('a)) // 2 partitions
         testSparkPlanMetrics(df, 1, Map(
           2L -> (("ObjectHashAggregate", Map("number of output rows" -> 2L))),
    +      1L -> (("Exchange", Map(
    +        "shuffle records written" -> 2L,
    +        "records read" -> 2L,
    +        "local blocks fetched" -> 2L,
    --- End diff --
    
    I agree "fetch" is more of a code-level name from `ShuffleBlockFetcherIterator`, but do you mean just changing the display name in the UI? There are many places, even api.scala, that use the name `localBlocksFetched`, so changing them all may not be a good choice for backporting. WDYT?


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5849/
    Test PASSed.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    the code looks much cleaner now!


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    **[Test build #99722 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99722/testReport)** for PR 23207 at commit [`cf35b9f`](https://github.com/apache/spark/commit/cf35b9f948f174a5726a7feba611224c4ac495e7).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239308706
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala ---
    @@ -95,3 +96,59 @@ private[spark] object SQLShuffleMetricsReporter {
         FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait time"),
         RECORDS_READ -> SQLMetrics.createMetric(sc, "records read"))
     }
    +
    +/**
    + * A shuffle write metrics reporter for SQL exchange operators. Different with
    + * [[SQLShuffleReadMetricsReporter]], we need a function of (reporter => reporter) set in
    + * shuffle dependency, so the local SQLMetric should transient and create on executor.
    + * @param metrics Shuffle write metrics in current SparkPlan.
    + * @param metricsReporter Other reporter need to be updated in this SQLShuffleWriteMetricsReporter.
    + */
    +private[spark] case class SQLShuffleWriteMetricsReporter(
    +    metrics: Map[String, SQLMetric])(metricsReporter: ShuffleWriteMetricsReporter)
    +  extends ShuffleWriteMetricsReporter with Serializable {
    +  @transient private[this] lazy val _bytesWritten =
    +    metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_BYTES_WRITTEN)
    +  @transient private[this] lazy val _recordsWritten =
    +    metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN)
    +  @transient private[this] lazy val _writeTime =
    +    metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)
    +
    +  override private[spark] def incBytesWritten(v: Long): Unit = {
    +    metricsReporter.incBytesWritten(v)
    +    _bytesWritten.add(v)
    +  }
    +  override private[spark] def decRecordsWritten(v: Long): Unit = {
    +    metricsReporter.decRecordsWritten(v)
    +    _recordsWritten.set(_recordsWritten.value - v)
    +  }
    +  override private[spark] def incRecordsWritten(v: Long): Unit = {
    +    metricsReporter.incRecordsWritten(v)
    +    _recordsWritten.add(v)
    +  }
    +  override private[spark] def incWriteTime(v: Long): Unit = {
    +    metricsReporter.incWriteTime(v)
    +    _writeTime.add(v)
    +  }
    +  override private[spark] def decBytesWritten(v: Long): Unit = {
    +    metricsReporter.decBytesWritten(v)
    +    _bytesWritten.set(_bytesWritten.value - v)
    +  }
    +}
    +
    +private[spark] object SQLShuffleWriteMetricsReporter {
    +  val SHUFFLE_BYTES_WRITTEN = "shuffleBytesWritten"
    +  val SHUFFLE_RECORDS_WRITTEN = "shuffleRecordsWritten"
    +  val SHUFFLE_WRITE_TIME = "shuffleWriteTime"
    --- End diff --
    
    yea i think we can just report ms level granularity. no point reporting ns (although we might want to measure based on ns)
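    
    A rough sketch of "measure in ns, report in ms", assuming a hypothetical
    `WriteTimeMetric` accumulator that is not part of Spark. It keeps the
    nanosecond total and converts once at report time, so sub-millisecond
    samples still add up instead of each being truncated to zero.
    
    ```scala
    import java.util.concurrent.TimeUnit
    
    // Hypothetical accumulator: measures in nanoseconds, reports in milliseconds.
    final class WriteTimeMetric {
      private var totalNanos = 0L
    
      def addNanos(ns: Long): Unit = totalNanos += ns
    
      // Convert once at report time so sub-millisecond samples are not lost.
      def reportedMillis: Long = TimeUnit.NANOSECONDS.toMillis(totalNanos)
    }
    
    val writeTime = new WriteTimeMetric
    // Two samples of 0.6 ms each; converting each one separately would give 0.
    writeTime.addNanos(600000L)
    writeTime.addNanos(600000L)
    ```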


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5791/
    Test PASSed.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5729/
    Test PASSed.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    @SparkQA retest this please.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99617/
    Test FAILed.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    **[Test build #99817 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99817/testReport)** for PR 23207 at commit [`6378a3d`](https://github.com/apache/spark/commit/6378a3d4707b0d7559fca20220229cde71f9a64b).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    ```
    var writer: ShuffleWriter[Any, Any] = null
    try {
      val manager = SparkEnv.get.shuffleManager
      writer = manager.getWriter[Any, Any](
        dep.shuffleHandle, partitionId, context, context.taskMetrics().shuffleWriteMetrics)
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
      writer.stop(success = true).get
    } catch {
      case e: Exception =>
        try {
          if (writer != null) {
            writer.stop(success = false)
          }
        } catch {
          case e: Exception =>
            log.debug("Could not stop writer", e)
        }
        throw e
    }
    ```
    
    Can we put the above in a closure and pass it into shuffle dependency? Then in SQL we just put the above in SQL using custom metrics.
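    
    To illustrate the suggestion, here is a minimal sketch in plain Scala. It is not Spark's API: `WriteMetrics`, `CountingMetrics`, `Dependency` and `withExtraMetrics` are hypothetical stand-ins, and the "write" merely counts records. The only point is that the write routine can travel with the dependency as a function value, so the SQL side could supply a variant that also updates its own metrics.
    
    ```scala
    // Hypothetical stand-ins for illustration; none of these are Spark classes.
    trait WriteMetrics {
      def incRecordsWritten(v: Long): Unit
    }
    
    final class CountingMetrics extends WriteMetrics {
      private var records = 0L
      override def incRecordsWritten(v: Long): Unit = records += v
      def recordsWritten: Long = records
    }
    
    // The write logic is carried by the dependency as a function value.
    final case class Dependency(write: (Iterator[Int], WriteMetrics) => Int)
    
    object ClosureInDependency {
      // Default routine: consume the records and report each one to the metrics.
      val defaultWrite: (Iterator[Int], WriteMetrics) => Int = (records, metrics) => {
        var n = 0
        records.foreach { _ => metrics.incRecordsWritten(1L); n += 1 }
        n
      }
    
      // Variant that reports to the task metrics and to an extra set of metrics.
      def withExtraMetrics(extra: WriteMetrics): (Iterator[Int], WriteMetrics) => Int =
        (records, taskMetrics) => {
          val both = new WriteMetrics {
            override def incRecordsWritten(v: Long): Unit = {
              taskMetrics.incRecordsWritten(v)
              extra.incRecordsWritten(v)
            }
          }
          defaultWrite(records, both)
        }
    
      def main(args: Array[String]): Unit = {
        val taskMetrics = new CountingMetrics
        val sqlMetrics = new CountingMetrics
        val dep = Dependency(withExtraMetrics(sqlMetrics))
        val written = dep.write(Iterator(1, 2, 3), taskMetrics)
        println(s"$written ${taskMetrics.recordsWritten} ${sqlMetrics.recordsWritten}")
      }
    }
    ```
    
    Under these assumptions `main` prints `3 3 3`: three records written, and both metric holders see all three updates.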


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239698500
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala ---
    @@ -78,6 +80,7 @@ object SQLMetrics {
       private val SUM_METRIC = "sum"
       private val SIZE_METRIC = "size"
       private val TIMING_METRIC = "timing"
    +  private val NS_TIMING_METRIC = "nanosecond"
    --- End diff --
    
    How about naming it `NORMALIZE_TIMING_METRIC`? It could be reused later for other timing metrics that need unit normalization. If you think the name is strange, I'll change it back.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    **[Test build #99677 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99677/testReport)** for PR 23207 at commit [`fcd62b3`](https://github.com/apache/spark/commit/fcd62b390ba4b5e2b1b9c6138026ac6da1b78d1f).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5700/
    Test PASSed.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99782/
    Test PASSed.


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r238843017
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala ---
    @@ -163,6 +171,8 @@ object SQLMetrics {
             Utils.bytesToString
           } else if (metricsType == TIMING_METRIC) {
             Utils.msDurationToString
    +      } else if (metricsType == NANO_TIMING_METRIC) {
    +        duration => Utils.msDurationToString(duration / 100000)
    --- End diff --
    
    is this the right conversion from nanosecs to millisecs?
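    
    For reference, one millisecond is 1,000,000 nanoseconds, so the divisor in the diff above is short by a factor of ten. A small self-contained check follows; `NanoToMillis` and `nanosToMillis` are names made up for this sketch, and only `java.util.concurrent.TimeUnit` is a real JDK API.
    
    ```scala
    import java.util.concurrent.TimeUnit
    
    object NanoToMillis {
      // One millisecond is 1,000,000 nanoseconds.
      val NanosPerMilli: Long = 1000L * 1000L
    
      // Truncating conversion by integer division.
      def nanosToMillis(nanos: Long): Long = nanos / NanosPerMilli
    
      def main(args: Array[String]): Unit = {
        val nanos = 37000000L // 37 ms expressed in nanoseconds
        println(nanosToMillis(nanos))                 // 37
        println(TimeUnit.NANOSECONDS.toMillis(nanos)) // 37, same result via the JDK
        println(nanos / 100000)                       // 370, ten times too large
      }
    }
    ```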


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r238837000
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/metrics.scala ---
    @@ -50,3 +50,57 @@ private[spark] trait ShuffleWriteMetricsReporter {
       private[spark] def decBytesWritten(v: Long): Unit
       private[spark] def decRecordsWritten(v: Long): Unit
     }
    +
    +
    +/**
    + * A proxy for ShuffleWriteMetricsReporter that forwards all metric updates to the input
    + * reporters.
    + */
    +private[spark] class GroupedShuffleWriteMetricsReporter(
    +    reporters: Seq[ShuffleWriteMetricsReporter]) extends ShuffleWriteMetricsReporter {
    +  override private[spark] def incBytesWritten(v: Long): Unit = {
    +    reporters.foreach(_.incBytesWritten(v))
    +  }
    +  override private[spark] def decRecordsWritten(v: Long): Unit = {
    +    reporters.foreach(_.decRecordsWritten(v))
    +  }
    +  override private[spark] def incRecordsWritten(v: Long): Unit = {
    +    reporters.foreach(_.incRecordsWritten(v))
    +  }
    +  override private[spark] def incWriteTime(v: Long): Unit = {
    +    reporters.foreach(_.incWriteTime(v))
    +  }
    +  override private[spark] def decBytesWritten(v: Long): Unit = {
    +    reporters.foreach(_.decBytesWritten(v))
    +  }
    +}
    +
    +
    +/**
    + * A proxy for ShuffleReadMetricsReporter that forwards all metric updates to the input
    + * reporters.
    + */
    +private[spark] class GroupedShuffleReadMetricsReporter(
    --- End diff --
    
    Again - I think your old approach is much better. No point creating a general util when there is only one implementation without any known future needs.


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239312090
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
    @@ -170,13 +172,23 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
         val df = testData2.groupBy().agg(collect_set('a)) // 2 partitions
         testSparkPlanMetrics(df, 1, Map(
           2L -> (("ObjectHashAggregate", Map("number of output rows" -> 2L))),
    +      1L -> (("Exchange", Map(
    +        "shuffle records written" -> 2L,
    +        "records read" -> 2L,
    +        "local blocks fetched" -> 2L,
    --- End diff --
    
    Copy that; the display text change will be done in another PR.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    test this please.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    **[Test build #99782 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99782/testReport)** for PR 23207 at commit [`d5ee249`](https://github.com/apache/spark/commit/d5ee2493478d11ba688172d4b27a15b18beaf559).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    **[Test build #99825 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99825/testReport)** for PR 23207 at commit [`bc2c4f1`](https://github.com/apache/spark/commit/bc2c4f187f8037aa540b0e6bae5d90d7d6e3509d).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239049121
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala ---
    @@ -78,6 +78,7 @@ object SQLMetrics {
       private val SUM_METRIC = "sum"
       private val SIZE_METRIC = "size"
       private val TIMING_METRIC = "timing"
    +  private val NANO_TIMING_METRIC = "nanosecond"
    --- End diff --
    
    Done in cf35b9f.


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239308829
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
    @@ -170,13 +172,23 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
         val df = testData2.groupBy().agg(collect_set('a)) // 2 partitions
         testSparkPlanMetrics(df, 1, Map(
           2L -> (("ObjectHashAggregate", Map("number of output rows" -> 2L))),
    +      1L -> (("Exchange", Map(
    +        "shuffle records written" -> 2L,
    +        "records read" -> 2L,
    +        "local blocks fetched" -> 2L,
    --- End diff --
    
    yea i'd just change the display text here, and not change the api


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239067552
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala ---
    @@ -163,6 +171,8 @@ object SQLMetrics {
             Utils.bytesToString
           } else if (metricsType == TIMING_METRIC) {
             Utils.msDurationToString
    +      } else if (metricsType == NS_TIMING_METRIC) {
    +        duration => Utils.msDurationToString(duration / 1000 / 1000)
    --- End diff --
    
    Maybe it's OK; I tested this locally with the UT in SQLMetricsSuite, and the result is below:
    ```
    shuffle records written: 2
    shuffle write time total (min, med, max): 37 ms (37 ms, 37 ms, 37 ms)
    shuffle bytes written total (min, med, max): 66.0 B (66.0 B, 66.0 B, 66.0 B)
    ```
    In real workloads the shuffle bytes written will be much larger, so keeping the time in ms may be enough. WDYT?


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    **[Test build #99676 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99676/testReport)** for PR 23207 at commit [`ca6c407`](https://github.com/apache/spark/commit/ca6c407929e62492a2c5233504efaeaf731f8cc9).


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    **[Test build #99752 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99752/testReport)** for PR 23207 at commit [`9966c2a`](https://github.com/apache/spark/commit/9966c2abc821492d5f5c6c74034407879c764573).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    **[Test build #99805 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99805/testReport)** for PR 23207 at commit [`6378a3d`](https://github.com/apache/spark/commit/6378a3d4707b0d7559fca20220229cde71f9a64b).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239677846
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala ---
    @@ -78,6 +80,7 @@ object SQLMetrics {
       private val SUM_METRIC = "sum"
       private val SIZE_METRIC = "size"
       private val TIMING_METRIC = "timing"
    +  private val NS_TIMING_METRIC = "nanosecond"
    --- End diff --
    
    Can we change it to `ms`? The core side can still be `ns`, but on the SQL side we truncate it to `ms`.


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239308082
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -38,12 +38,18 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
       override def outputPartitioning: Partitioning = SinglePartition
       override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
       private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
    -  override lazy val metrics = SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
    +  private val writeMetrics = SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
    --- End diff --
    
    why is metrics lazy val and this one val?


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239748512
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.shuffle
    +
    +import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, TaskContext}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.scheduler.MapStatus
    +
    +/**
    + * The interface for customizing shuffle write process. The driver creates a ShuffleWriteProcessor
    + * and puts it into [[ShuffleDependency]], and executors use it in each ShuffleMapTask.
    + */
    +private[spark] class ShuffleWriteProcessor extends Serializable with Logging {
    +
    +  /**
    +   * Create a [[ShuffleWriteMetricsReporter]] from the task context, always return a proxy
    +   * reporter for both local accumulator and original reporter updating. As the reporter is a
    --- End diff --
    
    Ah, I think I see what you mean. Yea, once we pass the context, more can be done in this interface; I'll delete this comment.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5730/
    Test PASSed.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    @AmplabJenkins test this please.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    ```
    can you separate the prs to rename read side metric and the write side change?
    ```
    No problem, the next commit will revert the read-side rename.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r238633725
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala ---
    @@ -92,6 +92,12 @@ private[spark] class ShuffleMapTask(
           threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
         } else 0L
     
    +    // Register the shuffle write metrics reporter to shuffleWriteMetrics.
    +    if (dep.shuffleWriteMetricsReporter.isDefined) {
    +      context.taskMetrics().shuffleWriteMetrics.registerExternalShuffleWriteReporter(
    --- End diff --
    
    a simpler idea:
    1. create a `class GroupedShuffleWriteMetricsReporter(reporters: Seq[ShuffleWriteMetricsReporter]) extends ShuffleWriteMetricsReporter`, which proxies all metric updates to the input reporters.
    2. create a `GroupedShuffleWriteMetricsReporter` instance here: `new GroupedShuffleWriteMetricsReporter(Seq(dep.shuffleWriteMetricsReporter.get, context.taskMetrics().shuffleWriteMetrics))`, and pass it to `manager.getWriter`
    
    I think we can use the same approach for read metrics as well.
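    
    A stripped-down, self-contained sketch of the grouped-reporter idea described above. The `Reporter` trait here is a hypothetical two-method simplification, not Spark's `ShuffleWriteMetricsReporter`, and `SumReporter` and `GroupedReporter` are illustrative names only.
    
    ```scala
    // Hypothetical, simplified reporter interface (not Spark's trait).
    trait Reporter {
      def incBytesWritten(v: Long): Unit
      def decBytesWritten(v: Long): Unit
    }
    
    // Keeps a running total so the effect of each update can be observed.
    final class SumReporter extends Reporter {
      private var total = 0L
      override def incBytesWritten(v: Long): Unit = total += v
      override def decBytesWritten(v: Long): Unit = total -= v
      def bytesWritten: Long = total
    }
    
    // Forwards every update to each wrapped reporter, in order.
    final class GroupedReporter(reporters: Seq[Reporter]) extends Reporter {
      override def incBytesWritten(v: Long): Unit = reporters.foreach(_.incBytesWritten(v))
      override def decBytesWritten(v: Long): Unit = reporters.foreach(_.decBytesWritten(v))
    }
    
    object GroupedReporterDemo {
      def main(args: Array[String]): Unit = {
        val sqlSide = new SumReporter
        val taskSide = new SumReporter
        val grouped = new GroupedReporter(Seq(sqlSide, taskSide))
        grouped.incBytesWritten(66L)
        grouped.decBytesWritten(6L)
        println(s"${sqlSide.bytesWritten} ${taskSide.bytesWritten}") // 60 60
      }
    }
    ```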


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239311141
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -38,12 +38,18 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
       override def outputPartitioning: Partitioning = SinglePartition
       override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
       private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
    -  override lazy val metrics = SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
    +  private val writeMetrics = SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
    --- End diff --
    
    Both should be `private lazy val` (as should the newly added `readMetrics`); I'll change them.
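    
    For readers less familiar with the distinction, a small self-contained sketch of `val` versus `lazy val` initialization. The `Node` class and its fields are made up for illustration and are not taken from the PR.
    
    ```scala
    object LazyValDemo {
      final class Node(log: StringBuilder) {
        // Evaluated eagerly, while the constructor runs.
        val eager: String = { log.append("eager;"); "e" }
        // Evaluated on first access only, then cached.
        lazy val deferred: String = { log.append("deferred;"); "d" }
      }
    
      def main(args: Array[String]): Unit = {
        val log = new StringBuilder
        val node = new Node(log)
        println(log.toString) // eager;
        val first = node.deferred
        val second = node.deferred // cached, the initializer does not run again
        println(log.toString) // eager;deferred;
        println(first == second) // true
      }
    }
    ```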


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99752/
    Test FAILed.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5842/
    Test PASSed.


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239735814
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala ---
    @@ -78,6 +80,7 @@ object SQLMetrics {
       private val SUM_METRIC = "sum"
       private val SIZE_METRIC = "size"
       private val TIMING_METRIC = "timing"
    +  private val NORMALIZE_TIMING_METRIC = "normalizeTiming"
    --- End diff --
    
    Actually I think your previous naming is good, sorry for the back and forth.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    **[Test build #99676 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99676/testReport)** for PR 23207 at commit [`ca6c407`](https://github.com/apache/spark/commit/ca6c407929e62492a2c5233504efaeaf731f8cc9).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239311018
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -38,12 +38,18 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
       override def outputPartitioning: Partitioning = SinglePartition
       override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
       private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
    -  override lazy val metrics = SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
    +  private val writeMetrics = SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
    +  override lazy val metrics =
    --- End diff --
    
    Thanks, that makes sense. I'll separate the read and write metrics and pass them both.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    ```
    Can we put the above in a closure and pass it into shuffle dependency? Then in SQL we just put the above in SQL using custom metrics.
    ```
    Yes, commit a780b70 achieves this by adding the `ShuffleWriteProcessor` abstraction.
    The read metrics rename was reverted in 7d104eb; I'll do it, together with the display change, in another PR.


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239990986
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -38,13 +38,21 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
       override def outputPartitioning: Partitioning = SinglePartition
       override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
       private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
    -  override lazy val metrics = SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
    +  private lazy val writeMetrics =
    +    SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
    +  private lazy val readMetrics =
    +    SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
    --- End diff --
    
    I feel it is better to rename SQLShuffleMetricsReporter to SQLShuffleReadMetricsReporter so that it matches SQLShuffleWriteMetricsReporter. It can be done in a follow-up.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99825/


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239698273
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
    @@ -333,8 +343,19 @@ object ShuffleExchangeExec {
           new ShuffleDependency[Int, InternalRow, InternalRow](
             rddWithPartitionIds,
             new PartitionIdPassthrough(part.numPartitions),
    -        serializer)
    +        serializer,
    +        shuffleWriterProcessor = createShuffleWriteProcessor(writeMetrics))
     
         dependency
       }
    +
    +  /**
    +   * Create a customized [[ShuffleWriteProcessor]] for SQL which wrap the default metrics reporter
    +   * with [[SQLShuffleWriteMetricsReporter]] as new reporter for [[ShuffleWriteProcessor]].
    +   */
    +  def createShuffleWriteProcessor(metrics: Map[String, SQLMetric]): ShuffleWriteProcessor = {
    +    (reporter: ShuffleWriteMetricsReporter) => {
    --- End diff --
    
    Yes, it doesn't work with Scala 2.11. I should write it in a more readable way; done in 6378a3d.
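
For context, here is a minimal sketch of the Scala 2.11 issue. Converting a function literal into an instance of a Scala-defined single-abstract-method trait is standard only from Scala 2.12 onward, so an explicit anonymous class is the form that compiles on both versions. `Reporter`, `Processor`, and `CountingReporter` are simplified, hypothetical stand-ins, not Spark's actual `ShuffleWriteMetricsReporter` or `ShuffleWriteProcessor`.

```scala
// Hypothetical stand-ins for illustration only; not the real Spark types.
object SamSketch {
  trait Reporter {
    def incRecordsWritten(v: Long): Unit
  }

  // A trait with a single abstract method.
  trait Processor {
    def createMetricsReporter(reporter: Reporter): Reporter
  }

  final class CountingReporter extends Reporter {
    var records: Long = 0L
    override def incRecordsWritten(v: Long): Unit = records += v
  }

  // Explicit anonymous classes: accepted by Scala 2.11 and later.
  // On 2.12+ a `(r: Reporter) => ...` literal would also be accepted
  // where a Processor is expected.
  def createProcessor(sqlSide: Reporter): Processor = new Processor {
    override def createMetricsReporter(default: Reporter): Reporter = new Reporter {
      override def incRecordsWritten(v: Long): Unit = {
        sqlSide.incRecordsWritten(v)
        default.incRecordsWritten(v)
      }
    }
  }
}
```

The anonymous-class form is more verbose, but it does not depend on the compiler version and makes the returned type explicit.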


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5671/


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239049030
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/metrics.scala ---
    @@ -50,3 +50,57 @@ private[spark] trait ShuffleWriteMetricsReporter {
       private[spark] def decBytesWritten(v: Long): Unit
       private[spark] def decRecordsWritten(v: Long): Unit
     }
    +
    +
    +/**
    + * A proxy class of ShuffleWriteMetricsReporter which proxy all metrics updating to the input
    + * reporters.
    + */
    +private[spark] class GroupedShuffleWriteMetricsReporter(
    +    reporters: Seq[ShuffleWriteMetricsReporter]) extends ShuffleWriteMetricsReporter {
    +  override private[spark] def incBytesWritten(v: Long): Unit = {
    +    reporters.foreach(_.incBytesWritten(v))
    +  }
    +  override private[spark] def decRecordsWritten(v: Long): Unit = {
    +    reporters.foreach(_.decRecordsWritten(v))
    +  }
    +  override private[spark] def incRecordsWritten(v: Long): Unit = {
    +    reporters.foreach(_.incRecordsWritten(v))
    +  }
    +  override private[spark] def incWriteTime(v: Long): Unit = {
    +    reporters.foreach(_.incWriteTime(v))
    +  }
    +  override private[spark] def decBytesWritten(v: Long): Unit = {
    +    reporters.foreach(_.decBytesWritten(v))
    +  }
    +}
    +
    +
    +/**
    + * A proxy class of ShuffleReadMetricsReporter which proxy all metrics updating to the input
    + * reporters.
    + */
    +private[spark] class GroupedShuffleReadMetricsReporter(
    --- End diff --
    
    Got it, thanks for your guidance. I reverted to the old approach, with only small changes to `SQLShuffleReadMetricsReporter`, following https://github.com/apache/spark/pull/23147.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    @SparkQA test this please.


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239677477
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/ShuffleWriterProcessor.scala ---
    @@ -0,0 +1,82 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.shuffle
    +
    +import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, TaskContext}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.scheduler.MapStatus
    +
    +/**
    + * The interface for customizing shuffle write process. The driver create a ShuffleWriteProcessor
    + * and put it into [[ShuffleDependency]], and executors use it in each ShuffleMapTask.
    + */
    +private[spark] trait ShuffleWriteProcessor extends Serializable with Logging {
    +
    +  /**
    +   * Create a [[ShuffleWriteMetricsReporter]] from the default reporter, always return a proxy
    +   * reporter for both local accumulator and original reporter updating. As the reporter is a
    +   * per-row operator, here need a careful consideration on performance.
    +   */
    +  def createMetricsReporter(reporter: ShuffleWriteMetricsReporter): ShuffleWriteMetricsReporter
    --- End diff --
    
    After that, we can just make `ShuffleWriteProcessor` a class.


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239059162
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala ---
    @@ -163,6 +171,8 @@ object SQLMetrics {
             Utils.bytesToString
           } else if (metricsType == TIMING_METRIC) {
             Utils.msDurationToString
    +      } else if (metricsType == NS_TIMING_METRIC) {
    +        duration => Utils.msDurationToString(duration / 1000 / 1000)
    --- End diff --
    
    will this string lose the nanosecond precision?
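
To make the question concrete, here is a small sketch of what the integer division in the diff does to sub-millisecond values. `nanosToMillis` is a hypothetical helper that mirrors only the `duration / 1000 / 1000` step, not Spark's `Utils.msDurationToString`.

```scala
object NsTimingSketch {
  // Integer division from nanoseconds to milliseconds, mirroring the
  // `duration / 1000 / 1000` expression in the diff. Long division
  // truncates, so anything below one millisecond is dropped.
  def nanosToMillis(durationNs: Long): Long = durationNs / 1000 / 1000
}
```

Under this arithmetic, `NsTimingSketch.nanosToMillis(999999L)` is `0` and `NsTimingSketch.nanosToMillis(1999999L)` is `1`, so the displayed string can carry millisecond resolution at best.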


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239049398
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala ---
    @@ -163,6 +171,8 @@ object SQLMetrics {
             Utils.bytesToString
           } else if (metricsType == TIMING_METRIC) {
             Utils.msDurationToString
    +      } else if (metricsType == NANO_TIMING_METRIC) {
    +        duration => Utils.msDurationToString(duration / 100000)
    --- End diff --
    
    Sorry about this. I changed it to `1000 / 1000`, as other places do, for safety.
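
As an illustration of the mistake being fixed here: `100000` is 10^5, not the 10^6 needed to go from nanoseconds to milliseconds, so the result comes out ten times too large. The sketch below compares the mistyped divisor with the chained form and with the JDK's `TimeUnit` conversion. The object and value names are hypothetical, and using `TimeUnit` is only a suggestion, not what the PR does.

```scala
import java.util.concurrent.TimeUnit

object DivisorSketch {
  val fiveMsInNs: Long = 5000000L

  // Mistyped divisor: divides by 10^5, so the value is ten times too large.
  val mistyped: Long = fiveMsInNs / 100000

  // Chained form used elsewhere: divides by 10^6 in two easy-to-read steps.
  val chained: Long = fiveMsInNs / 1000 / 1000

  // JDK conversion: no hand-written constant to get wrong.
  val viaTimeUnit: Long = TimeUnit.NANOSECONDS.toMillis(fiveMsInNs)
}
```

Here `mistyped` is `50`, while `chained` and `viaTimeUnit` are both `5`.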


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99805/


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239311564
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala ---
    @@ -95,3 +96,59 @@ private[spark] object SQLShuffleMetricsReporter {
         FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait time"),
         RECORDS_READ -> SQLMetrics.createMetric(sc, "records read"))
     }
    +
    +/**
    + * A shuffle write metrics reporter for SQL exchange operators. Different with
    + * [[SQLShuffleReadMetricsReporter]], we need a function of (reporter => reporter) set in
    + * shuffle dependency, so the local SQLMetric should transient and create on executor.
    + * @param metrics Shuffle write metrics in current SparkPlan.
    + * @param metricsReporter Other reporter need to be updated in this SQLShuffleWriteMetricsReporter.
    + */
    +private[spark] case class SQLShuffleWriteMetricsReporter(
    +    metrics: Map[String, SQLMetric])(metricsReporter: ShuffleWriteMetricsReporter)
    --- End diff --
    
    As discussed in https://github.com/apache/spark/pull/23207#discussion_r238909822, the latest approach carries a (reporter => reporter) function in the shuffle dependency to create the SQLShuffleWriteMetrics in ShuffleMapTask.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Can you share some ideas about it? IMO shuffle write metrics are hard, as an RDD can have shuffle dependencies with multiple upstream RDDs. That said, in general the shuffle write metrics should belong to the upstream RDDs.
    
    In Spark SQL, it's a little simpler, as the `ShuffledRowRDD` always has only one child, so it's reasonable to say that shuffle write metrics belong to `ShuffledRowRDD`.
    
    That said, we need to design a not-so-general shuffle write metrics API in Spark core, which will only be used in Spark SQL.


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239054315
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
    @@ -299,12 +312,25 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
           val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
           val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key", "value")
           // Assume the execution plan is
    -      // ... -> ShuffledHashJoin(nodeId = 1) -> Project(nodeId = 0)
    +      // Project(nodeId = 0)
    +      // +- ShuffledHashJoin(nodeId = 1)
    +      // :- Exchange(nodeId = 2)
    +      // :  +- Project(nodeId = 3)
    +      // :     +- LocalTableScan(nodeId = 4)
    +      // +- Exchange(nodeId = 5)
    +      // +- Project(nodeId = 6)
    +      // +- LocalTableScan(nodeId = 7)
           val df = df1.join(df2, "key")
           testSparkPlanMetrics(df, 1, Map(
             1L -> (("ShuffledHashJoin", Map(
               "number of output rows" -> 2L,
    -          "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))))
    +          "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))),
    +        2L -> (("Exchange", Map(
    +          "shuffle records written" -> 2L,
    +          "records read" -> 2L))),
    --- End diff --
    
    For most scenarios the answer is yes, but in cases like sort merge join, where two sort nodes reuse the same child, shuffle records written and records read will differ. I also added a case here:
    
    https://github.com/xuanyuanking/spark/blob/SPARK-26193/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala#L217-L222


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99722/


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5776/


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239048356
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/metrics.scala ---
    @@ -50,3 +50,57 @@ private[spark] trait ShuffleWriteMetricsReporter {
       private[spark] def decBytesWritten(v: Long): Unit
       private[spark] def decRecordsWritten(v: Long): Unit
     }
    +
    +
    +/**
    + * A proxy class of ShuffleWriteMetricsReporter which proxy all metrics updating to the input
    + * reporters.
    + */
    +private[spark] class GroupedShuffleWriteMetricsReporter(
    --- End diff --
    
    Thanks for your guidance, Reynold and Wenchen. I chose the second implementation: it is the less heavy option and follows a usage pattern similar to `SQLShuffleReadMetricsReporter`. Done in cf35b9f.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    **[Test build #99643 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99643/testReport)** for PR 23207 at commit [`7c8e516`](https://github.com/apache/spark/commit/7c8e5161904f1fd0fa4d99e6c497ef1be3542bdb).


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r238845029
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
    @@ -170,13 +172,23 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
         val df = testData2.groupBy().agg(collect_set('a)) // 2 partitions
         testSparkPlanMetrics(df, 1, Map(
           2L -> (("ObjectHashAggregate", Map("number of output rows" -> 2L))),
    +      1L -> (("Exchange", Map(
    +        "shuffle records written" -> 2L,
    +        "records read" -> 2L,
    +        "local blocks fetched" -> 2L,
    --- End diff --
    
    I think we should be consistent and name these "read", rather than "fetch".



---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r238909822
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/metrics.scala ---
    @@ -50,3 +50,57 @@ private[spark] trait ShuffleWriteMetricsReporter {
       private[spark] def decBytesWritten(v: Long): Unit
       private[spark] def decRecordsWritten(v: Long): Unit
     }
    +
    +
    +/**
    + * A proxy class of ShuffleWriteMetricsReporter which proxy all metrics updating to the input
    + * reporters.
    + */
    +private[spark] class GroupedShuffleWriteMetricsReporter(
    --- End diff --
    
    For the write metrics, it's different: the default reporter calls the SQL one, which requires hacking the default one to register external reporters.
    
    Maybe we should not change the read side, and just create a special `PairShuffleWriteMetricsReporter` to update both the SQL reporter and the default reporter.
    
    Another idea is for `ShuffleDependency` to carry a `reporter => reporter` function instead of a reporter. Then we can create a SQL reporter which takes another reporter (similar to the read side), and put the SQL reporter's constructor in `ShuffleDependency`.
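
The `reporter => reporter` idea can be sketched roughly as follows. Every type here (`WriteReporter`, `TaskWriteMetrics`, `SqlWriteReporter`, `Dependency`, `runTask`) is a simplified, hypothetical stand-in chosen for illustration; the real `ShuffleDependency`, `ShuffleWriteMetricsReporter`, and task code in Spark differ.

```scala
// Hypothetical stand-ins for illustration only; not the real Spark types.
object ReporterFactorySketch {
  trait WriteReporter {
    def incBytesWritten(v: Long): Unit
    def incRecordsWritten(v: Long): Unit
  }

  // Stand-in for the default, task-level write metrics.
  final class TaskWriteMetrics extends WriteReporter {
    var bytes: Long = 0L
    var records: Long = 0L
    override def incBytesWritten(v: Long): Unit = bytes += v
    override def incRecordsWritten(v: Long): Unit = records += v
  }

  // Stand-in for the SQL-side reporter: it updates its own counters and
  // forwards every update to the reporter it wraps.
  final class SqlWriteReporter(underlying: WriteReporter) extends WriteReporter {
    var bytes: Long = 0L
    var records: Long = 0L
    override def incBytesWritten(v: Long): Unit = {
      bytes += v
      underlying.incBytesWritten(v)
    }
    override def incRecordsWritten(v: Long): Unit = {
      records += v
      underlying.incRecordsWritten(v)
    }
  }

  // Stand-in for the shuffle dependency: it carries a function that builds
  // a reporter from the default one, rather than a reporter instance.
  final case class Dependency(wrapReporter: WriteReporter => WriteReporter)

  // What a map task might do: wrap the default reporter, then report per row.
  def runTask(
      dep: Dependency,
      default: WriteReporter,
      rows: Seq[Array[Byte]]): WriteReporter = {
    val reporter = dep.wrapReporter(default)
    rows.foreach { row =>
      reporter.incBytesWritten(row.length.toLong)
      reporter.incRecordsWritten(1L)
    }
    reporter
  }
}
```

With `Dependency(r => new SqlWriteReporter(r))`, the default reporter stays unaware of the SQL one, which is what avoids hacking it to register external reporters.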


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    **[Test build #99616 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99616/testReport)** for PR 23207 at commit [`7c8e516`](https://github.com/apache/spark/commit/7c8e5161904f1fd0fa4d99e6c497ef1be3542bdb).


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5854/


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239743452
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.shuffle
    +
    +import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, TaskContext}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.scheduler.MapStatus
    +
    +/**
    + * The interface for customizing shuffle write process. The driver create a ShuffleWriteProcessor
    + * and put it into [[ShuffleDependency]], and executors use it in each ShuffleMapTask.
    + */
    +private[spark] class ShuffleWriteProcessor extends Serializable with Logging {
    +
    +  /**
    +   * Create a [[ShuffleWriteMetricsReporter]] from the task context, always return a proxy
    +   * reporter for both local accumulator and original reporter updating. As the reporter is a
    --- End diff --
    
    It's not stale; maybe I didn't express it clearly. What I want to say is that we always return a proxy reporter, like the current SQLShuffleWriteReporter, which updates not only its own metrics (the local accumulator) but also the existing reporter passed in (such as the metrics in the task context).


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99817/


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239734920
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.shuffle
    +
    +import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, TaskContext}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.scheduler.MapStatus
    +
    +/**
    + * The interface for customizing shuffle write process. The driver create a ShuffleWriteProcessor
    + * and put it into [[ShuffleDependency]], and executors use it in each ShuffleMapTask.
    + */
    +private[spark] class ShuffleWriteProcessor extends Serializable with Logging {
    +
    +  /**
    +   * Create a [[ShuffleWriteMetricsReporter]] from the task context, always return a proxy
    +   * reporter for both local accumulator and original reporter updating. As the reporter is a
    --- End diff --
    
    `always return a proxy reporter for both local accumulator and original reporter updating`
    
    Is this doc comment stale?
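
The proxy behaviour that the quoted doc comment describes (one reporter updating both a local accumulator and the original reporter) could be sketched roughly as below. This is a standalone illustration, not the PR's code: `WriteMetricsReporter`, `CountingReporter` and `ProxyReporter` are hypothetical names that only mirror the shape of the five update methods in the diff, and no Spark classes are used.

```scala
// Hypothetical stand-in for a shuffle write metrics reporter interface.
trait WriteMetricsReporter {
  def incBytesWritten(v: Long): Unit
  def decBytesWritten(v: Long): Unit
  def incRecordsWritten(v: Long): Unit
  def decRecordsWritten(v: Long): Unit
  def incWriteTime(v: Long): Unit
}

// Simple in-memory reporter (assumption: plain counters are enough for the sketch).
final class CountingReporter extends WriteMetricsReporter {
  var bytes: Long = 0L
  var records: Long = 0L
  var writeTimeNs: Long = 0L
  override def incBytesWritten(v: Long): Unit = bytes += v
  override def decBytesWritten(v: Long): Unit = bytes -= v
  override def incRecordsWritten(v: Long): Unit = records += v
  override def decRecordsWritten(v: Long): Unit = records -= v
  override def incWriteTime(v: Long): Unit = writeTimeNs += v
}

// Proxy: every update goes to the original reporter and to the local one,
// always through the method of the same name on both sides.
final class ProxyReporter(
    original: WriteMetricsReporter,
    local: WriteMetricsReporter) extends WriteMetricsReporter {
  override def incBytesWritten(v: Long): Unit = {
    original.incBytesWritten(v)
    local.incBytesWritten(v)
  }
  override def decBytesWritten(v: Long): Unit = {
    original.decBytesWritten(v)
    local.decBytesWritten(v)
  }
  override def incRecordsWritten(v: Long): Unit = {
    original.incRecordsWritten(v)
    local.incRecordsWritten(v)
  }
  override def decRecordsWritten(v: Long): Unit = {
    original.decRecordsWritten(v)
    local.decRecordsWritten(v)
  }
  override def incWriteTime(v: Long): Unit = {
    original.incWriteTime(v)
    local.incWriteTime(v)
  }
}
```

With this shape, the two reporters stay in step as long as each proxy method forwards to its same-named counterpart.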


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r238842276
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala ---
    @@ -78,6 +78,7 @@ object SQLMetrics {
       private val SUM_METRIC = "sum"
       private val SIZE_METRIC = "size"
       private val TIMING_METRIC = "timing"
    +  private val NANO_TIMING_METRIC = "nanosecond"
    --- End diff --
    
    ns


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    retest this please.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    **[Test build #99736 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99736/testReport)** for PR 23207 at commit [`76d1ca0`](https://github.com/apache/spark/commit/76d1ca0036bbb50a005e9d12f8b22bf21697af7f).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99616/


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239069014
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala ---
    @@ -95,3 +96,59 @@ private[spark] object SQLShuffleMetricsReporter {
         FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait time"),
         RECORDS_READ -> SQLMetrics.createMetric(sc, "records read"))
     }
    +
    +/**
    + * A shuffle write metrics reporter for SQL exchange operators. Different with
    + * [[SQLShuffleReadMetricsReporter]], we need a function of (reporter => reporter) set in
    + * shuffle dependency, so the local SQLMetric should transient and create on executor.
    + * @param metrics Shuffle write metrics in current SparkPlan.
    + * @param metricsReporter Other reporter need to be updated in this SQLShuffleWriteMetricsReporter.
    + */
    +private[spark] case class SQLShuffleWriteMetricsReporter(
    +    metrics: Map[String, SQLMetric])(metricsReporter: ShuffleWriteMetricsReporter)
    +  extends ShuffleWriteMetricsReporter with Serializable {
    +  @transient private[this] lazy val _bytesWritten =
    +    metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_BYTES_WRITTEN)
    +  @transient private[this] lazy val _recordsWritten =
    +    metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN)
    +  @transient private[this] lazy val _writeTime =
    +    metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)
    +
    +  override private[spark] def incBytesWritten(v: Long): Unit = {
    +    metricsReporter.incBytesWritten(v)
    +    _bytesWritten.add(v)
    +  }
    +  override private[spark] def decRecordsWritten(v: Long): Unit = {
    +    metricsReporter.decBytesWritten(v)
    +    _recordsWritten.set(_recordsWritten.value - v)
    +  }
    +  override private[spark] def incRecordsWritten(v: Long): Unit = {
    +    metricsReporter.incRecordsWritten(v)
    +    _recordsWritten.add(v)
    +  }
    +  override private[spark] def incWriteTime(v: Long): Unit = {
    +    metricsReporter.incWriteTime(v)
    +    _writeTime.add(v)
    +  }
    +  override private[spark] def decBytesWritten(v: Long): Unit = {
    +    metricsReporter.decBytesWritten(v)
    +    _bytesWritten.set(_bytesWritten.value - v)
    +  }
    +}
    +
    +private[spark] object SQLShuffleWriteMetricsReporter {
    +  val SHUFFLE_BYTES_WRITTEN = "shuffleBytesWritten"
    +  val SHUFFLE_RECORDS_WRITTEN = "shuffleRecordsWritten"
    +  val SHUFFLE_WRITE_TIME = "shuffleWriteTime"
    --- End diff --
    
    In this PR, only the shuffle write time. The other time metric is `fetch wait time`, which is in ms and is set in `ShuffleBlockFetcherIterator`.


---



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239090244
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala ---
    @@ -95,3 +96,59 @@ private[spark] object SQLShuffleMetricsReporter {
         FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait time"),
         RECORDS_READ -> SQLMetrics.createMetric(sc, "records read"))
     }
    +
    +/**
    + * A shuffle write metrics reporter for SQL exchange operators. Different with
    + * [[SQLShuffleReadMetricsReporter]], we need a function of (reporter => reporter) set in
    + * shuffle dependency, so the local SQLMetric should transient and create on executor.
    + * @param metrics Shuffle write metrics in current SparkPlan.
    + * @param metricsReporter Other reporter need to be updated in this SQLShuffleWriteMetricsReporter.
    + */
    +private[spark] case class SQLShuffleWriteMetricsReporter(
    +    metrics: Map[String, SQLMetric])(metricsReporter: ShuffleWriteMetricsReporter)
    +  extends ShuffleWriteMetricsReporter with Serializable {
    +  @transient private[this] lazy val _bytesWritten =
    +    metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_BYTES_WRITTEN)
    +  @transient private[this] lazy val _recordsWritten =
    +    metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN)
    +  @transient private[this] lazy val _writeTime =
    +    metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)
    +
    +  override private[spark] def incBytesWritten(v: Long): Unit = {
    +    metricsReporter.incBytesWritten(v)
    +    _bytesWritten.add(v)
    +  }
    +  override private[spark] def decRecordsWritten(v: Long): Unit = {
    +    metricsReporter.decBytesWritten(v)
    +    _recordsWritten.set(_recordsWritten.value - v)
    +  }
    +  override private[spark] def incRecordsWritten(v: Long): Unit = {
    +    metricsReporter.incRecordsWritten(v)
    +    _recordsWritten.add(v)
    +  }
    +  override private[spark] def incWriteTime(v: Long): Unit = {
    +    metricsReporter.incWriteTime(v)
    +    _writeTime.add(v)
    +  }
    +  override private[spark] def decBytesWritten(v: Long): Unit = {
    +    metricsReporter.decBytesWritten(v)
    +    _bytesWritten.set(_bytesWritten.value - v)
    +  }
    +}
    +
    +private[spark] object SQLShuffleWriteMetricsReporter {
    +  val SHUFFLE_BYTES_WRITTEN = "shuffleBytesWritten"
    +  val SHUFFLE_RECORDS_WRITTEN = "shuffleRecordsWritten"
    +  val SHUFFLE_WRITE_TIME = "shuffleWriteTime"
    --- End diff --
    
    cc @rxin, do you think we should change this metric to use ms as well, in all the places that read/write it?
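
One consideration behind the ns-versus-ms question can be sketched as follows. This is an illustration under an assumption that is not stated in the thread: that write time is accumulated from many short increments. `WriteTimeUnits` and its methods are hypothetical names; only `java.util.concurrent.TimeUnit` is a real API here.

```scala
import java.util.concurrent.TimeUnit

object WriteTimeUnits {
  // Accumulate in nanoseconds and convert to milliseconds once, at the end.
  def sumThenConvert(incrementsNs: Seq[Long]): Long =
    TimeUnit.NANOSECONDS.toMillis(incrementsNs.sum)

  // Convert each increment to milliseconds before adding it; any increment
  // shorter than one millisecond truncates to zero.
  def convertThenSum(incrementsNs: Seq[Long]): Long =
    incrementsNs.map(ns => TimeUnit.NANOSECONDS.toMillis(ns)).sum
}
```

Under that assumption, 1000 increments of 400000 ns each total 400 ms when summed first, but 0 ms when each increment is converted before summing. Whether this matters depends on where the conversion would be placed.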


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23207
  
    Merged build finished. Test PASSed.


---
