Posted to user@flink.apache.org by Charles Tan <ct...@gmail.com> on 2023/03/14 17:44:22 UTC

LEFT and FULL interval joins in Flink SQL lead to very out-of-order outputs

Hi everyone,

I have been experimenting with Flink SQL’s interval joins and noticed that
some unmatched rows from LEFT or FULL interval joins are emitted much later
than I expected. Take the following query, for example:
    SELECT * FROM orders o LEFT JOIN shipments s
    ON (o.orderID = s.orderID)
    AND o.rowtime BETWEEN s.rowtime - INTERVAL '1' HOUR
                      AND s.rowtime + INTERVAL '1' HOUR
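To see where a one-hour wait comes from, here is a minimal Java sketch of the
time part of the ON condition (the orderID equality is omitted). It assumes
rowtimes are epoch milliseconds and that BETWEEN is inclusive at both ends;
the class and method names are hypothetical, not Flink APIs. Rearranged, the
condition admits shipments whose rowtime is within one hour either side of
the order's rowtime, so the latest shipment that could still match an order
has rowtime = o.rowtime + 1 hour.

```java
// Hypothetical helper mirroring the time predicate of the example query.
// Assumptions: rowtimes are epoch milliseconds, BETWEEN is inclusive.
final class IntervalPredicate {
    static final long ONE_HOUR_MS = 60L * 60L * 1000L;

    // o.rowtime BETWEEN s.rowtime - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR
    static boolean matches(long orderRowtime, long shipmentRowtime) {
        return orderRowtime >= shipmentRowtime - ONE_HOUR_MS
            && orderRowtime <= shipmentRowtime + ONE_HOUR_MS;
    }

    // Latest shipment rowtime that can still join with the given order.
    static long latestMatchingShipment(long orderRowtime) {
        return orderRowtime + ONE_HOUR_MS;
    }

    public static void main(String[] args) {
        long order = 0L;
        System.out.println(matches(order, ONE_HOUR_MS));     // true: exactly 1 h later
        System.out.println(matches(order, ONE_HOUR_MS + 1)); // false: 1 h + 1 ms later
        System.out.println(latestMatchingShipment(order));   // 3600000
    }
}
```

With no allowed lateness, this is why a wait of just over one hour past the
order's rowtime looks sufficient before emitting the order unmatched.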

I expect any unmatched records from orders to be emitted once the watermarks
of both orders and shipments advance more than 1 hour past the order record’s
rowtime. However, the watermarks actually need to advance by more than 2
hours. When I looked into this further, I found these formulas in the
TimeIntervalJoin class [1]:
    minCleanUpInterval = (leftRelativeSize + rightRelativeSize) / 2;
    long cleanUpTime = rowTime + leftRelativeSize + minCleanUpInterval
            + allowedLateness + 1;
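The two quoted expressions can be re-evaluated in a small standalone Java
sketch. This is not the Flink implementation: it only recomputes the formulas
as quoted, for a row from the left (orders) side, and it assumes the example
query yields leftRelativeSize = rightRelativeSize = 1 hour and
allowedLateness = 0, all in milliseconds, which is consistent with the
arithmetic in this email. Class and method names are hypothetical.

```java
// Standalone re-evaluation of the cleanup-time formulas quoted from TimeIntervalJoin.
// Not Flink code. Assumption: all values are milliseconds.
final class CleanUpTimeSketch {
    static final long ONE_HOUR_MS = 3_600_000L;

    // minCleanUpInterval = (leftRelativeSize + rightRelativeSize) / 2
    static long minCleanUpInterval(long leftRelativeSize, long rightRelativeSize) {
        return (leftRelativeSize + rightRelativeSize) / 2;
    }

    // cleanUpTime for a left-side row, exactly as quoted.
    static long leftRowCleanUpTime(long rowTime, long leftRelativeSize,
                                   long rightRelativeSize, long allowedLateness) {
        long interval = minCleanUpInterval(leftRelativeSize, rightRelativeSize);
        return rowTime + leftRelativeSize + interval + allowedLateness + 1;
    }

    public static void main(String[] args) {
        long rowTime = 0L; // an order with rowtime t = 0
        long cleanUp = leftRowCleanUpTime(rowTime, ONE_HOUR_MS, ONE_HOUR_MS, 0L);
        // 0 + 1 h + 1 h + 0 + 1 ms, i.e. 2 hours and 1 ms after the order
        System.out.println(cleanUp); // 7200001
    }
}
```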

In this case, cleanUpTime equals rowtime + 1h (leftRelativeSize) + 1h
(minCleanUpInterval) + 0 (allowedLateness) + 1 ms, which matches the 2+ hours
I was observing. From the commit history and documentation, I could not
understand why cleanUpTime is calculated this way. Why does
minCleanUpInterval exist, and why is its value the average of the left and
right relative sizes? I found a similar JIRA issue opened a few years ago,
FLINK-18996 [2]. This behavior can hurt downstream jobs that consume this
output: delaying the unmatched rows of the interval join makes the output
stream very out of order. A downstream Flink job, for example, would have to
increase its allowed lateness, or its bounded out-of-orderness, to
accommodate this delay.
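The size of the unexpected delay can be isolated with a Java sketch that
subtracts the threshold the author expected from the quoted cleanUpTime. The
expected threshold, rowTime + leftRelativeSize + allowedLateness + 1, is a
hypothetical formula (the quoted one with minCleanUpInterval dropped), and
the one-hour window sizes are the assumed values from the example; all names
are hypothetical.

```java
// Isolates the extra delay contributed by minCleanUpInterval.
// Assumption: the "expected" threshold is the quoted formula without minCleanUpInterval.
final class ExtraDelaySketch {
    static final long ONE_HOUR_MS = 3_600_000L;

    // Hypothetical threshold matching the author's expectation (window + lateness + 1 ms).
    static long expectedThreshold(long rowTime, long leftRelativeSize, long allowedLateness) {
        return rowTime + leftRelativeSize + allowedLateness + 1;
    }

    // cleanUpTime for a left row, as quoted from TimeIntervalJoin.
    static long quotedCleanUpTime(long rowTime, long leftRelativeSize,
                                  long rightRelativeSize, long allowedLateness) {
        long minCleanUpInterval = (leftRelativeSize + rightRelativeSize) / 2;
        return rowTime + leftRelativeSize + minCleanUpInterval + allowedLateness + 1;
    }

    // Difference between the two thresholds; it does not depend on rowTime.
    static long extraDelay(long leftRelativeSize, long rightRelativeSize, long allowedLateness) {
        long rowTime = 0L;
        return quotedCleanUpTime(rowTime, leftRelativeSize, rightRelativeSize, allowedLateness)
            - expectedThreshold(rowTime, leftRelativeSize, allowedLateness);
    }

    public static void main(String[] args) {
        System.out.println(expectedThreshold(0L, ONE_HOUR_MS, 0L));              // 3600001
        System.out.println(quotedCleanUpTime(0L, ONE_HOUR_MS, ONE_HOUR_MS, 0L)); // 7200001
        System.out.println(extraDelay(ONE_HOUR_MS, ONE_HOUR_MS, 0L));            // 3600000
    }
}
```

Under these assumptions the extra delay is exactly minCleanUpInterval, so it
grows with the window size; in this example, unmatched orders are released
one hour later than expected.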

Thanks,
Charles

[1] https://github.com/apache/flink/blob/ab70dcfa19827febd2c3cdc5cb81e942caa5b2f0/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalJoin.java#L366
[2] https://issues.apache.org/jira/browse/FLINK-18996

Re: LEFT and FULL interval joins in Flink SQL lead to very out-of-order outputs

Posted by Charles Tan <ct...@gmail.com>.
Hi Flink users,

Last week I sent an email about some very delayed outputs from running LEFT
or FULL interval joins in Flink SQL. In a left join, when a record arrives
from the left source but no matching record arrives from the right source,
the watermarks of both sides need to advance much further than I expected
before the unmatched result is emitted. Looking more closely at the code, I
found a formula for cleanUpTime that didn't make sense to me
(cleanUpTime = rowTime + leftRelativeSize + minCleanUpInterval
+ allowedLateness + 1). I don't understand why minCleanUpInterval needs to
exist at all. Delaying outputs unnecessarily can hurt downstream jobs, which
then have to deal with a stream that may be very out of order. More details
are in the original email. I wanted to raise this issue again this week to
see if anyone has context on interval joins and can explain cleanUpTime and
minCleanUpInterval.
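The "watermarks of both sides" condition can be sketched in Java. In Flink,
an operator with two inputs advances its event time to the minimum of its two
input watermarks, and an event-time timer fires once that time reaches the
timer's timestamp. The sketch additionally assumes, as the reported behavior
suggests, that the null-padded order row is emitted when its cleanup timer
fires. No Flink classes are used; names are hypothetical.

```java
// Hypothetical model of when an unmatched left row's cleanup timer fires.
// Assumptions: the operator's event time is min(input watermarks), and an
// event-time timer fires once that time is >= the timer's timestamp.
final class TimerFiringSketch {
    static final long ONE_HOUR_MS = 3_600_000L;

    static long combinedWatermark(long ordersWatermark, long shipmentsWatermark) {
        return Math.min(ordersWatermark, shipmentsWatermark);
    }

    static boolean timerFired(long timerTimestamp, long ordersWatermark, long shipmentsWatermark) {
        return combinedWatermark(ordersWatermark, shipmentsWatermark) >= timerTimestamp;
    }

    public static void main(String[] args) {
        long cleanUpTime = 2 * ONE_HOUR_MS + 1; // order at t = 0, cleanUpTime from the email
        // Both watermarks 1 h + 1 ms past the order: not emitted yet.
        System.out.println(timerFired(cleanUpTime, ONE_HOUR_MS + 1, ONE_HOUR_MS + 1)); // false
        // Orders far ahead but shipments lagging: the slower side holds it back.
        System.out.println(timerFired(cleanUpTime, 10 * ONE_HOUR_MS, ONE_HOUR_MS));    // false
        // Both watermarks at 2 h + 1 ms: the timer fires.
        System.out.println(timerFired(cleanUpTime, cleanUpTime, cleanUpTime));         // true
    }
}
```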

Thanks,
Charles
