You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/03 22:50:54 UTC

[GitHub] [beam] kennknowles opened a new issue, #19159: OOMs on Pub/Sub to BigQuery via FILE_LOADS

kennknowles opened a new issue, #19159:
URL: https://github.com/apache/beam/issues/19159

   I've got a streaming Dataflow job which streams data from a Pub/Sub subscription to a single BigQuery table that I'm experimenting with moving to batch loads via BigQueryIO.Method.FILE_LOADS, but the only way I can get the job to successfully run is by increasing worker memory from 15GB to 52GB, which seems like a lot.
   
   I haven't been able to get a heap dump, but observing the job I can see ~5GB of records accumulate in GroupByDestination before the trigger duration elapses and WriteGroupedRecords processes those, at which point I see OOM errors in WriteGroupedRecords:
   
   `Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.OutOfMemoryError: Java heap space``        org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)``        org.apache.beam.sdk.io.gcp.bigquery.WriteGroupedRecordsToFiles$DoFnInvoker.invokeProcessElement(Unknown Source)`
   
   Like I said, I can resolve this by running the job with n1-highmem-8 machines, but this seems odd. The job is explicitly sharding data to keep per-worker requirements low but there's still a per-worker bottleneck about the size of the entire dataset. Increasing numFileShards doesn't seem to affect this, either — increasing from 100 to 1,000 to 10,000 changed the number of files but not the OOMs.
   
   The pipeline is fairly standard, but here's the code edited for confidentiality:
   
   ```
   
       pipeline
           .apply("Read", PubsubIO.readMessages().fromSubscription(subscription))
       
      .apply("Transform", ParDo.of(new MtoNFunction()))
           .apply(
               "Write",
         
        BigQueryIO.<TableRow>write()
                   .withFormatFunction(a -> a)
                   .to(tableRef)
   
                  .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                   .withTriggeringFrequency(Duration.standardMinutes(5))
   
                  .withNumFileShards(100_000)
                   .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
   
                  .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
              
       .withJsonSchema("redacted")
                   .withCustomGcsTempLocation(ValueProvider.StaticValueProvider.of(opts.getGcpTempLocation())));
   
   ```
   
   
   
   Imported from Jira [BEAM-5694](https://issues.apache.org/jira/browse/BEAM-5694). Original Jira may contain additional context.
   Reported by: mc-coda.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org