Posted to commits@beam.apache.org by "Rafael Fernandez (JIRA)" <ji...@apache.org> on 2018/08/08 19:03:00 UTC

[jira] [Resolved] (BEAM-3234) PubsubIO batch size should be configurable

     [ https://issues.apache.org/jira/browse/BEAM-3234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rafael Fernandez resolved BEAM-3234.
------------------------------------
       Resolution: Duplicate
    Fix Version/s: Not applicable

> PubsubIO batch size should be configurable
> ------------------------------------------
>
>                 Key: BEAM-3234
>                 URL: https://issues.apache.org/jira/browse/BEAM-3234
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>    Affects Versions: 2.1.0, 2.2.0
>            Reporter: Neville Li
>            Priority: Minor
>             Fix For: Not applicable
>
>
> Looks like Pubsub enforces a per-request payload size limit (10485760 bytes, per the error below), and PubsubIO uses a hard-coded batch size, so a single publish request may exceed this limit in some cases.
> https://github.com/apache/beam/blob/release-2.1.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L885
> {code}
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.GenerateSequence;
> import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.MapElements;
> import org.apache.beam.sdk.values.TypeDescriptor;
>
> public class Test {
>   public static void main(String[] args) {
>     PipelineOptions options = PipelineOptionsFactory.create();
>     Pipeline pipeline = Pipeline.create(options);
>     pipeline
>         // Emit 100 elements (0..99).
>         .apply(GenerateSequence.from(0).to(100))
>         // Map each element to a string of 10,000,000 'x' characters (~10 MB).
>         .apply(MapElements
>             .into(TypeDescriptor.of(String.class))
>             .via(x -> {
>               StringBuilder b = new StringBuilder();
>               for (int i = 0; i < 10000000; i++) {
>                 b.append("x");
>               }
>               return b.toString();
>             }))
>         // Publish every string to one topic; PubsubIO groups them into publish requests.
>         .apply(PubsubIO
>             .writeStrings()
>             .to("projects/scio-playground/topics/payload-test"));
>     pipeline.run().waitUntilFinish();
>   }
> }
> {code}
> The above code throws the following error:
> {code}
> [error] Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
> [error] {
> [error]   "code" : 400,
> [error]   "errors" : [ {
> [error]     "domain" : "global",
> [error]     "message" : "Request payload size exceeds the limit: 10485760 bytes.",
> [error]     "reason" : "badRequest"
> [error]   } ],
> [error]   "message" : "Request payload size exceeds the limit: 10485760 bytes.",
> [error]   "status" : "INVALID_ARGUMENT"
> {code}
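A quick size check makes the error above easier to read. This sketch assumes that the JSON client sends each message's data base64-encoded, which is what the Pub/Sub REST API's PubsubMessage.data field expects. That assumption has not been checked against the PubsubIO source. The sketch also ignores JSON framing and attributes. The class name PayloadSizeCheck is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Back-of-the-envelope check of the repro's request sizes (illustrative only). */
public final class PayloadSizeCheck {
  // Limit reported in the 400 error: 10 MiB = 10,485,760 bytes.
  static final long LIMIT_BYTES = 10L * 1024 * 1024;

  /** Length of the padded standard Base64 encoding of n raw bytes. */
  static long base64Length(long n) {
    return 4 * ((n + 2) / 3);
  }

  public static void main(String[] args) {
    long rawMessage = 10_000_000L;  // one string of 10,000,000 'x' chars, 1 byte each in UTF-8
    long messagesInRepro = 100L;    // GenerateSequence.from(0).to(100)
    // Assumption: the message data travels base64-encoded in the JSON request body.
    long encodedMessage = base64Length(rawMessage);

    // Sanity-check the formula against the JDK encoder on a small input.
    byte[] sample = "xxxxx".getBytes(StandardCharsets.UTF_8);
    if (Base64.getEncoder().encodeToString(sample).length() != base64Length(sample.length)) {
      throw new IllegalStateException("base64Length formula mismatch");
    }

    System.out.println("limit                  = " + LIMIT_BYTES);
    System.out.println("raw message            = " + rawMessage
        + " (within limit: " + (rawMessage <= LIMIT_BYTES) + ")");
    System.out.println("base64 message         = " + encodedMessage
        + " (within limit: " + (encodedMessage <= LIMIT_BYTES) + ")");
    System.out.println("raw bytes, 100 strings = " + rawMessage * messagesInRepro);
  }
}
```

Under those assumptions, one 10,000,000-byte string encodes to 13,333,336 bytes. That already exceeds the 10,485,760-byte limit without any other messages in the request. So for this particular repro, a smaller batch size alone would likely not avoid the 400. A configurable batch size matters when each message is under the limit but a full batch of them is not.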

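One way to make publish batching limit-aware is sketched below. Batches are capped by both a message count and a byte budget, rather than by a fixed count alone. SizeBoundedBatcher and its batch method are hypothetical and are not part of PubsubIO. Sizes are raw payload bytes, so a real budget would need headroom for base64 expansion, JSON framing and attributes.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch (not PubsubIO's API): groups payloads into publish batches
 * bounded by both a message count and a total byte budget.
 */
public final class SizeBoundedBatcher {
  private SizeBoundedBatcher() {}

  /**
   * Splits payloads into batches of at most maxMessages elements whose summed raw
   * sizes do not exceed maxBytes. Encoding and request overhead are NOT counted here.
   */
  public static List<List<byte[]>> batch(List<byte[]> payloads, int maxMessages, long maxBytes) {
    if (maxMessages <= 0 || maxBytes <= 0) {
      throw new IllegalArgumentException("limits must be positive");
    }
    List<List<byte[]>> batches = new ArrayList<>();
    List<byte[]> current = new ArrayList<>();
    long currentBytes = 0;
    for (byte[] payload : payloads) {
      // A single payload over the budget cannot be fixed by re-batching.
      if (payload.length > maxBytes) {
        throw new IllegalArgumentException(
            "payload of " + payload.length + " bytes exceeds per-request budget " + maxBytes);
      }
      // Flush the current batch if adding this payload would break either limit.
      if (!current.isEmpty()
          && (current.size() >= maxMessages || currentBytes + payload.length > maxBytes)) {
        batches.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(payload);
      currentBytes += payload.length;
    }
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }
}
```

For example, with maxMessages = 100 and maxBytes = 10, five 4-byte payloads are split into batches of 2, 2 and 1. Rejecting an oversized single payload up front surfaces the situation the repro hits, instead of sending a request that is bound to fail.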


--
This message was sent by Atlassian JIRA
(v7.6.3#76005)