Posted to issues@flink.apache.org by "Flink CDC Issue Import (Jira)" <ji...@apache.org> on 2024/03/20 09:28:00 UTC
[jira] [Created] (FLINK-34833) [Bug][sqlserver] SqlServer incremental source cannot support exactly-once
Flink CDC Issue Import created FLINK-34833:
----------------------------------------------
Summary: [Bug][sqlserver] SqlServer incremental source cannot support exactly-once
Key: FLINK-34833
URL: https://issues.apache.org/jira/browse/FLINK-34833
Project: Flink
Issue Type: Bug
Components: Flink CDC
Reporter: Flink CDC Issue Import
## Search before asking
- [X] I searched in the [issues|https://github.com/ververica/flink-cdc-connectors/issues] and found nothing similar.
## Flink version
1.18
## Flink CDC version
3.0
## Reason
### Overview
At first, let's look at what the SqlServer incremental source does to guarantee exactly-once semantics in a parallel read.
First, the table is split into multiple chunks based on the key, and each chunk becomes a snapshot split. These splits can be read in parallel.
In the snapshot phase, for each snapshot split covering [low_key, high_key]:
1. use SqlServerDialect#displayCurrentOffset to get the current LSN as low_watermark
2. read the snapshot data between [low_key, high_key] into a temporary state `state1` via a JDBC connection query
3. use SqlServerDialect#displayCurrentOffset to get the current LSN again as high_watermark
4. read the log between (low_watermark, high_watermark) and apply it to the temporary state `state1`, producing the **final state as of high_watermark**, which is then emitted downstream
Then in the stream phase, we read the log from [high_watermark, +∞) for this split between [low_key, high_key].
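The protocol above can be condensed into a short sketch. Everything here (`Lsn`, `ChangeEvent`, the reader methods) is a hypothetical stand-in for the connector internals rather than the actual Flink CDC API; the point is only to make the watermark bookkeeping explicit.
```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// All types and helpers below are hypothetical stand-ins for the connector
// internals, not the real Flink CDC classes.
interface Lsn extends Comparable<Lsn> {}

interface ChangeEvent {
    Object key();

    void applyTo(Map<Object, Object[]> state); // upsert or delete one row
}

abstract class SnapshotSplitReaderSketch {

    abstract Lsn currentLsn(); // what SqlServerDialect#displayCurrentOffset is expected to return

    abstract Map<Object, Object[]> readSnapshotRows(Object lowKey, Object highKey);

    abstract List<ChangeEvent> readLogBetween(Lsn lowExclusive, Lsn highExclusive);

    abstract boolean keyInRange(Object key, Object lowKey, Object highKey);

    List<Object[]> readSnapshotSplit(Object lowKey, Object highKey) {
        Lsn lowWatermark = currentLsn();                                 // step 1
        Map<Object, Object[]> state = readSnapshotRows(lowKey, highKey); // step 2
        Lsn highWatermark = currentLsn();                                // step 3
        // step 4: replay (low_watermark, high_watermark) onto the snapshot so
        // that the emitted rows are consistent as of high_watermark
        for (ChangeEvent event : readLogBetween(lowWatermark, highWatermark)) {
            if (keyInRange(event.key(), lowKey, highKey)) {
                event.applyTo(state);
            }
        }
        // the stream phase later resumes from [high_watermark, +∞) for this split
        return new ArrayList<>(state.values());
    }
}
```
This only delivers exactly-once if both `currentLsn()` calls really return the current end of the database log; if they lag behind it, the stream phase restarts from a stale high_watermark, which is exactly the problem described next.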
### Problem
However, `SqlServerDialect#displayCurrentOffset → SqlServerUtils#currentLsn → SqlServerConnection#getMaxTransactionLsn` returns the max LSN of the system table `cdc.lsn_time_mapping`, which is not the real latest LSN of the whole database system. In this incremental source framework, only the real latest LSN of the whole database system can guarantee exactly-once semantics.
As shown in [CDC Data Only Shows up After 5 Minutes|https://stackoverflow.com/questions/29477391/cdc-data-only-shows-up-after-5-minutes], a developer found that CDC data only shows up after 5 minutes, and the reason is:
> Because the capture process extracts change data from the transaction log, there is a built in latency between the time that a change is committed to a source table and the time that the change appears within its associated change table.
For example, low_watermark and high_watermark may be 5 minutes behind the real LSN at step 2 (reading the snapshot data). Then in the streaming phase, log entries that should be skipped (because the backfill already applied them) are read again.
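The lag can be probed directly against the database. The following is only an illustrative sketch: the connection URL and credentials are placeholders, and reading the real end of the transaction log via `sys.dm_db_log_stats` (SQL Server 2016 SP2+) is my assumption for the purpose of the comparison; note also that the two queries return LSNs in different textual formats, so they cannot be compared byte-for-byte as printed.
```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Hedged sketch: compares the LSN the connector currently uses (the max LSN
// recorded in cdc.lsn_time_mapping, i.e. what the capture job has already
// harvested) with the end of the transaction log itself.
public class LsnLagProbe {
    public static void main(String[] args) throws Exception {
        // placeholder URL and credentials
        String url = "jdbc:sqlserver://localhost:1433;databaseName=testdb";
        try (Connection conn = DriverManager.getConnection(url, "sa", "Password!");
                Statement stmt = conn.createStatement()) {
            // What SqlServerConnection#getMaxTransactionLsn effectively returns:
            // the newest LSN the CDC capture job has copied into the change tables.
            try (ResultSet rs =
                    stmt.executeQuery("SELECT MAX(start_lsn) FROM cdc.lsn_time_mapping")) {
                rs.next();
                System.out.println("capture-job LSN: " + rs.getString(1));
            }
            // Assumed probe for the actual end of the database log; during the
            // capture-job latency window this runs ahead of the value above.
            try (ResultSet rs =
                    stmt.executeQuery("SELECT log_end_lsn FROM sys.dm_db_log_stats(DB_ID())")) {
                rs.next();
                System.out.println("log-end LSN    : " + rs.getString(1));
            }
        }
    }
}
```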
## How to verify
Modify the test `com.ververica.cdc.connectors.oracle.source.OracleSourceITCase#testEnableBackfillWithDMLPostLowWaterMark` as shown below; we can see that the three DML operations are read twice.
```java
@Test
public void testEnableBackfillWithDMLPostLowWaterMark() throws Exception {
    List<String> records = testBackfillWhenWritingEvents(false, 25, USE_POST_LOWWATERMARK_HOOK);

    List<String> expectedRecords =
            Arrays.asList(
                    "+I[101, user_1, Shanghai, 123567891234]",
                    "+I[102, user_2, Shanghai, 123567891234]",
                    "+I[103, user_3, Shanghai, 123567891234]",
                    "+I[109, user_4, Shanghai, 123567891234]",
                    "+I[110, user_5, Shanghai, 123567891234]",
                    "+I[111, user_6, Shanghai, 123567891234]",
                    "+I[118, user_7, Shanghai, 123567891234]",
                    "+I[121, user_8, Shanghai, 123567891234]",
                    "+I[123, user_9, Shanghai, 123567891234]",
                    "+I[1009, user_10, Shanghai, 123567891234]",
                    "+I[1010, user_11, Shanghai, 123567891234]",
                    "+I[1011, user_12, Shanghai, 123567891234]",
                    "+I[1012, user_13, Shanghai, 123567891234]",
                    "+I[1013, user_14, Shanghai, 123567891234]",
                    "+I[1014, user_15, Shanghai, 123567891234]",
                    "+I[1015, user_16, Shanghai, 123567891234]",
                    "+I[1016, user_17, Shanghai, 123567891234]",
                    "+I[1017, user_18, Shanghai, 123567891234]",
                    "+I[1018, user_19, Shanghai, 123567891234]",
                    "+I[2000, user_21, Pittsburgh, 123567891234]",
                    "+I[15213, user_15213, Shanghai, 123567891234]",
                    // these operations were already applied in the snapshot phase,
                    // but are read again in the streaming phase
                    "+I[15213, user_15213, Shanghai, 123567891234]",
                    "-U[2000, user_21, Shanghai, 123567891234]",
                    "+U[2000, user_21, Pittsburgh, 123567891234]",
                    "-D[1019, user_20, Shanghai, 123567891234]");
    assertEqualsInAnyOrder(expectedRecords, records);
}

private List<String> testBackfillWhenWritingEvents(
        boolean skipSnapshotBackfill, int fetchSize, int hookType) throws Exception {
    createAndInitialize("customer.sql");
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(200L);
    env.setParallelism(1);

    ResolvedSchema customersSchema =
            new ResolvedSchema(
                    Arrays.asList(
                            physical("ID", BIGINT().notNull()),
                            physical("NAME", STRING()),
                            physical("ADDRESS", STRING()),
                            physical("PHONE_NUMBER", STRING())),
                    new ArrayList<>(),
                    UniqueConstraint.primaryKey("pk", Collections.singletonList("ID")));
    TestTable customerTable =
            new TestTable(ORACLE_DATABASE, ORACLE_SCHEMA, "CUSTOMERS", customersSchema);
    String tableId = customerTable.getTableId();

    OracleSourceBuilder.OracleIncrementalSource source =
            OracleSourceBuilder.OracleIncrementalSource.<RowData>builder()
                    .hostname(ORACLE_CONTAINER.getHost())
                    .port(ORACLE_CONTAINER.getOraclePort())
                    .username(CONNECTOR_USER)
                    .password(CONNECTOR_PWD)
                    .databaseList(ORACLE_DATABASE)
                    .schemaList(ORACLE_SCHEMA)
                    .tableList("DEBEZIUM.CUSTOMERS")
                    .skipSnapshotBackfill(skipSnapshotBackfill)
                    .startupOptions(StartupOptions.initial())
                    .deserializer(customerTable.getDeserializer())
                    .build();

    // Do some database operations during hook in snapshot period.
    SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
    String[] statements =
            new String[] {
                String.format(
                        "INSERT INTO %s VALUES (15213, 'user_15213', 'Shanghai', '123567891234')",
                        tableId),
                String.format("UPDATE %s SET address='Pittsburgh' WHERE id=2000", tableId),
                String.format("DELETE FROM %s WHERE id=1019", tableId)
            };
    SnapshotPhaseHook snapshotPhaseHook =
            (sourceConfig, split) -> {
                // database update operations use TEST_USER rather than CONNECTOR_USER
                JdbcConfiguration configuration =
                        JdbcConfiguration.copy(
                                        ((JdbcSourceConfig) sourceConfig)
                                                .getDbzConnectorConfig()
                                                .getJdbcConfig())
                                .withUser("debezium")
                                .withPassword("dbz")
                                .build();
                try (OracleConnection oracleConnection =
                        OracleConnectionUtils.createOracleConnection(configuration)) {
                    oracleConnection.setAutoCommit(false);
                    oracleConnection.execute(statements);
                    oracleConnection.commit();
                }
            };
    if (hookType == USE_POST_LOWWATERMARK_HOOK) {
        hooks.setPostLowWatermarkAction(snapshotPhaseHook);
    } else if (hookType == USE_PRE_HIGHWATERMARK_HOOK) {
        hooks.setPreHighWatermarkAction(snapshotPhaseHook);
    }
    source.setSnapshotHooks(hooks);

    List<String> records = new ArrayList<>();
    try (CloseableIterator<RowData> iterator =
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "Backfill Skipped Source")
                    .executeAndCollect()) {
        records = fetchRowData(iterator, fetchSize, customerTable::stringify);
        env.close();
    }
    return records;
}
```
## Are you willing to submit a PR?
- [ ] I'm willing to submit a PR!
---------------- Imported from GitHub ----------------
Url: https://github.com/apache/flink-cdc/issues/2853
Created by: [loserwang1024|https://github.com/loserwang1024]
Labels: bug
Assignee: [loserwang1024|https://github.com/loserwang1024]
Created at: Tue Dec 12 10:44:24 CST 2023
State: open