You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Dhavan Vaidya <dh...@kofluence.com> on 2022/08/25 12:14:39 UTC

When does JDBC upsert execute DELETE?

Hello,

I have postgres as source and mysql as sink. The user authenticating with
mysql does _not_ have DELETE privileges.

In some cases, flink throws error because it is trying to _delete_ records
and the user does not have privilege. In most cases (of the same job),
upsert is working as expected and no exception is thrown.

My question is, when does flink execute DELETE statements if it is running
in upsert mode?

I have found
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferReducedStatementExecutor.java#L98
that might be useful, but I don't understand the logic here (I don't know
Java anyway).

Can someone help me understand when the delete statements will be executed?

Thanks!

Re:When does JDBC upsert execute DELETE?

Posted by Xuyang <xy...@163.com>.
Hi, maybe you can see here[1]. 
When a record is written into the jdbc sink, it will first be collected with previous records as a batch. If the size of the input records exceeds the limit, the code you mentioned will work. It first emits the upsert records(insert or update_after), and then emit deleted records.


[1] https://github.com/apache/flink/blob/fe392645421d10923c75cd5438b91d9ed55900d3/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java#L195




--

    Best!
    Xuyang




At 2022-08-25 20:14:39, "Dhavan Vaidya" <dh...@kofluence.com> wrote:

Hello,


I have postgres as source and mysql as sink. The user authenticating with mysql does _not_ have DELETE privileges.


In some cases, flink throws error because it is trying to _delete_ records and the user does not have privilege. In most cases (of the same job), upsert is working as expected and no exception is thrown.



My question is, when does flink execute DELETE statements if it is running in upsert mode?


I have found https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferReducedStatementExecutor.java#L98 that might be useful, but I don't understand the logic here (I don't know Java anyway).


Can someone help me understand when the delete statements will be executed?


Thanks!