Posted to issues@flink.apache.org by "Kenyore (Jira)" <ji...@apache.org> on 2021/10/25 02:15:00 UTC
[jira] [Updated] (FLINK-24626) Flink JDBC Sink may lose data in left join
[ https://issues.apache.org/jira/browse/FLINK-24626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kenyore updated FLINK-24626:
----------------------------
Summary: Flink JDBC Sink may lose data in left join (was: Flink JDBC Sink may lose data in retract stream)
> Flink JDBC Sink may lose data in left join
> ------------------------------------------
>
> Key: FLINK-24626
> URL: https://issues.apache.org/jira/browse/FLINK-24626
> Project: Flink
> Issue Type: Bug
> Components: Connectors / JDBC
> Reporter: Kenyore
> Priority: Major
>
> The JDBC sink may lose some data when it uses TableBufferReducedStatementExecutor.
> Here are the relevant snippets:
> {code}
> @Override
> public void addToBatch(RowData record) throws SQLException {
>     RowData key = keyExtractor.apply(record);
>     if (record.getRowKind() == RowKind.DELETE) {
>         //XXX cut delete off because the retract stream would generate
>         return;
>     }
>     boolean flag = changeFlag(record.getRowKind());
>     RowData value = valueTransform.apply(record); // copy or not
>     reduceBuffer.put(key, Tuple2.of(flag, value));
> }
>
> private boolean changeFlag(RowKind rowKind) {
>     switch (rowKind) {
>         case INSERT:
>         case UPDATE_AFTER:
>             return true;
>         case DELETE:
>         case UPDATE_BEFORE:
>             return false;
>         default:
>             throw new UnsupportedOperationException(
>                     String.format(
>                             "Unknown row kind, the supported row kinds is: INSERT, UPDATE_BEFORE, UPDATE_AFTER,"
>                                     + " DELETE, but get: %s.",
>                             rowKind));
>     }
> }
> {code}
> The code above stores the result of changeFlag in the Tuple2 as the upsert/delete marker: true for INSERT and UPDATE_AFTER, false for DELETE and UPDATE_BEFORE.
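To make the buffering concrete, here is a minimal Flink-free sketch of what addToBatch and changeFlag do. The class ReduceBufferModel, its string keys and values, and the nested RowKind enum are hypothetical stand-ins for the real RowData/RowKind types, and the reporter's DELETE short-circuit is deliberately left out. What it shows: the buffer holds one entry per key, and a later record for the same key replaces the earlier entry together with its flag.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, Flink-free model of addToBatch()/changeFlag(). The nested RowKind
// enum is a stand-in for org.apache.flink.types.RowKind, not the real class.
class ReduceBufferModel {

    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Plays the role of Tuple2<Boolean, RowData>: upsert = f0, value = f1.
    static final class Entry {
        final boolean upsert;
        final String value;

        Entry(boolean upsert, String value) {
            this.upsert = upsert;
            this.value = value;
        }

        @Override
        public String toString() {
            return (upsert ? "UPSERT " : "DELETE ") + value;
        }
    }

    // Same mapping as changeFlag() in the report.
    static boolean changeFlag(RowKind kind) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                return true;
            case DELETE:
            case UPDATE_BEFORE:
                return false;
            default:
                throw new UnsupportedOperationException("Unknown row kind: " + kind);
        }
    }

    // One entry per key; LinkedHashMap is used only so the printed order is deterministic.
    final Map<String, Entry> reduceBuffer = new LinkedHashMap<>();

    // Like addToBatch(): a later record for the same key overwrites the earlier entry.
    void addToBatch(String key, String value, RowKind kind) {
        reduceBuffer.put(key, new Entry(changeFlag(kind), value));
    }

    public static void main(String[] args) {
        ReduceBufferModel m = new ReduceBufferModel();
        m.addToBatch("k1", "a", RowKind.INSERT);
        m.addToBatch("k1", "a", RowKind.UPDATE_BEFORE);
        m.addToBatch("k1", "b", RowKind.UPDATE_AFTER);
        m.addToBatch("k2", "c", RowKind.INSERT);
        System.out.println(m.reduceBuffer); // {k1=UPSERT b, k2=UPSERT c}
    }
}
```

Only the last record per key survives until the buffer is flushed, so the flag that reaches executeBatch is the one from the most recent record for that key.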
> {code}
> @Override
> public void executeBatch() throws SQLException {
>     for (Map.Entry<RowData, Tuple2<Boolean, RowData>> entry : reduceBuffer.entrySet()) {
>         if (entry.getValue().f0) {
>             upsertExecutor.addToBatch(entry.getValue().f1);
>         } else {
>             // delete by key
>             deleteExecutor.addToBatch(entry.getKey());
>         }
>     }
>     upsertExecutor.executeBatch();
>     deleteExecutor.executeBatch();
>     reduceBuffer.clear();
> }
> {code}
> executeBatch executes all false-flagged entries (deletes) after all true-flagged entries (upserts).
> This means an UPDATE_BEFORE can be executed after an UPDATE_AFTER, which results in data loss.
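The two-pass ordering described above can be modeled without Flink. The sketch below is a simplified stand-in: the class TwoPassFlushModel, its string keys, and the statement strings it returns are all hypothetical. It keeps one buffered entry per key, as reduceBuffer does, and then flushes in two passes like executeBatch, first every upsert and then every delete. It only illustrates the reordering relative to arrival order; it does not reproduce the left-join case from the report.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free model of the two-pass flush in executeBatch().
class TwoPassFlushModel {

    static final class Entry {
        final boolean upsert; // true = upsert, false = delete by key
        final String value;

        Entry(boolean upsert, String value) {
            this.upsert = upsert;
            this.value = value;
        }
    }

    // One entry per key, like reduceBuffer; LinkedHashMap keeps first-insertion order.
    final Map<String, Entry> reduceBuffer = new LinkedHashMap<>();

    void add(String key, boolean upsert, String value) {
        reduceBuffer.put(key, new Entry(upsert, value));
    }

    // Returns the statements in the order they would run: every upsert first,
    // then every delete, regardless of the order in which they arrived.
    List<String> executeBatch() {
        List<String> upserts = new ArrayList<>();
        List<String> deletes = new ArrayList<>();
        for (Map.Entry<String, Entry> e : reduceBuffer.entrySet()) {
            if (e.getValue().upsert) {
                upserts.add("UPSERT " + e.getKey() + " -> " + e.getValue().value);
            } else {
                deletes.add("DELETE " + e.getKey());
            }
        }
        List<String> executed = new ArrayList<>(upserts);
        executed.addAll(deletes); // the delete batch always runs after the upsert batch
        reduceBuffer.clear();
        return executed;
    }

    public static void main(String[] args) {
        TwoPassFlushModel m = new TwoPassFlushModel();
        m.add("k1", false, null); // arrives first: delete k1
        m.add("k2", true, "b");   // arrives second: upsert k2
        System.out.println(m.executeBatch()); // [UPSERT k2 -> b, DELETE k1]
    }
}
```

Although the delete arrived first, it is executed last, which is the ordering the report identifies as the source of the lost rows.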
--
This message was sent by Atlassian Jira
(v8.3.4#803005)