Posted to reviews@spark.apache.org by JoshRosen <gi...@git.apache.org> on 2017/05/11 23:10:24 UTC

[GitHub] spark pull request #17955: [SPARK-20715] Store MapStatuses only in MapOutput...

GitHub user JoshRosen opened a pull request:

    https://github.com/apache/spark/pull/17955

    [SPARK-20715] Store MapStatuses only in MapOutputTracker, not ShuffleMapStage

    ## What changes were proposed in this pull request?
    
    This PR refactors `ShuffleMapStage` and `MapOutputTracker` in order to simplify the management of `MapStatuses`, reduce driver memory consumption, and remove a potential source of scheduler correctness bugs.
    
    ### Background
    
    In Spark there are currently two places where MapStatuses are tracked:
    
    - The `MapOutputTracker` maintains an `Array[MapStatus]` storing a single location for each map output. This mapping is used by the `DAGScheduler` for determining reduce-task locality preferences (when locality-aware reduce task scheduling is enabled) and is also used to serve map output locations to executors / tasks.
    - Each `ShuffleMapStage` also contains a mapping of `Array[List[MapStatus]]` which holds the complete set of locations where each map output could be available. This mapping is used to determine which map tasks need to be run when constructing `TaskSets` for the stage.
    
    This duplication adds complexity and creates the potential for certain types of correctness bugs. Bad things can happen if these two copies of the map output locations get out of sync. For instance, if the `MapOutputTracker` is missing locations for a map output but `ShuffleMapStage` believes that locations are available, then tasks will fail with `MetadataFetchFailedException` but `ShuffleMapStage` will not be updated to reflect the missing map outputs. This leads to situations where the stage is reattempted (because downstream stages experienced fetch failures) but no task sets are launched (because `ShuffleMapStage` thinks all maps are available).
    
    I observed this behavior in a real-world deployment. I'm still not quite sure how the state got out of sync in the first place, but we can completely avoid this class of bug if we eliminate the duplicate state.
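
    To make the failure mode above concrete, here is a toy Scala model of the two copies drifting apart. It is only a sketch: `OutOfSyncSketch`, `Loc`, and both helper methods are hypothetical names rather than Spark APIs, and plain host names stand in for `MapStatus` objects.

    ```scala
    // Toy model, not Spark code: all names here are hypothetical stand-ins.
    object OutOfSyncSketch {
      final case class Loc(host: String)

      // Tracker-style copy: at most one location per map partition.
      // Partition 1 has lost its location here.
      val trackerLocs: Array[Option[Loc]] =
        Array[Option[Loc]](Some(Loc("host-a")), None)

      // Stage-style copy: every known location per map partition.
      // It still believes partition 1 is available.
      val stageLocs: Array[List[Loc]] =
        Array[List[Loc]](List(Loc("host-a")), List(Loc("host-b")))

      // Partitions the stage would resubmit: those with no known location.
      def missingAccordingToStage: Seq[Int] =
        stageLocs.indices.filter(i => stageLocs(i).isEmpty)

      // Partitions a reducer cannot fetch: those the tracker has no location for.
      def missingAccordingToTracker: Seq[Int] =
        trackerLocs.indices.filter(i => trackerLocs(i).isEmpty)
    }
    ```

    In this model the stage-side view reports nothing to recompute while the tracker-side view reports partition 1 as missing, which corresponds to the stuck state described above.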
    
    ### Why we only need to track a single location for each map output
    
    I think that storing an `Array[List[MapStatus]]` in `ShuffleMapStage` is unnecessary.
    
    First, note that this adds memory/object bloat to the driver: we need one extra `List` per task. If you have millions of tasks across all stages then this can add up to a significant amount of resources.
    
    Secondly, I believe that it's extremely uncommon that these lists will ever contain more than one entry. It's not impossible, but is very unlikely given the conditions which must occur for that to happen:
    
    - In normal operation (no task failures) we'll only run each task once and thus will have at most one output.
    - If speculation is enabled then it's possible that we'll have multiple attempts of a task. The TaskSetManager will [kill duplicate attempts of a task](https://github.com/apache/spark/blob/04901dd03a3f8062fd39ea38d585935ff71a9248/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L717) after a task finishes successfully, reducing the likelihood that both the original and speculated task will successfully register map outputs.
    - There is a [comment in `TaskSetManager`](https://github.com/apache/spark/blob/04901dd03a3f8062fd39ea38d585935ff71a9248/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L113) which suggests that running tasks are not killed if a task set becomes a zombie. However:
      - If the task set becomes a zombie due to the job being cancelled then it doesn't matter whether we record map outputs.
      - If the task set became a zombie because of a stage failure (e.g. the map stage itself had a fetch failure from an upstream map stage) then I believe that the "failedEpoch" will be updated which may cause map outputs from still-running tasks to [be ignored](https://github.com/apache/spark/blob/04901dd03a3f8062fd39ea38d585935ff71a9248/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1213). (I'm not 100% sure on this point, though).
    - Even if you _do_ manage to record multiple map outputs for a stage, only a single map output is reported to / tracked by the MapOutputTracker. The only situation where the additional output locations could actually be read or used would be if a task experienced a `FetchFailure` exception. The most likely cause of a `FetchFailure` exception is a lost executor, which will most likely have caused the loss of several map tasks' output, so saving on potential re-execution of a single map task isn't a huge win if we're going to have to recompute several other lost map outputs from other tasks which ran on that lost executor. Also note that the re-population of MapOutputTracker state from state in the ShuffleMapStage only happens after the reduce stage has failed; the additional location doesn't help to prevent FetchFailures but, instead, can only reduce the amount of work when recomputing missing parent stages.
    
    Given this, this patch chooses to do away with tracking multiple locations for map outputs and instead stores only a single location. This change removes the main distinction between the `ShuffleMapStage` and `MapOutputTracker`'s copies of this state, paving the way for storing it only in the `MapOutputTracker`.
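
    As a rough illustration of what single-location tracking could look like, the sketch below keeps one slot per map partition and derives the set of missing partitions from that one store. `SingleLocationStore` and its methods are hypothetical, and host names stand in for `MapStatus` objects; the actual patch may be structured differently.

    ```scala
    // Illustrative only: a single-location store that a stage could query
    // instead of keeping its own Array[List[...]] copy. Names are hypothetical.
    final class SingleLocationStore(numPartitions: Int) {
      // One slot per map partition; None means no output is registered.
      private val locs: Array[Option[String]] =
        Array.fill[Option[String]](numPartitions)(None)

      // A later successful attempt simply replaces the earlier location.
      def register(mapId: Int, host: String): Unit = synchronized {
        locs(mapId) = Some(host)
      }

      // Drop every output that lived on a lost host.
      def removeOutputsOnHost(host: String): Unit = synchronized {
        for (i <- locs.indices if locs(i).contains(host)) locs(i) = None
      }

      // Map partitions that still need to be (re)computed.
      def findMissingPartitions(): Seq[Int] = synchronized {
        locs.indices.filter(i => locs(i).isEmpty)
      }
    }
    ```

    Because the missing partitions are computed from the same store that serves reducers, there is no second copy that could disagree with it.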
    
    ### Overview of other changes
    
    - Significantly simplified the cache / lock management inside of the `MapOutputTrackerMaster`:
      - The old code had several parallel `HashMap`s which had to be guarded by maps of `Object`s which were used as locks. This code was somewhat complicated to follow.
      - The new code uses a new `ShuffleStatus` class to group together all of the state associated with a particular shuffle, including cached serialized map statuses, significantly simplifying the logic.
    - Moved more code out of the shared `MapOutputTracker` abstract base class and into the `MapOutputTrackerMaster` and `MapOutputTrackerWorker` subclasses. This makes it easier to reason about which functionality needs to be supported only on the driver or executor.
    - Removed a bunch of code from the `DAGScheduler` which was used to synchronize information from the `MapOutputTracker` to `ShuffleMapStage`.
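
    The idea of grouping all per-shuffle state, including the cached serialized statuses, behind one object's lock can be sketched roughly as follows. This is not the `ShuffleStatus` class from the patch: `ShuffleStateSketch`, its fields, and the comma-joined "serialization" are assumptions made purely for illustration.

    ```scala
    import java.nio.charset.StandardCharsets.UTF_8

    // Hypothetical stand-in for a per-shuffle state holder; the field and
    // method names are illustrative and are not taken from the patch.
    final class ShuffleStateSketch(numPartitions: Int) {
      private val statuses = Array.fill[Option[String]](numPartitions)(None)
      // Cached serialized form; cleared whenever the statuses change.
      private var cachedSerialized: Option[Array[Byte]] = None
      // Counts how often serialization actually ran (for demonstration only).
      private var serializations = 0

      def addMapOutput(mapId: Int, location: String): Unit = synchronized {
        statuses(mapId) = Some(location)
        cachedSerialized = None
      }

      // Returns the cached bytes if present, otherwise serializes and caches.
      def serialized(): Array[Byte] = synchronized {
        cachedSerialized.getOrElse {
          serializations += 1
          val bytes = statuses.map(_.getOrElse("")).mkString(",").getBytes(UTF_8)
          cachedSerialized = Some(bytes)
          bytes
        }
      }

      def serializationCount: Int = synchronized(serializations)
    }
    ```

    With the statuses and their cache guarded by the same monitor, no separate map of lock objects is needed in this sketch.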
    
    I will comment on these changes via inline GitHub review comments.
    
    /cc @hvanhovell and @rxin (whom I discussed this with offline), @tgravescs (who recently worked on caching of serialized MapOutputStatuses), and @kayousterhout and @markhamstra (for scheduler changes).
    
    ## How was this patch tested?
    
    Existing tests. I purposely avoided making interface / API changes which would require significant updates or modifications to test code.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/JoshRosen/spark map-output-tracker-rewrite

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/17955.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #17955
    
----
commit 9e12f59d4e184094d655981342b3610cf0cf616e
Author: Josh Rosen <jo...@databricks.com>
Date:   2017-05-11T02:29:59Z

    WIP towards changing map output tracker internals

commit 52be832d0847443ba7ebed1bd2f47ed4bf545678
Author: Josh Rosen <jo...@databricks.com>
Date:   2017-05-11T06:05:20Z

    Separate driver and executor impls.

commit 1aa14f895969d252705af15a47cf8bd243da131d
Author: Josh Rosen <jo...@databricks.com>
Date:   2017-05-11T06:11:58Z

    Avoid unnecessary materialization of mapoutputtracker format locs.

commit a4298da66da765a9987bc1d0d794e911c4a45e1f
Author: Josh Rosen <jo...@databricks.com>
Date:   2017-05-11T06:13:20Z

    Fix formatting problem.

commit 8ed62bdbca293384b55c8eee67ada5fbb9d83481
Author: Josh Rosen <jo...@databricks.com>
Date:   2017-05-11T06:22:45Z

    Implement getNumCachedSerializedBroadcast

commit 5683ec17d6c6df32820e84906107cc838c1166fb
Author: Josh Rosen <jo...@databricks.com>
Date:   2017-05-11T06:26:36Z

    Fix getMapSizesByExecutorId in case of no outputs.

commit 06ef8d35589901088daa1fd90bd2e8892b8ce7f5
Author: Josh Rosen <jo...@databricks.com>
Date:   2017-05-11T07:13:13Z

    Get DAGScheduler suite passing.

commit e9caad5246f9d6e81663743ef3de7600a6a65960
Author: Josh Rosen <jo...@databricks.com>
Date:   2017-05-11T07:14:40Z

    formatting

commit 54a033cfbeaf99ec380b60a2acf7d93ed24c8545
Author: Josh Rosen <jo...@databricks.com>
Date:   2017-05-11T21:46:21Z

    Remove need to store multiple statuses for a single map (explanation to come in PR desc.)

commit f4c096f69cadafd7695b38fad14d58c54943a085
Author: Josh Rosen <jo...@databricks.com>
Date:   2017-05-11T22:23:13Z

    Add Scaladoc.

commit 7d59bbe117a0a06b418c10fd509b9ff7e344bfd7
Author: Josh Rosen <jo...@databricks.com>
Date:   2017-05-11T22:28:43Z

    Fix test multiple simultaneous attempts for one task (SPARK-8029)

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on the issue:

    https://github.com/apache/spark/pull/17955
  
    I've looked at only the DAGScheduler changes so far. They LGTM.


[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17955
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request #17955: [SPARK-20715] Store MapStatuses only in MapOutput...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17955#discussion_r116124168
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1233,16 +1223,13 @@ class DAGScheduler(
                   logInfo("waiting: " + waitingStages)
                   logInfo("failed: " + failedStages)
     
    -              // We supply true to increment the epoch number here in case this is a
    +              // Increment the epoch number here in case this is a
                   // recomputation of the map outputs. In that case, some nodes may have cached
                   // locations with holes (from when we detected the error) and will need the
                   // epoch incremented to refetch them.
                   // TODO: Only increment the epoch number if this is not the first time
    --- End diff --
    
    It might be possible to resolve this `TODO` but I haven't thought about it too much yet.


[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17955
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request #17955: [SPARK-20715] Store MapStatuses only in MapOutput...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17955#discussion_r120205772
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala ---
    @@ -42,13 +41,12 @@ private[spark] class ShuffleMapStage(
         parents: List[Stage],
         firstJobId: Int,
         callSite: CallSite,
    -    val shuffleDep: ShuffleDependency[_, _, _])
    +    val shuffleDep: ShuffleDependency[_, _, _],
    --- End diff --
    
    I'm actually going to leave this as-is for now since I think the reference might actually be serving a purpose and I want to minimize scope of changes for now.


[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17955
  
    **[Test build #76836 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76836/testReport)** for PR 17955 at commit [`7d59bbe`](https://github.com/apache/spark/commit/7d59bbe117a0a06b418c10fd509b9ff7e344bfd7).
     * This patch **fails to generate documentation**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on the issue:

    https://github.com/apache/spark/pull/17955
  
    @JoshRosen Yes, I agree that it is orthogonal -- at least for now. I'm mostly just offering a heads up that if we get around to addressing `interruptThread`, then there may also need to be some changes related to mapOutput tracking.


[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17955
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/76838/
    Test FAILed.


[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17955
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/76886/
    Test PASSed.


[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17955
  
    **[Test build #76838 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76838/testReport)** for PR 17955 at commit [`e3da298`](https://github.com/apache/spark/commit/e3da298d59c764388ec6ca93ec23ba3eb8de96d3).


[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17955
  
    **[Test build #76836 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76836/testReport)** for PR 17955 at commit [`7d59bbe`](https://github.com/apache/spark/commit/7d59bbe117a0a06b418c10fd509b9ff7e344bfd7).


[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17955
  
    Merged build finished. Test PASSed.


[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the issue:

    https://github.com/apache/spark/pull/17955
  
    At a high level this definitely makes sense. I need to look at it in more detail; I'll try to do that in the next day or two.
    
    I am wondering what testing you have done on this. Have you done manual testing on large jobs with different types of failures, such as fetch failures?


[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17955
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77850/
    Test PASSed.


[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17955
  
    **[Test build #77759 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77759/testReport)** for PR 17955 at commit [`4550f61`](https://github.com/apache/spark/commit/4550f616a4f9c144a2da49a31ef3eaa19a0eeea8).


[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17955
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77759/
    Test PASSed.


[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the issue:

    https://github.com/apache/spark/pull/17955
  
    I've been running some local tests of scheduler throughput to make sure that this doesn't adversely affect performance in the processing of task completion events (I discovered the perf. hotspot fixed in #18008 as part of this testing).
    
    I'm planning to run more comprehensive end-to-end tests on real clusters (with injected failures) in the next couple of days.


[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17955
  
    Merged build finished. Test PASSed.


[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17955
  
    **[Test build #77850 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77850/testReport)** for PR 17955 at commit [`4550f61`](https://github.com/apache/spark/commit/4550f616a4f9c144a2da49a31ef3eaa19a0eeea8).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request #17955: [SPARK-20715] Store MapStatuses only in MapOutput...

Posted by jiangxb1987 <gi...@git.apache.org>.
Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17955#discussion_r117385673
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1233,17 +1223,6 @@ class DAGScheduler(
                   logInfo("waiting: " + waitingStages)
                   logInfo("failed: " + failedStages)
     
    -              // We supply true to increment the epoch number here in case this is a
    -              // recomputation of the map outputs. In that case, some nodes may have cached
    -              // locations with holes (from when we detected the error) and will need the
    -              // epoch incremented to refetch them.
    -              // TODO: Only increment the epoch number if this is not the first time
    -              //       we registered these map outputs.
    -              mapOutputTracker.registerMapOutputs(
    -                shuffleStage.shuffleDep.shuffleId,
    -                shuffleStage.outputLocInMapOutputTrackerFormat(),
    -                changeEpoch = true)
    --- End diff --
    
    Is it safer if we increment the epoch number here?


[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the issue:

    https://github.com/apache/spark/pull/17955
  
    The `MapOutputTrackerSuite` `remote fetch` test case failed as of that last commit because I didn't faithfully replicate the behavior of `clearEpoch()` / `incrementEpoch()`.
    
    In the old code it seems like bumping the epoch would effectively invalidate the entire driver-side cache of map output statuses, whereas my code does this at a finer granularity by tracking cache validity on a per-shuffle basis: if an executor lost event actually ends up removing map output statuses for a shuffle then we invalidate only that shuffle's cache; other caches aren't affected.
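
    The per-shuffle invalidation described above might be modeled along these lines. This is a simplified sketch under stated assumptions: `PerShuffleCacheSketch`, `markCached`, and `executorLost` are hypothetical names, a boolean flag stands in for the real cached serialized statuses, and executor ids stand in for `MapStatus` locations.

    ```scala
    import scala.collection.mutable

    // Toy model of per-shuffle cache validity; every name is hypothetical.
    final class PerShuffleCacheSketch {
      private final class State(numMaps: Int) {
        val locs = Array.fill[Option[String]](numMaps)(None)
        var cacheValid = false
      }
      private val shuffles = mutable.Map.empty[Int, State]

      def register(shuffleId: Int, numMaps: Int): Unit = synchronized {
        shuffles(shuffleId) = new State(numMaps)
      }

      def addOutput(shuffleId: Int, mapId: Int, executor: String): Unit = synchronized {
        val s = shuffles(shuffleId)
        s.locs(mapId) = Some(executor)
        s.cacheValid = false
      }

      // Pretend the serialized statuses for one shuffle were built and cached.
      def markCached(shuffleId: Int): Unit = synchronized {
        shuffles(shuffleId).cacheValid = true
      }

      def isCached(shuffleId: Int): Boolean = synchronized {
        shuffles(shuffleId).cacheValid
      }

      // On executor loss, invalidate only shuffles that actually lost an output.
      def executorLost(executor: String): Unit = synchronized {
        for (s <- shuffles.values) {
          var removed = false
          for (i <- s.locs.indices if s.locs(i).contains(executor)) {
            s.locs(i) = None
            removed = true
          }
          if (removed) s.cacheValid = false
        }
      }
    }
    ```

    A single global epoch bump would instead reset the flag for every shuffle, including those with no outputs on the lost executor.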


[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17955
  
    **[Test build #76886 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76886/testReport)** for PR 17955 at commit [`a8069a3`](https://github.com/apache/spark/commit/a8069a3fb3edbff39786301d7572be6de1cd931c).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on the issue:

    https://github.com/apache/spark/pull/17955
  
    @JoshRosen The hard-coding of `interruptThread = true` within `TaskSetManager`'s `handleSuccessfulTask` to kill duplicate, speculative attempts of a task is potentially an issue. It is not a new issue with this PR, but one that hasn't been fully analyzed and addressed AFAIK. https://issues.apache.org/jira/browse/SPARK-17064


[GitHub] spark pull request #17955: [SPARK-20715] Store MapStatuses only in MapOutput...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17955#discussion_r117388593
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala ---
    @@ -42,13 +41,12 @@ private[spark] class ShuffleMapStage(
         parents: List[Stage],
         firstJobId: Int,
         callSite: CallSite,
    -    val shuffleDep: ShuffleDependency[_, _, _])
    +    val shuffleDep: ShuffleDependency[_, _, _],
    --- End diff --
    
    Good catch. I agree, but with the caveat that we can only clean this up if this isn't functioning as the last strong reference which keeps the dependency from being garbage-collected.


[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the issue:

    https://github.com/apache/spark/pull/17955
  
    jenkins retest this please


[GitHub] spark pull request #17955: [SPARK-20715] Store MapStatuses only in MapOutput...

Posted by jiangxb1987 <gi...@git.apache.org>.
Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17955#discussion_r117385925
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala ---
    @@ -42,13 +41,12 @@ private[spark] class ShuffleMapStage(
         parents: List[Stage],
         firstJobId: Int,
         callSite: CallSite,
    -    val shuffleDep: ShuffleDependency[_, _, _])
    +    val shuffleDep: ShuffleDependency[_, _, _],
    --- End diff --
    
    It seems we can pass the `shuffleId` instead of the `ShuffleDependency` here.


[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the issue:

    https://github.com/apache/spark/pull/17955
  
    I've merged this to master (2.3.0).
    
    Thanks to everyone who helped to review. If there is additional feedback at this point then I'll address it in a quick followup.


[GitHub] spark pull request #17955: [SPARK-20715] Store MapStatuses only in MapOutput...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/17955


[GitHub] spark pull request #17955: [SPARK-20715] Store MapStatuses only in MapOutput...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17955#discussion_r116124697
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1392,17 +1378,7 @@ class DAGScheduler(
     
           if (filesLost || !env.blockManager.externalShuffleServiceEnabled) {
             logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
    -        // TODO: This will be really slow if we keep accumulating shuffle map stages
    -        for ((shuffleId, stage) <- shuffleIdToMapStage) {
    -          stage.removeOutputsOnExecutor(execId)
    -          mapOutputTracker.registerMapOutputs(
    -            shuffleId,
    -            stage.outputLocInMapOutputTrackerFormat(),
    --- End diff --
    
    This was a potential inefficiency in the old code. If we lost a single executor, we'd end up overwriting the `MapOutputTracker`'s state for every `ShuffleMapStage`. We'd have to iterate through each task once in `stage.removeOutputsOnExecutor` and then a second time in `stage.outputLocInMapOutputTrackerFormat()`, and in both iterations we'd also have to iterate over a `List` for each task.
    
    While the new code still needs to scan every task, it doesn't have to scan a separate list per task and needs only a single scan, not two.
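    
    A minimal sketch of the single-scan removal described above, assuming one optional location per map task. `ShuffleStatusSketch` and its methods are hypothetical, and executor IDs are plain strings rather than Spark's real types.
    
    ```scala
    // Hypothetical, simplified model: at most one location (an executor ID) per map task.
    final class ShuffleStatusSketch(numPartitions: Int) {
      private val locations = Array.fill[Option[String]](numPartitions)(None)
    
      def addMapOutput(mapId: Int, execId: String): Unit = {
        locations(mapId) = Some(execId)
      }
    
      // One pass over the tasks, no per-task list; returns true if anything was removed.
      def removeOutputsOnExecutor(execId: String): Boolean = {
        var removed = false
        for (mapId <- locations.indices if locations(mapId).contains(execId)) {
          locations(mapId) = None
          removed = true
        }
        removed
      }
    
      // Partitions whose output is currently missing and would need to be recomputed.
      def findMissingPartitions(): Seq[Int] =
        locations.indices.filter(mapId => locations(mapId).isEmpty)
    }
    ```
    
    The Boolean result lets a caller skip any cache invalidation for shuffles that had no outputs on the lost executor.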


[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17955
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/76836/


[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

Posted by jiangxb1987 <gi...@git.apache.org>.
Github user jiangxb1987 commented on the issue:

    https://github.com/apache/spark/pull/17955
  
    LGTM, also cc @cloud-fan 


[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17955
  
    **[Test build #77759 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77759/testReport)** for PR 17955 at commit [`4550f61`](https://github.com/apache/spark/commit/4550f616a4f9c144a2da49a31ef3eaa19a0eeea8).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the issue:

    https://github.com/apache/spark/pull/17955
  
    @jiangxb1987, sorry for the super long delay in addressing that latest review comment. I've made that one change you suggested at https://github.com/apache/spark/pull/17955#discussion_r117385673 so now I think this should be good to go. Any final comments?


[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

Posted by sameeragarwal <gi...@git.apache.org>.
Github user sameeragarwal commented on the issue:

    https://github.com/apache/spark/pull/17955
  
    cc @jiangxb1987 for review


[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17955
  
    **[Test build #77850 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77850/testReport)** for PR 17955 at commit [`4550f61`](https://github.com/apache/spark/commit/4550f616a4f9c144a2da49a31ef3eaa19a0eeea8).


[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the issue:

    https://github.com/apache/spark/pull/17955
  
    @markhamstra, I think that the `interruptThread = true` hardcoding may be orthogonal to this PR's proposed changes: `interruptThread` affects how we carry out task cancellation, not whether we attempt to cancel, so even if you set `interruptThread = false` you could still effectively cancel redundant task attempts, since those tasks would check the `interrupted` flag in `TaskContext.isInterrupted` or via `InterruptibleIterator`.
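    
    A minimal sketch of flag-based (cooperative) cancellation as described above, without `Thread.interrupt()`. `KillFlag` and `CheckedIterator` are hypothetical stand-ins; they only mimic the idea of polling an interrupted flag at record boundaries and are not Spark's actual classes.
    
    ```scala
    import java.util.concurrent.atomic.AtomicBoolean
    
    // Hypothetical stand-in for the per-task "interrupted" flag.
    final class KillFlag {
      private val killed = new AtomicBoolean(false)
      def kill(): Unit = killed.set(true)
      def isKilled: Boolean = killed.get()
    }
    
    // Wraps an iterator and polls the flag before each element, so a killed task
    // stops at the next record boundary even though its thread is never interrupted.
    final class CheckedIterator[T](flag: KillFlag, underlying: Iterator[T]) extends Iterator[T] {
      override def hasNext: Boolean = {
        if (flag.isKilled) throw new RuntimeException("task killed")
        underlying.hasNext
      }
      override def next(): T = underlying.next()
    }
    ```
    
    The trade-off in this sketch is latency: a task that blocks for a long time between records will not notice the flag until it asks for the next element.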


[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17955
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request #17955: [SPARK-20715] Store MapStatuses only in MapOutput...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17955#discussion_r116124857
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1392,17 +1378,7 @@ class DAGScheduler(
     
           if (filesLost || !env.blockManager.externalShuffleServiceEnabled) {
             logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
    -        // TODO: This will be really slow if we keep accumulating shuffle map stages
    -        for ((shuffleId, stage) <- shuffleIdToMapStage) {
    -          stage.removeOutputsOnExecutor(execId)
    -          mapOutputTracker.registerMapOutputs(
    -            shuffleId,
    -            stage.outputLocInMapOutputTrackerFormat(),
    -            changeEpoch = true)
    -        }
    -        if (shuffleIdToMapStage.isEmpty) {
    -          mapOutputTracker.incrementEpoch()
    --- End diff --
    
    I think the idea behind this branch was to ensure that we always increment the `mapOutputTracker` epoch upon executor loss. Thus I put an `incrementEpoch()` into the `removeOutputsOnExecutor()` call itself inside `MapOutputTrackerMaster`.


[GitHub] spark pull request #17955: [SPARK-20715] Store MapStatuses only in MapOutput...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17955#discussion_r116123982
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -328,25 +328,14 @@ class DAGScheduler(
         val numTasks = rdd.partitions.length
         val parents = getOrCreateParentStages(rdd, jobId)
         val id = nextStageId.getAndIncrement()
    -    val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)
    +    val stage = new ShuffleMapStage(
    +      id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
     
         stageIdToStage(id) = stage
         shuffleIdToMapStage(shuffleDep.shuffleId) = stage
         updateJobIdStageIdMaps(jobId, stage)
     
    -    if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    --- End diff --
    
    This logic is no longer necessary because `ShuffleMapStage` queries the `MapOutputTracker`.


[GitHub] spark pull request #17955: [SPARK-20715] Store MapStatuses only in MapOutput...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17955#discussion_r116124372
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1216,7 +1205,8 @@ class DAGScheduler(
                   // The epoch of the task is acceptable (i.e., the task was launched after the most
                   // recent failure we're aware of for the executor), so mark the task's output as
                   // available.
    -              shuffleStage.addOutputLoc(smt.partitionId, status)
    --- End diff --
    
    In the old code we'd incrementally register map outputs in `ShuffleMapStage` and then write the entire set of complete map outputs into `MapOutputTracker` in one shot upon stage completion (see line 1242 in the old code). Now we write this incrementally.
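    
    A minimal sketch of incremental registration, assuming the tracker is the only holder of output locations. `TrackerSketch` and its method names are hypothetical, and locations are plain strings.
    
    ```scala
    import scala.collection.mutable
    
    // Hypothetical tracker: shuffleId -> per-partition location (None = output missing).
    final class TrackerSketch {
      private val statuses = mutable.Map.empty[Int, Array[Option[String]]]
    
      def registerShuffle(shuffleId: Int, numMaps: Int): Unit = {
        statuses(shuffleId) = Array.fill[Option[String]](numMaps)(None)
      }
    
      // Called once per successful map task rather than once per completed stage.
      def registerMapOutput(shuffleId: Int, mapId: Int, location: String): Unit = {
        statuses(shuffleId)(mapId) = Some(location)
      }
    
      def numAvailableOutputs(shuffleId: Int): Int =
        statuses(shuffleId).count(_.isDefined)
    }
    ```
    
    Because every task completion updates the same structure that later lookups read, there is no second copy that could drift out of sync with it.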


[GitHub] spark pull request #17955: [SPARK-20715] Store MapStatuses only in MapOutput...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17955#discussion_r117388447
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1233,17 +1223,6 @@ class DAGScheduler(
                   logInfo("waiting: " + waitingStages)
                   logInfo("failed: " + failedStages)
     
    -              // We supply true to increment the epoch number here in case this is a
    -              // recomputation of the map outputs. In that case, some nodes may have cached
    -              // locations with holes (from when we detected the error) and will need the
    -              // epoch incremented to refetch them.
    -              // TODO: Only increment the epoch number if this is not the first time
    -              //       we registered these map outputs.
    -              mapOutputTracker.registerMapOutputs(
    -                shuffleStage.shuffleDep.shuffleId,
    -                shuffleStage.outputLocInMapOutputTrackerFormat(),
    -                changeEpoch = true)
    --- End diff --
    
    I need to think about this carefully and maybe make a matrix of possible cases to be sure. My original thought process was something like this:
    
    - The old code comment says `TODO: Only increment the epoch number if this is not the first time we registered these map outputs`, which implies that at least some of the epoch increments here were unnecessary.
    - If we assume that a new, never-before-computed map output won't be requested by executors before it is complete, then we don't need to worry about executors caching incomplete map outputs.
    - I believe that any FetchFailure should end up incrementing the epoch.
    
    That said, the increment here occurs only once per stage completion. It probably doesn't _hurt_ to bump the epoch here because in a single-stage-at-a-time case we'd only be invalidating map outputs which we'll never fetch again anyway. Even if we were unnecessarily invalidating the map output statuses of other concurrent stages, I think the impact would be relatively small. If we did find that this had an impact, a sane approach would be to implement an ETag-like mechanism where bumping the epoch doesn't purge the executor-side caches but instead has them verify a per-stage epoch / counter. Finally, the existing code might be giving us nice eager cleanup of map statuses after stages complete (vs. the cleanup which occurs later when stages or shuffles are fully cleaned up).
    
    I think you're right that this change carries unnecessary / not-fully-understood risks for now, so let me go ahead and put in an explicit increment here (with an updated comment / ref. to this discussion) in my next push to this PR.
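    
    A minimal sketch of what an ETag-like validation scheme could look like. Nothing here exists in Spark: `DriverSketch`, `ExecutorCacheSketch`, and the per-shuffle version counter are assumptions made for illustration, and strings stand in for map output statuses.
    
    ```scala
    import scala.collection.mutable
    
    // Hypothetical driver-side state: a version counter and a payload per shuffle.
    final class DriverSketch {
      private val versions = mutable.Map.empty[Int, Long]
      private val payloads = mutable.Map.empty[Int, String]
      var fullFetches = 0 // counts expensive transfers, for illustration only
    
      def update(shuffleId: Int, payload: String): Unit = {
        payloads(shuffleId) = payload
        versions(shuffleId) = versions.getOrElse(shuffleId, 0L) + 1
      }
    
      def version(shuffleId: Int): Long = versions.getOrElse(shuffleId, 0L)
    
      def fetch(shuffleId: Int): (Long, String) = {
        fullFetches += 1
        (version(shuffleId), payloads(shuffleId))
      }
    }
    
    // Hypothetical executor-side cache: entries are revalidated instead of purged.
    final class ExecutorCacheSketch(driver: DriverSketch) {
      private val cache = mutable.Map.empty[Int, (Long, String)]
    
      def get(shuffleId: Int): String = {
        val current = driver.version(shuffleId) // cheap validation round trip
        cache.get(shuffleId) match {
          case Some((cachedVersion, payload)) if cachedVersion == current => payload
          case _ =>
            val fresh = driver.fetch(shuffleId) // refetch only the stale shuffle
            cache(shuffleId) = fresh
            fresh._2
        }
      }
    }
    ```
    
    In this sketch a change to one shuffle costs other shuffles only a version check, at the price of one extra round trip per lookup.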


[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17955
  
    **[Test build #76886 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76886/testReport)** for PR 17955 at commit [`a8069a3`](https://github.com/apache/spark/commit/a8069a3fb3edbff39786301d7572be6de1cd931c).


[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17955
  
    **[Test build #76838 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76838/testReport)** for PR 17955 at commit [`e3da298`](https://github.com/apache/spark/commit/e3da298d59c764388ec6ca93ec23ba3eb8de96d3).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request #17955: [SPARK-20715] Store MapStatuses only in MapOutput...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17955#discussion_r116344085
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -495,106 +532,153 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf,
         None
       }
     
    -  def incrementEpoch() {
    +  private def incrementEpoch() {
         epochLock.synchronized {
           epoch += 1
           logDebug("Increasing epoch to " + epoch)
         }
       }
     
    -  private def removeBroadcast(bcast: Broadcast[_]): Unit = {
    -    if (null != bcast) {
    -      broadcastManager.unbroadcast(bcast.id,
    -        removeFromDriver = true, blocking = false)
    +  /** Called to get current epoch number. */
    +  def getEpoch: Long = {
    +    epochLock.synchronized {
    +      return epoch
         }
       }
     
    -  private def clearCachedBroadcast(): Unit = {
    -    for (cached <- cachedSerializedBroadcast) removeBroadcast(cached._2)
    -    cachedSerializedBroadcast.clear()
    -  }
    -
    -  def getSerializedMapOutputStatuses(shuffleId: Int): Array[Byte] = {
    -    var statuses: Array[MapStatus] = null
    -    var retBytes: Array[Byte] = null
    -    var epochGotten: Long = -1
    -
    -    // Check to see if we have a cached version, returns true if it does
    -    // and has side effect of setting retBytes.  If not returns false
    -    // with side effect of setting statuses
    -    def checkCachedStatuses(): Boolean = {
    -      epochLock.synchronized {
    -        if (epoch > cacheEpoch) {
    -          cachedSerializedStatuses.clear()
    -          clearCachedBroadcast()
    -          cacheEpoch = epoch
    -        }
    -        cachedSerializedStatuses.get(shuffleId) match {
    -          case Some(bytes) =>
    -            retBytes = bytes
    -            true
    -          case None =>
    -            logDebug("cached status not found for : " + shuffleId)
    -            statuses = mapStatuses.getOrElse(shuffleId, Array.empty[MapStatus])
    -            epochGotten = epoch
    -            false
    -        }
    -      }
    -    }
    -
    -    if (checkCachedStatuses()) return retBytes
    -    var shuffleIdLock = shuffleIdLocks.get(shuffleId)
    -    if (null == shuffleIdLock) {
    -      val newLock = new Object()
    -      // in general, this condition should be false - but good to be paranoid
    -      val prevLock = shuffleIdLocks.putIfAbsent(shuffleId, newLock)
    -      shuffleIdLock = if (null != prevLock) prevLock else newLock
    -    }
    -    // synchronize so we only serialize/broadcast it once since multiple threads call
    -    // in parallel
    -    shuffleIdLock.synchronized {
    -      // double check to make sure someone else didn't serialize and cache the same
    -      // mapstatus while we were waiting on the synchronize
    -      if (checkCachedStatuses()) return retBytes
    -
    -      // If we got here, we failed to find the serialized locations in the cache, so we pulled
    -      // out a snapshot of the locations as "statuses"; let's serialize and return that
    -      val (bytes, bcast) = MapOutputTracker.serializeMapStatuses(statuses, broadcastManager,
    -        isLocal, minSizeForBroadcast)
    -      logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length))
    -      // Add them into the table only if the epoch hasn't changed while we were working
    -      epochLock.synchronized {
    -        if (epoch == epochGotten) {
    -          cachedSerializedStatuses(shuffleId) = bytes
    -          if (null != bcast) cachedSerializedBroadcast(shuffleId) = bcast
    -        } else {
    -          logInfo("Epoch changed, not caching!")
    -          removeBroadcast(bcast)
    +  // This method is only called in local-mode.
    +  def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
    +      : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
    +    logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition")
    +    shuffleStatuses.get(shuffleId) match {
    +      case Some (shuffleStatus) =>
    +        shuffleStatus.withMapStatuses { statuses =>
    +          MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses)
             }
    -      }
    -      bytes
    +      case None =>
    +        Seq.empty
         }
       }
     
       override def stop() {
         mapOutputRequests.offer(PoisonPill)
         threadpool.shutdown()
         sendTracker(StopMapOutputTracker)
    -    mapStatuses.clear()
         trackerEndpoint = null
    -    cachedSerializedStatuses.clear()
    -    clearCachedBroadcast()
    -    shuffleIdLocks.clear()
    +    shuffleStatuses.clear()
       }
     }
     
     /**
    - * MapOutputTracker for the executors, which fetches map output information from the driver's
    - * MapOutputTrackerMaster.
    + * Executor-side client for fetching map output info from the driver's MapOutputTrackerMaster.
    + * Note that this is not used in local-mode; instead, local-mode Executors access the
    + * MapOutputTrackerMaster directly (which is possible because the master and worker share a common
    + * superclass).
      */
     private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
    -  protected val mapStatuses: Map[Int, Array[MapStatus]] =
    +
    +  val mapStatuses: Map[Int, Array[MapStatus]] =
         new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
    +
    +  /** Remembers which map output locations are currently being fetched on an executor. */
    +  private val fetching = new HashSet[Int]
    +
    +  override def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
    +      : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
    +    logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition")
    +    val statuses = getStatuses(shuffleId)
    +    try {
    +      MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses)
    +    } catch {
    +      case e: MetadataFetchFailedException =>
    +        // We experienced a fetch failure so our mapStatuses cache is outdated; clear it:
    +        mapStatuses.clear()
    --- End diff --
    
    The idea here is to _locally_ clear the mapStatuses cache. In the old code the cache was cleared only indirectly, after the DAGScheduler handled the FetchFailure and the epoch was incremented in the MapOutputTrackerMaster.
    
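    The cache is still cleared when the master sends us a higher epoch, but the driver-side epoch is now bumped only after outputs are actually lost (or previously missing outputs become available), not after every fetch failure. A fetch failure on its own therefore no longer guarantees an epoch bump, which is why the worker clears its cache itself.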
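
The two invalidation paths described above can be sketched roughly as follows. This is a simplified illustration, not the code in this PR: `WorkerStatusCache`, `fetchFromDriver`, and `onFetchFailure` are hypothetical names, and map statuses are modelled as plain strings rather than `MapStatus` objects.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for the executor-side cache (not Spark's actual class).
// `fetchFromDriver` is an assumed callback that asks the driver for a shuffle's statuses.
final class WorkerStatusCache(fetchFromDriver: Int => Array[String]) {
  private val cache = mutable.Map.empty[Int, Array[String]]
  private var epoch = 0L

  /** Path 1: the driver announced a higher epoch, so drop everything cached so far. */
  def updateEpoch(newEpoch: Long): Unit = synchronized {
    if (newEpoch > epoch) {
      epoch = newEpoch
      cache.clear()
    }
  }

  /** Returns cached statuses, asking the driver only on a cache miss. */
  def getStatuses(shuffleId: Int): Array[String] = synchronized {
    cache.getOrElseUpdate(shuffleId, fetchFromDriver(shuffleId))
  }

  /** Path 2: clear locally after a fetch failure, without waiting for an epoch bump. */
  def onFetchFailure(): Unit = synchronized {
    cache.clear()
  }
}
```

With only the first path, a worker holding stale locations would keep serving them until the driver happened to bump the epoch; the second path forces the next lookup to go back to the driver.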

