Posted to reviews@spark.apache.org by ala <gi...@git.apache.org> on 2017/02/16 17:11:43 UTC

[GitHub] spark pull request #16960: [SPARK-19447] Make Range operator generate "recor...

GitHub user ala opened a pull request:

    https://github.com/apache/spark/pull/16960

    [SPARK-19447] Make Range operator generate "recordsRead" metric

    ## What changes were proposed in this pull request?
    
    The Range operator was modified to produce the "recordsRead" metric instead of "generated rows". The tests were updated and partially moved to SQLMetricsSuite.
    
    ## How was this patch tested?
    
    Unit tests.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ala/spark range-records-read

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16960.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #16960
    
----
commit 10a53a783682a3d8966a8ba6bb255aadcb9dc87d
Author: Ala Luszczak <al...@databricks.com>
Date:   2017-02-15T14:52:23Z

    Using recordsRead instead of generated rows.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16960: [SPARK-19447] Make Range operator generate "recordsRead"...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on the issue:

    https://github.com/apache/spark/pull/16960
  
    LGTM - pending jenkins.




[GitHub] spark pull request #16960: [SPARK-19447] Make Range operator generate "recor...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16960#discussion_r101575264
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
    @@ -309,4 +314,84 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
         assert(metricInfoDeser.metadata === Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER))
       }
     
    +  test("range metrics") {
    +    val res1 = InputOutputMetricsHelper.run(
    +      spark.range(30).filter(x => x % 3 == 0).toDF()
    +    )
    +    assert(res1 === (30L, 0L, 30L) :: Nil)
    +
    +    val res2 = InputOutputMetricsHelper.run(
    +      spark.range(150).repartition(4).filter(x => x < 10).toDF()
    +    )
    +    assert(res2 === (150L, 0L, 150L) :: (0L, 150L, 10L) :: Nil)
    +
    +    withTempDir { tempDir =>
    +      val dir = new File(tempDir, "pqS").getCanonicalPath
    +
    +      spark.range(10).write.parquet(dir)
    +      spark.read.parquet(dir).createOrReplaceTempView("pqS")
    +
    +      val res3 = InputOutputMetricsHelper.run(
    +        spark.range(0, 30).repartition(3).crossJoin(sql("select * from pqS")).repartition(2).toDF()
    +      )
    +      assert(res3 === (10L, 0L, 10L) :: (30L, 0L, 30L) :: (0L, 30L, 300L) :: (0L, 300L, 0L) :: Nil)
    +    }
    +  }
    +}
    +
    +object InputOutputMetricsHelper {
    +   private class InputOutputMetricsListener extends SparkListener {
    +    private case class MetricsResult(
    +        var recordsRead: Long = 0L,
    +        var shuffleRecordsRead: Long = 0L,
    +        var sumMaxOutputRows: Long = 0L)
    +
    +    private[this] var stageIdToMetricsResult = HashMap.empty[Int, MetricsResult]
    +
    +    def reset(): Unit = {
    +      stageIdToMetricsResult = HashMap.empty[Int, MetricsResult]
    +    }
    +
    +    def getResults(): List[(Long, Long, Long)] = {
    --- End diff --
    
    here too long long long




[GitHub] spark issue #16960: [SPARK-19447] Make Range operator generate "recordsRead"...

Posted by jaceklaskowski <gi...@git.apache.org>.
Github user jaceklaskowski commented on the issue:

    https://github.com/apache/spark/pull/16960
  
    I'll have a look at this this week and send a PR unless you beat me to it :) Thanks @ala!




[GitHub] spark pull request #16960: [SPARK-19447] Make Range operator generate "recor...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16960#discussion_r101784563
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
    @@ -309,4 +314,94 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
         assert(metricInfoDeser.metadata === Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER))
       }
     
    +  test("range metrics") {
    +    val res1 = InputOutputMetricsHelper.run(
    +      spark.range(30).filter(x => x % 3 == 0).toDF()
    +    )
    +    assert(res1 === (30L, 0L, 30L) :: Nil)
    +
    +    val res2 = InputOutputMetricsHelper.run(
    +      spark.range(150).repartition(4).filter(x => x < 10).toDF()
    +    )
    +    assert(res2 === (150L, 0L, 150L) :: (0L, 150L, 10L) :: Nil)
    +
    +    withTempDir { tempDir =>
    +      val dir = new File(tempDir, "pqS").getCanonicalPath
    +
    +      spark.range(10).write.parquet(dir)
    +      spark.read.parquet(dir).createOrReplaceTempView("pqS")
    +
    +      val res3 = InputOutputMetricsHelper.run(
    +        spark.range(0, 30).repartition(3).crossJoin(sql("select * from pqS")).repartition(2).toDF()
    --- End diff --
    
    This is hard to reason about. Could you add a few lines of documentation?
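
One possible reading of the expected `res3` values, sketched in plain Scala. The field names follow the `getResults()` doc comment quoted in the diff elsewhere in this thread (input records read, shuffle records read, summed "number of output rows"). The mapping of each tuple to a particular plan fragment is an assumption made for illustration, and `ExpectedMetricsSketch` and `StageMetrics` are hypothetical names, not part of the PR.

```scala
object ExpectedMetricsSketch {
  // Field names follow the getResults() doc comment in the diff.
  final case class StageMetrics(
      recordsRead: Long,
      shuffleRecordsRead: Long,
      sumMaxOutputRows: Long)

  val rangeRows = 30L   // spark.range(0, 30)
  val parquetRows = 10L // spark.range(10), written to parquet and read back

  // One entry per stage in ascending stageId order. Which stage corresponds
  // to which plan fragment is assumed here, not confirmed by the thread.
  val expected: List[StageMetrics] = List(
    StageMetrics(parquetRows, 0L, parquetRows),           // parquet scan
    StageMetrics(rangeRows, 0L, rangeRows),               // range, before repartition(3)
    StageMetrics(0L, rangeRows, rangeRows * parquetRows), // cross join: 30 * 10 rows
    StageMetrics(0L, rangeRows * parquetRows, 0L)         // after repartition(2)
  )

  def asTuples: List[(Long, Long, Long)] =
    expected.map(m => (m.recordsRead, m.shuffleRecordsRead, m.sumMaxOutputRows))
}
```

Under these assumptions the 300 in the third tuple is the cross-join output (30 range rows times 10 parquet rows), and the same 300 shows up as shuffle records read in the last tuple.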




[GitHub] spark issue #16960: [SPARK-19447] Make Range operator generate "recordsRead"...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16960
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #16960: [SPARK-19447] Make Range operator generate "recordsRead"...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16960
  
    **[Test build #73057 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73057/testReport)** for PR 16960 at commit [`70fe843`](https://github.com/apache/spark/commit/70fe8431410b27aa454ef0894f11adca8a008ea1).




[GitHub] spark pull request #16960: [SPARK-19447] Make Range operator generate "recor...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16960#discussion_r101783584
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
    @@ -309,4 +314,94 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
         assert(metricInfoDeser.metadata === Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER))
       }
     
    +  test("range metrics") {
    +    val res1 = InputOutputMetricsHelper.run(
    +      spark.range(30).filter(x => x % 3 == 0).toDF()
    +    )
    +    assert(res1 === (30L, 0L, 30L) :: Nil)
    +
    +    val res2 = InputOutputMetricsHelper.run(
    +      spark.range(150).repartition(4).filter(x => x < 10).toDF()
    +    )
    +    assert(res2 === (150L, 0L, 150L) :: (0L, 150L, 10L) :: Nil)
    +
    +    withTempDir { tempDir =>
    +      val dir = new File(tempDir, "pqS").getCanonicalPath
    +
    +      spark.range(10).write.parquet(dir)
    +      spark.read.parquet(dir).createOrReplaceTempView("pqS")
    +
    +      val res3 = InputOutputMetricsHelper.run(
    +        spark.range(0, 30).repartition(3).crossJoin(sql("select * from pqS")).repartition(2).toDF()
    +      )
    +      assert(res3 === (10L, 0L, 10L) :: (30L, 0L, 30L) :: (0L, 30L, 300L) :: (0L, 300L, 0L) :: Nil)
    +    }
    +  }
    +}
    +
    +object InputOutputMetricsHelper {
    +   private class InputOutputMetricsListener extends SparkListener {
    +    private case class MetricsResult(
    +        var recordsRead: Long = 0L,
    +        var shuffleRecordsRead: Long = 0L,
    +        var sumMaxOutputRows: Long = 0L)
    +
    +    private[this] var stageIdToMetricsResult = HashMap.empty[Int, MetricsResult]
    +
    +    def reset(): Unit = {
    +      stageIdToMetricsResult = HashMap.empty[Int, MetricsResult]
    +    }
    +
    +    /**
    +     * Return a list of recorded metrics aggregated per stage.
    +     *
    +     * The list is sorted in the ascending order on the stageId.
    +     * For each recorded stage, the following tuple is returned:
    +     *  - sum of inputMetrics.recordsRead for all the tasks in the stage
    +     *  - sum of shuffleReadMetrics.recordsRead for all the tasks in the stage
    +     *  - sum of the highest values of "number of output rows" metric for all the tasks in the stage
    +     */
    +    def getResults(): List[(Long, Long, Long)] = {
    +      stageIdToMetricsResult.keySet.toList.sorted.map({ stageId =>
    +        val res = stageIdToMetricsResult(stageId)
    +        (res.recordsRead, res.shuffleRecordsRead, res.sumMaxOutputRows)})
    +    }
    +
    +    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
    +      val res = stageIdToMetricsResult.getOrElseUpdate(taskEnd.stageId, { MetricsResult() })
    +
    +      res.recordsRead += taskEnd.taskMetrics.inputMetrics.recordsRead
    +      res.shuffleRecordsRead += taskEnd.taskMetrics.shuffleReadMetrics.recordsRead
    +
    +      var maxOutputRows = 0L
    +      for (accum <- taskEnd.taskMetrics.externalAccums) {
    +        val info = accum.toInfo(Some(accum.value), None)
    +        if (info.name.toString.contains("number of output rows")) {
    +          info.update match {
    +            case Some(n: Number) =>
    +              if (n.longValue() > maxOutputRows) {
    +                maxOutputRows = n.longValue()
    +              }
    +            case _ => // Ignore.
    +          }
    +        }
    +      }
    +      res.sumMaxOutputRows += maxOutputRows
    +    }
    +  }
    +
    +  // Run df.collect() and return aggregated metrics for each stage.
    +  def run(df: DataFrame): List[(Long, Long, Long)] = {
    +    val spark = df.sparkSession
    +    val sparkContext = spark.sparkContext
    +    val listener = new InputOutputMetricsListener()
    --- End diff --
    
    Use try...finally here
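
A minimal sketch of the pattern this comment seems to ask for, written in plain Scala rather than against Spark's API: register the listener, run the work inside `try`, and unregister in `finally` so a failing query cannot leave the listener attached. `ListenerRegistry` and `withListener` are hypothetical stand-ins for illustration; they are not part of Spark or of this PR.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a listener bus; not Spark's API.
final class ListenerRegistry[A] {
  private val listeners = mutable.ArrayBuffer.empty[A => Unit]
  def add(l: A => Unit): Unit = listeners += l
  def remove(l: A => Unit): Unit = listeners -= l
  def post(event: A): Unit = listeners.toList.foreach(_(event))
  def size: Int = listeners.size
}

object TryFinallySketch {
  // Registers `listener`, evaluates `body`, and always unregisters the
  // listener afterwards, even when `body` throws.
  def withListener[A, B](registry: ListenerRegistry[A], listener: A => Unit)(body: => B): B = {
    registry.add(listener)
    try body
    finally registry.remove(listener)
  }
}
```

In the helper under review, the body would presumably be the `df.collect()` call, with the listener removed in the `finally` branch.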




[GitHub] spark issue #16960: [SPARK-19447] Make Range operator generate "recordsRead"...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16960
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73004/
    Test PASSed.




[GitHub] spark issue #16960: [SPARK-19447] Make Range operator generate "recordsRead"...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16960
  
    **[Test build #73057 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73057/testReport)** for PR 16960 at commit [`70fe843`](https://github.com/apache/spark/commit/70fe8431410b27aa454ef0894f11adca8a008ea1).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #16960: [SPARK-19447] Make Range operator generate "recordsRead"...

Posted by ala <gi...@git.apache.org>.
Github user ala commented on the issue:

    https://github.com/apache/spark/pull/16960
  
    Thanks @jaceklaskowski - it's already done: https://github.com/apache/spark/pull/17939




[GitHub] spark issue #16960: [SPARK-19447] Make Range operator generate "recordsRead"...

Posted by jaceklaskowski <gi...@git.apache.org>.
Github user jaceklaskowski commented on the issue:

    https://github.com/apache/spark/pull/16960
  
    I think the commit left the [numGeneratedRows](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala#L344) metric behind, didn't it? (It was added in #16829.)




[GitHub] spark issue #16960: [SPARK-19447] Make Range operator generate "recordsRead"...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16960
  
    **[Test build #73005 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73005/testReport)** for PR 16960 at commit [`088556b`](https://github.com/apache/spark/commit/088556b0ee1b5c33c88b4ccddcae11b2eb18660f).




[GitHub] spark pull request #16960: [SPARK-19447] Make Range operator generate "recor...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16960#discussion_r101782666
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
    @@ -309,4 +314,94 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
         assert(metricInfoDeser.metadata === Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER))
       }
     
    +  test("range metrics") {
    +    val res1 = InputOutputMetricsHelper.run(
    +      spark.range(30).filter(x => x % 3 == 0).toDF()
    +    )
    +    assert(res1 === (30L, 0L, 30L) :: Nil)
    +
    +    val res2 = InputOutputMetricsHelper.run(
    +      spark.range(150).repartition(4).filter(x => x < 10).toDF()
    +    )
    +    assert(res2 === (150L, 0L, 150L) :: (0L, 150L, 10L) :: Nil)
    +
    +    withTempDir { tempDir =>
    +      val dir = new File(tempDir, "pqS").getCanonicalPath
    +
    +      spark.range(10).write.parquet(dir)
    +      spark.read.parquet(dir).createOrReplaceTempView("pqS")
    +
    +      val res3 = InputOutputMetricsHelper.run(
    +        spark.range(0, 30).repartition(3).crossJoin(sql("select * from pqS")).repartition(2).toDF()
    +      )
    +      assert(res3 === (10L, 0L, 10L) :: (30L, 0L, 30L) :: (0L, 30L, 300L) :: (0L, 300L, 0L) :: Nil)
    +    }
    +  }
    +}
    +
    +object InputOutputMetricsHelper {
    +   private class InputOutputMetricsListener extends SparkListener {
    +    private case class MetricsResult(
    --- End diff --
    
    Nit: add space




[GitHub] spark issue #16960: [SPARK-19447] Make Range operator generate "recordsRead"...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the issue:

    https://github.com/apache/spark/pull/16960
  
    Merging in master.





[GitHub] spark issue #16960: [SPARK-19447] Make Range operator generate "recordsRead"...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16960
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73005/
    Test PASSed.




[GitHub] spark issue #16960: [SPARK-19447] Make Range operator generate "recordsRead"...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16960
  
    **[Test build #73005 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73005/testReport)** for PR 16960 at commit [`088556b`](https://github.com/apache/spark/commit/088556b0ee1b5c33c88b4ccddcae11b2eb18660f).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #16960: [SPARK-19447] Make Range operator generate "recordsRead"...

Posted by ala <gi...@git.apache.org>.
Github user ala commented on the issue:

    https://github.com/apache/spark/pull/16960
  
    True. There are a couple of lines that should have been removed with this change but were left behind. numGeneratedRows should be gone.




[GitHub] spark issue #16960: [SPARK-19447] Make Range operator generate "recordsRead"...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16960
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73057/
    Test PASSed.




[GitHub] spark issue #16960: [SPARK-19447] Make Range operator generate "recordsRead"...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16960
  
    **[Test build #73004 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73004/testReport)** for PR 16960 at commit [`10a53a7`](https://github.com/apache/spark/commit/10a53a783682a3d8966a8ba6bb255aadcb9dc87d).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #16960: [SPARK-19447] Make Range operator generate "recor...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16960#discussion_r101782872
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
    @@ -309,4 +314,94 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
         assert(metricInfoDeser.metadata === Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER))
       }
     
    +  test("range metrics") {
    +    val res1 = InputOutputMetricsHelper.run(
    +      spark.range(30).filter(x => x % 3 == 0).toDF()
    +    )
    +    assert(res1 === (30L, 0L, 30L) :: Nil)
    +
    +    val res2 = InputOutputMetricsHelper.run(
    +      spark.range(150).repartition(4).filter(x => x < 10).toDF()
    +    )
    +    assert(res2 === (150L, 0L, 150L) :: (0L, 150L, 10L) :: Nil)
    +
    +    withTempDir { tempDir =>
    +      val dir = new File(tempDir, "pqS").getCanonicalPath
    +
    +      spark.range(10).write.parquet(dir)
    +      spark.read.parquet(dir).createOrReplaceTempView("pqS")
    +
    +      val res3 = InputOutputMetricsHelper.run(
    +        spark.range(0, 30).repartition(3).crossJoin(sql("select * from pqS")).repartition(2).toDF()
    +      )
    +      assert(res3 === (10L, 0L, 10L) :: (30L, 0L, 30L) :: (0L, 30L, 300L) :: (0L, 300L, 0L) :: Nil)
    +    }
    +  }
    +}
    +
    +object InputOutputMetricsHelper {
    +   private class InputOutputMetricsListener extends SparkListener {
    +    private case class MetricsResult(
    +        var recordsRead: Long = 0L,
    +        var shuffleRecordsRead: Long = 0L,
    +        var sumMaxOutputRows: Long = 0L)
    +
    +    private[this] var stageIdToMetricsResult = HashMap.empty[Int, MetricsResult]
    --- End diff --
    
    Make this val.
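
A sketch of what this suggestion could look like. Because the quoted code calls `getOrElseUpdate`, the map is evidently a mutable one, so the reference itself never needs to be reassigned: `reset()` can clear the map in place. `ValMapSketch` and `Counts` are hypothetical, simplified names for illustration and are not the PR's code.

```scala
import scala.collection.mutable

object ValMapSketch {
  // Hypothetical, simplified counterpart of MetricsResult from the diff.
  final case class Counts(var recordsRead: Long = 0L)

  // A val holding a mutable map: the reference is fixed, the contents change.
  private val stageIdToCounts = mutable.HashMap.empty[Int, Counts]

  // Clears the existing map instead of assigning a new one.
  def reset(): Unit = synchronized {
    stageIdToCounts.clear()
  }

  def record(stageId: Int, n: Long): Unit = synchronized {
    stageIdToCounts.getOrElseUpdate(stageId, Counts()).recordsRead += n
  }

  // Per-stage totals, sorted by ascending stageId.
  def results: List[(Int, Long)] = synchronized {
    stageIdToCounts.toList.sortBy(_._1).map { case (id, c) => (id, c.recordsRead) }
  }
}
```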




[GitHub] spark pull request #16960: [SPARK-19447] Make Range operator generate "recor...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16960#discussion_r101783889
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
    @@ -309,4 +314,94 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
         assert(metricInfoDeser.metadata === Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER))
       }
     
    +  test("range metrics") {
    +    val res1 = InputOutputMetricsHelper.run(
    +      spark.range(30).filter(x => x % 3 == 0).toDF()
    +    )
    +    assert(res1 === (30L, 0L, 30L) :: Nil)
    +
    +    val res2 = InputOutputMetricsHelper.run(
    +      spark.range(150).repartition(4).filter(x => x < 10).toDF()
    +    )
    +    assert(res2 === (150L, 0L, 150L) :: (0L, 150L, 10L) :: Nil)
    +
    +    withTempDir { tempDir =>
    +      val dir = new File(tempDir, "pqS").getCanonicalPath
    +
    +      spark.range(10).write.parquet(dir)
    +      spark.read.parquet(dir).createOrReplaceTempView("pqS")
    +
    +      val res3 = InputOutputMetricsHelper.run(
    +        spark.range(0, 30).repartition(3).crossJoin(sql("select * from pqS")).repartition(2).toDF()
    +      )
    +      assert(res3 === (10L, 0L, 10L) :: (30L, 0L, 30L) :: (0L, 30L, 300L) :: (0L, 300L, 0L) :: Nil)
    +    }
    +  }
    +}
    +
    +object InputOutputMetricsHelper {
    +   private class InputOutputMetricsListener extends SparkListener {
    +    private case class MetricsResult(
    +        var recordsRead: Long = 0L,
    +        var shuffleRecordsRead: Long = 0L,
    +        var sumMaxOutputRows: Long = 0L)
    +
    +    private[this] var stageIdToMetricsResult = HashMap.empty[Int, MetricsResult]
    +
    +    def reset(): Unit = {
    +      stageIdToMetricsResult = HashMap.empty[Int, MetricsResult]
    +    }
    +
    +    /**
    +     * Return a list of recorded metrics aggregated per stage.
    +     *
    +     * The list is sorted in the ascending order on the stageId.
    +     * For each recorded stage, the following tuple is returned:
    +     *  - sum of inputMetrics.recordsRead for all the tasks in the stage
    +     *  - sum of shuffleReadMetrics.recordsRead for all the tasks in the stage
    +     *  - sum of the highest values of "number of output rows" metric for all the tasks in the stage
    +     */
    +    def getResults(): List[(Long, Long, Long)] = {
    +      stageIdToMetricsResult.keySet.toList.sorted.map({ stageId =>
    +        val res = stageIdToMetricsResult(stageId)
    +        (res.recordsRead, res.shuffleRecordsRead, res.sumMaxOutputRows)})
    +    }
    +
    +    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
    +      val res = stageIdToMetricsResult.getOrElseUpdate(taskEnd.stageId, { MetricsResult() })
    --- End diff --
    
    Nit: remove the curly braces around `MetricsResult()`.
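
A minimal sketch of what the nit is asking for, assuming the second argument of `getOrElseUpdate` on a `scala.collection.mutable.HashMap` is a by-name parameter (it is in the standard library). The object name `GetOrElseUpdateSketch` and the `record` helper are hypothetical and are not part of the patch; `MetricsResult` is a stand-in for the private case class in the diff.

```scala
import scala.collection.mutable.HashMap

// Stand-in for the private MetricsResult case class shown in the diff.
case class MetricsResult(
    var recordsRead: Long = 0L,
    var shuffleRecordsRead: Long = 0L,
    var sumMaxOutputRows: Long = 0L)

// Hypothetical holder object, used only for this illustration.
object GetOrElseUpdateSketch {
  private val stageIdToMetricsResult = HashMap.empty[Int, MetricsResult]

  // The default value is passed by name, so MetricsResult() is evaluated
  // only when stageId is missing; no surrounding braces are required.
  def record(stageId: Int, recordsRead: Long): MetricsResult = {
    val res = stageIdToMetricsResult.getOrElseUpdate(stageId, MetricsResult())
    res.recordsRead += recordsRead
    res
  }
}
```

Calling `record` twice with the same stage id accumulates into the same `MetricsResult`, which is the behaviour the listener relies on.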




[GitHub] spark issue #16960: [SPARK-19447] Make Range operator generate "recordsRead"...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16960
  
    **[Test build #73004 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73004/testReport)** for PR 16960 at commit [`10a53a7`](https://github.com/apache/spark/commit/10a53a783682a3d8966a8ba6bb255aadcb9dc87d).




[GitHub] spark pull request #16960: [SPARK-19447] Make Range operator generate "recor...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16960#discussion_r101575199
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
    @@ -309,4 +314,84 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
         assert(metricInfoDeser.metadata === Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER))
       }
     
    +  test("range metrics") {
    +    val res1 = InputOutputMetricsHelper.run(
    +      spark.range(30).filter(x => x % 3 == 0).toDF()
    +    )
    +    assert(res1 === (30L, 0L, 30L) :: Nil)
    +
    +    val res2 = InputOutputMetricsHelper.run(
    +      spark.range(150).repartition(4).filter(x => x < 10).toDF()
    +    )
    +    assert(res2 === (150L, 0L, 150L) :: (0L, 150L, 10L) :: Nil)
    +
    +    withTempDir { tempDir =>
    +      val dir = new File(tempDir, "pqS").getCanonicalPath
    +
    +      spark.range(10).write.parquet(dir)
    +      spark.read.parquet(dir).createOrReplaceTempView("pqS")
    +
    +      val res3 = InputOutputMetricsHelper.run(
    +        spark.range(0, 30).repartition(3).crossJoin(sql("select * from pqS")).repartition(2).toDF()
    +      )
    +      assert(res3 === (10L, 0L, 10L) :: (30L, 0L, 30L) :: (0L, 30L, 300L) :: (0L, 300L, 0L) :: Nil)
    +    }
    +  }
    +}
    +
    +object InputOutputMetricsHelper {
    +   private class InputOutputMetricsListener extends SparkListener {
    +    private case class MetricsResult(
    +        var recordsRead: Long = 0L,
    +        var shuffleRecordsRead: Long = 0L,
    +        var sumMaxOutputRows: Long = 0L)
    +
    +    private[this] var stageIdToMetricsResult = HashMap.empty[Int, MetricsResult]
    +
    +    def reset(): Unit = {
    +      stageIdToMetricsResult = HashMap.empty[Int, MetricsResult]
    +    }
    +
    +    def getResults(): List[(Long, Long, Long)] = {
    +      stageIdToMetricsResult.keySet.toList.sorted.map({ stageId =>
    +        val res = stageIdToMetricsResult(stageId)
    +        (res.recordsRead, res.shuffleRecordsRead, res.sumMaxOutputRows)})
    +    }
    +
    +    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
    +      val res = stageIdToMetricsResult.getOrElseUpdate(taskEnd.stageId, { MetricsResult() })
    +
    +      res.recordsRead += taskEnd.taskMetrics.inputMetrics.recordsRead
    +      res.shuffleRecordsRead += taskEnd.taskMetrics.shuffleReadMetrics.recordsRead
    +
    +      var maxOutputRows = 0L
    +      for (accum <- taskEnd.taskMetrics.externalAccums) {
    +        val info = accum.toInfo(Some(accum.value), None)
    +        if (info.name.toString.contains("number of output rows")) {
    +          info.update match {
    +            case Some(n: Number) =>
    +              if (n.longValue() > maxOutputRows) {
    +                maxOutputRows = n.longValue()
    +              }
    +            case _ => // Ignore.
    +          }
    +        }
    +      }
    +      res.sumMaxOutputRows += maxOutputRows
    +    }
    +  }
    +
    +  def run(df: DataFrame): List[(Long, Long, Long)] = {
    --- End diff --
    
    Could you document what the three `Long` values in the returned tuple are for?
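
One possible way to address this, sketched under assumptions: give the three values names instead of relying on position. The `StageMetrics` type and `fromTuples` helper below are hypothetical and not part of the patch; the field meanings are taken from the doc comment on `getResults()` in the later revision of the diff quoted in this thread.

```scala
// Hypothetical named replacement for the (Long, Long, Long) tuple.
final case class StageMetrics(
    // Sum of inputMetrics.recordsRead over all tasks in the stage.
    recordsRead: Long,
    // Sum of shuffleReadMetrics.recordsRead over all tasks in the stage.
    shuffleRecordsRead: Long,
    // Sum, over all tasks in the stage, of each task's highest
    // "number of output rows" metric value.
    sumMaxOutputRows: Long)

object StageMetrics {
  // Converts tuples in the shape returned by InputOutputMetricsHelper.run.
  def fromTuples(results: List[(Long, Long, Long)]): List[StageMetrics] =
    results.map { case (read, shuffleRead, outputRows) =>
      StageMetrics(read, shuffleRead, outputRows)
    }
}
```

With named fields, an assertion such as the one on `res2` could check `recordsRead` and `shuffleRecordsRead` explicitly. Keeping the tuple and adding a Scaladoc comment would also answer the question with a smaller change.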





[GitHub] spark pull request #16960: [SPARK-19447] Make Range operator generate "recor...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/16960




[GitHub] spark issue #16960: [SPARK-19447] Make Range operator generate "recordsRead"...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the issue:

    https://github.com/apache/spark/pull/16960
  
    cc @hvanhovell if you have a min to review this ...





[GitHub] spark issue #16960: [SPARK-19447] Make Range operator generate "recordsRead"...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16960
  
    Merged build finished. Test PASSed.



