Posted to user@beam.apache.org by Hannes Gustafsson <ha...@hoxtonanalytics.com> on 2022/05/10 15:00:00 UTC

Re: Go SDK pubsubio.Write does not output anything running on Dataflow

This issue is still present on v2.38.0 for us, and we're working around it using a custom sink and the upstream PubSub API client directly.

I've now had a chance to experiment further and have made some progress.

Through trial and error, it appears that the pubsubio.Write transform does not serialize messages into the format that the external transform expects.

Without modification, I'm not able to write anything back to PubSub using the previous example, regardless of pubsubio.ReadOptions.WithAttributes. However, when I modify the pubsubio.Write transform to pass PCollection<*pb.PubsubMessage> through verbatim and to convert PCollection<[]byte> into PCollection<*pb.PubsubMessage>, messages start coming through.

Here is the modification:

> diff --git a/sdks/go/pkg/beam/io/pubsubio/pubsubio.go b/sdks/go/pkg/beam/io/pubsubio/pubsubio.go
> index 88b7395f44..e7d005f84f 100644
> --- a/sdks/go/pkg/beam/io/pubsubio/pubsubio.go
> +++ b/sdks/go/pkg/beam/io/pubsubio/pubsubio.go
> @@ -38,6 +38,7 @@ var (
>  func init() {
>          beam.RegisterType(reflect.TypeOf((*pb.PubsubMessage)(nil)).Elem())
>          beam.RegisterFunction(unmarshalMessageFn)
> +        beam.RegisterFunction(wrapFn)
>  }
>
>  // ReadOptions represents options for reading from PubSub.
> @@ -81,6 +82,13 @@ func unmarshalMessageFn(raw []byte) (*pb.PubsubMessage, error) {
>          return &msg, nil
>  }
>
> +func wrapFn(data []byte) *pb.PubsubMessage {
> +        m := pb.PubsubMessage{
> +                Data: data,
> +        }
> +        return &m
> +}
> +
>  // Write writes PubSubMessages or bytes to the given pubsub topic.
>  func Write(s beam.Scope, project, topic string, col beam.PCollection) {
>          s = s.Scope("pubsubio.Write")
> @@ -90,8 +98,8 @@ func Write(s beam.Scope, project, topic string, col beam.PCollection) {
>          }
>
>          out := col
> -        if col.Type().Type() != reflectx.ByteSlice {
> -                out = beam.ParDo(s, proto.Marshal, col)
> +        if col.Type().Type() == reflectx.ByteSlice {
> +                out = beam.ParDo(s, wrapFn, col)
>          }
>          beam.External(s, writeURN, protox.MustEncode(payload), []beam.PCollection{out}, nil, false)
>  }



________________________________
From: Hannes Gustafsson <ha...@hoxtonanalytics.com>
Sent: 22 November 2021 09:12
To: user@beam.apache.org <us...@beam.apache.org>; Robert Burke <re...@google.com>
Subject: Re: Go SDK pubsubio.Write does not output anything running on Dataflow

Just wanted to follow up to mention that the topic format is probably a red herring for this issue. I noticed that the Go documentation uses the same formats for PubSubWritePayload and PubSubReadPayload [1] [2]. While that documentation may need an update, the Python SDK (whose documentation also seems stale) still requires the projects/<project>/topics/<topic> format when it comes down to it:

> ValueError: PubSub topic must be in the form "projects/<project>/topics/<topic>" (got '/topics/xyz-123/test').
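For illustration, a minimal Go sketch of the kind of check behind that ValueError. The helper validateTopic is hypothetical: it only mirrors the shape of the error message above and is not the Python SDK's actual implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// validateTopic is a hypothetical helper that accepts only topic paths of the
// form "projects/<project>/topics/<topic>" with a non-empty project and topic.
func validateTopic(topic string) error {
	parts := strings.Split(topic, "/")
	if len(parts) != 4 || parts[0] != "projects" || parts[1] == "" ||
		parts[2] != "topics" || parts[3] == "" {
		return fmt.Errorf("PubSub topic must be in the form %q (got %q)",
			"projects/<project>/topics/<topic>", topic)
	}
	return nil
}

func main() {
	fmt.Println(validateTopic("projects/xyz-123/topics/output")) // <nil>
	fmt.Println(validateTopic("/topics/xyz-123/test"))           // rejected: /topics/... layout
}
```

Splitting on "/" makes the leading slash of the /topics/<project>/<topic> layout show up as an empty first segment, which is why that form is rejected.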


I am seeing similar issues with the Python SDK, where no output is sent to the output topic, but the symptoms are slightly different, presumably because of SDK differences. The following is the pipeline I've been testing with. I will investigate a bit further, but will reach for the workaround in the near term.


> import argparse
> import logging
>
> import apache_beam as beam
>
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.options.pipeline_options import StandardOptions
> #from apache_beam.io.external.gcp.pubsub import ReadFromPubSub, WriteToPubSub
> from apache_beam.io import ReadFromPubSub, WriteToPubSub
>
>
> if __name__ == '__main__':
>     parser = argparse.ArgumentParser()
>     logging.getLogger().setLevel(logging.INFO)
>     options = PipelineOptions()
>     options.view_as(StandardOptions).streaming = True
>     with beam.Pipeline(options=options) as pipeline:
>         (pipeline
>         | ReadFromPubSub(topic="projects/xyz-123/topics/input")
>         | WriteToPubSub(topic="projects/xyz-123/topics/output"))


[1] https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1#PubSubWritePayload
[2] https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1#PubSubReadPayload

From: Luke Cwik <lc...@google.com>
Sent: 19 November 2021 19:30
To: user@beam.apache.org <us...@beam.apache.org>; Robert Burke <re...@google.com>
Subject: Re: Go SDK pubsubio.Write does not output anything running on Dataflow

+Robert Burke

On Tue, Nov 16, 2021 at 10:41 AM Hannes Gustafsson <ha...@hoxtonanalytics.com> wrote:
While trying to reproduce the pipeline using the Python SDK, I noticed that the topic format differs between the write transform [1] and the read transform [2]: they appear to use /topics/<project>/<topic> and /projects/<project>/topics/<topic>, respectively. This is also documented in the Python SDK documentation [3].

Note, however, that the doc string for PubSubWritePayload says:

> // Topic format is: /topics/project_id/subscription_name

presumably meaning topic_name rather than subscription_name.

I'll try using the different format and report back.

[1] https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1#PubSubWritePayload
[2] https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1#PubSubReadPayload
[3] https://beam.apache.org/releases/pydoc/2.34.0/apache_beam.io.external.gcp.pubsub.html

Re: Go SDK pubsubio.Write does not output anything running on Dataflow

Posted by Robert Burke <re...@google.com>.
https://issues.apache.org/jira/browse/BEAM-14486 is now resolved (I ended
up finishing it already).

The pubsubio package has been updated for the 2.40.0 release (cut on June
15th, release likely around July) to always wrap and convert elements to
PubsubMessage first. Note that only []byte will be automatically wrapped;
otherwise the PCollection must contain PubsubMessages. Passing anything else
will now cause a panic at construction time, avoiding accidental misuse.

As a bonus, it uses the new generics-based optimized registrations to reduce
per-element overhead.

Thank you for your patience, and sorry for not getting this resolved sooner.

Robert Burke
Beam Go Busybody

On Tue, May 10, 2022 at 9:33 AM Robert Burke <re...@google.com> wrote:


Re: Go SDK pubsubio.Write does not output anything running on Dataflow

Posted by Robert Burke <re...@google.com>.
That's definitely news to me. Thank you for following up!

Since your initial email, I validated that when one sends strings through
(converted with []byte()), the same strings come back out, so the described
behavior is odd. However, I had only checked the "raw bytes" path, in which
the bytes are encoded with a length prefix and sent along.

Based on your query, I then wondered whether the original author of the Go
pubsubio package assumed that, because Data is the first field of
PubsubMessage, the wrapped and unwrapped encodings would be identical. I've
done a cursory check of this, and they are not.

Looking at the Python and Java implementations, though, you are right: Go
isn't properly wrapping values in PubsubMessage. The main thing we need to
maintain is that the coder for the PCollection going into the external sink
remains the []byte coder. The Go SDK does automatically use proto
marshalling for protos, but it's implemented as a custom coder, which would
cause issues.

I already have a doc update for pubsubio.Write in progress (to note that it
doesn't work for batch pipelines; unfortunately, one must write one's own
portable sink in that case), and I can also add a fix for this behavior,
along with additional usage documentation.

The workaround for older SDK versions is to wrap values in PubsubMessage
oneself and send those through, rather than relying on the frankly
unspecified behavior.

I'll link the PR to this thread when it's ready. I need to validate the
various Write[Wrapped, Unwrapped] x Read[Wrapped, Unwrapped] cases first.

Thanks again for following up,
Robert Burke
Beam Go Busybody
