Posted to dev@beam.apache.org by Tim Robertson <ti...@gmail.com> on 2019/10/03 07:24:33 UTC

Beam 2.15.0 SparkRunner issues

Hi all,

We haven't dug enough into this to know where to log issues, but I'll start
by sharing here.

After upgrading from Beam 2.10.0 to 2.15.0 we see issues on SparkRunner -
we suspect all of this is related.

1. spark.default.parallelism is not respected

2. File writing (Avro) with dynamic destinations (grouped into folders by a
field name) consistently fails with
org.apache.beam.sdk.util.UserCodeException:
java.nio.file.FileAlreadyExistsException: Unable to rename resource
hdfs://ha-nn/pipelines/export-20190930-0854/.temp-beam-d4fd89ed-fc7a-4b1e-aceb-68f9d72d50f0/6e086f60-8bda-4d0e-b29d-1b47fdfc88c0
to
hdfs://ha-nn/pipelines/export-20190930-0854/7c9d2aec-f762-11e1-a439-00145eb45e9a/verbatimHBaseExport-00000-of-00001.avro
as destination already exists and couldn't be deleted.

3. GBK operations that run over 500M small records consistently fail with
OOM. We tried different configs with 48GB, 60GB and 80GB of executor memory.
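
For readers unfamiliar with the exception type in item 2, here is a minimal sketch of when java.nio raises FileAlreadyExistsException on a rename. It uses the local filesystem and plain `Files.move`, not HDFS or Beam's file writing code, and the class name `RenameDemo` and the file names are made up for illustration. It only shows the exception's semantics; it does not explain why the destination already existed in the failing job.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical demo class; not part of Beam, Spark or Hadoop.
final class RenameDemo {

    /**
     * Moves a temp file onto an existing destination on the local filesystem.
     * Returns true if the plain move was rejected because the target exists.
     */
    static boolean plainMoveRejected() {
        try {
            Path dir = Files.createTempDirectory("rename-demo");
            Path temp = Files.write(
                dir.resolve("temp-output"), "new".getBytes(StandardCharsets.UTF_8));
            Path dest = Files.write(
                dir.resolve("export-00000-of-00001.avro"), "old".getBytes(StandardCharsets.UTF_8));

            boolean rejected = false;
            try {
                // Without REPLACE_EXISTING, an existing target makes the move fail.
                Files.move(temp, dest);
            } catch (FileAlreadyExistsException e) {
                rejected = true;
            }

            // Asking for replacement explicitly lets the same move succeed.
            Files.move(temp, dest, StandardCopyOption.REPLACE_EXISTING);

            // Clean up the scratch files.
            Files.delete(dest);
            Files.delete(dir);
            return rejected;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("plain move rejected: " + plainMoveRejected());
    }
}
```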

Our pipelines are batch runs of simple transformations: either an
HBaseSnapshot to Avro files, or a merge of records in Avro (the GBK issue)
pushed to Elasticsearch (it fails upstream of the ElasticsearchIO, in the
GBK stage).

We notice that operations that were mapToPair in 2.10.0 have become
repartition operations (mapToPair at GroupCombineFunctions.java:68 becomes
repartition at GroupCombineFunctions.java:202), which might be related to
this and looks surprising.

I'll report more as we learn. If anyone has any immediate ideas based on
their commits or reviews, or if you would like any tests run on other Beam
versions, please say.

Thanks,
Tim

Re: Beam 2.15.0 SparkRunner issues

Posted by Ismaël Mejía <ie...@gmail.com>.
Thanks for reporting back, Tim, and great that you found it was a GC issue.

In the meantime I filed BEAM-8384 [1] for the
`spark.default.parallelism` issue. The Spark runner (both classic and
structured streaming) tries to improve on the default configuration
values only in exceptional cases, but if the user overrides a default
value the runner should respect it. Otherwise users will be confused
and won't be able to apply well-known Spark tunings, as in your case.
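
The precedence described above can be sketched as follows. This is only an illustration of the rule "an explicit user setting wins over a runner-computed default"; the class `ParallelismResolver`, its method and the numbers are hypothetical and are not the Spark runner's actual code or the fix tracked in BEAM-8384.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Beam or Spark.
final class ParallelismResolver {
    static final String KEY = "spark.default.parallelism";

    /**
     * Returns the user's explicit setting when one is present,
     * otherwise the value the runner would have picked on its own.
     */
    static int resolve(Map<String, String> userConf, int runnerDefault) {
        String explicit = userConf.get(KEY);
        if (explicit == null || explicit.trim().isEmpty()) {
            return runnerDefault;
        }
        return Integer.parseInt(explicit.trim());
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        // No user setting: the runner's own choice is used.
        System.out.println(resolve(conf, 48));
        // User setting present: it takes precedence.
        conf.put(KEY, "200");
        System.out.println(resolve(conf, 48));
    }
}
```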

[1] https://issues.apache.org/jira/browse/BEAM-8384

On Tue, Oct 8, 2019 at 1:19 PM Tim Robertson <ti...@gmail.com> wrote:
>
> I'm sorry for not replying. We are super busy trying to prepare data to release.
>
> An update:
> - We were using G1GC and through slack were advised against that. This fixed the OOM error we saw and all our 2.15.0 jobs did complete
>
> When we have time (after 3 weeks) I'll try and isolate a test case with the reshuffle example and parallelism.
>
> Thanks,
> Tim
>
>
> On Thu, Oct 3, 2019 at 1:21 PM Jan Lukavský <je...@seznam.cz> wrote:
>>
>> Hi Tim,
>>
>> can you please elaborate more about some parts?
>>
>> 1) What happens actually in your case? What is the specific settings you
>> use?
>>
>> 3) Can you share stacktrace? Is it always the same, or does it change?
>>
>> The mentioned GroupCombineFunctions.java:202 comes from a Reshuffle,
>> which seems to make a little sense to me regarding the logic you
>> described. Do you use Reshuffle transform or does it expand from some
>> other transform?
>>
>> Jan
>>
>> On 10/3/19 9:24 AM, Tim Robertson wrote:
>> > Hi all,
>> >
>> > We haven't dug enough into this to know where to log issues, but I'll
>> > start by sharing here.
>> >
>> > After upgrading from Beam 2.10.0 to 2.15.0 we see issues on
>> > SparkRunner - we suspect all of this related.
>> >
>> > 1. spark.default.parallelism is not respected
>> >
>> > 2. File writing (Avro) with dynamic destinations (grouped into folders
>> > by a field name) consistently fail with
>> > org.apache.beam.sdk.util.UserCodeException:
>> > java.nio.file.FileAlreadyExistsException: Unable to rename resource
>> > hdfs://ha-nn/pipelines/export-20190930-0854/.temp-beam-d4fd89ed-fc7a-4b1e-aceb-68f9d72d50f0/6e086f60-8bda-4d0e-b29d-1b47fdfc88c0
>> > to
>> > hdfs://ha-nn/pipelines/export-20190930-0854/7c9d2aec-f762-11e1-a439-00145eb45e9a/verbatimHBaseExport-00000-of-00001.avro
>> > as destination already exists and couldn't be deleted.
>> >
>> > 3. GBK operations that run over 500M small records consistently fail
>> > with OOM. We tried different configs with 48GB, 60GB, 80GB executor
>> > memory
>> >
>> > Our pipelines run are batch, simple transformations with either an
>> > HBaseSnapshot to Avro files or a merge of records in Avro (the GBK
>> > issue) pushed to ElasticSearch (it fails upstream of the
>> > ElasticsearchIO in the GBK stage).
>> >
>> > We notice operations that were mapToPair  in 2.10.0 become repartition
>> > operations ( (mapToPair at GroupCombineFunctions.java:68 becomes
>> > repartition at GroupCombineFunctions.java:202)) which might be related
>> > to this and looks surprising.
>> >
>> > I'll report more as we learn. If anyone has any immediate ideas based
>> > on their commits or reviews or if you wish an tests run on other Beam
>> > versions please say.
>> >
>> > Thanks,
>> > Tim
>> >
>> >
>> >

Re: Beam 2.15.0 SparkRunner issues

Posted by Brian Hulette <bh...@verily.com>.
For anyone curious, this slack conversation can be found at
https://the-asf.slack.com/archives/CE3DHTMK9/p1570099388008900

Brian

On Tue, Oct 8, 2019 at 4:19 AM Tim Robertson <ti...@gmail.com>
wrote:

> I'm sorry for not replying. We are super busy trying to prepare data to
> release.
>
> An update:
> - We were using G1GC and through slack were advised against that. This
> fixed the OOM error we saw and all our 2.15.0 jobs did complete
>
> When we have time (after 3 weeks) I'll try and isolate a test case with
> the reshuffle example and parallelism.
>
> Thanks,
> Tim
>
>
> On Thu, Oct 3, 2019 at 1:21 PM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Tim,
>>
>> can you please elaborate more about some parts?
>>
>> 1) What happens actually in your case? What is the specific settings you
>> use?
>>
>> 3) Can you share stacktrace? Is it always the same, or does it change?
>>
>> The mentioned GroupCombineFunctions.java:202 comes from a Reshuffle,
>> which seems to make a little sense to me regarding the logic you
>> described. Do you use Reshuffle transform or does it expand from some
>> other transform?
>>
>> Jan
>>
>> On 10/3/19 9:24 AM, Tim Robertson wrote:
>> > Hi all,
>> >
>> > We haven't dug enough into this to know where to log issues, but I'll
>> > start by sharing here.
>> >
>> > After upgrading from Beam 2.10.0 to 2.15.0 we see issues on
>> > SparkRunner - we suspect all of this related.
>> >
>> > 1. spark.default.parallelism is not respected
>> >
>> > 2. File writing (Avro) with dynamic destinations (grouped into folders
>> > by a field name) consistently fail with
>> > org.apache.beam.sdk.util.UserCodeException:
>> > java.nio.file.FileAlreadyExistsException: Unable to rename resource
>> >
>> hdfs://ha-nn/pipelines/export-20190930-0854/.temp-beam-d4fd89ed-fc7a-4b1e-aceb-68f9d72d50f0/6e086f60-8bda-4d0e-b29d-1b47fdfc88c0
>>
>> > to
>> >
>> hdfs://ha-nn/pipelines/export-20190930-0854/7c9d2aec-f762-11e1-a439-00145eb45e9a/verbatimHBaseExport-00000-of-00001.avro
>>
>> > as destination already exists and couldn't be deleted.
>> >
>> > 3. GBK operations that run over 500M small records consistently fail
>> > with OOM. We tried different configs with 48GB, 60GB, 80GB executor
>> > memory
>> >
>> > Our pipelines run are batch, simple transformations with either an
>> > HBaseSnapshot to Avro files or a merge of records in Avro (the GBK
>> > issue) pushed to ElasticSearch (it fails upstream of the
>> > ElasticsearchIO in the GBK stage).
>> >
>> > We notice operations that were mapToPair  in 2.10.0 become repartition
>> > operations ( (mapToPair at GroupCombineFunctions.java:68 becomes
>> > repartition at GroupCombineFunctions.java:202)) which might be related
>> > to this and looks surprising.
>> >
>> > I'll report more as we learn. If anyone has any immediate ideas based
>> > on their commits or reviews or if you wish an tests run on other Beam
>> > versions please say.
>> >
>> > Thanks,
>> > Tim
>> >
>> >
>> >
>>
>

Re: Beam 2.15.0 SparkRunner issues

Posted by Tim Robertson <ti...@gmail.com>.
I'm sorry for not replying. We are super busy trying to prepare data to
release.

An update:
- We were using G1GC and, through Slack, were advised against that. Switching
away from it fixed the OOM error we saw, and all our 2.15.0 jobs completed.
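
When changing collectors it helps to confirm which one a JVM is really running. The sketch below uses only the standard java.lang.management API; the class name `GcReport` is made up for illustration. On Spark, executor JVM flags are usually passed through `spark.executor.extraJavaOptions`; the thread does not say which collector was chosen instead of G1, so no particular flag is suggested here.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic class; run it with the same JVM flags as the job.
final class GcReport {

    /** Returns the names of the garbage collectors active in this JVM. */
    static List<String> collectorNames() {
        List<String> names = new ArrayList<>();
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            names.add(gc.getName());
        }
        return names;
    }

    public static void main(String[] args) {
        // With G1 enabled, the reported names typically contain "G1".
        System.out.println("Collectors in use: " + collectorNames());
    }
}
```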

When we have time (after 3 weeks) I'll try and isolate a test case with the
reshuffle example and parallelism.

Thanks,
Tim


On Thu, Oct 3, 2019 at 1:21 PM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Tim,
>
> can you please elaborate more about some parts?
>
> 1) What happens actually in your case? What is the specific settings you
> use?
>
> 3) Can you share stacktrace? Is it always the same, or does it change?
>
> The mentioned GroupCombineFunctions.java:202 comes from a Reshuffle,
> which seems to make a little sense to me regarding the logic you
> described. Do you use Reshuffle transform or does it expand from some
> other transform?
>
> Jan
>
> On 10/3/19 9:24 AM, Tim Robertson wrote:
> > Hi all,
> >
> > We haven't dug enough into this to know where to log issues, but I'll
> > start by sharing here.
> >
> > After upgrading from Beam 2.10.0 to 2.15.0 we see issues on
> > SparkRunner - we suspect all of this related.
> >
> > 1. spark.default.parallelism is not respected
> >
> > 2. File writing (Avro) with dynamic destinations (grouped into folders
> > by a field name) consistently fail with
> > org.apache.beam.sdk.util.UserCodeException:
> > java.nio.file.FileAlreadyExistsException: Unable to rename resource
> >
> hdfs://ha-nn/pipelines/export-20190930-0854/.temp-beam-d4fd89ed-fc7a-4b1e-aceb-68f9d72d50f0/6e086f60-8bda-4d0e-b29d-1b47fdfc88c0
>
> > to
> >
> hdfs://ha-nn/pipelines/export-20190930-0854/7c9d2aec-f762-11e1-a439-00145eb45e9a/verbatimHBaseExport-00000-of-00001.avro
>
> > as destination already exists and couldn't be deleted.
> >
> > 3. GBK operations that run over 500M small records consistently fail
> > with OOM. We tried different configs with 48GB, 60GB, 80GB executor
> > memory
> >
> > Our pipelines run are batch, simple transformations with either an
> > HBaseSnapshot to Avro files or a merge of records in Avro (the GBK
> > issue) pushed to ElasticSearch (it fails upstream of the
> > ElasticsearchIO in the GBK stage).
> >
> > We notice operations that were mapToPair  in 2.10.0 become repartition
> > operations ( (mapToPair at GroupCombineFunctions.java:68 becomes
> > repartition at GroupCombineFunctions.java:202)) which might be related
> > to this and looks surprising.
> >
> > I'll report more as we learn. If anyone has any immediate ideas based
> > on their commits or reviews or if you wish an tests run on other Beam
> > versions please say.
> >
> > Thanks,
> > Tim
> >
> >
> >
>

Re: Beam 2.15.0 SparkRunner issues

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Tim,

Can you please elaborate on some parts?

1) What actually happens in your case? What specific settings do you
use?

3) Can you share the stack trace? Is it always the same, or does it change?

The mentioned GroupCombineFunctions.java:202 comes from a Reshuffle, 
which seems to make a little sense to me regarding the logic you 
described. Do you use Reshuffle transform or does it expand from some 
other transform?

Jan

On 10/3/19 9:24 AM, Tim Robertson wrote:
> Hi all,
>
> We haven't dug enough into this to know where to log issues, but I'll 
> start by sharing here.
>
> After upgrading from Beam 2.10.0 to 2.15.0 we see issues on 
> SparkRunner - we suspect all of this related.
>
> 1. spark.default.parallelism is not respected
>
> 2. File writing (Avro) with dynamic destinations (grouped into folders 
> by a field name) consistently fail with
> org.apache.beam.sdk.util.UserCodeException: 
> java.nio.file.FileAlreadyExistsException: Unable to rename resource 
> hdfs://ha-nn/pipelines/export-20190930-0854/.temp-beam-d4fd89ed-fc7a-4b1e-aceb-68f9d72d50f0/6e086f60-8bda-4d0e-b29d-1b47fdfc88c0 
> to 
> hdfs://ha-nn/pipelines/export-20190930-0854/7c9d2aec-f762-11e1-a439-00145eb45e9a/verbatimHBaseExport-00000-of-00001.avro 
> as destination already exists and couldn't be deleted.
>
> 3. GBK operations that run over 500M small records consistently fail 
> with OOM. We tried different configs with 48GB, 60GB, 80GB executor 
> memory
>
> Our pipelines run are batch, simple transformations with either an 
> HBaseSnapshot to Avro files or a merge of records in Avro (the GBK 
> issue) pushed to ElasticSearch (it fails upstream of the 
> ElasticsearchIO in the GBK stage).
>
> We notice operations that were mapToPair  in 2.10.0 become repartition 
> operations ( (mapToPair at GroupCombineFunctions.java:68 becomes 
> repartition at GroupCombineFunctions.java:202)) which might be related 
> to this and looks surprising.
>
> I'll report more as we learn. If anyone has any immediate ideas based 
> on their commits or reviews or if you wish an tests run on other Beam 
> versions please say.
>
> Thanks,
> Tim
>
>
>

Re: Beam 2.15.0 SparkRunner issues

Posted by Jozef Vilcek <jo...@gmail.com>.
We do have Beam 2.15.0 batch jobs running on the Spark runner. I did have a bit
of a tricky time with spark.default.parallelism, but in the end it works fine
for us (custom parallelism on source stages and spark.default.parallelism
on all other stages after shuffles).

The tricky part in my case was the interaction between `spark.default.parallelism`
and `beam.bundleSize`. My problem was that the default parallelism was
enforced on inputs too, splitting them too much or too little.
Configuring bundleSize and custom config on the inputs (e.g. Hadoop input
format max/min split size) did the trick. TransformTranslator does make a
decision on the partitioner based on bundleSize; however, I am not sure how it
is used later on:
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java#L571
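
To make the "too much or too little" effect concrete, here is a rough arithmetic sketch. It assumes a simplified rule (a positive bundle size decides the number of input splits, otherwise the default parallelism does); that rule, the class `SplitEstimator` and the sizes are assumptions for illustration and are not taken from the runner's code.

```java
// Hypothetical helper for illustration; not part of Beam or Spark.
final class SplitEstimator {

    /**
     * Estimates how many splits an input of the given size would get.
     * Assumed rule: a positive bundle size wins; otherwise fall back to
     * the default parallelism.
     */
    static long splits(long estimatedBytes, long bundleSizeBytes, int defaultParallelism) {
        if (estimatedBytes <= 0) {
            return 1;
        }
        if (bundleSizeBytes > 0) {
            // Ceiling division: one split per (possibly partial) bundle.
            return Math.max(1, (estimatedBytes + bundleSizeBytes - 1) / bundleSizeBytes);
        }
        return Math.max(1, defaultParallelism);
    }

    public static void main(String[] args) {
        long tenGiB = 10L * 1024 * 1024 * 1024;
        long bundle128MiB = 128L * 1024 * 1024;
        // Default parallelism applied to the input: 200 splits of about 51 MiB each.
        System.out.println(splits(tenGiB, 0, 200));
        // Explicit 128 MiB bundles: 80 splits, independent of the default parallelism.
        System.out.println(splits(tenGiB, bundle128MiB, 200));
    }
}
```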

On Thu, Oct 3, 2019 at 9:25 AM Tim Robertson <ti...@gmail.com>
wrote:

> Hi all,
>
> We haven't dug enough into this to know where to log issues, but I'll
> start by sharing here.
>
> After upgrading from Beam 2.10.0 to 2.15.0 we see issues on SparkRunner -
> we suspect all of this related.
>
> 1. spark.default.parallelism is not respected
>
> 2. File writing (Avro) with dynamic destinations (grouped into folders by
> a field name) consistently fail with
> org.apache.beam.sdk.util.UserCodeException:
> java.nio.file.FileAlreadyExistsException: Unable to rename resource
> hdfs://ha-nn/pipelines/export-20190930-0854/.temp-beam-d4fd89ed-fc7a-4b1e-aceb-68f9d72d50f0/6e086f60-8bda-4d0e-b29d-1b47fdfc88c0
> to
> hdfs://ha-nn/pipelines/export-20190930-0854/7c9d2aec-f762-11e1-a439-00145eb45e9a/verbatimHBaseExport-00000-of-00001.avro
> as destination already exists and couldn't be deleted.
>
> 3. GBK operations that run over 500M small records consistently fail with
> OOM. We tried different configs with 48GB, 60GB, 80GB executor memory
>
> Our pipelines run are batch, simple transformations with either an
> HBaseSnapshot to Avro files or a merge of records in Avro (the GBK issue)
> pushed to ElasticSearch (it fails upstream of the ElasticsearchIO in the
> GBK stage).
>
> We notice operations that were mapToPair  in 2.10.0 become repartition
> operations ( (mapToPair at GroupCombineFunctions.java:68 becomes
> repartition at GroupCombineFunctions.java:202)) which might be related to
> this and looks surprising.
>
> I'll report more as we learn. If anyone has any immediate ideas based on
> their commits or reviews or if you wish an tests run on other Beam versions
> please say.
>
> Thanks,
> Tim
>
>
>
>