Posted to issues@flink.apache.org by "Flink CDC Issue Import (Jira)" <ji...@apache.org> on 2024/03/20 09:20:00 UTC
[jira] [Created] (FLINK-34767) flink-sql-connector-mysql-cdc cannot read incremental data in initial mode
Flink CDC Issue Import created FLINK-34767:
----------------------------------------------
Summary: flink-sql-connector-mysql-cdc cannot read incremental data in initial mode
Key: FLINK-34767
URL: https://issues.apache.org/jira/browse/FLINK-34767
Project: Flink
Issue Type: Bug
Components: Flink CDC
Reporter: Flink CDC Issue Import
**Describe the bug**
In `initial` startup mode, the MySQL CDC source does not read incremental (binlog) data.
**Environment:**
- Flink version: 1.14.2
- Flink CDC version: 2.3.0
- Database and version: MySQL 5.7.37
**To Reproduce**
Steps to reproduce the behavior:
1. Test data: synchronize all tables in one database (about 7 tables).
2. Test code:
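```java
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

// The SOURCE_* constants are defined elsewhere in the job.
public static MySqlSource<String> getMySqlSource() {
    return MySqlSource.<String>builder()
            .hostname(SOURCE_IP)
            .port(SOURCE_PORT)
            .databaseList(SOURCE_DB) // set captured database
            .tableList(SOURCE_TABLS) // set captured tables
            .username(SOURCE_USER)
            .password(SOURCE_PWD)
            .serverTimeZone("Asia/Shanghai") // controls how MySQL timestamp types are converted to strings
            .startupOptions(StartupOptions.initial()) // take a snapshot first, then read the binlog
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
            .build();
}
```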
3. The error:
There is no error message. After enabling debug logging for the job on YARN, I found that it never reaches the following step in `MySqlSourceReader#addSplits`:
```java
@Override
public void addSplits(List<MySqlSplit> splits) {
    // restore for finishedUnackedSplits
    List<MySqlSplit> unfinishedSplits = new ArrayList<>();
    for (MySqlSplit split : splits) {
        LOG.info("Add Split: " + split);
        if (split.isSnapshotSplit()) {
            MySqlSnapshotSplit snapshotSplit = split.asSnapshotSplit();
            if (snapshotSplit.isSnapshotReadFinished()) {
                finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit);
            } else {
                unfinishedSplits.add(split);
            }
        } else {
            MySqlBinlogSplit binlogSplit = split.asBinlogSplit();
            // the binlog split is suspended
            if (binlogSplit.isSuspended()) {
                suspendedBinlogSplit = binlogSplit;
            } else if (!binlogSplit.isCompletedSplit()) {
                uncompletedBinlogSplits.put(split.splitId(), split.asBinlogSplit());
                requestBinlogSplitMetaIfNeeded(split.asBinlogSplit());
            } else {
                uncompletedBinlogSplits.remove(split.splitId());
                MySqlBinlogSplit mySqlBinlogSplit =
                        discoverTableSchemasForBinlogSplit(split.asBinlogSplit());
                unfinishedSplits.add(mySqlBinlogSplit);
            }
        }
    }
    // notify split enumerator again about the finished unacked snapshot splits
    reportFinishedSnapshotSplitsIfNeed();
    // add all un-finished splits (including binlog split) to SourceReaderBase
    if (!unfinishedSplits.isEmpty()) {
        super.addSplits(unfinishedSplits);
    }
}
```
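The routing in `addSplits` above can be modelled in isolation. The sketch below is a hypothetical, simplified stand-in and not the connector's API. The `Split` class keeps only the flags that the real method branches on. `readerSplits` stands in for the splits passed to `SourceReaderBase` through `super.addSplits(...)`. Metadata requests, schema discovery and enumerator reporting are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of the split routing in addSplits(); not Flink CDC code. */
public class SplitRoutingSketch {

    /** Hypothetical stand-in for MySqlSplit, reduced to the flags addSplits() checks. */
    static final class Split {
        final String id;
        final boolean binlog;           // false = snapshot split
        final boolean snapshotFinished; // meaningful for snapshot splits only
        final boolean suspended;        // meaningful for binlog splits only
        final boolean completed;        // binlog split already carries all table metadata

        Split(String id, boolean binlog, boolean snapshotFinished,
              boolean suspended, boolean completed) {
            this.id = id;
            this.binlog = binlog;
            this.snapshotFinished = snapshotFinished;
            this.suspended = suspended;
            this.completed = completed;
        }
    }

    final Map<String, Split> finishedUnackedSplits = new HashMap<>();
    final Map<String, Split> uncompletedBinlogSplits = new HashMap<>();
    Split suspendedBinlogSplit;
    // Stands in for the splits handed to SourceReaderBase via super.addSplits(...)
    final List<Split> readerSplits = new ArrayList<>();

    void addSplits(List<Split> splits) {
        List<Split> unfinished = new ArrayList<>();
        for (Split split : splits) {
            if (!split.binlog) {
                if (split.snapshotFinished) {
                    // The real code later re-reports these to the enumerator
                    finishedUnackedSplits.put(split.id, split);
                } else {
                    unfinished.add(split);
                }
            } else if (split.suspended) {
                suspendedBinlogSplit = split;
            } else if (!split.completed) {
                // Parked here; the real code also requests the missing metadata
                uncompletedBinlogSplits.put(split.id, split);
            } else {
                // The real code runs table schema discovery before handing the split over
                uncompletedBinlogSplits.remove(split.id);
                unfinished.add(split);
            }
        }
        if (!unfinished.isEmpty()) {
            readerSplits.addAll(unfinished);
        }
    }
}
```

In this model, a binlog split reaches `readerSplits` only once it is complete. An incomplete binlog split waits in `uncompletedBinlogSplits` until a completed version of it is added.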
The table-discovery log lines from `TableDiscoveryUtils`, which appear in a normal run, never show up in this job's log.
The cluster's default TaskManager memory is 1.7 GB. Increasing the memory that YARN allocates to each TaskManager to 4 GB resolves the problem.
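The 1.7 GB figure matches the `taskmanager.memory.process.size: 1728m` default in the `flink-conf.yaml` shipped with Flink 1.14. On YARN, this option sets the TaskManager container size. The issue does not say how the memory was raised, so the fragment below is a sketch that assumes the cluster-wide `flink-conf.yaml` was edited. It works around the symptom and does not explain the root cause.

```yaml
# flink-conf.yaml: total TaskManager process memory (shipped default: 1728m)
taskmanager.memory.process.size: 4096m
```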
---------------- Imported from GitHub ----------------
Url: https://github.com/apache/flink-cdc/issues/1869
Created by: [red-cy|https://github.com/red-cy]
Labels: bug
Created at: Wed Jan 11 19:15:30 CST 2023
State: open
--
This message was sent by Atlassian Jira
(v8.20.10#820010)