Posted to user@flink.apache.org by Marco Villalobos <mv...@kineteque.com> on 2021/09/27 05:44:15 UTC

How do I verify data is written to a JDBC sink?

In my Flink Job, I am using event time to process time-series data.

Due to our business requirements, I need to verify that a specific subset
of the data sent to a JDBC sink has actually been written to the database
before I send an ActiveMQ message to another component.

My job flows like this:

1. Kafka source.
2. Split each source message with a flat map.
3. Aggregate messages in a 15-minute window (keyed by name and by the
timestamp rounded up to the nearest quarter hour; note that there are
120,000 names).
4. Insert, forward-fill, or back-fill time-series data (keyed by name,
again with 120,000 names). Forward fills are done through an event-time
timer. Collect the range of data processed in a side output.
5. In a window function, determine when the time series (rounded up to the
nearest quarter hour) align to the same quarter.
6. Verify that a subset of the aligned time series has already been written
to the database (keyed by name, with 120,000 names) and collect an
ActiveMQ message when that happens.

I could not find a good way to verify that the data was written to the
database, so I introduced a KeyedProcessFunction with a timer: it creates a
JDBC connection and polls the database to verify that the record has been
written. If the first attempt fails, it uses a processing-time timer to
check again a minute later (a sketch of this function follows below).
Please keep in mind that there are 120,000 keys, but only about 1,000
records need this database verification.
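
For reference, this is roughly what that polling function looks like. It is
only a minimal sketch: the connection URL and the "timeseries" table and
columns are placeholders, and the keyed state needed to re-check a pending
record from onTimer is omitted.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Timestamp;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Sketch of the polling step: key = name, value = (name, quarter timestamp in
// epoch millis) that must exist in the database before the ActiveMQ message may be sent.
public class VerifyWrittenFunction
        extends KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>> {

    private transient Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        // One connection per parallel task, not one per record.
        connection = DriverManager.getConnection("jdbc:postgresql://db-host/ts", "user", "secret");
    }

    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx,
                               Collector<Tuple2<String, Long>> out) throws Exception {
        if (isWritten(value.f0, value.f1)) {
            out.collect(value); // downstream turns this into the ActiveMQ message
        } else {
            // Not in the database yet: re-check one minute later on processing time.
            // (The pending record would be kept in keyed state so onTimer can re-check it.)
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + 60_000L);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {
        // Re-read the pending record from keyed state and call isWritten(...) again (omitted).
    }

    private boolean isWritten(String name, long quarterMillis) throws Exception {
        // Blocking JDBC call on the task thread.
        try (PreparedStatement ps = connection.prepareStatement(
                "SELECT 1 FROM timeseries WHERE name = ? AND ts = ?")) {
            ps.setString(1, name);
            ps.setTimestamp(2, new Timestamp(quarterMillis));
            try (ResultSet rs = ps.executeQuery()) {
                return rs.next();
            }
        }
    }

    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
    }
}

The blocking query runs on the task thread, so a checkpoint barrier cannot
pass the operator while it waits; I believe that is why the checkpoint times
exploded.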

This approach caused checkpoints to take 2-4 hours, whereas previously they
completed in a few seconds.

I am now experimenting with a RichAsyncFunction and the R2DBC Postgres
async driver instead. My R2DBC code also has a one-minute timer in it.
So far this async approach has not worked.
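
For the record, this is the shape of the async variant I am experimenting
with. As a sketch it wraps a plain JDBC query in a single-thread executor
instead of the R2DBC driver (connection and table details are placeholders
again), but the async I/O wiring is the same.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Timestamp;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Sketch: the database lookup runs on a separate thread, so the task thread
// (and checkpoint barriers) are not blocked. A single-thread executor keeps the
// shared Connection safe; a pooled DataSource would allow more parallel lookups.
public class AsyncVerifyFunction
        extends RichAsyncFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private transient ExecutorService executor;
    private transient Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        executor = Executors.newSingleThreadExecutor();
        connection = DriverManager.getConnection("jdbc:postgresql://db-host/ts", "user", "secret");
    }

    @Override
    public void asyncInvoke(Tuple2<String, Long> value,
                            ResultFuture<Tuple2<String, Long>> resultFuture) {
        CompletableFuture
                .supplyAsync(() -> isWritten(value.f0, value.f1), executor)
                .thenAccept(written -> {
                    if (written) {
                        resultFuture.complete(Collections.singleton(value));
                    } else {
                        // Not found yet: emit nothing and let a retry path handle it.
                        resultFuture.complete(Collections.emptyList());
                    }
                });
    }

    private boolean isWritten(String name, long quarterMillis) {
        try (PreparedStatement ps = connection.prepareStatement(
                "SELECT 1 FROM timeseries WHERE name = ? AND ts = ?")) {
            ps.setString(1, name);
            ps.setTimestamp(2, new Timestamp(quarterMillis));
            try (ResultSet rs = ps.executeQuery()) {
                return rs.next();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() throws Exception {
        if (executor != null) {
            executor.shutdownNow();
        }
        if (connection != null) {
            connection.close();
        }
    }
}

It would be attached with something like AsyncDataStream.unorderedWait(alignedStream,
new AsyncVerifyFunction(), 30, TimeUnit.SECONDS, 100). One caveat: an async
function has no access to Flink timers or keyed state, so the one-minute
re-check has to live elsewhere, for example by re-queuing the record.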

I feel as though I am at a crossroads. These are my options:

1. Continue to tune checkpoints to work with the blocking JDBC calls in the
KeyedProcessFunction that polls for database writes.

2. Experiment more with writing a RichAsyncFunction that does the
verification.

3. Write the data that needs to be verified to another Kafka topic and have
another Flink job do the verification. It would only need to handle about
1,000 records every 15 minutes.

Does anybody else have other ideas I can use to solve this?

Re: How do I verify data is written to a JDBC sink?

Posted by Roman Khachatryan <ro...@apache.org>.
Hi,

Do I understand correctly that the long checkpointing times are caused by
slow queries to the database?
If so, async querying might resolve the issue on the Flink side, but the
unnecessary load on the DB will remain.

Instead, maybe you can use CDC to stream the DB changes and send the
ActiveMQ messages when necessary [1][2]?
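
A rough sketch of that, assuming Debezium already publishes the table's
changes to a Kafka topic (the topic, table and column names below are only
placeholders):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class ConfirmedWritesJob {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // The changelog of the (hypothetical) "timeseries" table, as captured by Debezium.
        tEnv.executeSql(
                "CREATE TABLE timeseries_changes (" +
                "  name STRING," +
                "  ts TIMESTAMP(3)," +
                "  val DOUBLE" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'dbserver.public.timeseries'," +
                "  'properties.bootstrap.servers' = 'kafka:9092'," +
                "  'properties.group.id' = 'verify-writes'," +
                "  'scan.startup.mode' = 'latest-offset'," +
                "  'format' = 'debezium-json'" +
                ")");

        // Every row emitted here is a change the database has already applied,
        // so it can drive the message to the other component directly.
        DataStream<Row> confirmedWrites =
                tEnv.toChangelogStream(tEnv.sqlQuery("SELECT name, ts FROM timeseries_changes"));
    }
}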

Another option is to implement a custom JDBC writing function (based on
JdbcBatchingOutputFormat) that sends a message once the relevant write
succeeds (it doesn't have to be a sink function).
This can be achieved by overriding the JdbcBatchingOutputFormat.attemptFlush
method [3].
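
As a rough illustration of the idea (simplified to a per-record write in a
ProcessFunction rather than an actual attemptFlush override; the connection,
table and columns are placeholders), the write and the notification can live
in the same operator, so nothing needs to be verified afterwards:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Timestamp;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Sketch: the record is only emitted downstream (towards the ActiveMQ sink)
// once its own insert has succeeded.
public class WriteThenNotifyFunction
        extends ProcessFunction<Tuple3<String, Long, Double>, Tuple3<String, Long, Double>> {

    private transient Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection("jdbc:postgresql://db-host/ts", "user", "secret");
    }

    @Override
    public void processElement(Tuple3<String, Long, Double> value, Context ctx,
                               Collector<Tuple3<String, Long, Double>> out) throws Exception {
        try (PreparedStatement ps = connection.prepareStatement(
                "INSERT INTO timeseries (name, ts, val) VALUES (?, ?, ?) " +
                "ON CONFLICT (name, ts) DO UPDATE SET val = EXCLUDED.val")) {
            ps.setString(1, value.f0);
            ps.setTimestamp(2, new Timestamp(value.f1));
            ps.setDouble(3, value.f2);
            ps.executeUpdate();
        }
        // Reaching this point means the write succeeded, so the notification can follow.
        out.collect(value);
    }

    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
    }
}

In the batched variant, roughly the same emit would happen after
attemptFlush() succeeds for the batch containing the record.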

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/debezium/
[2] https://github.com/ververica/flink-cdc-connectors
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java//org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.html#attemptFlush--

Regards,
Roman
