Posted to user@flink.apache.org by Manas Kale <ma...@gmail.com> on 2020/07/13 09:16:19 UTC

PyFlink Table API - "A group window expects a time attribute for grouping in a stream environment."

Hi,
I have the following piece of code (for PyFlink v1.11):

from pyflink.table.window import Tumble

# 5-minute tumbling event-time window on "rowtime", aggregated per monitorId
t_env.from_path(INPUT_TABLE) \
    .select("monitorId, data, rowtime") \
    .window(Tumble.over("5.minutes").on("rowtime").alias("five_sec_window")) \
    .group_by("five_sec_window, monitorId") \
    .select("monitorId, data.avg, data.min, data.max, five_sec_window.rowtime") \
    .execute_insert(OUTPUT_TABLE)

This generates the following exception:

Traceback (most recent call last):
  File "/home/manas/IU_workspace/Flink_POC/pyflink/main.py", line 124, in <module>
    .select("monitorId, data.avg, data.min, data.max, five_sec_window.rowtime") \
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/table.py", line 907, in select
    return Table(self._j_table.select(fields), self._t_env)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o87.select.
: org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.

The "rowtime" attribute in INPUT_TABLE is created as follows:

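from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironment
from pyflink.table.descriptors import Rowtime

exec_env = StreamExecutionEnvironment.get_execution_environment()
exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
t_env = StreamTableEnvironment.create(
    exec_env,
    environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())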

...

     .field("rowtime", DataTypes.TIMESTAMP(3))
        .rowtime(
            Rowtime()
            .timestamps_from_field("time_st")
            .watermarks_periodic_ascending())

).create_temporary_table(INPUT_TABLE)


What is wrong with the code? I believe that I have already indicated which
attribute has to be treated as the time attribute.

Thank you,
Manas

Re: PyFlink Table API - "A group window expects a time attribute for grouping in a stream environment."

Posted by Xingbo Huang <hx...@gmail.com>.
Hi Manas,

Yes, this is a bug that I have also encountered in the Descriptor API, but
I couldn't find a corresponding issue. You can create an issue to report
this problem. There are similar bugs in the current Descriptor API, so DDL
is the recommended way. The community has now started a discussion on
refactoring the Descriptor API [1].

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-129-Refactor-Descriptor-API-to-register-connector-in-Table-API-tt42995.html

Best,
Xingbo
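A minimal sketch of the DDL route, assuming PyFlink 1.11 with the Blink planner. The table is registered with `t_env.execute_sql(...)`; the computed column `rowtime AS time_st` stands in for `Rowtime().timestamps_from_field("time_st")`, and `WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND` is the DDL form of an ascending-timestamps watermark (the counterpart of `watermarks_periodic_ascending()`). The table name `monitor_input`, the column types, and the connector options are assumptions, since the original post elides them.

```python
# Hypothetical table name: the original post does not show INPUT_TABLE's value.
INPUT_TABLE = "monitor_input"


def input_table_ddl(table_name: str, connector_options: dict) -> str:
    """Build a CREATE TABLE statement declaring `rowtime` as an event-time attribute.

    Assumes `time_st` is a TIMESTAMP(3) field in the source records and that
    `monitorId`/`data` have the (hypothetical) types below.
    """
    options = ",\n".join(f"  '{key}' = '{value}'" for key, value in connector_options.items())
    return (
        f"CREATE TABLE {table_name} (\n"
        "  monitorId STRING,\n"
        # `data` is back-quoted defensively in case it clashes with a SQL keyword.
        "  `data` DOUBLE,\n"
        "  time_st TIMESTAMP(3),\n"
        # Computed column: DDL counterpart of Rowtime().timestamps_from_field("time_st").
        "  rowtime AS time_st,\n"
        # Ascending-timestamps strategy: max observed timestamp minus 1 ms.
        "  WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND\n"
        ") WITH (\n"
        f"{options}\n"
        ")"
    )


def register_input_table(t_env, connector_options: dict) -> None:
    """Register INPUT_TABLE on a PyFlink 1.11 TableEnvironment via DDL."""
    t_env.execute_sql(input_table_ddl(INPUT_TABLE, connector_options))
```

With the table registered this way, the Table API pipeline from the original post should be able to use `rowtime` as a time attribute unchanged. If `time_st` is not already a TIMESTAMP(3), the computed column would need a conversion expression instead of a plain reference.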

Re: PyFlink Table API - "A group window expects a time attribute for grouping in a stream environment."

Posted by Manas Kale <ma...@gmail.com> on 2020/07/14.
@Roman - yes, I still get the error if I do that.
@Xingbo Huang <hx...@gmail.com> - okay, I didn't know DDL was the
recommended way.
Please let me know if you can confirm that this is a bug.
Thanks!

Re: PyFlink Table API - "A group window expects a time attribute for grouping in a stream environment."

Posted by Xingbo Huang <hx...@gmail.com> on 2020/07/13.
Hi Manas,
This may be a bug in the Java Descriptor API. You can try the DDL [1]
instead, which is the recommended way.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/

Best,
Xingbo
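For comparison, a sketch of the same 5-minute tumbling aggregation expressed in Flink SQL and submitted with `t_env.execute_sql(...)`. It assumes the input table was declared via DDL with an event-time attribute named `rowtime` and that the sink has five columns in the order of the SELECT list; the table names are hypothetical. `TUMBLE(...)` in `GROUP BY` defines the window, and `TUMBLE_ROWTIME(...)` plays the role of `five_sec_window.rowtime` in the Table API version.

```python
# Hypothetical table names: the original post does not show INPUT_TABLE/OUTPUT_TABLE values.
INPUT_TABLE = "monitor_input"
OUTPUT_TABLE = "monitor_output"


def windowed_insert_sql(source: str, sink: str, minutes: int = 5) -> str:
    """SQL counterpart of the Table API window aggregation in the original post."""
    window = f"INTERVAL '{minutes}' MINUTE"
    return (
        f"INSERT INTO {sink}\n"
        "SELECT\n"
        "  monitorId,\n"
        "  AVG(`data`),\n"
        "  MIN(`data`),\n"
        "  MAX(`data`),\n"
        # Event-time end of each window, usable as a rowtime attribute downstream.
        f"  TUMBLE_ROWTIME(rowtime, {window})\n"
        f"FROM {source}\n"
        # One group per (window, monitor) pair, as in group_by("five_sec_window, monitorId").
        f"GROUP BY TUMBLE(rowtime, {window}), monitorId"
    )


def submit_windowed_insert(t_env) -> None:
    # In Flink 1.11, execute_sql submits an INSERT as a job and returns a TableResult.
    t_env.execute_sql(windowed_insert_sql(INPUT_TABLE, OUTPUT_TABLE))
```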

Re: PyFlink Table API - "A group window expects a time attribute for grouping in a stream environment."

Posted by Khachatryan Roman <kh...@gmail.com> on 2020/07/13.
Hi Manas,

Do you have the same error if you replace

    .group_by("five_sec_window, monitorId") \

with

    .group_by("five_sec_window") \

?

Regards,
Roman

