Posted to dev@spark.apache.org by Waleed Fateem <wa...@gmail.com> on 2020/06/25 19:42:50 UTC

Setting spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=1 and Doc issue

Hello!

I noticed that the documentation, starting with 2.2.0, states that the
parameter spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version is 1
by default:
https://issues.apache.org/jira/browse/SPARK-20107

I don't actually see this being set anywhere explicitly in the Spark code,
so the documentation isn't entirely accurate if you run in an environment
that has MAPREDUCE-6406 implemented (starting with Hadoop 3.0).

The default version was explicitly set to 2 in the FileOutputCommitter
class, so any output committer that inherits from this class
(ParquetOutputCommitter for example) would use v2 in a Hadoop 3.0
environment and v1 in the older Hadoop environments.

Would it make sense for us to consider setting v1 as the default in code
when the configuration has not been set by the user?
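
To make the question concrete, here is a minimal sketch of "default to v1
unless the user set it", written as plain Python rather than Spark code. The
helper names with_committer_default and to_submit_args are hypothetical; only
the property name and the spark-submit --conf key=value form are real.

```python
# Property name taken from the thread; everything else here is illustrative.
COMMITTER_KEY = "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version"


def with_committer_default(user_conf, default_version="1"):
    """Return a copy of user_conf with the committer version defaulted to v1."""
    conf = dict(user_conf)
    # setdefault leaves an explicit user choice untouched.
    conf.setdefault(COMMITTER_KEY, default_version)
    return conf


def to_submit_args(conf):
    """Render the settings as spark-submit --conf arguments."""
    args = []
    for key in sorted(conf):
        args += ["--conf", f"{key}={conf[key]}"]
    return args
```

A user who passes version 2 explicitly keeps it; only an unset key picks up
the v1 default, independent of what the Hadoop version on the classpath
would otherwise choose.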

Regards,

Waleed

Re: Setting spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=1 and Doc issue

Posted by Steve Loughran <st...@cloudera.com.INVALID>.
https://issues.apache.org/jira/browse/MAPREDUCE-7282

"MR v2 commit algorithm is dangerous, should be deprecated and not the
default"

Someone should open a PR to change the default; if it doesn't break too
much, I'll merge it.



On Mon, 29 Jun 2020 at 13:20, Steve Loughran <st...@cloudera.com> wrote:


Re: Setting spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=1 and Doc issue

Posted by Steve Loughran <st...@cloudera.com.INVALID>.
v2 does a file-by-file copy to the destination directory in task commit. v1
promotes each task attempt to the job attempt directory with a single
directory rename; job commit then lists those directories and moves their
contents to the destination.
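
A rough sketch of the difference, using a local temp directory as a stand-in
for the cluster filesystem. The function names are made up for illustration
and this is not the FileOutputCommitter code; it only shows when output
becomes visible in the destination under each algorithm.

```python
import os
import tempfile


def write_task_output(attempt_dir, names):
    """Write one small file per name into a fresh task attempt directory."""
    os.makedirs(attempt_dir)
    for name in names:
        with open(os.path.join(attempt_dir, name), "w") as f:
            f.write("data for " + name)


def v1_task_commit(attempt_dir, job_attempt_dir, task_id):
    """v1: promote the whole task attempt with one directory rename."""
    os.rename(attempt_dir, os.path.join(job_attempt_dir, task_id))


def v1_job_commit(job_attempt_dir, dest_dir):
    """v1: list the committed task directories and move their contents."""
    for task_id in sorted(os.listdir(job_attempt_dir)):
        task_dir = os.path.join(job_attempt_dir, task_id)
        for name in sorted(os.listdir(task_dir)):
            os.replace(os.path.join(task_dir, name), os.path.join(dest_dir, name))


def v2_task_commit(attempt_dir, dest_dir):
    """v2: move files one by one straight into the destination."""
    for name in sorted(os.listdir(attempt_dir)):
        os.replace(os.path.join(attempt_dir, name), os.path.join(dest_dir, name))


def demo():
    """Return (v1 dest before job commit, v1 dest after, v2 dest after task commit)."""
    files = ["part-0000", "part-0001"]
    with tempfile.TemporaryDirectory() as root:
        dest_v1 = os.path.join(root, "dest_v1")
        dest_v2 = os.path.join(root, "dest_v2")
        job_attempt = os.path.join(root, "job_attempt")
        for d in (dest_v1, dest_v2, job_attempt):
            os.makedirs(d)

        attempt_v1 = os.path.join(root, "attempt_v1")
        write_task_output(attempt_v1, files)
        v1_task_commit(attempt_v1, job_attempt, "task_0")
        v1_before = sorted(os.listdir(dest_v1))
        v1_job_commit(job_attempt, dest_v1)
        v1_after = sorted(os.listdir(dest_v1))

        attempt_v2 = os.path.join(root, "attempt_v2")
        write_task_output(attempt_v2, files)
        v2_task_commit(attempt_v2, dest_v2)
        v2_after = sorted(os.listdir(dest_v2))
        return v1_before, v1_after, v2_after
```

With v1 the destination stays empty until job commit; with v2 the files are
already visible in the destination once the task has committed.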

If the worker fails during a v2 task commit, the next task attempt has to
replace every file, so it had better use the same filenames.

The really scary issue is a network partition. If the first worker goes
offline long enough for a second attempt to commit (with speculation enabled
that may not be long at all, as a second attempt could already be waiting),
then when the first worker comes back online it may continue with its commit
and partially overwrite some, but not all, of the output.
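
The partition scenario can be played out with a toy in-memory model. This is
only a sketch: the destination is a dict mapping filename to the attempt
whose bytes are currently visible, and the generator (a hypothetical helper)
lets one attempt pause partway through its file-by-file commit.

```python
def v2_commit_steps(dest, attempt_id, files):
    """Move files into dest one at a time, pausing after each file."""
    for name in files:
        dest[name] = attempt_id  # this attempt's copy is now the visible one
        yield name


def simulate_partition():
    """Return (dest after attempt 2 commits, dest after attempt 1 resumes)."""
    dest = {}
    files = ["part-0000", "part-0001", "part-0002"]

    # Attempt 1 commits its first file, then its worker drops off the network.
    first = v2_commit_steps(dest, "attempt_1", files)
    next(first)

    # A second attempt runs and commits every file.
    for _ in v2_commit_steps(dest, "attempt_2", files):
        pass
    after_second = dict(dest)

    # The first worker comes back and resumes where it paused, overwriting
    # one more file before it is finally stopped.
    next(first)
    return after_second, dict(dest)
```

After the second attempt the destination is consistent; once the first
worker resumes, the output is a mix of both attempts. Same filenames make
the overwrite possible but do nothing to make it all-or-nothing.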

So v2 task commit is not atomic, even though Spark requires it to be. It is
worse on Amazon S3, where rename is O(data), so the window for failure is a
lot longer.

The S3A committers don't commit their work until job commit; while that is
not atomic (neither is MR v1's, BTW), its time is
|files|/(min(|threads|, max-http-pool-size)).
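
As a quick worked example of that expression (a sketch only; the parameter
names are illustrative and the result is a count of sequential request
rounds, not seconds):

```python
def job_commit_rounds(num_files, num_threads, max_http_pool_size):
    """Evaluate |files| / min(|threads|, max-http-pool-size)."""
    parallelism = min(num_threads, max_http_pool_size)
    if parallelism < 1:
        raise ValueError("need at least one thread and one HTTP connection")
    return num_files / parallelism
```

For 1000 files, 8 threads and a pool of 96 connections this gives 125.0; with
64 threads but a pool of only 16 the pool is the limit and it gives 62.5.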

The EMR Spark committer does actually commit its work in task commit, so it
is also vulnerable. I wish they copied more of our ASF-licensed code :). Or
some of IBM's Stocator work.


Presumably their algorithm is:

pre-task-reporting ready-to-commit: upload files from the local FS task
attempt staging dir to dest dir, without completing the upload. You could
actually do this with a scanning thread uploading as you go along.
task commit: POST all the uploads
job commit: touch _SUCCESS

That scales better (no need to load & commit uploads in job commit) and does
not require any consistent cluster FS. And it is faster.
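
The three presumed steps can be sketched against an in-memory stand-in for an
object store. FakeObjectStore and its methods are invented for this
illustration and are not any real client API; the sketch says nothing about
what the EMR committer actually does.

```python
class FakeObjectStore:
    """In-memory stand-in for a store with uploads that complete separately."""

    def __init__(self):
        self.visible = {}  # completed objects: key -> bytes
        self.pending = {}  # uploaded but not yet completed: key -> bytes

    def upload_without_completing(self, key, data):
        self.pending[key] = data

    def complete_upload(self, key):
        self.visible[key] = self.pending.pop(key)

    def put(self, key, data):
        self.visible[key] = data


def pre_commit_upload(store, dest, local_files):
    """Before reporting ready-to-commit: upload, but do not complete."""
    keys = []
    for name, data in sorted(local_files.items()):
        key = f"{dest}/{name}"
        store.upload_without_completing(key, data)
        keys.append(key)
    return keys


def task_commit(store, keys):
    """Task commit: complete each pending upload, one request per file."""
    for key in keys:
        store.complete_upload(key)


def job_commit(store, dest):
    """Job commit: write the empty _SUCCESS marker."""
    store.put(f"{dest}/_SUCCESS", b"")
```

Nothing is visible until task_commit runs, but task_commit itself is a loop
of per-file requests, so a worker that stalls partway through it leaves only
some of its files visible.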

But again: the failure semantics of task commit aren't what Spark expects.

Bonus fun: on Google GCS a directory rename is file-by-file and so not
atomic, while v1 task commit does expect an atomic dir rename. So you may as
well use v2.

They could add a committer which didn't do that rename and instead just
wrote a manifest file to the job attempt dir pointing to the successful task
attempt, committing that with their atomic file rename. The committer plugin
point in MR lets you declare a committer factory for each FS, so it could be
done without any further changes to Spark.
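
A minimal sketch of that manifest idea, again on a local temp directory with
made-up function names and a made-up JSON layout. It assumes only that a
single-file rename is atomic, which os.replace models here.

```python
import json
import os
import tempfile


def manifest_task_commit(task_attempt_dir, job_attempt_dir, task_id):
    """Commit a task by publishing a manifest instead of renaming its directory."""
    manifest = {
        "task_id": task_id,
        "attempt_dir": task_attempt_dir,
        "files": sorted(os.listdir(task_attempt_dir)),
    }
    tmp_path = os.path.join(job_attempt_dir, f".{task_id}.json.tmp")
    final_path = os.path.join(job_attempt_dir, f"{task_id}.json")
    with open(tmp_path, "w") as f:
        json.dump(manifest, f)
    # The single-file rename is the only atomic step this sketch relies on.
    os.replace(tmp_path, final_path)
    return final_path


def manifest_job_commit(job_attempt_dir, dest_dir):
    """Read each committed manifest and move the files it lists into dest_dir."""
    for entry in sorted(os.listdir(job_attempt_dir)):
        if not entry.endswith(".json"):
            continue  # skip leftover temporary manifests
        with open(os.path.join(job_attempt_dir, entry)) as f:
            manifest = json.load(f)
        for name in manifest["files"]:
            os.replace(os.path.join(manifest["attempt_dir"], name),
                       os.path.join(dest_dir, name))


def demo():
    """Return (dest listing before job commit, dest listing after)."""
    with tempfile.TemporaryDirectory() as root:
        attempt = os.path.join(root, "task_0_attempt_1")
        job_attempt = os.path.join(root, "job_attempt")
        dest = os.path.join(root, "dest")
        for d in (attempt, job_attempt, dest):
            os.makedirs(d)
        for name in ("part-0000", "part-0001"):
            with open(os.path.join(attempt, name), "w") as f:
                f.write(name)
        manifest_task_commit(attempt, job_attempt, "task_0")
        before = sorted(os.listdir(dest))
        manifest_job_commit(job_attempt, dest)
        return before, sorted(os.listdir(dest))
```

Because the manifest name depends only on the task id, a later attempt that
commits replaces the earlier manifest in one step, and job commit only ever
sees one attempt per task.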

On Thu, 25 Jun 2020 at 22:38, Waleed Fateem <wa...@gmail.com> wrote:


Re: Setting spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=1 and Doc issue

Posted by Waleed Fateem <wa...@gmail.com>.
I was trying to keep my email short and concise, but the rationale for
setting it to 1 by default is that it's safer. With algorithm version 2 you
run the risk of bad data in cases where tasks fail, or even duplicate data
if a task fails and then succeeds on a reattempt (I don't know whether this
is true for all OutputCommitters that extend the FileOutputCommitter).

Imran and Marcelo also discussed this here:
https://issues.apache.org/jira/browse/SPARK-20107?focusedCommentId=15945177&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15945177

I also discussed this a bit with Steve Loughran, and his opinion was that
v2 should just be deprecated altogether. I believe he was going to bring
that up with the Hadoop developers.


On Thu, Jun 25, 2020 at 3:56 PM Sean Owen <sr...@gmail.com> wrote:


Re: Setting spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=1 and Doc issue

Posted by Sean Owen <sr...@gmail.com>.
I think this is a Hadoop property that is just passed through? If the
default is different in Hadoop 3 we could mention that in the docs. I
don't know if we want to always set it to 1 as a Spark default, even
in Hadoop 3, right?

On Thu, Jun 25, 2020 at 2:43 PM Waleed Fateem <wa...@gmail.com> wrote:
