Posted to user@flink.apache.org by Vinod Mehra <vm...@lyft.com> on 2020/03/10 18:26:11 UTC

time-windowed joins and tumbling windows

Hi!

We are testing the following 3-way time-windowed join, chosen to keep the
retained state size small. This is our first time using joins. It works in
unit tests, but we are not getting the expected results in production. We
are still troubleshooting. Can you please review this in case we missed
something or one of our assumptions is wrong?

SELECT o.region_code,
       concat_ws(
         '/',
         CAST(sum(CASE WHEN r.order_id IS NOT NULL AND c.order_id IS
NULL THEN 1 ELSE 0 END) AS VARCHAR),
         CAST(count(1) AS VARCHAR)
       ) AS offer_conversion_5m
  FROM (
        SELECT region_code,
               offer_id,
               rowtime
          FROM event_offer_created
         WHERE ...
        ) o
   LEFT JOIN (
        SELECT offer_id,
               order_id,
               rowtime
          FROM event_order_requested
         WHERE ...
        ) r
     ON o.offer_id = r.offer_id
     AND r.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' hour
   LEFT JOIN (
        SELECT order_id,
               rowtime
          FROM event_order_cancelled
         WHERE ...
        ) c
     ON r.order_id = c.order_id
     AND c.rowtime BETWEEN r.rowtime AND r.rowtime + INTERVAL '1' hour
 GROUP BY
       o.region_code,
       TUMBLE(o.rowtime, INTERVAL '5' minute)


The sequence of events is:

   1. At time X an offer is created (event stream = "event_offer_created").
   2. At time Y that offer is used to create an order (event stream =
   "event_order_requested"). Left join because not all offers get used.
   3. At time Z that order is cancelled (event stream =
   "event_order_cancelled"). Left join because not all orders get
   cancelled.

"offer_conversion_5m" represents "number of converted orders / total
number of offers" in a 5-minute bucket. If an order gets cancelled we
don't want to count it. That's why we have [c.order_id IS NULL THEN 1
ELSE 0 END] in the SELECT.
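For clarity, here is a minimal sketch (plain Python, not Flink) of how that
CASE WHEN counting behaves on the joined rows of one bucket; the dict keys
and sample values are made up for illustration:

```python
# Hypothetical model of the CASE WHEN counting in the SELECT above:
# each element stands for one joined row; the request (r) and
# cancellation (c) sides are None when the LEFT JOIN found no match.
def offer_conversion(rows):
    converted = sum(
        1 for row in rows
        if row["r_order_id"] is not None and row["c_order_id"] is None
    )
    return f"{converted}/{len(rows)}"  # mirrors concat_ws('/', sum, count)

rows = [
    {"r_order_id": "ord1", "c_order_id": None},    # converted order
    {"r_order_id": "ord2", "c_order_id": "ord2"},  # cancelled: not counted
    {"r_order_id": None,   "c_order_id": None},    # offer never used
]
print(offer_conversion(rows))  # -> 1/3
```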

We picked 1-hour time windows because that's the maximum time we expect
the successive events in a given record chain to take.

The outer GROUP BY produces a 5-minute aggregation for each "region". As
expected, the watermark lags 2 hours behind the current time because of
the two time-windowed joins above. IdleStateRetentionTime is not set, so
the expectation is that state is retained according to the time-window
size and is cleaned up as records fall out of the window. The aggregated
state is expected to be kept around for 5 minutes (GROUP BY).
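As a back-of-envelope check of that 2-hour figure (our assumption from the
reasoning above, not a measured Flink guarantee): each time-windowed join
has to buffer rows for the width of its window, so the lags stack:

```python
from datetime import timedelta

# Each interval join holds the watermark back by its window width.
join_windows = [
    timedelta(hours=1),  # o -> r: r.rowtime BETWEEN o.rowtime AND o.rowtime + 1h
    timedelta(hours=1),  # r -> c: c.rowtime BETWEEN r.rowtime AND r.rowtime + 1h
]
expected_lag = sum(join_windows, timedelta())
print(expected_lag)  # 2:00:00
```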

However, we never see the conversion (offer_created -> order_requested
without order_cancelled): "offer_conversion_5m" is always zero, although
we know the streams contain records that should have incremented the
count. Any idea what could be wrong? Is the state being dropped too early
(after 5 minutes) because of the outer 5-minute tumbling window?

Thanks,
Vinod

Re: time-windowed joins and tumbling windows

Posted by Vinod Mehra <vm...@lyft.com.INVALID>.
I wanted to add that when I used the following, the watermark was delayed
by 3 hours instead of the 2 hours I would have expected:

AND o.rowtime BETWEEN c.rowtime - INTERVAL '2' hour AND c.rowtime

(time-window constraint between o and c, i.e. the 1st and 3rd tables)
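If the per-join window widths simply add up (my assumption based on the
behavior above), the 3-hour delay is consistent with the wider second
window:

```python
from datetime import timedelta

# o BETWEEN r - 1h AND r  -> 1-hour window
# o BETWEEN c - 2h AND c  -> 2-hour window
window_widths = [timedelta(hours=1), timedelta(hours=2)]
print(sum(window_widths, timedelta()))  # 3:00:00
```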

Thanks,
Vinod

On Fri, Mar 13, 2020 at 3:56 PM Vinod Mehra <vm...@lyft.com> wrote:


Re: time-windowed joins and tumbling windows

Posted by Vinod Mehra <vm...@lyft.com>.
Thanks Timo for the suggestion! Also apologies for missing your response
last week. I will try to come up with a reproducible test case.

On Wed, Mar 18, 2020 at 9:27 AM Timo Walther <tw...@apache.org> wrote:


Re: time-windowed joins and tumbling windows

Posted by Timo Walther <tw...@apache.org>.
Hi Vinod,

thanks for answering my questions. The == Optimized Logical Plan ==
looks as expected. However, the == Physical Execution Plan == seems
quite complex. Are you sure that watermarks don't get lost in some of
those custom operators before entering the SQL part of the pipeline?

If there is a bug in the SQL code base, we would need to come up with
a small table program that reproduces the described problem, so that
we can do a remote debugging session.

Maybe you can do this on your local cluster? There are basically two
runtime operators involved:
org.apache.flink.table.runtime.join.RowTimeBoundedStreamJoin and the
regular DataStream API windows. I don't expect bugs in the DataStream
API windows, so I would suggest verifying the join operator.
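One cheap way to verify the join operator is to compare its output against
a reference model. A tiny plain-Python oracle (not Flink code; the field
names are made up) for the first o -> r bounded join might look like:

```python
HOUR_MS = 3_600_000  # join window width in milliseconds

def bounded_left_join(offers, requests, window_ms=HOUR_MS):
    """Reference model: r matches o when o.offer_id == r.offer_id and
    o.rowtime <= r.rowtime <= o.rowtime + window_ms; the LEFT JOIN keeps
    unmatched offers with a None request side."""
    out = []
    for o in offers:
        matched = False
        for r in requests:
            if (r["offer_id"] == o["offer_id"]
                    and o["rowtime"] <= r["rowtime"] <= o["rowtime"] + window_ms):
                out.append((o, r))
                matched = True
        if not matched:
            out.append((o, None))
    return out

offers = [{"offer_id": "A", "rowtime": 0}]
requests = [
    {"offer_id": "A", "rowtime": 30 * 60 * 1000},  # 30 min later: joins
    {"offer_id": "A", "rowtime": 2 * HOUR_MS},     # 2 h later: outside window
]
print(bounded_left_join(offers, requests))
```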

I hope this helps.

Regards,
Timo



On 13.03.20 23:56, Vinod Mehra wrote:



Re: time-windowed joins and tumbling windows

Posted by Vinod Mehra <vm...@lyft.com>.
Thanks Timo for responding back! Answers below:

> 1) Which planner are you using?

We are using Flink 1.8 with the default planner
(org.apache.flink.table.calcite.FlinkPlannerImpl)
from: org.apache.flink:flink-table-planner_2.11:1.8

> 2) How do you create your watermarks?

We are using periodic watermarking and have set the stream time
characteristic to TimeCharacteristic.EventTime. The watermark assigner
extracts the timestamp from the event's time attribute and keeps the
watermark 5 seconds behind the maximum timestamp seen, in order to
allow for late events.
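
For reference, the out-of-orderness logic our assigner implements can be
sketched in plain Java (the class and method names here are illustrative,
not our actual assigner code; only the 5-second bound matches what we run):

```java
// Minimal sketch of a periodic, bounded-out-of-orderness watermark:
// the watermark trails the maximum event time seen by a fixed bound.
public class BoundedWatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampSeenMs = Long.MIN_VALUE;

    public BoundedWatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Called per event: track the highest event timestamp so far.
    public void onEvent(long eventTimestampMs) {
        maxTimestampSeenMs = Math.max(maxTimestampSeenMs, eventTimestampMs);
    }

    // Emitted periodically: events with timestamps at or below this
    // value are considered late.
    public long currentWatermarkMs() {
        return maxTimestampSeenMs - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch wm = new BoundedWatermarkSketch(5_000); // 5s bound
        wm.onEvent(100_000);
        wm.onEvent(98_000); // a late event never moves the watermark backwards
        System.out.println(wm.currentWatermarkMs()); // prints 95000
    }
}
```

With a 5-second bound, an event arriving up to 5 seconds behind the latest
event time seen is still ahead of the watermark and gets processed normally.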

> 3) Did you unit test with only parallelism of 1 or higher?

I tried both 1 and higher values in the tests; for all parallelism values
the unit tests work as expected.

> 4) Can you share the output of TableEnvironment.explain() with us?

Attached. Please note that I had obfuscated the query a bit in my original
post for clarity. I have pasted the actual query along with the plan so
that you can correlate it.

> Shouldn't c have a rowtime constraint around o instead of r? Such that
all time-based operations work on o.rowtime?

I have tried both (and some more variations). Got the same results (unit
tests passes but production execution doesn't join as expected). Here is
the modified query:

SELECT o.region_code,
       concat_ws(
         '/',
         CAST(sum(CASE WHEN r.order_id IS NOT NULL AND c.order_id IS
NULL THEN 1 ELSE 0 END) AS VARCHAR),
         CAST(count(1) AS VARCHAR)
       ) AS offer_conversion_5m
  FROM (
        SELECT region_code,
               offer_id,
               rowtime
          FROM event_offer_created
         WHERE ...
        ) o
   LEFT JOIN (
        SELECT offer_id,
               order_id,
               rowtime
          FROM event_order_requested
         WHERE ...
        ) r
     ON o.offer_id = r.offer_id
     AND o.rowtime BETWEEN r.rowtime - INTERVAL '1' hour AND r.rowtime

   LEFT JOIN (
        SELECT order_id,
               rowtime
          FROM event_order_cancelled
         WHERE ...
        ) c
     ON r.order_id = c.order_id
     AND o.rowtime BETWEEN c.rowtime - INTERVAL '2' hour AND c.rowtime

 GROUP BY
       o.region_code,
       TUMBLE(o.rowtime, INTERVAL '5' minute)


We used minus two hours ("c.rowtime - INTERVAL '2' hour") in the 2nd time
window because it relates the first table (o) to the third (c), spanning
both one-hour join windows.
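
As a sanity check on that arithmetic: chaining r within [o.rowtime,
o.rowtime + 1h] with c within [r.rowtime, r.rowtime + 1h] means c.rowtime
can be at most 2 hours after o.rowtime, which is the bound the rewritten
query encodes. A small Java sketch (the predicate names are just for
illustration, not anything from the query plan):

```java
// Sketch: verify that two chained one-hour join windows imply the
// two-hour bound between o.rowtime and c.rowtime used in the query.
public class JoinWindowBounds {
    static final long HOUR_MS = 3_600_000L;

    // r must fall within [o, o + 1h]
    static boolean rJoinsO(long o, long r) {
        return r >= o && r <= o + HOUR_MS;
    }

    // c must fall within [r, r + 1h]
    static boolean cJoinsR(long r, long c) {
        return c >= r && c <= r + HOUR_MS;
    }

    // Derived bound from the rewritten query: o within [c - 2h, c]
    static boolean oWithinDerivedBound(long o, long c) {
        return o >= c - 2 * HOUR_MS && o <= c;
    }

    public static void main(String[] args) {
        long o = 0;
        // Scan candidate r and c times in one-minute steps; whenever both
        // pairwise joins hold, the derived two-hour bound must hold too.
        for (long r = o; r <= o + HOUR_MS; r += 60_000) {
            for (long c = r; c <= r + HOUR_MS; c += 60_000) {
                if (rJoinsO(o, r) && cJoinsR(r, c)
                        && !oWithinDerivedBound(o, c)) {
                    throw new AssertionError("bound violated: r=" + r + " c=" + c);
                }
            }
        }
        System.out.println("two-hour bound holds");
    }
}
```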

-- Vinod

On Fri, Mar 13, 2020 at 6:42 AM Timo Walther <tw...@apache.org> wrote:

> Hi Vinod,
>
> I cannot spot any problems in your SQL query.
>
> Some questions for clarification:
> 1) Which planner are you using?
> 2) How do you create your watermarks?
> 3) Did you unit test with only parallelism of 1 or higher?
> 4) Can you share the output of TableEnvironment.explain() with us?
>
> Shouldn't c have a rowtime constraint around o instead of r? Such that
> all time-based operations work on o.rowtime?
>
> Regards,
> Timo
>
>
> On 10.03.20 19:26, Vinod Mehra wrote:
> > Hi!
> >
> > We are testing the following 3 way time windowed join to keep the
> > retained state size small. Using joins for the first time here. It works
> > in unit tests but we are not able to get expected results in production.
> > We are still troubleshooting this issue. Can you please help us review
> > this in case we missed something or our assumptions are wrong?
> >
> > SELECT o.region_code,
> >        concat_ws(
> >          '/',
> >          CAST(sum(CASE WHEN r.order_id IS NOT NULL AND c.order_id IS NULL
> > THEN 1 ELSE 0 END) AS VARCHAR),
> >          CAST(count(1) AS VARCHAR)
> >        ) AS offer_conversion_5m
> >   FROM (
> >         SELECT region_code,
> >                offer_id,
> >                rowtime
> >           FROM event_offer_created
> >          WHERE ...
> >         ) o
> >    LEFT JOIN (
> >         SELECT offer_id,
> >                order_id,
> >                rowtime
> >           FROM event_order_requested
> >          WHERE ...
> >         ) r
> >      ON o.offer_id = r.offer_id
> >      AND r.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' hour
> >    LEFT JOIN (
> >         SELECT order_id,
> >                rowtime
> >           FROM event_order_cancelled
> >          WHERE ...
> >         ) c
> >      ON r.order_id = c.order_id
> >      AND c.rowtime BETWEEN r.rowtime AND r.rowtime + INTERVAL '1' hour
> >  GROUP BY
> >        o.region_code,
> >        TUMBLE(o.rowtime, INTERVAL '5' minute)
> >
> >
> > The sequence of events is:
> >
> >  1. At time X an offer is created (event stream = "*event_offer_created*")
> >  2. At time Y that offer is used to create an order (event stream =
> >     "*event_order_requested*"). Left join because not all offers get used.
> >  3. At time Z that order is cancelled (event stream =
> >     "*event_order_cancelled*"). Left join because not all orders get
> >     cancelled.
> >
> > "*offer_conversion_5m*" represents "number of converted orders / total
> > number of offers" in a 5-minute bucket. If an order gets cancelled
> > we don't want to count it. That's why we have [c.order_id IS NULL THEN
> > 1 ELSE 0 END] in the select.
> >
> > We picked 1 hour time windows because that's the maximum time we expect
> > the successive events to take for a given record chain.
> >
> > The outer GROUP BY is to get 5 minute aggregation for each "region". As
> > expected the watermark lags 2 hours behind the current time because of the
> > two time-window joins above. The IdleStateRetentionTime is not set, so
> > the expectation is that the state will be retained as per the time
> > window size and as the records fall off the window the state will be
> > cleaned up. The aggregated state is expected to be kept around for 5
> > minutes (GROUP BY).
> >
> > However, we are unable to see the conversion (offer_created ->
> > order_requested (without order_cancelled)). '*offer_conversion_5m*' is
> > always zero although we know the streams contain records that should
> > have incremented the count. Any idea what could be wrong? Is the state
> > being dropped too early (5 mins) because of the outer 5 minute tumbling
> > window?
> >
> > Thanks,
> > Vinod
>
>

Re: time-windowed joins and tumbling windows

Posted by Timo Walther <tw...@apache.org>.
Hi Vinod,

I cannot spot any problems in your SQL query.

Some questions for clarification:
1) Which planner are you using?
2) How do you create your watermarks?
3) Did you unit test with only parallelism of 1 or higher?
4) Can you share the output of TableEnvironment.explain() with us?

Shouldn't c have a rowtime constraint around o instead of r? Such that 
all time-based operations work on o.rowtime?

Regards,
Timo


On 10.03.20 19:26, Vinod Mehra wrote:
> Hi!
> 
> We are testing the following 3 way time windowed join to keep the 
> retained state size small. Using joins for the first time here. It works 
> in unit tests but we are not able to get expected results in production. 
> We are still troubleshooting this issue. Can you please help us review 
> this in case we missed something or our assumptions are wrong?
> 
> SELECT o.region_code,
>        concat_ws(
>          '/',
>          CAST(sum(CASE WHEN r.order_id IS NOT NULL AND c.order_id IS NULL THEN 1 ELSE 0 END) AS VARCHAR),
>          CAST(count(1) AS VARCHAR)
>        ) AS offer_conversion_5m
>   FROM (
>         SELECT region_code,
>                offer_id,
>                rowtime
>           FROM event_offer_created
>          WHERE ...
>         ) o
>    LEFT JOIN (
>         SELECT offer_id,
>                order_id,
>                rowtime
>           FROM event_order_requested
>          WHERE ...
>         ) r
>      ON o.offer_id = r.offer_id
>      AND r.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' hour
>    LEFT JOIN (
>         SELECT order_id,
>                rowtime
>           FROM event_order_cancelled
>          WHERE ...
>         ) c
>      ON r.order_id = c.order_id
>      AND c.rowtime BETWEEN r.rowtime AND r.rowtime + INTERVAL '1' hour
>  GROUP BY
>        o.region_code,
>        TUMBLE(o.rowtime, INTERVAL '5' minute)
> 
> 
> The sequence of events is:
> 
>  1. At time X an offer is created (event stream = "*event_offer_created*")
>  2. At time Y that offer is used to create an order (event stream =
>     "*event_order_requested*"). Left join because not all offers get used.
>  3. At time Z that order is cancelled (event stream =
>     "*event_order_cancelled*"). Left join because not all orders get
>     cancelled.
> 
> "*offer_conversion_5m*" represents "number of converted orders / total
> number of offers" in a 5-minute bucket. If an order gets cancelled
> we don't want to count it. That's why we have [c.order_id IS NULL THEN
> 1 ELSE 0 END] in the select.
> 
> We picked 1 hour time windows because that's the maximum time we expect 
> the successive events to take for a given record chain.
> 
> The outer GROUP BY is to get 5 minute aggregation for each "region". As 
> expected the watermark lags 2 hours behind the current time because of the
> two time-window joins above. The IdleStateRetentionTime is not set, so 
> the expectation is that the state will be retained as per the time 
> window size and as the records fall off the window the state will be 
> cleaned up. The aggregated state is expected to be kept around for 5 
> minutes (GROUP BY).
> 
> However, we are unable to see the conversion (offer_created -> 
> order_requested (without order_cancelled)). '*offer_conversion_5m*' is 
> always zero although we know the streams contain records that should 
> have incremented the count. Any idea what could be wrong? Is the state 
> being dropped too early (5 mins) because of the outer 5 minute tumbling 
> window?
> 
> Thanks,
> Vinod