You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2023/06/30 02:17:04 UTC
[inlong] branch master updated: [INLONG-8363][Sort] MySQL connector supports to discard tables without capturing in snapshot phase #8364
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e47a7bb840 [INLONG-8363][Sort] MySQL connector supports to discard tables without capturing in snapshot phase #8364
e47a7bb840 is described below
commit e47a7bb840a9565aea6b448f8c43f048a0ff91f2
Author: Liao Rui <li...@users.noreply.github.com>
AuthorDate: Fri Jun 30 10:16:57 2023 +0800
[INLONG-8363][Sort] MySQL connector supports to discard tables without capturing in snapshot phase #8364
---
.../mysql/debezium/reader/SnapshotSplitReader.java | 8 ++++++
.../debezium/task/MySqlSnapshotSplitReadTask.java | 13 +++++++--
.../inlong/sort/cdc/mysql/source/MySqlSource.java | 31 ++++++++++++++++++++++
3 files changed, 50 insertions(+), 2 deletions(-)
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/SnapshotSplitReader.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/SnapshotSplitReader.java
index c1200deac1..6c424f2ec9 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/SnapshotSplitReader.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/SnapshotSplitReader.java
@@ -36,6 +36,7 @@ import io.debezium.heartbeat.Heartbeat;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.spi.SnapshotResult;
+import io.debezium.pipeline.spi.SnapshotResult.SnapshotResultStatus;
import io.debezium.util.SchemaNameAdjuster;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.util.FlinkRuntimeException;
@@ -126,6 +127,13 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecord, MySqlSp
return;
}
+ if (SnapshotResultStatus.SKIPPED == snapshotResult.getStatus()) {
+ LOG.info("Skip binlog split: {}", backfillBinlogSplit);
+ currentTaskRunning = false;
+ hasNextElement.set(false);
+ return;
+ }
+
// execute binlog read task
if (snapshotResult.isCompletedOrSkipped()) {
final MySqlBinlogSplitReadTask backfillBinlogReadTask =
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java
index 446967def4..37c43a9281 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java
@@ -160,6 +160,11 @@ public class MySqlSnapshotSplitReadTask extends AbstractSnapshotChangeEventSourc
((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))
.setHighWatermark(highWatermark);
+ Table table = databaseSchema.tableFor(snapshotSplit.getTableId());
+ if (table == null) {
+ return SnapshotResult.skipped(ctx.offset);
+ }
+
return SnapshotResult.completed(ctx.offset);
}
@@ -205,8 +210,12 @@ public class MySqlSnapshotSplitReadTask extends AbstractSnapshotChangeEventSourc
EventDispatcher.SnapshotReceiver snapshotReceiver =
dispatcher.getSnapshotChangeEventReceiver();
LOG.debug("Snapshotting table {}", tableId);
- createDataEventsForTable(
- snapshotContext, snapshotReceiver, databaseSchema.tableFor(tableId));
+ Table table = databaseSchema.tableFor(tableId);
+ if (table != null) {
+ createDataEventsForTable(snapshotContext, snapshotReceiver, table);
+ } else {
+ LOG.warn("Debezium doesn't capture {}, ignore this table", snapshotSplit.getTableId());
+ }
snapshotReceiver.completeSnapshot();
}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
index 1a3b1a888a..ad3f7b422e 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
@@ -29,6 +29,7 @@ import org.apache.inlong.sort.cdc.mysql.source.assigners.state.BinlogPendingSpli
import org.apache.inlong.sort.cdc.mysql.source.assigners.state.HybridPendingSplitsState;
import org.apache.inlong.sort.cdc.mysql.source.assigners.state.PendingSplitsState;
import org.apache.inlong.sort.cdc.mysql.source.assigners.state.PendingSplitsStateSerializer;
+import org.apache.inlong.sort.cdc.mysql.source.assigners.state.SnapshotPendingSplitsState;
import org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceConfig;
import org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceConfigFactory;
import org.apache.inlong.sort.cdc.mysql.source.enumerator.MySqlSourceEnumerator;
@@ -37,6 +38,7 @@ import org.apache.inlong.sort.cdc.mysql.source.reader.MySqlRecordEmitter;
import org.apache.inlong.sort.cdc.mysql.source.reader.MySqlSourceReader;
import org.apache.inlong.sort.cdc.mysql.source.reader.MySqlSourceReaderContext;
import org.apache.inlong.sort.cdc.mysql.source.reader.MySqlSplitReader;
+import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSchemalessSnapshotSplit;
import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSplit;
import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSplitSerializer;
import org.apache.inlong.sort.cdc.mysql.table.StartupMode;
@@ -59,10 +61,15 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import static org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils.discoverCapturedTables;
import static org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils.openJdbcConnection;
@@ -101,6 +108,8 @@ public class MySqlSource<T>
Source<T, MySqlSplit, PendingSplitsState>,
ResultTypeQueryable<T> {
+ private static final Logger LOG = LoggerFactory.getLogger(MySqlSource.class);
+
private static final long serialVersionUID = 1L;
private final MySqlSourceConfigFactory configFactory;
@@ -208,6 +217,28 @@ public class MySqlSource<T>
final MySqlSplitAssigner splitAssigner;
if (checkpoint instanceof HybridPendingSplitsState) {
+ try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
+ final List<TableId> capturedTables = discoverCapturedTables(jdbc, sourceConfig);
+ SnapshotPendingSplitsState splitsState =
+ ((HybridPendingSplitsState) checkpoint).getSnapshotPendingSplits();
+ List<TableId> tables = new ArrayList<>(splitsState.getAlreadyProcessedTables());
+ tables.addAll(splitsState.getRemainingTables());
+ LOG.info("Checkpoint tables: {}", Arrays.deepToString(tables.toArray()));
+ tables.removeAll(capturedTables);
+ if (tables.size() > 0) {
+ LOG.info("Debezium doesn't capture {} tables, remove them from checkpoint",
+ Arrays.deepToString(tables.toArray()));
+ splitsState.getRemainingTables().removeAll(tables);
+ splitsState.getAlreadyProcessedTables().addAll(tables);
+ List<MySqlSchemalessSnapshotSplit> snapshotSplits = splitsState.getRemainingSplits().stream()
+ .filter(it -> tables.contains(it.getTableId())).collect(Collectors.toList());
+ splitsState.getRemainingSplits().removeAll(snapshotSplits);
+ LOG.info("Remaining splits: {}", Arrays.deepToString(splitsState.getRemainingSplits().toArray()));
+ }
+ } catch (Exception e) {
+ throw new FlinkRuntimeException("Failed to discover captured tables for enumerator", e);
+ }
+
splitAssigner =
new MySqlHybridSplitAssigner(
sourceConfig,