You are viewing a plain-text version of this content. Its canonical link was a hyperlink on the original archive page and is not preserved in this text.
Posted to commits@seatunnel.apache.org by ic...@apache.org on 2023/05/04 08:56:13 UTC

[incubator-seatunnel] branch dev updated: [feature][catalog] Support for multiplexing connections (#4550)

This is an automated email from the ASF dual-hosted git repository.

ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 41277d7f7 [feature][catalog] Support for multiplexing connections (#4550)
41277d7f7 is described below

commit 41277d7f787ed7fb0da2b78406516ffd66de1212
Author: Zongwen Li <zo...@apache.org>
AuthorDate: Thu May 4 16:56:06 2023 +0800

    [feature][catalog] Support for multiplexing connections (#4550)
---
 .../seatunnel/api/table/catalog/Catalog.java       | 42 +++++++++-
 .../api/table/catalog/CatalogTableUtil.java        | 54 +++----------
 .../cdc/mysql/source/MySqlIncrementalSource.java   | 12 +--
 .../jdbc/catalog/AbstractJdbcCatalog.java          | 20 +++--
 .../seatunnel/jdbc/catalog/mysql/MySqlCatalog.java | 91 +++++++++++++++++-----
 5 files changed, 142 insertions(+), 77 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
index 6dd98336d..c84d6684a 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.api.table.catalog;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
 import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
 import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
@@ -24,14 +25,19 @@ import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistExcepti
 import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
 import org.apache.seatunnel.api.table.factory.Factory;
 
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.regex.Pattern;
 
 /**
  * Interface for reading and writing table metadata from SeaTunnel. Each connector need to contain
  * the implementation of Catalog.
  */
-public interface Catalog {
+public interface Catalog extends AutoCloseable {
 
     default Optional<Factory> getFactory() {
         return Optional.empty();
@@ -115,6 +121,40 @@ public interface Catalog {
      */
     CatalogTable getTable(TablePath tablePath) throws CatalogException, TableNotExistException;
 
+    default List<CatalogTable> getTables(ReadonlyConfig config) throws CatalogException {
+        // Get the list of specified tables
+        List<String> tableNames = config.get(CatalogOptions.TABLE_NAMES);
+        List<CatalogTable> catalogTables = new ArrayList<>();
+        if (tableNames != null && tableNames.size() >= 1) {
+            for (String tableName : tableNames) {
+                TablePath tablePath = TablePath.of(tableName);
+                if (this.tableExists(tablePath)) {
+                    catalogTables.add(this.getTable(tablePath));
+                }
+            }
+            return catalogTables;
+        }
+
+        // Get the list of table pattern
+        String tablePatternStr = config.get(CatalogOptions.TABLE_PATTERN);
+        if (StringUtils.isBlank(tablePatternStr)) {
+            return Collections.emptyList();
+        }
+        Pattern databasePattern = Pattern.compile(config.get(CatalogOptions.DATABASE_PATTERN));
+        Pattern tablePattern = Pattern.compile(config.get(CatalogOptions.TABLE_PATTERN));
+        List<String> allDatabase = this.listDatabases();
+        allDatabase.removeIf(s -> !databasePattern.matcher(s).matches());
+        for (String databaseName : allDatabase) {
+            tableNames = this.listTables(databaseName);
+            for (String tableName : tableNames) {
+                if (tablePattern.matcher(databaseName + "." + tableName).matches()) {
+                    catalogTables.add(this.getTable(TablePath.of(databaseName, tableName)));
+                }
+            }
+        }
+        return catalogTables;
+    }
+
     /**
      * Create a new table in this catalog.
      *
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
index 7894f2e13..0ab5e8799 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
@@ -41,8 +41,6 @@ import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.utils.JsonUtils;
 
-import org.apache.commons.lang3.StringUtils;
-
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
@@ -54,7 +52,6 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.regex.Pattern;
 
 @Slf4j
 public class CatalogTableUtil implements Serializable {
@@ -120,52 +117,21 @@ public class CatalogTableUtil implements Serializable {
                         factoryId);
         return optionalCatalog
                 .map(
-                        catalog -> {
+                        c -> {
                             long startTime = System.currentTimeMillis();
-                            List<CatalogTable> catalogTables =
-                                    getCatalogTables(catalogConfig, catalog);
-                            log.info(
-                                    String.format(
-                                            "Get catalog tables, cost time: %d",
-                                            System.currentTimeMillis() - startTime));
-                            return catalogTables;
+                            try (Catalog catalog = c) {
+                                catalog.open();
+                                List<CatalogTable> catalogTables = catalog.getTables(catalogConfig);
+                                log.info(
+                                        String.format(
+                                                "Get catalog tables, cost time: %d",
+                                                System.currentTimeMillis() - startTime));
+                                return catalogTables;
+                            }
                         })
                 .orElse(Collections.emptyList());
     }
 
-    public static List<CatalogTable> getCatalogTables(
-            ReadonlyConfig catalogConfig, Catalog catalog) {
-        // Get the list of specified tables
-        List<String> tableNames = catalogConfig.get(CatalogOptions.TABLE_NAMES);
-        List<CatalogTable> catalogTables = new ArrayList<>();
-        if (tableNames != null && tableNames.size() >= 1) {
-            for (String tableName : tableNames) {
-                catalogTables.add(catalog.getTable(TablePath.of(tableName)));
-            }
-            return catalogTables;
-        }
-
-        // Get the list of table pattern
-        String tablePatternStr = catalogConfig.get(CatalogOptions.TABLE_PATTERN);
-        if (StringUtils.isBlank(tablePatternStr)) {
-            return Collections.emptyList();
-        }
-        Pattern databasePattern =
-                Pattern.compile(catalogConfig.get(CatalogOptions.DATABASE_PATTERN));
-        Pattern tablePattern = Pattern.compile(catalogConfig.get(CatalogOptions.TABLE_PATTERN));
-        List<String> allDatabase = catalog.listDatabases();
-        allDatabase.removeIf(s -> !databasePattern.matcher(s).matches());
-        for (String databaseName : allDatabase) {
-            tableNames = catalog.listTables(databaseName);
-            for (String tableName : tableNames) {
-                if (tablePattern.matcher(databaseName + "." + tableName).matches()) {
-                    catalogTables.add(catalog.getTable(TablePath.of(databaseName, tableName)));
-                }
-            }
-        }
-        return catalogTables;
-    }
-
     public static CatalogTableUtil buildWithConfig(Config config) {
         CheckResult checkResult = CheckConfigUtil.checkAllExists(config, "schema");
         if (!checkResult.isSuccess()) {
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
index 6f886bf62..43d8e505d 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
@@ -106,11 +106,13 @@ public class MySqlIncrementalSource<T> extends IncrementalSource<T, JdbcSourceCo
         SeaTunnelDataType<SeaTunnelRow> physicalRowType;
         if (dataType == null) {
             // TODO: support metadata keys
-            Catalog mySqlCatalog = new MySqlCatalogFactory().createCatalog("mysql", config);
-            CatalogTable table =
-                    mySqlCatalog.getTable(
-                            TablePath.of(config.get(CatalogOptions.TABLE_NAMES).get(0)));
-            physicalRowType = table.getTableSchema().toPhysicalRowDataType();
+            try (Catalog catalog = new MySqlCatalogFactory().createCatalog("mysql", config)) {
+                catalog.open();
+                CatalogTable table =
+                        catalog.getTable(
+                                TablePath.of(config.get(CatalogOptions.TABLE_NAMES).get(0)));
+                physicalRowType = table.getTableSchema().toPhysicalRowDataType();
+            }
         } else {
             physicalRowType = dataType;
         }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index bd516e325..28da81432 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -64,6 +64,8 @@ public abstract class AbstractJdbcCatalog implements Catalog {
     protected final String suffix;
     protected final String defaultUrl;
 
+    protected Connection defaultConnection;
+
     public AbstractJdbcCatalog(
             String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo) {
 
@@ -105,8 +107,8 @@ public abstract class AbstractJdbcCatalog implements Catalog {
 
     @Override
     public void open() throws CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
-            // test connection, fail early if we cannot connect to database
+        try {
+            defaultConnection = DriverManager.getConnection(defaultUrl, username, pwd);
         } catch (SQLException e) {
             throw new CatalogException(
                     String.format("Failed connecting to %s via JDBC.", defaultUrl), e);
@@ -117,6 +119,15 @@ public abstract class AbstractJdbcCatalog implements Catalog {
 
     @Override
     public void close() throws CatalogException {
+        if (defaultConnection == null) {
+            return;
+        }
+        try {
+            defaultConnection.close();
+        } catch (SQLException e) {
+            throw new CatalogException(
+                    String.format("Failed to close %s via JDBC.", defaultUrl), e);
+        }
         LOG.info("Catalog {} closing", catalogName);
     }
 
@@ -198,11 +209,6 @@ public abstract class AbstractJdbcCatalog implements Catalog {
         return new ArrayList<>(constraintKeyMap.values());
     }
 
-    protected Optional<String> getColumnDefaultValue(
-            DatabaseMetaData metaData, String table, String column) throws SQLException {
-        return getColumnDefaultValue(metaData, null, null, table, column);
-    }
-
     protected Optional<String> getColumnDefaultValue(
             DatabaseMetaData metaData, String database, String schema, String table, String column)
             throws SQLException {
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
index d8534d5df..463c7a8bf 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
@@ -34,6 +34,7 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalo
 
 import com.mysql.cj.MysqlType;
 import com.mysql.cj.jdbc.result.ResultSetImpl;
+import com.mysql.cj.util.StringUtils;
 
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
@@ -50,6 +51,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class MySqlCatalog extends AbstractJdbcCatalog {
 
@@ -62,15 +64,43 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
         SYS_DATABASES.add("sys");
     }
 
+    protected final Map<String, Connection> connectionMap;
+
     public MySqlCatalog(
             String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo) {
         super(catalogName, username, pwd, urlInfo);
+        this.connectionMap = new ConcurrentHashMap<>();
+    }
+
+    public Connection getConnection(String url) {
+        if (connectionMap.containsKey(url)) {
+            return connectionMap.get(url);
+        }
+        try {
+            Connection connection = DriverManager.getConnection(url, username, pwd);
+            connectionMap.put(url, connection);
+            return connection;
+        } catch (SQLException e) {
+            throw new CatalogException(String.format("Failed connecting to %s via JDBC.", url), e);
+        }
+    }
+
+    @Override
+    public void close() throws CatalogException {
+        for (Map.Entry<String, Connection> entry : connectionMap.entrySet()) {
+            try {
+                entry.getValue().close();
+            } catch (SQLException e) {
+                throw new CatalogException(
+                        String.format("Failed to close %s via JDBC.", entry.getKey()), e);
+            }
+        }
+        super.close();
     }
 
     @Override
     public List<String> listDatabases() throws CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
-                PreparedStatement ps = conn.prepareStatement("SHOW DATABASES;")) {
+        try (PreparedStatement ps = defaultConnection.prepareStatement("SHOW DATABASES;")) {
 
             List<String> databases = new ArrayList<>();
             ResultSet rs = ps.executeQuery();
@@ -97,8 +127,7 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
         }
 
         String dbUrl = getUrlFromDatabaseName(databaseName);
-        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
-                PreparedStatement ps = conn.prepareStatement("SHOW TABLES;")) {
+        try (PreparedStatement ps = getConnection(dbUrl).prepareStatement("SHOW TABLES;")) {
 
             ResultSet rs = ps.executeQuery();
 
@@ -123,13 +152,15 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
         }
 
         String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
-        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
+        Connection conn = getConnection(dbUrl);
+        try {
             DatabaseMetaData metaData = conn.getMetaData();
             Optional<PrimaryKey> primaryKey =
                     getPrimaryKey(metaData, tablePath.getDatabaseName(), tablePath.getTableName());
             List<ConstraintKey> constraintKeys =
                     getConstraintKeys(
                             metaData, tablePath.getDatabaseName(), tablePath.getTableName());
+            Map<String, Object> columnsDefaultValue = getColumnsDefaultValue(tablePath, conn);
 
             try (PreparedStatement ps =
                     conn.prepareStatement(
@@ -138,6 +169,7 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
                                     tablePath.getFullNameWithQuoted()))) {
                 ResultSetMetaData tableMetaData = ps.getMetaData();
                 TableSchema.Builder builder = TableSchema.builder();
+
                 // add column
                 for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
                     String columnName = tableMetaData.getColumnName(i);
@@ -146,9 +178,7 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
                     String comment = tableMetaData.getColumnLabel(i);
                     boolean isNullable =
                             tableMetaData.isNullable(i) == ResultSetMetaData.columnNullable;
-                    Object defaultValue =
-                            getColumnDefaultValue(metaData, tablePath.getTableName(), columnName)
-                                    .orElse(null);
+                    Object defaultValue = columnsDefaultValue.get(columnName);
 
                     PhysicalColumn physicalColumn =
                             PhysicalColumn.of(
@@ -181,6 +211,29 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
         }
     }
 
+    public static Map<String, Object> getColumnsDefaultValue(TablePath tablePath, Connection conn) {
+        StringBuilder queryBuf = new StringBuilder("SHOW FULL COLUMNS FROM ");
+        queryBuf.append(StringUtils.quoteIdentifier(tablePath.getTableName(), "`", false));
+        queryBuf.append(" FROM ");
+        queryBuf.append(StringUtils.quoteIdentifier(tablePath.getDatabaseName(), "`", false));
+        try (PreparedStatement ps2 = conn.prepareStatement(queryBuf.toString())) {
+            ResultSet rs = ps2.executeQuery();
+            Map<String, Object> result = new HashMap<>();
+            while (rs.next()) {
+                String field = rs.getString("Field");
+                Object defaultValue = rs.getObject("Default");
+                result.put(field, defaultValue);
+            }
+            return result;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "Failed getting table(%s) columns default value",
+                            tablePath.getFullName()),
+                    e);
+        }
+    }
+
     // todo: If the origin source is mysql, we can directly use create table like to create the
     // target table?
     @Override
@@ -188,8 +241,7 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
             throws CatalogException {
         String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
         String createTableSql = MysqlCreateTableSqlBuilder.builder(tablePath, table).build();
-        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
-                PreparedStatement ps = conn.prepareStatement(createTableSql)) {
+        try (PreparedStatement ps = getConnection(dbUrl).prepareStatement(createTableSql)) {
             return ps.execute();
         } catch (Exception e) {
             throw new CatalogException(
@@ -200,9 +252,9 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
     @Override
     protected boolean dropTableInternal(TablePath tablePath) throws CatalogException {
         String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
-        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
-                PreparedStatement ps =
-                        conn.prepareStatement(
+        try (PreparedStatement ps =
+                getConnection(dbUrl)
+                        .prepareStatement(
                                 String.format(
                                         "DROP TABLE %s IF EXIST;", tablePath.getFullName()))) {
             // Will there exist concurrent drop for one table?
@@ -215,10 +267,9 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
 
     @Override
     protected boolean createDatabaseInternal(String databaseName) throws CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
-                PreparedStatement ps =
-                        conn.prepareStatement(
-                                String.format("CREATE DATABASE `%s`;", databaseName))) {
+        try (PreparedStatement ps =
+                defaultConnection.prepareStatement(
+                        String.format("CREATE DATABASE `%s`;", databaseName))) {
             return ps.execute();
         } catch (Exception e) {
             throw new CatalogException(
@@ -231,9 +282,9 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
 
     @Override
     protected boolean dropDatabaseInternal(String databaseName) throws CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
-                PreparedStatement ps =
-                        conn.prepareStatement(String.format("DROP DATABASE `%s`;", databaseName))) {
+        try (PreparedStatement ps =
+                defaultConnection.prepareStatement(
+                        String.format("DROP DATABASE `%s`;", databaseName))) {
             return ps.execute();
         } catch (Exception e) {
             throw new CatalogException(