Posted to user@beam.apache.org by Anjana Pydi <an...@bahwancybertek.com> on 2019/09/19 18:05:04 UTC

Publish message to pubsub topic after processing current input in beam streaming pipeline

Hi,

I have a Beam streaming pipeline that reads data from a Pub/Sub topic, uses it to call an API and get responses, applies some transformations to those responses, and writes them to output sinks.

Now I need to add logic that writes a 'process completed' message to another Pub/Sub topic once processing has finished. Could someone please share thoughts on how I can add this?

I actually want to achieve this:

topic1 (data)-> triggers pipeline and writes complete message at end-> topic2 (complete msg)
When topic1 sees a message on topic2, it posts new data to topic1 again

Below is the code sample:

  pubsub_message = (p
      | 'Read From Pubsub' >> beam.io.ReadStringsFromPubSub(topic=known_args.input_topic)
      | 'split and add' >> beam.ParDo(split_item))

  data = (pubsub_message
      | 'API call' >> beam.FlatMap(lambda x: get_responses(x[0],
            datetime.strptime(x[1], "%Y-%m-%d %H:%M:%S"),
            datetime.strptime(x[2], "%Y-%m-%d %H:%M:%S")))
      | 'WriteOutput' >> beam.Map(lambda output: send_to_output(output)))

Thanks,
Anjana


----------------------------------------------------------------------------------------------------------------------- The information contained in this communication is intended solely for the use of the individual or entity to whom it is addressed and others authorized to receive it. It may contain confidential or legally privileged information. If you are not the intended recipient you are hereby notified that any disclosure, copying, distribution or taking any action in reliance on the contents of this information is strictly prohibited and may be unlawful. If you are not the intended recipient, please notify us immediately by responding to this email and then delete it from your system. Bahwan Cybertek is neither liable for the proper and complete transmission of the information contained in this communication nor for any delay in its receipt.

Re: Publish message to pubsub topic after processing current input in beam streaming pipeline

Posted by Robert Bradshaw <ro...@google.com>.
OK, as I read it get_responses yields several elements for a single
input message. It sounds like you want to defer writing to PubSub
until after all of them are processed. The easiest way to do this
would be for get_responses to return a list of messages and
send_to_output to process the whole list of messages, returning the
single "done" signal which could then get written.
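
A rough sketch of that first option, under some assumptions: get_responses and send_to_output are replaced here by hypothetical stand-ins (the real ones are not shown in the thread), send_all_and_signal and add_completion_signal are made-up names, and the datetime parsing from the original snippet is left out for brevity. The point is only that the list stays together as one element, so exactly one string per input message reaches the Pub/Sub write.

```python
def get_responses(item_id, start, end):
    """Hypothetical stand-in for the API call: returns ONE list of dicts."""
    return [{"id": item_id, "start": start, "end": end, "part": n}
            for n in range(3)]


def send_to_output(response):
    """Hypothetical stand-in for posting a single response to the output sink."""
    return True


def send_all_and_signal(responses):
    """Post every response from one input message, then return one signal."""
    for response in responses:
        send_to_output(response)
    return "process completed"


def add_completion_signal(pubsub_message, output_topic):
    """Wire the steps together; one input message yields one signal."""
    # Imported here so the helpers above can be tried without Beam installed.
    import apache_beam as beam

    return (
        pubsub_message
        # Map, not FlatMap: the whole list is passed on as a single element.
        | 'API call' >> beam.Map(lambda x: get_responses(x[0], x[1], x[2]))
        | 'WriteOutput' >> beam.Map(send_all_and_signal)
        # Same transform as in the thread; on newer Beam releases
        # beam.io.WriteToPubSub with bytes payloads may be needed instead.
        | 'process completed' >> beam.io.WriteStringsToPubSub(topic=output_topic))
```

The trade-off is that all responses for one message are posted sequentially inside a single Map call, so the runner cannot spread them across workers.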

You could also do something by tagging the various outputs of
get_responses with a common key, and then doing a (windowed)
group-by-key to colocate them again, but that might be overkill in
this case.
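
For comparison, the keyed variant might look roughly like the sketch below. Everything here is illustrative: the stand-in get_responses and send_to_output, the helper names, the use of x[0] as the common key (which assumes it identifies one input message), and the 60-second fixed window are all assumptions, not something stated in the thread. As far as I understand, the outputs of a FlatMap keep the timestamp of their input element, so all responses for one message should land in the same window.

```python
def get_responses(item_id, start, end):
    """Hypothetical stand-in for the API call: returns a list of dicts."""
    return [{"id": item_id, "part": n} for n in range(3)]


def send_to_output(response):
    """Hypothetical stand-in for posting a single response to the output sink."""
    return True


def tag_responses(key, responses):
    """Pair every response with the key of the message that produced it."""
    return [(key, response) for response in responses]


def send_keyed(keyed_response):
    """Post one response and keep its key so results can be regrouped."""
    key, response = keyed_response
    return (key, send_to_output(response))


def completion_signal(grouped):
    """Turn one (key, results) group into a single 'done' string."""
    key, results = grouped
    return "process completed: %s (%d responses)" % (key, len(list(results)))


def add_grouped_signal(pubsub_message, output_topic, window_seconds=60):
    """Fan out per response, then regroup by key to emit one signal per key."""
    # Imported here so the helpers above can be tried without Beam installed.
    import apache_beam as beam
    from apache_beam.transforms import window

    return (
        pubsub_message
        | 'API call' >> beam.FlatMap(
            lambda x: tag_responses(x[0], get_responses(x[0], x[1], x[2])))
        | 'WriteOutput' >> beam.Map(send_keyed)
        | 'Window' >> beam.WindowInto(window.FixedWindows(window_seconds))
        | 'Group' >> beam.GroupByKey()
        | 'Signal' >> beam.Map(completion_signal)
        | 'process completed' >> beam.io.WriteStringsToPubSub(topic=output_topic))
```

Unlike the single-list approach, each response is still its own element here, so the posts can run in parallel; the cost is that the signal is only emitted once the window closes.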

On Thu, Sep 19, 2019 at 2:54 PM Anjana Pydi <an...@bahwancybertek.com> wrote:
>
> Hi Robert,
>
> Thanks for the reply. 'get_responses' method return a list of dictionaries and 'send_to_output' method takes each element of the list and simultaneously posts it to an API end point.
>
> My question is I just need to send a signal(write a message 'process completed' ) to pubsub topic2 only after completion of send_to_output step (i.e. after processing entire list from before step).
>
> If I add it after last step like below, I suspect it would write message for every element of the list. Is there any way that I can do it only once at the end?
>
> beam.Map(lambda output: send_to_output(output)) | 'process completed' >> beam.io.WriteStringsToPubSub(topic=known_args.output_topic)
>
> Regards,
> Anjana

RE: Publish message to pubsub topic after processing current input in beam streaming pipeline

Posted by Anjana Pydi <an...@bahwancybertek.com>.
Hi Robert,

Thanks for the reply. The 'get_responses' method returns a list of dictionaries, and the 'send_to_output' method takes each element of that list and simultaneously posts it to an API endpoint.

My question is this: I need to send a signal (write a 'process completed' message) to pubsub topic2 only after the send_to_output step completes (i.e. after the entire list from the previous step has been processed).

If I add it after the last step, as below, I suspect it would write a message for every element of the list. Is there any way to do it only once, at the end?

beam.Map(lambda output: send_to_output(output)) | 'process completed' >> beam.io.WriteStringsToPubSub(topic=known_args.output_topic)

Regards,
Anjana
________________________________________
From: Robert Bradshaw [robertwb@google.com]
Sent: Thursday, September 19, 2019 2:31 PM
To: user
Cc: Richard Amrith Lourdu
Subject: Re: Publish message to pubsub topic after processing current input in beam streaming pipeline

On Thu, Sep 19, 2019 at 11:05 AM Anjana Pydi
<an...@bahwancybertek.com> wrote:
>
> Hi ,
>
> I have a beam streaming pipeline which reads data from pubsub topic, use it to call an API and get responses, apply some transformations on the obtained responses and writes to output sinks.
>
> Now, I need to add logic to write a 'process completed' message to another pubsub topic once after the process gets finished. Can some one please provide your thoughts on how can I add it.
>
> I actually want to achieve this:
>
> topic1 (data)-> triggers pipeline and writes complete message at end-> topic2 (complete msg)

Your code below looks fine so far. I'm assuming your send_to_output
function produces the message that you want to send to topic2, right?
(BTW, you can write beam.Map(send_to_output) rather than having to
write beam.Map(lambda output: send_to_output(output)).)

In that case, you just need to add

    data | beam.io.WriteStringsToPubSub(topic2)

> When topic1 sees message on topic2, it again posts new data to topic1
>
> Below is the code sample:
>
>   pubsub_message = p | 'Read From Pubsub' >> beam.io.ReadStringsFromPubSub(topic=known_args.input_topic) | 'split and add' >> beam.ParDo(split_item)
>
>   data = pubsub_message | 'API call' >> beam.FlatMap(lambda x: get_responses(x[0],
>                 datetime.strptime(x[1], "%Y-%m-%d %H:%M:%S"),
>                 datetime.strptime(x[2], "%Y-%m-%d %H:%M:%S"))) | 'WriteOutput' >> beam.Map(lambda output: send_to_output(output))
>
> Thanks,
> Anjana

Re: Publish message to pubsub topic after processing current input in beam streaming pipeline

Posted by Robert Bradshaw <ro...@google.com>.
On Thu, Sep 19, 2019 at 11:05 AM Anjana Pydi
<an...@bahwancybertek.com> wrote:
>
> Hi ,
>
> I have a beam streaming pipeline which reads data from pubsub topic, use it to call an API and get responses, apply some transformations on the obtained responses and writes to output sinks.
>
> Now, I need to add logic to write a 'process completed' message to another pubsub topic once after the process gets finished. Can some one please provide your thoughts on how can I add it.
>
> I actually want to achieve this:
>
> topic1 (data)-> triggers pipeline and writes complete message at end-> topic2 (complete msg)

Your code below looks fine so far. I'm assuming your send_to_output
function produces the message that you want to send to topic2, right?
(BTW, you can write beam.Map(send_to_output) rather than having to
write beam.Map(lambda output: send_to_output(output)).)

In that case, you just need to add

    data | beam.io.WriteStringsToPubSub(topic2)

> When topic1 sees message on topic2, it again posts new data to topic1
>
> Below is the code sample:
>
>   pubsub_message = p | 'Read From Pubsub' >> beam.io.ReadStringsFromPubSub(topic=known_args.input_topic) | 'split and add' >> beam.ParDo(split_item)
>
>   data = pubsub_message | 'API call' >> beam.FlatMap(lambda x: get_responses(x[0],
>                 datetime.strptime(x[1], "%Y-%m-%d %H:%M:%S"),
>                 datetime.strptime(x[2], "%Y-%m-%d %H:%M:%S"))) | 'WriteOutput' >> beam.Map(lambda output: send_to_output(output))
>
> Thanks,
> Anjana