Posted to user@flink.apache.org by Benoit Hanotte <b....@criteo.com> on 2020/01/14 18:18:08 UTC

Re: [BULK]Re: Incorrect Physical Plan when unioning two different windows, giving incorrect SQL query results

Hello Till,
thanks for your reply!
I have been able to debug the issue and reported it in https://issues.apache.org/jira/browse/FLINK-15577.
It seems the old planner does not include the window specs in the logical nodes' digests, which leads the HepPlanner to consider the two aggregations equivalent even though they use different time windows. I explained the issue in more detail in the ticket above, and submitted a PR earlier today: https://github.com/apache/flink/pull/10854.
[FLINK-15577][table-planner] Fix similar aggregations with different windows being considered the same by BenoitHanotte · Pull Request #10854 · apache/flink<https://github.com/apache/flink/pull/10854>
What is the purpose of the change The RelNode's digest is used by the Calcite HepPlanner to avoid adding duplicate vertices to the graph. If an equivalent vertex was already present in the grap...
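A toy model of the digest mechanism described above, in Python. This is a minimal sketch under simplifying assumptions, not Calcite's or Flink's code: `WindowAggregate`, `digest` and `register` are hypothetical names. Vertices are de-duplicated by a digest string; when the window spec is left out of that string, the 2-hour and 6-hour aggregations over the same input get the same key and collapse into a single vertex.

```python
from dataclasses import dataclass


# Hypothetical stand-in for a logical window-aggregate node; NOT Calcite's
# RelNode API, just enough structure to show the digest problem.
@dataclass(frozen=True)
class WindowAggregate:
    input_digest: str
    aggregates: tuple
    window: str

    def digest(self, include_window: bool) -> str:
        parts = [f"input={self.input_digest}", f"aggs={list(self.aggregates)}"]
        if include_window:
            parts.append(f"window={self.window}")
        return "WindowAggregate(" + ", ".join(parts) + ")"


def register(nodes, include_window):
    """Add nodes to a graph keyed by digest, reusing any vertex whose digest
    is already present (in this model the first node registered wins)."""
    graph = {}
    resolved = []
    for node in nodes:
        resolved.append(graph.setdefault(node.digest(include_window), node))
    return graph, resolved


aggs = ("SUM(nb_clicks)", "SUM(nb_displays)")
agg_2h = WindowAggregate("union(displays, clicks)", aggs, "HOP(1h, 2h)")
agg_6h = WindowAggregate("union(displays, clicks)", aggs, "HOP(1h, 6h)")

# Window omitted from the digest: both branches resolve to the 2h aggregate.
merged, merged_order = register([agg_2h, agg_6h], include_window=False)
print(len(merged), [n.window for n in merged_order])
# 1 ['HOP(1h, 2h)', 'HOP(1h, 2h)']

# Window included in the digest: the two aggregates stay distinct.
fixed, fixed_order = register([agg_2h, agg_6h], include_window=True)
print(len(fixed), [n.window for n in fixed_order])
# 2 ['HOP(1h, 2h)', 'HOP(1h, 6h)']
```

Which branch survives is decided here purely by registration order; the real planner's traversal order is not modelled.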
Best,
Benoit
________________________________
From: Till Rohrmann <tr...@apache.org>
Sent: Tuesday, January 14, 2020 7:13 PM
To: Benoit Hanotte <b....@criteo.com>
Cc: user@flink.apache.org <us...@flink.apache.org>; Jingsong Li <ji...@gmail.com>; twalthr@apache.org <tw...@apache.org>
Subject: [BULK]Re: Incorrect Physical Plan when unioning two different windows, giving incorrect SQL query results

Hi Benoit,

thanks for reporting this issue. Since I'm not too familiar with the SQL component, I've pulled in Timo and Jingsong, who know much better than I do what could be wrong.

Cheers,
Till

On Mon, Jan 13, 2020 at 11:48 AM Benoit Hanotte <b....@criteo.com> wrote:
Hello,

We seem to be facing an issue with Flink where the physical plan after planner optimization is not correct.
I have been able to reproduce the issue in the following "simplified" use case (it doesn't seem to happen in trivial cases):

  1.  We open 2 event streams ("clicks" and "displays").
  2.  We compute the click-through rate (ctr) over 2-hour and 6-hour sliding windows, each sliding by 1 hour.
  3.  We then union the two results and output one row per hour holding the max of the values computed over 2 and 6 hours.
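The three steps above can be simulated in plain Python to see what the query should return for a handful of events. This is a simplified batch sketch, not Flink code: `hop_windows`, `ctr_per_window` and `hourly_max` are hypothetical helpers. It assumes window starts aligned to multiples of the slide, a window rowtime equal to its end minus 1 ms (the inclusive upper bound returned by HOP_ROWTIME), floating-point division for the ctr, and that windows without any display are skipped.

```python
from collections import defaultdict

HOUR = 3_600_000  # milliseconds


def hop_windows(ts, slide, size):
    """Yield (start, end) of every sliding window that contains ts (ts >= 0)."""
    start = ts - ts % slide
    while start > ts - size:
        yield start, start + size
        start -= slide


def ctr_per_window(events, slide, size):
    """events: (ts, nb_displays, nb_clicks) rows, i.e. the UNION ALL of both streams.
    Returns {rowtime: ctr}, with rowtime = window end - 1 ms."""
    sums = defaultdict(lambda: [0, 0])  # window end -> [clicks, displays]
    for ts, displays, clicks in events:
        for _, end in hop_windows(ts, slide, size):
            sums[end][0] += clicks
            sums[end][1] += displays
    # Simplification: windows without any display are skipped (no division by zero).
    return {end - 1: c / d for end, (c, d) in sums.items() if d}


def hourly_max(*ctr_streams):
    """UNION ALL the ctr streams, then take MAX(ctr) per 1-hour tumbling window,
    keyed by the tumbling window's end (TUMBLE_END)."""
    out = {}
    for stream in ctr_streams:
        for rowtime, ctr in stream.items():
            tumble_end = rowtime - rowtime % HOUR + HOUR
            out[tumble_end] = max(out.get(tumble_end, ctr), ctr)
    return out


events = [
    (1_800_000, 1, 0),  # display at 00:30
    (5_400_000, 1, 0),  # display at 01:30
    (5_400_000, 0, 1),  # click at 01:30
]
ctr_2h = ctr_per_window(events, HOUR, 2 * HOUR)
ctr_6h = ctr_per_window(events, HOUR, 6 * HOUR)

expected = hourly_max(ctr_2h, ctr_6h)
both_2h = hourly_max(ctr_2h, ctr_2h)  # both union branches using the 2h window
print({end // HOUR: v for end, v in sorted(expected.items())})
# {1: 0.0, 2: 0.5, 3: 1.0, 4: 0.5, 5: 0.5, 6: 0.5, 7: 1.0}
print({end // HOUR: v for end, v in sorted(both_2h.items())})
# {1: 0.0, 2: 0.5, 3: 1.0}
```

With these three events the correct result has one row per hour from 01:00 to 07:00, whereas feeding the 2-hour result into both sides of the union yields only the rows ending at 01:00, 02:00 and 03:00.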

You can find the SQL query below [1].
After activating debug logging for Calcite, I can see that the original logical plan is valid: the top-level UNION combines two LogicalProjects, one over the 2-hour HOP window and one over the 6-hour HOP window [2].
However, in the final physical plan, both sides of the UNION now use the same 2-hour HOP window (SlidingGroupWindow of 7200000 ms, sliding by 3600000 ms) instead of one 2-hour and one 6-hour window [3].
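The window parameters in plans [2] and [3] are printed in milliseconds and in different argument orders: the logical HOP call takes (time, slide, size), while SlidingGroupWindow prints the size before the slide (both queries slide by 1 hour, so 3600000 has to be the slide). A small hypothetical helper that converts the values to hours makes the comparison explicit; the regular expressions are assumptions that only cover the exact formats shown in this thread.

```python
import re

HOUR_MS = 3_600_000

# Logical plan: HOP(time, slide, size), values in ms.
HOP_RE = re.compile(r"HOP\(\$\d+, (\d+):INTERVAL HOUR, (\d+):INTERVAL HOUR\)")
# Physical plan: SlidingGroupWindow(alias, time, size, slide); size comes first.
SLIDING_RE = re.compile(r"SlidingGroupWindow\('w\$, 'timestamp, (\d+)\.millis, (\d+)\.millis\)")


def logical_windows(plan):
    """Return (slide_h, size_h) for each HOP(...) call in a logical plan dump."""
    return [(int(slide) / HOUR_MS, int(size) / HOUR_MS) for slide, size in HOP_RE.findall(plan)]


def physical_windows(plan):
    """Return (slide_h, size_h) for each SlidingGroupWindow in a physical plan dump."""
    return [(int(slide) / HOUR_MS, int(size) / HOUR_MS) for size, slide in SLIDING_RE.findall(plan)]


logical = """
LogicalProject($f0=[HOP($0, 3600000:INTERVAL HOUR, 21600000:INTERVAL HOUR)], nb_clicks=[$2], nb_displays=[$1])
LogicalProject($f0=[HOP($0, 3600000:INTERVAL HOUR, 7200000:INTERVAL HOUR)], nb_clicks=[$2], nb_displays=[$1])
"""
physical = """
DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], ...)
DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], ...)
"""
print(logical_windows(logical))    # [(1.0, 6.0), (1.0, 2.0)]
print(physical_windows(physical))  # [(1.0, 2.0), (1.0, 2.0)]
```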

I pushed a commit to my fork to reproduce the issue: https://github.com/BenoitHanotte/flink/commit/3d388f153b44bb35b57b8400407ff24a2e91661f. Unfortunately, simplifying the query seems to make the issue disappear.

Is there anything obvious I am missing, or do you have any pointers to what could trigger this issue? I looked at the different rules applied by the planner [4], but, as I am not familiar with them, I haven't yet been able to find the root cause.

Thanks a lot for your help!

Benoit Hanotte

********************************* [1] SQL query *********************************

    WITH displays AS (
        SELECT `timestamp`, 1 as nb_displays, 0 as nb_clicks FROM my_catalog.my_db.display
    ),

    clicks AS (
        SELECT `timestamp`, 0 as nb_displays, 1 as nb_clicks FROM my_catalog.my_db.click
    ),

    counts_2h AS (
        SELECT
            SUM(nb_clicks) / SUM(nb_displays) as ctr,
            HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) as `timestamp`
        FROM (
            (SELECT * FROM displays)
            UNION ALL
            (SELECT * FROM clicks)
        ) t
        GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
    ),

    counts_6h AS (
        SELECT
            SUM(nb_clicks) / SUM(nb_displays) as ctr,
            HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '6' HOUR) as `timestamp`
        FROM (
            (SELECT * FROM displays)
            UNION ALL
            (SELECT * FROM clicks)
        ) t
        GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '6' HOUR)
    )

    SELECT
        TUMBLE_END(`timestamp`, INTERVAL '1' HOUR) as `timestamp`,
        MAX(ctr)
    FROM (
        (SELECT * FROM counts_6h)
        UNION ALL
        (SELECT * FROM counts_2h)
    ) t
    GROUP BY TUMBLE(`timestamp`, INTERVAL '1' HOUR)


********************* [2] Logical plan (before optimization) ***********************

    LogicalProject(timestamp=[TUMBLE_END($0)], EXPR$1=[$1])
      LogicalAggregate(group=[{0}], EXPR$1=[MAX($1)])
        LogicalProject($f0=[TUMBLE($1, 3600000:INTERVAL HOUR)], ctr=[$0])
          LogicalUnion(all=[true])
            LogicalProject(ctr=[$0], timestamp=[$1])
              LogicalProject(ctr=[/($1, $2)], timestamp=[HOP_ROWTIME($0)])
                LogicalAggregate(group=[{0}], agg#0=[SUM($1)], agg#1=[SUM($2)])
                  LogicalProject($f0=[HOP($0, 3600000:INTERVAL HOUR, 21600000:INTERVAL HOUR)], nb_clicks=[$2], nb_displays=[$1])
                    LogicalProject(timestamp=[$0], nb_displays=[0], nb_clicks=[1])
                      LogicalTableScan(table=[[my_catalog, my_db, click]])
            LogicalProject(ctr=[$0], timestamp=[$1])
              LogicalProject(ctr=[/($1, $2)], timestamp=[HOP_ROWTIME($0)])
                LogicalAggregate(group=[{0}], agg#0=[SUM($1)], agg#1=[SUM($2)])
                  LogicalProject($f0=[HOP($0, 3600000:INTERVAL HOUR, 7200000:INTERVAL HOUR)], nb_clicks=[$2], nb_displays=[$1])
                    LogicalProject(timestamp=[$0], nb_displays=[1], nb_clicks=[0])
                      LogicalTableScan(table=[[my_catalog, my_db, display]])


****************** [3] Resulting physical plan (after optimization) ********************

    DataStreamCalc(select=[w$end AS timestamp, CAST(EXPR$1) AS EXPR$1]): rowcount = 400.0, cumulative cost = {3200.0 rows, 3600.0 cpu, 4800.0 io}, id = 556
      DataStreamGroupWindowAggregate(window=[TumblingGroupWindow('w$, 'timestamp, 3600000.millis)], select=[MAX(ctr) AS EXPR$1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 400.0, cumulative cost = {2800.0 rows, 3200.0 cpu, 4800.0 io}, id = 555
        DataStreamUnion(all=[true], union all=[ctr, timestamp]): rowcount = 400.0, cumulative cost = {2400.0 rows, 2800.0 cpu, 4800.0 io}, id = 554
          DataStreamCalc(select=[/(CAST($f0), CAST($f1)) AS ctr, w$rowtime AS timestamp]): rowcount = 200.0, cumulative cost = {1000.0 rows, 1200.0 cpu, 2400.0 io}, id = 548
            DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[SUM(nb_clicks) AS $f0, SUM(nb_displays) AS $f1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 200.0, cumulative cost = {800.0 rows, 800.0 cpu, 2400.0 io}, id = 547
              DataStreamUnion(all=[true], union all=[timestamp, nb_displays, nb_clicks]): rowcount = 200.0, cumulative cost = {600.0 rows, 600.0 cpu, 2400.0 io}, id = 546
                DataStreamCalc(select=[timestamp, 1 AS nb_displays, 0 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 544
                  StreamTableSourceScan(table=[[my_catalog, my_db, display]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 543
                DataStreamCalc(select=[timestamp, 0 AS nb_displays, 1 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 545
                  StreamTableSourceScan(table=[[my_catalog, my_db, click]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 542
          DataStreamCalc(select=[/(CAST($f0), CAST($f1)) AS ctr, w$rowtime AS timestamp]): rowcount = 200.0, cumulative cost = {1000.0 rows, 1200.0 cpu, 2400.0 io}, id = 553
            DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[SUM(nb_clicks) AS $f0, SUM(nb_displays) AS $f1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 200.0, cumulative cost = {800.0 rows, 800.0 cpu, 2400.0 io}, id = 552
              DataStreamUnion(all=[true], union all=[timestamp, nb_displays, nb_clicks]): rowcount = 200.0, cumulative cost = {600.0 rows, 600.0 cpu, 2400.0 io}, id = 551
                DataStreamCalc(select=[timestamp, 1 AS nb_displays, 0 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 549
                  StreamTableSourceScan(table=[[my_catalog, my_db, display]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 543
                DataStreamCalc(select=[timestamp, 0 AS nb_displays, 1 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 550
                  StreamTableSourceScan(table=[[my_catalog, my_db, click]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 542


Re: [BULK]Re: Incorrect Physical Plan when unioning two different windows, giving incorrect SQL query results

Posted by Till Rohrmann <tr...@apache.org>.
Great, thanks a lot for looking into the problem and fixing it. I assume
that your PR will be merged very soon.

Cheers,
Till
