Posted to issues@spark.apache.org by "Reza Safi (JIRA)" <ji...@apache.org> on 2017/09/28 22:42:00 UTC

[jira] [Commented] (SPARK-22162) Executors and the driver use inconsistent Job IDs during the new RDD commit protocol

    [ https://issues.apache.org/jira/browse/SPARK-22162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16185036#comment-16185036 ] 

Reza Safi commented on SPARK-22162:
-----------------------------------

I will send a pull request shortly for this issue.

> Executors and the driver use inconsistent Job IDs during the new RDD commit protocol
> ------------------------------------------------------------------------------------
>
>                 Key: SPARK-22162
>                 URL: https://issues.apache.org/jira/browse/SPARK-22162
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.0, 2.3.0
>            Reporter: Reza Safi
>
> After the SPARK-18191 commit in pull request 15769, it is possible under the new commit protocol for the driver and the executors to use different job IDs during an RDD commit.
> In the old code, the variable stageId is part of the closure used to define the task, as you can see here:
>  [https://github.com/apache/spark/blob/9c8deef64efee20a0ddc9b612f90e77c80aede60/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L1098]
> As a result, a TaskAttemptID is constructed on the executors using the same "stageId" as the driver, since the value is serialized on the driver. Note also that the value of stageId is actually rdd.id, which is assigned here: [https://github.com/apache/spark/blob/9c8deef64efee20a0ddc9b612f90e77c80aede60/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L1084]
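> To make the old mechanism concrete, here is a minimal sketch of the pre-SPARK-18191 shape (simplified, not the actual PairRDDFunctions code; saveSketch and jobTrackerId are stand-in names):
> {code:scala}
> import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
> import org.apache.spark.TaskContext
> import org.apache.spark.rdd.RDD
>
> def saveSketch[K, V](rdd: RDD[(K, V)], jobTrackerId: String): Unit = {
>   // Despite the name, this is the RDD id, computed once on the driver.
>   val stageId = rdd.id
>
>   val writeShard = (context: TaskContext, iter: Iterator[(K, V)]) => {
>     // 'stageId' is captured by this closure, so the driver's value is
>     // serialized with the task and every executor builds its
>     // TaskAttemptID from the same number.
>     val attemptId = new TaskAttemptID(jobTrackerId, stageId,
>       TaskType.REDUCE, context.partitionId(), context.attemptNumber())
>     // ... set up the committer with attemptId and write out 'iter' ...
>   }
>   rdd.context.runJob(rdd, writeShard)
> }
> {code}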
> However, after the change in pull request 15769, the value is no longer part of the task closure that the driver serializes. Instead, it is pulled from the TaskContext, as you can see here: [https://github.com/apache/spark/pull/15769/files#diff-dff185cb90c666bce445e3212a21d765R103]
> and that value is then used to construct the TaskAttemptID on the executors: [https://github.com/apache/spark/pull/15769/files#diff-dff185cb90c666bce445e3212a21d765R134]
> The TaskContext has a stageId value that is set by the DAGScheduler. So after the change, unlike the old code in which rdd.id was used, an actual stage id is used, and it can differ between the executors and the driver since it is no longer serialized from the driver.
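> For contrast, a sketch of the executor-side construction after the change (simplified and approximate, not the exact new writer code):
> {code:scala}
> import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
> import org.apache.spark.TaskContext
>
> def executorSideAttemptId(jobTrackerId: String): TaskAttemptID = {
>   // Runs on an executor: TaskContext.get() is the executor-local context.
>   val context = TaskContext.get()
>   // stageId() is the real scheduler stage id set by the DAGScheduler,
>   // while the driver keys the commit on rdd.id, so the two can disagree.
>   new TaskAttemptID(jobTrackerId, context.stageId(),
>     TaskType.REDUCE, context.partitionId(), context.attemptNumber())
> }
> {code}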
> In summary, the old code consistently used rddId, and just incorrectly named it "stageId".
> The new code uses a mix of rddId and stageId. There should be a consistent ID between the executors and the driver.
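> One way to restore consistency (a sketch of the direction only, not the actual patch; executeTask and commitJobId are hypothetical names): compute a single id on the driver and pass it to the executor side explicitly, so it travels in the serialized closure instead of being read from the TaskContext:
> {code:scala}
> import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
> import org.apache.spark.TaskContext
>
> // 'commitJobId' is a driver-side value (e.g. rdd.id) passed in explicitly,
> // so the driver and every executor agree on it by construction.
> def executeTask(jobTrackerId: String, commitJobId: Int,
>                 context: TaskContext): TaskAttemptID = {
>   new TaskAttemptID(jobTrackerId, commitJobId,
>     TaskType.REDUCE, context.partitionId(), context.attemptNumber())
> }
> {code}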


