Posted to issues@flink.apache.org by "Flink CDC Issue Import (Jira)" <ji...@apache.org> on 2024/03/20 09:28:00 UTC

[jira] [Created] (FLINK-34833) [Bug][sqlserver] SqlServer incremental source cannot support exactly-once

Flink CDC Issue Import created FLINK-34833:
----------------------------------------------

             Summary: [Bug][sqlserver] SqlServer incremental source cannot support exactly-once 
                 Key: FLINK-34833
                 URL: https://issues.apache.org/jira/browse/FLINK-34833
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
            Reporter: Flink CDC Issue Import


## Search before asking

- [X] I searched in the [issues|https://github.com/ververica/flink-cdc-connectors/issues] and found nothing similar.


## Flink version

1.18

## Flink CDC version

3.0


## Reason
### overview 
First, let us look at how the SqlServer incremental source guarantees exactly-once semantics when reading in parallel.
The table is split into multiple chunks based on the key, and each chunk becomes a snapshot split. These splits can be read in parallel.

In the snapshot phase, for each snapshot split covering [low_key, high_key]:
1. Use SqlServerDialect#displayCurrentOffset to get the LSN as low_watermark.
2. Read the snapshot data in [low_key, high_key] through a JDBC query into a temporary state `state1`.
3. Use SqlServerDialect#displayCurrentOffset to get the LSN as high_watermark.
4. Read the log between (low_watermark, high_watermark), apply it to the temporary state `state1` to produce the **final state as of high_watermark**, then emit that state downstream.

Then, in the stream phase, we read the log in [high_watermark, +∞) for this split's key range [low_key, high_key].
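The steps above can be sketched with a toy in-memory model. This is only an illustration, not the Flink CDC implementation: `SnapshotSplitSketch`, `Change`, `backfill` and `streamAfter` are hypothetical names, LSNs are modeled as plain `long` values, the key-range filter is omitted, and the interval boundaries (exclusive low, inclusive high) are a simplifying assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy in-memory model of one snapshot split. Hypothetical names, not Flink CDC classes. */
final class SnapshotSplitSketch {

    /** One log entry: its LSN, the row key, and the new row value (null means delete). */
    static final class Change {
        final long lsn;
        final int key;
        final String value;

        Change(long lsn, int key, String value) {
            this.lsn = lsn;
            this.key = key;
            this.value = value;
        }
    }

    /** Step 4: replay entries with lowWatermark < lsn <= highWatermark on top of the snapshot. */
    static Map<Integer, String> backfill(
            Map<Integer, String> snapshot, List<Change> log, long lowWatermark, long highWatermark) {
        Map<Integer, String> state = new TreeMap<>(snapshot);
        for (Change c : log) {
            if (c.lsn > lowWatermark && c.lsn <= highWatermark) {
                if (c.value == null) {
                    state.remove(c.key); // delete
                } else {
                    state.put(c.key, c.value); // insert or update (upsert)
                }
            }
        }
        return state;
    }

    /** Stream phase: only entries after highWatermark are emitted for this split. */
    static List<Change> streamAfter(List<Change> log, long highWatermark) {
        List<Change> out = new ArrayList<>();
        for (Change c : log) {
            if (c.lsn > highWatermark) {
                out.add(c);
            }
        }
        return out;
    }
}
```

Because the replay in `backfill` is an upsert, applying a change that the snapshot query already saw is harmless. The scheme is only correct when both watermarks bracket the moment the snapshot query actually ran.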

### problem
However, `SqlServerDialect#displayCurrentOffset → SqlServerUtils#currentLsn → SqlServerConnection#getMaxTransactionLsn` returns the max LSN of the system table `cdc.lsn_time_mapping`, which is not the real latest LSN of the whole database system. In this incremental source framework, only the real latest LSN of the whole database system can guarantee exactly-once semantics.

As shown in [CDC Data Only Shows up After 5 Minutes|https://stackoverflow.com/questions/29477391/cdc-data-only-shows-up-after-5-minutes], one developer found that CDC data only showed up after 5 minutes. The reason is:
> Because the capture process extracts change data from the transaction log, there is a built in latency between the time that a change is committed to a source table and the time that the change appears within its associated change table. 

For example, low_watermark and high_watermark may be 5 minutes behind the LSN at which step 2 (reading the snapshot data) runs. Then, in the streaming phase, log entries that should be skipped will be read again.
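A minimal sketch of this failure mode, under stated assumptions: LSNs are modeled as plain `long` values, `snapshotLsn` stands for the real database LSN at the moment the snapshot query ran, and `highWatermark` is whatever the connector recorded. `StaleWatermarkSketch` and `replayedLsns` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the stale-watermark problem. Hypothetical names, not Flink CDC classes. */
final class StaleWatermarkSketch {

    /**
     * Returns the LSNs of log entries that the stream phase would emit
     * even though the snapshot query already saw their effect.
     */
    static List<Long> replayedLsns(List<Long> logLsns, long snapshotLsn, long highWatermark) {
        List<Long> replayed = new ArrayList<>();
        for (long lsn : logLsns) {
            // Already reflected in the snapshot (lsn <= snapshotLsn),
            // yet after the recorded high watermark, so it is streamed again.
            if (lsn > highWatermark && lsn <= snapshotLsn) {
                replayed.add(lsn);
            }
        }
        return replayed;
    }

    public static void main(String[] args) {
        List<Long> log = List.of(1L, 2L, 3L, 4L, 5L);
        // The capture process lags: the recorded high watermark is 2 while the snapshot saw LSN 5.
        System.out.println(replayedLsns(log, 5L, 2L)); // prints [3, 4, 5]
        // With an accurate high watermark nothing is replayed.
        System.out.println(replayedLsns(log, 5L, 5L)); // prints []
    }
}
```

In the first call the entries at LSN 3, 4 and 5 are counted twice, once through the snapshot and once through the stream.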



## How to verify
Modify the test com.ververica.cdc.connectors.oracle.source.OracleSourceITCase#testEnableBackfillWithDMLPostLowWaterMark as follows; the three DML operations are then read twice.
```java
    @Test
    public void testEnableBackfillWithDMLPostLowWaterMark() throws Exception {

        List<String> records = testBackfillWhenWritingEvents(false, 25, USE_POST_LOWWATERMARK_HOOK);

        List<String> expectedRecords =
                Arrays.asList(
                        "+I[101, user_1, Shanghai, 123567891234]",
                        "+I[102, user_2, Shanghai, 123567891234]",
                        "+I[103, user_3, Shanghai, 123567891234]",
                        "+I[109, user_4, Shanghai, 123567891234]",
                        "+I[110, user_5, Shanghai, 123567891234]",
                        "+I[111, user_6, Shanghai, 123567891234]",
                        "+I[118, user_7, Shanghai, 123567891234]",
                        "+I[121, user_8, Shanghai, 123567891234]",
                        "+I[123, user_9, Shanghai, 123567891234]",
                        "+I[1009, user_10, Shanghai, 123567891234]",
                        "+I[1010, user_11, Shanghai, 123567891234]",
                        "+I[1011, user_12, Shanghai, 123567891234]",
                        "+I[1012, user_13, Shanghai, 123567891234]",
                        "+I[1013, user_14, Shanghai, 123567891234]",
                        "+I[1014, user_15, Shanghai, 123567891234]",
                        "+I[1015, user_16, Shanghai, 123567891234]",
                        "+I[1016, user_17, Shanghai, 123567891234]",
                        "+I[1017, user_18, Shanghai, 123567891234]",
                        "+I[1018, user_19, Shanghai, 123567891234]",
                        "+I[2000, user_21, Pittsburgh, 123567891234]",
                        "+I[15213, user_15213, Shanghai, 123567891234]",
                        // these operations were already applied in the snapshot phase, but are read again in the streaming phase
                        "+I[15213, user_15213, Shanghai, 123567891234]",
                        "-U[2000, user_21, Shanghai, 123567891234]",
                        "+U[2000, user_21, Pittsburgh, 123567891234]",
                        "-D[1019, user_20, Shanghai, 123567891234]");
        assertEqualsInAnyOrder(expectedRecords, records);
    }

    private List<String> testBackfillWhenWritingEvents(
            boolean skipSnapshotBackfill, int fetchSize, int hookType) throws Exception {
        createAndInitialize("customer.sql");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(200L);
        env.setParallelism(1);

        ResolvedSchema customersSchema =
                new ResolvedSchema(
                        Arrays.asList(
                                physical("ID", BIGINT().notNull()),
                                physical("NAME", STRING()),
                                physical("ADDRESS", STRING()),
                                physical("PHONE_NUMBER", STRING())),
                        new ArrayList<>(),
                        UniqueConstraint.primaryKey("pk", Collections.singletonList("ID")));
        TestTable customerTable =
                new TestTable(ORACLE_DATABASE, ORACLE_SCHEMA, "CUSTOMERS", customersSchema);
        String tableId = customerTable.getTableId();

        OracleSourceBuilder.OracleIncrementalSource source =
                OracleSourceBuilder.OracleIncrementalSource.<RowData>builder()
                        .hostname(ORACLE_CONTAINER.getHost())
                        .port(ORACLE_CONTAINER.getOraclePort())
                        .username(CONNECTOR_USER)
                        .password(CONNECTOR_PWD)
                        .databaseList(ORACLE_DATABASE)
                        .schemaList(ORACLE_SCHEMA)
                        .tableList("DEBEZIUM.CUSTOMERS")
                        .skipSnapshotBackfill(skipSnapshotBackfill)
                        .startupOptions(StartupOptions.initial())
                        .deserializer(customerTable.getDeserializer())
                        .build();

        // Do some database operations during hook in snapshot period.
        SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
        String[] statements =
                new String[] {
                    String.format(
                            "INSERT INTO %s VALUES (15213, 'user_15213', 'Shanghai', '123567891234')",
                            tableId),
                    String.format("UPDATE %s SET address='Pittsburgh' WHERE id=2000", tableId),
                    String.format("DELETE FROM %s WHERE id=1019", tableId)
                };
        SnapshotPhaseHook snapshotPhaseHook =
                (sourceConfig, split) -> {
                    // database update operations use TEST_USER rather than CONNECTOR_USER
                    JdbcConfiguration configuration =
                            JdbcConfiguration.copy(
                                            ((JdbcSourceConfig) sourceConfig)
                                                    .getDbzConnectorConfig()
                                                    .getJdbcConfig())
                                    .withUser("debezium")
                                    .withPassword("dbz")
                                    .build();
                    try (OracleConnection oracleConnection =
                            OracleConnectionUtils.createOracleConnection(configuration)) {
                        oracleConnection.setAutoCommit(false);
                        oracleConnection.execute(statements);
                        oracleConnection.commit();
                    }
                };

        if (hookType == USE_POST_LOWWATERMARK_HOOK) {
            hooks.setPostLowWatermarkAction(snapshotPhaseHook);
        } else if (hookType == USE_PRE_HIGHWATERMARK_HOOK) {
            hooks.setPreHighWatermarkAction(snapshotPhaseHook);
        }
        source.setSnapshotHooks(hooks);

        List<String> records = new ArrayList<>();
        try (CloseableIterator<RowData> iterator =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Backfill Skipped Source")
                        .executeAndCollect()) {
            records = fetchRowData(iterator, fetchSize, customerTable::stringify);
            env.close();
        }
        return records;
    }
```


### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!

---------------- Imported from GitHub ----------------
Url: https://github.com/apache/flink-cdc/issues/2853
Created by: [loserwang1024|https://github.com/loserwang1024]
Labels: bug
Assignee: [loserwang1024|https://github.com/loserwang1024]
Created at: Tue Dec 12 10:44:24 CST 2023
State: open



