Posted to issues@flink.apache.org by "tim yu (Jira)" <ji...@apache.org> on 2022/04/13 06:54:00 UTC

[jira] [Commented] (FLINK-27215) JDBC sink transiently deleted a record because of -u message of that record

    [ https://issues.apache.org/jira/browse/FLINK-27215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17521495#comment-17521495 ] 

tim yu commented on FLINK-27215:
--------------------------------

Hi [~jark], should the JDBC sink skip -U messages in upsert mode?

> JDBC sink transiently deleted a record because of -u message of that record
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-27215
>                 URL: https://issues.apache.org/jira/browse/FLINK-27215
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / JDBC
>    Affects Versions: 1.12.0
>            Reporter: tim yu
>            Priority: Major
>
> A record is deleted transiently when the JDBC sink is used in upsert mode.
> The -U message is processed as a delete operation in the class TableBufferReducedStatementExecutor.
> The following code shows how the -U message is processed:
> {code:java}
>     /**
>      * Returns true if the row kind is INSERT or UPDATE_AFTER, returns false if the row kind is
>      * DELETE or UPDATE_BEFORE.
>      */
>     private boolean changeFlag(RowKind rowKind) {
>         switch (rowKind) {
>             case INSERT:
>             case UPDATE_AFTER:
>                 return true;
>             case DELETE:
>             case UPDATE_BEFORE:
>                 return false;
>             default:
>                 throw new UnsupportedOperationException(
>                         String.format(
>                                 "Unknown row kind, the supported row kinds is: INSERT, UPDATE_BEFORE, UPDATE_AFTER,"
>                                         + " DELETE, but get: %s.",
>                                 rowKind));
>         }
>     }
>     @Override
>     public void executeBatch() throws SQLException {
>         for (Map.Entry<RowData, Tuple2<Boolean, RowData>> entry : reduceBuffer.entrySet()) {
>             if (entry.getValue().f0) {
>                 upsertExecutor.addToBatch(entry.getValue().f1);
>             } else {
>                 // delete by key
>                 deleteExecutor.addToBatch(entry.getKey());
>             }
>         }
>         upsertExecutor.executeBatch();
>         deleteExecutor.executeBatch();
>         reduceBuffer.clear();
>     }
> {code}
> If the -U and +U messages of one record are executed in different JDBC batches, the record is transiently deleted from the external database and then re-inserted with its updated values. In fact, the record should only be updated once in the external database.
>  
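
To make the scenario above concrete, here is a minimal sketch that does not depend on Flink. It models only the reduce-by-key buffer and the changeFlag mapping quoted above. The names UpsertBufferSketch, Message, flush and the skipUpdateBefore switch are hypothetical and are not part of Flink's API; the switch merely illustrates the idea raised in the comment and is not a confirmed fix.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, Flink-free model of a reduce-by-key JDBC buffer; not Flink's API. */
public class UpsertBufferSketch {

    public enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** One changelog message: row kind, primary key, and payload. */
    public static final class Message {
        final Kind kind;
        final String key;
        final String value;

        public Message(Kind kind, String key, String value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }
    }

    /** Same mapping as the quoted changeFlag: true means upsert, false means delete. */
    public static boolean changeFlag(Kind kind) {
        return kind == Kind.INSERT || kind == Kind.UPDATE_AFTER;
    }

    /**
     * Reduces one batch by key (the last message per key wins) and returns the
     * statements that would be issued, upserts first and deletes second.
     * skipUpdateBefore is an assumed switch that drops -U messages before buffering.
     */
    public static List<String> flush(List<Message> batch, boolean skipUpdateBefore) {
        Map<String, Message> reduceBuffer = new LinkedHashMap<>();
        for (Message m : batch) {
            if (skipUpdateBefore && m.kind == Kind.UPDATE_BEFORE) {
                continue; // assumption: the matching +U will overwrite the row
            }
            reduceBuffer.put(m.key, m);
        }
        List<String> upserts = new ArrayList<>();
        List<String> deletes = new ArrayList<>();
        for (Message m : reduceBuffer.values()) {
            if (changeFlag(m.kind)) {
                upserts.add("UPSERT " + m.key + "=" + m.value);
            } else {
                deletes.add("DELETE " + m.key);
            }
        }
        List<String> statements = new ArrayList<>(upserts);
        statements.addAll(deletes);
        return statements;
    }

    public static void main(String[] args) {
        // -U and +U of the same record land in two different batches.
        List<Message> batch1 = Arrays.asList(new Message(Kind.UPDATE_BEFORE, "1", "old"));
        List<Message> batch2 = Arrays.asList(new Message(Kind.UPDATE_AFTER, "1", "new"));

        System.out.println("current behaviour: " + flush(batch1, false) + " then " + flush(batch2, false));
        System.out.println("skipping -U:       " + flush(batch1, true) + " then " + flush(batch2, true));
    }
}
```

With the switch off, the first batch issues a delete for key 1 and the second batch re-inserts it, which is the transient deletion described above. With the switch on, the first batch issues nothing and the record is written once. When both messages fall into the same batch, the +U replaces the -U in the buffer, so no delete is issued in either mode.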



--
This message was sent by Atlassian Jira
(v8.20.1#820001)