You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Flink CDC Issue Import (Jira)" <ji...@apache.org> on 2024/03/20 09:25:00 UTC

[jira] [Created] (FLINK-34820) [postgres] Remove unnecessary schema fresh to improve performance.

Flink CDC Issue Import created FLINK-34820:
----------------------------------------------

             Summary: [postgres] Remove unnecessary schema fresh to improve performance.
                 Key: FLINK-34820
                 URL: https://issues.apache.org/jira/browse/FLINK-34820
             Project: Flink
          Issue Type: Improvement
          Components: Flink CDC
            Reporter: Flink CDC Issue Import


### Search before asking

- [X] I searched in the [issues|https://github.com/ververica/flink-cdc-connectors/issues] and found nothing similar.


### Motivation

It's very time-consuming for postgres to refresh schema if there are many tables to read. According to our testing, refreshing 400 tables takes 15 minutes. Because it takes a long time to refresh the current table's schema when reading a chunk in the scan stage, the data rate shows the following sawtooth pattern. Therefore, we need to minimize unnecessary shema refreshes as much as possible。

<img width="437" alt="image" src="https://github.com/ververica/flink-cdc-connectors/assets/5181963/ffadeae0-5e8f-46d7-8941-3c1e3a0e7240">


### Solution

Firstly, the origin schema of postgres cdc is [the schema filed of PostgresSourceFetchTaskContext|https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java#L91], which is created and refreshed when [PostgresSourceFetchTaskContext#configure|https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java#L176] is called, and both [the schema refresh of scan stage|https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java#L281] and [stream stage|https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java#L118] are refreshing the schema of PostgresSourceFetchTaskContext. A new PostgresSourceFetchTaskContext is created in IncrementalSourceSplitReader#checkSplitOrStartNext for each split (both [SnapshotSplit|https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java#L124] and [StreamSplit|https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java#L134)]. For snapshot splits, even with the condition of [whether the currentFetcher is equal to null|https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java#L129], in many cases it still leads to the re creation of the PostgresSourceFetchTaskContext because the IncrementalSourceSplitReader is often discarded by the Flink kernal when a snapshot chunk is finished and become idle. See
1. [SourceReaderBase#pollNext|https://github.com/apache/flink/blob/c7beda0da81ffc4bbb01befafd2eed08b7b35854/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L149] -> [SourceReaderBase#finishedOrAvailableLater|https://github.com/apache/flink/blob/c7beda0da81ffc4bbb01befafd2eed08b7b35854/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L332] -> [SplitFetcherManager#maybeShutdownFinishedFetchers|https://github.com/apache/flink/blob/c7beda0da81ffc4bbb01befafd2eed08b7b35854/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java#L233) -> SplitFetcher#shutdown and SplitFetcher is removed. 
2. IncrementalSourceSplitReader(implements SplitReader] is [SplitFetcher#splitReader|https://github.com/apache/flink/blob/c7beda0da81ffc4bbb01befafd2eed08b7b35854/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java#L57] and removed also.

Based on the analysis of the above, we get two optimizations.
1. It's enough to refresh the schema when [PostgresSourceFetchTaskContext#configure|https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java#L176] is called, and there is no need to refresh the schema afterwards.
2. Reuse PostgresSourceFetchTaskContext between SnapshotSplits based on sourceConfig, as PostgresSourceFetchTaskContext is created for almost every SnapshotSplit.

### Alternatives

_No response_

### Anything else?

_No response_

### Are you willing to submit a PR?

- [X] I'm willing to submit a PR!

---------------- Imported from GitHub ----------------
Url: https://github.com/apache/flink-cdc/issues/2570
Created by: [lzshlzsh|https://github.com/lzshlzsh]
Labels: enhancement, 
Created at: Sun Oct 22 00:28:09 CST 2023
State: open




--
This message was sent by Atlassian Jira
(v8.20.10#820010)