Posted to issues@flink.apache.org by "Flink CDC Issue Import (Jira)" <ji...@apache.org> on 2024/03/20 09:20:00 UTC

[jira] [Created] (FLINK-34767) flink-sql-connector-mysql-cdc cannot read incremental data in initial mode

Flink CDC Issue Import created FLINK-34767:
----------------------------------------------

             Summary: flink-sql-connector-mysql-cdc cannot read incremental data in initial mode
                 Key: FLINK-34767
                 URL: https://issues.apache.org/jira/browse/FLINK-34767
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
            Reporter: Flink CDC Issue Import


**Describe the bug (Please use English)**
In `initial` startup mode, flink-sql-connector-mysql-cdc cannot read incremental data.

**Environment:**
 - Flink version: 1.14.2
 - Flink CDC version: 2.3.0
 - Database and version: MySQL 5.7.37

**To Reproduce**
Steps to reproduce the behavior:
1. Test data: synchronize all tables in one database (about 7 tables).
2. Test code:
```java
public static MySqlSource<String> getMySqlSource() {
    return MySqlSource.<String>builder()
            .hostname(SOURCE_IP)
            .port(SOURCE_PORT)
            .databaseList(SOURCE_DB) // set captured database
            .tableList(SOURCE_TABLS) // set captured table
            .username(SOURCE_USER)
            .password(SOURCE_PWD)
            .serverTimeZone("Asia/Shanghai") // controls how MySQL TIMESTAMP values are converted to strings
            .startupOptions(StartupOptions.initial())
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
            .build();
}
```
3. The error:
There is no error message. After enabling debug logging on YARN, I found that the program never reaches the following method:
```java
    @Override
    public void addSplits(List<MySqlSplit> splits) {
        // restore for finishedUnackedSplits
        List<MySqlSplit> unfinishedSplits = new ArrayList<>();
        for (MySqlSplit split : splits) {
            LOG.info("Add Split: " + split);
            if (split.isSnapshotSplit()) {
                MySqlSnapshotSplit snapshotSplit = split.asSnapshotSplit();
                if (snapshotSplit.isSnapshotReadFinished()) {
                    finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit);
                } else {
                    unfinishedSplits.add(split);
                }
            } else {
                MySqlBinlogSplit binlogSplit = split.asBinlogSplit();
                // the binlog split is suspended
                if (binlogSplit.isSuspended()) {
                    suspendedBinlogSplit = binlogSplit;
                } else if (!binlogSplit.isCompletedSplit()) {
                    uncompletedBinlogSplits.put(split.splitId(), split.asBinlogSplit());
                    requestBinlogSplitMetaIfNeeded(split.asBinlogSplit());
                } else {
                    uncompletedBinlogSplits.remove(split.splitId());
                    MySqlBinlogSplit mySqlBinlogSplit =
                            discoverTableSchemasForBinlogSplit(split.asBinlogSplit());
                    unfinishedSplits.add(mySqlBinlogSplit);
                }
            }
        }
        // notify split enumerator again about the finished unacked snapshot splits
        reportFinishedSnapshotSplitsIfNeed();
        // add all un-finished splits (including binlog split) to SourceReaderBase
        if (!unfinishedSplits.isEmpty()) {
            super.addSplits(unfinishedSplits);
        }
    }
```
The table discovery log output from TableDiscoveryUtils, which a normally running job prints, never appears in the log.
The cluster's default TaskManager memory is 1.7 GB. Increasing the memory that YARN allocates to the TaskManager to 4 GB resolves the problem.
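
The memory increase described above could be applied with a configuration change along the following lines. This is only a sketch: it assumes the TaskManager size is controlled by the standard Flink option `taskmanager.memory.process.size` (the stock `flink-conf.yaml` sets it to `1728m`, which would match the 1.7 GB mentioned). The report does not say which option or command-line flag was actually used.

```yaml
# flink-conf.yaml (assumed location of the setting; not stated in the report)
# Total memory of each TaskManager process, raised from the 1.7 GB default to 4 GB.
taskmanager.memory.process.size: 4g
```

When submitting with a generic deployment target (for example `flink run -t yarn-per-job`), the same option can usually be passed per job as `-Dtaskmanager.memory.process.size=4g` instead of editing the cluster-wide file.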



---------------- Imported from GitHub ----------------
Url: https://github.com/apache/flink-cdc/issues/1869
Created by: [red-cy|https://github.com/red-cy]
Labels: bug
Created at: Wed Jan 11 19:15:30 CST 2023
State: open



