You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ic...@apache.org on 2023/05/04 08:56:13 UTC
[incubator-seatunnel] branch dev updated: [feature][catalog] Support for multiplexing connections (#4550)
This is an automated email from the ASF dual-hosted git repository.
ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 41277d7f7 [feature][catalog] Support for multiplexing connections (#4550)
41277d7f7 is described below
commit 41277d7f787ed7fb0da2b78406516ffd66de1212
Author: Zongwen Li <zo...@apache.org>
AuthorDate: Thu May 4 16:56:06 2023 +0800
[feature][catalog] Support for multiplexing connections (#4550)
---
.../seatunnel/api/table/catalog/Catalog.java | 42 +++++++++-
.../api/table/catalog/CatalogTableUtil.java | 54 +++----------
.../cdc/mysql/source/MySqlIncrementalSource.java | 12 +--
.../jdbc/catalog/AbstractJdbcCatalog.java | 20 +++--
.../seatunnel/jdbc/catalog/mysql/MySqlCatalog.java | 91 +++++++++++++++++-----
5 files changed, 142 insertions(+), 77 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
index 6dd98336d..c84d6684a 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.api.table.catalog;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
@@ -24,14 +25,19 @@ import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistExcepti
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.regex.Pattern;
/**
* Interface for reading and writing table metadata from SeaTunnel. Each connector need to contain
* the implementation of Catalog.
*/
-public interface Catalog {
+public interface Catalog extends AutoCloseable {
default Optional<Factory> getFactory() {
return Optional.empty();
@@ -115,6 +121,40 @@ public interface Catalog {
*/
CatalogTable getTable(TablePath tablePath) throws CatalogException, TableNotExistException;
+ default List<CatalogTable> getTables(ReadonlyConfig config) throws CatalogException {
+ // Get the list of specified tables
+ List<String> tableNames = config.get(CatalogOptions.TABLE_NAMES);
+ List<CatalogTable> catalogTables = new ArrayList<>();
+ if (tableNames != null && tableNames.size() >= 1) {
+ for (String tableName : tableNames) {
+ TablePath tablePath = TablePath.of(tableName);
+ if (this.tableExists(tablePath)) {
+ catalogTables.add(this.getTable(tablePath));
+ }
+ }
+ return catalogTables;
+ }
+
+ // Get the list of table pattern
+ String tablePatternStr = config.get(CatalogOptions.TABLE_PATTERN);
+ if (StringUtils.isBlank(tablePatternStr)) {
+ return Collections.emptyList();
+ }
+ Pattern databasePattern = Pattern.compile(config.get(CatalogOptions.DATABASE_PATTERN));
+ Pattern tablePattern = Pattern.compile(config.get(CatalogOptions.TABLE_PATTERN));
+ List<String> allDatabase = this.listDatabases();
+ allDatabase.removeIf(s -> !databasePattern.matcher(s).matches());
+ for (String databaseName : allDatabase) {
+ tableNames = this.listTables(databaseName);
+ for (String tableName : tableNames) {
+ if (tablePattern.matcher(databaseName + "." + tableName).matches()) {
+ catalogTables.add(this.getTable(TablePath.of(databaseName, tableName)));
+ }
+ }
+ }
+ return catalogTables;
+ }
+
/**
* Create a new table in this catalog.
*
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
index 7894f2e13..0ab5e8799 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
@@ -41,8 +41,6 @@ import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.utils.JsonUtils;
-import org.apache.commons.lang3.StringUtils;
-
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -54,7 +52,6 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.regex.Pattern;
@Slf4j
public class CatalogTableUtil implements Serializable {
@@ -120,52 +117,21 @@ public class CatalogTableUtil implements Serializable {
factoryId);
return optionalCatalog
.map(
- catalog -> {
+ c -> {
long startTime = System.currentTimeMillis();
- List<CatalogTable> catalogTables =
- getCatalogTables(catalogConfig, catalog);
- log.info(
- String.format(
- "Get catalog tables, cost time: %d",
- System.currentTimeMillis() - startTime));
- return catalogTables;
+ try (Catalog catalog = c) {
+ catalog.open();
+ List<CatalogTable> catalogTables = catalog.getTables(catalogConfig);
+ log.info(
+ String.format(
+ "Get catalog tables, cost time: %d",
+ System.currentTimeMillis() - startTime));
+ return catalogTables;
+ }
})
.orElse(Collections.emptyList());
}
- public static List<CatalogTable> getCatalogTables(
- ReadonlyConfig catalogConfig, Catalog catalog) {
- // Get the list of specified tables
- List<String> tableNames = catalogConfig.get(CatalogOptions.TABLE_NAMES);
- List<CatalogTable> catalogTables = new ArrayList<>();
- if (tableNames != null && tableNames.size() >= 1) {
- for (String tableName : tableNames) {
- catalogTables.add(catalog.getTable(TablePath.of(tableName)));
- }
- return catalogTables;
- }
-
- // Get the list of table pattern
- String tablePatternStr = catalogConfig.get(CatalogOptions.TABLE_PATTERN);
- if (StringUtils.isBlank(tablePatternStr)) {
- return Collections.emptyList();
- }
- Pattern databasePattern =
- Pattern.compile(catalogConfig.get(CatalogOptions.DATABASE_PATTERN));
- Pattern tablePattern = Pattern.compile(catalogConfig.get(CatalogOptions.TABLE_PATTERN));
- List<String> allDatabase = catalog.listDatabases();
- allDatabase.removeIf(s -> !databasePattern.matcher(s).matches());
- for (String databaseName : allDatabase) {
- tableNames = catalog.listTables(databaseName);
- for (String tableName : tableNames) {
- if (tablePattern.matcher(databaseName + "." + tableName).matches()) {
- catalogTables.add(catalog.getTable(TablePath.of(databaseName, tableName)));
- }
- }
- }
- return catalogTables;
- }
-
public static CatalogTableUtil buildWithConfig(Config config) {
CheckResult checkResult = CheckConfigUtil.checkAllExists(config, "schema");
if (!checkResult.isSuccess()) {
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
index 6f886bf62..43d8e505d 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
@@ -106,11 +106,13 @@ public class MySqlIncrementalSource<T> extends IncrementalSource<T, JdbcSourceCo
SeaTunnelDataType<SeaTunnelRow> physicalRowType;
if (dataType == null) {
// TODO: support metadata keys
- Catalog mySqlCatalog = new MySqlCatalogFactory().createCatalog("mysql", config);
- CatalogTable table =
- mySqlCatalog.getTable(
- TablePath.of(config.get(CatalogOptions.TABLE_NAMES).get(0)));
- physicalRowType = table.getTableSchema().toPhysicalRowDataType();
+ try (Catalog catalog = new MySqlCatalogFactory().createCatalog("mysql", config)) {
+ catalog.open();
+ CatalogTable table =
+ catalog.getTable(
+ TablePath.of(config.get(CatalogOptions.TABLE_NAMES).get(0)));
+ physicalRowType = table.getTableSchema().toPhysicalRowDataType();
+ }
} else {
physicalRowType = dataType;
}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index bd516e325..28da81432 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -64,6 +64,8 @@ public abstract class AbstractJdbcCatalog implements Catalog {
protected final String suffix;
protected final String defaultUrl;
+ protected Connection defaultConnection;
+
public AbstractJdbcCatalog(
String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo) {
@@ -105,8 +107,8 @@ public abstract class AbstractJdbcCatalog implements Catalog {
@Override
public void open() throws CatalogException {
- try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
- // test connection, fail early if we cannot connect to database
+ try {
+ defaultConnection = DriverManager.getConnection(defaultUrl, username, pwd);
} catch (SQLException e) {
throw new CatalogException(
String.format("Failed connecting to %s via JDBC.", defaultUrl), e);
@@ -117,6 +119,15 @@ public abstract class AbstractJdbcCatalog implements Catalog {
@Override
public void close() throws CatalogException {
+ if (defaultConnection == null) {
+ return;
+ }
+ try {
+ defaultConnection.close();
+ } catch (SQLException e) {
+ throw new CatalogException(
+ String.format("Failed to close %s via JDBC.", defaultUrl), e);
+ }
LOG.info("Catalog {} closing", catalogName);
}
@@ -198,11 +209,6 @@ public abstract class AbstractJdbcCatalog implements Catalog {
return new ArrayList<>(constraintKeyMap.values());
}
- protected Optional<String> getColumnDefaultValue(
- DatabaseMetaData metaData, String table, String column) throws SQLException {
- return getColumnDefaultValue(metaData, null, null, table, column);
- }
-
protected Optional<String> getColumnDefaultValue(
DatabaseMetaData metaData, String database, String schema, String table, String column)
throws SQLException {
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
index d8534d5df..463c7a8bf 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
@@ -34,6 +34,7 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalo
import com.mysql.cj.MysqlType;
import com.mysql.cj.jdbc.result.ResultSetImpl;
+import com.mysql.cj.util.StringUtils;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
@@ -50,6 +51,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
public class MySqlCatalog extends AbstractJdbcCatalog {
@@ -62,15 +64,43 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
SYS_DATABASES.add("sys");
}
+ protected final Map<String, Connection> connectionMap;
+
public MySqlCatalog(
String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo) {
super(catalogName, username, pwd, urlInfo);
+ this.connectionMap = new ConcurrentHashMap<>();
+ }
+
+ public Connection getConnection(String url) {
+ if (connectionMap.containsKey(url)) {
+ return connectionMap.get(url);
+ }
+ try {
+ Connection connection = DriverManager.getConnection(url, username, pwd);
+ connectionMap.put(url, connection);
+ return connection;
+ } catch (SQLException e) {
+ throw new CatalogException(String.format("Failed connecting to %s via JDBC.", url), e);
+ }
+ }
+
+ @Override
+ public void close() throws CatalogException {
+ for (Map.Entry<String, Connection> entry : connectionMap.entrySet()) {
+ try {
+ entry.getValue().close();
+ } catch (SQLException e) {
+ throw new CatalogException(
+ String.format("Failed to close %s via JDBC.", entry.getKey()), e);
+ }
+ }
+ super.close();
}
@Override
public List<String> listDatabases() throws CatalogException {
- try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
- PreparedStatement ps = conn.prepareStatement("SHOW DATABASES;")) {
+ try (PreparedStatement ps = defaultConnection.prepareStatement("SHOW DATABASES;")) {
List<String> databases = new ArrayList<>();
ResultSet rs = ps.executeQuery();
@@ -97,8 +127,7 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
}
String dbUrl = getUrlFromDatabaseName(databaseName);
- try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
- PreparedStatement ps = conn.prepareStatement("SHOW TABLES;")) {
+ try (PreparedStatement ps = getConnection(dbUrl).prepareStatement("SHOW TABLES;")) {
ResultSet rs = ps.executeQuery();
@@ -123,13 +152,15 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
}
String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
- try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
+ Connection conn = getConnection(dbUrl);
+ try {
DatabaseMetaData metaData = conn.getMetaData();
Optional<PrimaryKey> primaryKey =
getPrimaryKey(metaData, tablePath.getDatabaseName(), tablePath.getTableName());
List<ConstraintKey> constraintKeys =
getConstraintKeys(
metaData, tablePath.getDatabaseName(), tablePath.getTableName());
+ Map<String, Object> columnsDefaultValue = getColumnsDefaultValue(tablePath, conn);
try (PreparedStatement ps =
conn.prepareStatement(
@@ -138,6 +169,7 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
tablePath.getFullNameWithQuoted()))) {
ResultSetMetaData tableMetaData = ps.getMetaData();
TableSchema.Builder builder = TableSchema.builder();
+
// add column
for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
String columnName = tableMetaData.getColumnName(i);
@@ -146,9 +178,7 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
String comment = tableMetaData.getColumnLabel(i);
boolean isNullable =
tableMetaData.isNullable(i) == ResultSetMetaData.columnNullable;
- Object defaultValue =
- getColumnDefaultValue(metaData, tablePath.getTableName(), columnName)
- .orElse(null);
+ Object defaultValue = columnsDefaultValue.get(columnName);
PhysicalColumn physicalColumn =
PhysicalColumn.of(
@@ -181,6 +211,29 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
}
}
+ public static Map<String, Object> getColumnsDefaultValue(TablePath tablePath, Connection conn) {
+ StringBuilder queryBuf = new StringBuilder("SHOW FULL COLUMNS FROM ");
+ queryBuf.append(StringUtils.quoteIdentifier(tablePath.getTableName(), "`", false));
+ queryBuf.append(" FROM ");
+ queryBuf.append(StringUtils.quoteIdentifier(tablePath.getDatabaseName(), "`", false));
+ try (PreparedStatement ps2 = conn.prepareStatement(queryBuf.toString())) {
+ ResultSet rs = ps2.executeQuery();
+ Map<String, Object> result = new HashMap<>();
+ while (rs.next()) {
+ String field = rs.getString("Field");
+ Object defaultValue = rs.getObject("Default");
+ result.put(field, defaultValue);
+ }
+ return result;
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format(
+ "Failed getting table(%s) columns default value",
+ tablePath.getFullName()),
+ e);
+ }
+ }
+
// todo: If the origin source is mysql, we can directly use create table like to create the
// target table?
@Override
@@ -188,8 +241,7 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
throws CatalogException {
String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
String createTableSql = MysqlCreateTableSqlBuilder.builder(tablePath, table).build();
- try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
- PreparedStatement ps = conn.prepareStatement(createTableSql)) {
+ try (PreparedStatement ps = getConnection(dbUrl).prepareStatement(createTableSql)) {
return ps.execute();
} catch (Exception e) {
throw new CatalogException(
@@ -200,9 +252,9 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
@Override
protected boolean dropTableInternal(TablePath tablePath) throws CatalogException {
String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
- try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
- PreparedStatement ps =
- conn.prepareStatement(
+ try (PreparedStatement ps =
+ getConnection(dbUrl)
+ .prepareStatement(
String.format(
"DROP TABLE %s IF EXIST;", tablePath.getFullName()))) {
// Will there exist concurrent drop for one table?
@@ -215,10 +267,9 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
@Override
protected boolean createDatabaseInternal(String databaseName) throws CatalogException {
- try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
- PreparedStatement ps =
- conn.prepareStatement(
- String.format("CREATE DATABASE `%s`;", databaseName))) {
+ try (PreparedStatement ps =
+ defaultConnection.prepareStatement(
+ String.format("CREATE DATABASE `%s`;", databaseName))) {
return ps.execute();
} catch (Exception e) {
throw new CatalogException(
@@ -231,9 +282,9 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
@Override
protected boolean dropDatabaseInternal(String databaseName) throws CatalogException {
- try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
- PreparedStatement ps =
- conn.prepareStatement(String.format("DROP DATABASE `%s`;", databaseName))) {
+ try (PreparedStatement ps =
+ defaultConnection.prepareStatement(
+ String.format("DROP DATABASE `%s`;", databaseName))) {
return ps.execute();
} catch (Exception e) {
throw new CatalogException(