Posted to user@spark.apache.org by Mingyu Kim <mk...@palantir.com> on 2014/07/15 19:55:24 UTC

How does Spark speculation prevent duplicated work?

Hi all,

I was curious about the details of Spark speculation. My understanding is
that when “speculated” tasks are newly scheduled on other machines, the
original tasks keep running until the entire stage completes. This seems to
leave some room for duplicated work, because some Spark actions are not
idempotent: a partition may be counted twice in the case of RDD.count, or
written to HDFS twice in the case of RDD.save*(). How does Spark prevent
this kind of duplicated work?

Mingyu



Re: How does Spark speculation prevent duplicated work?

Posted by Mingyu Kim <mk...@palantir.com>.
That makes sense. Thanks everyone for the explanations!

Mingyu


Re: How does Spark speculation prevent duplicated work?

Posted by Matei Zaharia <ma...@gmail.com>.
Yeah, this is handled by the "commit" call of the FileOutputFormat. In general, Hadoop OutputFormats have a concept of "committing" the output, which should happen only once per partition. In the file-based ones, the commit does an atomic rename to make sure that the final output is a complete file.
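
As a rough sketch of that commit-once-per-partition idea (the trait below is a
hypothetical stand-in, not Hadoop's actual OutputCommitter API):

object CommitSketch {
  // Hypothetical committer standing in for Hadoop's output committer.
  trait PartitionCommitter {
    def writeAttempt(partition: Int, attempt: Int, records: Iterator[String]): Unit  // to a temp path
    def needsCommit(partition: Int, attempt: Int): Boolean  // false once some attempt has committed
    def commitAttempt(partition: Int, attempt: Int): Unit   // atomic rename: temp path -> final path
    def abortAttempt(partition: Int, attempt: Int): Unit    // discard the temp output
  }

  // Every attempt (original or speculative) runs the same sequence,
  // but only the first attempt to reach the commit step actually commits.
  def runWriteTask(c: PartitionCommitter, partition: Int, attempt: Int,
                   records: Iterator[String]): Unit = {
    c.writeAttempt(partition, attempt, records)
    if (c.needsCommit(partition, attempt)) c.commitAttempt(partition, attempt)
    else c.abortAttempt(partition, attempt)
  }
}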

Matei


Re: How does Spark speculation prevent duplicated work?

Posted by Tathagata Das <ta...@gmail.com>.
The way the HDFS file writing works, at a high level, is that each attempt
to write a partition starts writing to a unique temporary file (say,
something like targetDirectory/_temp/part-XXXXX_attempt-YYYY). If the write
completes successfully, the temporary file is moved to the final location
(say, targetDirectory/part-XXXXX). If, due to speculative execution, a file
already exists in the final intended location, the move is either skipped or
it is overwritten (I forget the exact implementation). Either way, all
attempts to write the same partition will write the same data to their temp
files (assuming the Spark transformation generating the data is
deterministic and idempotent), so once one attempt succeeds, the final file
has the same data. Hence, writing to HDFS / S3 is idempotent.

This logic is already implemented within Hadoop's MapReduce code, and Spark
just uses it directly.
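
A minimal, self-contained sketch of that temp-then-move pattern, using the
local filesystem via java.nio instead of HDFS (the layout and attempt naming
below are simplified assumptions):

import java.nio.file.{Files, StandardCopyOption}

object TempThenMoveSketch {
  def main(args: Array[String]): Unit = {
    val targetDirectory = Files.createTempDirectory("targetDirectory")
    val tempDir = Files.createDirectories(targetDirectory.resolve("_temp"))

    // Each attempt writes to its own temporary file, e.g. _temp/part-00000_attempt-0001.
    val attemptFile = Files.write(
      tempDir.resolve("part-00000_attempt-0001"),
      "contents of partition 0".getBytes("UTF-8"))

    // Move the temp file into place only if no other attempt has produced the final file yet.
    val finalFile = targetDirectory.resolve("part-00000")
    if (!Files.exists(finalFile)) {
      Files.move(attemptFile, finalFile, StandardCopyOption.ATOMIC_MOVE)
    }
    println(new String(Files.readAllBytes(finalFile), "UTF-8"))
  }
}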

TD



Re: How does Spark speculation prevent duplicated work?

Posted by Mingyu Kim <mk...@palantir.com>.
Thanks for the explanation, guys.

I looked into the saveAsHadoopFile implementation a little bit. If you look at
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
at line 843, the HDFS write happens during per-partition processing, not in
the result handling, so I have a feeling that it might write multiple times.
This may be fine if both tasks for the same partition complete, because the
output partition is simply overwritten with the same content, but it could be
an issue if one of the tasks completes and the other is still in the middle of
writing the partition by the time the entire stage completes. Can someone
explain this?
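
Roughly, the shape of such a per-partition write (a simplified sketch with
made-up paths and helpers, not the actual PairRDDFunctions code):

import java.nio.file.{Files, Path}

object PerPartitionWriteSketch {
  // Each task attempt writes its partition to an attempt-specific file; the shared
  // final name (part-<partition>) is only touched later, by the commit step.
  def writePartition(outputDir: Path, partition: Int, attempt: Int,
                     records: Iterator[String]): Path = {
    // Hypothetical layout, not Spark's actual temp-file naming.
    val temp = outputDir.resolve(f"_temporary/part-${partition}%05d_attempt-${attempt}%04d")
    Files.createDirectories(temp.getParent)
    Files.write(temp, records.mkString("\n").getBytes("UTF-8"))
  }
}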

Bertrand, I’m slightly confused about your comment. Is it the case that HDFS
handles the writes as a temp-file write followed by an atomic move, so that
the concern I had above is handled at the HDFS level?

Mingyu



Re: How does Spark speculation prevent duplicated work?

Posted by Bertrand Dechoux <de...@gmail.com>.
I haven't looked at the implementation, but what you would do with any
filesystem is write to a file inside the task's workspace directory. Then
only the attempt of the task that should be kept performs a move to the
final path; the other attempts are simply discarded. For most filesystems
(and that's the case for HDFS), a 'move' is a very simple and fast operation
because only the "full path/name" of the file changes, not its content or
where that content is physically stored.
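
For example, with the Hadoop FileSystem API the promotion of the kept attempt
boils down to a single rename (a sketch with made-up paths; on HDFS this only
updates namespace metadata, the stored blocks are not copied):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object RenameSketch {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())  // HDFS if configured, local FS otherwise
    val attemptOutput = new Path("output/_temporary/attempt_0001/part-00000")  // made-up paths
    val finalOutput = new Path("output/part-00000")
    // Promote the kept attempt's file; the other attempts' files are simply deleted instead.
    if (!fs.exists(finalOutput)) {
      fs.rename(attemptOutput, finalOutput)  // cheap: only the path changes, not the stored data
    }
  }
}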

Speculative execution happens in Hadoop MapReduce, and Spark has the same
concept. As long as you apply functions with no side effects (i.e., the only
impact is the returned result), you just need to ignore the results from
additional attempts of the same task/operator.

Bertrand Dechoux



Re: How does Spark speculation prevent duplicated work?

Posted by Andrew Ash <an...@andrewash.com>.
Hi Nan,

Great digging in -- that makes sense to me for when a job is producing some
output handled by Spark like a .count or .distinct or similar.

For the other part of the question, I'm also interested in side effects
like an HDFS disk write.  If one task is writing to an HDFS path and
another task starts up, wouldn't it also attempt to write to the same path?
 How is that de-conflicted?



Re: How does Spark speculation prevent duplicated work?

Posted by Nan Zhu <zh...@gmail.com>.
Hi, Mingyuan,   

According to my understanding,  

Spark processes the result generated from each partition by passing it to resultHandler (SparkContext.scala L1056)

This resultHandler usually just puts the result in a driver-side array, the length of which is always partitions.size

This design effectively ensures that the actions are idempotent

e.g. the count is implemented as  

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

Even if the task for a partition is executed twice, the result put in the array is the same
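
A small, self-contained sketch of that driver-side pattern (illustrative only,
not the actual SparkContext code):

object ResultHandlerSketch {
  def main(args: Array[String]): Unit = {
    val numPartitions = 4
    // One slot per partition; the array length is always partitions.size.
    val results = new Array[Long](numPartitions)
    val resultHandler: (Int, Long) => Unit =
      (partitionId, partitionCount) => results(partitionId) = partitionCount

    resultHandler(2, 100L)  // original attempt for partition 2
    resultHandler(2, 100L)  // speculative duplicate: same slot, same deterministic value
    println(results.sum)    // the count is not doubled
  }
}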



At the same time, I think the Spark implementation ensures that the operation applied to the return value of SparkContext.runJob will not be triggered again when the duplicate tasks finish

Because,  


when a task is finished, the code execution path is TaskSetManager.handleSuccessfulTask -> DAGScheduler.taskEnded  

in taskEnded, it triggers the CompletionEvent message handler, where DAGScheduler checks if (!job.finished(rt.outputId)), and rt.outputId is the partition id

so even if a duplicate task triggers a CompletionEvent message, it will find that job.finished(rt.outputId) is already true
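
The effect of that check can be sketched like this (a hypothetical
simplification, not the real DAGScheduler code):

object CompletionEventSketch {
  def main(args: Array[String]): Unit = {
    val finished = Array.fill(4)(false)  // per-partition flags, like job.finished
    var numFinished = 0

    // Mirrors the if (!job.finished(rt.outputId)) guard: only the first successful
    // attempt for a partition is counted; later duplicates fall through.
    def handleCompletion(outputId: Int): Unit = {
      if (!finished(outputId)) {
        finished(outputId) = true
        numFinished += 1
        // the result handler / job-completion logic would run here, exactly once
      }
    }

    handleCompletion(1)
    handleCompletion(1)   // duplicate completion from a speculative task: ignored
    println(numFinished)  // 1
  }
}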


Maybe I was wrong; I just went through the code roughly, so feel free to correct me

Best,


--  
Nan Zhu

