You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by zhangbin <zh...@aliyun.com.INVALID> on 2022/05/26 14:19:54 UTC

Flink-1.15.0 在使用cumulate window报key group 不在范围内的错误

        // Parse each incoming JSON record into a DataSource POJO and derive the
        // event-time timestamp (millis) from its log_creation_time field.
        // NOTE(review): `stream`, `dateTime` and `pattern` are defined outside this
        // fragment — presumably a DataStream<String> source and a Joda-style
        // formatter; confirm they are serializable for use inside the lambda.
        DataStream<DataSource> skuDataStream = stream.map(
                (MapFunction<String, DataSource>) s -> {
                    DataSource ret = JSONObject.parseObject(s, DataSource.class);
                    ret.setEvent_ts(dateTime.parse(ret.getLog_creation_time(), pattern).getMillis());

                    return ret;
                }
        // Drop records with non-positive dimension ids or a null union_id so the
        // aggregation below only sees fully-populated rows.
        // NOTE(review): accessor names mix casing (getregion_id vs getCity_id) —
        // verify they match the actual DataSource getters.
        ).filter((FilterFunction<DataSource>) DataSource -> DataSource.getregion_id() > 0
                && DataSource.getprovince_id() > 0
                && DataSource.getCity_id() > 0
                && DataSource.getDistrict_id() > 0
                && DataSource.getCode() > 0
                && DataSource.getCategory1_id() > 0
                && DataSource.getCategory2_id() > 0
                && DataSource.getCategory3_id() > 0 && DataSource.getUnion_id() != null);

        // Table schema for the stream: physical columns plus a computed event-time
        // column `row_ltz` (TIMESTAMP_LTZ(3) from the millis field event_ts) with a
        // 10-second bounded-out-of-orderness watermark.
        Schema schema = Schema.newBuilder()
                .column("code", "bigint")
                .column("flow_source", "string")
                .column("origin_sku_id","bigint")
                .column("sku_id","bigint")
                .column("region_id","bigint")
                .column("province_id","bigint")
                .column("city_id","bigint")
                .column("district_id","bigint")
                .column("category1_id","bigint")
                .column("category2_id","bigint")
                .column("category3_id","bigint")
                .column("union_id", "string")
                .column("channel1_id", "string")
                .column("channel2_id", "string")
                .columnByExpression("row_ltz", Expressions.callSql("TO_TIMESTAMP_LTZ(event_ts, 3)"))
                .watermark("row_ltz", "row_ltz - INTERVAL '10' SECOND")
                .build();
        // Bridge the DataStream into the Table API and register it for SQL access.
        Table inputTable = tableEnv.fromDataStream(skuDataStream, schema);

        tableEnv.createTemporaryView("source_table", inputTable);

        // Sink DDL: print connector, so results go to the TaskManager stdout.
        // Column list must match the SELECT below positionally (2 window cols,
        // 8 dimension cols, 7 distinct-count metrics).
        String sink = "CREATE TABLE sink_table (\n" +
                "    window_start string,\n" +
                "    window_end   string,\n" +
                "    region_id      bigint,\n" +
                "    province_id      bigint,\n" +
                "    city_id      bigint,\n" +
                "    district_id      bigint,\n" +
                "    code      bigint,\n" +
                "    category1_id      bigint,\n" +
                "    category2_id      bigint,\n" +
                "    category3_id      bigint,\n" +
                "    uv      bigint,\n" +
                "    uv1      bigint,\n" +
                "    uv2      bigint,\n" +
                "    uv3      bigint,\n" +
                "    uv4      bigint,\n" +
                "    uv5      bigint,\n" +
                "    uv6      bigint\n" +
                ") WITH (\n" +
                "  'connector' = 'print'\n" +
                ")";

        // Cumulating-window UV query: CUMULATE with a 10-minute step over a 1-day
        // max window, computing COUNT(DISTINCT union_id) overall and filtered by
        // channel combinations, expanded over many GROUPING SETS (roll-ups of the
        // geo and category dimension hierarchies, plus the grand total `()`).
        // NOTE(review): this is the query the thread reports as failing at runtime
        // with "IllegalArgumentException: key group from X to Y does not contain Z"
        // inside the window-timer registration on Flink 1.15.0 — the plain GROUP BY
        // variant (commented out below) reportedly did not fail; presumably the
        // GROUPING SETS expansion interacts badly with key-group assignment in the
        // slicing window operator — confirm against the Flink issue tracker.
        String sql =
                "insert into sink_table \n" +
                        "SELECT  \n" +
                        "cast(window_start as string) as window_start, \n" +
                        "cast(window_end as string) as window_end, \n" +
                        "region_id, \n" +
                        "province_id, \n" +
                        "city_id, \n" +
                        "district_id, \n" +
                        "code, \n" +
                        "category1_id, \n" +
                        "category2_id, \n" +
                        "category3_id, \n" +
                        "count(distinct union_id) as uv, \n" +
                        "COUNT(DISTINCT union_id) FILTER (WHERE channel1_id in ('1', '2')) AS uv1, \n" +
                        "COUNT(DISTINCT union_id) FILTER (WHERE channel1_id ='1') AS uv2, \n" +
                        "COUNT(DISTINCT union_id) FILTER (WHERE channel1_id ='3') AS uv3, \n" +
                        "COUNT(DISTINCT union_id) FILTER (WHERE channel1_id ='1' and channel2_id='1') AS uv4, \n" +
                        "COUNT(DISTINCT union_id) FILTER (WHERE channel1_id ='3' and channel2_id='1') AS uv5, \n" +
                        "COUNT(DISTINCT union_id) FILTER (WHERE channel1_id ='1' and origin_sku_id = sku_id) AS uv6 \n" +
                        "FROM TABLE(CUMULATE(TABLE source_table, DESCRIPTOR(row_ltz), INTERVAL '10' MINUTES, INTERVAL '1' DAY)) \n" +
                        //"GROUP BY window_start, window_end,code,region_id,province_id,city_id,district_id,category1_id,category2_id,category3_id";
                        "GROUP BY window_start, window_end," +
                        "GROUPING SETS (\n" +
                        "(code,region_id,province_id,city_id,district_id,category1_id,category2_id,category3_id),\n" +
                        "(code,region_id,province_id,city_id,district_id,category1_id,category2_id),\n" +
                        "(code,region_id,province_id,city_id,district_id,category1_id),\n" +
                        "\n" +
                        "(code,region_id,province_id,city_id,category1_id,category2_id,category3_id),\n" +
                        "(code,region_id,province_id,category1_id,category2_id,category3_id),\n" +
                        "(code,region_id,category1_id,category2_id,category3_id),\n" +
                        "\n" +
                        "(code,region_id,province_id,city_id,district_id),\n" +
                        "(code,region_id,province_id,city_id),\n" +
                        "(code,region_id,province_id),\n" +
                        "(code,region_id),\n" +
                        "\n" +
                        "(code,category1_id,category2_id,category3_id),\n" +
                        "(code,category1_id,category2_id),\n" +
                        "(code,category1_id),\n" +
                        "\n" +
                        "(code),\n" +
                        "\n" +
                        "\n" +
                        "(region_id,province_id,city_id,district_id,category1_id,category2_id,category3_id),\n" +
                        "(region_id,province_id,city_id,district_id,category1_id,category2_id),\n" +
                        "(region_id,province_id,city_id,district_id,category1_id),\n" +
                        "\n" +
                        "(region_id,province_id,city_id,category1_id,category2_id,category3_id),\n" +
                        "(region_id,province_id,category1_id,category2_id,category3_id),\n" +
                        "(region_id,category1_id,category2_id,category3_id),\n" +
                        "\n" +
                        "(region_id,province_id,city_id,district_id),\n" +
                        "(region_id,province_id,city_id),\n" +
                        "(region_id,province_id),\n" +
                        "(region_id),\n" +
                        "\n" +
                        "(category1_id,category2_id,category3_id),\n" +
                        "(category1_id,category2_id),\n" +
                        "(category1_id),\n" +
                        "()\n" +
                        ");";
        // Register the sink, then submit the INSERT job.
        tableEnv.executeSql(sink);
        tableEnv.executeSql(sql);
        

}

Re: Flink-1.15.0 在使用cumulate window报key group 不在范围内的错误

Posted by godfrey he <go...@gmail.com>.
把异常栈也发出来吧

zhangbin <zh...@aliyun.com.invalid> 于2022年5月26日周四 22:54写道:

Re: Flink-1.15.0 在使用cumulate window报key group 不在范围内的错误

Posted by godfrey he <go...@gmail.com>.
使用了什么state backend?能描述一下产生上述问题的步骤吗?
是直接跑作业就产生上述错误,还是作业有基于sp启动,或者是中途重启过?

zhangbin <zh...@aliyun.com> 于2022年5月27日周五 13:34写道:

>
> Retry
> ---- 回复的原邮件 ----
> 发件人 zhangbin<zh...@aliyun.com> <zh...@aliyun.com>
> 发送日期 2022年05月27日 10:11
> 收件人 godfreyhe@gmail.com<go...@gmail.com> <go...@gmail.com>
> 抄送人 user-zh<us...@flink.apache.org> <us...@flink.apache.org>
> 主题 回复:Flink-1.15.0 在使用cumulate window报key group 不在范围内的错误
> 确认了下邮件的内容的确是异常信息,这里却把附件的内容展示出来了。我再重新发一次。
>
> 我们在使用Flink-1.15.0 cumulate window + grouping sets 执行SQL过程中,发现个问题,报如下错误:
>    java.lang.IllegalArgumentException: key group from 93 to 95 does not
> contain 478
>    at
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
>    at
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191)
>    at
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186)
>    at
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179)
>    at
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114)
>    at
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233)
>    at
> org.apache.flink.table.runtime.operators.window.slicing.WindowTimerServiceImpl.registerEventTimeWindowTimer(WindowTimerServiceImpl.java:61)
>    at
> org.apache.flink.table.runtime.operators.window.slicing.WindowTimerServiceImpl.registerEventTimeWindowTimer(WindowTimerServiceImpl.java:27)
>    at
> org.apache.flink.table.runtime.operators.aggregate.window.processors.AbstractWindowAggProcessor.processElement(AbstractWindowAggProcessor.java:164)
>    at
> org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowOperator.processElement(SlicingWindowOperator.java:221)
>    at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>    at
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>    at
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>    at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>    at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>    at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>    at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>    at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>    at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>    at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>    at java.lang.Thread.run(Thread.java:748)
>
>