Posted to user@flink.apache.org by Королькевич Михаил <mk...@yandex.ru> on 2021/12/10 10:09:59 UTC

PyFlink accumulate streaming data

Hello Flink team!

How do I properly accumulate streaming data into Avro files, partitioned
by hour?

In my current implementation, data from the data stream is converted to
a table and saved to Avro files (a sketch of the conversion step follows
the DDL below).

Similar to this:

t_env.execute_sql("""
    CREATE TABLE mySink (
        id STRING,
        name STRING,
        data_ranges ARRAY<ROW<`start` BIGINT, `end` BIGINT>>,
        meta ARRAY<ROW<name STRING, text STRING>>,
        current_hour INT
    ) PARTITIONED BY (current_hour) WITH (
        'connector' = 'filesystem',
        'format' = 'avro',
        'path' = '/opt/pyflink-walkthrough/output/table',
        'sink.rolling-policy.rollover-interval' = '1 hour',
        'partition.time-extractor.timestamp-pattern' = '$current_hour',
        'sink.partition-commit.delay' = '1 hour',
        'sink.partition-commit.trigger' = 'process-time',
        'sink.partition-commit.policy.kind' = 'success-file'
    )
""")

Can this be done better? I'm not sure it works properly at all; in
particular, the default partition time extractor expects the timestamp
pattern to resolve to 'yyyy-MM-dd HH:mm:ss', and '$current_hour' (a bare
INT) doesn't look like it satisfies that.
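
For comparison, the filesystem connector documentation shows hourly
partitioning driven by a partition-time commit trigger. Here is a
variant along those lines (assuming watermarks are defined on the
source, and that the dt and hour partition columns are derived from the
event timestamp; both columns are my additions, not part of my current
job):

t_env.execute_sql("""
    CREATE TABLE mySink (
        id STRING,
        name STRING,
        data_ranges ARRAY<ROW<`start` BIGINT, `end` BIGINT>>,
        meta ARRAY<ROW<name STRING, text STRING>>,
        dt STRING,
        `hour` STRING
    ) PARTITIONED BY (dt, `hour`) WITH (
        'connector' = 'filesystem',
        'format' = 'avro',
        'path' = '/opt/pyflink-walkthrough/output/table',
        'partition.time-extractor.timestamp-pattern' = '$dt $hour:00:00',
        'sink.partition-commit.trigger' = 'partition-time',
        'sink.partition-commit.delay' = '1 h',
        'sink.partition-commit.policy.kind' = 'success-file'
    )
""")

Would something along those lines be the more correct approach?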