Posted to user@spark.apache.org by chandan prakash <ch...@gmail.com> on 2018/08/11 16:33:35 UTC

[Structured Streaming SPARK-23966] Why non-atomic rename is problem in State Store ?

Hi All,
I was going through the pull request for the new CheckpointFileManager
abstraction in Structured Streaming, coming in 2.4:
https://issues.apache.org/jira/browse/SPARK-23966
https://github.com/apache/spark/pull/21048

I went through the code in detail and found that it introduces a very nice
abstraction, much cleaner and more extensible for direct-write file
systems like S3 (in addition to the current HDFS file system).

*But I am unable to understand: is it really solving a problem in the
existing State Store code in Spark 2.3?*

*My questions relate to the following statements in the PR description about
the State Store code:*
 *PR description*: "Checkpoint files must be written atomically such that *no
partial files are generated*."
*QUESTION*: When are partial files generated in the current code? I can see
that data is first written to a temp-delta file and then renamed to the
version.delta file. If something goes wrong, the task fails with an
exception and abort() is called on the store to close and delete
tempDeltaFileStream. That looks quite clean to me, so in which case could
partial files be generated?

 *PR description*: "*State Store behavior is incorrect - HDFS FileSystem
implementation does not have atomic rename*"
*QUESTION*: The HDFS rename operation itself is atomic. I think the line above
refers to checking whether the target file exists and then taking the
appropriate action, which together make the rename a multi-step and hence
non-atomic operation. But why is this behaviour incorrect?
Even if multiple executors try to write the same version.delta file,
only the first will succeed; the second will see that the file exists and
will delete its temp-delta file. That looks fine.

Is there anything I am missing here?
I am really curious to know which corner cases this new pull request is
trying to solve.

Regards,
Chandan

Re: [Structured Streaming SPARK-23966] Why non-atomic rename is problem in State Store ?

Posted by Jungtaek Lim <ka...@gmail.com>.
Removing user@, since cross-posting to multiple mailing lists is considered
bad practice.

My knowledge is based on the codebase after SPARK-23966, so I'm reading
SPARK-23966 backwards and trying to explain what I can see in the patch.
Please correct me if I'm missing something.

Note that in 2.3 abort() doesn't remove the final delta file. Assuming
rename is not an atomic operation, if the task fails in the middle of the
rename while committing the file, a partial delta file could be left behind.

And commitUpdates() skips renaming the temporary file to the delta file when
the delta file already exists. Hence, when a partial file is left behind,
both a speculative task and a task in a retried batch could skip the write
and be marked as successful, with the result that the partial delta is
treated as a correct delta file.
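
The failure mode described above can be simulated on a local file system. This is only a sketch under stated assumptions: it is not the Spark State Store code, the class and method names (PartialDeltaSketch, copyingRename, commit) are made up for illustration, and the "non-atomic rename" is faked with a byte copy that stops early.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

final class PartialDeltaSketch {

    // Stand-in for a rename that is NOT atomic: it copies bytes to the final
    // name and "crashes" once failAfter bytes have been written.
    static void copyingRename(Path temp, Path dst, int failAfter) throws IOException {
        byte[] data = Files.readAllBytes(temp);
        int written = Math.min(failAfter, data.length);
        Files.write(dst, Arrays.copyOf(data, written));
        if (written < data.length) {
            throw new IOException("simulated failure in the middle of the rename");
        }
        Files.delete(temp);
    }

    // Commit that trusts any existing final file and skips its own rename.
    static void commit(Path temp, Path dst, byte[] data, int failAfter) throws IOException {
        Files.write(temp, data);
        if (Files.exists(dst)) {
            Files.delete(temp); // assume an earlier attempt already committed
            return;
        }
        copyingRename(temp, dst, failAfter);
    }

    // Returns { bytes found in the final file, bytes the delta should contain }.
    static long[] demo() {
        try {
            Path dir = Files.createTempDirectory("delta-sketch");
            Path temp = dir.resolve("temp-delta");
            Path dst = dir.resolve("1.delta");
            byte[] data = "0123456789".getBytes(StandardCharsets.US_ASCII);
            try {
                commit(temp, dst, data, 4); // first attempt dies mid-rename
            } catch (IOException simulated) {
                Files.deleteIfExists(temp); // abort: only the temp file is removed
            }
            commit(temp, dst, data, Integer.MAX_VALUE); // retry reports success
            return new long[] { Files.size(dst), data.length };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long[] r = demo();
        System.out.println("final file has " + r[0] + " of " + r[1] + " bytes");
    }
}
```

The retry completes without error, yet the final file still holds only the 4 bytes written before the simulated failure. With a rename that is atomic, the first attempt would leave either no final file or a complete one, and the skip would be harmless.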

Does it make sense?

Thanks,
Jungtaek Lim (HeartSaVioR)

On Sun, Sep 30, 2018 at 4:51 PM, chandan prakash <ch...@gmail.com> wrote:

> Can anyone help clear up the doubts raised in the questions here?
>
> Regards,
> Chandan
>
>
> On Sat, Aug 11, 2018 at 10:03 PM chandan prakash <
> chandanbaranwal@gmail.com> wrote:
>
>> Hi All,
>> I was going through this pull request about new CheckpointFileManager
>> abstraction in structured streaming coming in 2.4 :
>> https://issues.apache.org/jira/browse/SPARK-23966
>> https://github.com/apache/spark/pull/21048
>>
>> I went through the code in detail and found that it introduces a very
>> nice abstraction, much cleaner and more extensible for direct-write
>> file systems like S3 (in addition to the current HDFS file system).
>>
>> *But I am unable to understand: is it really solving a problem in the
>> existing State Store code in Spark 2.3?*
>>
>> *My questions related to above statements in State Store code : *
>>  *PR description*:: "Checkpoint files must be written atomically such
>> that *no partial files are generated*.
>> *QUESTION*: When are partial files generated in current code ?  I can
>> see that data is first written to temp-delta file and then renamed to
>> version.delta file. If something bad happens, the task will fail due to
>> thrown exception and abort() will be called on store to close and delete
>> tempDeltaFileStream . I think it is quite clean, what is the case that
>> partial files might be generated ?
>>
>>  *PR description*:: *State Store behavior is incorrect - HDFS FileSystem
>> implementation does not have atomic rename*"
>> *QUESTION*:  Hdfs filesystem rename operation is atomic, I think above
>> line takes into account about checking existing file if exists and then
>> taking appropriate action which together makes the file renaming operation
>> multi-steps and hence non-atomic. But why this behaviour is incorrect ?
>> Even if multiple executors try to write to the same version.delta file,
>> only 1st of them will succeed, the second one will see the file exists and
>> will delete its temp-delta file. Looks good .
>>
>> Anything I am missing here?
>> Really curious to know which corner cases we are trying to solve by this
>> new pull request ?
>>
>> Regards,
>> Chandan
>>
>>
>>
>>
>>
>
> --
> Chandan Prakash
>
>

Re: [Structured Streaming SPARK-23966] Why non-atomic rename is problem in State Store ?

Posted by chandan prakash <ch...@gmail.com>.
Can anyone help clear up the doubts raised in the questions here?

Regards,
Chandan

On Sat, Aug 11, 2018 at 10:03 PM chandan prakash <ch...@gmail.com>
wrote:

> Hi All,
> I was going through this pull request about new CheckpointFileManager
> abstraction in structured streaming coming in 2.4 :
> https://issues.apache.org/jira/browse/SPARK-23966
> https://github.com/apache/spark/pull/21048
>
> I went through the code in detail and found that it introduces a very nice
> abstraction, much cleaner and more extensible for direct-write file
> systems like S3 (in addition to the current HDFS file system).
>
> *But I am unable to understand: is it really solving a problem in the
> existing State Store code in Spark 2.3?*
>
> *My questions related to above statements in State Store code : *
>  *PR description*:: "Checkpoint files must be written atomically such
> that *no partial files are generated*.
> *QUESTION*: When are partial files generated in current code ?  I can see
> that data is first written to temp-delta file and then renamed to
> version.delta file. If something bad happens, the task will fail due to
> thrown exception and abort() will be called on store to close and delete
> tempDeltaFileStream . I think it is quite clean, what is the case that
> partial files might be generated ?
>
>  *PR description*:: *State Store behavior is incorrect - HDFS FileSystem
> implementation does not have atomic rename*"
> *QUESTION*:  Hdfs filesystem rename operation is atomic, I think above
> line takes into account about checking existing file if exists and then
> taking appropriate action which together makes the file renaming operation
> multi-steps and hence non-atomic. But why this behaviour is incorrect ?
> Even if multiple executors try to write to the same version.delta file,
> only 1st of them will succeed, the second one will see the file exists and
> will delete its temp-delta file. Looks good .
>
> Anything I am missing here?
> Really curious to know which corner cases we are trying to solve by this
> new pull request ?
>
> Regards,
> Chandan
>
>
>
>
>

-- 
Chandan Prakash

Re: [Structured Streaming SPARK-23966] Why non-atomic rename is problem in State Store ?

Posted by Jungtaek Lim <ka...@gmail.com>.
Thanks, Steve, for answering in detail. I had the same impression as Chandan
from that line: it went against my understanding, since the rename operation
itself is atomic in HDFS, and I didn't imagine it was meant to tackle object
stores.

I learned a lot about object stores from your answer. Thanks again.

Jungtaek Lim (HeartSaVioR)

On Wed, Oct 3, 2018 at 2:48 AM, chandan prakash <ch...@gmail.com> wrote:

> Thanks a lot, Steve and Jungtaek, for your answers.
> Steve,
> you explained it really well and in depth.
>
> I understand now that the old implementation was not correct for
> object stores like S3, and that the new implementation will address that.
> And for better performance we should choose a direct-write based checkpoint
> rather than a rename-based one (which we can implement using the new
> CheckpointFileManager abstraction).
> My confusion came from this line in the PR:
> *This is incorrect as rename is not atomic in HDFS FileSystem
> implementation*
> I thought that line meant the old implementation is not correct for the
> HDFS file system either, so I wanted to understand whether I was missing
> something. The new implementation addresses the issue for object stores
> like S3, not HDFS.
> Thanks again for your explanation; I am sure it will help a lot of other
> code readers as well.
>
> Regards,
> Chandan
>
>
>
> On Mon, Oct 1, 2018 at 5:37 PM Steve Loughran <st...@hortonworks.com>
> wrote:
>
>>
>>
>> On 11 Aug 2018, at 17:33, chandan prakash <ch...@gmail.com>
>> wrote:
>>
>> Hi All,
>> I was going through this pull request about new CheckpointFileManager
>> abstraction in structured streaming coming in 2.4 :
>> https://issues.apache.org/jira/browse/SPARK-23966
>> https://github.com/apache/spark/pull/21048
>>
>> I went through the code in detail and found that it introduces a very
>> nice abstraction, much cleaner and more extensible for direct-write
>> file systems like S3 (in addition to the current HDFS file system).
>>
>> *But I am unable to understand: is it really solving a problem in the
>> existing State Store code in Spark 2.3?*
>>
>> *My questions related to above statements in State Store code : *
>>  *PR description*:: "Checkpoint files must be written atomically such
>> that *no partial files are generated*.
>> *QUESTION*: When are partial files generated in current code ?  I can
>> see that data is first written to temp-delta file and then renamed to
>> version.delta file. If something bad happens, the task will fail due to
>> thrown exception and abort() will be called on store to close and delete
>> tempDeltaFileStream . I think it is quite clean, what is the case that
>> partial files might be generated ?
>>
>>
>> I suspect the issue is that, as files are written to a "classic" Posix
>> store, flush/sync operations can make the intermediate data visible to
>> others, which is why the convention for checkpointing/commit operations
>> is: write to temp & rename. That is not what you want for object stores,
>> especially S3.
>>
>>
>>
>>  *PR description*:: *State Store behavior is incorrect - HDFS FileSystem
>> implementation does not have atomic rename*"
>> *QUESTION*:  Hdfs filesystem rename operation is atomic, I think above
>> line takes into account about checking existing file if exists and then
>> taking appropriate action which together makes the file renaming operation
>> multi-steps and hence non-atomic. But why this behaviour is incorrect ?
>> Even if multiple executors try to write to the same version.delta file,
>> only 1st of them will succeed, the second one will see the file exists and
>> will delete its temp-delta file. Looks good .
>>
>>
>> HDFS rename of a single file or directory is atomic: it grabs a lock on
>> the metadata store, makes the change, and unlocks it. If you are doing any
>> FS operation which explicitly renames more than one file in your commit,
>> you lose atomicity. If there's a check + rename then yes, it's two steps,
>> unless you can use create(path, overwrite=false) to create some lease file
>> where you know that the creation is exclusive and atomic. That holds for
>> HDFS and Posix, but generally not at all for the object stores, especially
>> S3, which can actually cache the 404 in its load balancers for a few tens
>> of milliseconds.
>>
>> For object stores, you are in a different world of pain.
>>
>> S3: nope. O(files + data), observable, and prone to partial failures.
>> Also list inconsistency, plus caching of negative GET/HEAD responses to
>> defend against DoS.
>> wasb: no, except for bits of the tree where you enable leases, which
>> increases the cost of operations. O(files), with the odd pause if some
>> shard movement has to take place.
>> Google GCS: not sure, but it is O(files).
>> Azure abfs: not atomic yet. As the code says:
>>
>>     if (isAtomicRenameKey(source.getName())) {
>>       LOG.warn("The atomic rename feature is not supported by the ABFS scheme; however rename,"
>>               +" create and delete operations are atomic if Namespace is enabled for your Azure Storage account.");
>>     }
>>
>> From my reading of the SPARK-23966 PR, it's the object store problem
>> which is being addressed, for both correctness and performance.
>>
>>
>> Anything I am missing here?
>> Really curious to know which corner cases we are trying to solve by this
>> new pull request ?
>>
>>
>>
>> Object stores as the back end, S3 in particular, where a rename is
>> O(data) and a direct PUT to the destination gives you that atomicity.
>>
>>
>> Someone needs to sit down and write that reference implementation.
>>
>> Whoever  does want to do that,
>>
>> - I believe it can all be done with the normal Hadoop FS APIs, simply
>> knowing that for the store that OutputStream.close() is (a) atomic, (b)
>> potentially really slow as the remaining data gets uploaded and (c) when it
>> fails, can mean all your data just got lost.
>> - I've got the TLA+ spec for the S3 API which they can use as the
>> foundation for their proofs of correctness
>> https://issues.apache.org/jira/secure/attachment/12865161/objectstore.pdf
>>
>>
>>
>> -Steve
>>
>
>
> --
> Chandan Prakash
>
>

Re: [Structured Streaming SPARK-23966] Why non-atomic rename is problem in State Store ?

Posted by chandan prakash <ch...@gmail.com>.
Thanks a lot, Steve and Jungtaek, for your answers.
Steve,
you explained it really well and in depth.

I understand now that the old implementation was not correct for
object stores like S3, and that the new implementation will address that.
And for better performance we should choose a direct-write based checkpoint
rather than a rename-based one (which we can implement using the new
CheckpointFileManager abstraction).
My confusion came from this line in the PR:
*This is incorrect as rename is not atomic in HDFS FileSystem
implementation*
I thought that line meant the old implementation is not correct for the
HDFS file system either, so I wanted to understand whether I was missing
something. The new implementation addresses the issue for object stores
like S3, not HDFS.
Thanks again for your explanation; I am sure it will help a lot of other
code readers as well.

Regards,
Chandan



On Mon, Oct 1, 2018 at 5:37 PM Steve Loughran <st...@hortonworks.com>
wrote:

>
>
> On 11 Aug 2018, at 17:33, chandan prakash <ch...@gmail.com>
> wrote:
>
> Hi All,
> I was going through this pull request about new CheckpointFileManager
> abstraction in structured streaming coming in 2.4 :
> https://issues.apache.org/jira/browse/SPARK-23966
> https://github.com/apache/spark/pull/21048
>
> I went through the code in detail and found that it introduces a very nice
> abstraction, much cleaner and more extensible for direct-write file
> systems like S3 (in addition to the current HDFS file system).
>
> *But I am unable to understand: is it really solving a problem in the
> existing State Store code in Spark 2.3?*
>
> *My questions related to above statements in State Store code : *
>  *PR description*:: "Checkpoint files must be written atomically such
> that *no partial files are generated*.
> *QUESTION*: When are partial files generated in current code ?  I can see
> that data is first written to temp-delta file and then renamed to
> version.delta file. If something bad happens, the task will fail due to
> thrown exception and abort() will be called on store to close and delete
> tempDeltaFileStream . I think it is quite clean, what is the case that
> partial files might be generated ?
>
>
> I suspect the issue is that, as files are written to a "classic" Posix
> store, flush/sync operations can make the intermediate data visible to
> others, which is why the convention for checkpointing/commit operations
> is: write to temp & rename. That is not what you want for object stores,
> especially S3.
>
>
>
>  *PR description*:: *State Store behavior is incorrect - HDFS FileSystem
> implementation does not have atomic rename*"
> *QUESTION*:  Hdfs filesystem rename operation is atomic, I think above
> line takes into account about checking existing file if exists and then
> taking appropriate action which together makes the file renaming operation
> multi-steps and hence non-atomic. But why this behaviour is incorrect ?
> Even if multiple executors try to write to the same version.delta file,
> only 1st of them will succeed, the second one will see the file exists and
> will delete its temp-delta file. Looks good .
>
>
> HDFS rename of a single file or directory is atomic: it grabs a lock on
> the metadata store, makes the change, and unlocks it. If you are doing any
> FS operation which explicitly renames more than one file in your commit,
> you lose atomicity. If there's a check + rename then yes, it's two steps,
> unless you can use create(path, overwrite=false) to create some lease file
> where you know that the creation is exclusive and atomic. That holds for
> HDFS and Posix, but generally not at all for the object stores, especially
> S3, which can actually cache the 404 in its load balancers for a few tens
> of milliseconds.
>
> For object stores, you are in a different world of pain.
>
> S3: nope. O(files + data), observable, and prone to partial failures.
> Also list inconsistency, plus caching of negative GET/HEAD responses to
> defend against DoS.
> wasb: no, except for bits of the tree where you enable leases, which
> increases the cost of operations. O(files), with the odd pause if some
> shard movement has to take place.
> Google GCS: not sure, but it is O(files).
> Azure abfs: not atomic yet. As the code says:
>
>     if (isAtomicRenameKey(source.getName())) {
>       LOG.warn("The atomic rename feature is not supported by the ABFS scheme; however rename,"
>               +" create and delete operations are atomic if Namespace is enabled for your Azure Storage account.");
>     }
>
> From my reading of the SPARK-23966 PR, it's the object store problem which
> is being addressed, for both correctness and performance.
>
>
> Anything I am missing here?
> Really curious to know which corner cases we are trying to solve by this
> new pull request ?
>
>
>
> Object stores as the back end, S3 in particular, where a rename is
> O(data) and a direct PUT to the destination gives you that atomicity.
>
>
> Someone needs to sit down and write that reference implementation.
>
> Whoever  does want to do that,
>
> - I believe it can all be done with the normal Hadoop FS APIs, simply
> knowing that for the store that OutputStream.close() is (a) atomic, (b)
> potentially really slow as the remaining data gets uploaded and (c) when it
> fails, can mean all your data just got lost.
> - I've got the TLA+ spec for the S3 API which they can use as the
> foundation for their proofs of correctness
> https://issues.apache.org/jira/secure/attachment/12865161/objectstore.pdf
>
>
> -Steve
>


-- 
Chandan Prakash

Re: [Structured Streaming SPARK-23966] Why non-atomic rename is problem in State Store ?

Posted by Steve Loughran <st...@hortonworks.com>.

On 11 Aug 2018, at 17:33, chandan prakash <ch...@gmail.com> wrote:

> Hi All,
> I was going through the pull request for the new CheckpointFileManager abstraction in Structured Streaming, coming in 2.4:
> https://issues.apache.org/jira/browse/SPARK-23966
> https://github.com/apache/spark/pull/21048
>
> I went through the code in detail and found that it introduces a very nice abstraction, much cleaner and more extensible for direct-write file systems like S3 (in addition to the current HDFS file system).
>
> But I am unable to understand: is it really solving a problem in the existing State Store code in Spark 2.3?
>
> My questions relate to the following statements in the PR description about the State Store code:
>  PR description: "Checkpoint files must be written atomically such that no partial files are generated."
> QUESTION: When are partial files generated in the current code? I can see that data is first written to a temp-delta file and then renamed to the version.delta file. If something goes wrong, the task fails with an exception and abort() is called on the store to close and delete tempDeltaFileStream. That looks quite clean to me, so in which case could partial files be generated?

I suspect the issue is that, as files are written to a "classic" Posix store, flush/sync operations can make the intermediate data visible to others, which is why the convention for checkpointing/commit operations is: write to temp & rename. That is not what you want for object stores, especially S3.
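
A minimal sketch of the write-to-temp-then-rename convention follows. It uses java.nio on a local file system rather than the Hadoop FileSystem API, and the class, method, and file names are made up for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class TempThenRenameSketch {

    // Writes data to a temp file next to the target, then publishes it with a
    // single rename, so readers of the final name never see a half-written file.
    static void writeAtomically(Path target, byte[] data) throws IOException {
        Path dir = target.toAbsolutePath().getParent();
        Path temp = Files.createTempFile(dir, ".tmp-", ".part");
        try {
            Files.write(temp, data);
            // ATOMIC_MOVE asks for an all-or-nothing rename and throws
            // AtomicMoveNotSupportedException if the file system cannot do it.
            Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
        } finally {
            // No-op after a successful move; removes the temp file after a failure.
            Files.deleteIfExists(temp);
        }
    }

    // Runs the sketch in a scratch directory and returns what a reader sees.
    static String demo() {
        try {
            Path dir = Files.createTempDirectory("checkpoint-sketch");
            Path target = dir.resolve("1.delta");
            writeAtomically(target, "state for version 1".getBytes(StandardCharsets.UTF_8));
            return new String(Files.readAllBytes(target), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The pattern depends entirely on the rename being cheap and atomic. On a store where a rename is implemented as copy plus delete, the same code pays for the data twice and loses the guarantee.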



>  PR description: "State Store behavior is incorrect - HDFS FileSystem implementation does not have atomic rename"
> QUESTION: The HDFS rename operation itself is atomic. I think the line above refers to checking whether the target file exists and then taking the appropriate action, which together make the rename a multi-step and hence non-atomic operation. But why is this behaviour incorrect?
> Even if multiple executors try to write the same version.delta file, only the first will succeed; the second will see that the file exists and will delete its temp-delta file. That looks fine.


HDFS rename of a single file or directory is atomic: it grabs a lock on the metadata store, makes the change, and unlocks it. If you are doing any FS operation which explicitly renames more than one file in your commit, you lose atomicity. If there's a check + rename then yes, it's two steps, unless you can use create(path, overwrite=false) to create some lease file where you know that the creation is exclusive and atomic. That holds for HDFS and Posix, but generally not at all for the object stores, especially S3, which can actually cache the 404 in its load balancers for a few tens of milliseconds.
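
The exclusive-create idea can be sketched with java.nio on a local (Posix-like) file system, as an analogue of create(path, overwrite=false) rather than the Hadoop call itself. The class name, method name, and lease file name are assumptions made for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

final class ExclusiveCreateSketch {

    // Returns true only for the one caller that manages to create the marker.
    static boolean tryClaim(Path marker) throws IOException {
        try {
            // createFile performs the existence check and the creation as a
            // single atomic step, so there is no gap between check and act.
            Files.createFile(marker);
            return true;
        } catch (FileAlreadyExistsException e) {
            return false; // somebody else claimed it first
        }
    }

    // Two attempts on the same marker: { first result, second result }.
    static boolean[] demo() {
        try {
            Path dir = Files.createTempDirectory("lease-sketch");
            Path marker = dir.resolve("1.delta.lease");
            boolean first = tryClaim(marker);
            boolean second = tryClaim(marker);
            return new boolean[] { first, second };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("first=" + r[0] + ", second=" + r[1]);
    }
}
```

Only the first attempt wins. The guarantee comes from the file system, which is why the same trick is not safe on a store whose existence checks can be stale or cached.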

For object stores, you are in a different world of pain.

S3: nope. O(files + data), observable, and prone to partial failures. Also list inconsistency, plus caching of negative GET/HEAD responses to defend against DoS.
wasb: no, except for bits of the tree where you enable leases, which increases the cost of operations. O(files), with the odd pause if some shard movement has to take place.
Google GCS: not sure, but it is O(files).
Azure abfs: not atomic yet. As the code says:

    if (isAtomicRenameKey(source.getName())) {
      LOG.warn("The atomic rename feature is not supported by the ABFS scheme; however rename,"
              +" create and delete operations are atomic if Namespace is enabled for your Azure Storage account.");
    }

From my reading of the SPARK-23966 PR, it's the object store problem which is being addressed, for both correctness and performance.


> Is there anything I am missing here?
> I am really curious to know which corner cases this new pull request is trying to solve.


Object stores as the back end, S3 in particular, where a rename is O(data) and a direct PUT to the destination gives you that atomicity.


Someone needs to sit down and write that reference implementation.

For whoever does want to do that:

- I believe it can all be done with the normal Hadoop FS APIs, simply knowing that, for the store, OutputStream.close() is (a) atomic, (b) potentially really slow as the remaining data gets uploaded, and (c) when it fails, can mean all your data just got lost.
- I've got the TLA+ spec for the S3 API, which they can use as the foundation for their proofs of correctness: https://issues.apache.org/jira/secure/attachment/12865161/objectstore.pdf
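
To make point (a) concrete, here is a toy in-memory model of a store where an object becomes visible only when its stream is closed. It is a hypothetical sketch, not the S3A or Hadoop implementation, and every name in it (PublishOnCloseSketch, create, exists, read) is invented for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class PublishOnCloseSketch {
    private final Map<String, byte[]> objects = new ConcurrentHashMap<>();

    // Returns a stream that buffers locally; the object appears under its
    // final key, complete, only when close() is called.
    OutputStream create(final String key) {
        return new ByteArrayOutputStream() {
            private boolean closed;

            @Override
            public synchronized void close() {
                if (!closed) {
                    closed = true;
                    objects.put(key, toByteArray()); // single publish step
                }
            }
        };
    }

    boolean exists(String key) {
        return objects.containsKey(key);
    }

    byte[] read(String key) {
        return objects.get(key);
    }

    // Returns { visible before close, complete after close }.
    static boolean[] demo() {
        PublishOnCloseSketch store = new PublishOnCloseSketch();
        byte[] data = "state for version 1".getBytes(StandardCharsets.UTF_8);
        try {
            OutputStream out = store.create("state/1.delta");
            out.write(data, 0, 5);
            boolean visibleEarly = store.exists("state/1.delta");
            out.write(data, 5, data.length - 5);
            out.close();
            boolean complete = Arrays.equals(store.read("state/1.delta"), data);
            return new boolean[] { visibleEarly, complete };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("visibleEarly=" + r[0] + ", complete=" + r[1]);
    }
}
```

In this model no temp file or rename is needed, because nothing is observable under the final key until close() publishes it. A real store's close() can also be slow or fail, as points (b) and (c) warn, and the toy model does not capture that.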


-Steve