Posted to user@spark.apache.org by "Mendelson, Assaf" <As...@rsa.com> on 2017/02/14 10:22:06 UTC

fault tolerant dataframe write with overwrite

Hi,

I have a case where I have an iterative process which overwrites the results of a previous iteration.
Every iteration I need to write a dataframe with the results.
The problem is that simply overwriting the results of the previous iteration is not fault tolerant: if the program crashes in the middle of an iteration, the data from the previous iterations is lost, because overwrite first removes the previous data and only then starts writing.

Currently we simply write to a new directory and then rename, but this is not ideal: it requires us to know the interface of the underlying file system, and it takes some extra work to track which directory is the latest one.
I know I can also use checkpoint (although I haven't fully tested the process there); however, checkpointing converts the result to an RDD, which takes both more time and more space.
I was wondering if there is any efficient way of managing this from inside Spark.
Thanks,
                Assaf.

Re: fault tolerant dataframe write with overwrite

Posted by Jörn Franke <jo...@gmail.com>.
If you use S3, you can first copy it into a temporary folder on HDFS. However, for the checkpointing I would use the Spark implementation. You can also load the file from S3 and checkpoint to HDFS.

RE: fault tolerant dataframe write with overwrite

Posted by "Mendelson, Assaf" <As...@rsa.com>.
Thanks, I didn’t know the Hadoop API supports file systems other than HDFS and the local file system (when there is one node).
My main goal is indeed checkpointing: every N iterations I save the data for future use. The problem is that overwrite mode first deletes the old data and then writes the new data, and that is what I am looking to solve.

I wasn’t aware of the issues with renaming in S3 (we are not currently using it; we just know we will probably need to support it or a similar store in the future). That said, how does Spark handle this case when writing a dataframe? Does it currently write everything to a temporary subdirectory and rename it at the end?

In any case, I was hoping for some way internal to Spark to do a write that does not harm the previous version of the dataframe on disk until the new one has been written successfully.
Thanks,
Assaf.


Re: fault tolerant dataframe write with overwrite

Posted by Steve Loughran <st...@hortonworks.com>.
On 14 Feb 2017, at 11:12, Mendelson, Assaf <As...@rsa.com> wrote:

> I know how to get the filesystem, the problem is that this means using Hadoop directly so if in the future we change to something else (e.g. S3) I would need to rewrite the code.

Well, no, because the S3 and HDFS clients use the same API:

FileSystem fs = FileSystem.get(URI.create("hdfs://nn:8020/users/stevel"), conf);

vs

FileSystem fs = FileSystem.get(URI.create("s3a://bucket1/dataset"), conf);

The same goes for wasb:// (which, being consistent and having a fast atomic rename, can be used instead of HDFS) and other cluster filesystems. If it's a native FS, then file:// should work everywhere, or some derivative (as Red Hat do with Gluster).
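The point about a shared API rests on the URI: Hadoop's FileSystem.get(URI, Configuration) chooses the implementation from the URI scheme. A minimal sketch of what such a lookup sees, using only java.net.URI from the JDK rather than Hadoop itself; the class and method names (SchemeDemo, describe) are made up for illustration:

```java
import java.net.URI;

final class SchemeDemo {
    /** Describes the parts of a storage URI that a scheme-based lookup would see. */
    static String describe(String location) {
        URI uri = URI.create(location);
        return "scheme=" + uri.getScheme()
            + " authority=" + uri.getAuthority()
            + " path=" + uri.getPath();
    }

    public static void main(String[] args) {
        // Namenode host and port end up in the authority.
        System.out.println(describe("hdfs://nn:8020/users/stevel"));
        // With two slashes the bucket is the authority.
        System.out.println(describe("s3a://bucket1/dataset"));
        // With one slash there is no authority and the bucket name lands in the path.
        System.out.println(describe("s3a:/bucket1/dataset"));
    }
}
```

Note the two slashes after the scheme: with a single slash the URI has no authority, so the bucket name would be read as part of the path.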

> This also relates to finding the last iteration, I would need to use Hadoop filesystem which is not agnostic to the deployment.


See above. If you are using a Spark cluster of size > 1 you will need some distributed filesystem, which is going to have to provide a

If there is an issue here, it is that if you rely on FileSystem.rename() being an atomic O(1) operation then you are going to be disappointed on S3, as it's a non-atomic O(data) copy & delete whose failure state is "undefined".


The solution here comes from having specific committer logic for the different object stores. You really, really don't want to go there. If you do, start by looking at the S3Guard WiP one: https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer.md

further reading: http://www.slideshare.net/steve_l/spark-summit-east-2017-apache-spark-and-object-stores

> KryoSerializer still costs much more than a dataframe write.

> As for the use case, I am doing a very large number of iterations. So the idea is that every X iterations I want to save to disk so that if something crashes I do not have to begin from the first iteration but just from the relevant iteration.


Sounds like you don't really want the output to always go to the FS, but rather to checkpoint iterations. Couldn't you do something like: every 20 iterations, write() the relevant RDD to the DFS?
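The "save every N iterations, resume from the last save" idea can be sketched without Spark at all. The following is only an illustration under stated assumptions: the in-memory Store is a hypothetical stand-in for a write to the DFS, step() is a placeholder for the real per-iteration work, and crashAt exists purely to simulate a failure.

```java
import java.util.Map;
import java.util.TreeMap;

final class ResumableLoop {
    /** Hypothetical stand-in for durable storage: iteration number -> saved state. */
    static final class Store {
        private final TreeMap<Integer, Double> saved = new TreeMap<>();

        void save(int iteration, double state) {
            saved.put(iteration, state);
        }

        /** Most recent save, or null when nothing has been saved yet. */
        Map.Entry<Integer, Double> latest() {
            return saved.lastEntry();
        }
    }

    /** Placeholder for the real work done in one iteration. */
    static double step(double state, int iteration) {
        return state + iteration;
    }

    /**
     * Runs iterations up to {@code total}, saving every {@code every} iterations.
     * Starts after the latest save if there is one. A positive {@code crashAt}
     * simulates a failure just before that iteration runs.
     */
    static double run(Store store, int total, int every, int crashAt) {
        Map.Entry<Integer, Double> last = store.latest();
        int start = (last == null) ? 0 : last.getKey();
        double state = (last == null) ? 0.0 : last.getValue();
        for (int i = start + 1; i <= total; i++) {
            if (i == crashAt) {
                throw new IllegalStateException("simulated crash at iteration " + i);
            }
            state = step(state, i);
            if (i % every == 0) {
                store.save(i, state);
            }
        }
        return state;
    }
}
```

With every = 20 and a crash at iteration 57, a second call to run() picks up from the save made at iteration 40 instead of starting from the first iteration.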


> Basically I would have liked to see something like saving normally and the original data would not be removed until a successful write.
> Assaf.


RE: fault tolerant dataframe write with overwrite

Posted by "Mendelson, Assaf" <As...@rsa.com>.
I know how to get the filesystem; the problem is that this means using Hadoop directly, so if in the future we change to something else (e.g. S3) I would need to rewrite the code. This also relates to finding the last iteration: I would need to use the Hadoop filesystem, which is not agnostic to the deployment.

KryoSerializer still costs much more than a dataframe write.

As for the use case, I am doing a very large number of iterations. So the idea is that every X iterations I want to save to disk so that if something crashes I do not have to begin from the first iteration but just from the relevant iteration.

Basically I would have liked to see something like saving normally and the original data would not be removed until a successful write.
Assaf.
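
One way to get "the original is not removed until the new write succeeds" without renaming whole directories is to write each version to a fresh directory and then atomically replace a small pointer file that names the current version. The sketch below is an assumption-laden illustration, not something Spark provides: it uses java.nio on a local filesystem, the names VersionPointer and CURRENT are hypothetical, and it relies on the filesystem offering an atomic rename (which, as noted elsewhere in this thread, S3 does not).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Optional;

final class VersionPointer {
    /** Hypothetical name of the pointer file kept next to the version directories. */
    static final String POINTER = "CURRENT";

    /**
     * Marks {@code versionDir} (a directory name under {@code base}) as current.
     * Call this only after the data in that directory is completely written.
     */
    static void publish(Path base, String versionDir) {
        try {
            // Write the new pointer next to the old one, then swap it in with one rename.
            Path tmp = Files.createTempFile(base, POINTER, ".tmp");
            Files.write(tmp, versionDir.getBytes(StandardCharsets.UTF_8));
            Files.move(tmp, base.resolve(POINTER),
                StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns the directory the pointer currently names, if a pointer exists. */
    static Optional<Path> current(Path base) {
        Path pointer = base.resolve(POINTER);
        if (!Files.exists(pointer)) {
            return Optional.empty();
        }
        try {
            String name = new String(Files.readAllBytes(pointer), StandardCharsets.UTF_8).trim();
            return Optional.of(base.resolve(name));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A crash before publish() leaves the pointer on the previous version, so readers never see a half-written directory; older version directories can be deleted once the pointer has moved on.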

Re: fault tolerant dataframe write with overwrite

Posted by Jörn Franke <jo...@gmail.com>.
Normally you can fetch the filesystem interface from the configuration (I assume you mean the URI).
As for finding the last iteration, I do not understand the issue: you can name each directory after the current timestamp and, at the end, simply select the directory with the highest number.
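
Selecting the directory with the highest number can be sketched as follows. This is a local-filesystem illustration with java.nio rather than the Hadoop API; the iter- prefix and the class name are hypothetical, and an iteration counter is used in place of a timestamp (either works as long as the names are zero-padded so that they sort correctly). The check for a _SUCCESS marker is an extra assumption: Hadoop's default output committer normally writes such a file when a job commits, which lets a reader skip directories left behind by a crashed write.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

final class LatestIteration {
    /** Hypothetical naming scheme for the per-iteration output directories. */
    static final String PREFIX = "iter-";
    /** Marker file assumed to be present only in completely written directories. */
    static final String MARKER = "_SUCCESS";

    /** Zero-padded so that lexicographic order matches numeric order. */
    static String dirName(long iteration) {
        return String.format("%s%012d", PREFIX, iteration);
    }

    /** Finds the highest-numbered directory under {@code base} that has the marker. */
    static Optional<Path> latestComplete(Path base) {
        try (Stream<Path> children = Files.list(base)) {
            return children
                .filter(p -> Files.isDirectory(p))
                .filter(p -> p.getFileName().toString().startsWith(PREFIX))
                .filter(p -> Files.exists(p.resolve(MARKER)))
                .max(Comparator.comparing((Path p) -> p.getFileName().toString()));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Without the zero padding, "iter-9" would sort after "iter-10", which is the usual pitfall with this approach.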

Regarding checkpointing, you can also use the KryoSerializer to avoid some of the space overhead.

Aside from that, can you elaborate on the use case and why you need to write every iteration?
