You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@inlong.apache.org by zi...@apache.org on 2023/06/30 02:17:04 UTC

[inlong] branch master updated: [INLONG-8363][Sort] MySQL connector supports to discard tables without capturing in snapshot phase #8364

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e47a7bb840 [INLONG-8363][Sort] MySQL connector supports to discard tables without capturing in snapshot phase #8364
e47a7bb840 is described below

commit e47a7bb840a9565aea6b448f8c43f048a0ff91f2
Author: Liao Rui <li...@users.noreply.github.com>
AuthorDate: Fri Jun 30 10:16:57 2023 +0800

    [INLONG-8363][Sort] MySQL connector supports to discard tables without capturing in snapshot phase #8364
---
 .../mysql/debezium/reader/SnapshotSplitReader.java |  8 ++++++
 .../debezium/task/MySqlSnapshotSplitReadTask.java  | 13 +++++++--
 .../inlong/sort/cdc/mysql/source/MySqlSource.java  | 31 ++++++++++++++++++++++
 3 files changed, 50 insertions(+), 2 deletions(-)

diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/SnapshotSplitReader.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/SnapshotSplitReader.java
index c1200deac1..6c424f2ec9 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/SnapshotSplitReader.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/SnapshotSplitReader.java
@@ -36,6 +36,7 @@ import io.debezium.heartbeat.Heartbeat;
 import io.debezium.pipeline.DataChangeEvent;
 import io.debezium.pipeline.source.spi.ChangeEventSource;
 import io.debezium.pipeline.spi.SnapshotResult;
+import io.debezium.pipeline.spi.SnapshotResult.SnapshotResultStatus;
 import io.debezium.util.SchemaNameAdjuster;
 import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -126,6 +127,13 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecord, MySqlSp
                             return;
                         }
 
+                        if (SnapshotResultStatus.SKIPPED == snapshotResult.getStatus()) {
+                            LOG.info("Skip binlog split: {}", backfillBinlogSplit);
+                            currentTaskRunning = false;
+                            hasNextElement.set(false);
+                            return;
+                        }
+
                         // execute binlog read task
                         if (snapshotResult.isCompletedOrSkipped()) {
                             final MySqlBinlogSplitReadTask backfillBinlogReadTask =
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java
index 446967def4..37c43a9281 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java
@@ -160,6 +160,11 @@ public class MySqlSnapshotSplitReadTask extends AbstractSnapshotChangeEventSourc
         ((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))
                 .setHighWatermark(highWatermark);
 
+        Table table = databaseSchema.tableFor(snapshotSplit.getTableId());
+        if (table == null) {
+            return SnapshotResult.skipped(ctx.offset);
+        }
+
         return SnapshotResult.completed(ctx.offset);
     }
 
@@ -205,8 +210,12 @@ public class MySqlSnapshotSplitReadTask extends AbstractSnapshotChangeEventSourc
         EventDispatcher.SnapshotReceiver snapshotReceiver =
                 dispatcher.getSnapshotChangeEventReceiver();
         LOG.debug("Snapshotting table {}", tableId);
-        createDataEventsForTable(
-                snapshotContext, snapshotReceiver, databaseSchema.tableFor(tableId));
+        Table table = databaseSchema.tableFor(tableId);
+        if (table != null) {
+            createDataEventsForTable(snapshotContext, snapshotReceiver, table);
+        } else {
+            LOG.warn("Debezium doesn't capture {}, ignore this table", snapshotSplit.getTableId());
+        }
         snapshotReceiver.completeSnapshot();
     }
 
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
index 1a3b1a888a..ad3f7b422e 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
@@ -29,6 +29,7 @@ import org.apache.inlong.sort.cdc.mysql.source.assigners.state.BinlogPendingSpli
 import org.apache.inlong.sort.cdc.mysql.source.assigners.state.HybridPendingSplitsState;
 import org.apache.inlong.sort.cdc.mysql.source.assigners.state.PendingSplitsState;
 import org.apache.inlong.sort.cdc.mysql.source.assigners.state.PendingSplitsStateSerializer;
+import org.apache.inlong.sort.cdc.mysql.source.assigners.state.SnapshotPendingSplitsState;
 import org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceConfig;
 import org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceConfigFactory;
 import org.apache.inlong.sort.cdc.mysql.source.enumerator.MySqlSourceEnumerator;
@@ -37,6 +38,7 @@ import org.apache.inlong.sort.cdc.mysql.source.reader.MySqlRecordEmitter;
 import org.apache.inlong.sort.cdc.mysql.source.reader.MySqlSourceReader;
 import org.apache.inlong.sort.cdc.mysql.source.reader.MySqlSourceReaderContext;
 import org.apache.inlong.sort.cdc.mysql.source.reader.MySqlSplitReader;
+import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSchemalessSnapshotSplit;
 import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSplit;
 import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSplitSerializer;
 import org.apache.inlong.sort.cdc.mysql.table.StartupMode;
@@ -59,10 +61,15 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 import static org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils.discoverCapturedTables;
 import static org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils.openJdbcConnection;
@@ -101,6 +108,8 @@ public class MySqlSource<T>
             Source<T, MySqlSplit, PendingSplitsState>,
             ResultTypeQueryable<T> {
 
+    private static final Logger LOG = LoggerFactory.getLogger(MySqlSource.class);
+
     private static final long serialVersionUID = 1L;
 
     private final MySqlSourceConfigFactory configFactory;
@@ -208,6 +217,28 @@ public class MySqlSource<T>
 
         final MySqlSplitAssigner splitAssigner;
         if (checkpoint instanceof HybridPendingSplitsState) {
+            try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
+                final List<TableId> capturedTables = discoverCapturedTables(jdbc, sourceConfig);
+                SnapshotPendingSplitsState splitsState =
+                        ((HybridPendingSplitsState) checkpoint).getSnapshotPendingSplits();
+                List<TableId> tables = new ArrayList<>(splitsState.getAlreadyProcessedTables());
+                tables.addAll(splitsState.getRemainingTables());
+                LOG.info("Checkpoint tables: {}", Arrays.deepToString(tables.toArray()));
+                tables.removeAll(capturedTables);
+                if (tables.size() > 0) {
+                    LOG.info("Debezium doesn't capture {} tables, remove them from checkpoint",
+                            Arrays.deepToString(tables.toArray()));
+                    splitsState.getRemainingTables().removeAll(tables);
+                    splitsState.getAlreadyProcessedTables().addAll(tables);
+                    List<MySqlSchemalessSnapshotSplit> snapshotSplits = splitsState.getRemainingSplits().stream()
+                            .filter(it -> tables.contains(it.getTableId())).collect(Collectors.toList());
+                    splitsState.getRemainingSplits().removeAll(snapshotSplits);
+                    LOG.info("Remaining splits: {}", Arrays.deepToString(splitsState.getRemainingSplits().toArray()));
+                }
+            } catch (Exception e) {
+                throw new FlinkRuntimeException("Failed to discover captured tables for enumerator", e);
+            }
+
             splitAssigner =
                     new MySqlHybridSplitAssigner(
                             sourceConfig,