You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 明寒 <jo...@qq.com.INVALID> on 2022/06/30 10:09:01 UTC

flink sql生成执行图中GroupWindowAggregate算子数不符合预期

HI:​在flink1.12中,对于如下的Sql,生成的执行图中有两个GroupWindowAggregate算子,该如何调整Sql或者配置保证只生成一个GroupWindowAggregate算子
CREATE TEMPORARY TABLE RawSource (
    `key` STRING,
    `accessNum` INT,
    `status` STRING,
    rowTime TIMESTAMP(3),
    WATERMARK FOR rowTime AS rowTime - INTERVAL '10' SECOND
) WITH (
    'connector' = 'datagen'
);

CREATE TEMPORARY TABLE TrashSink (
    `tag` STRING,
    `key` STRING,
    `value` BIGINT
) WITH (
    'connector' = 'blackhole'
);

CREATE TEMPORARY VIEW AccView AS SELECT
    COUNT(*) AS accAll,
    COUNT(*) FILTER (WHERE status in ('error')) AS accError,
    `key`
FROM RawSource
GROUP BY TUMBLE(rowTime, INTERVAL '60' SECOND),`key`;

INSERT INTO TrashSink SELECT * FROM (
    SELECT 'accAll', `key`, accAll FROM AccView
    UNION ALL
    SELECT 'accErr', `key`, accError FROM AccView
);

Re: flink sql生成执行图中GroupWindowAggregate算子数不符合预期

Posted by Shengkai Fang <fs...@gmail.com>.
hi.

能展示下具体想要的plan 和实际的 plan 吗?

Best,
Shengkai

明寒 <jo...@qq.com.invalid> 于2022年7月1日周五 09:50写道:

>
> HI:​在flink1.12中,对于如下的Sql,生成的执行图中有两个GroupWindowAggregate算子,该如何调整Sql或者配置保证只生成一个GroupWindowAggregate算子
> CREATE TEMPORARY TABLE RawSource (
>     `key` STRING,
>     `accessNum` INT,
>     `status` STRING,
>     rowTime TIMESTAMP(3),
>     WATERMARK FOR rowTime AS rowTime - INTERVAL '10' SECOND
> ) WITH (
>     'connector' = 'datagen'
> );
>
> CREATE TEMPORARY TABLE TrashSink (
>     `tag` STRING,
>     `key` STRING,
>     `value` BIGINT
> ) WITH (
>     'connector' = 'blackhole'
> );
>
> CREATE TEMPORARY VIEW AccView AS SELECT
>     COUNT(*) AS accAll,
>     COUNT(*) FILTER (WHERE status in ('error')) AS accError,
>     `key`
> FROM RawSource
> GROUP BY TUMBLE(rowTime, INTERVAL '60' SECOND),`key`;
>
> INSERT INTO TrashSink SELECT * FROM (
>     SELECT 'accAll', `key`, accAll FROM AccView
>     UNION ALL
>     SELECT 'accErr', `key`, accError FROM AccView
> );