Posted to issues@flink.apache.org by "tim yu (Jira)" <ji...@apache.org> on 2022/04/14 03:42:00 UTC
[jira] [Updated] (FLINK-27215) JDBC sink transiently deleted a record because of -u message of that record
[ https://issues.apache.org/jira/browse/FLINK-27215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
tim yu updated FLINK-27215:
---------------------------
Affects Version/s: 1.14.3, 1.13.5, 1.12.7 (was: 1.12.0)
Issue Type: Bug (was: Improvement)
> JDBC sink transiently deleted a record because of -u message of that record
> ---------------------------------------------------------------------------
>
> Key: FLINK-27215
> URL: https://issues.apache.org/jira/browse/FLINK-27215
> Project: Flink
> Issue Type: Bug
> Components: Connectors / JDBC
> Affects Versions: 1.12.7, 1.13.5, 1.14.3
> Reporter: tim yu
> Priority: Major
>
> A record is transiently deleted when the JDBC sink runs in upsert mode.
> The class TableBufferReducedStatementExecutor processes a -U (UPDATE_BEFORE) message as a delete operation.
> The following code shows how a -U message is processed:
> {code:java}
> /**
>  * Returns true if the row kind is INSERT or UPDATE_AFTER, returns false if the row kind is
>  * DELETE or UPDATE_BEFORE.
>  */
> private boolean changeFlag(RowKind rowKind) {
>     switch (rowKind) {
>         case INSERT:
>         case UPDATE_AFTER:
>             return true;
>         case DELETE:
>         case UPDATE_BEFORE:
>             return false;
>         default:
>             throw new UnsupportedOperationException(
>                     String.format(
>                             "Unknown row kind, the supported row kinds is: INSERT, UPDATE_BEFORE, UPDATE_AFTER,"
>                                     + " DELETE, but get: %s.",
>                             rowKind));
>     }
> }
>
> @Override
> public void executeBatch() throws SQLException {
>     for (Map.Entry<RowData, Tuple2<Boolean, RowData>> entry : reduceBuffer.entrySet()) {
>         if (entry.getValue().f0) {
>             upsertExecutor.addToBatch(entry.getValue().f1);
>         } else {
>             // delete by key
>             deleteExecutor.addToBatch(entry.getKey());
>         }
>     }
>     upsertExecutor.executeBatch();
>     deleteExecutor.executeBatch();
>     reduceBuffer.clear();
> }
> {code}
> If the -U and +U messages of one record are executed in different JDBC batches, the record is transiently deleted from the external database and then re-inserted as a new, updated record. The record should instead be updated just once in the external database.
>
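The batch-boundary effect described in the issue can be reproduced with a small standalone simulation. This is only a sketch and not Flink code: the names TransientDeleteSketch, FakeTable, ReducingBuffer, Pending and run are hypothetical, the "external database" is an in-memory map, and the buffer is assumed to keep only the last message per key, as the quoted reduceBuffer appears to do.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical simulation of the reported behavior; none of these types are Flink classes. */
public class TransientDeleteSketch {

    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Stand-in for the external table: primary key -> value. */
    static final class FakeTable {
        final Map<Integer, String> rows = new LinkedHashMap<>();
    }

    /** One buffered change: upsert flag plus the row value. */
    static final class Pending {
        final boolean upsert;
        final String value;

        Pending(boolean upsert, String value) {
            this.upsert = upsert;
            this.value = value;
        }
    }

    /** Assumed model of the reduce-by-key buffering: the last message per key wins. */
    static final class ReducingBuffer {
        private final Map<Integer, Pending> buffer = new LinkedHashMap<>();

        void add(Kind kind, int key, String value) {
            // Same mapping as changeFlag in the issue: -U and DELETE become "not upsert".
            boolean upsert = kind == Kind.INSERT || kind == Kind.UPDATE_AFTER;
            buffer.put(key, new Pending(upsert, value));
        }

        /** Applies the buffered changes to the table, then clears the buffer. */
        void flush(FakeTable table) {
            for (Map.Entry<Integer, Pending> e : buffer.entrySet()) {
                if (e.getValue().upsert) {
                    table.rows.put(e.getKey(), e.getValue().value);
                } else {
                    table.rows.remove(e.getKey()); // delete by key
                }
            }
            buffer.clear();
        }
    }

    /** Returns whether key 1 was visible in the table after each flush. */
    public static List<Boolean> run(boolean splitBatches) {
        FakeTable table = new FakeTable();
        ReducingBuffer buf = new ReducingBuffer();
        List<Boolean> visible = new ArrayList<>();

        buf.add(Kind.INSERT, 1, "v1");
        buf.flush(table);
        visible.add(table.rows.containsKey(1));

        buf.add(Kind.UPDATE_BEFORE, 1, "v1");
        if (splitBatches) {
            // The flush happens between -U and +U, so -U is applied alone as a delete.
            buf.flush(table);
            visible.add(table.rows.containsKey(1));
        }
        buf.add(Kind.UPDATE_AFTER, 1, "v2");
        buf.flush(table);
        visible.add(table.rows.containsKey(1));
        return visible;
    }

    public static void main(String[] args) {
        System.out.println("same batch:       " + run(false)); // [true, true]
        System.out.println("separate batches: " + run(true));  // [true, false, true]
    }
}
```

In the separate-batches run, the middle `false` is the window in which a reader of the external table would not find the record, even though the logical change was a single update.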
--
This message was sent by Atlassian Jira
(v8.20.1#820001)