Posted to user@flink.apache.org by Jonathan Weaver <my...@gmail.com> on 2022/03/01 19:13:27 UTC

Looking for advice on moving Datastream job to Table SQL

I'm doing a POC on moving an existing Datastream API job to use Table SQL
to make it more accessible for some of my teammates.

However, I'm at a loss on how to handle watermarking in a way similar to
how it was handled in the Datastream API.

In the existing job a CDC stream is read, and 3 SQL tables are written out.

CDC ----> table1 \
         |---> table2  ---- pk/watermark stream -> recovery table
         |---> table3 /


The 3 tables are all written in an AsyncFunction which writes out the
table primary keys and the CDC watermark which then gets written to a
fourth table for recovery and tracking purposes. (If the job is
stop/started/crashed we are not relying on Flink state currently but on the
recovery table to restart where processing left off).

Is there a way to do something similar in the SQL API where I can store the
LEAST watermark of all 3 table writes in a 4th table?
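Conceptually, what I'm after is something like the following (pseudocode only;
the per-table progress tables and the `last_watermark` column are hypothetical,
just to illustrate the shape of the query):

    -- hypothetical: assumes each sink records the last CDC watermark it committed
    INSERT INTO recovery
    SELECT LEAST(t1.last_watermark, t2.last_watermark, t3.last_watermark)
    FROM table1_progress t1, table2_progress t2, table3_progress t3;

i.e. the recovery table should only ever advance to the smallest watermark
that all three sinks have durably written.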

I'm at a loss on how to do it short of writing a custom sink. (We're
currently using the JDBC connector sink.)

var statements = tEnv.createStatementSet();
statements.addInsertSql("INSERT INTO table1 SELECT ... FROM cdc");
statements.addInsertSql("INSERT INTO table2 SELECT ... FROM cdc");
statements.addInsertSql("INSERT INTO table3 SELECT ... FROM cdc");
statements.attachAsDataStream();

Or is there a way to do something similar within the Table API? Or should I
take a completely different approach?

The CDC watermark after inserting into the 3 tables is what I'm after. (The
CDC source is a custom table source.)

Any ideas?

Thanks!