Posted to reviews@spark.apache.org by sryza <gi...@git.apache.org> on 2014/07/21 08:37:07 UTC

[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

GitHub user sryza opened a pull request:

    https://github.com/apache/spark/pull/1507

    SPARK-2565. Update ShuffleReadMetrics as blocks are fetched

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sryza/spark sandy-spark-2565

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/1507.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1507
    
----
commit 75730ddbb025f837bb635a345b1ecde15e0bb8e7
Author: Sandy Ryza <sa...@cloudera.com>
Date:   2014-07-18T00:17:19Z

    SPARK-2565. Update ShuffleReadMetrics as blocks are fetched

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1507#discussion_r15796149
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala ---
    @@ -131,7 +122,9 @@ object BlockFetcherIterator {
                 val networkSize = blockMessage.getData.limit()
                 results.put(new FetchResult(blockId, sizeMap(blockId),
                   () => dataDeserialize(blockId, blockMessage.getData, serializer)))
    -            _remoteBytesRead += networkSize
    +            // TODO: race conditions can occur here with NettyBlockFetcherIterator
    --- End diff --
    
    Also this comment is pretty vague. It would be good if you could elaborate on it (what you described in the JIRA itself is good enough)



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1507#discussion_r15796012
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala ---
    @@ -131,7 +122,9 @@ object BlockFetcherIterator {
                 val networkSize = blockMessage.getData.limit()
                 results.put(new FetchResult(blockId, sizeMap(blockId),
                   () => dataDeserialize(blockId, blockMessage.getData, serializer)))
    -            _remoteBytesRead += networkSize
    +            // TODO: race conditions can occur here with NettyBlockFetcherIterator
    --- End diff --
    
    Could you add a reference to the JIRA in the comment?




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1507#issuecomment-51396408
  
    QA tests have started for PR 1507. This patch merges cleanly.
    View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18049/consoleFull




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1507#discussion_r15250406
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala ---
    @@ -154,14 +147,15 @@ object BlockFetcherIterator {
           // Split local and remote blocks. Remote blocks are further split into FetchRequests of size
           // at most maxBytesInFlight in order to limit the amount of data in flight.
           val remoteRequests = new ArrayBuffer[FetchRequest]
    +      var totalBlocks = 0
           for ((address, blockInfos) <- blocksByAddress) {
    +        totalBlocks += blockInfos.size
             if (address == blockManagerId) {
    -          numLocal = blockInfos.size
    +          readMetrics.localBlocksFetched += blockInfos.size
    --- End diff --
    
    Maybe we don't care about this...but this results in the metrics reporting that local blocks have been fetched before they're actually read from disk.  Is it too annoying to move this to where the blocks actually get read?
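
A minimal sketch of the idea raised above: count a local block only at the point where it is actually read, rather than when the fetch is planned. The `LocalFetchSketch` object, the `readLocalBlocks` helper, and the `String` block IDs are hypothetical stand-ins for illustration and are not part of the patch or of Spark.

```scala
// Simplified stand-in for the metrics class; only the field needed here.
class ShuffleReadMetrics {
  var localBlocksFetched: Int = 0
}

object LocalFetchSketch {
  // Hypothetical helper: returns a lazy iterator over local blocks and
  // bumps the counter only when a block has actually been read.
  def readLocalBlocks[T](
      blockIds: Seq[String],
      readBlock: String => T,
      metrics: ShuffleReadMetrics): Iterator[T] = {
    blockIds.iterator.map { id =>
      val data = readBlock(id)          // the actual read happens here
      metrics.localBlocksFetched += 1   // so the metric advances here too
      data
    }
  }
}
```

Because `Iterator.map` is lazy, the counter stays at zero until the consumer pulls the first block, which is the behavior the comment asks about.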



[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by sryza <gi...@git.apache.org>.
Github user sryza commented on the pull request:

    https://github.com/apache/spark/pull/1507#issuecomment-50976412
  
    Just tested this and observed the shuffle bytes read going up for in-progress tasks.




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1507#discussion_r15903066
  
    --- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---
    @@ -98,19 +105,31 @@ class TaskMetrics extends Serializable {
        */
       var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None
     
    -  /** Adds the given ShuffleReadMetrics to any existing shuffle metrics for this task. */
    -  def updateShuffleReadMetrics(newMetrics: ShuffleReadMetrics) = synchronized {
    -    _shuffleReadMetrics match {
    -      case Some(existingMetrics) =>
    -        existingMetrics.shuffleFinishTime = math.max(
    -          existingMetrics.shuffleFinishTime, newMetrics.shuffleFinishTime)
    -        existingMetrics.fetchWaitTime += newMetrics.fetchWaitTime
    -        existingMetrics.localBlocksFetched += newMetrics.localBlocksFetched
    -        existingMetrics.remoteBlocksFetched += newMetrics.remoteBlocksFetched
    -        existingMetrics.remoteBytesRead += newMetrics.remoteBytesRead
    -      case None =>
    -        _shuffleReadMetrics = Some(newMetrics)
    +  /**
    +   * A task may have multiple shuffle readers for multiple dependencies. To avoid synchronization
    +   * issues from readers in different threads, in-progress tasks use a ShuffleReadMetrics for each
    +   * dependency, and merge these metrics before reporting them to the driver. This method returns
    +   * a ShuffleReadMetrics for a dependency and registers it for merging later.
    +   */
    +  def createShuffleReadMetricsForDependency(): ShuffleReadMetrics = synchronized {
    +    val readMetrics = new ShuffleReadMetrics()
    +    depsShuffleReadMetrics += readMetrics
    +    readMetrics
    +  }
    +
    +  /**
    +   * Aggregates shuffle read metrics for all registered dependencies into shuffleReadMetrics.
    +   */
    +  def mergeShuffleReadMetrics() = synchronized {
    --- End diff --
    
    can this be private[spark]?




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by sryza <gi...@git.apache.org>.
Github user sryza commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1507#discussion_r15246919
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala ---
    @@ -131,7 +122,9 @@ object BlockFetcherIterator {
                 val networkSize = blockMessage.getData.limit()
                 results.put(new FetchResult(blockId, sizeMap(blockId),
                   () => dataDeserialize(blockId, blockMessage.getData, serializer)))
    -            _remoteBytesRead += networkSize
    +            // TODO: race conditions can occur here with NettyBlockFetcherIterator
    --- End diff --
    
    Filed https://issues.apache.org/jira/browse/SPARK-2625
    
    In case it wasn't obvious, the race conditions aren't caused by this patch.



[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by sryza <gi...@git.apache.org>.
Github user sryza commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1507#discussion_r15246387
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala ---
    @@ -191,7 +183,7 @@ object BlockFetcherIterator {
             }
           }
           logInfo("Getting " + _numBlocksToFetch + " non-empty blocks out of " +
    -        (numLocal + numRemote) + " blocks")
    +        (_numBlocksToFetch + localBlocksToFetch.size) + " blocks")
    --- End diff --
    
    Ooh yup



[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1507#discussion_r15905625
  
    --- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---
    @@ -73,11 +75,16 @@ class TaskMetrics extends Serializable {
       var inputMetrics: Option[InputMetrics] = None
     
       /**
    -   * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
    +   * If this task reads from shuffle output, metrics on getting shuffle data will be collected here.
    +   * This includes read metrics aggregated over all the task's shuffle dependencies.
        */
    -  private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None
    +  var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
    --- End diff --
    
    can we add a `private[spark]` setter for it and explain it should only be used when recreating objects from JSON? I find it weird to expose this as a var since now users should not ever modify this directly - if someone looked at this class that would be non obvious. So I'd say we keep the var private and we add a private[spark] setter and say in the doc it should only be used during JSON deserialization.
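
A minimal sketch of the setter suggested above, using simplified stand-in classes rather than the real Spark ones. The enclosing `spark` object stands in for the `org.apache.spark` package, and the names `setShuffleReadMetrics` and `JsonProtocolSketch` are hypothetical; the thread does not say what the final method is called.

```scala
// Stand-in for the org.apache.spark package so that private[spark] resolves.
object spark {
  class ShuffleReadMetrics extends Serializable {
    var remoteBytesRead: Long = 0L
  }

  class TaskMetrics extends Serializable {
    // Kept private so ordinary code cannot reassign it.
    private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None

    /** Read-only view of the aggregated shuffle read metrics. */
    def shuffleReadMetrics: Option[ShuffleReadMetrics] = _shuffleReadMetrics

    /** Should only be used when recreating TaskMetrics from JSON. */
    private[spark] def setShuffleReadMetrics(metrics: Option[ShuffleReadMetrics]): Unit = {
      _shuffleReadMetrics = metrics
    }
  }

  // Hypothetical stand-in for the deserialization path (JsonProtocol in the thread).
  object JsonProtocolSketch {
    def taskMetricsFromBytesRead(bytes: Long): TaskMetrics = {
      val metrics = new TaskMetrics
      val read = new ShuffleReadMetrics
      read.remoteBytesRead = bytes
      metrics.setShuffleReadMetrics(Some(read))
      metrics
    }
  }
}
```

Code outside `spark` can still read `shuffleReadMetrics` but cannot call the setter, which keeps the mutation confined to the deserialization case.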




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on the pull request:

    https://github.com/apache/spark/pull/1507#issuecomment-49789216
  
    Thanks Sandy!! Just a few more small things.



[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on the pull request:

    https://github.com/apache/spark/pull/1507#issuecomment-49776893
  
    At a high level, this depends on one of your other patches (#1056?) to incrementally send updates right?  Is the idea that mergeShuffleReadMetrics will get called incrementally as the task runs, before sending partial results back to the driver?



[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1507#issuecomment-49575625
  
    QA tests have started for PR 1507. This patch merges cleanly.
    View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16901/consoleFull



[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1507#discussion_r15238942
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala ---
    @@ -131,7 +122,9 @@ object BlockFetcherIterator {
                 val networkSize = blockMessage.getData.limit()
                 results.put(new FetchResult(blockId, sizeMap(blockId),
                   () => dataDeserialize(blockId, blockMessage.getData, serializer)))
    -            _remoteBytesRead += networkSize
    +            // TODO: race conditions can occur here with NettyBlockFetcherIterator
    --- End diff --
    
    Might be good to file a JIRA for metrics for the NettyBlockFetcherIterator more generally -- I noticed yesterday that the Netty version also doesn't report fetchWaitTime



[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1507#issuecomment-51442968
  
    QA results for PR 1507:
    - This patch FAILED unit tests.
    - This patch merges cleanly
    - This patch adds no public classes
    
    For more information see test output:
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18113/consoleFull




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1507#discussion_r15796079
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala ---
    @@ -191,7 +184,7 @@ object BlockFetcherIterator {
             }
           }
           logInfo("Getting " + _numBlocksToFetch + " non-empty blocks out of " +
    -        (numLocal + numRemote) + " blocks")
    +        totalBlocks + " blocks")
    --- End diff --
    
    Is this ever used other than for logging?




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on the pull request:

    https://github.com/apache/spark/pull/1507#issuecomment-49813539
  
    Jenkins, retest this please



[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1507#discussion_r15907133
  
    --- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---
    @@ -73,11 +75,16 @@ class TaskMetrics extends Serializable {
       var inputMetrics: Option[InputMetrics] = None
     
       /**
    -   * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
    +   * If this task reads from shuffle output, metrics on getting shuffle data will be collected here.
    +   * This includes read metrics aggregated over all the task's shuffle dependencies.
        */
    -  private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None
    +  var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
    --- End diff --
    
    By "users" I mean internal Spark code that instruments this. I think it is counterintuitive to make something a var (and to expose it as a var publicly) when there is no intended use where someone modifies it. There is a corner case here where we deserialize an object from JSON, so I'd propose we create a very narrow interface to deal with that case: a setter with clear documentation. I really think object deserialization is a special case here. If something is mutable for the purpose of deserializing, IMO that's quite different from making it mutable outright for normal program execution.




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by sryza <gi...@git.apache.org>.
Github user sryza commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1507#discussion_r15900998
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala ---
    @@ -191,7 +184,7 @@ object BlockFetcherIterator {
             }
           }
           logInfo("Getting " + _numBlocksToFetch + " non-empty blocks out of " +
    -        (numLocal + numRemote) + " blocks")
    +        totalBlocks + " blocks")
    --- End diff --
    
    Naw




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by sryza <gi...@git.apache.org>.
Github user sryza commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1507#discussion_r15716773
  
    --- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---
    @@ -73,11 +75,16 @@ class TaskMetrics extends Serializable {
       var inputMetrics: Option[InputMetrics] = None
     
       /**
    -   * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
    +   * If this task reads from shuffle output, metrics on getting shuffle data will be collected here.
    +   * This includes read metrics aggregated over all the task's shuffle dependencies.
        */
    -  private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None
    +  var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
    --- End diff --
    
    Making depsShuffleReadMetrics private.  shuffleReadMetrics still needs to be set inside of JsonProtocol, so making it private would either require updating it through createShuffleReadMetricsForDependency or adding a setter, both of which seem a little weird to me.  Any thoughts?



[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1507#discussion_r15238649
  
    --- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---
    @@ -90,19 +94,18 @@ class TaskMetrics extends Serializable {
        */
       var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None
     
    -  /** Adds the given ShuffleReadMetrics to any existing shuffle metrics for this task. */
    -  def updateShuffleReadMetrics(newMetrics: ShuffleReadMetrics) = synchronized {
    -    _shuffleReadMetrics match {
    -      case Some(existingMetrics) =>
    -        existingMetrics.shuffleFinishTime = math.max(
    -          existingMetrics.shuffleFinishTime, newMetrics.shuffleFinishTime)
    -        existingMetrics.fetchWaitTime += newMetrics.fetchWaitTime
    -        existingMetrics.localBlocksFetched += newMetrics.localBlocksFetched
    -        existingMetrics.remoteBlocksFetched += newMetrics.remoteBlocksFetched
    -        existingMetrics.remoteBytesRead += newMetrics.remoteBytesRead
    -      case None =>
    -        _shuffleReadMetrics = Some(newMetrics)
    +  def mergeShuffleReadMetrics() {
    +    val merged = new ShuffleReadMetrics()
    +    synchronized {
    +      for (depMetrics <- depsShuffleReadMetrics) {
    +        merged.fetchWaitTime += depMetrics.fetchWaitTime
    +        merged.localBlocksFetched += depMetrics.localBlocksFetched
    +        merged.remoteBlocksFetched += depMetrics.remoteBlocksFetched
    +        merged.remoteBytesRead += depMetrics.remoteBytesRead
    +        merged.shuffleFinishTime = math.max(merged.shuffleFinishTime, depMetrics.shuffleFinishTime)
    +      }
         }
    +    shuffleReadMetrics = Some(merged)
    --- End diff --
    
    Why is this outside the synchronized block?
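
One way to address the question above, sketched with simplified stand-in classes: do both the aggregation and the final assignment while holding the lock, so a concurrent call to `createShuffleReadMetricsForDependency` cannot interleave with the merge. The method and field names follow the diff; the field types and the `-1L` default for `shuffleFinishTime` are assumptions.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified stand-in; field types and defaults are assumptions.
class ShuffleReadMetrics {
  var shuffleFinishTime: Long = -1L
  var fetchWaitTime: Long = 0L
  var localBlocksFetched: Int = 0
  var remoteBlocksFetched: Int = 0
  var remoteBytesRead: Long = 0L
}

class TaskMetrics {
  // One metrics object per shuffle dependency, registered for later merging.
  private val depsShuffleReadMetrics = new ArrayBuffer[ShuffleReadMetrics]()

  var shuffleReadMetrics: Option[ShuffleReadMetrics] = None

  def createShuffleReadMetricsForDependency(): ShuffleReadMetrics = synchronized {
    val readMetrics = new ShuffleReadMetrics()
    depsShuffleReadMetrics += readMetrics
    readMetrics
  }

  // The whole body, including the assignment, runs under the lock.
  def mergeShuffleReadMetrics(): Unit = synchronized {
    val merged = new ShuffleReadMetrics()
    for (depMetrics <- depsShuffleReadMetrics) {
      merged.fetchWaitTime += depMetrics.fetchWaitTime
      merged.localBlocksFetched += depMetrics.localBlocksFetched
      merged.remoteBlocksFetched += depMetrics.remoteBlocksFetched
      merged.remoteBytesRead += depMetrics.remoteBytesRead
      merged.shuffleFinishTime = math.max(merged.shuffleFinishTime, depMetrics.shuffleFinishTime)
    }
    shuffleReadMetrics = Some(merged)
  }
}
```

Note that the lock in this sketch only guards the list of registered metrics and the merged result. The per-dependency objects are still updated by their reader threads without it, so a merge may observe a slightly stale snapshot.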



[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by sryza <gi...@git.apache.org>.
Github user sryza commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1507#discussion_r15906274
  
    --- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---
    @@ -73,11 +75,16 @@ class TaskMetrics extends Serializable {
       var inputMetrics: Option[InputMetrics] = None
     
       /**
    -   * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
    +   * If this task reads from shuffle output, metrics on getting shuffle data will be collected here.
    +   * This includes read metrics aggregated over all the task's shuffle dependencies.
        */
    -  private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None
    +  var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
    --- End diff --
    
    Does the same logic not apply for all the other fields in TaskMetrics that are vars?




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1507#discussion_r15796125
  
    --- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---
    @@ -98,19 +105,22 @@ class TaskMetrics extends Serializable {
        */
       var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None
     
    -  /** Adds the given ShuffleReadMetrics to any existing shuffle metrics for this task. */
    -  def updateShuffleReadMetrics(newMetrics: ShuffleReadMetrics) = synchronized {
    -    _shuffleReadMetrics match {
    -      case Some(existingMetrics) =>
    -        existingMetrics.shuffleFinishTime = math.max(
    -          existingMetrics.shuffleFinishTime, newMetrics.shuffleFinishTime)
    -        existingMetrics.fetchWaitTime += newMetrics.fetchWaitTime
    -        existingMetrics.localBlocksFetched += newMetrics.localBlocksFetched
    -        existingMetrics.remoteBlocksFetched += newMetrics.remoteBlocksFetched
    -        existingMetrics.remoteBytesRead += newMetrics.remoteBytesRead
    -      case None =>
    -        _shuffleReadMetrics = Some(newMetrics)
    +  def createShuffleReadMetricsForDependency(): ShuffleReadMetrics = synchronized {
    +    val readMetrics = new ShuffleReadMetrics()
    +    depsShuffleReadMetrics += readMetrics
    +    readMetrics
    +  }
    +
    +  def mergeShuffleReadMetrics() = synchronized {
    --- End diff --
    
    Could you add a brief comment on what this does? (and when this happens)




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by sryza <gi...@git.apache.org>.
Github user sryza commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1507#discussion_r15246688
  
    --- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---
    @@ -75,9 +76,12 @@ class TaskMetrics extends Serializable {
       /**
        * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
        */
    -  private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None
    +  var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
    --- End diff --
    
    I agree it would be a little clearer, but I'm worried that it's too verbose and is going to make a bunch of lines spill over 100 characters.  It's mainly read in the driver, which doesn't really care that it's aggregated.
    
    Would a comment be sufficient? 



[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1507#discussion_r15906866
  
    --- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---
    @@ -98,19 +105,31 @@ class TaskMetrics extends Serializable {
        */
       var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None
     
    -  /** Adds the given ShuffleReadMetrics to any existing shuffle metrics for this task. */
    -  def updateShuffleReadMetrics(newMetrics: ShuffleReadMetrics) = synchronized {
    -    _shuffleReadMetrics match {
    -      case Some(existingMetrics) =>
    -        existingMetrics.shuffleFinishTime = math.max(
    -          existingMetrics.shuffleFinishTime, newMetrics.shuffleFinishTime)
    -        existingMetrics.fetchWaitTime += newMetrics.fetchWaitTime
    -        existingMetrics.localBlocksFetched += newMetrics.localBlocksFetched
    -        existingMetrics.remoteBlocksFetched += newMetrics.remoteBlocksFetched
    -        existingMetrics.remoteBytesRead += newMetrics.remoteBytesRead
    -      case None =>
    -        _shuffleReadMetrics = Some(newMetrics)
    +  /**
    +   * A task may have multiple shuffle readers for multiple dependencies. To avoid synchronization
    +   * issues from readers in different threads, in-progress tasks use a ShuffleReadMetrics for each
    +   * dependency, and merge these metrics before reporting them to the driver. This method returns
    +   * a ShuffleReadMetrics for a dependency and registers it for merging later.
    +   */
    +  def createShuffleReadMetricsForDependency(): ShuffleReadMetrics = synchronized {
    +    val readMetrics = new ShuffleReadMetrics()
    +    depsShuffleReadMetrics += readMetrics
    +    readMetrics
    +  }
    +
    +  /**
    +   * Aggregates shuffle read metrics for all registered dependencies into shuffleReadMetrics.
    +   */
    +  def mergeShuffleReadMetrics() = synchronized {
    --- End diff --
    
    Also I think this should be called `updateShuffleReadMetrics` or something since it mutates this object in place. We have functions named `mergeX` in several other places in the Spark code, e.g. `mergeCombiners`, and they all return new objects rather than mutating an existing one.
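
To make the naming distinction above concrete, here is a minimal sketch contrasting the two styles. `ReadStats` and `MutableReadStats` are hypothetical names invented for illustration; they are not Spark classes, and the fields are a reduced stand-in for the real metrics.

```scala
// Hypothetical, simplified metrics holder; not Spark's actual ShuffleReadMetrics.
final case class ReadStats(blocksFetched: Int, bytesRead: Long)

object ReadStats {
  // "merge" style: pure function that returns a new object and leaves both inputs untouched.
  def merge(a: ReadStats, b: ReadStats): ReadStats =
    ReadStats(a.blocksFetched + b.blocksFetched, a.bytesRead + b.bytesRead)
}

// "update" style: mutates the receiver in place and returns Unit.
final class MutableReadStats(var blocksFetched: Int = 0, var bytesRead: Long = 0L) {
  def update(other: ReadStats): Unit = {
    blocksFetched += other.blocksFetched
    bytesRead += other.bytesRead
  }
}
```

A caller of `ReadStats.merge` has to keep the returned value, while a caller of `update` relies on the side effect, which is the difference the suggested rename is meant to signal.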




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1507#discussion_r15905777
  
    --- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---
    @@ -73,11 +75,16 @@ class TaskMetrics extends Serializable {
       var inputMetrics: Option[InputMetrics] = None
     
       /**
    -   * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
    +   * If this task reads from shuffle output, metrics on getting shuffle data will be collected here.
    +   * This includes read metrics aggregated over all the task's shuffle dependencies.
        */
    -  private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None
    +  var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
    --- End diff --
    
    This plan seems good to me (sorry for the slow response here!)




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1507#issuecomment-49782493
  
    QA tests have started for PR 1507. This patch merges cleanly.
    View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16978/consoleFull



[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1507#issuecomment-49581839
  
    QA results for PR 1507:
    - This patch PASSES unit tests.
    - This patch merges cleanly
    - This patch adds no public classes

    For more information see test output:
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16901/consoleFull



[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1507#discussion_r15238878
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala ---
    @@ -31,6 +31,7 @@ import org.apache.spark.network.ConnectionManagerId
     import org.apache.spark.network.netty.ShuffleCopier
     import org.apache.spark.serializer.Serializer
     import org.apache.spark.util.Utils
    +import org.apache.spark.executor.ShuffleReadMetrics
    --- End diff --
    
    import ordering :)



[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/1507#issuecomment-51497079
  
    It's failing the 3 flaky tests that have been failing many PRs lately... test this please




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1507#discussion_r15250466
  
    --- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---
    @@ -73,11 +75,16 @@ class TaskMetrics extends Serializable {
       var inputMetrics: Option[InputMetrics] = None
     
       /**
    -   * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
    +   * If this task reads from shuffle output, metrics on getting shuffle data will be collected here.
    +   * This includes read metrics aggregated over all the task's shuffle dependencies.
        */
    -  private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None
    +  var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
    --- End diff --
    
    Can we make this and depsShuffleReadMetrics private again, since they should only be updated through createShuffleReadMetricsForDependency() and mergeShuffleReadMetrics()?



[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1507#issuecomment-51439437
  
    QA tests have started for PR 1507. This patch merges cleanly.
    View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18113/consoleFull




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1507#issuecomment-49814031
  
    QA tests have started for PR 1507. This patch merges cleanly.
    View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16995/consoleFull



[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1507#issuecomment-51404085
  
    QA results for PR 1507:
    - This patch PASSES unit tests.

    For more information see test output:
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18047/consoleFull




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1507#discussion_r15972438
  
    --- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---
    @@ -81,11 +83,26 @@ class TaskMetrics extends Serializable {
       var inputMetrics: Option[InputMetrics] = None
     
       /**
    -   * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
    +   * If this task reads from shuffle output, metrics on getting shuffle data will be collected here.
    +   * This includes read metrics aggregated over all the task's shuffle dependencies.
        */
       private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None
     
    -  def shuffleReadMetrics = _shuffleReadMetrics
    +  def shuffleReadMetrics() = _shuffleReadMetrics
    --- End diff --
    
    nit: since this doesn't mutate internal state the original lack of parentheses is correct style.
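
The convention behind this nit can be sketched as follows. `Counter` is a hypothetical class used only for illustration; the rule it shows (no parentheses for side-effect-free accessors, empty parentheses for methods that mutate state) is the common Scala style convention, not something specific to this patch.

```scala
// Hypothetical example class; not part of Spark.
class Counter {
  private var _count: Int = 0

  // Pure accessor: it has no side effects, so it is declared and called without parentheses.
  def count: Int = _count

  // Mutates internal state, so it is declared and called with empty parentheses.
  def increment(): Unit = { _count += 1 }
}
```

With this shape, `counter.count` reads like a field access, while `counter.increment()` visibly marks a call that changes state.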




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/1507#issuecomment-51552698
  
    Okay I merged this with the minor style change. Thanks Sandy!




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1507#issuecomment-49794831
  
    QA results for PR 1507:
    - This patch FAILED unit tests.
    - This patch merges cleanly
    - This patch adds no public classes

    For more information see test output:
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16978/consoleFull



[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1507#discussion_r15250426
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -29,7 +29,7 @@ import akka.actor.{ActorSystem, Cancellable, Props}
     import sun.nio.ch.DirectBuffer
     
     import org.apache.spark._
    -import org.apache.spark.executor.{DataReadMethod, InputMetrics}
    +import org.apache.spark.executor.{ShuffleReadMetrics, DataReadMethod, InputMetrics}
    --- End diff --
    
    one more alphabetization



[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1507#issuecomment-51497616
  
    QA tests have started for PR 1507. This patch merges cleanly.
    View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18126/consoleFull




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1507#discussion_r15903044
  
    --- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---
    @@ -98,19 +105,31 @@ class TaskMetrics extends Serializable {
        */
       var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None
     
    -  /** Adds the given ShuffleReadMetrics to any existing shuffle metrics for this task. */
    -  def updateShuffleReadMetrics(newMetrics: ShuffleReadMetrics) = synchronized {
    -    _shuffleReadMetrics match {
    -      case Some(existingMetrics) =>
    -        existingMetrics.shuffleFinishTime = math.max(
    -          existingMetrics.shuffleFinishTime, newMetrics.shuffleFinishTime)
    -        existingMetrics.fetchWaitTime += newMetrics.fetchWaitTime
    -        existingMetrics.localBlocksFetched += newMetrics.localBlocksFetched
    -        existingMetrics.remoteBlocksFetched += newMetrics.remoteBlocksFetched
    -        existingMetrics.remoteBytesRead += newMetrics.remoteBytesRead
    -      case None =>
    -        _shuffleReadMetrics = Some(newMetrics)
    +  /**
    +   * A task may have multiple shuffle readers for multiple dependencies. To avoid synchronization
    +   * issues from readers in different threads, in-progress tasks use a ShuffleReadMetrics for each
    +   * dependency, and merge these metrics before reporting them to the driver. This method returns
    +   * a ShuffleReadMetrics for a dependency and registers it for merging later.
    +   */
    +  def createShuffleReadMetricsForDependency(): ShuffleReadMetrics = synchronized {
    --- End diff --
    
    can this be private[spark]?




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by sryza <gi...@git.apache.org>.
Github user sryza commented on the pull request:

    https://github.com/apache/spark/pull/1507#issuecomment-49778015
  
    Exactly.  The idea is to call mergeShuffleReadMetrics when we're about to send the metrics update. 



[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1507#discussion_r15244576
  
    --- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---
    @@ -75,9 +76,12 @@ class TaskMetrics extends Serializable {
       /**
        * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
        */
    -  private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None
    +  var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
    --- End diff --
    
    Maybe aggregatedShuffleReadMetrics?



[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1507#issuecomment-51395051
  
    QA tests have started for PR 1507. This patch DID NOT merge cleanly!
    View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18047/consoleFull




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1507#discussion_r15238657
  
    --- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---
    @@ -19,6 +19,7 @@ package org.apache.spark.executor
     
     import org.apache.spark.annotation.DeveloperApi
     import org.apache.spark.storage.{BlockId, BlockStatus}
    +import scala.collection.mutable.ArrayBuffer
    --- End diff --
    
    nit: Import ordering



[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/1507




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1507#issuecomment-49820737
  
    QA results for PR 1507:
    - This patch PASSES unit tests.
    - This patch merges cleanly
    - This patch adds no public classes

    For more information see test output:
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16995/consoleFull



[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1507#discussion_r15238692
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala ---
    @@ -20,6 +20,8 @@ package org.apache.spark.shuffle.hash
     import org.apache.spark.{InterruptibleIterator, TaskContext}
     import org.apache.spark.serializer.Serializer
     import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleReader}
    +import org.apache.spark.executor.ShuffleReadMetrics
    +import scala.collection.mutable.ArrayBuffer
    --- End diff --
    
    import ordering



[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1507#discussion_r15238860
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala ---
    @@ -35,8 +37,15 @@ class HashShuffleReader[K, C](
     
       /** Read the combined key-values for this reduce task */
       override def read(): Iterator[Product2[K, C]] = {
    +    val readMetrics = new ShuffleReadMetrics()
    +    context.taskMetrics.synchronized {
    +      if (context.taskMetrics.depsShuffleReadMetrics == null) {
    +        context.taskMetrics.depsShuffleReadMetrics = new ArrayBuffer[ShuffleReadMetrics]()
    +      }
    +      context.taskMetrics.depsShuffleReadMetrics += readMetrics
    +    }
    --- End diff --
    
    Can we make getNewShuffleReadMetrics() or something a function in TaskMetrics that does this?  I think it would be better to have TaskMetrics handle the synchronization, because it makes it more obvious to someone reading / modifying the code later what the concurrency properties of depsShuffleReadMetrics are.
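
A minimal sketch of what this suggestion could look like, with `TaskMetrics` owning both the registration and the synchronization. The method names follow later revisions of the diff in this thread; the `ShuffleReadMetrics` class here is a simplified stand-in with only the fields visible in the diff, and the body of `mergeShuffleReadMetrics` is an assumption modeled on the removed `updateShuffleReadMetrics` code (max for the finish time, sums for the counters), not the merged implementation.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified stand-in for Spark's ShuffleReadMetrics; only the fields seen in the diff.
class ShuffleReadMetrics extends Serializable {
  var shuffleFinishTime: Long = -1L
  var fetchWaitTime: Long = 0L
  var localBlocksFetched: Int = 0
  var remoteBlocksFetched: Int = 0
  var remoteBytesRead: Long = 0L
}

class TaskMetrics extends Serializable {
  // Aggregated view over all dependencies; only replaced inside mergeShuffleReadMetrics().
  private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None

  // One entry per shuffle dependency; only touched while holding this object's lock.
  private val depsShuffleReadMetrics = new ArrayBuffer[ShuffleReadMetrics]()

  def shuffleReadMetrics: Option[ShuffleReadMetrics] = _shuffleReadMetrics

  // Each reader asks for its own metrics object, so callers never lock TaskMetrics themselves.
  def createShuffleReadMetricsForDependency(): ShuffleReadMetrics = synchronized {
    val readMetrics = new ShuffleReadMetrics()
    depsShuffleReadMetrics += readMetrics
    readMetrics
  }

  // Assumed merge logic: rebuild the aggregate from every registered dependency.
  def mergeShuffleReadMetrics(): Unit = synchronized {
    val merged = new ShuffleReadMetrics()
    for (dep <- depsShuffleReadMetrics) {
      merged.shuffleFinishTime = math.max(merged.shuffleFinishTime, dep.shuffleFinishTime)
      merged.fetchWaitTime += dep.fetchWaitTime
      merged.localBlocksFetched += dep.localBlocksFetched
      merged.remoteBlocksFetched += dep.remoteBlocksFetched
      merged.remoteBytesRead += dep.remoteBytesRead
    }
    _shuffleReadMetrics = Some(merged)
  }
}
```

In this shape, a reader such as `HashShuffleReader.read()` would only call `createShuffleReadMetricsForDependency()` and update the object it gets back, so the concurrency rules for `depsShuffleReadMetrics` live in one class.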



[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1507#discussion_r15242942
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala ---
    @@ -191,7 +183,7 @@ object BlockFetcherIterator {
             }
           }
           logInfo("Getting " + _numBlocksToFetch + " non-empty blocks out of " +
    -        (numLocal + numRemote) + " blocks")
    +        (_numBlocksToFetch + localBlocksToFetch.size) + " blocks")
    --- End diff --
    
    Doesn't _numBlocksToFetch already include the local blocks?



[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1507#discussion_r15247072
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala ---
    @@ -131,7 +122,9 @@ object BlockFetcherIterator {
                 val networkSize = blockMessage.getData.limit()
                 results.put(new FetchResult(blockId, sizeMap(blockId),
                   () => dataDeserialize(blockId, blockMessage.getData, serializer)))
    -            _remoteBytesRead += networkSize
    +            // TODO: race conditions can occur here with NettyBlockFetcherIterator
    --- End diff --
    
    Cool thanks for filing that!!



[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1507#issuecomment-51403107
  
    QA results for PR 1507:
    - This patch PASSES unit tests.
    - This patch merges cleanly
    - This patch adds no public classes

    For more information see test output:
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18049/consoleFull




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/1507#issuecomment-49690248
  
    @kayousterhout



[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1507#issuecomment-51504248
  
    QA results for PR 1507:
    - This patch PASSES unit tests.
    - This patch merges cleanly
    - This patch adds no public classes

    For more information see test output:
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18126/consoleFull

