Posted to dev@beam.apache.org by "Mitchell, Patrick" <Pa...@anz.com> on 2020/09/10 23:49:03 UTC

Go SDK & Dataflow

Beam team,

I’m currently making use of the Beam Go SDK to construct a pipeline for creating and inserting large text data files into GCS.

The pipeline is relatively simple: prepare a range of source data, transform it into fleshed-out data sets, and write the result to GCS with the textio package.

package main

import (
   "context"
   "flag"
   "fmt"

   "github.com/apache/beam/sdks/go/pkg/beam"
   "github.com/apache/beam/sdks/go/pkg/beam/io/textio"
   "github.com/apache/beam/sdks/go/pkg/beam/log"
   "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
)

func main() {
   flag.Parse()
   ctx := context.Background()
   beam.Init()
   p := beam.NewPipeline()
   s := p.Root()
   // Get a PCollection of account numbers
   accNumCol := beam.CreateList(s, makeRange(*accountStart, *accountEnd))
   // Transform the PCollection of account numbers into a collection of strings that each represent a group of account data
   accountCol := beam.ParDo(s, accountNumToAccountWithRows, accNumCol)
   // Write all account blocks to a single file in GCS
   textio.Write(s, fmt.Sprintf("gs://files/CC%d-ACC%d.txt", *accountStart, *accountEnd), accountCol)
   if err := beamx.Run(ctx, p); err != nil {
      log.Exitf(ctx, "Failed to execute job: %v", err)
   }
}
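
(For reference, the flags and helper functions used above are not included in this message; purely illustrative stand-ins, not our real implementations, could look like the following.)

// Hypothetical stand-ins for the omitted flags and helpers; the real
// implementations are not shown in this message.
var (
   accountStart = flag.Int("account_start", 0, "First account number (inclusive)")
   accountEnd   = flag.Int("account_end", 0, "Last account number (exclusive)")
)

// makeRange returns the integers in [start, end).
func makeRange(start, end int) []int {
   nums := make([]int, 0, end-start)
   for i := start; i < end; i++ {
      nums = append(nums, i)
   }
   return nums
}

// accountNumToAccountWithRows expands one account number into a single
// newline-joined block of placeholder rows for that account.
func accountNumToAccountWithRows(accNum int) string {
   out := ""
   for row := 0; row < 3; row++ {
      out += fmt.Sprintf("CC%d,row-%d,placeholder\n", accNum, row)
   }
   return out
}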

Currently, for smaller files (e.g., 13MB or 30,000 lines of text) we are not experiencing any issues with the pipeline when executed in Dataflow. When preparing larger datasets that would generate medium-sized files of approx. 130MB, the write-to-GCS step in the Dataflow pipeline fails consistently with the following kinds of errors:

Error message from worker: process bundle failed for instruction process_bundle-3 using plan process-bundle-descriptor-47 : while executing Process for Plan[process-bundle-descriptor-47]:
2: ParDo[textio.writeFileFn] Out:[]
1: DataSource[S[ptransform-46@localhost:12371], 0] Coder:W;coder-63<CoGBK;coder-64<int[varintz;c2];coder-65,string;coder-67[string]>>!GWC Out:2
         caused by:
source failed
         caused by:
rpc error: code = ResourceExhausted desc = grpc: received message larger than max (104858536 vs. 52428800)
&
  "jsonPayload": {
    "worker": "go-job-1-1599375664191334-09060001-zizk-harness-ptm6",
    "job": "2020-09-06_00_01_25-2750019264422346896",
    "work": "process_bundle-1",
    "message": "DataChannel.read localhost:12371 bad: rpc error: code = ResourceExhausted desc = grpc: received message larger than max (104938332 vs. 52428800)",
    "logger": <project>/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/datamgr.go:261",
    "portability_worker_id": "1"
}

When executing the pipeline locally without GCS integration, i.e. writing to example.txt instead of gs://example.txt, no issues are apparent and files of arbitrary size can be generated. Running the pipeline locally with GCS integration also seems to work with files of any size, though the upload can be quite slow. It is only when executing the pipeline on Dataflow that writing the files to GCS fails. It seems odd that we would encounter issues uploading files of relatively medium size to GCS with such a simple pipeline. The errors seem to indicate that something is receiving a message larger than 52MB (52428800 bytes), but I am unable to determine what, or where this magic number comes from.

I would appreciate any insight into this issue with our pipeline’s integration with GCS; has anyone seen this before?

Regards,
Patrick Mitchell
"This e-mail and any attachments to it (the "Communication") is, unless otherwise stated, confidential, may contain copyright material and is for the use only of the intended recipient. If you receive the Communication in error, please notify the sender immediately by return e-mail, delete the Communication and the return e-mail, and do not read, copy, retransmit or otherwise deal with it. Any views expressed in the Communication are those of the individual sender only, unless expressly stated to be those of Australia and New Zealand Banking Group Limited ABN 11 005 357 522, or any of its related entities including ANZ Bank New Zealand Limited (together "ANZ"). ANZ does not accept liability in connection with the integrity of or errors in the Communication, computer virus, data corruption, interference or delay arising from or in respect of the Communication."

Re: Go SDK & Dataflow

Posted by Robert Burke <ro...@frantil.com>.
This is a default limit on the receive buffer on the SDK side.

We set it to its maximum on July 30th. It's likely your copy doesn't have
that change yet.

https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/util/grpcx/dial.go#L39
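
Roughly speaking, that line raises the gRPC receive limit on the SDK's data
channel from the old 50 MB default (50 * 1024 * 1024 = 52428800 bytes, the
number in your error) to the maximum gRPC allows. A minimal sketch of the
idea, assuming google.golang.org/grpc and an illustrative endpoint, not the
SDK's actual code:

import (
	"math"

	"google.golang.org/grpc"
)

// dialDataChannel sketches how a harness might dial the data channel while
// lifting the per-message receive limit; illustrative only.
func dialDataChannel(endpoint string) (*grpc.ClientConn, error) {
	return grpc.Dial(endpoint,
		grpc.WithInsecure(),
		// Replace the ~50 MB default receive cap with the largest value gRPC
		// accepts, so big data-channel messages are no longer rejected.
		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)),
	)
}

Since your logger path shows a vendor/ directory, re-vendoring the SDK from
head should pick the change up.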

The Go SDK is experimental and not meaningfully versioned, so syncing to
head to pick up the change isn't unreasonable.

Further, Dataflow doesn't officially support the Go SDK at this time, and it
is currently use-at-your-own-risk. It may or may not work, and isn't
sufficiently tested on Dataflow for all cases at this time (in particular
the IOs).

On Thu, Sep 10, 2020, 5:09 PM Mitchell, Patrick <Pa...@anz.com>
wrote:

> Beam team,
>
> I’m currently making use of the Beam Go SDK to construct a pipeline for
> creating and inserting large text data files into GCS.
>
> The pipeline is relatively simple; prepare a bracket of source data,
> transform it into fleshed out data sets, write to GCS with the textio
> package.
>
> func main() {
>    flag.Parse()
>    ctx := context.Background()
>    beam.Init()
>    p := beam.NewPipeline()
>    s := p.Root()
>    // Get a PCollection of account numbers
>    accNumCol := beam.CreateList(s, makeRange(*accountStart, *accountEnd))
>    // Transform the PCollection of account numbers into a collection of
> strings that each represent a group of account data
>    accountCol := beam.ParDo(s, accountNumToAccountWithRows, accNumCol)
>    // Write all account blocks to a single file in GCS
>    textio.Write(s, fmt.Sprintf("gs://files/CC%d-ACC%d.txt", *accountStart,
> *accountEnd), accountCol)
>    if err := beamx.Run(context.Background(), p); err != nil {
>       log.Exitf(ctx, "Failed to execute job: %v", err)
>    }
> }
>
> Currently, for smaller files (e.g., 13MB or 30,000 lines of text) we are
> not experiencing any issues with the pipeline when executed in Dataflow.
> When preparing larger datasets that would generate medium-sized files of
> approx. 130MB, executing the write to GCS step in the Dataflow pipeline
> fails consistently with the following kind of errors:
>
> Error message from worker: process bundle failed for instruction
> process_bundle-3 using plan process-bundle-descriptor-47 : while executing
> Process for Plan[process-bundle-descriptor-47]:
> 2: ParDo[textio.writeFileFn] Out:[]
> 1: DataSource[S[ptransform-46@localhost:12371], 0]
> Coder:W;coder-63<CoGBK;coder-64<int[varintz;c2];coder-65,string;coder-67[string]>>!GWC
> Out:2
>          caused by:
> source failed
>          caused by:
> rpc error: code = ResourceExhausted desc = grpc: received message larger
> than max (104858536 vs. 52428800)
> &
>   "jsonPayload": {
>     "worker": "go-job-1-1599375664191334-09060001-zizk-harness-ptm6",
>     "job": "2020-09-06_00_01_25-2750019264422346896",
>     "work": "process_bundle-1",
>     "message": "DataChannel.read localhost:12371 bad: rpc error: code =
> ResourceExhausted desc = grpc: received message larger than max (104938332
> vs. 52428800)",
>     "logger": <project>/vendor/
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/datamgr.go:261
> ",
>     "portability_worker_id": "1"
> }
>
> When executing the pipeline locally, without integration with GCS i.e.
> writing to example.txt instead of gs://example.txt no issues are apparent
> and files of arbitrary size can be generated. Running the pipeline locally
> with an integration to GCS also seems to work with files of any size,
> though the upload can be quite slow. It is only when executing the
> pipeline on Dataflow that writing the files to GCS fails. It seems odd that
> we would encounter issues uploading files of relatively medium sizes to
> GCS, with such a simple pipeline. The errors seem to indicate that
> something is receiving a message size larger than 52MB (52428800), but I am
> unable to determine what, and where this magic number is coming from.
>
> I would appreciate any insight into this issue with our pipeline’s
> integration with GCS, anyone seen this before?
>
> Regards,
> Patrick Mitchell
> "This e-mail and any attachments to it (the "Communication") is, unless
> otherwise stated, confidential, may contain copyright material and is for
> the use only of the intended recipient. If you receive the Communication in
> error, please notify the sender immediately by return e-mail, delete the
> Communication and the return e-mail, and do not read, copy, retransmit or
> otherwise deal with it. Any views expressed in the Communication are those
> of the individual sender only, unless expressly stated to be those of
> Australia and New Zealand Banking Group Limited ABN 11 005 357 522, or any
> of its related entities including ANZ Bank New Zealand Limited (together
> "ANZ"). ANZ does not accept liability in connection with the integrity of
> or errors in the Communication, computer virus, data corruption,
> interference or delay arising from or in respect of the Communication."
>