You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2023/04/11 07:36:37 UTC
[inlong] branch master updated: [INLONG-7790][Sort] Capture newly added tables when setting "scan.newly-added-table.enabled=true" and "scan.startup.mode=latest-offset" (#7794)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new adbece9c2 [INLONG-7790][Sort] Capture newly added tables when setting "scan.newly-added-table.enabled=true" and "scan.startup.mode=latest-offset" (#7794)
adbece9c2 is described below
commit adbece9c227c80372a60d3ab4ea1d765d2f32510
Author: emhui <11...@users.noreply.github.com>
AuthorDate: Tue Apr 11 15:36:32 2023 +0800
[INLONG-7790][Sort] Capture newly added tables when setting "scan.newly-added-table.enabled=true" and "scan.startup.mode=latest-offset" (#7794)
---
.../cdc/mysql/source/reader/MySqlSourceReader.java | 18 +++++++++---
.../mysql/source/utils/TableDiscoveryUtils.java | 33 ++++++++++++++++++----
2 files changed, 41 insertions(+), 10 deletions(-)
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
index 0a7ea82c6..10ae8b4f5 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
@@ -235,12 +235,22 @@ public class MySqlSourceReader<T>
private MySqlBinlogSplit discoverTableSchemasForBinlogSplit(MySqlBinlogSplit split) {
final String splitId = split.splitId();
- if (split.getTableSchemas().isEmpty()) {
+ if (split.getTableSchemas().isEmpty() || sourceConfig.isScanNewlyAddedTableEnabled()) {
try (MySqlConnection jdbc =
DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration())) {
- Map<TableId, TableChanges.TableChange> tableSchemas =
- TableDiscoveryUtils.discoverCapturedTableSchemas(sourceConfig, jdbc);
- LOG.info("The table schema discovery for binlog split {} success", splitId);
+ Map<TableId, TableChanges.TableChange> tableSchemas;
+ if (split.getTableSchemas().isEmpty()) {
+ tableSchemas =
+ TableDiscoveryUtils.discoverSchemaForCapturedTableSchemas(sourceConfig, jdbc);
+ LOG.info("The table schema discovery for binlog split {} success", splitId);
+ } else {
+ List<TableId> existedTables = new ArrayList<>(split.getTableSchemas().keySet());
+ tableSchemas =
+ TableDiscoveryUtils.discoverSchemaForNewAddedTables(existedTables, sourceConfig, jdbc);
+ LOG.info(
+ "The table schema discovery for new added tables of binlog split {} success",
+ split.splitId());
+ }
return MySqlBinlogSplit.fillTableSchemas(split, tableSchemas);
} catch (SQLException e) {
LOG.error("Failed to obtains table schemas due to {}", e.getMessage());
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/TableDiscoveryUtils.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/TableDiscoveryUtils.java
index abf925e89..005f36332 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/TableDiscoveryUtils.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/TableDiscoveryUtils.java
@@ -22,6 +22,8 @@ import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.RelationalTableFilters;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
+import io.debezium.relational.history.TableChanges.TableChange;
+import java.util.stream.Collectors;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.inlong.sort.cdc.mysql.schema.MySqlSchema;
import org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceConfig;
@@ -102,7 +104,7 @@ public class TableDiscoveryUtils {
/**
* Discover schemas of table.
*/
- public static Map<TableId, TableChanges.TableChange> discoverCapturedTableSchemas(
+ public static Map<TableId, TableChanges.TableChange> discoverSchemaForCapturedTableSchemas(
MySqlSourceConfig sourceConfig, MySqlConnection jdbc) {
final List<TableId> capturedTableIds;
try {
@@ -110,19 +112,38 @@ public class TableDiscoveryUtils {
} catch (SQLException e) {
throw new FlinkRuntimeException("Failed to discover captured tables", e);
}
+ return discoverSchemaForCapturedTables(capturedTableIds, sourceConfig, jdbc);
+ }
+
+ public static Map<TableId, TableChange> discoverSchemaForNewAddedTables(
+ List<TableId> existedTables, MySqlSourceConfig sourceConfig, MySqlConnection jdbc) {
+ final List<TableId> capturedTableIds;
+ try {
+ capturedTableIds =
+ listTables(jdbc, sourceConfig.getTableFilters()).stream()
+ .filter(tableId -> !existedTables.contains(tableId))
+ .collect(Collectors.toList());
+ } catch (SQLException e) {
+ throw new FlinkRuntimeException("Failed to discover captured tables", e);
+ }
+ return capturedTableIds.isEmpty()
+ ? new HashMap<>()
+ : discoverSchemaForCapturedTables(capturedTableIds, sourceConfig, jdbc);
+ }
+
+ public static Map<TableId, TableChange> discoverSchemaForCapturedTables(
+ List<TableId> capturedTableIds, MySqlSourceConfig sourceConfig, MySqlConnection jdbc) {
if (capturedTableIds.isEmpty()) {
throw new IllegalArgumentException(
String.format(
- "Can't find any matched tables, please check your "
- + "configured database-name: %s and table-name: %s",
+ "Can't find any matched tables, please check your configured database-name: %s and table-name: %s",
sourceConfig.getDatabaseList(), sourceConfig.getTableList()));
}
-
// fetch table schemas
MySqlSchema mySqlSchema = new MySqlSchema(sourceConfig, jdbc.isTableIdCaseSensitive());
- Map<TableId, TableChanges.TableChange> tableSchemas = new HashMap<>();
+ Map<TableId, TableChange> tableSchemas = new HashMap<>();
for (TableId tableId : capturedTableIds) {
- TableChanges.TableChange tableSchema = mySqlSchema.getTableSchema(jdbc, tableId);
+ TableChange tableSchema = mySqlSchema.getTableSchema(jdbc, tableId);
tableSchemas.put(tableId, tableSchema);
}
return tableSchemas;