Posted to dev@beam.apache.org by Tim Robertson <ti...@gmail.com> on 2018/08/22 09:29:25 UTC

[DISCUSS] Performance of write() in file based IO

Hi folks,

I've recently been involved in projects rewriting Avro files and have
discovered a concerning performance trait in Beam.

I have observed Beam to be 6-20x slower than native Spark or MapReduce
code for a simple pipeline of read Avro, modify, write Avro.

 - Rewriting 200TB of Avro files (big cluster): 14 hrs using Beam/Spark, 40
minutes with a map-only MR job
 - Rewriting a 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, 18
minutes using vanilla Spark code. Test code is available [1].
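
For a mental model of the pipeline shape, here is a minimal sketch (the real
benchmark code is in [1]; the schema, the paths and the "modify" step here
are placeholders):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class AvroToAvroSketch {
  // Placeholder schema; the benchmark uses the real GBIF occurrence schema.
  private static final String SCHEMA_JSON =
      "{\"type\":\"record\",\"name\":\"Rec\","
          + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}";

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    Schema schema = new Schema.Parser().parse(SCHEMA_JSON);

    p.apply("Read", AvroIO.readGenericRecords(schema).from("hdfs:///data/in/*.avro"))
     .apply("Modify", ParDo.of(new DoFn<GenericRecord, GenericRecord>() {
       @ProcessElement
       public void process(ProcessContext c) {
         // The benchmark alters a field; an identity pass-through stands in here.
         c.output(c.element());
       }
     }))
     .setCoder(AvroCoder.of(schema))
     .apply("Write", AvroIO.writeGenericRecords(schema).to("hdfs:///data/out/part"));

    p.run().waitUntilFinish();
  }
}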

These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters (Spark /
YARN) on reference Dell / Cloudera hardware.

I have only just started exploring, but I believe the cause is rooted in
WriteFiles, which is used by all our file-based IO. WriteFiles is reasonably
complex, with reshuffles, spilling to temporary files (presumably to
accommodate varying bundle sizes / avoid small files), a union, a GBK, etc.

Before I go too far with exploration, I'd appreciate thoughts on whether we
believe this is a concern (I do), whether we should explore optimisations,
and any insight from previous work in this area.

Thanks,
Tim

[1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro

Re: [DISCUSS] Performance of write() in file based IO

Posted by Reuven Lax <re...@google.com>.
I think we need to dig in more to understand where the slowness is. Some
context (which might not be obvious from the code):

* Much of the complexity in WriteFiles is not always active. For example, a
lot of it is there to support dynamic output (where the filename is
dynamically chosen based on the input record); if you're not using dynamic
output, a lot of those codepaths will not be used.

* There is some overhead because Beam does not assume that ParDos are
deterministic (by contrast, Spark often assumes that user code is
deterministic), and so inserts a shuffle to make sure that file writes are
deterministic. I believe that the current Spark runner might checkpoint the
entire RDD in such a case, which is quite inefficient. We should try on
other runners to make sure that this issue is not specific to the Spark
runner.

* Spilling to temporary files is done to avoid workers crashing with
out-of-memory errors. Beam attempts to write files straight out of the
bundle (to avoid shuffling all the data and instead shuffle just the
filenames). However, empirically, when there are too many files we get large
bundles, and all the file-write buffers cause the workers to start running
out of memory; a solution is to reshuffle the data to distribute it. This
will only happen if you are using windowed writes or dynamic destinations to
write to dynamic locations; otherwise the spill code path is never executed.
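
To make the determinism point concrete, here is a rough sketch of the
pattern (illustrative stand-ins only, not the actual WriteFiles internals):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;

class StableWriteSketch {
  static PCollection<String> writeStably(PCollection<String> records) {
    // Each bundle writes a temp file and emits its (non-deterministic) name.
    PCollection<String> tempFiles = records.apply("WriteTempFiles",
        ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void process(ProcessContext c) {
            c.output("/tmp/beam-temp-" + java.util.UUID.randomUUID());
          }
        }));

    // The reshuffle materialises the temp-file names so that finalization sees
    // a stable set even if the upstream ParDo is retried. On the Spark runner
    // this barrier can amount to checkpointing the RDD, which is where the
    // cost can creep in.
    return tempFiles
        .apply(Reshuffle.viaRandomKey())
        .apply("Finalize", ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void process(ProcessContext c) {
            // Rename/copy to the final location (details elided).
            c.output(c.element());
          }
        }));
  }
}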

On Wed, Aug 22, 2018 at 2:29 AM Tim Robertson <ti...@gmail.com>
wrote:

> Hi folks,
>
> I've recently been involved in projects rewriting Avro files and have
> discovered a concerning performance trait in Beam.
>
> I have observed Beam between 6-20x slower than native Spark or MapReduce
> code for a simple pipeline of read Avro, modify, write Avro.
>
>  - Rewriting 200TB of Avro files (big cluster): 14 hrs using Beam/Spark,
> 40 minutes with a map-only MR job
>  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, 18
> minutes using vanilla Spark code. Test code available [1]
>
> These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters (Spark /
> YARN) on reference Dell / Cloudera hardware.
>
> I have only just started exploring but I believe the cause is rooted in
> the WriteFiles which is used by all our file based IO. WriteFiles is
> reasonably complex with reshuffles, spilling to temporary files (presumably
> to accommodate varying bundle sizes/avoid small files), a union, a GBK etc.
>
> Before I go too far with exploration I'd appreciate thoughts on whether we
> believe this is a concern (I do), if we should explore optimisations or any
> insight from previous work in this area.
>
> Thanks,
> Tim
>
> [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>

Re: [DISCUSS] Performance of write() in file based IO

Posted by Tim Robertson <ti...@gmail.com>.
> Are we seeing similar discrepancies for Flink?

I am not sure, I'm afraid (no easy access to Flink right now). I tried
without success to get the Apex runner going on Cloudera YARN for this
today; I'll keep trying when time allows.

I've updated the DAGs to show more detail:
https://github.com/gbif/beam-perf/tree/master/avro-to-avro

On Wed, Aug 22, 2018 at 1:41 PM Robert Bradshaw <ro...@google.com> wrote:

> That is quite the DAG... Are we seeing similar discrepancies for
> Flink? (Trying to understand if this is Beam->Spark translation bloat,
> or inherent to the WriteFiles transform itself.)
> On Wed, Aug 22, 2018 at 1:35 PM Tim Robertson <ti...@gmail.com>
> wrote:
> >
> > Thanks Robert
> >
> > > Have you profiled to see which stages and/or operations are taking up
> the time?
> >
> > Not yet. I'm browsing through the spark DAG produced which I've
> committed [1] and reading the code.
> >
> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
> >
> > On Wed, Aug 22, 2018 at 12:12 PM Robert Bradshaw <ro...@google.com>
> wrote:
> >>
> >> I agree that this is concerning. Some of the complexity may have also
> >> been introduced to accommodate writing files in Streaming mode, but it
> >> seems we should be able to execute this as a single Map operation.
> >>
> >> Have you profiled to see which stages and/or operations are taking up
> the time?
> >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
> >> <ti...@gmail.com> wrote:
> >> >
> >> > Hi folks,
> >> >
> >> > I've recently been involved in projects rewriting Avro files and have
> discovered a concerning performance trait in Beam.
> >> >
> >> > I have observed Beam between 6-20x slower than native Spark or
> MapReduce code for a simple pipeline of read Avro, modify, write Avro.
> >> >
> >> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using
> Beam/Spark, 40 minutes with a map-only MR job
> >> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark,
> 18 minutes using vanilla Spark code. Test code available [1]
> >> >
> >> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters
> (Spark / YARN) on reference Dell / Cloudera hardware.
> >> >
> >> > I have only just started exploring but I believe the cause is rooted
> in the WriteFiles which is used by all our file based IO. WriteFiles is
> reasonably complex with reshuffles, spilling to temporary files (presumably
> to accommodate varying bundle sizes/avoid small files), a union, a GBK etc.
> >> >
> >> > Before I go too far with exploration I'd appreciate thoughts on
> whether we believe this is a concern (I do), if we should explore
> optimisations or any insight from previous work in this area.
> >> >
> >> > Thanks,
> >> > Tim
> >> >
> >> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>

Re: [DISCUSS] Performance of write() in file based IO

Posted by Robert Bradshaw <ro...@google.com>.
That is quite the DAG... Are we seeing similar discrepancies for
Flink? (Trying to understand if this is Beam->Spark translation bloat,
or inherent to the WriteFiles transform itself.)
On Wed, Aug 22, 2018 at 1:35 PM Tim Robertson <ti...@gmail.com> wrote:
>
> Thanks Robert
>
> > Have you profiled to see which stages and/or operations are taking up the time?
>
> Not yet. I'm browsing through the spark DAG produced which I've committed [1] and reading the code.
>
> [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>
> On Wed, Aug 22, 2018 at 12:12 PM Robert Bradshaw <ro...@google.com> wrote:
>>
>> I agree that this is concerning. Some of the complexity may have also
>> been introduced to accommodate writing files in Streaming mode, but it
>> seems we should be able to execute this as a single Map operation.
>>
>> Have you profiled to see which stages and/or operations are taking up the time?
>> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
>> <ti...@gmail.com> wrote:
>> >
>> > Hi folks,
>> >
>> > I've recently been involved in projects rewriting Avro files and have discovered a concerning performance trait in Beam.
>> >
>> > I have observed Beam between 6-20x slower than native Spark or MapReduce code for a simple pipeline of read Avro, modify, write Avro.
>> >
>> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using Beam/Spark, 40 minutes with a map-only MR job
>> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, 18 minutes using vanilla Spark code. Test code available [1]
>> >
>> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters (Spark / YARN) on reference Dell / Cloudera hardware.
>> >
>> > I have only just started exploring but I believe the cause is rooted in the WriteFiles which is used by all our file based IO. WriteFiles is reasonably complex with reshuffles, spilling to temporary files (presumably to accommodate varying bundle sizes/avoid small files), a union, a GBK etc.
>> >
>> > Before I go too far with exploration I'd appreciate thoughts on whether we believe this is a concern (I do), if we should explore optimisations or any insight from previous work in this area.
>> >
>> > Thanks,
>> > Tim
>> >
>> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro

Re: [DISCUSS] Performance of write() in file based IO

Posted by Tim Robertson <ti...@gmail.com>.
Thanks Robert

> Have you profiled to see which stages and/or operations are taking up the
time?

Not yet. I'm browsing through the Spark DAG produced, which I've committed
[1], and reading the code.

[1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro

On Wed, Aug 22, 2018 at 12:12 PM Robert Bradshaw <ro...@google.com>
wrote:

> I agree that this is concerning. Some of the complexity may have also
> been introduced to accommodate writing files in Streaming mode, but it
> seems we should be able to execute this as a single Map operation.
>
> Have you profiled to see which stages and/or operations are taking up the
> time?
> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
> <ti...@gmail.com> wrote:
> >
> > Hi folks,
> >
> > I've recently been involved in projects rewriting Avro files and have
> discovered a concerning performance trait in Beam.
> >
> > I have observed Beam between 6-20x slower than native Spark or MapReduce
> code for a simple pipeline of read Avro, modify, write Avro.
> >
> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using Beam/Spark,
> 40 minutes with a map-only MR job
> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, 18
> minutes using vanilla Spark code. Test code available [1]
> >
> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters (Spark /
> YARN) on reference Dell / Cloudera hardware.
> >
> > I have only just started exploring but I believe the cause is rooted in
> the WriteFiles which is used by all our file based IO. WriteFiles is
> reasonably complex with reshuffles, spilling to temporary files (presumably
> to accommodate varying bundle sizes/avoid small files), a union, a GBK etc.
> >
> > Before I go too far with exploration I'd appreciate thoughts on whether
> we believe this is a concern (I do), if we should explore optimisations or
> any insight from previous work in this area.
> >
> > Thanks,
> > Tim
> >
> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>

Re: [DISCUSS] Performance of write() in file based IO

Posted by Reuven Lax <re...@google.com>.
Tim, thanks for digging into this! There are some complexities in fixing
the bug (i.e. Beam currently allows the temp directory to be different from
the target directory), but let's continue the discussion on that JIRA.

Reuven

On Thu, Aug 23, 2018 at 6:05 AM Tim Robertson <ti...@gmail.com>
wrote:

> Thanks for linking this discussion with BEAM-5036 (and transitively to
> BEAM-4861 which also comes in to play) Jozek.
>
> What Reuven speculated and Jozek had previously observed is indeed the
> major cause. Today I've been testing the effect of a "move" using rename()
> instead of a copy() and delete().
>
> My test environment is different today but still using 1.5TB input data
> and the code I linked earlier in GH [1]:
>
>   - Spark API: 35 minutes
>   - Beam AvroIO (2.6.0): 1.7hrs
>   - Beam AvroIO with rename() patch: 42 minutes
>
> On the DAG linked in the GH repo [1] stages 3&4 are reduced to seconds
> saving 53 minutes from Beam 2.6.0 version which is the predominant gain
> here.
>
> Unless new comments come in I propose fixing BEAM-5036 and BEAM-4861 and
> continuing discussion on those Jiras.
> This requires a bit of exploration and decision around the expectations of
> e.g. the target directory not existing and also correcting the incorrect
> use of the HDFS API (it ignores the return value which can indicate error
> on e.g. directory not existing today).
>
> Thank you all for contributing to this discussion.
>
> [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>
>
>
> On Thu, Aug 23, 2018 at 11:55 AM Jozef Vilcek <jo...@gmail.com>
> wrote:
>
>> Just for reference, there is a JIRA open for
>> FileBasedSink.moveToOutputFiles()  and filesystem move behavior
>>
>> https://issues.apache.org/jira/browse/BEAM-5036
>>
>>
>> On Wed, Aug 22, 2018 at 9:15 PM Tim Robertson <ti...@gmail.com>
>> wrote:
>>
>>> Reuven, I think you might be on to something
>>>
>>> The Beam HadoopFileSystem copy() does indeed stream through the driver
>>> [1], and the FileBasedSink.moveToOutputFiles() seemingly uses that method
>>> [2].
>>> I'll cobble together a patched version to test using a rename() rather
>>> than a copy() and report back findings before we consider the implications.
>>>
>>> Thanks
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java#L124
>>> [2]
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L288
>>>
>>> On Wed, Aug 22, 2018 at 8:52 PM Tim Robertson <ti...@gmail.com>
>>> wrote:
>>>
>>>> > Does HDFS support a fast rename operation?
>>>>
>>>> Yes. From the shell it is “mv” and in the Java API it is “rename(Path
>>>> src, Path dst)”.
>>>> I am not aware of a fast copy though. I think an HDFS copy streams the
>>>> bytes through the driver (unless a distcp is issued which is a MR job).
>>>>
>>>> (Thanks for engaging in this discussion folks)
>>>>
>>>>
>>>> On Wed, Aug 22, 2018 at 6:29 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> I have another theory: in FileBasedSink.moveToOutputFiles we copy the
>>>>> temporary files to the final destination and then delete the temp files.
>>>>> Does HDFS support a fast rename operation? If so, I bet Spark is using that
>>>>> instead of paying the cost of copying the files.
>>>>>
>>>>> On Wed, Aug 22, 2018 at 8:59 AM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Ismael, that should already be true. If not using dynamic
>>>>>> destinations there might be some edges in the graph that are never used
>>>>>> (i.e. no records are ever published on them), but that should not affect
>>>>>> performance. If this is not the case we should fix it.
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Wed, Aug 22, 2018 at 8:50 AM Ismaël Mejía <ie...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Spark runner uses the Spark broadcast mechanism to materialize the
>>>>>>> side input PCollections in the workers, not sure exactly if this is
>>>>>>> efficient assigned in an optimal way but seems logical at least.
>>>>>>>
>>>>>>> Just wondering if we shouldn't better first tackle the fact that if
>>>>>>> the pipeline does not have dynamic destinations (this case)
>>>>>>> WriteFiles
>>>>>>> should not be doing so much extra magic?
>>>>>>>
>>>>>>> On Wed, Aug 22, 2018 at 5:26 PM Reuven Lax <re...@google.com> wrote:
>>>>>>> >
>>>>>>> > Often only the metadata (i.e. temp file names) are shuffled,
>>>>>>> except in the "spilling" case (which should only happen when using dynamic
>>>>>>> destinations).
>>>>>>> >
>>>>>>> > WriteFiles depends heavily on side inputs. How are side inputs
>>>>>>> implemented in the Spark runner?
>>>>>>> >
>>>>>>> > On Wed, Aug 22, 2018 at 8:21 AM Robert Bradshaw <
>>>>>>> robertwb@google.com> wrote:
>>>>>>> >>
>>>>>>> >> Yes, I stand corrected, dynamic writes is now much more than the
>>>>>>> >> primitive window-based naming we used to have.
>>>>>>> >>
>>>>>>> >> It would be interesting to visualize how much of this codepath is
>>>>>>> >> metatada vs. the actual data.
>>>>>>> >>
>>>>>>> >> In the case of file writing, it seems one could (maybe?) avoid
>>>>>>> >> requiring a stable input, as shards are accepted as a whole
>>>>>>> (unlike,
>>>>>>> >> say, sinks where a deterministic uid is needed for deduplication
>>>>>>> on
>>>>>>> >> retry).
>>>>>>> >>
>>>>>>> >> On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax <re...@google.com>
>>>>>>> wrote:
>>>>>>> >> >
>>>>>>> >> > Robert - much of the complexity isn't due to streaming, but
>>>>>>> rather because WriteFiles supports "dynamic" output (where the user can
>>>>>>> choose a destination file based on the input record). In practice if a
>>>>>>> pipeline is not using dynamic destinations the full graph is still
>>>>>>> generated, but much of that graph is never used (empty PCollections).
>>>>>>> >> >
>>>>>>> >> > On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw <
>>>>>>> robertwb@google.com> wrote:
>>>>>>> >> >>
>>>>>>> >> >> I agree that this is concerning. Some of the complexity may
>>>>>>> have also
>>>>>>> >> >> been introduced to accommodate writing files in Streaming
>>>>>>> mode, but it
>>>>>>> >> >> seems we should be able to execute this as a single Map
>>>>>>> operation.
>>>>>>> >> >>
>>>>>>> >> >> Have you profiled to see which stages and/or operations are
>>>>>>> taking up the time?
>>>>>>> >> >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
>>>>>>> >> >> <ti...@gmail.com> wrote:
>>>>>>> >> >> >
>>>>>>> >> >> > Hi folks,
>>>>>>> >> >> >
>>>>>>> >> >> > I've recently been involved in projects rewriting Avro files
>>>>>>> and have discovered a concerning performance trait in Beam.
>>>>>>> >> >> >
>>>>>>> >> >> > I have observed Beam between 6-20x slower than native Spark
>>>>>>> or MapReduce code for a simple pipeline of read Avro, modify, write Avro.
>>>>>>> >> >> >
>>>>>>> >> >> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using
>>>>>>> Beam/Spark, 40 minutes with a map-only MR job
>>>>>>> >> >> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using
>>>>>>> Beam/Spark, 18 minutes using vanilla Spark code. Test code available [1]
>>>>>>> >> >> >
>>>>>>> >> >> > These tests were running Beam 2.6.0 on Cloudera 5.12.x
>>>>>>> clusters (Spark / YARN) on reference Dell / Cloudera hardware.
>>>>>>> >> >> >
>>>>>>> >> >> > I have only just started exploring but I believe the cause
>>>>>>> is rooted in the WriteFiles which is used by all our file based IO.
>>>>>>> WriteFiles is reasonably complex with reshuffles, spilling to temporary
>>>>>>> files (presumably to accommodate varying bundle sizes/avoid small files), a
>>>>>>> union, a GBK etc.
>>>>>>> >> >> >
>>>>>>> >> >> > Before I go too far with exploration I'd appreciate thoughts
>>>>>>> on whether we believe this is a concern (I do), if we should explore
>>>>>>> optimisations or any insight from previous work in this area.
>>>>>>> >> >> >
>>>>>>> >> >> > Thanks,
>>>>>>> >> >> > Tim
>>>>>>> >> >> >
>>>>>>> >> >> > [1]
>>>>>>> https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>>>>>>>
>>>>>>

Re: [DISCUSS] Performance of write() in file based IO

Posted by Tim Robertson <ti...@gmail.com>.
Thanks for linking this discussion with BEAM-5036 (and transitively to
BEAM-4861, which also comes into play), Jozef.

What Reuven speculated and Jozef had previously observed is indeed the
major cause. Today I've been testing the effect of a "move" using rename()
instead of a copy() and delete().

My test environment is different today but still using 1.5TB input data and
the code I linked earlier in GH [1]:

  - Spark API: 35 minutes
  - Beam AvroIO (2.6.0): 1.7 hrs
  - Beam AvroIO with rename() patch: 42 minutes

On the DAG linked in the GH repo [1], stages 3 & 4 are reduced to seconds,
saving 53 minutes compared to the Beam 2.6.0 version, which is the
predominant gain here.

Unless new comments come in, I propose fixing BEAM-5036 and BEAM-4861 and
continuing the discussion on those JIRAs.
This requires a bit of exploration and a decision around the expectations
of, e.g., the target directory not existing, and also correcting the
incorrect use of the HDFS API (today the return value, which can indicate
an error such as the directory not existing, is ignored).
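
For illustration, the kind of rename-based move I have been testing looks
roughly like this (a sketch only; the directory-existence expectations are
exactly what needs deciding on those JIRAs):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch of a rename-based "move" on HDFS: rename is a metadata-only
// operation, and its boolean return value (e.g. failure due to a missing
// target directory) must be checked rather than ignored.
class RenameMoveSketch {
  static void move(Configuration conf, Path src, Path dst) throws IOException {
    FileSystem fs = src.getFileSystem(conf);
    if (!fs.rename(src, dst)) {
      throw new IOException("Failed to rename " + src + " to " + dst);
    }
  }
}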

Thank you all for contributing to this discussion.

[1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro



On Thu, Aug 23, 2018 at 11:55 AM Jozef Vilcek <jo...@gmail.com> wrote:

> Just for reference, there is a JIRA open for
> FileBasedSink.moveToOutputFiles()  and filesystem move behavior
>
> https://issues.apache.org/jira/browse/BEAM-5036
>
>
> On Wed, Aug 22, 2018 at 9:15 PM Tim Robertson <ti...@gmail.com>
> wrote:
>
>> Reuven, I think you might be on to something
>>
>> The Beam HadoopFileSystem copy() does indeed stream through the driver
>> [1], and the FileBasedSink.moveToOutputFiles() seemingly uses that method
>> [2].
>> I'll cobble together a patched version to test using a rename() rather
>> than a copy() and report back findings before we consider the implications.
>>
>> Thanks
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java#L124
>> [2]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L288
>>
>> On Wed, Aug 22, 2018 at 8:52 PM Tim Robertson <ti...@gmail.com>
>> wrote:
>>
>>> > Does HDFS support a fast rename operation?
>>>
>>> Yes. From the shell it is “mv” and in the Java API it is “rename(Path
>>> src, Path dst)”.
>>> I am not aware of a fast copy though. I think an HDFS copy streams the
>>> bytes through the driver (unless a distcp is issued which is a MR job).
>>>
>>> (Thanks for engaging in this discussion folks)
>>>
>>>
>>> On Wed, Aug 22, 2018 at 6:29 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> I have another theory: in FileBasedSink.moveToOutputFiles we copy the
>>>> temporary files to the final destination and then delete the temp files.
>>>> Does HDFS support a fast rename operation? If so, I bet Spark is using that
>>>> instead of paying the cost of copying the files.
>>>>
>>>> On Wed, Aug 22, 2018 at 8:59 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Ismael, that should already be true. If not using dynamic destinations
>>>>> there might be some edges in the graph that are never used (i.e. no records
>>>>> are ever published on them), but that should not affect performance. If
>>>>> this is not the case we should fix it.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Wed, Aug 22, 2018 at 8:50 AM Ismaël Mejía <ie...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Spark runner uses the Spark broadcast mechanism to materialize the
>>>>>> side input PCollections in the workers, not sure exactly if this is
>>>>>> efficient assigned in an optimal way but seems logical at least.
>>>>>>
>>>>>> Just wondering if we shouldn't better first tackle the fact that if
>>>>>> the pipeline does not have dynamic destinations (this case) WriteFiles
>>>>>> should not be doing so much extra magic?
>>>>>>
>>>>>> On Wed, Aug 22, 2018 at 5:26 PM Reuven Lax <re...@google.com> wrote:
>>>>>> >
>>>>>> > Often only the metadata (i.e. temp file names) are shuffled, except
>>>>>> in the "spilling" case (which should only happen when using dynamic
>>>>>> destinations).
>>>>>> >
>>>>>> > WriteFiles depends heavily on side inputs. How are side inputs
>>>>>> implemented in the Spark runner?
>>>>>> >
>>>>>> > On Wed, Aug 22, 2018 at 8:21 AM Robert Bradshaw <
>>>>>> robertwb@google.com> wrote:
>>>>>> >>
>>>>>> >> Yes, I stand corrected, dynamic writes is now much more than the
>>>>>> >> primitive window-based naming we used to have.
>>>>>> >>
>>>>>> >> It would be interesting to visualize how much of this codepath is
>>>>>> >> metatada vs. the actual data.
>>>>>> >>
>>>>>> >> In the case of file writing, it seems one could (maybe?) avoid
>>>>>> >> requiring a stable input, as shards are accepted as a whole
>>>>>> (unlike,
>>>>>> >> say, sinks where a deterministic uid is needed for deduplication on
>>>>>> >> retry).
>>>>>> >>
>>>>>> >> On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax <re...@google.com>
>>>>>> wrote:
>>>>>> >> >
>>>>>> >> > Robert - much of the complexity isn't due to streaming, but
>>>>>> rather because WriteFiles supports "dynamic" output (where the user can
>>>>>> choose a destination file based on the input record). In practice if a
>>>>>> pipeline is not using dynamic destinations the full graph is still
>>>>>> generated, but much of that graph is never used (empty PCollections).
>>>>>> >> >
>>>>>> >> > On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw <
>>>>>> robertwb@google.com> wrote:
>>>>>> >> >>
>>>>>> >> >> I agree that this is concerning. Some of the complexity may
>>>>>> have also
>>>>>> >> >> been introduced to accommodate writing files in Streaming mode,
>>>>>> but it
>>>>>> >> >> seems we should be able to execute this as a single Map
>>>>>> operation.
>>>>>> >> >>
>>>>>> >> >> Have you profiled to see which stages and/or operations are
>>>>>> taking up the time?
>>>>>> >> >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
>>>>>> >> >> <ti...@gmail.com> wrote:
>>>>>> >> >> >
>>>>>> >> >> > Hi folks,
>>>>>> >> >> >
>>>>>> >> >> > I've recently been involved in projects rewriting Avro files
>>>>>> and have discovered a concerning performance trait in Beam.
>>>>>> >> >> >
>>>>>> >> >> > I have observed Beam between 6-20x slower than native Spark
>>>>>> or MapReduce code for a simple pipeline of read Avro, modify, write Avro.
>>>>>> >> >> >
>>>>>> >> >> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using
>>>>>> Beam/Spark, 40 minutes with a map-only MR job
>>>>>> >> >> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using
>>>>>> Beam/Spark, 18 minutes using vanilla Spark code. Test code available [1]
>>>>>> >> >> >
>>>>>> >> >> > These tests were running Beam 2.6.0 on Cloudera 5.12.x
>>>>>> clusters (Spark / YARN) on reference Dell / Cloudera hardware.
>>>>>> >> >> >
>>>>>> >> >> > I have only just started exploring but I believe the cause is
>>>>>> rooted in the WriteFiles which is used by all our file based IO. WriteFiles
>>>>>> is reasonably complex with reshuffles, spilling to temporary files
>>>>>> (presumably to accommodate varying bundle sizes/avoid small files), a
>>>>>> union, a GBK etc.
>>>>>> >> >> >
>>>>>> >> >> > Before I go too far with exploration I'd appreciate thoughts
>>>>>> on whether we believe this is a concern (I do), if we should explore
>>>>>> optimisations or any insight from previous work in this area.
>>>>>> >> >> >
>>>>>> >> >> > Thanks,
>>>>>> >> >> > Tim
>>>>>> >> >> >
>>>>>> >> >> > [1]
>>>>>> https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>>>>>>
>>>>>

Re: [DISCUSS] Performance of write() in file based IO

Posted by Jozef Vilcek <jo...@gmail.com>.
Just for reference, there is a JIRA open for
FileBasedSink.moveToOutputFiles() and filesystem move behavior:

https://issues.apache.org/jira/browse/BEAM-5036


On Wed, Aug 22, 2018 at 9:15 PM Tim Robertson <ti...@gmail.com>
wrote:

> Reuven, I think you might be on to something
>
> The Beam HadoopFileSystem copy() does indeed stream through the driver
> [1], and the FileBasedSink.moveToOutputFiles() seemingly uses that method
> [2].
> I'll cobble together a patched version to test using a rename() rather
> than a copy() and report back findings before we consider the implications.
>
> Thanks
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java#L124
> [2]
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L288
>
> On Wed, Aug 22, 2018 at 8:52 PM Tim Robertson <ti...@gmail.com>
> wrote:
>
>> > Does HDFS support a fast rename operation?
>>
>> Yes. From the shell it is “mv” and in the Java API it is “rename(Path
>> src, Path dst)”.
>> I am not aware of a fast copy though. I think an HDFS copy streams the
>> bytes through the driver (unless a distcp is issued which is a MR job).
>>
>> (Thanks for engaging in this discussion folks)
>>
>>
>> On Wed, Aug 22, 2018 at 6:29 PM Reuven Lax <re...@google.com> wrote:
>>
>>> I have another theory: in FileBasedSink.moveToOutputFiles we copy the
>>> temporary files to the final destination and then delete the temp files.
>>> Does HDFS support a fast rename operation? If so, I bet Spark is using that
>>> instead of paying the cost of copying the files.
>>>
>>> On Wed, Aug 22, 2018 at 8:59 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Ismael, that should already be true. If not using dynamic destinations
>>>> there might be some edges in the graph that are never used (i.e. no records
>>>> are ever published on them), but that should not affect performance. If
>>>> this is not the case we should fix it.
>>>>
>>>> Reuven
>>>>
>>>> On Wed, Aug 22, 2018 at 8:50 AM Ismaël Mejía <ie...@gmail.com> wrote:
>>>>
>>>>> Spark runner uses the Spark broadcast mechanism to materialize the
>>>>> side input PCollections in the workers, not sure exactly if this is
>>>>> efficient assigned in an optimal way but seems logical at least.
>>>>>
>>>>> Just wondering if we shouldn't better first tackle the fact that if
>>>>> the pipeline does not have dynamic destinations (this case) WriteFiles
>>>>> should not be doing so much extra magic?
>>>>>
>>>>> On Wed, Aug 22, 2018 at 5:26 PM Reuven Lax <re...@google.com> wrote:
>>>>> >
>>>>> > Often only the metadata (i.e. temp file names) are shuffled, except
>>>>> in the "spilling" case (which should only happen when using dynamic
>>>>> destinations).
>>>>> >
>>>>> > WriteFiles depends heavily on side inputs. How are side inputs
>>>>> implemented in the Spark runner?
>>>>> >
>>>>> > On Wed, Aug 22, 2018 at 8:21 AM Robert Bradshaw <ro...@google.com>
>>>>> wrote:
>>>>> >>
>>>>> >> Yes, I stand corrected, dynamic writes is now much more than the
>>>>> >> primitive window-based naming we used to have.
>>>>> >>
>>>>> >> It would be interesting to visualize how much of this codepath is
>>>>> >> metatada vs. the actual data.
>>>>> >>
>>>>> >> In the case of file writing, it seems one could (maybe?) avoid
>>>>> >> requiring a stable input, as shards are accepted as a whole (unlike,
>>>>> >> say, sinks where a deterministic uid is needed for deduplication on
>>>>> >> retry).
>>>>> >>
>>>>> >> On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax <re...@google.com>
>>>>> wrote:
>>>>> >> >
>>>>> >> > Robert - much of the complexity isn't due to streaming, but
>>>>> rather because WriteFiles supports "dynamic" output (where the user can
>>>>> choose a destination file based on the input record). In practice if a
>>>>> pipeline is not using dynamic destinations the full graph is still
>>>>> generated, but much of that graph is never used (empty PCollections).
>>>>> >> >
>>>>> >> > On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw <
>>>>> robertwb@google.com> wrote:
>>>>> >> >>
>>>>> >> >> I agree that this is concerning. Some of the complexity may have
>>>>> also
>>>>> >> >> been introduced to accommodate writing files in Streaming mode,
>>>>> but it
>>>>> >> >> seems we should be able to execute this as a single Map
>>>>> operation.
>>>>> >> >>
>>>>> >> >> Have you profiled to see which stages and/or operations are
>>>>> taking up the time?
>>>>> >> >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
>>>>> >> >> <ti...@gmail.com> wrote:
>>>>> >> >> >
>>>>> >> >> > Hi folks,
>>>>> >> >> >
>>>>> >> >> > I've recently been involved in projects rewriting Avro files
>>>>> and have discovered a concerning performance trait in Beam.
>>>>> >> >> >
>>>>> >> >> > I have observed Beam between 6-20x slower than native Spark or
>>>>> MapReduce code for a simple pipeline of read Avro, modify, write Avro.
>>>>> >> >> >
>>>>> >> >> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using
>>>>> Beam/Spark, 40 minutes with a map-only MR job
>>>>> >> >> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using
>>>>> Beam/Spark, 18 minutes using vanilla Spark code. Test code available [1]
>>>>> >> >> >
>>>>> >> >> > These tests were running Beam 2.6.0 on Cloudera 5.12.x
>>>>> clusters (Spark / YARN) on reference Dell / Cloudera hardware.
>>>>> >> >> >
>>>>> >> >> > I have only just started exploring but I believe the cause is
>>>>> rooted in the WriteFiles which is used by all our file based IO. WriteFiles
>>>>> is reasonably complex with reshuffles, spilling to temporary files
>>>>> (presumably to accommodate varying bundle sizes/avoid small files), a
>>>>> union, a GBK etc.
>>>>> >> >> >
>>>>> >> >> > Before I go too far with exploration I'd appreciate thoughts
>>>>> on whether we believe this is a concern (I do), if we should explore
>>>>> optimisations or any insight from previous work in this area.
>>>>> >> >> >
>>>>> >> >> > Thanks,
>>>>> >> >> > Tim
>>>>> >> >> >
>>>>> >> >> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>>>>>
>>>>

Re: [DISCUSS] Performance of write() in file based IO

Posted by Tim Robertson <ti...@gmail.com>.
Reuven, I think you might be on to something.

The Beam HadoopFileSystem copy() does indeed stream through the driver [1],
and the FileBasedSink.moveToOutputFiles() seemingly uses that method [2].
I'll cobble together a patched version to test using a rename() rather than
a copy() and report back findings before we consider the implications.

Thanks

[1]
https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java#L124
[2]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L288

On Wed, Aug 22, 2018 at 8:52 PM Tim Robertson <ti...@gmail.com>
wrote:

> > Does HDFS support a fast rename operation?
>
> Yes. From the shell it is “mv” and in the Java API it is “rename(Path src,
> Path dst)”.
> I am not aware of a fast copy though. I think an HDFS copy streams the
> bytes through the driver (unless a distcp is issued which is a MR job).
>
> (Thanks for engaging in this discussion folks)
>
>
> On Wed, Aug 22, 2018 at 6:29 PM Reuven Lax <re...@google.com> wrote:
>
>> I have another theory: in FileBasedSink.moveToOutputFiles we copy the
>> temporary files to the final destination and then delete the temp files.
>> Does HDFS support a fast rename operation? If so, I bet Spark is using that
>> instead of paying the cost of copying the files.
>>
>> On Wed, Aug 22, 2018 at 8:59 AM Reuven Lax <re...@google.com> wrote:
>>
>>> Ismael, that should already be true. If not using dynamic destinations
>>> there might be some edges in the graph that are never used (i.e. no records
>>> are ever published on them), but that should not affect performance. If
>>> this is not the case we should fix it.
>>>
>>> Reuven
>>>
>>> On Wed, Aug 22, 2018 at 8:50 AM Ismaël Mejía <ie...@gmail.com> wrote:
>>>
>>>> Spark runner uses the Spark broadcast mechanism to materialize the
>>>> side input PCollections in the workers, not sure exactly if this is
>>>> efficient assigned in an optimal way but seems logical at least.
>>>>
>>>> Just wondering if we shouldn't better first tackle the fact that if
>>>> the pipeline does not have dynamic destinations (this case) WriteFiles
>>>> should not be doing so much extra magic?
>>>>
>>>> On Wed, Aug 22, 2018 at 5:26 PM Reuven Lax <re...@google.com> wrote:
>>>> >
>>>> > Often only the metadata (i.e. temp file names) are shuffled, except
>>>> in the "spilling" case (which should only happen when using dynamic
>>>> destinations).
>>>> >
>>>> > WriteFiles depends heavily on side inputs. How are side inputs
>>>> implemented in the Spark runner?
>>>> >
>>>> > On Wed, Aug 22, 2018 at 8:21 AM Robert Bradshaw <ro...@google.com>
>>>> wrote:
>>>> >>
>>>> >> Yes, I stand corrected, dynamic writes is now much more than the
>>>> >> primitive window-based naming we used to have.
>>>> >>
>>>> >> It would be interesting to visualize how much of this codepath is
>>>> >> metatada vs. the actual data.
>>>> >>
>>>> >> In the case of file writing, it seems one could (maybe?) avoid
>>>> >> requiring a stable input, as shards are accepted as a whole (unlike,
>>>> >> say, sinks where a deterministic uid is needed for deduplication on
>>>> >> retry).
>>>> >>
>>>> >> On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax <re...@google.com> wrote:
>>>> >> >
>>>> >> > Robert - much of the complexity isn't due to streaming, but rather
>>>> because WriteFiles supports "dynamic" output (where the user can choose a
>>>> destination file based on the input record). In practice if a pipeline is
>>>> not using dynamic destinations the full graph is still generated, but much
>>>> of that graph is never used (empty PCollections).
>>>> >> >
>>>> >> > On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw <
>>>> robertwb@google.com> wrote:
>>>> >> >>
>>>> >> >> I agree that this is concerning. Some of the complexity may have
>>>> also
>>>> >> >> been introduced to accommodate writing files in Streaming mode,
>>>> but it
>>>> >> >> seems we should be able to execute this as a single Map operation.
>>>> >> >>
>>>> >> >> Have you profiled to see which stages and/or operations are
>>>> taking up the time?
>>>> >> >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
>>>> >> >> <ti...@gmail.com> wrote:
>>>> >> >> >
>>>> >> >> > Hi folks,
>>>> >> >> >
>>>> >> >> > I've recently been involved in projects rewriting Avro files
>>>> and have discovered a concerning performance trait in Beam.
>>>> >> >> >
>>>> >> >> > I have observed Beam between 6-20x slower than native Spark or
>>>> MapReduce code for a simple pipeline of read Avro, modify, write Avro.
>>>> >> >> >
>>>> >> >> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using
>>>> Beam/Spark, 40 minutes with a map-only MR job
>>>> >> >> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using
>>>> Beam/Spark, 18 minutes using vanilla Spark code. Test code available [1]
>>>> >> >> >
>>>> >> >> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters
>>>> (Spark / YARN) on reference Dell / Cloudera hardware.
>>>> >> >> >
>>>> >> >> > I have only just started exploring but I believe the cause is
>>>> rooted in the WriteFiles which is used by all our file based IO. WriteFiles
>>>> is reasonably complex with reshuffles, spilling to temporary files
>>>> (presumably to accommodate varying bundle sizes/avoid small files), a
>>>> union, a GBK etc.
>>>> >> >> >
>>>> >> >> > Before I go too far with exploration I'd appreciate thoughts on
>>>> whether we believe this is a concern (I do), if we should explore
>>>> optimisations or any insight from previous work in this area.
>>>> >> >> >
>>>> >> >> > Thanks,
>>>> >> >> > Tim
>>>> >> >> >
>>>> >> >> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>>>>
>>>

Re: [DISCUSS] Performance of write() in file based IO

Posted by Tim Robertson <ti...@gmail.com>.
> Does HDFS support a fast rename operation?

Yes. From the shell it is “mv” and in the Java API it is “rename(Path src,
Path dst)”.
I am not aware of a fast copy though. I think an HDFS copy streams the
bytes through the driver (unless a distcp is issued, which is an MR job).
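
Roughly, a copy through the FileSystem API looks like the sketch below
(buffer size arbitrary), which is why it is so much more expensive than a
rename:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

// Sketch: a copy reads and re-writes every byte through the process doing
// the copy, whereas rename(src, dst) is a metadata operation on the NameNode.
class HdfsCopySketch {
  static void copy(Configuration conf, Path src, Path dst) throws IOException {
    FileSystem fs = src.getFileSystem(conf);
    try (FSDataInputStream in = fs.open(src);
         FSDataOutputStream out = fs.create(dst)) {
      IOUtils.copyBytes(in, out, 64 * 1024, false);
    }
  }
}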

(Thanks for engaging in this discussion folks)


On Wed, Aug 22, 2018 at 6:29 PM Reuven Lax <re...@google.com> wrote:

> I have another theory: in FileBasedSink.moveToOutputFiles we copy the
> temporary files to the final destination and then delete the temp files.
> Does HDFS support a fast rename operation? If so, I bet Spark is using that
> instead of paying the cost of copying the files.
>
> On Wed, Aug 22, 2018 at 8:59 AM Reuven Lax <re...@google.com> wrote:
>
>> Ismael, that should already be true. If not using dynamic destinations
>> there might be some edges in the graph that are never used (i.e. no records
>> are ever published on them), but that should not affect performance. If
>> this is not the case we should fix it.
>>
>> Reuven
>>
>> On Wed, Aug 22, 2018 at 8:50 AM Ismaël Mejía <ie...@gmail.com> wrote:
>>
>>> Spark runner uses the Spark broadcast mechanism to materialize the
>>> side input PCollections in the workers, not sure exactly if this is
>>> efficient assigned in an optimal way but seems logical at least.
>>>
>>> Just wondering if we shouldn't better first tackle the fact that if
>>> the pipeline does not have dynamic destinations (this case) WriteFiles
>>> should not be doing so much extra magic?
>>>
>>> On Wed, Aug 22, 2018 at 5:26 PM Reuven Lax <re...@google.com> wrote:
>>> >
>>> > Often only the metadata (i.e. temp file names) are shuffled, except in
>>> the "spilling" case (which should only happen when using dynamic
>>> destinations).
>>> >
>>> > WriteFiles depends heavily on side inputs. How are side inputs
>>> implemented in the Spark runner?
>>> >
>>> > On Wed, Aug 22, 2018 at 8:21 AM Robert Bradshaw <ro...@google.com>
>>> wrote:
>>> >>
>>> >> Yes, I stand corrected, dynamic writes is now much more than the
>>> >> primitive window-based naming we used to have.
>>> >>
>>> >> It would be interesting to visualize how much of this codepath is
>>> >> metatada vs. the actual data.
>>> >>
>>> >> In the case of file writing, it seems one could (maybe?) avoid
>>> >> requiring a stable input, as shards are accepted as a whole (unlike,
>>> >> say, sinks where a deterministic uid is needed for deduplication on
>>> >> retry).
>>> >>
>>> >> On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax <re...@google.com> wrote:
>>> >> >
>>> >> > Robert - much of the complexity isn't due to streaming, but rather
>>> because WriteFiles supports "dynamic" output (where the user can choose a
>>> destination file based on the input record). In practice if a pipeline is
>>> not using dynamic destinations the full graph is still generated, but much
>>> of that graph is never used (empty PCollections).
>>> >> >
>>> >> > On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw <
>>> robertwb@google.com> wrote:
>>> >> >>
>>> >> >> I agree that this is concerning. Some of the complexity may have
>>> also
>>> >> >> been introduced to accommodate writing files in Streaming mode,
>>> but it
>>> >> >> seems we should be able to execute this as a single Map operation.
>>> >> >>
>>> >> >> Have you profiled to see which stages and/or operations are taking
>>> up the time?
>>> >> >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
>>> >> >> <ti...@gmail.com> wrote:
>>> >> >> >
>>> >> >> > Hi folks,
>>> >> >> >
>>> >> >> > I've recently been involved in projects rewriting Avro files and
>>> have discovered a concerning performance trait in Beam.
>>> >> >> >
>>> >> >> > I have observed Beam between 6-20x slower than native Spark or
>>> MapReduce code for a simple pipeline of read Avro, modify, write Avro.
>>> >> >> >
>>> >> >> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using
>>> Beam/Spark, 40 minutes with a map-only MR job
>>> >> >> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using
>>> Beam/Spark, 18 minutes using vanilla Spark code. Test code available [1]
>>> >> >> >
>>> >> >> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters
>>> (Spark / YARN) on reference Dell / Cloudera hardware.
>>> >> >> >
>>> >> >> > I have only just started exploring but I believe the cause is
>>> rooted in the WriteFiles which is used by all our file based IO. WriteFiles
>>> is reasonably complex with reshuffles, spilling to temporary files
>>> (presumably to accommodate varying bundle sizes/avoid small files), a
>>> union, a GBK etc.
>>> >> >> >
>>> >> >> > Before I go too far with exploration I'd appreciate thoughts on
>>> whether we believe this is a concern (I do), if we should explore
>>> optimisations or any insight from previous work in this area.
>>> >> >> >
>>> >> >> > Thanks,
>>> >> >> > Tim
>>> >> >> >
>>> >> >> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>>>
>>

Re: [DISCUSS] Performance of write() in file based IO

Posted by Reuven Lax <re...@google.com>.
I have another theory: in FileBasedSink.moveToOutputFiles we copy the
temporary files to the final destination and then delete the temp files.
Does HDFS support a fast rename operation? If so, I bet Spark is using that
instead of paying the cost of copying the files.

On Wed, Aug 22, 2018 at 8:59 AM Reuven Lax <re...@google.com> wrote:

> Ismael, that should already be true. If not using dynamic destinations
> there might be some edges in the graph that are never used (i.e. no records
> are ever published on them), but that should not affect performance. If
> this is not the case we should fix it.
>
> Reuven
>
> On Wed, Aug 22, 2018 at 8:50 AM Ismaël Mejía <ie...@gmail.com> wrote:
>
>> Spark runner uses the Spark broadcast mechanism to materialize the
>> side input PCollections in the workers, not sure exactly if this is
>> efficient assigned in an optimal way but seems logical at least.
>>
>> Just wondering if we shouldn't better first tackle the fact that if
>> the pipeline does not have dynamic destinations (this case) WriteFiles
>> should not be doing so much extra magic?
>>
>> On Wed, Aug 22, 2018 at 5:26 PM Reuven Lax <re...@google.com> wrote:
>> >
>> > Often only the metadata (i.e. temp file names) are shuffled, except in
>> the "spilling" case (which should only happen when using dynamic
>> destinations).
>> >
>> > WriteFiles depends heavily on side inputs. How are side inputs
>> implemented in the Spark runner?
>> >
>> > On Wed, Aug 22, 2018 at 8:21 AM Robert Bradshaw <ro...@google.com>
>> wrote:
>> >>
>> >> Yes, I stand corrected, dynamic writes is now much more than the
>> >> primitive window-based naming we used to have.
>> >>
>> >> It would be interesting to visualize how much of this codepath is
>> >> metatada vs. the actual data.
>> >>
>> >> In the case of file writing, it seems one could (maybe?) avoid
>> >> requiring a stable input, as shards are accepted as a whole (unlike,
>> >> say, sinks where a deterministic uid is needed for deduplication on
>> >> retry).
>> >>
>> >> On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax <re...@google.com> wrote:
>> >> >
>> >> > Robert - much of the complexity isn't due to streaming, but rather
>> because WriteFiles supports "dynamic" output (where the user can choose a
>> destination file based on the input record). In practice if a pipeline is
>> not using dynamic destinations the full graph is still generated, but much
>> of that graph is never used (empty PCollections).
>> >> >
>> >> > On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw <ro...@google.com>
>> wrote:
>> >> >>
>> >> >> I agree that this is concerning. Some of the complexity may have
>> also
>> >> >> been introduced to accommodate writing files in Streaming mode, but
>> it
>> >> >> seems we should be able to execute this as a single Map operation.
>> >> >>
>> >> >> Have you profiled to see which stages and/or operations are taking
>> up the time?
>> >> >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
>> >> >> <ti...@gmail.com> wrote:
>> >> >> >
>> >> >> > Hi folks,
>> >> >> >
>> >> >> > I've recently been involved in projects rewriting Avro files and
>> have discovered a concerning performance trait in Beam.
>> >> >> >
>> >> >> > I have observed Beam between 6-20x slower than native Spark or
>> MapReduce code for a simple pipeline of read Avro, modify, write Avro.
>> >> >> >
>> >> >> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using
>> Beam/Spark, 40 minutes with a map-only MR job
>> >> >> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using
>> Beam/Spark, 18 minutes using vanilla Spark code. Test code available [1]
>> >> >> >
>> >> >> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters
>> (Spark / YARN) on reference Dell / Cloudera hardware.
>> >> >> >
>> >> >> > I have only just started exploring but I believe the cause is
>> rooted in the WriteFiles which is used by all our file based IO. WriteFiles
>> is reasonably complex with reshuffles, spilling to temporary files
>> (presumably to accommodate varying bundle sizes/avoid small files), a
>> union, a GBK etc.
>> >> >> >
>> >> >> > Before I go too far with exploration I'd appreciate thoughts on
>> whether we believe this is a concern (I do), if we should explore
>> optimisations or any insight from previous work in this area.
>> >> >> >
>> >> >> > Thanks,
>> >> >> > Tim
>> >> >> >
>> >> >> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>>
>

Re: [DISCUSS] Performance of write() in file based IO

Posted by Reuven Lax <re...@google.com>.
Ismael, that should already be true. If not using dynamic destinations
there might be some edges in the graph that are never used (i.e. no records
are ever published on them), but that should not affect performance. If
this is not the case we should fix it.

Reuven

On Wed, Aug 22, 2018 at 8:50 AM Ismaël Mejía <ie...@gmail.com> wrote:

> Spark runner uses the Spark broadcast mechanism to materialize the
> side input PCollections in the workers, not sure exactly if this is
> efficient assigned in an optimal way but seems logical at least.
>
> Just wondering if we shouldn't better first tackle the fact that if
> the pipeline does not have dynamic destinations (this case) WriteFiles
> should not be doing so much extra magic?
>
> On Wed, Aug 22, 2018 at 5:26 PM Reuven Lax <re...@google.com> wrote:
> >
> > Often only the metadata (i.e. temp file names) are shuffled, except in
> the "spilling" case (which should only happen when using dynamic
> destinations).
> >
> > WriteFiles depends heavily on side inputs. How are side inputs
> implemented in the Spark runner?
> >
> > On Wed, Aug 22, 2018 at 8:21 AM Robert Bradshaw <ro...@google.com>
> wrote:
> >>
> >> Yes, I stand corrected, dynamic writes is now much more than the
> >> primitive window-based naming we used to have.
> >>
> >> It would be interesting to visualize how much of this codepath is
> >> metatada vs. the actual data.
> >>
> >> In the case of file writing, it seems one could (maybe?) avoid
> >> requiring a stable input, as shards are accepted as a whole (unlike,
> >> say, sinks where a deterministic uid is needed for deduplication on
> >> retry).
> >>
> >> On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax <re...@google.com> wrote:
> >> >
> >> > Robert - much of the complexity isn't due to streaming, but rather
> because WriteFiles supports "dynamic" output (where the user can choose a
> destination file based on the input record). In practice if a pipeline is
> not using dynamic destinations the full graph is still generated, but much
> of that graph is never used (empty PCollections).
> >> >
> >> > On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw <ro...@google.com>
> wrote:
> >> >>
> >> >> I agree that this is concerning. Some of the complexity may have also
> >> >> been introduced to accommodate writing files in Streaming mode, but
> it
> >> >> seems we should be able to execute this as a single Map operation.
> >> >>
> >> >> Have you profiled to see which stages and/or operations are taking
> up the time?
> >> >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
> >> >> <ti...@gmail.com> wrote:
> >> >> >
> >> >> > Hi folks,
> >> >> >
> >> >> > I've recently been involved in projects rewriting Avro files and
> have discovered a concerning performance trait in Beam.
> >> >> >
> >> >> > I have observed Beam between 6-20x slower than native Spark or
> MapReduce code for a simple pipeline of read Avro, modify, write Avro.
> >> >> >
> >> >> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using
> Beam/Spark, 40 minutes with a map-only MR job
> >> >> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using
> Beam/Spark, 18 minutes using vanilla Spark code. Test code available [1]
> >> >> >
> >> >> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters
> (Spark / YARN) on reference Dell / Cloudera hardware.
> >> >> >
> >> >> > I have only just started exploring but I believe the cause is
> rooted in the WriteFiles which is used by all our file based IO. WriteFiles
> is reasonably complex with reshuffles, spilling to temporary files
> (presumably to accommodate varying bundle sizes/avoid small files), a
> union, a GBK etc.
> >> >> >
> >> >> > Before I go too far with exploration I'd appreciate thoughts on
> whether we believe this is a concern (I do), if we should explore
> optimisations or any insight from previous work in this area.
> >> >> >
> >> >> > Thanks,
> >> >> > Tim
> >> >> >
> >> >> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>

Re: [DISCUSS] Performance of write() in file based IO

Posted by Ismaël Mejía <ie...@gmail.com>.
The Spark runner uses the Spark broadcast mechanism to materialize the
side-input PCollections in the workers; I am not sure exactly if this is
efficient or assigned in an optimal way, but it seems logical at least.
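
As a generic illustration (not the WriteFiles internals), a side input in
the Java SDK looks like this; the PCollectionView is what the Spark runner
backs with a broadcast variable:

import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

class SideInputSketch {
  static PCollection<String> tagWithTotal(PCollection<String> lines) {
    // Materialised once and made available to every worker.
    PCollectionView<Long> totalView =
        lines.apply(Count.<String>globally()).apply(View.asSingleton());

    return lines.apply("Tag",
        ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void process(ProcessContext c) {
            c.output(c.element() + " / " + c.sideInput(totalView));
          }
        }).withSideInputs(totalView));
  }
}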

Just wondering if we shouldn't first tackle the fact that, if the pipeline
does not have dynamic destinations (this case), WriteFiles should not be
doing so much extra magic?

On Wed, Aug 22, 2018 at 5:26 PM Reuven Lax <re...@google.com> wrote:
>
> Often only the metadata (i.e. temp file names) are shuffled, except in the "spilling" case (which should only happen when using dynamic destinations).
>
> WriteFiles depends heavily on side inputs. How are side inputs implemented in the Spark runner?
>
> On Wed, Aug 22, 2018 at 8:21 AM Robert Bradshaw <ro...@google.com> wrote:
>>
>> Yes, I stand corrected, dynamic writes is now much more than the
>> primitive window-based naming we used to have.
>>
>> It would be interesting to visualize how much of this codepath is
>> metatada vs. the actual data.
>>
>> In the case of file writing, it seems one could (maybe?) avoid
>> requiring a stable input, as shards are accepted as a whole (unlike,
>> say, sinks where a deterministic uid is needed for deduplication on
>> retry).
>>
>> On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax <re...@google.com> wrote:
>> >
>> > Robert - much of the complexity isn't due to streaming, but rather because WriteFiles supports "dynamic" output (where the user can choose a destination file based on the input record). In practice if a pipeline is not using dynamic destinations the full graph is still generated, but much of that graph is never used (empty PCollections).
>> >
>> > On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw <ro...@google.com> wrote:
>> >>
>> >> I agree that this is concerning. Some of the complexity may have also
>> >> been introduced to accommodate writing files in Streaming mode, but it
>> >> seems we should be able to execute this as a single Map operation.
>> >>
>> >> Have you profiled to see which stages and/or operations are taking up the time?
>> >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
>> >> <ti...@gmail.com> wrote:
>> >> >
>> >> > Hi folks,
>> >> >
>> >> > I've recently been involved in projects rewriting Avro files and have discovered a concerning performance trait in Beam.
>> >> >
>> >> > I have observed Beam between 6-20x slower than native Spark or MapReduce code for a simple pipeline of read Avro, modify, write Avro.
>> >> >
>> >> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using Beam/Spark, 40 minutes with a map-only MR job
>> >> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, 18 minutes using vanilla Spark code. Test code available [1]
>> >> >
>> >> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters (Spark / YARN) on reference Dell / Cloudera hardware.
>> >> >
>> >> > I have only just started exploring but I believe the cause is rooted in the WriteFiles which is used by all our file based IO. WriteFiles is reasonably complex with reshuffles, spilling to temporary files (presumably to accommodate varying bundle sizes/avoid small files), a union, a GBK etc.
>> >> >
>> >> > Before I go too far with exploration I'd appreciate thoughts on whether we believe this is a concern (I do), if we should explore optimisations or any insight from previous work in this area.
>> >> >
>> >> > Thanks,
>> >> > Tim
>> >> >
>> >> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro

Re: [DISCUSS] Performance of write() in file based IO

Posted by Reuven Lax <re...@google.com>.
Often only the metadata (i.e. the temp file names) is shuffled, except in
the "spilling" case (which should only happen when using dynamic
destinations).

WriteFiles depends heavily on side inputs. How are side inputs implemented
in the Spark runner?
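
To make the question concrete, side inputs in the Java SDK look roughly like
this (an illustrative sketch only, not the actual WriteFiles code, and the
names are invented); the runner has to materialize the view and get it to
every worker that reads it:

  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.transforms.View;
  import org.apache.beam.sdk.values.PCollection;
  import org.apache.beam.sdk.values.PCollectionView;

  // Illustrative helper only: shows the side-input plumbing a runner must support.
  public class SideInputSketch {
    public static PCollection<String> tagWithShardCount(
        PCollection<String> lines, PCollection<Integer> shardCount) {
      // Materialize the single-element shard count as a side input (a view).
      PCollectionView<Integer> shardsView = shardCount.apply(View.asSingleton());

      return lines.apply(ParDo.of(new DoFn<String, String>() {
        @ProcessElement
        public void process(ProcessContext c) {
          // Reading the side input on the worker; the runner decides how the
          // materialized view gets there (e.g. a broadcast, in Spark terms).
          int numShards = c.sideInput(shardsView);
          c.output(c.element() + " -> " + numShards + " shards");
        }
      }).withSideInputs(shardsView));
    }
  }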

On Wed, Aug 22, 2018 at 8:21 AM Robert Bradshaw <ro...@google.com> wrote:

> Yes, I stand corrected, dynamic writes are now much more than the
> primitive window-based naming we used to have.
>
> It would be interesting to visualize how much of this codepath is
> metadata vs. the actual data.
>
> In the case of file writing, it seems one could (maybe?) avoid
> requiring a stable input, as shards are accepted as a whole (unlike,
> say, sinks where a deterministic uid is needed for deduplication on
> retry).
>
> On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax <re...@google.com> wrote:
> >
> > Robert - much of the complexity isn't due to streaming, but rather
> because WriteFiles supports "dynamic" output (where the user can choose a
> destination file based on the input record). In practice if a pipeline is
> not using dynamic destinations the full graph is still generated, but much
> of that graph is never used (empty PCollections).
> >
> > On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw <ro...@google.com>
> wrote:
> >>
> >> I agree that this is concerning. Some of the complexity may have also
> >> been introduced to accommodate writing files in Streaming mode, but it
> >> seems we should be able to execute this as a single Map operation.
> >>
> >> Have you profiled to see which stages and/or operations are taking up
> the time?
> >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
> >> <ti...@gmail.com> wrote:
> >> >
> >> > Hi folks,
> >> >
> >> > I've recently been involved in projects rewriting Avro files and have
> discovered a concerning performance trait in Beam.
> >> >
> >> > I have observed Beam between 6-20x slower than native Spark or
> MapReduce code for a simple pipeline of read Avro, modify, write Avro.
> >> >
> >> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using
> Beam/Spark, 40 minutes with a map-only MR job
> >> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark,
> 18 minutes using vanilla Spark code. Test code available [1]
> >> >
> >> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters
> (Spark / YARN) on reference Dell / Cloudera hardware.
> >> >
> >> > I have only just started exploring but I believe the cause is rooted
> in the WriteFiles which is used by all our file based IO. WriteFiles is
> reasonably complex with reshuffles, spilling to temporary files (presumably
> to accommodate varying bundle sizes/avoid small files), a union, a GBK etc.
> >> >
> >> > Before I go too far with exploration I'd appreciate thoughts on
> whether we believe this is a concern (I do), if we should explore
> optimisations or any insight from previous work in this area.
> >> >
> >> > Thanks,
> >> > Tim
> >> >
> >> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>

Re: [DISCUSS] Performance of write() in file based IO

Posted by Robert Bradshaw <ro...@google.com>.
Yes, I stand corrected, dynamic writes are now much more than the
primitive window-based naming we used to have.

It would be interesting to visualize how much of this codepath is
metadata vs. the actual data.

In the case of file writing, it seems one could (maybe?) avoid
requiring a stable input, as shards are accepted as a whole (unlike,
say, sinks where a deterministic uid is needed for deduplication on
retry).
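
For anyone following along, "requiring a stable input" is what a DoFn asks
for with the @RequiresStableInput annotation in recent SDKs. A purely
illustrative sketch (not WriteFiles' actual code; whether this requirement
can be dropped for whole-shard file writes is exactly the open question):

  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.values.KV;

  // Illustrative only: a DoFn that asks the runner to replay identical input
  // on retry, which typically forces a checkpoint/reshuffle of its input.
  class StableInputWriteFn extends DoFn<KV<Integer, Iterable<String>>, String> {
    @RequiresStableInput
    @ProcessElement
    public void process(ProcessContext c) {
      // In a real sink this would write c.element().getValue() to a temp file
      // and emit the resulting file name for later finalization.
      c.output("temp-file-for-shard-" + c.element().getKey());
    }
  }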

On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax <re...@google.com> wrote:
>
> Robert - much of the complexity isn't due to streaming, but rather because WriteFiles supports "dynamic" output (where the user can choose a destination file based on the input record). In practice if a pipeline is not using dynamic destinations the full graph is still generated, but much of that graph is never used (empty PCollections).
>
> On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw <ro...@google.com> wrote:
>>
>> I agree that this is concerning. Some of the complexity may have also
>> been introduced to accommodate writing files in Streaming mode, but it
>> seems we should be able to execute this as a single Map operation.
>>
>> Have you profiled to see which stages and/or operations are taking up the time?
>> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
>> <ti...@gmail.com> wrote:
>> >
>> > Hi folks,
>> >
>> > I've recently been involved in projects rewriting Avro files and have discovered a concerning performance trait in Beam.
>> >
>> > I have observed Beam between 6-20x slower than native Spark or MapReduce code for a simple pipeline of read Avro, modify, write Avro.
>> >
>> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using Beam/Spark, 40 minutes with a map-only MR job
>> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, 18 minutes using vanilla Spark code. Test code available [1]
>> >
>> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters (Spark / YARN) on reference Dell / Cloudera hardware.
>> >
>> > I have only just started exploring but I believe the cause is rooted in the WriteFiles which is used by all our file based IO. WriteFiles is reasonably complex with reshuffles, spilling to temporary files (presumably to accommodate varying bundle sizes/avoid small files), a union, a GBK etc.
>> >
>> > Before I go too far with exploration I'd appreciate thoughts on whether we believe this is a concern (I do), if we should explore optimisations or any insight from previous work in this area.
>> >
>> > Thanks,
>> > Tim
>> >
>> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro

Re: [DISCUSS] Performance of write() in file based IO

Posted by Reuven Lax <re...@google.com>.
Robert - much of the complexity isn't due to streaming, but rather because
WriteFiles supports "dynamic" output (where the user can choose a
destination file based on the input record). In practice if a pipeline is
not using dynamic destinations the full graph is still generated, but much
of that graph is never used (empty PCollections).
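
For reference, "dynamic destinations" at the user level looks roughly like
the following, based on the public FileIO.writeDynamic() API (the routing
key, base path and naming below are made up):

  import org.apache.beam.sdk.coders.StringUtf8Coder;
  import org.apache.beam.sdk.io.FileIO;
  import org.apache.beam.sdk.io.TextIO;
  import org.apache.beam.sdk.transforms.Contextful;
  import org.apache.beam.sdk.values.PCollection;

  public class DynamicWriteSketch {
    public static void write(PCollection<String> csvLines) {
      csvLines.apply(FileIO.<String, String>writeDynamic()
          // The destination is chosen per record: here, the value before the
          // first comma decides which file group the line ends up in.
          .by((String line) -> line.substring(0, line.indexOf(',')))
          .via(Contextful.fn((String line) -> line), TextIO.sink())
          .to("hdfs:///tmp/out")                      // made-up base directory
          .withDestinationCoder(StringUtf8Coder.of())
          .withNaming((String dest) ->
              FileIO.Write.defaultNaming("part-" + dest, ".csv")));
    }
  }

A pipeline that never calls .by(...) like this still goes through the same
WriteFiles expansion, just with the dynamic branches left empty.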

On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw <ro...@google.com> wrote:

> I agree that this is concerning. Some of the complexity may have also
> been introduced to accommodate writing files in Streaming mode, but it
> seems we should be able to execute this as a single Map operation.
>
> Have you profiled to see which stages and/or operations are taking up the
> time?
> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
> <ti...@gmail.com> wrote:
> >
> > Hi folks,
> >
> > I've recently been involved in projects rewriting Avro files and have
> discovered a concerning performance trait in Beam.
> >
> > I have observed Beam between 6-20x slower than native Spark or MapReduce
> code for a simple pipeline of read Avro, modify, write Avro.
> >
> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using Beam/Spark,
> 40 minutes with a map-only MR job
> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, 18
> minutes using vanilla Spark code. Test code available [1]
> >
> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters (Spark /
> YARN) on reference Dell / Cloudera hardware.
> >
> > I have only just started exploring but I believe the cause is rooted in
> the WriteFiles which is used by all our file based IO. WriteFiles is
> reasonably complex with reshuffles, spilling to temporary files (presumably
> to accommodate varying bundle sizes/avoid small files), a union, a GBK etc.
> >
> > Before I go too far with exploration I'd appreciate thoughts on whether
> we believe this is a concern (I do), if we should explore optimisations or
> any insight from previous work in this area.
> >
> > Thanks,
> > Tim
> >
> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>

Re: [DISCUSS] Performance of write() in file based IO

Posted by Robert Bradshaw <ro...@google.com>.
I agree that this is concerning. Some of the complexity may have also
been introduced to accommodate writing files in Streaming mode, but it
seems we should be able to execute this as a single Map operation.

Have you profiled to see which stages and/or operations are taking up the time?
On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
<ti...@gmail.com> wrote:
>
> Hi folks,
>
> I've recently been involved in projects rewriting Avro files and have discovered a concerning performance trait in Beam.
>
> I have observed Beam between 6-20x slower than native Spark or MapReduce code for a simple pipeline of read Avro, modify, write Avro.
>
>  - Rewriting 200TB of Avro files (big cluster): 14 hrs using Beam/Spark, 40 minutes with a map-only MR job
>  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, 18 minutes using vanilla Spark code. Test code available [1]
>
> These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters (Spark / YARN) on reference Dell / Cloudera hardware.
>
> I have only just started exploring but I believe the cause is rooted in the WriteFiles which is used by all our file based IO. WriteFiles is reasonably complex with reshuffles, spilling to temporary files (presumably to accommodate varying bundle sizes/avoid small files), a union, a GBK etc.
>
> Before I go too far with exploration I'd appreciate thoughts on whether we believe this is a concern (I do), if we should explore optimisations or any insight from previous work in this area.
>
> Thanks,
> Tim
>
> [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro