Posted to issues@beam.apache.org by "Chamikara Jayalath (JIRA)" <ji...@apache.org> on 2019/01/10 16:32:00 UTC

[jira] [Commented] (BEAM-5694) OOMs on Pub/Sub to BigQuery via FILE_LOADS

    [ https://issues.apache.org/jira/browse/BEAM-5694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16739588#comment-16739588 ] 

Chamikara Jayalath commented on BEAM-5694:
------------------------------------------

R: [~reuvenlax]

> OOMs on Pub/Sub to BigQuery via FILE_LOADS
> ------------------------------------------
>
>                 Key: BEAM-5694
>                 URL: https://issues.apache.org/jira/browse/BEAM-5694
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>    Affects Versions: 2.7.0
>         Environment: Google Dataflow
>            Reporter: Coda Hale
>            Assignee: Chamikara Jayalath
>            Priority: Major
>
> I've got a streaming Dataflow job that reads data from a Pub/Sub subscription and writes it to a single BigQuery table. I'm experimenting with moving it to batch loads via BigQueryIO.Write.Method.FILE_LOADS, but the only way I can get the job to run successfully is by increasing worker memory from 15 GB to 52 GB, which seems like a lot.
> I haven't been able to get a heap dump, but observing the job I can see ~5 GB of records accumulate in the GroupByDestination step before the trigger duration elapses and WriteGroupedRecords processes them, at which point WriteGroupedRecords hits OOM errors:
> {code:java}
> Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.OutOfMemoryError: Java heap space
>         org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
>         org.apache.beam.sdk.io.gcp.bigquery.WriteGroupedRecordsToFiles$DoFnInvoker.invokeProcessElement(Unknown Source)
> {code}
> Like I said, I can resolve this by running the job on n1-highmem-8 machines, but this seems odd. The job is explicitly sharding data to keep per-worker requirements low, yet there is still a per-worker memory bottleneck roughly the size of the entire dataset. Increasing numFileShards doesn't seem to affect this either; going from 100 to 1,000 to 10,000 changed the number of files but not the OOMs.
> The pipeline is fairly standard, but here's the code edited for confidentiality:
> {code:java}
>     pipeline
>         .apply("Read", PubsubIO.readMessages().fromSubscription(subscription))
>         .apply("Transform", ParDo.of(new MtoNFunction()))
>         .apply(
>             "Write",
>             BigQueryIO.<TableRow>write()
>                 .withFormatFunction(a -> a)
>                 .to(tableRef)
>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>                 .withTriggeringFrequency(Duration.standardMinutes(5))
>                 .withNumFileShards(100_000)
>                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>                 .withJsonSchema("redacted")
>                 .withCustomGcsTempLocation(ValueProvider.StaticValueProvider.of(opts.getGcpTempLocation())));
> {code}
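
For reference, below is a minimal sketch (not part of the original report) of the workaround described above: raising per-worker memory by pinning the Dataflow worker machine type through pipeline options. The class name and option values are illustrative. DataflowPipelineOptions and its workerMachineType setting are part of the Dataflow runner; the dumpHeapOnOOM debug flag is an assumption here and may not be available in every SDK version.

{code:java}
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class HighMemPipelineRunner {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);

    // n1-highmem-8 provides 52 GB of RAM per worker, the configuration the
    // reporter says lets the FILE_LOADS job run without OOMs.
    options.setWorkerMachineType("n1-highmem-8");

    // Assumption: ask the worker to dump the heap on OutOfMemoryError so the
    // offending step can be inspected; availability depends on the runner version.
    options.as(DataflowPipelineDebugOptions.class).setDumpHeapOnOOM(true);

    Pipeline pipeline = Pipeline.create(options);
    // ... attach the Pub/Sub -> BigQuery FILE_LOADS transforms from the snippet above ...
    pipeline.run();
  }
}
{code}

This doesn't address the underlying question of why WriteGroupedRecordsToFiles needs memory proportional to the whole grouped batch, but it matches the configuration the reporter says avoids the OOMs.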



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)