You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by juanrh <gi...@git.apache.org> on 2017/10/26 23:14:07 UTC

[GitHub] spark pull request #19583: [WIP][SPARK-22339] [CORE] [NETWORK-SHUFFLE]

GitHub user juanrh opened a pull request:

    https://github.com/apache/spark/pull/19583

    [WIP][SPARK-22339] [CORE] [NETWORK-SHUFFLE]

    ## What changes were proposed in this pull request?
    When a task finishes with error due to a fetch error, then DAGScheduler unregisters the shuffle blocks hosted by the serving executor (or even all the executors in the failing host, with external shuffle and spark.files.fetchFailure.unRegisterOutputOnHost enabled) in the shuffle block directory stored by MapOutputTracker, that then increments its epoch as a result. This event is only signaled to the other executors when a new task with a new epoch starts in each executor. This means that other executors reading from the failed executors will retry fetching shuffle blocks from them, even though the driver already knows those executors are lost and those blocks are now unavailable at those locations. This impacts job runtime, specially for long shuffles and executor failures at the end of a stage, when the only pending tasks are shuffle reads.
    This could be improved by pushing the epoch update to the executors without having to wait for a new task. In the attached patch I sketch a possible solution that sends the updated epoch from the driver to the executors by piggybacking on the executor heartbeat response. ShuffleBlockFetcherIterator, RetryingBlockFetcher and BlockFetchingListener are modified so blocks locations are checked on each fetch retry. This doesn't introduce additional traffic, as MapOutputTrackerWorker.mapStatuses is shared by all tasks running on the same Executor, and the lookup of the new shuffle blocks directory was going to happen anyway when the new epoch is detected during the start of the next task.
    I would like to know the opinion of the community on this approach.
    
    ## How was this patch tested?
    Unit tests, tests in Yarn cluster

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/juanrh/spark hortala-push_epoch_update

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19583.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19583
    
----
commit 1381edd9ba5d6b776c91fec14f25382225e80cb4
Author: Juan Rodriguez Hortala <ho...@amazon.com>
Date:   2017-10-21T01:56:29Z

    draft proposal

commit 7f538df6f3bffcb76a7dbc262cf75d410b635bdd
Author: Juan Rodriguez Hortala <ho...@amazon.com>
Date:   2017-10-21T02:06:43Z

    adapt code to change in MapOutputTracker

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19583: [WIP][SPARK-22339] [CORE] [NETWORK-SHUFFLE] Push epoch u...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19583
  
    how much benifit we can get here? By default the shffule block fetcher only retry 3 times, which won't be a lot of time. After shuffle block fetcher failed and we retry this task, we are fine.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19583: [WIP][SPARK-22339] [CORE] [NETWORK-SHUFFLE] Push ...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19583#discussion_r147526776
  
    --- Diff: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ---
    @@ -51,7 +51,26 @@ private case class ExecutorRegistered(executorId: String)
     
     private case class ExecutorRemoved(executorId: String)
     
    -private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
    +private[spark] case class UpdatedEpoch(epoch: Long)
    +
    +private[spark] object HeartbeatResponse {
    +  def apply(reregisterBlockManager: Boolean,
    +            updatedEpoch: Option[Long] = None): HeartbeatResponse =
    +    updatedEpoch.fold[HeartbeatResponse](BasicHeartbeatResponse(reregisterBlockManager)) {
    --- End diff --
    
    nit: a while back I think the community decided that `Option.fold` is pretty confusing for folks that aren't used to scala, with little gain, so I'd get rid of it
    (though given my other comment about the unnecessary class, you can probably just remove this whole thing.)


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19583: [WIP][SPARK-22339] [CORE] [NETWORK-SHUFFLE] Push epoch u...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19583
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19583: [WIP][SPARK-22339] [CORE] [NETWORK-SHUFFLE] Push ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19583#discussion_r148917664
  
    --- Diff: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ---
    @@ -51,7 +51,26 @@ private case class ExecutorRegistered(executorId: String)
     
     private case class ExecutorRemoved(executorId: String)
     
    -private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
    +private[spark] case class UpdatedEpoch(epoch: Long)
    +
    +private[spark] object HeartbeatResponse {
    +  def apply(reregisterBlockManager: Boolean,
    +            updatedEpoch: Option[Long] = None): HeartbeatResponse =
    +    updatedEpoch.fold[HeartbeatResponse](BasicHeartbeatResponse(reregisterBlockManager)) {
    +      epoch => HeartbeatResponseWithEpoch(reregisterBlockManager, Some(epoch))
    +    }
    +}
    +
    +private[spark] sealed trait HeartbeatResponse {
    +  def reregisterBlockManager: Boolean
    +  def updatedEpoch: Option[Long] = None
    +}
    +
    +private[spark] case class BasicHeartbeatResponse(reregisterBlockManager: Boolean)
    +  extends HeartbeatResponse
    +private[spark] case class HeartbeatResponseWithEpoch(reregisterBlockManager: Boolean,
    +                                                     override val updatedEpoch: Option[Long])
    +  extends HeartbeatResponse
    --- End diff --
    
    seems not a big saving, I think we can even always include the epoch in the heartbeat response.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19583: [WIP][SPARK-22339] [CORE] [NETWORK-SHUFFLE] Push epoch u...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19583
  
    instead of piggybacking the heartbeat, can we just let the driver send an event to all executors asynchronously about the node failure? 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19583: [WIP][SPARK-22339] [CORE] [NETWORK-SHUFFLE] Push epoch u...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19583
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19583: [WIP][SPARK-22339] [CORE] [NETWORK-SHUFFLE] Push epoch u...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19583
  
    also cc @JoshRosen 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19583: [WIP][SPARK-22339] [CORE] [NETWORK-SHUFFLE] Push ...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19583#discussion_r147526524
  
    --- Diff: core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala ---
    @@ -225,6 +270,7 @@ class HeartbeatReceiverSuite
             Matchers.eq(Array(1L -> metrics.accumulators())),
             Matchers.eq(blockManagerId))
         }
    +    expectedUpdatedEpoch.foreach{_ => assert(response.updatedEpoch === expectedUpdatedEpoch)}
    --- End diff --
    
    don't you want to do this assert even if the expectedUpdate is `None`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19583: [WIP][SPARK-22339] [CORE] [NETWORK-SHUFFLE] Push epoch u...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19583
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83136/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19583: [WIP][SPARK-22339] [CORE] [NETWORK-SHUFFLE] Push epoch u...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19583
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19583: [WIP][SPARK-22339] [CORE] [NETWORK-SHUFFLE] Push epoch u...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19583
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19583: [WIP][SPARK-22339] [CORE] [NETWORK-SHUFFLE] Push ...

Posted by juanrh <gi...@git.apache.org>.
Github user juanrh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19583#discussion_r148078337
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
    @@ -241,6 +243,21 @@ final class ShuffleBlockFetcherIterator(
             logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
             results.put(new FailureFetchResult(BlockId(blockId), address, e))
           }
    +
    +      override def shouldRetry(t: Throwable): Boolean = {
    --- End diff --
    
    The idea here is that `shouldRetry` checks again the map from `BlockManagerId` to blocks, because `_blocksByAddress` is a by name parameter, and therefore ` _blocksByAddress.toMap` is actually recomputing that `Seq[(BlockManagerId, Seq[(BlockId, Long)])]` that is passed in `BlockStoreShuffleReader.read` as `mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition)`. So `shouldRetry` indirectly calls `mapOutputTracker.getMapSizesByExecutorId` and this might even lead to a `MetadataFetchFailedException` for the missing executor


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19583: [WIP][SPARK-22339] [CORE] [NETWORK-SHUFFLE] Push ...

Posted by juanrh <gi...@git.apache.org>.
Github user juanrh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19583#discussion_r148075569
  
    --- Diff: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ---
    @@ -51,7 +51,26 @@ private case class ExecutorRegistered(executorId: String)
     
     private case class ExecutorRemoved(executorId: String)
     
    -private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
    +private[spark] case class UpdatedEpoch(epoch: Long)
    +
    +private[spark] object HeartbeatResponse {
    +  def apply(reregisterBlockManager: Boolean,
    +            updatedEpoch: Option[Long] = None): HeartbeatResponse =
    +    updatedEpoch.fold[HeartbeatResponse](BasicHeartbeatResponse(reregisterBlockManager)) {
    +      epoch => HeartbeatResponseWithEpoch(reregisterBlockManager, Some(epoch))
    +    }
    +}
    +
    +private[spark] sealed trait HeartbeatResponse {
    +  def reregisterBlockManager: Boolean
    +  def updatedEpoch: Option[Long] = None
    +}
    +
    +private[spark] case class BasicHeartbeatResponse(reregisterBlockManager: Boolean)
    +  extends HeartbeatResponse
    +private[spark] case class HeartbeatResponseWithEpoch(reregisterBlockManager: Boolean,
    +                                                     override val updatedEpoch: Option[Long])
    +  extends HeartbeatResponse
    --- End diff --
    
    The idea was saving a bit of space when serializing, because `BasicHeartbeatResponse` doesn't even have an `updatedEpoch` field but encodes the `None` in the type. As `BasicHeartbeatResponse` is what we send most of the time, then we only pay for the additional optional field when we actually have an updated epoch to send. But this might be a tiny optimization that is not worth the complexity


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19583: [WIP][SPARK-22339] [CORE] [NETWORK-SHUFFLE] Push epoch u...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19583
  
    **[Test build #83136 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83136/testReport)** for PR 19583 at commit [`7f538df`](https://github.com/apache/spark/commit/7f538df6f3bffcb76a7dbc262cf75d410b635bdd).
     * This patch **fails MiMa tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19583: [WIP][SPARK-22339] [CORE] [NETWORK-SHUFFLE] Push epoch u...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/19583
  
    Jenkins, ok to test


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19583: [WIP][SPARK-22339] [CORE] [NETWORK-SHUFFLE] Push epoch u...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19583
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19583: [WIP][SPARK-22339] [CORE] [NETWORK-SHUFFLE] Push epoch u...

Posted by juanrh <gi...@git.apache.org>.
Github user juanrh commented on the issue:

    https://github.com/apache/spark/pull/19583
  
    Even though we only wait 5 seconds by default between retries, the retries themselves can take a lot of time. For example in a simple word count job where a node is lost during stage 1.0 I have seen the following logs for `RetryingBlockFetcher`: 
    
    ```
    17/09/13 23:34:23 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 332 outstanding blocks after 5000 ms
    17/09/13 23:34:23 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 286 outstanding blocks after 5000 ms
    17/09/13 23:34:23 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 312 outstanding blocks after 5000 ms
    17/09/13 23:34:23 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 264 outstanding blocks after 5000 ms
    17/09/13 23:34:23 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 294 outstanding blocks after 5000 ms
    17/09/13 23:34:23 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 316 outstanding blocks after 5000 ms
    17/09/13 23:34:23 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 276 outstanding blocks after 5000 ms
    17/09/13 23:34:23 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 327 outstanding blocks after 5000 ms
    17/09/13 23:36:28 INFO RetryingBlockFetcher: Retrying fetch (2/3) for 316 outstanding blocks after 5000 ms
    17/09/13 23:38:28 INFO RetryingBlockFetcher: Retrying fetch (2/3) for 264 outstanding blocks after 5000 ms
    17/09/13 23:40:28 INFO RetryingBlockFetcher: Retrying fetch (2/3) for 294 outstanding blocks after 5000 ms
    17/09/13 23:42:28 INFO RetryingBlockFetcher: Retrying fetch (2/3) for 286 outstanding blocks after 5000 ms
    17/09/13 23:44:28 INFO RetryingBlockFetcher: Retrying fetch (2/3) for 312 outstanding blocks after 5000 ms
    17/09/13 23:46:28 INFO RetryingBlockFetcher: Retrying fetch (2/3) for 332 outstanding blocks after 5000 ms
    17/09/13 23:48:28 INFO RetryingBlockFetcher: Retrying fetch (2/3) for 327 outstanding blocks after 5000 ms
    17/09/13 23:50:28 INFO RetryingBlockFetcher: Retrying fetch (2/3) for 276 outstanding blocks after 5000 ms
    17/09/13 23:50:31 INFO RetryingBlockFetcher: Retrying fetch (3/3) for 327 outstanding blocks after 5000 ms
    17/09/13 23:50:34 INFO RetryingBlockFetcher: Retrying fetch (3/3) for 332 outstanding blocks after 5000 ms
    17/09/13 23:50:37 INFO RetryingBlockFetcher: Retrying fetch (3/3) for 312 outstanding blocks after 5000 ms
    17/09/13 23:50:40 INFO RetryingBlockFetcher: Retrying fetch (3/3) for 286 outstanding blocks after 5000 ms
    17/09/13 23:50:43 INFO RetryingBlockFetcher: Retrying fetch (3/3) for 294 outstanding blocks after 5000 ms
    17/09/13 23:50:46 INFO RetryingBlockFetcher: Retrying fetch (3/3) for 264 outstanding blocks after 5000 ms
    17/09/13 23:50:49 INFO RetryingBlockFetcher: Retrying fetch (3/3) for 316 outstanding blocks after 5000 ms
    17/09/13 23:51:07 INFO RetryingBlockFetcher: Retrying fetch (3/3) for 276 outstanding blocks after 5000 ms
    ```
    
    where the job has the following stage submission and completion events 
    
    ```
    17/09/13 23:21:57 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at flatMap at WordCount.scala:64), which has no missing parents
    17/09/13 23:31:34 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[4] at saveAsTextFile at WordCount.scala:110), which has no missing parents
    17/09/13 23:50:51 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at flatMap at WordCount.scala:64), which has no missing parents
    17/09/13 23:52:19 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[4] at saveAsTextFile at WordCount.scala:110), which has no missing parents
    
    17/09/13 23:31:34 INFO DAGScheduler: ShuffleMapStage 0 (flatMap at WordCount.scala:64) finished in 575.500 s
    17/09/13 23:52:19 INFO DAGScheduler: ShuffleMapStage 0 (flatMap at WordCount.scala:64) finished in 87.455 s
    17/09/13 23:59:07 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at WordCount.scala:110) finished in 408.167 s
    ```
    
    This means that approximately 16 minutes of a 38 minutes job where spent in retries, which is 42% of the job time. This might impact applications that lose a node at a moment when a single action is running. For example applications with a single action at the end, for writing the application results to external storage.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19583: [WIP][SPARK-22339] [CORE] [NETWORK-SHUFFLE] Push ...

Posted by juanrh <gi...@git.apache.org>.
Github user juanrh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19583#discussion_r148079448
  
    --- Diff: core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala ---
    @@ -225,6 +270,7 @@ class HeartbeatReceiverSuite
             Matchers.eq(Array(1L -> metrics.accumulators())),
             Matchers.eq(blockManagerId))
         }
    +    expectedUpdatedEpoch.foreach{_ => assert(response.updatedEpoch === expectedUpdatedEpoch)}
    --- End diff --
    
    you are right, I was only thinking on the positive case


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19583: [WIP][SPARK-22339] [CORE] [NETWORK-SHUFFLE] Push epoch u...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19583
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19583: [WIP][SPARK-22339] [CORE] [NETWORK-SHUFFLE] Push epoch u...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19583
  
    **[Test build #83136 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83136/testReport)** for PR 19583 at commit [`7f538df`](https://github.com/apache/spark/commit/7f538df6f3bffcb76a7dbc262cf75d410b635bdd).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19583: [WIP][SPARK-22339] [CORE] [NETWORK-SHUFFLE]

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19583
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19583: [WIP][SPARK-22339] [CORE] [NETWORK-SHUFFLE] Push ...

Posted by juanrh <gi...@git.apache.org>.
Github user juanrh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19583#discussion_r148078847
  
    --- Diff: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ---
    @@ -51,7 +51,26 @@ private case class ExecutorRegistered(executorId: String)
     
     private case class ExecutorRemoved(executorId: String)
     
    -private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
    +private[spark] case class UpdatedEpoch(epoch: Long)
    +
    +private[spark] object HeartbeatResponse {
    +  def apply(reregisterBlockManager: Boolean,
    +            updatedEpoch: Option[Long] = None): HeartbeatResponse =
    +    updatedEpoch.fold[HeartbeatResponse](BasicHeartbeatResponse(reregisterBlockManager)) {
    --- End diff --
    
    Makes sense, this code should be adapted to follow the code to the Spark code base conventions and style


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19583: [WIP][SPARK-22339] [CORE] [NETWORK-SHUFFLE] Push ...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19583#discussion_r147525074
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
    @@ -241,6 +243,21 @@ final class ShuffleBlockFetcherIterator(
             logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
             results.put(new FailureFetchResult(BlockId(blockId), address, e))
           }
    +
    +      override def shouldRetry(t: Throwable): Boolean = {
    --- End diff --
    
    I don't understand the changes to `ShuffleBlockFetcherIterator` at all (I guess they're incomplete from the TODOs??)
    
    couldn't your `shouldRetry` just check if the  epoch has been updated?  (you'd have to modify it so it has some way to get the current epoch)


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19583: [WIP][SPARK-22339] [CORE] [NETWORK-SHUFFLE] Push epoch u...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19583
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83419/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19583: [WIP][SPARK-22339] [CORE] [NETWORK-SHUFFLE] Push ...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19583#discussion_r147522958
  
    --- Diff: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ---
    @@ -51,7 +51,26 @@ private case class ExecutorRegistered(executorId: String)
     
     private case class ExecutorRemoved(executorId: String)
     
    -private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
    +private[spark] case class UpdatedEpoch(epoch: Long)
    +
    +private[spark] object HeartbeatResponse {
    +  def apply(reregisterBlockManager: Boolean,
    +            updatedEpoch: Option[Long] = None): HeartbeatResponse =
    +    updatedEpoch.fold[HeartbeatResponse](BasicHeartbeatResponse(reregisterBlockManager)) {
    +      epoch => HeartbeatResponseWithEpoch(reregisterBlockManager, Some(epoch))
    +    }
    +}
    +
    +private[spark] sealed trait HeartbeatResponse {
    +  def reregisterBlockManager: Boolean
    +  def updatedEpoch: Option[Long] = None
    +}
    +
    +private[spark] case class BasicHeartbeatResponse(reregisterBlockManager: Boolean)
    +  extends HeartbeatResponse
    +private[spark] case class HeartbeatResponseWithEpoch(reregisterBlockManager: Boolean,
    +                                                     override val updatedEpoch: Option[Long])
    +  extends HeartbeatResponse
    --- End diff --
    
    the two levels of optional updatedEpoch -- (1) implementing class and (2) the `Option` itself seem unnecessary.  I'd stick to just one type with an `Option`.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org