Posted to issues@flink.apache.org by "tim yu (Jira)" <ji...@apache.org> on 2022/04/13 06:54:00 UTC
[jira] [Commented] (FLINK-27215) JDBC sink transiently deleted a record because of -u message of that record
[ https://issues.apache.org/jira/browse/FLINK-27215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17521495#comment-17521495 ]
tim yu commented on FLINK-27215:
--------------------------------
Hi [~jark], should the JDBC sink skip -U messages in upsert mode?
> JDBC sink transiently deleted a record because of -u message of that record
> ---------------------------------------------------------------------------
>
> Key: FLINK-27215
> URL: https://issues.apache.org/jira/browse/FLINK-27215
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / JDBC
> Affects Versions: 1.12.0
> Reporter: tim yu
> Priority: Major
>
> A record is transiently deleted when the JDBC sink is used in upsert mode.
> The -U message is processed as a delete operation in the class TableBufferReducedStatementExecutor.
> The following code shows how the -U message is processed:
> {code:java}
> /**
>  * Returns true if the row kind is INSERT or UPDATE_AFTER, returns false if the row kind is
>  * DELETE or UPDATE_BEFORE.
>  */
> private boolean changeFlag(RowKind rowKind) {
>     switch (rowKind) {
>         case INSERT:
>         case UPDATE_AFTER:
>             return true;
>         case DELETE:
>         case UPDATE_BEFORE:
>             return false;
>         default:
>             throw new UnsupportedOperationException(
>                     String.format(
>                             "Unknown row kind, the supported row kinds is: INSERT, UPDATE_BEFORE, UPDATE_AFTER,"
>                                     + " DELETE, but get: %s.",
>                             rowKind));
>     }
> }
>
> @Override
> public void executeBatch() throws SQLException {
>     for (Map.Entry<RowData, Tuple2<Boolean, RowData>> entry : reduceBuffer.entrySet()) {
>         if (entry.getValue().f0) {
>             upsertExecutor.addToBatch(entry.getValue().f1);
>         } else {
>             // delete by key
>             deleteExecutor.addToBatch(entry.getKey());
>         }
>     }
>     upsertExecutor.executeBatch();
>     deleteExecutor.executeBatch();
>     reduceBuffer.clear();
> }
> {code}
> If the -U and +U messages of one record are executed in different JDBC batches, that record is transiently deleted from the external database and then re-inserted as the updated record. In fact, the record should simply be updated once in the external database.
>
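To make the question concrete, here is a minimal stand-alone sketch of the behavior described above. It is not Flink code: the class UpsertBufferSketch, its in-memory "table" map, and the skipUpdateBefore flag are all hypothetical and only model the reduce-by-key buffer. It also assumes that the -U and +U messages carry the same key. The replay method flushes between -U and +U to imitate the two messages landing in different JDBC batches, and records what a reader of the external table would see after each flush.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-alone model of a reduce-by-key upsert buffer; not Flink code. */
class UpsertBufferSketch {
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Assumed option: when true, -U messages are ignored instead of buffered as deletes. */
    private final boolean skipUpdateBefore;
    /** In-memory stand-in for the external database table (key -> value). */
    private final Map<String, String> table = new HashMap<>();
    /** Buffered change per key: a present value means upsert, empty means delete by key. */
    private final Map<String, Optional<String>> buffer = new LinkedHashMap<>();

    UpsertBufferSketch(boolean skipUpdateBefore) {
        this.skipUpdateBefore = skipUpdateBefore;
    }

    void add(RowKind kind, String key, String value) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                buffer.put(key, Optional.of(value));
                break;
            case UPDATE_BEFORE:
                if (skipUpdateBefore) {
                    return; // rely on the later +U to overwrite the row by key
                }
                // fall through: the reported behavior treats -U like DELETE
            case DELETE:
                buffer.put(key, Optional.empty());
                break;
        }
    }

    /** Applies the buffered changes to the table, like one executed JDBC batch. */
    void flush() {
        for (Map.Entry<String, Optional<String>> e : buffer.entrySet()) {
            if (e.getValue().isPresent()) {
                table.put(e.getKey(), e.getValue().get());
            } else {
                table.remove(e.getKey());
            }
        }
        buffer.clear();
    }

    /** Replays +I, -U, +U for key "1" with a flush after each message. */
    static List<String> replay(boolean skipUpdateBefore) {
        UpsertBufferSketch sink = new UpsertBufferSketch(skipUpdateBefore);
        List<String> observed = new ArrayList<>();
        sink.add(RowKind.INSERT, "1", "a");
        sink.flush();
        observed.add(sink.table.get("1"));
        sink.add(RowKind.UPDATE_BEFORE, "1", "a");
        sink.flush();
        observed.add(sink.table.get("1")); // null here means the row was deleted
        sink.add(RowKind.UPDATE_AFTER, "1", "b");
        sink.flush();
        observed.add(sink.table.get("1"));
        return observed;
    }
}
```

With skipUpdateBefore set to false, the second observation is null, which is the transient delete. With it set to true, the row stays visible with its old value until the +U arrives. Whether skipping -U is safe in every case (for example, when an update changes the key) is exactly what I am unsure about.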