You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Hongshun Wang (Jira)" <ji...@apache.org> on 2024/03/11 02:28:00 UTC

[jira] [Commented] (FLINK-34634) Restarting the job will not read the changelog anymore if it stops before the synchronization of meta information is complete and some table is removed

    [ https://issues.apache.org/jira/browse/FLINK-34634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825132#comment-17825132 ] 

Hongshun Wang commented on FLINK-34634:
---------------------------------------

[~ruanhang1993] , [~renqs] , [~Leonard] , CC

> Restarting the job will not read the changelog anymore if it stops before the synchronization of meta information is complete and some table is removed
> -------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-34634
>                 URL: https://issues.apache.org/jira/browse/FLINK-34634
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>            Reporter: Hongshun Wang
>            Priority: Major
>             Fix For: cdc-3.1.0
>
>         Attachments: image-2024-03-09-15-25-26-187.png, image-2024-03-09-15-27-46-073.png
>
>
> h3. What's the problem
> Once, I removed a table from the option and then restarted the job from the savepoint, but the job couldn't read the binlog anymore. When I checked the logs, I found an Error level log stating:
> ' The enumerator received invalid request meta group id 6, the valid meta group id range is [0, 4].'
> It appears that the Reader is requesting more splits than the Enumerator is aware of.
> However, the code should indeed remove redundant split information from the Reader as seen in [https://github.com/ververica/flink-cdc-connectors/pull/2292]. So why does this issue occur?
>  
> h3. why occurs
> !image-2024-03-09-15-25-26-187.png|width=751,height=329!
> Upon examining the code, I discovered the cause. If the job stops before completing all the split meta information and then restarts, this issue occurs. Suppose that the totalFinishedSplitSize of binlogSplit in the Reader is 6, and no meta information has been synchronized, leaving the finishedSnapshotSplitInfos of binlogSplit in the Reader empty. After restarting, the totalFinishedSplitSize of binlogSplit in the Reader equals (6 - (0 - 0)) which is still 6, but in the Enumerator, it is only 4(the removed table have two split). This could lead to an out-of-range request.
> !image-2024-03-09-15-27-46-073.png|width=755,height=305!
> h3. How to reproduce
>  * Add Thread.sleep(1000L) in com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader#handleSourceEvents to postpone split meta infos synchronization.
> {code:java}
> public void handleSourceEvents(SourceEvent sourceEvent) {
> else if (sourceEvent instanceof BinlogSplitMetaEvent) {
>     LOG.debug(
>             "Source reader {} receives binlog meta with group id {}.",
>             subtaskId,
>             ((BinlogSplitMetaEvent) sourceEvent).getMetaGroupId());
>     try {
>         Thread.sleep(1000L);
>     } catch (InterruptedException e) {
>         throw new RuntimeException(e);
>     }
>     fillMetadataForBinlogSplit((BinlogSplitMetaEvent) sourceEvent);
> } {code}
>  * Add Thread.sleep(500L) in com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#testRemoveTablesOneByOne to trigger savepoint before meta infos synchronization finishes.
>  
> {code:java}
> // step 2: execute insert and trigger savepoint with all tables added
> {
>     // ..ingore 
>     waitForSinkSize("sink", fetchedDataList.size());
>     Thread.sleep(500L);
>     assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
>     finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
>     jobClient.cancel().get();
> }
> // test removing table one by one, note that there should be at least one table remaining
> for (int round = 0; round < captureAddressTables.length - 1; round++) {
> ...
> }
> {code}
>  
>  * Add chunk-meta.group.size  =2 in com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#getCreateTableStatement
> Then, run test(com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#testJobManagerFailoverForRemoveTable), the error log will occur.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)