Posted to issues@beam.apache.org by "Chamikara Jayalath (JIRA)" <ji...@apache.org> on 2019/01/10 16:32:00 UTC
[jira] [Commented] (BEAM-5694) OOMs on Pub/Sub to BigQuery via FILE_LOADS
[ https://issues.apache.org/jira/browse/BEAM-5694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16739588#comment-16739588 ]
Chamikara Jayalath commented on BEAM-5694:
------------------------------------------
R: [~reuvenlax]
> OOMs on Pub/Sub to BigQuery via FILE_LOADS
> ------------------------------------------
>
> Key: BEAM-5694
> URL: https://issues.apache.org/jira/browse/BEAM-5694
> Project: Beam
> Issue Type: Bug
> Components: io-java-gcp
> Affects Versions: 2.7.0
> Environment: Google Dataflow
> Reporter: Coda Hale
> Assignee: Chamikara Jayalath
> Priority: Major
>
> I have a streaming Dataflow job that reads from a Pub/Sub subscription and writes to a single BigQuery table, and I'm experimenting with moving the write to batch loads via BigQueryIO.Write.Method.FILE_LOADS. The only way I can get the job to run successfully is by increasing worker memory from 15 GB to 52 GB, which seems like a lot.
> I haven't been able to get a heap dump, but watching the job I can see roughly 5 GB of records accumulate in GroupByDestination before the trigger duration elapses and WriteGroupedRecords processes them, at which point WriteGroupedRecords throws OOM errors:
> {code}
> Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.OutOfMemoryError: Java heap space
>     org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
>     org.apache.beam.sdk.io.gcp.bigquery.WriteGroupedRecordsToFiles$DoFnInvoker.invokeProcessElement(Unknown Source)
> {code}
> Like I said, I can work around this by running the job on n1-highmem-8 machines, but that seems odd: the job explicitly shards the data to keep per-worker requirements low, yet there is still a per-worker bottleneck roughly the size of the entire dataset. Increasing numFileShards doesn't seem to affect this either; going from 100 to 1,000 to 10,000 shards changed the number of files produced but not the OOMs.
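> For reference, here is roughly how that worker-memory workaround looks in the launcher. This is a minimal sketch assuming the standard Dataflow runner options; the class name and option wiring are illustrative, not the original job's code:
> {code:java}
> import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>
> public class Launcher {
>   public static void main(String[] args) {
>     DataflowPipelineOptions opts =
>         PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
>     opts.setStreaming(true);
>     // Workers with 15 GB of memory hit the OOMs; n1-highmem-8 workers (52 GB) keep the job running.
>     opts.setWorkerMachineType("n1-highmem-8");
>
>     Pipeline pipeline = Pipeline.create(opts);
>     // ... build the Read/Transform/Write chain shown below, then:
>     pipeline.run();
>   }
> }
> {code}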
> The pipeline is fairly standard, but here's the code edited for confidentiality:
> {code:java}
> pipeline
>     .apply("Read", PubsubIO.readMessages().fromSubscription(subscription))
>     .apply("Transform", ParDo.of(new MtoNFunction()))
>     .apply(
>         "Write",
>         BigQueryIO.<TableRow>write()
>             .withFormatFunction(a -> a)
>             .to(tableRef)
>             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>             .withTriggeringFrequency(Duration.standardMinutes(5))
>             .withNumFileShards(100_000)
>             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>             .withJsonSchema("redacted")
>             .withCustomGcsTempLocation(
>                 ValueProvider.StaticValueProvider.of(opts.getGcpTempLocation())));
> {code}
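> The transform and the schema are redacted, but for the write above to type-check, MtoNFunction has to emit TableRow elements, since BigQueryIO.<TableRow>write() with an identity format function consumes TableRow directly. A purely hypothetical sketch of its shape (the field name and parsing are placeholders, not the real code):
> {code:java}
> import java.nio.charset.StandardCharsets;
>
> import com.google.api.services.bigquery.model.TableRow;
> import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
> import org.apache.beam.sdk.transforms.DoFn;
>
> // Hypothetical shape of the redacted transform; the "M to N" name suggests each
> // input message can fan out to several output rows, hence a DoFn rather than a
> // simple MapElements.
> class MtoNFunction extends DoFn<PubsubMessage, TableRow> {
>   @ProcessElement
>   public void processElement(ProcessContext c) {
>     PubsubMessage msg = c.element();
>     // Placeholder parsing: the real job derives one or more rows per message.
>     TableRow row = new TableRow()
>         .set("payload", new String(msg.getPayload(), StandardCharsets.UTF_8));
>     c.output(row);
>   }
> }
> {code}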
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)