Posted to commits@beam.apache.org by "Rafael Fernandez (JIRA)" <ji...@apache.org> on 2018/08/08 19:04:00 UTC
[jira] [Commented] (BEAM-3234) PubsubIO batch size should be configurable
[ https://issues.apache.org/jira/browse/BEAM-3234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16573716#comment-16573716 ]
Rafael Fernandez commented on BEAM-3234:
----------------------------------------
For Dataflow: Note that this fix will affect Dataflow Batch only.
> PubsubIO batch size should be configurable
> ------------------------------------------
>
> Key: BEAM-3234
> URL: https://issues.apache.org/jira/browse/BEAM-3234
> Project: Beam
> Issue Type: Bug
> Components: io-java-gcp
> Affects Versions: 2.1.0, 2.2.0
> Reporter: Neville Li
> Priority: Minor
> Fix For: Not applicable
>
>
> Looks like there's a payload size limit in Pubsub, and PubsubIO has a hard-coded batch size that may cause this limit to be exceeded in some cases.
> https://github.com/apache/beam/blob/release-2.1.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L885
> {code}
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.GenerateSequence;
> import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.MapElements;
> import org.apache.beam.sdk.values.TypeDescriptor;
> public class Test {
>   public static void main(String[] args) {
>     PipelineOptions options = PipelineOptionsFactory.create();
>     Pipeline pipeline = Pipeline.create(options);
>     pipeline
>         .apply(GenerateSequence.from(0).to(100))
>         .apply(MapElements
>             .into(TypeDescriptor.of(String.class))
>             .via(x -> {
>               StringBuilder b = new StringBuilder();
>               for (int i = 0; i < 10000000; i++) {
>                 b.append("x");
>               }
>               return b.toString();
>             }))
>         .apply(PubsubIO
>             .writeStrings()
>             .to("projects/scio-playground/topics/payload-test"));
>     pipeline.run().waitUntilFinish();
>   }
> }
> {code}
> The above code throws the following error:
> {code}
> [error] Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
> [error] {
> [error] "code" : 400,
> [error] "errors" : [ {
> [error] "domain" : "global",
> [error] "message" : "Request payload size exceeds the limit: 10485760 bytes.",
> [error] "reason" : "badRequest"
> [error] } ],
> [error] "message" : "Request payload size exceeds the limit: 10485760 bytes.",
> [error] "status" : "INVALID_ARGUMENT"
> {code}
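
For illustration only: in the repro above each message is 10000000 bytes, so a single message fits under the 10485760-byte limit but any batch holding two of them does not. The sketch below shows one way a writer could cap batches by total payload bytes as well as by message count. It is not the PubsubIO implementation or the actual fix; the class name SizeBoundedBatcher and the parameters maxCount and maxBytes are hypothetical, and it counts only UTF-8 payload bytes, so a real cap would probably need to sit below the service limit to leave room for request encoding overhead.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: groups messages into batches bounded by count and by payload bytes. */
public class SizeBoundedBatcher {

  /**
   * Splits messages, in order, into batches holding at most maxCount messages
   * and at most maxBytes of UTF-8 payload each.
   */
  public static List<List<String>> batch(List<String> messages, int maxCount, long maxBytes) {
    List<List<String>> batches = new ArrayList<>();
    List<String> current = new ArrayList<>();
    long currentBytes = 0;

    for (String message : messages) {
      long size = message.getBytes(StandardCharsets.UTF_8).length;
      if (size > maxBytes) {
        // A single message over the cap can never be sent, whatever the batching.
        throw new IllegalArgumentException(
            "Message of " + size + " bytes exceeds the per-batch cap of " + maxBytes);
      }
      // Flush the open batch if adding this message would break either bound.
      boolean countFull = current.size() >= maxCount;
      boolean bytesFull = currentBytes + size > maxBytes;
      if (!current.isEmpty() && (countFull || bytesFull)) {
        batches.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(message);
      currentBytes += size;
    }
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }

  public static void main(String[] args) {
    // Small stand-in values; the cap of 8 bytes plays the role of the service limit.
    List<String> messages = Arrays.asList("aaaa", "bbbb", "cc", "d");
    for (List<String> b : batch(messages, 100, 8)) {
      System.out.println(b);
    }
  }
}
```

With a byte cap in place, a count-only limit no longer decides how many large messages end up in one publish request; small messages still batch up to maxCount.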