Posted to issues@spark.apache.org by "tomzhu (JIRA)" <ji...@apache.org> on 2017/11/23 13:02:00 UTC
[jira] [Created] (SPARK-22593) submitMissingTask in DagScheduler
will call partitions function many times which may be time consuming
tomzhu created SPARK-22593:
------------------------------
Summary: submitMissingTask in DagScheduler will call partitions function many times which may be time consuming
Key: SPARK-22593
URL: https://issues.apache.org/jira/browse/SPARK-22593
Project: Spark
Issue Type: Question
Components: Spark Core
Affects Versions: 2.2.0
Reporter: tomzhu
Priority: Minor
When DAGScheduler calls submitMissingTasks, it creates the tasks and calls stage.rdd.partitions once for every task it creates. With many tasks this means many calls, which may be time-consuming. The code is:
{quote}
val tasks: Seq[Task[_]] = try {
  val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
  stage match {
    case stage: ShuffleMapStage =>
      stage.pendingPartitions.clear()
      partitionsToCompute.map { id =>
        val locs = taskIdToLocations(id)
        val part = stage.rdd.partitions(id)
        stage.pendingPartitions += id
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
          Option(sc.applicationId), sc.applicationAttemptId)
      }
    case stage: ResultStage =>
      partitionsToCompute.map { id =>
        val p: Int = stage.partitions(id)
        val part = stage.rdd.partitions(p) // here is a little time consuming.
        val locs = taskIdToLocations(id)
        new ResultTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, id, properties, serializedTaskMetrics,
          Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
      }
  }
}
{quote}
For example, for a ParallelCollectionRDD with 3 slices (partitions), creating the tasks calls stage.rdd.partitions three times. Since stage.rdd.partitions calls getPartitions, getPartitions will be called three times, which is a little time-consuming. The code of stage.rdd.partitions is:
{quote}
final def partitions: Array[Partition] = {
  checkpointRDD.map(_.partitions).getOrElse {
    if (partitions_ == null) {
      partitions_ = getPartitions
      partitions_.zipWithIndex.foreach { case (partition, index) =>
        require(partition.index == index,
          s"partitions($index).partition == ${partition.index}, but it should equal $index")
      }
    }
    partitions_
  }
}
{quote}
It would be better to avoid this.
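One possible way to avoid the repeated calls is to read the partitions array once, before the map, and index into that local value. The sketch below is only an illustration of that idea on toy stand-ins: ToyPartition, ToyRDD, partitionsCalls and HoistPartitions are hypothetical names, not Spark classes, and this is not the actual DAGScheduler code or a proposed patch.

```scala
// Toy stand-ins for illustration only; these are NOT Spark's real classes.
final case class ToyPartition(index: Int)

class ToyRDD(numSlices: Int) {
  // Counts how often `partitions` is invoked (hypothetical instrumentation).
  var partitionsCalls = 0
  private var cached: Array[ToyPartition] = null

  protected def getPartitions: Array[ToyPartition] =
    Array.tabulate(numSlices)(ToyPartition(_))

  // Simplified from the quoted method: no checkpoint handling, no index check.
  final def partitions: Array[ToyPartition] = {
    partitionsCalls += 1
    if (cached == null) cached = getPartitions
    cached
  }
}

object HoistPartitions {
  // Same shape as the quoted loop: one `partitions` call per task.
  def perTask(rdd: ToyRDD, ids: Seq[Int]): Seq[ToyPartition] =
    ids.map(id => rdd.partitions(id))

  // Hoisted variant: fetch the array once, then index into the local value.
  def hoisted(rdd: ToyRDD, ids: Seq[Int]): Seq[ToyPartition] = {
    val parts = rdd.partitions
    ids.map(id => parts(id))
  }
}
```

With three partition ids, perTask invokes partitions three times while hoisted invokes it once, and both return the same partitions. Applied to the code quoted above, this would amount to something like a local val holding stage.rdd.partitions before the stage match, assuming nothing needs the array to be re-read between tasks.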