Posted to dev@beam.apache.org by amit kumar <ak...@gmail.com> on 2020/06/15 08:14:28 UTC

DynamicMessage in protobufs for re-usable beam pipelines

Hi,


I intend to use protobuf options to trigger different transforms, to use
metadata from storage proto options for sink partitioning, etc., and also to
allow different protobuf message types to flow through the same pipeline,
running as different instances of the pipeline.

I am able to parse descriptors, fields, and options from file descriptors
compiled externally to the Beam pipeline jar.


I am not able to use dynamicMessage.getDefaultInstanceForType() in the sink
transforms (PTransform<PCollection<T>, PDone>), which need a default instance
of the message type to persist the data, because it fails with an error that
com.google.protobuf.DynamicMessage is not Serializable.

I wanted to check whether there is a way to use a generic proto in a Beam
pipeline, whether there are any examples of protobuf reflection that could be
used in this case, and whether there is a recommended way to achieve this
functionality.



Many Thanks,

Amit

Re: DynamicMessage in protobufs for re-usable beam pipelines

Posted by amit kumar <ak...@gmail.com>.
Thanks for your response, Brian; Alex's code is very helpful. For my current
case I will use reflection to get the default instance types of the proto
messages.
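A minimal sketch of the reflection idea, assuming the generated message
classes are on the worker classpath. FakeMessage below is a hypothetical
stand-in for a protoc-generated class (it only mimics the static
getDefaultInstance() accessor that generated protobuf Java classes expose);
the lookup itself uses only java.lang.reflect:

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for a protoc-generated message class; real generated
// classes expose the same static getDefaultInstance() accessor.
class FakeMessage {
    private static final FakeMessage DEFAULT = new FakeMessage();
    public static FakeMessage getDefaultInstance() { return DEFAULT; }
}

class ReflectiveDefaults {
    // Look up the default instance of a message class from its fully
    // qualified Java class name, as a pipeline could do at construction time.
    static Object defaultInstanceOf(String className) throws Exception {
        Class<?> clazz = Class.forName(className);
        Method getter = clazz.getMethod("getDefaultInstance");
        return getter.invoke(null); // static method, so no receiver object
    }
}
```

In a real pipeline the class name would come from configuration, so the same
pipeline code can be instantiated for different message types.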
Another way, I think, would be to decouple the converters and sinks to bypass
this issue and to do some of the conversions inside a DoFn.
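The decoupling idea can be sketched with plain Java serialization. This is
only a sketch of the pattern: the protobuf machinery is replaced by
placeholder types, since the point is how to get a non-serializable object
(a DynamicMessage in the real pipeline) past the runner's serialization of
the DoFn. Ship serializable state (e.g. the message name and descriptor
bytes), keep the rebuilt object in a transient field, and recreate it lazily
on the worker:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Sketch: the non-serializable object lives in a transient field and is
// rebuilt lazily from serializable state after deserialization on a worker.
class LazyDefaultInstance implements Serializable {
    private final String messageName;          // serializable state shipped to workers
    private transient Object defaultInstance;  // null again after deserialization

    LazyDefaultInstance(String messageName) { this.messageName = messageName; }

    // In a real DoFn this would run in @Setup: rebuild the Descriptor from
    // shipped descriptor bytes and obtain the DynamicMessage default instance.
    Object defaultInstance() {
        if (defaultInstance == null) {
            defaultInstance = "default:" + messageName; // placeholder rebuild step
        }
        return defaultInstance;
    }

    // Round-trip through Java serialization, as a Beam runner does with a DoFn.
    static LazyDefaultInstance roundTrip(LazyDefaultInstance in) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(in);
        }
        try (ObjectInputStream back =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (LazyDefaultInstance) back.readObject();
        }
    }
}
```

The transient field is deliberately dropped during serialization, so only the
small serializable state crosses the wire and the heavy rebuild happens once
per worker.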

Regards,
Amit


Re: DynamicMessage in protobufs for re-usable beam pipelines

Posted by Brian Hulette <bh...@google.com>.
I don't think I can help with your specific issue, but I can point you to
some potentially useful code. +Alex Van Boxel <al...@vanboxel.be> was
working on a very similar strategy and added a lot of code for mapping
protobufs to Beam schemas, which you may be able to take advantage of. He
added options to Beam schemas [1] and the ability to map protobuf options to
schema options. He also added schema support for dynamic messages in [2].

Brian

[1]
https://cwiki.apache.org/confluence/display/BEAM/%5BBIP-1%5D+Beam+Schema+Options
[2] https://github.com/apache/beam/pull/10502
