Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2016/04/02 17:12:25 UTC
[jira] [Resolved] (SPARK-14178) DAGScheduler should get map output statuses directly, not by MapOutputTrackerMaster.getSerializedMapOutputStatuses.
[ https://issues.apache.org/jira/browse/SPARK-14178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen resolved SPARK-14178.
-------------------------------
Resolution: Won't Fix
Resolving per comments in PR. It's not clear this is valid.
> DAGScheduler should get map output statuses directly, not by MapOutputTrackerMaster.getSerializedMapOutputStatuses.
> -------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-14178
> URL: https://issues.apache.org/jira/browse/SPARK-14178
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Reporter: Guoqiang Li
>
> DAGScheduler gets map output statuses by {{MapOutputTrackerMaster.getSerializedMapOutputStatuses}}.
> [DAGScheduler.scala#L357 | https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L357]
> {noformat}
>   private def newOrUsedShuffleStage(
>       shuffleDep: ShuffleDependency[_, _, _],
>       firstJobId: Int): ShuffleMapStage = {
>     val rdd = shuffleDep.rdd
>     val numTasks = rdd.partitions.length
>     val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite)
>     if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
>       val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
>       // Deserialization is very time-consuming.
>       val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
>       (0 until locs.length).foreach { i =>
>         if (locs(i) ne null) {
>           // locs(i) will be null if missing
>           stage.addOutputLoc(i, locs(i))
>         }
>       }
>     } else {
>       // Kind of ugly: need to register RDDs with the cache and map output tracker here
>       // since we can't do it in the RDD constructor because # of partitions is unknown
>       logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
>       mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
>     }
>     stage
>   }
> {noformat}
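For readers unfamiliar with the pattern being questioned, the two access paths can be sketched outside Spark. Everything below is hypothetical: ToyMapStatus and ToyTracker are illustrative stand-ins, not Spark's MapStatus or MapOutputTrackerMaster, and the "direct" accessor is an assumption about what the report has in mind. The ticket does not say how direct access would be implemented, and it was resolved as Won't Fix. The sketch shows only that both paths return the same statuses, with the first paying for a serialize/deserialize round trip.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

// Hypothetical stand-in for a map status; not Spark's MapStatus.
final case class ToyMapStatus(host: String, sizes: Vector[Long])

// Hypothetical driver-side tracker; not Spark's MapOutputTrackerMaster.
final class ToyTracker {
  private val statuses = scala.collection.mutable.Map.empty[Int, Array[ToyMapStatus]]

  def register(shuffleId: Int, numMaps: Int): Unit =
    statuses(shuffleId) = new Array[ToyMapStatus](numMaps)

  def put(shuffleId: Int, mapId: Int, status: ToyMapStatus): Unit =
    statuses(shuffleId)(mapId) = status

  // Round-trip path: compress and serialize the statuses into bytes,
  // as one would do before sending them to a remote process.
  def serialized(shuffleId: Int): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(new GZIPOutputStream(bytes))
    try out.writeObject(statuses(shuffleId)) finally out.close()
    bytes.toByteArray
  }

  // Direct path (assumed): hand back a copy of the in-memory array.
  def direct(shuffleId: Int): Array[ToyMapStatus] = statuses(shuffleId).clone()
}

object ToyTracker {
  // Inverse of ToyTracker#serialized.
  def deserialize(bytes: Array[Byte]): Array[ToyMapStatus] = {
    val in = new ObjectInputStream(new GZIPInputStream(new ByteArrayInputStream(bytes)))
    try in.readObject().asInstanceOf[Array[ToyMapStatus]] finally in.close()
  }
}

object Demo {
  def main(args: Array[String]): Unit = {
    val tracker = new ToyTracker
    tracker.register(0, 3)
    tracker.put(0, 0, ToyMapStatus("host-a", Vector(10L, 20L)))
    tracker.put(0, 2, ToyMapStatus("host-b", Vector(5L)))

    val viaBytes = ToyTracker.deserialize(tracker.serialized(0)) // round trip
    val viaMemory = tracker.direct(0)                            // no round trip

    // Both paths yield the same entries; the missing map output (index 1) stays null.
    assert(viaBytes.toList == viaMemory.toList)
    assert(viaMemory(1) == null)
  }
}
```

Whether skipping the round trip matters in practice depends on details this sketch ignores, such as any caching of the serialized bytes, which may be part of why the resolution comment questions the report's validity.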
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)