You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by zhangbin <zh...@aliyun.com.INVALID> on 2022/05/26 14:19:54 UTC
Flink-1.15.0 在使用cumulate window报key group 不在范围内的错误
// Parse each incoming JSON string into a DataSource POJO and derive the event
// timestamp (epoch millis) from the record's log-creation-time string.
// NOTE(review): `stream`, `dateTime`, `pattern`, and `tableEnv` are declared
// outside this snippet — presumably a source DataStream<String>, a date-time
// parser/formatter pair, and a StreamTableEnvironment; confirm against the
// enclosing method.
DataStream<DataSource> skuDataStream = stream.map(
(MapFunction<String, DataSource>) s -> {
DataSource ret = JSONObject.parseObject(s, DataSource.class);
ret.setEvent_ts(dateTime.parse(ret.getLog_creation_time(), pattern).getMillis());
return ret;
}
// Keep only fully-populated records: every dimension id must be positive and
// union_id non-null (union_id is the COUNT(DISTINCT ...) key in the SQL below).
// NOTE(review): the lambda parameter is named `DataSource`, shadowing the class
// of the same name — legal Java, but confusing; consider renaming it (e.g. `ds`).
).filter((FilterFunction<DataSource>) DataSource -> DataSource.getregion_id() > 0
&& DataSource.getprovince_id() > 0
&& DataSource.getCity_id() > 0
&& DataSource.getDistrict_id() > 0
&& DataSource.getCode() > 0
&& DataSource.getCategory1_id() > 0
&& DataSource.getCategory2_id() > 0
&& DataSource.getCategory3_id() > 0 && DataSource.getUnion_id() != null);
// Table schema for the DataStream-to-Table conversion. `row_ltz` is a computed
// event-time column built from the POJO's event_ts (epoch millis), and the
// watermark lags it by 10 seconds.
Schema schema = Schema.newBuilder()
.column("code", "bigint")
.column("flow_source", "string")
.column("origin_sku_id","bigint")
.column("sku_id","bigint")
.column("region_id","bigint")
.column("province_id","bigint")
.column("city_id","bigint")
.column("district_id","bigint")
.column("category1_id","bigint")
.column("category2_id","bigint")
.column("category3_id","bigint")
.column("union_id", "string")
.column("channel1_id", "string")
.column("channel2_id", "string")
// Event-time attribute: millisecond-precision TIMESTAMP_LTZ derived from event_ts.
.columnByExpression("row_ltz", Expressions.callSql("TO_TIMESTAMP_LTZ(event_ts, 3)"))
.watermark("row_ltz", "row_ltz - INTERVAL '10' SECOND")
.build();
// Register the stream as a temporary view so the SQL below can read it.
Table inputTable = tableEnv.fromDataStream(skuDataStream, schema);
tableEnv.createTemporaryView("source_table", inputTable);
// Sink DDL: a 'print' connector table used to inspect the aggregated UV results.
String sink = "CREATE TABLE sink_table (\n" +
" window_start string,\n" +
" window_end string,\n" +
" region_id bigint,\n" +
" province_id bigint,\n" +
" city_id bigint,\n" +
" district_id bigint,\n" +
" code bigint,\n" +
" category1_id bigint,\n" +
" category2_id bigint,\n" +
" category3_id bigint,\n" +
" uv bigint,\n" +
" uv1 bigint,\n" +
" uv2 bigint,\n" +
" uv3 bigint,\n" +
" uv4 bigint,\n" +
" uv5 bigint,\n" +
" uv6 bigint\n" +
") WITH (\n" +
" 'connector' = 'print'\n" +
")";
// Cumulative-window UV aggregation: a CUMULATE windowing TVF (10-minute step,
// 1-day max window) feeding COUNT(DISTINCT union_id), overall and filtered by
// channel/sku conditions (uv, uv1..uv6), grouped over a roll-up hierarchy of
// region and category dimensions via GROUPING SETS.
String sql =
"insert into sink_table \n" +
"SELECT \n" +
"cast(window_start as string) as window_start, \n" +
"cast(window_end as string) as window_end, \n" +
"region_id, \n" +
"province_id, \n" +
"city_id, \n" +
"district_id, \n" +
"code, \n" +
"category1_id, \n" +
"category2_id, \n" +
"category3_id, \n" +
"count(distinct union_id) as uv, \n" +
"COUNT(DISTINCT union_id) FILTER (WHERE channel1_id in ('1', '2')) AS uv1, \n" +
"COUNT(DISTINCT union_id) FILTER (WHERE channel1_id ='1') AS uv2, \n" +
"COUNT(DISTINCT union_id) FILTER (WHERE channel1_id ='3') AS uv3, \n" +
"COUNT(DISTINCT union_id) FILTER (WHERE channel1_id ='1' and channel2_id='1') AS uv4, \n" +
"COUNT(DISTINCT union_id) FILTER (WHERE channel1_id ='3' and channel2_id='1') AS uv5, \n" +
"COUNT(DISTINCT union_id) FILTER (WHERE channel1_id ='1' and origin_sku_id = sku_id) AS uv6 \n" +
"FROM TABLE(CUMULATE(TABLE source_table, DESCRIPTOR(row_ltz), INTERVAL '10' MINUTES, INTERVAL '1' DAY)) \n" +
// Original (working) single grouping kept for reference; the GROUPING SETS
// variant below is the one that triggered the reported key-group exception.
//"GROUP BY window_start, window_end,code,region_id,province_id,city_id,district_id,category1_id,category2_id,category3_id";
"GROUP BY window_start, window_end," +
"GROUPING SETS (\n" +
// Roll-ups that include `code`: full dimension set, then progressively
// dropping category3/category2, then city/district, then all regions, etc.
"(code,region_id,province_id,city_id,district_id,category1_id,category2_id,category3_id),\n" +
"(code,region_id,province_id,city_id,district_id,category1_id,category2_id),\n" +
"(code,region_id,province_id,city_id,district_id,category1_id),\n" +
"\n" +
"(code,region_id,province_id,city_id,category1_id,category2_id,category3_id),\n" +
"(code,region_id,province_id,category1_id,category2_id,category3_id),\n" +
"(code,region_id,category1_id,category2_id,category3_id),\n" +
"\n" +
"(code,region_id,province_id,city_id,district_id),\n" +
"(code,region_id,province_id,city_id),\n" +
"(code,region_id,province_id),\n" +
"(code,region_id),\n" +
"\n" +
"(code,category1_id,category2_id,category3_id),\n" +
"(code,category1_id,category2_id),\n" +
"(code,category1_id),\n" +
"\n" +
"(code),\n" +
"\n" +
"\n" +
// Same roll-up hierarchy without `code`, down to the grand total ().
"(region_id,province_id,city_id,district_id,category1_id,category2_id,category3_id),\n" +
"(region_id,province_id,city_id,district_id,category1_id,category2_id),\n" +
"(region_id,province_id,city_id,district_id,category1_id),\n" +
"\n" +
"(region_id,province_id,city_id,category1_id,category2_id,category3_id),\n" +
"(region_id,province_id,category1_id,category2_id,category3_id),\n" +
"(region_id,category1_id,category2_id,category3_id),\n" +
"\n" +
"(region_id,province_id,city_id,district_id),\n" +
"(region_id,province_id,city_id),\n" +
"(region_id,province_id),\n" +
"(region_id),\n" +
"\n" +
"(category1_id,category2_id,category3_id),\n" +
"(category1_id,category2_id),\n" +
"(category1_id),\n" +
"()\n" +
// NOTE(review): the built statement ends with ");" — the trailing ';' inside
// the string handed to executeSql() may be rejected by the SQL parser in some
// Flink versions; verify, or drop it.
");";
// Create the sink, then submit the INSERT job.
tableEnv.executeSql(sink);
tableEnv.executeSql(sql);
}
Re: Flink-1.15.0 在使用cumulate window报key group 不在范围内的错误
Posted by godfrey he <go...@gmail.com>.
把异常栈也发出来吧
zhangbin <zh...@aliyun.com.invalid> 于2022年5月26日周四 22:54写道:
Re: Flink-1.15.0 在使用cumulate window报key group 不在范围内的错误
Posted by godfrey he <go...@gmail.com>.
使用了什么state backend?能描述一下产生上述问题的步骤吗?
是直接跑作业就产生上述错误,还是作业有基于sp启动,或者是中途重启过?
zhangbin <zh...@aliyun.com> 于2022年5月27日周五 13:34写道:
>
> Retry
> ---- 回复的原邮件 ----
> 发件人 zhangbin<zh...@aliyun.com> <zh...@aliyun.com>
> 发送日期 2022年05月27日 10:11
> 收件人 godfreyhe@gmail.com<go...@gmail.com> <go...@gmail.com>
> 抄送人 user-zh<us...@flink.apache.org> <us...@flink.apache.org>
> 主题 回复:Flink-1.15.0 在使用cumulate window报key group 不在范围内的错误
> 确认了下邮件的内容的确是异常信息,这里却把附件的内容展示出来了。我再重新发一次。
>
> 我们在使用Flink-1.15.0 cumulate window + grouping sets 执行SQL过程中,发现个问题,报如下错误:
> java.lang.IllegalArgumentException: key group from 93 to 95 does not
> contain 478
> at
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
> at
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191)
> at
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186)
> at
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179)
> at
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114)
> at
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233)
> at
> org.apache.flink.table.runtime.operators.window.slicing.WindowTimerServiceImpl.registerEventTimeWindowTimer(WindowTimerServiceImpl.java:61)
> at
> org.apache.flink.table.runtime.operators.window.slicing.WindowTimerServiceImpl.registerEventTimeWindowTimer(WindowTimerServiceImpl.java:27)
> at
> org.apache.flink.table.runtime.operators.aggregate.window.processors.AbstractWindowAggProcessor.processElement(AbstractWindowAggProcessor.java:164)
> at
> org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowOperator.processElement(SlicingWindowOperator.java:221)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
> at
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
> at
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> at java.lang.Thread.run(Thread.java:748)
>
>