You are viewing a plain-text version of this content. The hyperlink to its canonical version ("here") was lost in this plain-text rendering.
Posted to commits@inlong.apache.org by zi...@apache.org on 2023/04/11 07:36:37 UTC

[inlong] branch master updated: [INLONG-7790][Sort] Capture newly added tables when setting "scan.newly-added-table.enabled=true" and "scan.startup.mode=latest-offset" (#7794)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new adbece9c2 [INLONG-7790][Sort] Capture newly added tables when setting "scan.newly-added-table.enabled=true" and "scan.startup.mode=latest-offset" (#7794)
adbece9c2 is described below

commit adbece9c227c80372a60d3ab4ea1d765d2f32510
Author: emhui <11...@users.noreply.github.com>
AuthorDate: Tue Apr 11 15:36:32 2023 +0800

    [INLONG-7790][Sort] Capture newly added tables when setting "scan.newly-added-table.enabled=true" and "scan.startup.mode=latest-offset" (#7794)
---
 .../cdc/mysql/source/reader/MySqlSourceReader.java | 18 +++++++++---
 .../mysql/source/utils/TableDiscoveryUtils.java    | 33 ++++++++++++++++++----
 2 files changed, 41 insertions(+), 10 deletions(-)

diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
index 0a7ea82c6..10ae8b4f5 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
@@ -235,12 +235,22 @@ public class MySqlSourceReader<T>
 
     private MySqlBinlogSplit discoverTableSchemasForBinlogSplit(MySqlBinlogSplit split) {
         final String splitId = split.splitId();
-        if (split.getTableSchemas().isEmpty()) {
+        if (split.getTableSchemas().isEmpty() || sourceConfig.isScanNewlyAddedTableEnabled()) {
             try (MySqlConnection jdbc =
                     DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration())) {
-                Map<TableId, TableChanges.TableChange> tableSchemas =
-                        TableDiscoveryUtils.discoverCapturedTableSchemas(sourceConfig, jdbc);
-                LOG.info("The table schema discovery for binlog split {} success", splitId);
+                Map<TableId, TableChanges.TableChange> tableSchemas;
+                if (split.getTableSchemas().isEmpty()) {
+                    tableSchemas =
+                            TableDiscoveryUtils.discoverSchemaForCapturedTableSchemas(sourceConfig, jdbc);
+                    LOG.info("The table schema discovery for binlog split {} success", splitId);
+                } else {
+                    List<TableId> existedTables = new ArrayList<>(split.getTableSchemas().keySet());
+                    tableSchemas =
+                            TableDiscoveryUtils.discoverSchemaForNewAddedTables(existedTables, sourceConfig, jdbc);
+                    LOG.info(
+                            "The table schema discovery for new added tables of binlog split {} success",
+                            split.splitId());
+                }
                 return MySqlBinlogSplit.fillTableSchemas(split, tableSchemas);
             } catch (SQLException e) {
                 LOG.error("Failed to obtains table schemas due to {}", e.getMessage());
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/TableDiscoveryUtils.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/TableDiscoveryUtils.java
index abf925e89..005f36332 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/TableDiscoveryUtils.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/TableDiscoveryUtils.java
@@ -22,6 +22,8 @@ import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.RelationalTableFilters;
 import io.debezium.relational.TableId;
 import io.debezium.relational.history.TableChanges;
+import io.debezium.relational.history.TableChanges.TableChange;
+import java.util.stream.Collectors;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.inlong.sort.cdc.mysql.schema.MySqlSchema;
 import org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceConfig;
@@ -102,7 +104,7 @@ public class TableDiscoveryUtils {
     /**
      * Discover schemas of table.
      */
-    public static Map<TableId, TableChanges.TableChange> discoverCapturedTableSchemas(
+    public static Map<TableId, TableChanges.TableChange> discoverSchemaForCapturedTableSchemas(
             MySqlSourceConfig sourceConfig, MySqlConnection jdbc) {
         final List<TableId> capturedTableIds;
         try {
@@ -110,19 +112,38 @@ public class TableDiscoveryUtils {
         } catch (SQLException e) {
             throw new FlinkRuntimeException("Failed to discover captured tables", e);
         }
+        return discoverSchemaForCapturedTables(capturedTableIds, sourceConfig, jdbc);
+    }
+
+    public static Map<TableId, TableChange> discoverSchemaForNewAddedTables(
+            List<TableId> existedTables, MySqlSourceConfig sourceConfig, MySqlConnection jdbc) {
+        final List<TableId> capturedTableIds;
+        try {
+            capturedTableIds =
+                    listTables(jdbc, sourceConfig.getTableFilters()).stream()
+                            .filter(tableId -> !existedTables.contains(tableId))
+                            .collect(Collectors.toList());
+        } catch (SQLException e) {
+            throw new FlinkRuntimeException("Failed to discover captured tables", e);
+        }
+        return capturedTableIds.isEmpty()
+                ? new HashMap<>()
+                : discoverSchemaForCapturedTables(capturedTableIds, sourceConfig, jdbc);
+    }
+
+    public static Map<TableId, TableChange> discoverSchemaForCapturedTables(
+            List<TableId> capturedTableIds, MySqlSourceConfig sourceConfig, MySqlConnection jdbc) {
         if (capturedTableIds.isEmpty()) {
             throw new IllegalArgumentException(
                     String.format(
-                            "Can't find any matched tables, please check your "
-                                    + "configured database-name: %s and table-name: %s",
+                            "Can't find any matched tables, please check your configured database-name: %s and table-name: %s",
                             sourceConfig.getDatabaseList(), sourceConfig.getTableList()));
         }
-
         // fetch table schemas
         MySqlSchema mySqlSchema = new MySqlSchema(sourceConfig, jdbc.isTableIdCaseSensitive());
-        Map<TableId, TableChanges.TableChange> tableSchemas = new HashMap<>();
+        Map<TableId, TableChange> tableSchemas = new HashMap<>();
         for (TableId tableId : capturedTableIds) {
-            TableChanges.TableChange tableSchema = mySqlSchema.getTableSchema(jdbc, tableId);
+            TableChange tableSchema = mySqlSchema.getTableSchema(jdbc, tableId);
             tableSchemas.put(tableId, tableSchema);
         }
         return tableSchemas;