Posted to user@flink.apache.org by mark <mp...@gmail.com> on 2023/03/15 14:12:05 UTC

IntervalJoin invisibly becomes a regular Join - why?

Hello,
I'm seeing some strange behaviour in Flink SQL: adding a new SELECT
statement causes a previously created interval join to be turned into a
regular join. This concerns me because the Flink docs make clear that
regular joins are not safe on unbounded input: they keep every input row in
state, so their memory usage can grow indefinitely.
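To make the memory concern concrete, here is a toy Python model (not Flink's operator code) of how many rows each kind of streaming join has to buffer. It assumes in-order input, a watermark equal to the latest event time, and the same one-day bound as the query below; all names are hypothetical.

```python
DAY_MS = 86_400_000   # one day in milliseconds
HOUR_MS = 3_600_000   # one hour in milliseconds

# Toy interval condition: left.ts BETWEEN right.ts - 1 day AND right.ts,
# i.e. LOWER_MS <= left.ts - right.ts <= UPPER_MS.
LOWER_MS, UPPER_MS = -DAY_MS, 0


def still_needed(side: str, ts: int, watermark: int) -> bool:
    """Can a buffered row still meet a row that has not arrived yet?

    Assumes in-order input, so every future row has time >= watermark.
    A left row at ts only meets right rows timed in [ts - UPPER, ts - LOWER];
    a right row at ts only meets left rows timed in [ts + LOWER, ts + UPPER].
    """
    if side == "left":
        return watermark <= ts - LOWER_MS
    return watermark <= ts + UPPER_MS


def state_sizes(events: list) -> tuple:
    """Rows held by a regular join vs. an interval join after `events`.

    `events` is an in-order list of (side, ts_ms) pairs. The regular join
    buffers every row forever; the interval join drops rows that the time
    bounds rule out for all future input.
    """
    regular, interval = [], []
    for side, ts in events:
        regular.append((side, ts))       # never evicted
        interval.append((side, ts))
        watermark = ts                   # toy watermark: latest event time
        interval = [(s, t) for s, t in interval if still_needed(s, t, watermark)]
    return len(regular), len(interval)


# One left and one right row per hour, for 10 days and then for 20 days.
ten_days = [(side, h * HOUR_MS) for h in range(24 * 10) for side in ("left", "right")]
twenty_days = [(side, h * HOUR_MS) for h in range(24 * 20) for side in ("left", "right")]
print(state_sizes(ten_days))     # (480, 26)
print(state_sizes(twenty_days))  # (960, 26)
```

In this model the regular join's state grows linearly with the length of the stream, while the interval join's state stays at roughly one day's worth of left rows plus the newest right row.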

I have put a worked example at https://github.com/mnuttall/flink-debug. It
defines this interval join:

CREATE TEMPORARY VIEW suspiciousOrders AS
    SELECT s.orderId, s.customer, s.product, s.quantity AS order_quantity,
           l.cancel_quantity, l.order_ts AS large_ts, s.ts AS small_ts, l.cancel_ts
    FROM smallOrders s JOIN largeCancellations l
    ON s.product = l.product AND s.customer = l.customer
    WHERE s.ts BETWEEN l.cancel_ts - interval '1' day AND l.cancel_ts;

which compiles to the following plan:

[13]:IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-86400000, leftUpperBound=0, leftTimeIndex=0, rightTimeIndex=1], where=[((product = product0) AND (customer = customer0) AND (ts >= (cancel_ts - 86400000:INTERVAL DAY)) AND (ts <= cancel_ts))], select=[ts, orderId, customer, product, quantity, order_ts, cancel_ts, product0, customer0, cancel_quantity])
+- [14]:Calc(select=[orderId, customer, product, quantity AS order_quantity, cancel_quantity, order_ts AS large_ts, ts AS small_ts, cancel_ts])
   +- [15]:ConstraintEnforcer[NotNullEnforcer(fields=[order_quantity, cancel_quantity])]
      +- Sink: Collect table sink

but adding a further temporary view

CREATE TEMPORARY VIEW filteredResults AS
    SELECT * FROM suspiciousOrders WHERE small_ts > large_ts;

turns the interval join into a regular join:

[13]:Join(joinType=[InnerJoin], where=[((product = product0) AND (customer = customer0) AND (ts >= (cancel_ts - 86400000:INTERVAL DAY)) AND (ts <= cancel_ts) AND (ts > order_ts))], select=[ts, orderId, customer, product, quantity, order_ts, cancel_ts, product0, customer0, cancel_quantity], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+- [14]:Calc(select=[orderId, customer, product, quantity AS order_quantity, cancel_quantity, order_ts AS large_ts, ts AS small_ts, cancel_ts])
   +- [15]:ConstraintEnforcer[NotNullEnforcer(fields=[order_quantity, cancel_quantity])]
      +- Sink: Collect table sink
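The `windowBounds` of the first (interval join) plan can be reproduced by hand. Reading `leftLowerBound`/`leftUpperBound` as bounds on `ts - cancel_ts` in milliseconds (an interpretation consistent with that plan's `where` clause, not quoted from Flink's documentation), the BETWEEN condition gives -86400000 and 0. A minimal Python sketch; the helper names are hypothetical:

```python
from __future__ import annotations

from datetime import timedelta

ONE_MS = timedelta(milliseconds=1)


def window_bounds_ms(lower: timedelta, upper: timedelta) -> tuple[int, int]:
    """Bounds on (left time - right time) in ms for the condition
    left_ts BETWEEN right_ts + lower AND right_ts + upper."""
    return lower // ONE_MS, upper // ONE_MS


def within(left_ms: int, right_ms: int, bounds: tuple[int, int]) -> bool:
    """True if a left row at left_ms and a right row at right_ms can join."""
    lo, hi = bounds
    return lo <= left_ms - right_ms <= hi


# s.ts BETWEEN l.cancel_ts - INTERVAL '1' DAY AND l.cancel_ts
bounds = window_bounds_ms(-timedelta(days=1), timedelta(0))
print(bounds)  # (-86400000, 0)
```

So a small order can only pair with a cancellation at the same instant or up to one day later, which is what lets an interval join discard old rows.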

Please can someone explain what's happening here? It looks as though my
(safe) interval join is being converted to an (unsafe) regular join - is
that true?
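A toy model of what seems to be happening: after view expansion, `small_ts > large_ts` becomes the join conjunct `ts > order_ts` (visible in the second plan). That conjunct compares the left time attribute `ts` with `order_ts`, an ordinary column, not the right time attribute `cancel_ts` (`rightTimeIndex=1` in the first plan). The sketch below assumes, as a simplification and not as Flink's actual planner logic, that a join is planned as an interval join only when exactly two range comparisons involve nothing but the two time attributes and no other conjunct references a time attribute. Under that assumption it reproduces both plans; `Cmp`, `plan_kind`, and `TIME_ATTRS` are hypothetical names.

```python
from dataclasses import dataclass

# Row-time attributes of the two join inputs (left: ts, right: cancel_ts).
TIME_ATTRS = frozenset({"ts", "cancel_ts"})
RANGE_OPS = frozenset({"<", "<=", ">", ">="})


@dataclass(frozen=True)
class Cmp:
    """One conjunct `left op right (+ offset_ms)`; operands are column names."""
    left: str
    op: str
    right: str
    offset_ms: int = 0  # kept for readability; the toy rule ignores it

    def columns(self) -> frozenset:
        return frozenset({self.left, self.right})


def plan_kind(conjuncts: list) -> str:
    """Toy rule: 'IntervalJoin' iff exactly two range comparisons use only
    time attributes and no remaining conjunct touches a time attribute."""
    bounds, others = [], []
    for c in conjuncts:
        is_bound = c.op in RANGE_OPS and c.columns() <= TIME_ATTRS
        (bounds if is_bound else others).append(c)
    touches_time = any(c.columns() & TIME_ATTRS for c in others)
    return "IntervalJoin" if len(bounds) == 2 and not touches_time else "Join"


# Join condition of suspiciousOrders, as printed in the first plan.
interval_cond = [
    Cmp("product", "=", "product0"),
    Cmp("customer", "=", "customer0"),
    Cmp("ts", ">=", "cancel_ts", offset_ms=-86_400_000),
    Cmp("ts", "<=", "cancel_ts"),
]
# filteredResults adds small_ts > large_ts, i.e. ts > order_ts.
filtered_cond = interval_cond + [Cmp("ts", ">", "order_ts")]

print(plan_kind(interval_cond))  # IntervalJoin
print(plan_kind(filtered_cond))  # Join
```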

Many thanks in advance.
Regards,

Mark Nuttall

Re: IntervalJoin invisibly becomes a regular Join - why?

Posted by mark <mp...@gmail.com>.
Is there anything an author can do to prevent a well-formed interval join
from being 'optimised' into a regular, non-interval join by later queries?
Does Flink guarantee that, when it turns an interval join into a regular
join, the safety of the original interval join is retained?

Many thanks in advance for any insight that anyone can offer.

Regards,
Mark

On Wed, 15 Mar 2023 at 16:02, Leonard Xu <xb...@gmail.com> wrote:

> >
> > CREATE TEMPORARY VIEW filteredResults AS
> >     SELECT * from suspiciousOrders WHERE small_ts > large_ts;
>
> It looks like, after adding the condition, the final expanded query no
> longer matches the conditions[1] under which the planner recognizes a join
> as an interval join. It’s not a bug: an interval join is a special case of
> a regular join, so the result is still correct.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/datastream/operators/joining/#interval-join

Re: IntervalJoin invisibly becomes a regular Join - why?

Posted by Leonard Xu <xb...@gmail.com>.
> 
> CREATE TEMPORARY VIEW filteredResults AS
>     SELECT * from suspiciousOrders WHERE small_ts > large_ts;

It looks like, after adding the condition, the final expanded query no longer matches the conditions[1] under which the planner recognizes a join as an interval join. It’s not a bug: an interval join is a special case of a regular join, so the result is still correct.

[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/datastream/operators/joining/#interval-join
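The point that an interval join is a special case of a regular join can be checked on toy data. The Python sketch below is a simplified model, not Flink code; the row layout, sample data, and function names are hypothetical. It compares a regular join on the fully expanded filteredResults condition with a streaming interval join on the suspiciousOrders condition (which evicts buffered rows once the time bounds rule out further matches) followed by the extra filter. Both yield the same pairs; what differs is how much state each keeps.

```python
DAY_MS = 86_400_000
HOUR_MS = 3_600_000
LOWER_MS, UPPER_MS = -DAY_MS, 0  # allowed range of ts - cancel_ts


def keys_match(s: dict, c: dict) -> bool:
    return s["product"] == c["product"] and s["customer"] == c["customer"]


def in_bounds(s: dict, c: dict) -> bool:
    return LOWER_MS <= s["ts"] - c["cancel_ts"] <= UPPER_MS


def extra_filter(s: dict, c: dict) -> bool:
    return s["ts"] > c["order_ts"]  # small_ts > large_ts after view expansion


def regular_join(smalls: list, larges: list) -> list:
    """Regular join on the fully expanded condition: test every pair."""
    return sorted((i, j) for i, s in enumerate(smalls) for j, c in enumerate(larges)
                  if keys_match(s, c) and in_bounds(s, c) and extra_filter(s, c))


def interval_join_then_filter(smalls: list, larges: list) -> list:
    """Streaming interval join on keys + time bounds, evicting rows that can
    no longer match, followed by the extra filter on the joined pairs."""
    # Merge both inputs in event-time order (ties: small orders first).
    events = sorted([(s["ts"], 0, i) for i, s in enumerate(smalls)] +
                    [(c["cancel_ts"], 1, j) for j, c in enumerate(larges)])
    buf_s, buf_c, pairs = [], [], []
    for t, side, idx in events:
        if side == 0:  # a small order probes buffered cancellations
            s = smalls[idx]
            pairs += [(idx, j) for j in buf_c
                      if keys_match(s, larges[j]) and in_bounds(s, larges[j])]
            buf_s.append(idx)
        else:          # a cancellation probes buffered small orders
            c = larges[idx]
            pairs += [(i, idx) for i in buf_s
                      if keys_match(smalls[i], c) and in_bounds(smalls[i], c)]
            buf_c.append(idx)
        # In-order input: the watermark is t, so no future row is older than t.
        buf_s = [i for i in buf_s if t <= smalls[i]["ts"] - LOWER_MS]
        buf_c = [j for j in buf_c if t <= larges[j]["cancel_ts"] + UPPER_MS]
    return sorted((i, j) for i, j in pairs if extra_filter(smalls[i], larges[j]))


smalls = [
    {"product": "p1", "customer": "c1", "ts": 10 * HOUR_MS},
    {"product": "p1", "customer": "c1", "ts": 30 * HOUR_MS},
    {"product": "p2", "customer": "c1", "ts": 12 * HOUR_MS},
]
larges = [
    {"product": "p1", "customer": "c1", "order_ts": 5 * HOUR_MS, "cancel_ts": 20 * HOUR_MS},
    {"product": "p1", "customer": "c1", "order_ts": 15 * HOUR_MS, "cancel_ts": 32 * HOUR_MS},
    {"product": "p2", "customer": "c1", "order_ts": 0, "cancel_ts": 40 * HOUR_MS},
]
print(regular_join(smalls, larges))               # [(0, 0), (1, 1)]
print(interval_join_then_filter(smalls, larges))  # [(0, 0), (1, 1)]
```

Pair (0, 1) passes the time bounds but is removed by the extra filter in both versions, so the outputs agree.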