Posted to issues@flink.apache.org by "tim yu (Jira)" <ji...@apache.org> on 2022/04/13 06:43:00 UTC

[jira] [Created] (FLINK-27215) JDBC sink transiently deleted a record because of -u message of that record

tim yu created FLINK-27215:
------------------------------

             Summary: JDBC sink transiently deleted a record because of -u message of that record
                 Key: FLINK-27215
                 URL: https://issues.apache.org/jira/browse/FLINK-27215
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / JDBC
    Affects Versions: 1.12.0
            Reporter: tim yu


A record is transiently deleted when using the JDBC sink in upsert mode.

The -U (UPDATE_BEFORE) message is processed as a delete operation in the class TableBufferReducedStatementExecutor.
The following code shows how -U messages are processed:
{code:java}
    /**
     * Returns true if the row kind is INSERT or UPDATE_AFTER, returns false if the row kind is
     * DELETE or UPDATE_BEFORE.
     */
    private boolean changeFlag(RowKind rowKind) {
        switch (rowKind) {
            case INSERT:
            case UPDATE_AFTER:
                return true;
            case DELETE:
            case UPDATE_BEFORE:
                return false;
            default:
                throw new UnsupportedOperationException(
                        String.format(
                                "Unknown row kind, the supported row kinds is: INSERT, UPDATE_BEFORE, UPDATE_AFTER,"
                                        + " DELETE, but get: %s.",
                                rowKind));
        }
    }

    @Override
    public void executeBatch() throws SQLException {
        for (Map.Entry<RowData, Tuple2<Boolean, RowData>> entry : reduceBuffer.entrySet()) {
            if (entry.getValue().f0) {
                upsertExecutor.addToBatch(entry.getValue().f1);
            } else {
                // delete by key
                deleteExecutor.addToBatch(entry.getKey());
            }
        }
        upsertExecutor.executeBatch();
        deleteExecutor.executeBatch();
        reduceBuffer.clear();
    }
{code}
If the -U and +U messages of one record are executed in different JDBC batches, the record is transiently deleted from the external database and then re-inserted with the updated values. The record should instead be updated only once in the external database.
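
The effect of the batch boundary can be reproduced with a small standalone model. This is only a sketch, not the Flink implementation: the class TransientDeleteSketch, the Change type, the replay method and the in-memory "table" are all hypothetical, and it assumes that within one batch the buffer keeps only the last message per key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of the sink; not the Flink implementation. */
class TransientDeleteSketch {

    /** Stand-in for Flink's RowKind. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** One changelog message: row kind, primary key, and row value. */
    static final class Change {
        final Kind kind;
        final String key;
        final String value;

        Change(Kind kind, String key, String value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }
    }

    /** Same mapping as changeFlag() in the issue: true = upsert, false = delete. */
    static boolean changeFlag(Kind kind) {
        return kind == Kind.INSERT || kind == Kind.UPDATE_AFTER;
    }

    /**
     * Applies each batch to a copy of the initial table and returns a snapshot
     * of the table after every flush.
     */
    static List<Map<String, String>> replay(
            Map<String, String> initial, List<List<Change>> batches) {
        Map<String, String> table = new LinkedHashMap<>(initial);
        List<Map<String, String>> snapshots = new ArrayList<>();
        for (List<Change> batch : batches) {
            // Assumption: within one batch, the last message per key wins.
            Map<String, Change> reduceBuffer = new LinkedHashMap<>();
            for (Change change : batch) {
                reduceBuffer.put(change.key, change);
            }
            // Flush: upsert or delete by key, depending on the change flag.
            for (Change change : reduceBuffer.values()) {
                if (changeFlag(change.kind)) {
                    table.put(change.key, change.value);
                } else {
                    table.remove(change.key);
                }
            }
            snapshots.add(new LinkedHashMap<>(table));
        }
        return snapshots;
    }

    public static void main(String[] args) {
        Map<String, String> initial = new LinkedHashMap<>();
        initial.put("k1", "old");
        Change before = new Change(Kind.UPDATE_BEFORE, "k1", "old");
        Change after = new Change(Kind.UPDATE_AFTER, "k1", "new");

        // -U and +U in the same batch: a single flush.
        System.out.println(replay(initial, Arrays.asList(Arrays.asList(before, after))));
        // -U and +U in separate batches: two flushes.
        System.out.println(replay(initial, Arrays.asList(Arrays.asList(before), Arrays.asList(after))));
    }
}
```

In this model, when both messages land in the same batch the only snapshot already contains the updated row. When they are split across two batches, the snapshot after the first flush no longer contains key k1, which corresponds to the transient delete described above.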


 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)