Posted to dev@flink.apache.org by "tim yu (Jira)" <ji...@apache.org> on 2022/04/13 06:43:00 UTC
[jira] [Created] (FLINK-27215) JDBC sink transiently deleted a record because of -u message of that record
tim yu created FLINK-27215:
------------------------------
Summary: JDBC sink transiently deleted a record because of -u message of that record
Key: FLINK-27215
URL: https://issues.apache.org/jira/browse/FLINK-27215
Project: Flink
Issue Type: Improvement
Components: Connectors / JDBC
Affects Versions: 1.12.0
Reporter: tim yu
A record is transiently deleted when using the JDBC sink in upsert mode.
The -U (UPDATE_BEFORE) message is processed as a delete operation in the class TableBufferReducedStatementExecutor.
The following code shows how a -U message is processed:
{code:java}
/**
 * Returns true if the row kind is INSERT or UPDATE_AFTER, returns false if the row kind is
 * DELETE or UPDATE_BEFORE.
 */
private boolean changeFlag(RowKind rowKind) {
    switch (rowKind) {
        case INSERT:
        case UPDATE_AFTER:
            return true;
        case DELETE:
        case UPDATE_BEFORE:
            return false;
        default:
            throw new UnsupportedOperationException(
                    String.format(
                            "Unknown row kind, the supported row kinds is: INSERT, UPDATE_BEFORE, UPDATE_AFTER,"
                                    + " DELETE, but get: %s.",
                            rowKind));
    }
}

@Override
public void executeBatch() throws SQLException {
    for (Map.Entry<RowData, Tuple2<Boolean, RowData>> entry : reduceBuffer.entrySet()) {
        if (entry.getValue().f0) {
            upsertExecutor.addToBatch(entry.getValue().f1);
        } else {
            // delete by key
            deleteExecutor.addToBatch(entry.getKey());
        }
    }
    upsertExecutor.executeBatch();
    deleteExecutor.executeBatch();
    reduceBuffer.clear();
}
{code}
If the -U and +U messages of one record are executed in different JDBC batches, the record is transiently deleted from the external database and then re-inserted as a new, updated record. In fact, the record should simply be updated once in the external database.
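
The effect of the batch boundary can be illustrated with a toy model. The sketch below is not Flink code: the class TransientDeleteSketch, its in-memory "table" and the statement log are hypothetical stand-ins for the external database, and only changeFlag mirrors the logic quoted above. It assumes, as in the quoted executor, that the buffer keeps only the last change per key until the next flush.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a reducing upsert buffer (hypothetical, not the Flink class). */
class TransientDeleteSketch {
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Same mapping as changeFlag in the issue: true = upsert, false = delete. */
    static boolean changeFlag(Kind kind) {
        return kind == Kind.INSERT || kind == Kind.UPDATE_AFTER;
    }

    /** Stand-in for the external table: primary key -> value. */
    final Map<String, String> table = new LinkedHashMap<>();
    /** Statements issued against the "database", kept for inspection. */
    final List<String> statements = new ArrayList<>();
    /** Last buffered change per key: key -> (flag, value). */
    private final Map<String, Map.Entry<Boolean, String>> reduceBuffer = new LinkedHashMap<>();

    /** Buffers a change; a later change for the same key overwrites the earlier one. */
    void addToBatch(Kind kind, String key, String value) {
        reduceBuffer.put(key, new AbstractMap.SimpleImmutableEntry<>(changeFlag(kind), value));
    }

    /** Flushes the buffer: one upsert or one delete per buffered key. */
    void executeBatch() {
        for (Map.Entry<String, Map.Entry<Boolean, String>> e : reduceBuffer.entrySet()) {
            if (e.getValue().getKey()) {
                table.put(e.getKey(), e.getValue().getValue());
                statements.add("UPSERT " + e.getKey());
            } else {
                table.remove(e.getKey());
                statements.add("DELETE " + e.getKey());
            }
        }
        reduceBuffer.clear();
    }

    /**
     * Sends -U then +U for key "k1" and returns the statements they cause.
     * When flushBetween is true, the two messages land in different batches.
     */
    static List<String> run(boolean flushBetween) {
        TransientDeleteSketch sink = new TransientDeleteSketch();
        sink.addToBatch(Kind.INSERT, "k1", "v1");
        sink.executeBatch();
        sink.statements.clear(); // only record the update from here on

        sink.addToBatch(Kind.UPDATE_BEFORE, "k1", "v1");
        if (flushBetween) {
            sink.executeBatch(); // the row is gone from the table at this point
        }
        sink.addToBatch(Kind.UPDATE_AFTER, "k1", "v2");
        sink.executeBatch();
        return sink.statements;
    }
}
```

In this model, run(false) issues a single upsert because +U overwrites -U in the buffer, while run(true) issues a delete followed by an upsert, which is the transient deletion described above.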