Posted to issues@flink.apache.org by "Kenyore (Jira)" <ji...@apache.org> on 2021/10/25 02:15:00 UTC

[jira] [Updated] (FLINK-24626) Flink JDBC Sink may lose data in left join

     [ https://issues.apache.org/jira/browse/FLINK-24626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kenyore updated FLINK-24626:
----------------------------
    Summary: Flink JDBC Sink may lose data in left join  (was: Flink JDBC Sink may lose data in retract stream)

> Flink JDBC Sink may lose data in left join
> ------------------------------------------
>
>                 Key: FLINK-24626
>                 URL: https://issues.apache.org/jira/browse/FLINK-24626
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>            Reporter: Kenyore
>            Priority: Major
>
> The JDBC sink may lose data when using TableBufferReducedStatementExecutor.
> Here are some snippets.
> {code}
>     @Override
>     public void addToBatch(RowData record) throws SQLException {
>         RowData key = keyExtractor.apply(record);
>         if (record.getRowKind() == RowKind.DELETE) {
>             // XXX cut DELETE off because the retract stream would generate it
>             return;
>         }
>         boolean flag = changeFlag(record.getRowKind());
>         RowData value = valueTransform.apply(record); // copy or not
>         reduceBuffer.put(key, Tuple2.of(flag, value));
>     }
>     private boolean changeFlag(RowKind rowKind) {
>         switch (rowKind) {
>             case INSERT:
>             case UPDATE_AFTER:
>                 return true;
>             case DELETE:
>             case UPDATE_BEFORE:
>                 return false;
>             default:
>                 throw new UnsupportedOperationException(
>                         String.format(
>                                 "Unknown row kind, the supported row kinds is: INSERT, UPDATE_BEFORE, UPDATE_AFTER,"
>                                         + " DELETE, but get: %s.",
>                                 rowKind));
>         }
>     }
> {code}
> The code above adds changeFlag to the Tuple2 as a marker of upsert (true) or delete (false).
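> For illustration, here is a minimal sketch (simplified types, not the actual executor) of how the reduce buffer keeps only the last change per key:
> {code}
> import java.util.LinkedHashMap;
> import java.util.Map;
>
> // Minimal sketch (simplified types, not the actual executor): the buffer keeps
> // only the last change per key as a (flag, value) pair, where flag=true means
> // upsert and flag=false means delete-by-key.
> public class ReduceBufferSketch {
>
>     static final class Change {
>         final boolean upsert;
>         final String value;
>
>         Change(boolean upsert, String value) {
>             this.upsert = upsert;
>             this.value = value;
>         }
>     }
>
>     public static void main(String[] args) {
>         Map<String, Change> reduceBuffer = new LinkedHashMap<>();
>
>         // +I  key=k1, value=v1  -> flag=true (upsert)
>         reduceBuffer.put("k1", new Change(true, "v1"));
>         // -U  key=k1, value=v1  -> flag=false overwrites the previous entry
>         reduceBuffer.put("k1", new Change(false, "v1"));
>         // +U  key=k1, value=v2  -> flag=true wins because it is the last change
>         reduceBuffer.put("k1", new Change(true, "v2"));
>
>         // Only the last change per key survives: k1 -> UPSERT v2
>         reduceBuffer.forEach((k, c) ->
>                 System.out.println(k + " -> " + (c.upsert ? "UPSERT " : "DELETE ") + c.value));
>     }
> }
> {code}
> The executeBatch below then flushes this reduced buffer.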
> {code}
>     @Override
>     public void executeBatch() throws SQLException {
>         for (Map.Entry<RowData, Tuple2<Boolean, RowData>> entry : reduceBuffer.entrySet()) {
>             if (entry.getValue().f0) {
>                 upsertExecutor.addToBatch(entry.getValue().f1);
>             } else {
>                 // delete by key
>                 deleteExecutor.addToBatch(entry.getKey());
>             }
>         }
>         upsertExecutor.executeBatch();
>         deleteExecutor.executeBatch();
>         reduceBuffer.clear();
>     }
> {code}
> executeBatch flushes the upserts (true flag) first and the deletes (false flag) afterwards.
> This means the delete coming from an UPDATE_BEFORE can be executed after the upsert coming from its UPDATE_AFTER, and we can lose data because of this.
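> For illustration, here is a hypothetical sketch of that ordering (not Flink code). The key layout, where the UPDATE_BEFORE and the UPDATE_AFTER end up under different buffer keys while the DELETE only filters on the primary key, is an assumption to make the scenario concrete:
> {code}
> import java.util.ArrayList;
> import java.util.LinkedHashMap;
> import java.util.List;
> import java.util.Map;
>
> // Hypothetical sketch of the ordering problem, not Flink code. Assume the left
> // join emits -U (id=1, right=null) and +U (id=1, right=r1) in the same batch,
> // the two records end up under different buffer keys, and the generated DELETE
> // statement only filters on id.
> public class DeleteAfterUpsertSketch {
>
>     public static void main(String[] args) {
>         // buffer key -> flag (true = upsert, false = delete by key)
>         Map<String, Boolean> reduceBuffer = new LinkedHashMap<>();
>         reduceBuffer.put("id=1,right=null", false); // from UPDATE_BEFORE
>         reduceBuffer.put("id=1,right=r1", true);    // from UPDATE_AFTER
>
>         List<String> statements = new ArrayList<>();
>         // Mirrors executeBatch(): upserts are flushed first ...
>         for (Map.Entry<String, Boolean> e : reduceBuffer.entrySet()) {
>             if (e.getValue()) {
>                 statements.add("UPSERT (" + e.getKey() + ")");
>             }
>         }
>         // ... and deletes are flushed afterwards.
>         for (Map.Entry<String, Boolean> e : reduceBuffer.entrySet()) {
>             if (!e.getValue()) {
>                 statements.add("DELETE WHERE id=1   (from " + e.getKey() + ")");
>             }
>         }
>
>         // Prints UPSERT before DELETE: the row just written for id=1 is deleted
>         // again, so the result of the UPDATE_AFTER is lost.
>         statements.forEach(System.out::println);
>     }
> }
> {code}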



--
This message was sent by Atlassian Jira
(v8.3.4#803005)