Posted to dev@spark.apache.org by Adam Binford <ad...@gmail.com> on 2021/08/17 16:39:39 UTC

Observer Namenode and Committer Algorithm V1

Hi,

We ran into an interesting issue that I wanted to share, as well as get
thoughts on whether anything should be done about it. We run our own Hadoop
cluster and recently deployed an Observer Namenode to take some burden off
of our Active Namenode. We mostly use Delta Lake as our format, and
everything seemed great. But when running some one-off analytics we ran
into an issue. Specifically, we did something like:

"df.<do some analytic>.repartition(1).write.csv()"

This is our quick way of creating a CSV we can download and do other things
with when our result is some small aggregation. However, we kept getting an
empty output directory (just a _SUCCESS file and nothing else), even though
the Spark UI said it wrote some positive number of rows. I eventually
traced it back to our switch to the ObserverReadProxyProvider in our
notebook sessions, and finally figured out it was due to the behavior
described in the "Maintaining Client Consistency" section of
https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/ObserverNameNode.html
.

After setting the auto msync period to a low value, the writes started
working. I kept digging in and realized it's due to how the
FileOutputCommitter v1 algorithm works. During the commitJob phase, the
AM/driver does a file system listing on the output directory to find all
the finished task output files it needs to move to the top level output
directory. Since this is a read, the observer can serve the request, but
the observer can be out of date and not see the files the executors just
finished writing. The auto msync fixed it because it forced the driver to
do an msync before the read took place. However, frequent auto msyncs can
defeat some of the performance benefits of the Observer.
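
A toy model of the stale listing described above. This is only a sketch:
the classes ToyCluster, ToyClient and their methods are made up for
illustration and are not HDFS code. It assumes each client tracks the last
state ID it has seen, and that the observer only has to be as fresh as the
state ID the client presents.

```python
# Toy model only: these classes imitate the idea of state IDs, not real HDFS.
class Namenode:
    def __init__(self):
        self.state_id = 0      # ID of the last applied transaction
        self.files = set()


class ToyCluster:
    def __init__(self):
        self.active = Namenode()
        self.observer = Namenode()
        self.pending_edits = []    # edits the observer has not applied yet

    def create(self, path):
        # Writes always go to the active namenode.
        self.active.state_id += 1
        self.active.files.add(path)
        self.pending_edits.append(path)
        return self.active.state_id

    def observer_catch_up(self):
        # The observer applies the edit log some time after the active does.
        self.observer.files.update(self.pending_edits)
        self.pending_edits.clear()
        self.observer.state_id = self.active.state_id


class ToyClient:
    def __init__(self, cluster):
        self.cluster = cluster
        self.last_seen_state_id = 0

    def create(self, path):
        self.last_seen_state_id = self.cluster.create(path)

    def msync(self):
        # Ask the active namenode for its latest state ID.
        self.last_seen_state_id = self.cluster.active.state_id

    def list_files(self):
        # The observer only has to reach the state ID this client has seen.
        if self.cluster.observer.state_id < self.last_seen_state_id:
            self.cluster.observer_catch_up()
        return sorted(self.cluster.observer.files)


cluster = ToyCluster()
executor, driver = ToyClient(cluster), ToyClient(cluster)

executor.create("_temporary/0/task_0/part-00000.csv")
stale_listing = driver.list_files()   # driver never saw the executor's write
driver.msync()
fresh_listing = driver.list_files()   # observer has to catch up first
```

In this model the driver's first listing is empty because nothing forced
the observer to catch up, which mirrors the empty output directory. The
msync raises the driver's state ID, so the second listing sees the file.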

The v2 algorithm shouldn't have this issue because the tasks themselves
move their output to the final directory when they commit, and the driver
simply adds the _SUCCESS file at the end. Hadoop's default is v2, but Spark
overrides that to use v1 by default because of potential correctness
issues, which is fair. While this is mostly an issue with Hadoop, the fact
that Spark defaults to the v1 algorithm makes it somewhat of a Spark
problem. Also, things like Delta Lake (or even regular structured streaming
output, I think) shouldn't have issues because they write files directly
and track them in a transaction log, so no file moving on the driver is
involved.
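
To make the difference concrete, here is a rough sketch of the two commit
flows. It is a simplification, not the FileOutputCommitter code: the
function names are hypothetical, and directories are modeled as plain sets
of file names. The only point is that v1 job commit depends on what the
driver's listing returns, while v2 does not list anything on the driver.

```python
def commit_job_v1(driver_listing, dest):
    """v1 job commit: move whatever the driver's listing shows, then mark success."""
    for name in driver_listing:
        dest.add(name)
    dest.add("_SUCCESS")


def commit_task_v2(task_files, dest):
    """v2 task commit: the task moves its own files into the destination."""
    for name in task_files:
        dest.add(name)


def commit_job_v2(dest):
    """v2 job commit: nothing to list or move, only the success marker."""
    dest.add("_SUCCESS")


task_output = ["part-00000.csv"]

# v1 with a stale (empty) listing served by the observer.
v1_dest = set()
commit_job_v1(driver_listing=[], dest=v1_dest)
v1_result = sorted(v1_dest)

# v2: the executor moved the file itself, so the driver's view doesn't matter.
v2_dest = set()
commit_task_v2(task_output, v2_dest)
commit_job_v2(v2_dest)
v2_result = sorted(v2_dest)
```

With the stale listing, the v1 destination ends up holding only _SUCCESS,
which is exactly the symptom we saw.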

So I mostly wanted to share that in case anyone else runs into this same
issue. But I also wanted to get thoughts on whether anything should be done
to prevent it from happening. Several ideas in no particular order:

- Perform an msync during Spark's commitJob before calling the parent
commitJob. Since msync is only available in newer Hadoop APIs, this
probably isn't even possible while maintaining compatibility with older
Hadoop versions.
- Attempt to get an msync added upstream in Hadoop's v1 committer's
commitJob
- Attempt to detect the use of the ObserverReadProxyProvider and either
force using the v2 committer on the Spark side or just print out a warning
that you either need to use the v2 committer or you need to set the auto
msync period very low or 0 to guarantee correct output.
- Simply add something to the Spark docs somewhere about things to know
when using the ObserverReadProxyProvider
- Assume that if you are capable of creating your own Hadoop cluster with
an Observer Namenode you will recognize this limitation quickly. It only
took me about an hour to figure out, so that's also fair.
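
For reference, a sketch of the settings involved in the workarounds above.
Assumptions: the helper name and the nameservice "mycluster" are made up,
the auto msync key is the per-nameservice one from the Observer NameNode
docs, and Hadoop settings are passed through Spark with the "spark.hadoop."
prefix. Please check the key names against your Hadoop version before
relying on this.

```python
def observer_workaround_conf(nameservice, auto_msync_period="0ms", use_v2=False):
    """Build Spark conf entries for the workarounds discussed in this thread.

    nameservice is the HDFS nameservice ID ("mycluster" below is hypothetical).
    auto_msync_period of "0ms" is meant to request an msync before every read.
    """
    conf = {
        "spark.hadoop.dfs.client.failover.observer.auto-msync-period."
        + nameservice: auto_msync_period,
    }
    if use_v2:
        # Spark defaults to v1 because of potential correctness issues with v2,
        # so only switch if you accept that trade-off.
        conf["spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version"] = "2"
    return conf


example_conf = observer_workaround_conf("mycluster")
```

Each key/value pair could then be passed to SparkSession.builder.config()
when the session is created.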

Thanks,

-- 
Adam

Re: Observer Namenode and Committer Algorithm V1

Posted by Venkatakrishnan Sowrirajan <vs...@asu.edu>.
I have created a JIRA (https://issues.apache.org/jira/browse/SPARK-36810)
to track this issue. Will look into this issue further in the coming days.

Regards
Venkata krishnan


On Tue, Sep 7, 2021 at 5:57 AM Steve Loughran <st...@cloudera.com.invalid>
wrote:


Re: Observer Namenode and Committer Algorithm V1

Posted by Steve Loughran <st...@cloudera.com.INVALID>.
FileContext came in Hadoop 2.x with a cleaner split of client API and
driver implementation, and stricter definition of some things considered
broken in FileSystem (rename() corner cases, notion of a current directory,
...)

But as it came out after the platform was broadly adopted and was never
backported to Hadoop 1, it never got picked up... So even though it's tagged
as the "newer" API, it's not the one used by apps. And as it will relay to
FileSystem, anyone doing interesting things at the FS client level can just
add it there and have it adopted in both places.

The design of FileContext _is_ better, but the extra layers get in the way
of the interesting games you can play to deliver performance speedups
against cloud storage. So that's why we tend to work in FileSystem, with
the FS API spec and contract tests essentially reverse engineering what it
is that HDFS does and what applications expect (thread safety of input and
output streams, rename() return codes, ...)

FileSystem is never going to go away. I'd like to fix rename(), but we can't
change rename/2's semantics, and making the protected rename/3 public isn't
sufficient. See https://github.com/apache/hadoop/pull/2735 for my lapsed
work. It got too complicated for spare-time work, especially when there
are other changes with more tangible benefit which don't have good
alternatives (https://github.com/apache/hadoop/pull/2584).

On Mon, 6 Sept 2021 at 16:49, Adam Binford <ad...@gmail.com> wrote:


Re: Observer Namenode and Committer Algorithm V1

Posted by Adam Binford <ad...@gmail.com>.
Sharing some things I learned looking into the Delta Lake issue:

- This was a read-after-write inconsistency _all on the driver_.
Specifically, Delta Lake currently uses the FileSystem API for reading table
logs for greater compatibility, but the FileContext API for writes, to get
atomic renames. This led to the FileSystem reads becoming stale, as they
didn't have to update their state ID after the FileContext writes, which
came from a different DFSClient.
- The FileContext API generally seems less suitable for an HA HDFS setup,
as each FileContext object creates a new DFSClient that has to re-find the
active/observer nodes. I know these are cheap operations, but still extra
overhead and not ideal. This is compounded by the fact that the name
"FileContext" is misleading, as it sounds like something you should create
a new instance of per file you want to interact with, and not try to reuse
across a file system. There's been an open issue for 12 years about adding
caching to the FileContext/AbstractFileSystem API, but there seems to be
some hesitation there due to what happens when you update HDFS while a
client is still active. This doesn't appear to be a huge issue directly in
Spark, since the main place FileContext is used is the structured streaming
commit log, for atomic renames, but it's something to look out for in
third-party libraries. I do see a lot of warnings about the HDFSMetadataLog
looking for the active namenode, which I haven't looked into much. I'd
expect to only see that once, since it seems to properly reuse a single
FileContext instance.
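
A toy illustration of the first point, under the assumption that every
DFSClient keeps its own last-seen state ID. ToyHdfs, ToyDfsClient and the
file name "commit-1.json" are made up for this sketch and are not Hadoop
or Delta Lake code.

```python
class ToyHdfs:
    def __init__(self):
        self.active_txid = 0      # latest transaction on the active namenode
        self.observer_txid = 0    # how far the observer has caught up
        self.log = []             # one entry per transaction

    def write(self, entry):
        self.active_txid += 1
        self.log.append(entry)
        return self.active_txid

    def read(self, min_txid):
        # The observer catches up only as far as the caller demands.
        self.observer_txid = max(self.observer_txid, min_txid)
        return self.log[:self.observer_txid]


class ToyDfsClient:
    def __init__(self, hdfs):
        self.hdfs = hdfs
        self.last_seen_txid = 0   # state ID is tracked per client

    def write(self, entry):
        self.last_seen_txid = self.hdfs.write(entry)

    def read(self):
        return self.hdfs.read(self.last_seen_txid)


hdfs = ToyHdfs()
file_context_client = ToyDfsClient(hdfs)   # used for the atomic-rename write
file_system_client = ToyDfsClient(hdfs)    # used for reading the table log

file_context_client.write("commit-1.json")
stale_read = file_system_client.read()     # other client: state ID still 0
own_read = file_context_client.read()      # same client that wrote
```

The second client reads nothing even though both clients live in the same
driver process, because its state ID was never advanced by the write.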

Adam

On Fri, Aug 20, 2021 at 2:22 PM Steve Loughran <st...@cloudera.com.invalid>
wrote:


-- 
Adam Binford

Re: Observer Namenode and Committer Algorithm V1

Posted by Steve Loughran <st...@cloudera.com.INVALID>.
ooh, this is fun,

v2 isn't safe to use unless every task attempt generates files with exactly
the same names and it is okay to intermingle the output of two task
attempts.

This is because task commit can fail partway through (or worse, the
process can pause for a full GC), and a second attempt then gets committed.
Spark's commit algorithm assumes that it's OK to commit a 2nd attempt if
the first attempt failed, timed out etc. It is for v1, but not v2.

Therefore: a (nonbinding) -1 to any proposal to switch to v2. You are only
trading one problem for another.
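
A small sketch of that failure mode on a local directory. It is not the
FileOutputCommitter code: commit_task_v2 and write_attempt are hypothetical
helpers, and the per-attempt file naming is an assumption chosen to show
what happens when two attempts do not produce identical names.

```python
import os
import tempfile


def write_attempt(root, attempt_id, n_files):
    """Write n_files into a private attempt directory; names differ per attempt."""
    attempt_dir = os.path.join(root, f"attempt_{attempt_id}")
    os.makedirs(attempt_dir)
    for i in range(n_files):
        name = f"part-{i:05d}-attempt{attempt_id}.csv"
        with open(os.path.join(attempt_dir, name), "w") as f:
            f.write("row\n")
    return attempt_dir


def commit_task_v2(attempt_dir, dest_dir, fail_after=None):
    """Move files straight into dest_dir, one rename at a time.

    fail_after simulates the process dying after that many renames.
    """
    for moved, name in enumerate(sorted(os.listdir(attempt_dir))):
        if fail_after is not None and moved >= fail_after:
            raise RuntimeError("task attempt died mid-commit")
        os.replace(os.path.join(attempt_dir, name), os.path.join(dest_dir, name))


root = tempfile.mkdtemp()
dest = os.path.join(root, "output")
os.makedirs(dest)

first = write_attempt(root, 0, 2)
try:
    commit_task_v2(first, dest, fail_after=1)   # dies after one file
except RuntimeError:
    pass

second = write_attempt(root, 1, 2)
commit_task_v2(second, dest)                    # retry commits fully

final_files = sorted(os.listdir(dest))
```

The destination ends up with three files for a task that should have
produced two: one leftover from the failed attempt plus both files from
the retry.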


I think the best fix here is to do it in the FileOutputCommitter. Be aware
that we are all scared of that class and always want to do the minimum
necessary.

I will certainly add it to the manifest committer, whose "call for reviewers
and testing" is still open, especially all the way through Spark:
https://github.com/apache/hadoop/pull/2971

That committer works with HDFS too; I'd be interested in anyone
benchmarking it on queries with deep/wide directory trees and with
different tasks all generating output for the same destination directories
(i.e. file rename dominates in job commit, not task rename). I'm not
optimising it for HDFS; it's trying to deal with cloud storage quirks like
nonatomic dir rename (GCS), slow list/file rename perf (everywhere), deep
directory delete timeouts, and other cloud storage specific issues.


Further reading on the commit problem in general
https://github.com/steveloughran/zero-rename-committer/releases/tag/tag_release_2021-05-17

-Steve



On Tue, 17 Aug 2021 at 17:39, Adam Binford <ad...@gmail.com> wrote:

>
> So I mostly wanted to share that in case anyone else runs into this same
> issue. But also wanted to get thoughts on if anything should be done about
> this to prevent it from happening. Several ideas in no particular order:
>
> - Perform an msync during Spark's commitJob before calling the parent
> commitJob. Since this is only available in newer APIs, probably isn't even
> possible while maintaining compatibility with older Hadoop versions.
> - Attempt to get an msync added upstream in Hadoop's v1 committer's
> commitJob
> - Attempt to detect the use of the ObserverReadProxyProvider and either
> force using v2 committer on the spark side or just print out a warning that
> you either need to use the v2 committer or you need to set the auto msync
> period very low or 0 to guarantee correct output.
> - Simply add something to the Spark docs somewhere about things to know
> when using the ObserverReadProxyProvider
> - Assume that if you are capable of creating your own Hadoop cluster with
> an Observer Namenode you will recognize this limitation quickly, which it
> only took me about an hour to figure out so that's also fair
>
> Thanks,
>
> --
> Adam
>

Re: Observer Namenode and Committer Algorithm V1

Posted by Adam Binford <ad...@gmail.com>.
So it turns out Delta Lake isn't compatible out of the box due to its
mixed use of the FileContext API for writes and the FileSystem API for
reads on the driver. Bringing that up with those devs now but in the
meantime the auto-msync-only-on-driver trick is already coming in handy,
thanks!

On Wed, Aug 18, 2021 at 10:52 AM Adam Binford <ad...@gmail.com> wrote:

> Ahhh we don't do any RDD checkpointing but that makes sense too. Thanks
> for the tip on setting that on the driver only, I didn't know that was
> possible but it makes a lot of sense.
>
> I couldn't tell you the first thing about reflection but good to know it's
> actually something possible to implement on the Spark side. Only really
> know enough Scala to get my way around the Spark repo. So I probably
> couldn't help much implementing the fixes but happy to test or bounce ideas
> off of. We'll probably stick to the committer v2 for ad-hoc cases and auto
> msync turned off and see if we run into any other issues. Are there any
> issues for this yet? If we encounter anything else I can report it there.
>
> Adam
>
> On Tue, Aug 17, 2021 at 4:17 PM Erik Krogen <xk...@apache.org> wrote:
>
>> Hi Adam,
>>
>> Thanks for this great writeup of the issue. We (LinkedIn) also operate
>> Observer NameNodes, and have observed the same issues, but have not yet
>> gotten around to implementing a proper fix.
>>
>> To add a bit of context from our side, there is at least one other place
>> besides the committer v1 algorithm where this can occur, specifically
>> around RDD checkpointing. In this situation, executors write out data to
>> HDFS, then communicate their status back to the driver, which then tries to
>> gather metadata about those files on HDFS (a listing operation). For the
>> time being, we have worked around this by enabling auto-msync mode (as
>> described in HDFS-14211
>> <https://issues.apache.org/jira/browse/HDFS-14211>)
>> with dfs.client.failover.observer.auto-msync-period.<namespace>=0. We set
>> this in our default configurations *on the driver only*, which helps to
>> make sure we get most of the scalability benefits of the observer reads. We
>> achieve this by putting the config as a System property in
>> spark.driver.defaultJavaOptions. This can cause performance issues with
>> operations which perform many metadata operations serially, but it's a
>> tradeoff we find acceptable for now in terms of correctness vs. performance.
>>
>> Long-term, we believe adding appropriate msync() commands to Spark is the
>> right way forward (the first option you mentioned). I think the
>> documentation mentioned in your 4th option is a good short-term addition,
>> but in the long run, targeted msync() operations will be a more performant
>> fix that can work out-of-the-box. We can hide the calls behind reflection
>> to mitigate concerns around compatibility if needed. There is interest from
>> our side in pursuing this work, and certainly we would be happy to
>> collaborate if there is interest from you or others as well.
>>
>> On Tue, Aug 17, 2021 at 9:40 AM Adam Binford <ad...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> We ran into an interesting issue that I wanted to share as well as get
>>> thoughts on if anything should be done about this. We run our own Hadoop
>>> cluster and recently deployed an Observer Namenode to take some burden off
>>> of our Active Namenode. We mostly use Delta Lake as our format, and
>>> everything seemed great. But when running some one-off analytics we ran
>>> into an issue. Specifically, we did something like:
>>>
>>> "df.<do some analytic>.repartition(1).write.csv()"
>>>
>>> This is our quick way of creating a CSV we can download and do other
>>> things with when our result is some small aggregation. However, we kept
>>> getting an empty output directory (just a _SUCCESS file and nothing else),
>>> even though in the Spark UI it says it wrote some positive number of rows.
>>> Eventually traced it back to our update to use the
>>> ObserverReadProxyProvider in our notebook sessions. I finally figured out
>>> it was due to the "Maintaining Client Consistency" section talked about in
>>> https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/ObserverNameNode.html
>>> .
>>>
>>> After setting the auto msync period to a low value, the writes started
>>> working. I kept digging in and realized it's due to how the
>>> FileOutputCommitter v1 algorithm works. During the commitJob phase, the
>>> AM/driver does a file system listing on the output directory to find all
>>> the finished task output files it needs to move to the top level output
>>> directory. But since this is a read, the observer can serve this request,
>>> but it can be out of date and not see the newly written files that just
>>> finished from the executors. The auto msync fixed it because it forced the
>>> driver to do an msync before the read took place. However, frequent auto
>>> msyncs can defeat some of the performance benefits of the Observer.
>>>
>>> The v2 algorithm shouldn't have this issue because the tasks themselves
>>> copy the output to the final directory when they finish, and the driver
>>> simply adds the _SUCCESS at the end. And Hadoop's default is v2, but Spark
>>> overrides that to use v1 by default because of potential correctness
>>> issues, which is fair. While this is mostly an issue with Hadoop, the fact
>>> that Spark defaults to the v1 algorithm makes it somewhat of a Spark
>>> problem. Also, things like Delta Lake (or even regular structured streaming
>>> output I think) shouldn't have issues because they are direct write with
>>> transaction log based, so no file moving on the driver involved.
>>>
>>> So I mostly wanted to share that in case anyone else runs into this same
>>> issue. But also wanted to get thoughts on if anything should be done about
>>> this to prevent it from happening. Several ideas in no particular order:
>>>
>>> - Perform an msync during Spark's commitJob before calling the parent
>>> commitJob. Since this is only available in newer APIs, probably isn't even
>>> possible while maintaining compatibility with older Hadoop versions.
>>> - Attempt to get an msync added upstream in Hadoop's v1 committer's
>>> commitJob
>>> - Attempt to detect the use of the ObserverReadProxyProvider and either
>>> force using v2 committer on the spark side or just print out a warning that
>>> you either need to use the v2 committer or you need to set the auto msync
>>> period very low or 0 to guarantee correct output.
>>> - Simply add something to the Spark docs somewhere about things to know
>>> when using the ObserverReadProxyProvider
>>> - Assume that if you are capable of creating your own Hadoop cluster
>>> with an Observer Namenode you will recognize this limitation quickly, which
>>> it only took me about an hour to figure out so that's also fair
>>>
>>> Thanks,
>>>
>>> --
>>> Adam
>>>
>>
>
> --
> Adam Binford
>


-- 
Adam Binford

Re: Observer Namenode and Committer Algorithm V1

Posted by Adam Binford <ad...@gmail.com>.
Ahhh we don't do any RDD checkpointing but that makes sense too. Thanks for
the tip on setting that on the driver only, I didn't know that was possible
but it makes a lot of sense.

I couldn't tell you the first thing about reflection but good to know it's
actually something possible to implement on the Spark side. Only really
know enough Scala to get my way around the Spark repo. So I probably
couldn't help much implementing the fixes but happy to test or bounce ideas
off of. We'll probably stick to the committer v2 for ad-hoc cases and auto
msync turned off and see if we run into any other issues. Are there any
issues for this yet? If we encounter anything else I can report it there.

Adam

On Tue, Aug 17, 2021 at 4:17 PM Erik Krogen <xk...@apache.org> wrote:

> Hi Adam,
>
> Thanks for this great writeup of the issue. We (LinkedIn) also operate
> Observer NameNodes, and have observed the same issues, but have not yet
> gotten around to implementing a proper fix.
>
> To add a bit of context from our side, there is at least one other place
> besides the committer v1 algorithm where this can occur, specifically
> around RDD checkpointing. In this situation, executors write out data to
> HDFS, then communicate their status back to the driver, which then tries to
> gather metadata about those files on HDFS (a listing operation). For the
> time being, we have worked around this by enabling auto-msync mode (as
> described in HDFS-14211 <https://issues.apache.org/jira/browse/HDFS-14211>)
> with dfs.client.failover.observer.auto-msync-period.<namespace>=0. We set
> this in our default configurations *on the driver only*, which helps to
> make sure we get most of the scalability benefits of the observer reads. We
> achieve this by putting the config as a System property in
> spark.driver.defaultJavaOptions. This can cause performance issues with
> operations which perform many metadata operations serially, but it's a
> tradeoff we find acceptable for now in terms of correctness vs. performance.
>
> Long-term, we believe adding appropriate msync() commands to Spark is the
> right way forward (the first option you mentioned). I think the
> documentation mentioned in your 4th option is a good short-term addition,
> but in the long run, targeted msync() operations will be a more performant
> fix that can work out-of-the-box. We can hide the calls behind reflection
> to mitigate concerns around compatibility if needed. There is interest from
> our side in pursuing this work, and certainly we would be happy to
> collaborate if there is interest from you or others as well.
>
> On Tue, Aug 17, 2021 at 9:40 AM Adam Binford <ad...@gmail.com> wrote:
>
>> Hi,
>>
>> We ran into an interesting issue that I wanted to share as well as get
>> thoughts on if anything should be done about this. We run our own Hadoop
>> cluster and recently deployed an Observer Namenode to take some burden off
>> of our Active Namenode. We mostly use Delta Lake as our format, and
>> everything seemed great. But when running some one-off analytics we ran
>> into an issue. Specifically, we did something like:
>>
>> "df.<do some analytic>.repartition(1).write.csv()"
>>
>> This is our quick way of creating a CSV we can download and do other
>> things with when our result is some small aggregation. However, we kept
>> getting an empty output directory (just a _SUCCESS file and nothing else),
>> even though in the Spark UI it says it wrote some positive number of rows.
>> Eventually traced it back to our update to use the
>> ObserverReadProxyProvider in our notebook sessions. I finally figured out
>> it was due to the "Maintaining Client Consistency" section talked about in
>> https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/ObserverNameNode.html
>> .
>>
>> After setting the auto msync period to a low value, the writes started
>> working. I kept digging in and realized it's due to how the
>> FileOutputCommitter v1 algorithm works. During the commitJob phase, the
>> AM/driver does a file system listing on the output directory to find all
>> the finished task output files it needs to move to the top level output
>> directory. But since this is a read, the observer can serve this request,
>> but it can be out of date and not see the newly written files that just
>> finished from the executors. The auto msync fixed it because it forced the
>> driver to do an msync before the read took place. However, frequent auto
>> msyncs can defeat some of the performance benefits of the Observer.
>>
>> The v2 algorithm shouldn't have this issue because the tasks themselves
>> copy the output to the final directory when they finish, and the driver
>> simply adds the _SUCCESS at the end. And Hadoop's default is v2, but Spark
>> overrides that to use v1 by default because of potential correctness
>> issues, which is fair. While this is mostly an issue with Hadoop, the fact
>> that Spark defaults to the v1 algorithm makes it somewhat of a Spark
>> problem. Also, things like Delta Lake (or even regular structured streaming
>> output I think) shouldn't have issues because they are direct write with
>> transaction log based, so no file moving on the driver involved.
>>
>> So I mostly wanted to share that in case anyone else runs into this same
>> issue. But also wanted to get thoughts on if anything should be done about
>> this to prevent it from happening. Several ideas in no particular order:
>>
>> - Perform an msync during Spark's commitJob before calling the parent
>> commitJob. Since this is only available in newer APIs, probably isn't even
>> possible while maintaining compatibility with older Hadoop versions.
>> - Attempt to get an msync added upstream in Hadoop's v1 committer's
>> commitJob
>> - Attempt to detect the use of the ObserverReadProxyProvider and either
>> force using v2 committer on the spark side or just print out a warning that
>> you either need to use the v2 committer or you need to set the auto msync
>> period very low or 0 to guarantee correct output.
>> - Simply add something to the Spark docs somewhere about things to know
>> when using the ObserverReadProxyProvider
>> - Assume that if you are capable of creating your own Hadoop cluster with
>> an Observer Namenode you will recognize this limitation quickly, which it
>> only took me about an hour to figure out so that's also fair
>>
>> Thanks,
>>
>> --
>> Adam
>>
>

-- 
Adam Binford

Re: Observer Namenode and Committer Algorithm V1

Posted by Erik Krogen <xk...@apache.org>.
Hi Adam,

Thanks for this great writeup of the issue. We (LinkedIn) also operate
Observer NameNodes, and have observed the same issues, but have not yet
gotten around to implementing a proper fix.

To add a bit of context from our side, there is at least one other place
besides the committer v1 algorithm where this can occur, specifically
around RDD checkpointing. In this situation, executors write out data to
HDFS, then communicate their status back to the driver, which then tries to
gather metadata about those files on HDFS (a listing operation). For the
time being, we have worked around this by enabling auto-msync mode (as
described in HDFS-14211 <https://issues.apache.org/jira/browse/HDFS-14211>)
with dfs.client.failover.observer.auto-msync-period.<namespace>=0. We set
this in our default configurations *on the driver only*, which helps to
make sure we get most of the scalability benefits of the observer reads. We
achieve this by putting the config as a System property in
spark.driver.defaultJavaOptions. This can cause performance issues with
operations which perform many metadata operations serially, but it's a
tradeoff we find acceptable for now in terms of correctness vs. performance.
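
As a rough illustration of the driver-only setting, the helper below builds the option string. It is a sketch under assumptions: the nameservice name "mycluster" is hypothetical, and the `spark.hadoop.` prefix on the system property is an assumption about how the value reaches the driver's Hadoop configuration, since the exact form is not spelled out above.

```python
def driver_only_auto_msync_conf(nameservice, period="0"):
    """Build a Spark conf entry that enables auto-msync on the driver only."""
    hadoop_key = f"dfs.client.failover.observer.auto-msync-period.{nameservice}"
    # ASSUMPTION: the property is passed as a JVM system property with the
    # spark.hadoop. prefix so that Spark copies it into the driver's Hadoop
    # configuration. Executors never see it because only driver JVM options
    # are touched.
    java_opt = f"-Dspark.hadoop.{hadoop_key}={period}"
    return {"spark.driver.defaultJavaOptions": java_opt}


def to_spark_submit_args(conf):
    """Render a conf dict as spark-submit --conf arguments."""
    args = []
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args


# "mycluster" is a hypothetical nameservice name.
conf = driver_only_auto_msync_conf("mycluster")
submit_args = to_spark_submit_args(conf)
print(" ".join(submit_args))
```

In practice the same key and value would more likely live in a cluster-wide spark-defaults.conf than on each command line.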

Long-term, we believe adding appropriate msync() commands to Spark is the
right way forward (the first option you mentioned). I think the
documentation mentioned in your 4th option is a good short-term addition,
but in the long run, targeted msync() operations will be a more performant
fix that can work out-of-the-box. We can hide the calls behind reflection
to mitigate concerns around compatibility if needed. There is interest from
our side in pursuing this work, and certainly we would be happy to
collaborate if there is interest from you or others as well.
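
The stale listing and the targeted msync() fix can be sketched with a toy model. Everything here is hypothetical: the classes are stand-ins, not Hadoop APIs, and `getattr` plays the role that Java reflection would play in Spark, so that the call is simply skipped on clients that have no msync().

```python
class Active:
    """Toy Active NameNode: every write bumps a transaction id."""
    def __init__(self):
        self.txid = 0
        self.files = set()

    def create(self, path):
        self.txid += 1
        self.files.add(path)
        return self.txid


class Observer:
    """Toy Observer: only catches up when a read demands a newer state."""
    def __init__(self, active):
        self.active = active
        self.txid = 0
        self.files = set()

    def listing(self, min_txid):
        if self.txid < min_txid:          # behind what this client has seen
            self.files = set(self.active.files)
            self.txid = self.active.txid
        return sorted(self.files)


class Client:
    """Client without msync(), standing in for an older Hadoop API."""
    def __init__(self, active, observer):
        self.active = active
        self.observer = observer
        self.last_seen_txid = 0

    def create(self, path):
        self.last_seen_txid = self.active.create(path)

    def list_files(self):
        return self.observer.listing(self.last_seen_txid)


class ObserverAwareClient(Client):
    def msync(self):
        # Ask the Active for its latest state so the next read is not stale.
        self.last_seen_txid = self.active.txid


def msync_if_supported(fs):
    """Call fs.msync() only if it exists; returns True when it was called."""
    method = getattr(fs, "msync", None)
    if callable(method):
        method()
        return True
    return False


active = Active()
observer = Observer(active)
executor = ObserverAwareClient(active, observer)
driver = ObserverAwareClient(active, observer)

path = "/out/_temporary/0/task_0000/part-00000.csv"   # hypothetical path
executor.create(path)
stale = driver.list_files()           # driver has seen no writes of its own
synced = msync_if_supported(driver)
fresh = driver.list_files()
legacy_synced = msync_if_supported(Client(active, observer))
print(stale, fresh, synced, legacy_synced)
```

The driver's first listing misses the executor's file because the driver itself has written nothing; a single msync() before the listing is enough, which is why a targeted call is cheaper than auto-msync on every read.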

On Tue, Aug 17, 2021 at 9:40 AM Adam Binford <ad...@gmail.com> wrote:

> Hi,
>
> We ran into an interesting issue that I wanted to share as well as get
> thoughts on if anything should be done about this. We run our own Hadoop
> cluster and recently deployed an Observer Namenode to take some burden off
> of our Active Namenode. We mostly use Delta Lake as our format, and
> everything seemed great. But when running some one-off analytics we ran
> into an issue. Specifically, we did something like:
>
> "df.<do some analytic>.repartition(1).write.csv()"
>
> This is our quick way of creating a CSV we can download and do other
> things with when our result is some small aggregation. However, we kept
> getting an empty output directory (just a _SUCCESS file and nothing else),
> even though in the Spark UI it says it wrote some positive number of rows.
> Eventually traced it back to our update to use the
> ObserverReadProxyProvider in our notebook sessions. I finally figured out
> it was due to the "Maintaining Client Consistency" section talked about in
> https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/ObserverNameNode.html
> .
>
> After setting the auto msync period to a low value, the writes started
> working. I kept digging in and realized it's due to how the
> FileOutputCommitter v1 algorithm works. During the commitJob phase, the
> AM/driver does a file system listing on the output directory to find all
> the finished task output files it needs to move to the top level output
> directory. But since this is a read, the observer can serve this request,
> but it can be out of date and not see the newly written files that just
> finished from the executors. The auto msync fixed it because it forced the
> driver to do an msync before the read took place. However, frequent auto
> msyncs can defeat some of the performance benefits of the Observer.
>
> The v2 algorithm shouldn't have this issue because the tasks themselves
> copy the output to the final directory when they finish, and the driver
> simply adds the _SUCCESS at the end. And Hadoop's default is v2, but Spark
> overrides that to use v1 by default because of potential correctness
> issues, which is fair. While this is mostly an issue with Hadoop, the fact
> that Spark defaults to the v1 algorithm makes it somewhat of a Spark
> problem. Also, things like Delta Lake (or even regular structured streaming
> output I think) shouldn't have issues because they are direct write with
> transaction log based, so no file moving on the driver involved.
>
> So I mostly wanted to share that in case anyone else runs into this same
> issue. But also wanted to get thoughts on if anything should be done about
> this to prevent it from happening. Several ideas in no particular order:
>
> - Perform an msync during Spark's commitJob before calling the parent
> commitJob. Since this is only available in newer APIs, probably isn't even
> possible while maintaining compatibility with older Hadoop versions.
> - Attempt to get an msync added upstream in Hadoop's v1 committer's
> commitJob
> - Attempt to detect the use of the ObserverReadProxyProvider and either
> force using v2 committer on the spark side or just print out a warning that
> you either need to use the v2 committer or you need to set the auto msync
> period very low or 0 to guarantee correct output.
> - Simply add something to the Spark docs somewhere about things to know
> when using the ObserverReadProxyProvider
> - Assume that if you are capable of creating your own Hadoop cluster with
> an Observer Namenode you will recognize this limitation quickly, which it
> only took me about an hour to figure out so that's also fair
>
> Thanks,
>
> --
> Adam
>