Posted to user@flink.apache.org by "ivan.rosero@agilent.com" <iv...@agilent.com> on 2022/04/06 14:45:07 UTC
python table api
Hello,
I'm trying to understand tumbling windows at the level of the python table api. For this short example:
Input csv:
2022-01-01 10:00:23.000000000, "data line 3"
2022-01-01 10:00:24.000000000, "data line 4"
2022-01-01 10:00:18.000000000, "data line 1"
2022-01-01 10:00:25.000000000, "data line 5"
2022-01-01 10:00:26.000000000, "data line 6"
2022-01-01 10:00:27.000000000, "data line 7"
2022-01-01 10:00:22.000000000, "data line 2"
2022-01-01 10:00:28.000000000, "data line 8"
2022-01-01 10:00:29.000000000, "data line 9"
2022-01-01 10:00:30.000000000, "data line 10"
Print output:
+I[2022-01-01T10:00:23, "data line 3"]
+I[2022-01-01T10:00:24, "data line 4"]
+I[2022-01-01T10:00:18, "data line 1"]
+I[2022-01-01T10:00:25, "data line 5"]
+I[2022-01-01T10:00:26, "data line 6"]
+I[2022-01-01T10:00:27, "data line 7"]
+I[2022-01-01T10:00:28, "data line 8"]
+I[2022-01-01T10:00:29, "data line 9"]
*+I[2022-01-01T10:00:22, "data line 2"]*
+I[2022-01-01T10:00:30, "data line 10"]
Below, I'm trying to process this data in 5 second windows. So I would at least expect not to see the bold line above (data line 2, timestamp 10:00:22) in the print output.
Am I not really configuring tumbling windows in the source table?
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().get_configuration().set_string("parallelism.default", "1")

t_env.execute_sql("""
    create table source (
        ts TIMESTAMP(3),
        data STRING,
        WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '{}'
    )
""".format("source.csv"))

t_env.execute_sql("""
    CREATE TABLE print (
        ts TIMESTAMP(3),
        data STRING
    ) WITH (
        'connector' = 'print'
    )
""")

t_env.execute_sql("INSERT INTO print SELECT * FROM source").wait()
Thank you,
Ivan
RE: python table api
Posted by "ivan.rosero@agilent.com" <iv...@agilent.com>.
Hello Dian,
Indeed. Thank you very much. Now getting
+I[2022-01-01T10:00:20, 2022-01-01T10:00:25, 2]
+I[2022-01-01T10:00:25, 2022-01-01T10:00:30, 5]
+I[2022-01-01T10:00:30, 2022-01-01T10:00:35, 1]
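For anyone reading this later, here is a rough way to see why the counts come out as 2, 5 and 1. It is a plain-Python sketch, not Flink code, and it rests on simplifying assumptions: the watermark advances with every row to (largest ts seen so far) minus 1 second, as in the script in this message, and a row is dropped as late once its window's end is at or before the current watermark. The names simulate and window_start are made up for this illustration.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(seconds=5)   # TUMBLE(..., INTERVAL '5' SECONDS)
DELAY = timedelta(seconds=1)    # WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
EPOCH = datetime(1970, 1, 1)


def window_start(ts):
    # Tumbling windows are aligned to multiples of the window size since the epoch.
    return ts - (ts - EPOCH) % WINDOW


def simulate(timestamps):
    """Simplified model: count rows per window, dropping rows that arrive late."""
    counts = {}                 # window start -> number of accepted rows
    dropped = []                # rows whose window had already closed
    watermark = datetime.min
    for ts in timestamps:
        start = window_start(ts)
        end = start + WINDOW
        if end <= watermark:
            dropped.append(ts)  # assumption: late rows are silently discarded
        else:
            counts[start] = counts.get(start, 0) + 1
        watermark = max(watermark, ts - DELAY)
    return counts, dropped


# Seconds of the ten input rows, in file order.
seconds = [23, 24, 18, 25, 26, 27, 22, 28, 29, 30]
timestamps = [datetime(2022, 1, 1, 10, 0, s) for s in seconds]

counts, dropped = simulate(timestamps)
for start in sorted(counts):
    print(start, start + WINDOW, counts[start])
print("dropped:", [t.time().isoformat() for t in dropped])
```

Under these assumptions, 10:00:18 and 10:00:22 arrive after the watermark has already passed the end of their windows, which would explain why there is no 10:00:15 window and why the 10:00:20 window holds only two rows.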
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().get_configuration().set_string("parallelism.default", "1")

t_env.execute_sql("""
    create table source (
        ts TIMESTAMP(3),
        data STRING,
        WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '{}'
    )
""".format("source.csv"))

t_env.execute_sql("""
    CREATE TABLE print (
        window_start TIMESTAMP(3),
        window_end TIMESTAMP(3),
        how_any BIGINT
    ) WITH (
        'connector' = 'print'
    )
""")

t_env.execute_sql("""
    INSERT INTO print (
        SELECT window_start, window_end, COUNT(*) as how_any
        FROM TABLE(
            TUMBLE(TABLE source, DESCRIPTOR(ts), INTERVAL '5' SECONDS))
        GROUP BY window_start, window_end
    )
""").wait()
From: Dian Fu <di...@gmail.com>
Sent: Thursday, April 7, 2022 3:08 AM
To: ROSERO,IVAN (Agilent CHE) <iv...@agilent.com>
Cc: user <us...@flink.apache.org>
Subject: Re: python table api
You have not configured the tumbling window at all. Please refer to [1] for more details.
Regards,
Dian
[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-agg/#group-window-aggregation
Re: python table api
Posted by Francesco Guardiani <fr...@ververica.com>.
As Dian said, your INSERT INTO query just selects records from source
and writes them to print, without any computation whatsoever.
Please check out [1] and [2] to learn how to develop queries that perform
aggregations over windows. Note that the second method (window tvf) is
preferred and recommended over the first.
[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-agg/#group-window-aggregation
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-tvf/
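To make the window TVF idea concrete, here is a small plain-Python sketch (not Flink code) of what TUMBLE conceptually does: it returns every input row with window_start, window_end and window_time columns appended, and the aggregation only happens once you GROUP BY those columns. The function name tumble and the three sample rows are illustrative, and watermarks and late data are ignored here.

```python
from collections import Counter
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def tumble(rows, size):
    """Append window_start, window_end and window_time to each (ts, data) row."""
    out = []
    for ts, data in rows:
        start = ts - (ts - EPOCH) % size               # aligned to the epoch
        end = start + size                             # exclusive upper bound
        window_time = end - timedelta(milliseconds=1)  # last instant inside the window
        out.append((ts, data, start, end, window_time))
    return out


rows = [
    (datetime(2022, 1, 1, 10, 0, 23), "data line 3"),
    (datetime(2022, 1, 1, 10, 0, 18), "data line 1"),
    (datetime(2022, 1, 1, 10, 0, 25), "data line 5"),
]

windowed = tumble(rows, timedelta(seconds=5))

# Rough equivalent of: SELECT window_start, window_end, COUNT(*) ... GROUP BY window_start, window_end
per_window = Counter((start, end) for _, _, start, end, _ in windowed)
for (start, end), n in sorted(per_window.items()):
    print(start, end, n)
```

A plain SELECT * FROM source never goes through a step like this, so every row passes straight to the sink regardless of the watermark definition.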
Re: python table api
Posted by Dian Fu <di...@gmail.com>.
You have not configured the tumbling window at all. Please refer to [1] for
more details.
Regards,
Dian
[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-agg/#group-window-aggregation