Posted to user@flink.apache.org by "ivan.rosero@agilent.com" <iv...@agilent.com> on 2022/04/06 14:45:07 UTC

python table api

Hello,

I'm trying to understand tumbling windows at the level of the Python Table API. For this short example:

Input CSV:
2022-01-01 10:00:23.000000000, "data line 3"
2022-01-01 10:00:24.000000000, "data line 4"
2022-01-01 10:00:18.000000000, "data line 1"
2022-01-01 10:00:25.000000000, "data line 5"
2022-01-01 10:00:26.000000000, "data line 6"
2022-01-01 10:00:27.000000000, "data line 7"
2022-01-01 10:00:22.000000000, "data line 2"
2022-01-01 10:00:28.000000000, "data line 8"
2022-01-01 10:00:29.000000000, "data line 9"
2022-01-01 10:00:30.000000000, "data line 10"

Print output:
+I[2022-01-01T10:00:23,  "data line 3"]
+I[2022-01-01T10:00:24,  "data line 4"]
+I[2022-01-01T10:00:18,  "data line 1"]
+I[2022-01-01T10:00:25,  "data line 5"]
+I[2022-01-01T10:00:26,  "data line 6"]
+I[2022-01-01T10:00:27,  "data line 7"]
+I[2022-01-01T10:00:28,  "data line 8"]
+I[2022-01-01T10:00:29,  "data line 9"]
*+I[2022-01-01T10:00:22,  "data line 2"]*
+I[2022-01-01T10:00:30,  "data line 10"]

Below, I'm trying to process this data in 5-second windows, so I would at least expect the bold line above (data line 2) not to appear in the print output.

Am I not actually configuring tumbling windows in the source table?

from pyflink.table import EnvironmentSettings, TableEnvironment
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().get_configuration().set_string("parallelism.default", "1")

t_env.execute_sql("""
    create table source (
        ts TIMESTAMP(3),
        data STRING,
        WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '{}'
    )
""".format("source.csv"))

t_env.execute_sql("""
    CREATE TABLE print (
        ts TIMESTAMP(3),
        data STRING
    ) WITH (
        'connector' = 'print'
    )
""")

t_env.execute_sql("INSERT INTO print SELECT * FROM source").wait()


Thank you,

Ivan
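Why every row is printed: the WATERMARK clause in the source DDL only declares how event time advances (here, the largest timestamp seen so far minus 3 seconds). It does not filter or reorder rows by itself, and a plain INSERT INTO print SELECT * FROM source contains no event-time operator that consults the watermark, so all ten rows reach the sink. The pure-Python sketch below (it does not use PyFlink) traces that watermark for the example data. It assumes a watermark is emitted after every row; Flink actually emits watermarks periodically, so real timing can differ. The names ROWS and watermark_trace are hypothetical.

```python
from datetime import datetime, timedelta

# The example CSV rows in file (arrival) order: (event time, payload).
ROWS = [
    (datetime(2022, 1, 1, 10, 0, sec), f"data line {n}")
    for sec, n in [(23, 3), (24, 4), (18, 1), (25, 5), (26, 6),
                   (27, 7), (22, 2), (28, 8), (29, 9), (30, 10)]
]


def watermark_trace(rows, delay):
    """Return (ts, data, watermark) for each row, in arrival order.

    Models WATERMARK FOR ts AS ts - INTERVAL '3' SECOND as a bounded
    out-of-orderness strategy: the watermark is the largest timestamp
    seen so far minus `delay`, so it never moves backwards.
    Assumption: one watermark per row (Flink emits them periodically).
    """
    max_ts = None
    trace = []
    for ts, data in rows:
        max_ts = ts if max_ts is None else max(max_ts, ts)
        trace.append((ts, data, max_ts - delay))
    return trace


# A pass-through SELECT * never looks at the watermark, so rows that are
# behind it are still forwarded to the sink; here we only label them.
for ts, data, wm in watermark_trace(ROWS, timedelta(seconds=3)):
    note = "  (behind watermark)" if ts < wm else ""
    print(f"{ts.time()}  {data:<13} watermark={wm.time()}{note}")
```

Under these assumptions, data line 1 (10:00:18) and data line 2 (10:00:22) both arrive behind the watermark (10:00:21 and 10:00:24 respectively), yet the pass-through query prints them anyway, because nothing in it drops late rows.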

RE: python table api

Posted by "ivan.rosero@agilent.com" <iv...@agilent.com>.
Hello Dian,

Indeed, thank you very much. Now I'm getting:

+I[2022-01-01T10:00:20, 2022-01-01T10:00:25, 2]
+I[2022-01-01T10:00:25, 2022-01-01T10:00:30, 5]
+I[2022-01-01T10:00:30, 2022-01-01T10:00:35, 1]



from pyflink.table import EnvironmentSettings, TableEnvironment
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().get_configuration().set_string("parallelism.default", "1")

t_env.execute_sql("""
    create table source (
        ts TIMESTAMP(3),
        data STRING,
        WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '{}'
    )
""".format("source.csv"))

t_env.execute_sql("""
    CREATE TABLE print (
        window_start TIMESTAMP(3),
        window_end TIMESTAMP(3),
        how_many BIGINT
    ) WITH (
        'connector' = 'print'
    )
""")

t_env.execute_sql("""
INSERT INTO print (
    SELECT window_start, window_end, COUNT(*) as how_many
    FROM TABLE(
        TUMBLE(TABLE source, DESCRIPTOR(ts), INTERVAL '5' SECONDS))
    GROUP BY window_start, window_end
)
""").wait()


From: Dian Fu <di...@gmail.com>
Sent: Thursday, April 7, 2022 3:08 AM
To: ROSERO,IVAN (Agilent CHE) <iv...@agilent.com>
Cc: user <us...@flink.apache.org>
Subject: Re: python table api




You have not configured the tumbling window at all. Please refer to [1] for more details.

Regards,
Dian

[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-agg/#group-window-aggregation


Re: python table api

Posted by Francesco Guardiani <fr...@ververica.com>.
As Dian said, your INSERT INTO query just selects records from source and
passes them to print without any computation whatsoever.

Please check out [1] and [2] to learn how to develop queries that perform
aggregations over windows. Note that the second method (window TVF) is
preferred and recommended over the first.

[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-agg/#group-window-aggregation
[2] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-tvf/
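The window TVF form [2] splits windowing into two steps: TUMBLE only extends each row with window_start, window_end and window_time (window_end minus 1 ms), and the GROUP BY window_start, window_end that follows does the aggregation. The plain-Python sketch below models only the first, annotation step (no watermarks or lateness) for two of the example rows; the function name tumble_tvf is hypothetical and this is not PyFlink code.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def tumble_tvf(rows, size):
    """Hypothetical model of TUMBLE(TABLE source, DESCRIPTOR(ts), size).

    Every input row is kept and extended with the window it falls into.
    No aggregation happens at this step; a GROUP BY window_start,
    window_end applied afterwards does the counting.
    """
    out = []
    for ts, data in rows:
        start = ts - (ts - EPOCH) % size  # align to a multiple of size
        end = start + size
        out.append({
            "ts": ts,
            "data": data,
            "window_start": start,
            "window_end": end,
            # window_time is the window's event-time attribute: end - 1 ms.
            "window_time": end - timedelta(milliseconds=1),
        })
    return out


rows = [(datetime(2022, 1, 1, 10, 0, 23), "data line 3"),
        (datetime(2022, 1, 1, 10, 0, 30), "data line 10")]
for r in tumble_tvf(rows, timedelta(seconds=5)):
    print(r["data"], r["window_start"].time(), r["window_end"].time(),
          r["window_time"].time())
```

Because the annotated rows keep all original columns, the same windowed relation can be inspected with a plain SELECT before adding the GROUP BY, which can help when debugging window boundaries.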


Re: python table api

Posted by Dian Fu <di...@gmail.com>.
You have not configured the tumbling window at all. Please refer to [1] for
more details.

Regards,
Dian

[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-agg/#group-window-aggregation
