Posted to user-zh@flink.apache.org by 元灵 <li...@126.com> on 2020/06/03 09:39:31 UTC
pyflink window creation fails
Hi all, I have a question:
I create a Kafka source in pyflink with SQL DDL, as follows:
kafka_source_ddl = """
CREATE TABLE kafka_source_tb (
name VARCHAR,
number INT,
msgtime TIMESTAMP,
WATERMARK FOR msgtime AS msgtime
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'mytopic',
'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'format.type' = 'json',
'format.derive-schema' = 'true'
)
"""
st_env.sql_update(kafka_source_ddl)
Using a window then raises an error. The code is:
st_env.from_path("kafka_source_tb") \
.window(Slide.over("10.secends").every("1.secends").on("msgtime").alias("msgtime")) \
.group_by("msgtime") \
.select("msgtime.start as b, msgtime.end as c, msgtime.rowtime as d") \
The error is:
: org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.
at org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.validateStreamTimeAttribute(AggregateOperationFactory.java:293)
at org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.validateTimeAttributeType(AggregateOperationFactory.java:278)
at org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.getValidatedTimeAttribute(AggregateOperationFactory.java:271)
at org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:233)
at org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:243)
at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:762)
at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:747)
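I saw someone say that using the API's rowtime function directly seems to have a bug, so I chose the DDL approach instead. Is this error related? Or did I write something wrong?
Could you please take a look?
Thanks!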
Re:Re: pyflink window creation fails
Posted by 元灵 <li...@126.com>.
Thanks for the reply. I will try again once 1.11.0 is released.
At 2020-06-03 18:00:06, "godfrey he" <go...@gmail.com> wrote:
> hi 元灵,
> This is a known bug, and a fix is in progress: https://issues.apache.org/jira/browse/FLINK-17753
>
>Bests,
>Godfrey
Re: pyflink window creation fails
Posted by godfrey he <go...@gmail.com>.
hi 元灵,
This is a known bug, and a fix is in progress: https://issues.apache.org/jira/browse/FLINK-17753
Bests,
Godfrey
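Editor's sketch, not part of the original reply: if the bug affects only the Table API window path (an assumption; check the JIRA issue for its exact scope), expressing the same sliding window in SQL may sidestep it until the fix is released. The helper below is hypothetical and only builds the query string with Flink SQL's HOP group-window functions. The table name, column name, and output aliases are taken from the original post, and `st_env` is assumed to be the poster's existing table environment.

```python
def hop_window_query(table, time_col, slide_seconds, size_seconds):
    """Build a Flink SQL sliding (HOP) group-window query as a string.

    Hypothetical helper for illustration only. HOP takes the time
    attribute first, then the slide interval, then the window size.
    Keep both values at 99 or below: INTERVAL '...' SECOND uses the
    default leading precision of two digits.
    """
    slide = "INTERVAL '{}' SECOND".format(slide_seconds)
    size = "INTERVAL '{}' SECOND".format(size_seconds)
    args = "{}, {}, {}".format(time_col, slide, size)
    return (
        "SELECT HOP_START({a}) AS b, HOP_END({a}) AS c, HOP_ROWTIME({a}) AS d "
        "FROM {t} GROUP BY HOP({a})"
    ).format(a=args, t=table)


# Same window as in the original post: 10 second size, 1 second slide.
query = hop_window_query("kafka_source_tb", "msgtime", 1, 10)
print(query)

# With the st_env from the original post (not created here):
# result = st_env.sql_query(query)
```

Whether this avoids the ValidationException depends on the Flink version and planner in use, so treat it as something to try rather than a confirmed fix.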