Posted to dev@beam.apache.org by Kenneth Knowles <ke...@apache.org> on 2018/12/11 04:14:16 UTC

Re: Joining bounded and unbounded data not working using non-global window

Hi Shrijit,

+dev@beam.apache.org to fact check my memory here and re-raise the issue

You have hit a known usability problem. It has been discussed but not yet
addressed, partly because we have focused on more holistic fixes and partly
because of backwards-compatibility concerns, in case someone is counting on
the very unfortunate current behavior.

You have this triggering set up:

    .triggering(
        AfterProcessingTime
            .pastFirstElementInPane()
            .plusDelayOf(Duration.standardSeconds(20)))

Under our original design for triggers, this fires only once and then
"closes" the window, so any data arriving after that firing is dropped. I
believe we agreed on the dev list that it should also emit the remaining
data at window expiration time.

What you want is probably this:

    .triggering(
        Repeatedly.forever(
            AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(20))))
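
A toy model may make the difference concrete. The sketch below is not Beam's trigger implementation; it assumes a single window whose elements arrive at given processing-time seconds, with a pane firing 20 seconds after the first element in that pane. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a processing-time trigger on one window (not Beam's implementation). */
final class TriggerToyModel {
    /**
     * Returns the number of elements in each emitted pane.
     *
     * @param arrivals processing-time arrival of each element, sorted ascending
     * @param delay    seconds between the first element of a pane and the firing
     * @param repeated true models Repeatedly.forever(...), false the bare one-shot trigger
     */
    static List<Integer> paneSizes(long[] arrivals, long delay, boolean repeated) {
        List<Integer> panes = new ArrayList<>();
        int i = 0;
        while (i < arrivals.length) {
            long fireAt = arrivals[i] + delay; // timer started by the first element in the pane
            int count = 0;
            while (i < arrivals.length && arrivals[i] <= fireAt) {
                count++;
                i++;
            }
            panes.add(count);
            if (!repeated) {
                break; // one-shot trigger has finished: later elements are never emitted
            }
        }
        return panes;
    }

    public static void main(String[] args) {
        long[] arrivals = {0, 5, 30, 45};
        System.out.println(paneSizes(arrivals, 20, false)); // [2]
        System.out.println(paneSizes(arrivals, 20, true));  // [2, 2]
    }
}
```

In this model the one-shot trigger never emits the elements arriving at 30 s and 45 s, while the repeated trigger re-arms and emits them in a second pane.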

But the recommended approach is to always use the AfterWatermark trigger
with early/late firings, so it would look like this:

    .triggering(
        AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(
                AfterProcessingTime
                    .pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(20))))

This will emit a designated "on time" output as well as early firings
according to the latency you have set up.
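
A rough sketch of the resulting pane sequence for one window, as a toy model rather than Beam's implementation. It simplifies by putting the watermark and the processing-time delay on a single shared clock (in Beam the watermark is event time), and it does not model late data or late firings. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of AfterWatermark.pastEndOfWindow().withEarlyFirings(delay) for one window. */
final class WatermarkTriggerToy {
    /**
     * @param arrivals      arrival times of elements, sorted ascending
     * @param delay         early-firing delay after the first element in a pane
     * @param watermarkPass time at which the watermark passes the end of the window
     * @return pane labels such as "EARLY:2" or "ON_TIME:1"
     */
    static List<String> panes(long[] arrivals, long delay, long watermarkPass) {
        List<String> out = new ArrayList<>();
        int i = 0;
        // Early panes: fire `delay` after the first element of a pane, but only
        // if that happens before the watermark passes the end of the window.
        while (i < arrivals.length && arrivals[i] + delay < watermarkPass) {
            long fireAt = arrivals[i] + delay;
            int count = 0;
            while (i < arrivals.length && arrivals[i] <= fireAt) {
                count++;
                i++;
            }
            out.add("EARLY:" + count);
        }
        // On-time pane: whatever arrived after the last early pane but before the
        // watermark passed. This toy always emits it, even when the count is 0.
        int onTime = 0;
        while (i < arrivals.length && arrivals[i] < watermarkPass) {
            onTime++;
            i++;
        }
        out.add("ON_TIME:" + onTime);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(panes(new long[] {0, 5, 30, 45}, 20, 40)); // [EARLY:2, ON_TIME:1]
    }
}
```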

If this does not solve your problem, can you say more about what is going
wrong?

Kenn

On Mon, Dec 10, 2018 at 7:41 PM Reza Ardeshir Rokni <ra...@gmail.com>
wrote:

> Hi,
>
> A couple of thoughts:
>
> 1- If the amount of HBase data you need to join with is small and does
> not change, could you use a side input? If it does change, you could try
> the "slowly changing lookup cache" pattern (ref below).
> 2- If the amount of data is large, would a direct HBase client call from a
> DoFn work to fetch the data you need to enrich each element? This is
> similar to the "calling an external service" pattern (ref below).
>
> Ref:
> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>
> Cheers
>
> Reza
>
> On Tue, 11 Dec 2018 at 00:12, Shrijit Pillai <pi...@gmail.com>
> wrote:
>
>> Hello,
>>
>> I'm trying to join an unbounded data source and a bounded one using
>> CoGroupByKey. The bounded data source is HBase and the unbounded one is
>> Kafka.
>>
>> The co-group works if the global window strategy is used but not with a
>> non-global one. I've tried the accumulatingFiredPanes mode (using the
>> non-global window) but that didn't help either. Am I missing something to
>> make the co-group work using a non-global window like FixedWindows, or is
>> the GlobalWindow the only way to go about it? I'm using Beam 2.8.0.
>>
>> Here's the code snippet:
>> https://gist.github.com/shrijitpillai/5e9e642f92dd23b3b7bd60e3ce8056bb
>>
>> Thanks
>> Shrijit
>>
>

Re: Joining bounded and unbounded data not working using non-global window

Posted by Kenneth Knowles <ke...@apache.org>.

Hi Shrijit,

The reason the HBase data has that timestamp is that it is the minimum
timestamp (we call it "-infinity" or the "beginning of time"). Bounded data
sources read all their data at this timestamp, and you can use the
WithTimestamps transform to shift the timestamps later.

The reason for this is that it is always OK to make a timestamp later, but
usually not OK to make it earlier, because an earlier timestamp can fall
behind the watermark and make the element late. So starting at the minimum
allows you to set it to any time you want. Since it is a bounded source, the
watermark is not very useful, so it is usually kept at -infinity until all
data is output and then moves instantly to +infinity, the maximum timestamp.
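
The odd -290308-12-21T19:59:05.225Z timestamp reported on the HBase entries can be reproduced with plain java.time. This sketch assumes, as we understand the Beam Java SDK, that BoundedWindow.TIMESTAMP_MIN_VALUE is Long.MIN_VALUE microseconds truncated to milliseconds; the class name is hypothetical.

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

/** Reproduces the "-infinity" timestamp seen on elements from a bounded source. */
final class MinTimestamp {
    // Assumption: mirrors how the Beam Java SDK defines BoundedWindow.TIMESTAMP_MIN_VALUE,
    // i.e. Long.MIN_VALUE microseconds converted (truncated) to milliseconds.
    static final long MIN_MILLIS = TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE);

    static Instant minTimestamp() {
        return Instant.ofEpochMilli(MIN_MILLIS);
    }

    public static void main(String[] args) {
        System.out.println(MIN_MILLIS);     // -9223372036854775
        System.out.println(minTimestamp()); // -290308-12-21T19:59:05.225Z
    }
}
```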

One minor thing that will save you about one line of code: you can use
Window.configure().triggering(...) instead of Window.into(new GlobalWindows()),
since all data from a source is already in the global window.

Kenn

On Wed, Dec 12, 2018 at 11:00 AM Shrijit Pillai <pi...@gmail.com>
wrote:

> Thanks Reza and Kenn for your suggestions.
>
> Kenn, I replaced my existing trigger with the AfterWatermark trigger with
> early firings that you suggested. It did not solve my problem.
>
> The timestamps on the HBase entries showed -290308-12-21T19:59:05.225Z,
> while the Kafka entries showed the current instant. So after reading from
> HBase I applied GlobalWindows, set the timestamps on the HBase entries
> using WithTimestamps, and then applied FixedWindows to the timestamped
> output using the trigger you suggested.
>
> And then the join worked.
>
> Here's the updated code:
> https://gist.github.com/shrijitpillai/55c85fda62c84c514b9a24cdb1a81671

Re: Joining bounded and unbounded data not working using non-global window

Posted by Shrijit Pillai <pi...@gmail.com>.

Thanks Reza and Kenn for your suggestions.

Kenn, I replaced my existing trigger with the AfterWatermark trigger with early firings that you suggested. It did not solve my problem.

The timestamps on the HBase entries showed -290308-12-21T19:59:05.225Z, while the Kafka entries showed the current instant. So after reading from HBase I applied GlobalWindows, set the timestamps on the HBase entries using WithTimestamps, and then applied FixedWindows to the timestamped output using the trigger you suggested.

And then the join worked.
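
Why shifting the timestamps helps: CoGroupByKey groups by key and window, so an HBase element and a Kafka element can only meet if they land in the same window. The sketch below assigns fixed windows with plain arithmetic (start = timestamp minus timestamp mod size, zero offset assumed), which is a simplification of FixedWindows rather than its actual code; the one-minute size and the Kafka and shifted timestamps are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

/** Toy fixed-window assignment showing why the join needs matching timestamps. */
final class FixedWindowToy {
    /** Start of the fixed window containing ts (zero offset assumed). */
    static Instant windowStart(Instant ts, Duration size) {
        long ms = ts.toEpochMilli();
        return Instant.ofEpochMilli(ms - Math.floorMod(ms, size.toMillis()));
    }

    static boolean sameWindow(Instant a, Instant b, Duration size) {
        return windowStart(a, size).equals(windowStart(b, size));
    }

    public static void main(String[] args) {
        Duration size = Duration.ofMinutes(1);
        Instant hbaseMin = Instant.ofEpochMilli(-9223372036854775L); // minimum timestamp
        Instant kafka = Instant.parse("2018-12-12T11:00:30Z");       // hypothetical Kafka timestamp
        Instant shifted = Instant.parse("2018-12-12T11:00:10Z");     // hypothetical WithTimestamps result
        System.out.println(sameWindow(hbaseMin, kafka, size)); // false: the join never pairs them
        System.out.println(sameWindow(shifted, kafka, size));  // true: both fall in [11:00, 11:01)
    }
}
```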

Here's the updated code:
https://gist.github.com/shrijitpillai/55c85fda62c84c514b9a24cdb1a81671
