Posted to commits@beam.apache.org by "Coda Hale (JIRA)" <ji...@apache.org> on 2018/10/09 20:11:00 UTC

[jira] [Updated] (BEAM-5694) OOMs on Pub/Sub to BigQuery via FILE_LOADS

     [ https://issues.apache.org/jira/browse/BEAM-5694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Coda Hale updated BEAM-5694:
----------------------------
    Description: 
I have a streaming Dataflow job that writes data from a Pub/Sub subscription to a single BigQuery table. I'm experimenting with moving it to batch loads via BigQueryIO.Write.Method.FILE_LOADS, but the only way I can get the job to run successfully is by increasing worker memory from 15GB to 52GB, which seems like a lot.

I haven't been able to get a heap dump, but observing the job I can see ~5GB of records accumulate in GroupByDestination before the trigger duration elapses and WriteGroupedRecords processes them. At that point I see OOM errors in WriteGroupedRecords:

{{Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.OutOfMemoryError: Java heap space}}
{{        org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)}}
{{        org.apache.beam.sdk.io.gcp.bigquery.WriteGroupedRecordsToFiles$DoFnInvoker.invokeProcessElement(Unknown Source)}}

As noted, I can work around this by running the job on n1-highmem-8 machines, but this seems odd. The job explicitly shards data to keep per-worker requirements low, yet there still appears to be a per-worker bottleneck roughly the size of the entire dataset. Increasing numFileShards doesn't seem to affect this either: going from 100 to 1,000 to 10,000 changed the number of files but not the OOMs.
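
For a rough sense of why the sharding result is surprising, here is a back-of-envelope sketch of the per-shard data volume one would expect. It is only an illustration: it assumes the ~5GB observed per trigger interval is spread evenly across shards, and the class and method names (ShardEstimate, bytesPerShard) are hypothetical, not part of Beam.

```java
import java.util.Locale;

// Back-of-envelope estimate only. Assumes the buffered records are spread
// evenly across file shards, which the report does not confirm.
final class ShardEstimate {
    // ~5GB buffered per trigger interval, as observed in the report
    // (treated here as decimal gigabytes, an assumption).
    static final long BUFFERED_BYTES = 5_000_000_000L;

    /** Bytes each shard would hold if the buffered data were split evenly. */
    static long bytesPerShard(long bufferedBytes, int numFileShards) {
        if (numFileShards <= 0) {
            throw new IllegalArgumentException("numFileShards must be positive");
        }
        return bufferedBytes / numFileShards;
    }

    public static void main(String[] args) {
        // The three shard counts tried in the report.
        for (int shards : new int[] {100, 1_000, 10_000}) {
            double megabytes = bytesPerShard(BUFFERED_BYTES, shards) / 1e6;
            System.out.printf(Locale.ROOT, "%,d shards -> ~%.1f MB per shard%n", shards, megabytes);
        }
    }
}
```

Under that even-spread assumption, 100 shards would come to roughly 50 MB each and 10,000 shards to roughly 0.5 MB each, so shard size alone would not be expected to exhaust a 15GB worker.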

The pipeline is fairly standard, but here's the code edited for confidentiality:

{code:java}
    pipeline
        .apply("Read", PubsubIO.readMessages().fromSubscription(subscription))
        .apply("Transform", ParDo.of(new MtoNFunction()))
        .apply(
            "Write",
            BigQueryIO.<TableRow>write()
                .withFormatFunction(a -> a)
                .to(tableRef)
                .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                .withTriggeringFrequency(Duration.standardMinutes(5))
                .withNumFileShards(100_000)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withJsonSchema("redacted")
                .withCustomGcsTempLocation(ValueProvider.StaticValueProvider.of(opts.getGcpTempLocation())));
{code}


  was:
I've got a streaming Dataflow job which streams data from a Pub/Sub subscription to a single BigQuery table that I'm experimenting with moving to batch loads via BigQueryIO.Method.FILE_LOADS, but the only way I can get the job to successfully run is by increasing worker memory from 15GB to 52GB, which seems like a lot.

I haven't been able to get a heap dump, but observing the job I can see ~5GB of records accumulate in GroupByDestination before the trigger duration elapses and WriteGroupedRecords processes those, at which point I see OOM errors in WriteGroupedRecords:

{{Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.OutOfMemoryError: Java heap space}}{{        org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)}}{{        org.apache.beam.sdk.io.gcp.bigquery.WriteGroupedRecordsToFiles$DoFnInvoker.invokeProcessElement(Unknown Source)}}

Like I said, I can resolve this by running the job with n1-highmem-8 machines, but this seems odd. The job is explicitly sharding data to keep per-worker requirements low but there's still a per-worker bottleneck about the size of the entire dataset. Increasing numFileShards doesn't seem to affect this, either — increasing from 100 to 1,000 to 10,000 changed the number of files but not the OOMs.

The pipeline is fairly standard, but here's the code edited for confidentiality:

{{ pipeline}}
{{ .apply("Read", PubsubIO.readMessages().fromSubscription(subscription))}}
{{ .apply("Transform", ParDo.of(new MtoNFunction()))}}
{{ .apply("Write",}}
{{ BigQueryIO.<TableRow>write()}}
{{ .withFormatFunction(a -> a)}}
{{ .to(tableRef)}}
{{ .withMethod(BigQueryIO.Write.Method.FILE_LOADS)}}
{{ .withTriggeringFrequency(Duration.standardMinutes(5))}}
{{ .withNumFileShards(100_000)}}
{{ .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)}}
{{ .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)}}
{{ .withJsonSchema("redacted"])}}
{{ .withCustomGcsTempLocation(ValueProvider.StaticValueProvider.of(opts.getGcpTempLocation())));}}


> OOMs on Pub/Sub to BigQuery via FILE_LOADS
> ------------------------------------------
>
>                 Key: BEAM-5694
>                 URL: https://issues.apache.org/jira/browse/BEAM-5694
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>    Affects Versions: 2.7.0
>         Environment: Google Dataflow
>            Reporter: Coda Hale
>            Assignee: Chamikara Jayalath
>            Priority: Major



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)