Posted to user@beam.apache.org by Ziyad Muhammed <mm...@gmail.com> on 2019/09/04 09:42:19 UTC

AvroIO Windowed Writes - Number of files to specify

Hi all

I have a Beam pipeline running on Cloud Dataflow that produces Avro files
on GCS. The window duration is 1 minute, and the job currently runs with
64 cores (16 * n1-standard-4). The data produced is around 2 GB per minute.

Is there any recommendation on the number of Avro files to specify?
Currently I'm using 64 (to match the number of cores). Will a very
high number help increase the write throughput?
I saw that BigQueryIO with FILE_LOADS uses a default of 1000
files.
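
For reference, the write is set up roughly like this (a simplified sketch;
the input, schema, and output path are placeholders, and only the
withNumShards value reflects the actual setting):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// records is the unbounded PCollection<GenericRecord> read earlier in the
// pipeline; schema is the matching Avro Schema.
records
    .apply(Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(1))))
    .apply(
        AvroIO.writeGenericRecords(schema)
            .to("gs://my-bucket/output/")
            .withWindowedWrites()
            .withNumShards(64) // currently matching the number of cores
            .withSuffix(".avro"));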

I tried some random values, but couldn't infer a pattern for when it is
more performant.

Any suggestion is hugely appreciated.

Best
Ziyad

Re: AvroIO Windowed Writes - Number of files to specify

Posted by Chamikara Jayalath <ch...@google.com>.
I'm a bit confused, since we mention
https://issues.apache.org/jira/browse/BEAM-1438 before that error, but that
JIRA was fixed a few years ago.
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L312
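
For reference, the check that fires is roughly the following (paraphrased
from WriteFiles.expand; method names are approximate, not an exact copy of
the current source):

if (input.isBounded() == IsBounded.UNBOUNDED) {
  checkArgument(
      getWindowedWrites(),
      "Must use windowed writes when applying %s to an unbounded PCollection",
      WriteFiles.class.getSimpleName());
  // The reason for this is https://issues.apache.org/jira/browse/BEAM-1438
  checkArgument(
      getComputeNumShards() != null || getNumShardsProvider() != null,
      "When applying %s to an unbounded PCollection, must specify number "
          + "of output shards explicitly",
      WriteFiles.class.getSimpleName());
}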

+Reuven Lax <re...@google.com> can you comment on whether we can remove
this restriction, now that the JIRA has been fixed?

Thanks,
Cham

On Thu, Sep 12, 2019 at 5:34 AM Ziyad Muhammed <mm...@gmail.com> wrote:

> Hi Cham
>
> Any update on this?
>
> Best
> Ziyad
>
>
> On Thu, Sep 5, 2019 at 5:43 PM Ziyad Muhammed <mm...@gmail.com> wrote:
>
>> Hi Cham
>>
>> I tried that before. Apparently it's not accepted by either the direct
>> runner or the Dataflow runner. I get the error below:
>>
>> Exception in thread "main" java.lang.IllegalArgumentException: When
>>> applying WriteFiles to an unbounded PCollection, must specify number of
>>> output shards explicitly
>>> at
>>> org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
>>> at org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:299)
>>> at org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:109)
>>> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>>> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
>>> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
>>> at org.apache.beam.sdk.io.AvroIO$TypedWrite.expand(AvroIO.java:1519)
>>> at org.apache.beam.sdk.io.AvroIO$TypedWrite.expand(AvroIO.java:1155)
>>> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>>> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>>> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:357)
>>> at org.apache.beam.sdk.io.AvroIO$Write.expand(AvroIO.java:1659)
>>> at org.apache.beam.sdk.io.AvroIO$Write.expand(AvroIO.java:1541)
>>> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>>> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
>>> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
>>>
>>
>>
>>
>> Best
>> Ziyad
>>
>>
>> On Wed, Sep 4, 2019 at 6:45 PM Chamikara Jayalath <ch...@google.com>
>> wrote:
>>
>>> Do you mean the value to specify for the number of shards to write [1]?
>>>
>>> For this, I think it's better not to specify any value, which will give
>>> the runner the most flexibility.
>>>
>>> Thanks,
>>> Cham
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java#L1455
>>>
>>> On Wed, Sep 4, 2019 at 2:42 AM Ziyad Muhammed <mm...@gmail.com> wrote:
>>>
>>>> Hi all
>>>>
>>>> I have a Beam pipeline running on Cloud Dataflow that produces Avro
>>>> files on GCS. The window duration is 1 minute, and the job currently runs
>>>> with 64 cores (16 * n1-standard-4). The data produced is around 2 GB per
>>>> minute.
>>>>
>>>> Is there any recommendation on the number of Avro files to specify?
>>>> Currently I'm using 64 (to match the number of cores). Will a very
>>>> high number help increase the write throughput?
>>>> I saw that BigQueryIO with FILE_LOADS uses a default of 1000
>>>> files.
>>>>
>>>> I tried some random values, but couldn't infer a pattern for when it
>>>> is more performant.
>>>>
>>>> Any suggestion is hugely appreciated.
>>>>
>>>> Best
>>>> Ziyad
>>>>
>>>

Re: AvroIO Windowed Writes - Number of files to specify

Posted by Ziyad Muhammed <mm...@gmail.com>.
Hi Cham

Any update on this?

Best
Ziyad


On Thu, Sep 5, 2019 at 5:43 PM Ziyad Muhammed <mm...@gmail.com> wrote:

> Hi Cham
>
> I tried that before. Apparently it's not accepted by either the direct
> runner or the Dataflow runner. I get the error below:
>
> Exception in thread "main" java.lang.IllegalArgumentException: When
>> applying WriteFiles to an unbounded PCollection, must specify number of
>> output shards explicitly
>> at
>> org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
>> at org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:299)
>> at org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:109)
>> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
>> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
>> at org.apache.beam.sdk.io.AvroIO$TypedWrite.expand(AvroIO.java:1519)
>> at org.apache.beam.sdk.io.AvroIO$TypedWrite.expand(AvroIO.java:1155)
>> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:357)
>> at org.apache.beam.sdk.io.AvroIO$Write.expand(AvroIO.java:1659)
>> at org.apache.beam.sdk.io.AvroIO$Write.expand(AvroIO.java:1541)
>> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
>> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
>>
>
>
>
> Best
> Ziyad
>
>
> On Wed, Sep 4, 2019 at 6:45 PM Chamikara Jayalath <ch...@google.com>
> wrote:
>
>> Do you mean the value to specify for the number of shards to write [1]?
>>
>> For this, I think it's better not to specify any value, which will give
>> the runner the most flexibility.
>>
>> Thanks,
>> Cham
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java#L1455
>>
>> On Wed, Sep 4, 2019 at 2:42 AM Ziyad Muhammed <mm...@gmail.com> wrote:
>>
>>> Hi all
>>>
>>> I have a Beam pipeline running on Cloud Dataflow that produces Avro
>>> files on GCS. The window duration is 1 minute, and the job currently runs
>>> with 64 cores (16 * n1-standard-4). The data produced is around 2 GB per
>>> minute.
>>>
>>> Is there any recommendation on the number of Avro files to specify?
>>> Currently I'm using 64 (to match the number of cores). Will a very
>>> high number help increase the write throughput?
>>> I saw that BigQueryIO with FILE_LOADS uses a default of 1000
>>> files.
>>>
>>> I tried some random values, but couldn't infer a pattern for when it is
>>> more performant.
>>>
>>> Any suggestion is hugely appreciated.
>>>
>>> Best
>>> Ziyad
>>>
>>

Re: AvroIO Windowed Writes - Number of files to specify

Posted by Ziyad Muhammed <mm...@gmail.com>.
Hi Cham

I tried that before. Apparently it's not accepted by either the direct runner
or the Dataflow runner. I get the error below:

Exception in thread "main" java.lang.IllegalArgumentException: When
> applying WriteFiles to an unbounded PCollection, must specify number of
> output shards explicitly
> at
> org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
> at org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:299)
> at org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:109)
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
> at org.apache.beam.sdk.io.AvroIO$TypedWrite.expand(AvroIO.java:1519)
> at org.apache.beam.sdk.io.AvroIO$TypedWrite.expand(AvroIO.java:1155)
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:357)
> at org.apache.beam.sdk.io.AvroIO$Write.expand(AvroIO.java:1659)
> at org.apache.beam.sdk.io.AvroIO$Write.expand(AvroIO.java:1541)
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
>
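
For reference, the variant that triggers this is the same write as before,
just without an explicit shard count (a sketch, with the same placeholder
names as in my first mail):

// Fails at pipeline construction time for an unbounded PCollection:
records
    .apply(Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(1))))
    .apply(
        AvroIO.writeGenericRecords(schema)
            .to("gs://my-bucket/output/")
            .withWindowedWrites() // windowed writes alone are not enough
            .withSuffix(".avro")); // no .withNumShards(...) -> the error above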



Best
Ziyad


On Wed, Sep 4, 2019 at 6:45 PM Chamikara Jayalath <ch...@google.com>
wrote:

> Do you mean the value to specify for the number of shards to write [1]?
>
> For this, I think it's better not to specify any value, which will give the
> runner the most flexibility.
>
> Thanks,
> Cham
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java#L1455
>
> On Wed, Sep 4, 2019 at 2:42 AM Ziyad Muhammed <mm...@gmail.com> wrote:
>
>> Hi all
>>
>> I have a Beam pipeline running on Cloud Dataflow that produces Avro
>> files on GCS. The window duration is 1 minute, and the job currently runs
>> with 64 cores (16 * n1-standard-4). The data produced is around 2 GB per
>> minute.
>>
>> Is there any recommendation on the number of Avro files to specify?
>> Currently I'm using 64 (to match the number of cores). Will a very
>> high number help increase the write throughput?
>> I saw that BigQueryIO with FILE_LOADS uses a default of 1000
>> files.
>>
>> I tried some random values, but couldn't infer a pattern for when it is
>> more performant.
>>
>> Any suggestion is hugely appreciated.
>>
>> Best
>> Ziyad
>>
>

Re: AvroIO Windowed Writes - Number of files to specify

Posted by Chamikara Jayalath <ch...@google.com>.
Do you mean the value to specify for the number of shards to write [1]?

For this, I think it's better not to specify any value, which will give the
runner the most flexibility.

Thanks,
Cham

[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java#L1455
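
Concretely, that would mean dropping the withNumShards call and leaving the
rest of the write unchanged, along these lines (a sketch reusing the
placeholder names from your mail):

records
    .apply(Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(1))))
    .apply(
        AvroIO.writeGenericRecords(schema)
            .to("gs://my-bucket/output/")
            .withWindowedWrites()
            // no .withNumShards(...): sharding is left to the runner
            .withSuffix(".avro"));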

On Wed, Sep 4, 2019 at 2:42 AM Ziyad Muhammed <mm...@gmail.com> wrote:

> Hi all
>
> I have a Beam pipeline running on Cloud Dataflow that produces Avro
> files on GCS. The window duration is 1 minute, and the job currently runs
> with 64 cores (16 * n1-standard-4). The data produced is around 2 GB per
> minute.
>
> Is there any recommendation on the number of Avro files to specify?
> Currently I'm using 64 (to match the number of cores). Will a very
> high number help increase the write throughput?
> I saw that BigQueryIO with FILE_LOADS uses a default of 1000
> files.
>
> I tried some random values, but couldn't infer a pattern for when it is
> more performant.
>
> Any suggestion is hugely appreciated.
>
> Best
> Ziyad
>