You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Taras Moisiuk <t....@gmail.com> on 2020/12/22 18:45:34 UTC

Flink SQL continuous join checkpointing

Hi everyone!
I'm using Flink *1.12.0* with SQL API. 
My streaming job is basically a join of two dynamic tables (from kafka
topics) and insertion the result into PostgreSQL table. 

I have enabled watermarking based on kafka topic timestamp column for each
table in join:

CREATE TABLE table1 (
....,

kafka_time TIMESTAMP(3) METADATA FROM 'timestamp',
WATERMARK FOR kafka_time AS kafka_time - INTERVAL '2' HOUR
)
WITH ('format' = 'json',
...

But checkpoint data size for join task is permanently increasing despite the
watermarks on the tables and "Low watermark" mark in UI. 

As far as I understand outdated records from both tables must be dropped
from checkpoint after 2 hours, but looks like it holds all job state since
the beginning.

Should I enable this behavior for outdated records explicitly of modify join
query? 
Thank you!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Flink SQL continuous join checkpointing

Posted by Taras Moisiuk <t....@gmail.com>.
Hi Leonard,

Thank you for answer, in fact I used regular join because my interval
condition was based on wrong column. 

I extended my join with attribute column condition and it solved the
problem:
 ...
         FROM table_fx fx
         LEFT JOIN table_v v ON v.active = fx.instrument_active_id
               AND v.kafka_time BETWEEN fx.kafka_time - INTERVAL '10' MINUTE
AND fx.kafka_time
...






--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Flink SQL continuous join checkpointing

Posted by Leonard Xu <xb...@gmail.com>.
Hi, Taras

> But checkpoint data size for join task is permanently increasing despite the watermarks on the tables and "Low watermark" mark in UI. 
> As far as I understand outdated records from both tables must be dropped from checkpoint after 2 hours, but looks like it holds all job state since the beginning.

What kind of join do you use?

If you’re using interval join [1], the outdated data in join operator state will be cleaned after interval join time period + watermark interval,and your understanding is wright in this case.

If you’re using regular join, Flink regular join will keep all data in state to ensure join semantics is same with classical DB which means the operator state will continuous to increase,  you can set the option table.exec.state.ttl [2] to clean the outdated data in state according your business. 


Best,
Leonard
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html#interval-joins <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html#interval-joins>
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/config.html#table-exec-state-ttl <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/config.html#table-exec-state-ttl>