You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/04/19 01:52:14 UTC
[incubator-seatunnel] branch dev updated: [Improve][Catalog] refactor catalog (#4540)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b0a701cb8 [Improve][Catalog] refactor catalog (#4540)
b0a701cb8 is described below
commit b0a701cb83a2353da3b1f93a483c19cddd55747c
Author: Guangdong Liu <80...@qq.com>
AuthorDate: Wed Apr 19 09:52:06 2023 +0800
[Improve][Catalog] refactor catalog (#4540)
---
.../jdbc/catalog/AbstractJdbcCatalog.java | 234 ++++++++++++++++++++-
.../seatunnel/jdbc/catalog/mysql/MySqlCatalog.java | 219 +------------------
.../catalog/mysql/MysqlCreateTableSqlBuilder.java | 2 +-
.../jdbc/catalog/sqlserver/SqlServerCatalog.java | 228 +-------------------
.../jdbc/catalog/sqlserver/SqlServerType.java | 2 +-
.../jdbc/internal/dialect/JdbcDialect.java | 38 ++++
.../jdbc/internal/dialect/mysql/MysqlDialect.java | 8 +
.../dialect/sqlserver/SqlServerDialect.java | 30 +++
.../connector-starrocks/pom.xml | 5 +
.../starrocks/catalog/StarRocksCatalog.java | 177 +++-------------
.../starrocks/catalog/StarRocksDialect.java | 27 +++
11 files changed, 365 insertions(+), 605 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index bd516e325..4fb68775f 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -21,14 +21,20 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -40,14 +46,19 @@ import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
@@ -55,7 +66,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
public abstract class AbstractJdbcCatalog implements Catalog {
private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcCatalog.class);
-
protected final String catalogName;
protected final String defaultDatabase;
protected final String username;
@@ -63,6 +73,8 @@ public abstract class AbstractJdbcCatalog implements Catalog {
protected final String baseUrl;
protected final String suffix;
protected final String defaultUrl;
+ protected final JdbcDialect jdbcDialect;
+ protected static final Set<String> SYS_DATABASES = new HashSet<>();
public AbstractJdbcCatalog(
String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo) {
@@ -80,6 +92,7 @@ public abstract class AbstractJdbcCatalog implements Catalog {
this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
this.defaultUrl = urlInfo.getOrigin();
this.suffix = urlInfo.getSuffix();
+ this.jdbcDialect = JdbcDialectLoader.load(this.baseUrl);
}
@Override
@@ -107,6 +120,7 @@ public abstract class AbstractJdbcCatalog implements Catalog {
public void open() throws CatalogException {
try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
// test connection, fail early if we cannot connect to database
+ conn.getCatalog();
} catch (SQLException e) {
throw new CatalogException(
String.format("Failed connecting to %s via JDBC.", defaultUrl), e);
@@ -120,6 +134,56 @@ public abstract class AbstractJdbcCatalog implements Catalog {
LOG.info("Catalog {} closing", catalogName);
}
+    /**
+     * Lists the databases of this catalog, excluding the dialect's system databases.
+     *
+     * <p>Executes the statement returned by {@code jdbcDialect.listDatabases()} over a connection
+     * to {@code defaultUrl} and skips every name contained in {@link #getSysDatabases()}.
+     *
+     * @return the remaining database names, in the order the driver returned them
+     * @throws CatalogException if the connection or the query fails
+     */
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        // The statement and the result set are declared in the try header so that they are
+        // closed deterministically rather than only when the connection is closed.
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
+                PreparedStatement ps = conn.prepareStatement(jdbcDialect.listDatabases());
+                ResultSet rs = ps.executeQuery()) {
+
+            List<String> databases = new ArrayList<>();
+            while (rs.next()) {
+                // Read the column once per row; JDBC recommends this for portability.
+                String databaseName = rs.getString(1);
+                if (!getSysDatabases().contains(databaseName)) {
+                    databases.add(databaseName);
+                }
+            }
+
+            return databases;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed listing database in catalog %s", this.catalogName), e);
+        }
+    }
+
+    /**
+     * Lists the tables of one database.
+     *
+     * <p>Connects to the URL the dialect derives for {@code databaseName}, runs the dialect's
+     * list-tables statement and converts each row with {@code jdbcDialect.getTableName(rs)}.
+     *
+     * @param databaseName database whose tables are listed
+     * @return table names as produced by the dialect, in result-set order
+     * @throws DatabaseNotExistException if {@code databaseExists(databaseName)} is false
+     * @throws CatalogException if the connection or the query fails
+     */
+    @Override
+    public List<String> listTables(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(this.catalogName, databaseName);
+        }
+
+        String dbUrl = jdbcDialect.getUrlFromDatabaseName(baseUrl, databaseName, suffix);
+        // The result set joins the connection and statement in the try header so that all
+        // three are released even when reading a row fails.
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement(jdbcDialect.listTableSql(databaseName));
+                ResultSet rs = ps.executeQuery()) {
+
+            List<String> tables = new ArrayList<>();
+            while (rs.next()) {
+                tables.add(jdbcDialect.getTableName(rs));
+            }
+
+            return tables;
+        } catch (Exception e) {
+            // The message names the operation that actually failed (listing tables, not
+            // databases) and the database it was attempted on.
+            throw new CatalogException(
+                    String.format(
+                            "Failed listing tables of database %s in catalog %s",
+                            databaseName, catalogName),
+                    e);
+        }
+    }
+
protected Optional<PrimaryKey> getPrimaryKey(
DatabaseMetaData metaData, String database, String table) throws SQLException {
return getPrimaryKey(metaData, database, table, table);
@@ -226,7 +290,8 @@ public abstract class AbstractJdbcCatalog implements Catalog {
public boolean tableExists(TablePath tablePath) throws CatalogException {
try {
return databaseExists(tablePath.getDatabaseName())
- && listTables(tablePath.getDatabaseName()).contains(tablePath.getTableName());
+ && listTables(tablePath.getDatabaseName())
+ .contains(jdbcDialect.getTableName(tablePath));
} catch (DatabaseNotExistException e) {
return false;
}
@@ -245,8 +310,86 @@ public abstract class AbstractJdbcCatalog implements Catalog {
}
}
- protected abstract boolean createTableInternal(TablePath tablePath, CatalogTable table)
- throws CatalogException;
+    /**
+     * Builds the {@link CatalogTable} description of an existing table.
+     *
+     * <p>Primary key, constraint keys and per-column default values are read through {@link
+     * DatabaseMetaData}; column names, types, display sizes and nullability come from the
+     * metadata of a prepared {@code SELECT * ... WHERE 1 = 0} statement, which is never executed.
+     *
+     * @param tablePath database / schema / table to describe
+     * @return the table description, including the connector options from {@link
+     *     #buildConnectorOptions(TablePath)}
+     * @throws TableNotExistException if {@link #tableExists(TablePath)} is false
+     * @throws CatalogException if any JDBC call or type conversion fails
+     */
+    @Override
+    public CatalogTable getTable(TablePath tablePath)
+            throws CatalogException, TableNotExistException {
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(catalogName, tablePath);
+        }
+
+        String databaseName = tablePath.getDatabaseName();
+        String schemaName = tablePath.getSchemaName();
+        String tableName = tablePath.getTableName();
+        String dbUrl = jdbcDialect.getUrlFromDatabaseName(baseUrl, databaseName, suffix);
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
+            DatabaseMetaData metaData = conn.getMetaData();
+            Optional<PrimaryKey> primaryKey =
+                    getPrimaryKey(metaData, databaseName, schemaName, tableName);
+            List<ConstraintKey> constraintKeys =
+                    getConstraintKeys(metaData, databaseName, schemaName, tableName);
+
+            // NOTE(review): identifiers are always quoted with a double quote here, whereas the
+            // MySQL implementation this replaces called getFullNameWithQuoted() without an
+            // argument. Confirm that every dialect sharing this method accepts '"' quoting.
+            try (PreparedStatement ps =
+                    conn.prepareStatement(
+                            String.format(
+                                    "SELECT * FROM %s WHERE 1 = 0;",
+                                    tablePath.getFullNameWithQuoted("\"")))) {
+                ResultSetMetaData tableMetaData = ps.getMetaData();
+                TableSchema.Builder builder = TableSchema.builder();
+                // add column
+                for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
+                    String columnName = tableMetaData.getColumnName(i);
+                    SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i);
+                    int columnDisplaySize = tableMetaData.getColumnDisplaySize(i);
+                    // NOTE(review): this is the column label, not a column comment; kept as-is
+                    // because both replaced implementations populated the comment the same way.
+                    String comment = tableMetaData.getColumnLabel(i);
+                    boolean isNullable =
+                            tableMetaData.isNullable(i) == ResultSetMetaData.columnNullable;
+                    Object defaultValue =
+                            getColumnDefaultValue(
+                                            metaData,
+                                            databaseName,
+                                            schemaName,
+                                            tableName,
+                                            columnName)
+                                    .orElse(null);
+
+                    PhysicalColumn physicalColumn =
+                            PhysicalColumn.of(
+                                    columnName,
+                                    type,
+                                    columnDisplaySize,
+                                    isNullable,
+                                    defaultValue,
+                                    comment);
+                    builder.column(physicalColumn);
+                }
+                // add primary key
+                primaryKey.ifPresent(builder::primaryKey);
+                // add constraint key
+                constraintKeys.forEach(builder::constraintKey);
+                TableIdentifier tableIdentifier =
+                        TableIdentifier.of(catalogName, databaseName, schemaName, tableName);
+                return CatalogTable.of(
+                        tableIdentifier,
+                        builder.build(),
+                        buildConnectorOptions(tablePath),
+                        Collections.emptyList(),
+                        "");
+            }
+
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed getting table %s", tablePath.getFullName()), e);
+        }
+    }
@Override
public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
@@ -257,7 +400,20 @@ public abstract class AbstractJdbcCatalog implements Catalog {
}
}
- protected abstract boolean dropTableInternal(TablePath tablePath) throws CatalogException;
+    /**
+     * Issues the dialect's drop-table statement for {@code tablePath}.
+     *
+     * @param tablePath table to drop; its database name selects the connection URL
+     * @return the value of {@link PreparedStatement#execute()}
+     * @throws CatalogException if a {@link SQLException} is raised while connecting or executing
+     */
+    protected boolean dropTableInternal(TablePath tablePath) throws CatalogException {
+        final String url =
+                jdbcDialect.getUrlFromDatabaseName(baseUrl, tablePath.getDatabaseName(), suffix);
+        try (Connection connection = DriverManager.getConnection(url, username, pwd)) {
+            final String dropSql = jdbcDialect.getDropTableSql(tablePath.getFullName());
+            try (PreparedStatement statement = connection.prepareStatement(dropSql)) {
+                // Open question carried over from the original: concurrent drops of the same
+                // table are not guarded against here.
+                return statement.execute();
+            }
+        } catch (SQLException e) {
+            final String message =
+                    String.format("Failed dropping table %s", tablePath.getFullName());
+            throw new CatalogException(message, e);
+        }
+    }
@Override
public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
@@ -273,8 +429,6 @@ public abstract class AbstractJdbcCatalog implements Catalog {
}
}
- protected abstract boolean createDatabaseInternal(String databaseName);
-
@Override
public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
throws DatabaseNotExistException, CatalogException {
@@ -286,5 +440,69 @@ public abstract class AbstractJdbcCatalog implements Catalog {
}
}
- protected abstract boolean dropDatabaseInternal(String databaseName) throws CatalogException;
+ /**
+ * Maps the JDBC type of one result-set column to a SeaTunnel data type.
+ *
+ * <p>This base implementation performs no mapping and always returns {@code null}; getTable
+ * passes the returned value straight to PhysicalColumn.of as the column type. MySqlCatalog and
+ * SqlServerCatalog override this method in the same change.
+ *
+ * @param metadata metadata of the prepared statement used to describe the table
+ * @param colIndex 1-based column index, as iterated by getTable
+ * @return {@code null} in this base implementation
+ * @throws SQLException declared for overriding implementations; never thrown here
+ */
+ protected SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
+ throws SQLException {
+ return null;
+ }
+
+ /**
+ * Returns the database names that listDatabases filters out of its result.
+ *
+ * <p>The returned set is the single static SYS_DATABASES field of this class, not a copy.
+ * NOTE(review): the static initializers of MySqlCatalog and SqlServerCatalog in this change
+ * both add to that one field, so the entries of every loaded subclass appear to accumulate in
+ * the same set - confirm this sharing is intended.
+ */
+ protected Set<String> getSysDatabases() {
+ return SYS_DATABASES;
+ }
+
+    /**
+     * Assembles the JDBC connector options attached to a table returned by this catalog.
+     *
+     * @param tablePath table the options refer to
+     * @return a new mutable map with the keys {@code connector}, {@code url}, {@code table-name},
+     *     {@code username} and {@code password}
+     */
+    protected Map<String, String> buildConnectorOptions(TablePath tablePath) {
+        final String databaseUrl =
+                jdbcDialect.getUrlFromDatabaseName(baseUrl, tablePath.getDatabaseName(), suffix);
+
+        final Map<String, String> connectorOptions = new HashMap<>(8);
+        connectorOptions.put("connector", "jdbc");
+        connectorOptions.put("url", databaseUrl);
+        connectorOptions.put("table-name", tablePath.getFullName());
+        connectorOptions.put("username", username);
+        connectorOptions.put("password", pwd);
+        return connectorOptions;
+    }
+
+    /**
+     * Executes the dialect's create-database statement over a connection to {@code defaultUrl}.
+     *
+     * @param databaseName name of the database to create
+     * @return the value of {@link PreparedStatement#execute()}
+     * @throws CatalogException if connecting, preparing or executing the statement fails
+     */
+    protected boolean createDatabaseInternal(String databaseName) {
+        // The dialect already returns finished SQL. It must not be passed through
+        // String.format again: with no arguments that only re-interprets any '%' in the
+        // database name as a format specifier, which throws or mangles the statement.
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement(jdbcDialect.createDatabaseSql(databaseName))) {
+            return ps.execute();
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "Failed creating database %s in catalog %s",
+                            databaseName, this.catalogName),
+                    e);
+        }
+    }
+
+    /**
+     * Executes the dialect's drop-database statement over a connection to {@code defaultUrl}.
+     *
+     * @param databaseName name of the database to drop
+     * @return the value of {@link PreparedStatement#execute()}
+     * @throws CatalogException wrapping any exception raised while connecting or executing
+     */
+    protected boolean dropDatabaseInternal(String databaseName) throws CatalogException {
+        try (Connection connection = DriverManager.getConnection(defaultUrl, username, pwd)) {
+            final String dropSql = jdbcDialect.dropDatabaseSql(databaseName);
+            try (PreparedStatement statement = connection.prepareStatement(dropSql)) {
+                return statement.execute();
+            }
+        } catch (Exception e) {
+            final String message =
+                    String.format(
+                            "Failed dropping database %s in catalog %s",
+                            databaseName, this.catalogName);
+            throw new CatalogException(message, e);
+        }
+    }
+
+    /**
+     * Creates {@code table} at {@code tablePath} using the DDL generated by the dialect.
+     *
+     * <p>The connection URL and the DDL are built before the try block, so a failure while
+     * building them propagates unchanged; only failures while connecting or executing are
+     * wrapped.
+     *
+     * <p>todo: If the origin source is mysql, we can directly use create table like to create
+     * the target table?
+     *
+     * @param tablePath target database / table
+     * @param table description the DDL is generated from
+     * @return the value of {@link PreparedStatement#execute()}
+     * @throws CatalogException wrapping any exception raised while connecting or executing
+     */
+    protected boolean createTableInternal(TablePath tablePath, CatalogTable table)
+            throws CatalogException {
+        final String url =
+                jdbcDialect.getUrlFromDatabaseName(baseUrl, tablePath.getDatabaseName(), suffix);
+        final String ddl = jdbcDialect.createTableSql(tablePath, table);
+        try (Connection connection = DriverManager.getConnection(url, username, pwd)) {
+            try (PreparedStatement statement = connection.prepareStatement(ddl)) {
+                return statement.execute();
+            }
+        } catch (Exception e) {
+            final String message =
+                    String.format("Failed creating table %s", tablePath.getFullName());
+            throw new CatalogException(message, e);
+        }
+    }
}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
index d8534d5df..f6c9cc679 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
@@ -18,16 +18,6 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql;
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.ConstraintKey;
-import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
-import org.apache.seatunnel.api.table.catalog.PrimaryKey;
-import org.apache.seatunnel.api.table.catalog.TableIdentifier;
-import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
-import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
-import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
-import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
@@ -35,26 +25,13 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalo
import com.mysql.cj.MysqlType;
import com.mysql.cj.jdbc.result.ResultSetImpl;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
public class MySqlCatalog extends AbstractJdbcCatalog {
- protected static final Set<String> SYS_DATABASES = new HashSet<>(4);
-
static {
SYS_DATABASES.add("information_schema");
SYS_DATABASES.add("mysql");
@@ -67,189 +44,12 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
super(catalogName, username, pwd, urlInfo);
}
- @Override
- public List<String> listDatabases() throws CatalogException {
- try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
- PreparedStatement ps = conn.prepareStatement("SHOW DATABASES;")) {
-
- List<String> databases = new ArrayList<>();
- ResultSet rs = ps.executeQuery();
-
- while (rs.next()) {
- String databaseName = rs.getString(1);
- if (!SYS_DATABASES.contains(databaseName)) {
- databases.add(rs.getString(1));
- }
- }
-
- return databases;
- } catch (Exception e) {
- throw new CatalogException(
- String.format("Failed listing database in catalog %s", this.catalogName), e);
- }
- }
-
- @Override
- public List<String> listTables(String databaseName)
- throws CatalogException, DatabaseNotExistException {
- if (!databaseExists(databaseName)) {
- throw new DatabaseNotExistException(this.catalogName, databaseName);
- }
-
- String dbUrl = getUrlFromDatabaseName(databaseName);
- try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
- PreparedStatement ps = conn.prepareStatement("SHOW TABLES;")) {
-
- ResultSet rs = ps.executeQuery();
-
- List<String> tables = new ArrayList<>();
-
- while (rs.next()) {
- tables.add(rs.getString(1));
- }
-
- return tables;
- } catch (Exception e) {
- throw new CatalogException(
- String.format("Failed listing database in catalog %s", catalogName), e);
- }
- }
-
- @Override
- public CatalogTable getTable(TablePath tablePath)
- throws CatalogException, TableNotExistException {
- if (!tableExists(tablePath)) {
- throw new TableNotExistException(catalogName, tablePath);
- }
-
- String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
- try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
- DatabaseMetaData metaData = conn.getMetaData();
- Optional<PrimaryKey> primaryKey =
- getPrimaryKey(metaData, tablePath.getDatabaseName(), tablePath.getTableName());
- List<ConstraintKey> constraintKeys =
- getConstraintKeys(
- metaData, tablePath.getDatabaseName(), tablePath.getTableName());
-
- try (PreparedStatement ps =
- conn.prepareStatement(
- String.format(
- "SELECT * FROM %s WHERE 1 = 0;",
- tablePath.getFullNameWithQuoted()))) {
- ResultSetMetaData tableMetaData = ps.getMetaData();
- TableSchema.Builder builder = TableSchema.builder();
- // add column
- for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
- String columnName = tableMetaData.getColumnName(i);
- SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i);
- int columnDisplaySize = tableMetaData.getColumnDisplaySize(i);
- String comment = tableMetaData.getColumnLabel(i);
- boolean isNullable =
- tableMetaData.isNullable(i) == ResultSetMetaData.columnNullable;
- Object defaultValue =
- getColumnDefaultValue(metaData, tablePath.getTableName(), columnName)
- .orElse(null);
-
- PhysicalColumn physicalColumn =
- PhysicalColumn.of(
- columnName,
- type,
- columnDisplaySize,
- isNullable,
- defaultValue,
- comment);
- builder.column(physicalColumn);
- }
- // add primary key
- primaryKey.ifPresent(builder::primaryKey);
- // add constraint key
- constraintKeys.forEach(builder::constraintKey);
- TableIdentifier tableIdentifier =
- TableIdentifier.of(
- catalogName, tablePath.getDatabaseName(), tablePath.getTableName());
- return CatalogTable.of(
- tableIdentifier,
- builder.build(),
- buildConnectorOptions(tablePath),
- Collections.emptyList(),
- "");
- }
-
- } catch (Exception e) {
- throw new CatalogException(
- String.format("Failed getting table %s", tablePath.getFullName()), e);
- }
- }
-
- // todo: If the origin source is mysql, we can directly use create table like to create the
- // target table?
- @Override
- protected boolean createTableInternal(TablePath tablePath, CatalogTable table)
- throws CatalogException {
- String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
- String createTableSql = MysqlCreateTableSqlBuilder.builder(tablePath, table).build();
- try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
- PreparedStatement ps = conn.prepareStatement(createTableSql)) {
- return ps.execute();
- } catch (Exception e) {
- throw new CatalogException(
- String.format("Failed creating table %s", tablePath.getFullName()), e);
- }
- }
-
- @Override
- protected boolean dropTableInternal(TablePath tablePath) throws CatalogException {
- String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
- try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
- PreparedStatement ps =
- conn.prepareStatement(
- String.format(
- "DROP TABLE %s IF EXIST;", tablePath.getFullName()))) {
- // Will there exist concurrent drop for one table?
- return ps.execute();
- } catch (SQLException e) {
- throw new CatalogException(
- String.format("Failed dropping table %s", tablePath.getFullName()), e);
- }
- }
-
- @Override
- protected boolean createDatabaseInternal(String databaseName) throws CatalogException {
- try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
- PreparedStatement ps =
- conn.prepareStatement(
- String.format("CREATE DATABASE `%s`;", databaseName))) {
- return ps.execute();
- } catch (Exception e) {
- throw new CatalogException(
- String.format(
- "Failed creating database %s in catalog %s",
- databaseName, this.catalogName),
- e);
- }
- }
-
- @Override
- protected boolean dropDatabaseInternal(String databaseName) throws CatalogException {
- try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
- PreparedStatement ps =
- conn.prepareStatement(String.format("DROP DATABASE `%s`;", databaseName))) {
- return ps.execute();
- } catch (Exception e) {
- throw new CatalogException(
- String.format(
- "Failed dropping database %s in catalog %s",
- databaseName, this.catalogName),
- e);
- }
- }
-
/**
* @see com.mysql.cj.MysqlType
* @see ResultSetImpl#getObjectStoredProc(int, int)
*/
- @SuppressWarnings("unchecked")
- private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
+ @Override
+ public SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
throws SQLException {
MysqlType mysqlType = MysqlType.getByName(metadata.getColumnTypeName(colIndex));
Map<String, Object> dataTypeProperties = new HashMap<>();
@@ -257,19 +57,4 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
dataTypeProperties.put(MysqlDataTypeConvertor.SCALE, metadata.getScale(colIndex));
return new MysqlDataTypeConvertor().toSeaTunnelType(mysqlType, dataTypeProperties);
}
-
- @SuppressWarnings("MagicNumber")
- private Map<String, String> buildConnectorOptions(TablePath tablePath) {
- Map<String, String> options = new HashMap<>(8);
- options.put("connector", "jdbc");
- options.put("url", baseUrl + tablePath.getDatabaseName());
- options.put("table-name", tablePath.getFullName());
- options.put("username", username);
- options.put("password", pwd);
- return options;
- }
-
- private String getUrlFromDatabaseName(String databaseName) {
- return baseUrl + databaseName + suffix;
- }
}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
index 9a015ca73..c2e300516 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
@@ -49,7 +49,7 @@ public class MysqlCreateTableSqlBuilder {
private List<ConstraintKey> constraintKeys;
- private MysqlDataTypeConvertor mysqlDataTypeConvertor;
+ private final MysqlDataTypeConvertor mysqlDataTypeConvertor;
private MysqlCreateTableSqlBuilder(String tableName) {
checkNotNull(tableName, "tableName must not be null");
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
index 25c02e6b1..f15a0e746 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
@@ -19,41 +19,21 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.ConstraintKey;
-import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
-import org.apache.seatunnel.api.table.catalog.PrimaryKey;
-import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
-import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
-import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
import org.apache.commons.lang3.tuple.Pair;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
public class SqlServerCatalog extends AbstractJdbcCatalog {
- private static final Set<String> SYS_DATABASES = new HashSet<>(4);
-
static {
SYS_DATABASES.add("master");
SYS_DATABASES.add("tempdb");
@@ -66,150 +46,6 @@ public class SqlServerCatalog extends AbstractJdbcCatalog {
super(catalogName, username, pwd, urlInfo);
}
- @Override
- public List<String> listDatabases() throws CatalogException {
- try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
- PreparedStatement ps = conn.prepareStatement("SELECT NAME FROM SYS.DATABASES")) {
-
- List<String> databases = new ArrayList<>();
- ResultSet rs = ps.executeQuery();
-
- while (rs.next()) {
- String databaseName = rs.getString(1);
- if (!SYS_DATABASES.contains(databaseName)) {
- databases.add(databaseName);
- }
- }
-
- return databases;
- } catch (Exception e) {
- throw new CatalogException(
- String.format("Failed listing database in catalog %s", this.catalogName), e);
- }
- }
-
- @Override
- public List<String> listTables(String databaseName)
- throws CatalogException, DatabaseNotExistException {
- if (!databaseExists(databaseName)) {
- throw new DatabaseNotExistException(this.catalogName, databaseName);
- }
-
- String dbUrl = getUrlFromDatabaseName(databaseName);
- try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
- PreparedStatement ps =
- conn.prepareStatement(
- "SELECT TABLE_SCHEMA, TABLE_NAME FROM "
- + databaseName
- + ".INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'")) {
-
- ResultSet rs = ps.executeQuery();
-
- List<String> tables = new ArrayList<>();
-
- while (rs.next()) {
- tables.add(rs.getString(1) + "." + rs.getString(2));
- }
-
- return tables;
- } catch (Exception e) {
- throw new CatalogException(
- String.format("Failed listing database in catalog %s", catalogName), e);
- }
- }
-
- @Override
- public boolean tableExists(TablePath tablePath) throws CatalogException {
- try {
- return databaseExists(tablePath.getDatabaseName())
- && listTables(tablePath.getDatabaseName())
- .contains(tablePath.getSchemaAndTableName());
- } catch (DatabaseNotExistException e) {
- return false;
- }
- }
-
- @Override
- public CatalogTable getTable(TablePath tablePath)
- throws CatalogException, TableNotExistException {
- if (!tableExists(tablePath)) {
- throw new TableNotExistException(catalogName, tablePath);
- }
-
- String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
- try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
- DatabaseMetaData metaData = conn.getMetaData();
- Optional<PrimaryKey> primaryKey =
- getPrimaryKey(
- metaData,
- tablePath.getDatabaseName(),
- tablePath.getSchemaName(),
- tablePath.getTableName());
- List<ConstraintKey> constraintKeys =
- getConstraintKeys(
- metaData,
- tablePath.getDatabaseName(),
- tablePath.getSchemaName(),
- tablePath.getTableName());
-
- try (PreparedStatement ps =
- conn.prepareStatement(
- String.format(
- "SELECT * FROM %s WHERE 1 = 0;",
- tablePath.getFullNameWithQuoted("\"")))) {
- ResultSetMetaData tableMetaData = ps.getMetaData();
- TableSchema.Builder builder = TableSchema.builder();
- // add column
- for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
- String columnName = tableMetaData.getColumnName(i);
- SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i);
- int columnDisplaySize = tableMetaData.getColumnDisplaySize(i);
- String comment = tableMetaData.getColumnLabel(i);
- boolean isNullable =
- tableMetaData.isNullable(i) == ResultSetMetaData.columnNullable;
- Object defaultValue =
- getColumnDefaultValue(
- metaData,
- tablePath.getDatabaseName(),
- tablePath.getSchemaName(),
- tablePath.getTableName(),
- columnName)
- .orElse(null);
-
- PhysicalColumn physicalColumn =
- PhysicalColumn.of(
- columnName,
- type,
- columnDisplaySize,
- isNullable,
- defaultValue,
- comment);
- builder.column(physicalColumn);
- }
- // add primary key
- primaryKey.ifPresent(builder::primaryKey);
- // add constraint key
- constraintKeys.forEach(builder::constraintKey);
- TableIdentifier tableIdentifier =
- TableIdentifier.of(
- catalogName,
- tablePath.getDatabaseName(),
- tablePath.getSchemaName(),
- tablePath.getTableName());
- return CatalogTable.of(
- tableIdentifier,
- builder.build(),
- buildConnectorOptions(tablePath),
- Collections.emptyList(),
- "");
- }
-
- } catch (Exception e) {
- throw new CatalogException(
- String.format("Failed getting table %s", tablePath.getFullName()), e);
- }
- }
-
@Override
protected boolean createTableInternal(TablePath tablePath, CatalogTable table)
throws CatalogException {
@@ -217,54 +53,7 @@ public class SqlServerCatalog extends AbstractJdbcCatalog {
}
@Override
- protected boolean dropTableInternal(TablePath tablePath) throws CatalogException {
- String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
- try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
- PreparedStatement ps =
- conn.prepareStatement(
- String.format("DROP TABLE IF EXIST %s", tablePath.getFullName()))) {
- // Will there exist concurrent drop for one table?
- return ps.execute();
- } catch (SQLException e) {
- throw new CatalogException(
- String.format("Failed dropping table %s", tablePath.getFullName()), e);
- }
- }
-
- @Override
- protected boolean createDatabaseInternal(String databaseName) throws CatalogException {
- try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
- PreparedStatement ps =
- conn.prepareStatement(
- String.format("CREATE DATABASE `%s`", databaseName))) {
- return ps.execute();
- } catch (Exception e) {
- throw new CatalogException(
- String.format(
- "Failed creating database %s in catalog %s",
- databaseName, this.catalogName),
- e);
- }
- }
-
- @Override
- protected boolean dropDatabaseInternal(String databaseName) throws CatalogException {
- try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
- PreparedStatement ps =
- conn.prepareStatement(
- String.format("DROP DATABASE IF EXISTS `%s`;", databaseName))) {
- return ps.execute();
- } catch (Exception e) {
- throw new CatalogException(
- String.format(
- "Failed dropping database %s in catalog %s",
- databaseName, this.catalogName),
- e);
- }
- }
-
- @SuppressWarnings("unchecked")
- private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
+ public SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
throws SQLException {
Pair<SqlServerType, Map<String, Object>> pair =
SqlServerType.parse(metadata.getColumnTypeName(colIndex));
@@ -274,19 +63,4 @@ public class SqlServerCatalog extends AbstractJdbcCatalog {
dataTypeProperties.put(SqlServerDataTypeConvertor.SCALE, metadata.getScale(colIndex));
return new SqlServerDataTypeConvertor().toSeaTunnelType(pair.getLeft(), dataTypeProperties);
}
-
- @SuppressWarnings("MagicNumber")
- private Map<String, String> buildConnectorOptions(TablePath tablePath) {
- Map<String, String> options = new HashMap<>(8);
- options.put("connector", "jdbc");
- options.put("url", getUrlFromDatabaseName(tablePath.getDatabaseName()));
- options.put("table-name", tablePath.getFullName());
- options.put("username", username);
- options.put("password", pwd);
- return options;
- }
-
- private String getUrlFromDatabaseName(String databaseName) {
- return baseUrl + ";databaseName=" + databaseName + ";" + suffix;
- }
}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java
index e848498c9..0b0fa91f2 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java
@@ -140,7 +140,7 @@ public enum SqlServerType implements SQLType {
public static Pair<SqlServerType, Map<String, Object>> parse(String fullTypeName) {
Map<String, Object> params = new HashMap<>();
String typeName = fullTypeName;
- if (fullTypeName.indexOf("(") != -1) {
+ if (fullTypeName.contains("(")) {
typeName = fullTypeName.substring(0, fullTypeName.indexOf("(")).trim();
String paramsStr =
fullTypeName.substring(
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index 64ed388df..36e47eda1 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
@@ -193,4 +195,40 @@ public interface JdbcDialect extends Serializable {
PreparedStatement ps = conn.prepareStatement(jdbcSourceConfig.getQuery());
return ps.getMetaData();
}
+
+ default String listDatabases() {
+ return "SHOW DATABASES;";
+ }
+
+ default String getUrlFromDatabaseName(String baseUrl, String databaseName, String suffix) {
+ return baseUrl + databaseName + suffix;
+ }
+
+ default String createDatabaseSql(String databaseName) {
+ return String.format("CREATE DATABASE IF NOT EXISTS %s;", quoteIdentifier(databaseName));
+ }
+
+ default String dropDatabaseSql(String databaseName) {
+ return String.format("DROP DATABASE IF EXISTS %s;", quoteIdentifier(databaseName));
+ }
+
+ default String getTableName(ResultSet rs) throws SQLException {
+ return rs.getString(1);
+ }
+
+ default String getTableName(TablePath tablePath) {
+ return tablePath.getTableName();
+ }
+
+ default String listTableSql(String databaseName) {
+ return "SHOW TABLES;";
+ }
+
+ default String getDropTableSql(String tableName) {
+ return String.format("DROP TABLE IF EXISTS %s;", tableName);
+ }
+
+ default String createTableSql(TablePath tablePath, CatalogTable catalogTable) {
+ return "";
+ }
}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
index 128b8ae4b..63584d8ab 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MysqlCreateTableSqlBuilder;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
@@ -78,4 +81,9 @@ public class MysqlDialect implements JdbcDialect {
statement.setFetchSize(Integer.MIN_VALUE);
return statement;
}
+
+ @Override
+ public String createTableSql(TablePath tablePath, CatalogTable catalogTable) {
+ return MysqlCreateTableSqlBuilder.builder(tablePath, catalogTable).build();
+ }
}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
index 697d2d2dc..90b376624 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
@@ -17,10 +17,13 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+import java.sql.ResultSet;
+import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
@@ -100,4 +103,31 @@ public class SqlServerDialect implements JdbcDialect {
return Optional.of(upsertSQL);
}
+
+ @Override
+ public String listDatabases() {
+ return "SELECT NAME FROM SYS.DATABASES";
+ }
+
+ @Override
+ public String listTableSql(String databaseName) {
+ return "SELECT TABLE_SCHEMA, TABLE_NAME FROM "
+ + databaseName
+ + ".INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'";
+ }
+
+ @Override
+ public String getTableName(ResultSet rs) throws SQLException {
+ return rs.getString(1) + "." + rs.getString(2);
+ }
+
+ @Override
+ public String getTableName(TablePath tablePath) {
+ return tablePath.getSchemaName() + "." + tablePath.getTableName();
+ }
+
+ @Override
+ public String getUrlFromDatabaseName(String baseUrl, String databaseName, String suffix) {
+ return baseUrl + ";databaseName=" + databaseName + ";" + suffix;
+ }
}
diff --git a/seatunnel-connectors-v2/connector-starrocks/pom.xml b/seatunnel-connectors-v2/connector-starrocks/pom.xml
index 08e49bc0f..44b899885 100644
--- a/seatunnel-connectors-v2/connector-starrocks/pom.xml
+++ b/seatunnel-connectors-v2/connector-starrocks/pom.xml
@@ -49,6 +49,11 @@
<artifactId>connector-common</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-jdbc</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
index 7bf308b1c..79d9bf901 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
@@ -17,13 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.starrocks.catalog;
-import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
-import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
@@ -36,6 +32,7 @@ import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
import org.apache.commons.lang3.StringUtils;
@@ -47,22 +44,18 @@ import com.mysql.cj.MysqlType;
import java.sql.Connection;
import java.sql.DriverManager;
-import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
import static com.google.common.base.Preconditions.checkArgument;
-public class StarRocksCatalog implements Catalog {
+public class StarRocksCatalog extends AbstractJdbcCatalog {
protected final String catalogName;
protected String defaultDatabase = "information_schema";
@@ -71,8 +64,6 @@ public class StarRocksCatalog implements Catalog {
protected final String baseUrl;
protected String defaultUrl;
private final JdbcUrlUtil.UrlInfo urlInfo;
-
- private static final Set<String> SYS_DATABASES = new HashSet<>();
private static final Logger LOG = LoggerFactory.getLogger(StarRocksCatalog.class);
static {
@@ -82,6 +73,8 @@ public class StarRocksCatalog implements Catalog {
public StarRocksCatalog(String catalogName, String username, String pwd, String defaultUrl) {
+ super(catalogName, username, pwd, JdbcUrlUtil.getUrlInfo(defaultUrl));
+
checkArgument(StringUtils.isNotBlank(username));
checkArgument(StringUtils.isNotBlank(pwd));
checkArgument(StringUtils.isNotBlank(defaultUrl));
@@ -97,104 +90,9 @@ public class StarRocksCatalog implements Catalog {
}
@Override
- public List<String> listDatabases() throws CatalogException {
- try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
-
- PreparedStatement ps = conn.prepareStatement("SHOW DATABASES;");
-
- List<String> databases = new ArrayList<>();
- ResultSet rs = ps.executeQuery();
-
- while (rs.next()) {
- String databaseName = rs.getString(1);
- if (!SYS_DATABASES.contains(databaseName)) {
- databases.add(rs.getString(1));
- }
- }
-
- return databases;
- } catch (Exception e) {
- throw new CatalogException(
- String.format("Failed listing database in catalog %s", this.catalogName), e);
- }
- }
-
- @Override
- public List<String> listTables(String databaseName)
- throws CatalogException, DatabaseNotExistException {
- if (!databaseExists(databaseName)) {
- throw new DatabaseNotExistException(this.catalogName, databaseName);
- }
-
- try (Connection conn =
- DriverManager.getConnection(
- urlInfo.getUrlWithDatabase(databaseName), username, pwd)) {
- PreparedStatement ps = conn.prepareStatement("SHOW TABLES;");
-
- ResultSet rs = ps.executeQuery();
-
- List<String> tables = new ArrayList<>();
-
- while (rs.next()) {
- tables.add(rs.getString(1));
- }
-
- return tables;
- } catch (Exception e) {
- throw new CatalogException(
- String.format("Failed listing database in catalog %s", catalogName), e);
- }
- }
-
- @Override
- public CatalogTable getTable(TablePath tablePath)
- throws CatalogException, TableNotExistException {
- if (!tableExists(tablePath)) {
- throw new TableNotExistException(catalogName, tablePath);
- }
-
- String dbUrl = urlInfo.getUrlWithDatabase(tablePath.getDatabaseName());
- try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
- Optional<PrimaryKey> primaryKey =
- getPrimaryKey(tablePath.getDatabaseName(), tablePath.getTableName());
-
- PreparedStatement ps =
- conn.prepareStatement(
- String.format(
- "SELECT * FROM %s WHERE 1 = 0;",
- tablePath.getFullNameWithQuoted()));
-
- ResultSetMetaData tableMetaData = ps.getMetaData();
-
- TableSchema.Builder builder = TableSchema.builder();
- for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
- SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i);
- // TODO add default value and test it
- builder.column(
- PhysicalColumn.of(
- tableMetaData.getColumnName(i),
- type,
- tableMetaData.getColumnDisplaySize(i),
- tableMetaData.isNullable(i) == ResultSetMetaData.columnNullable,
- null,
- tableMetaData.getColumnLabel(i)));
- }
-
- primaryKey.ifPresent(builder::primaryKey);
-
- TableIdentifier tableIdentifier =
- TableIdentifier.of(
- catalogName, tablePath.getDatabaseName(), tablePath.getTableName());
- return CatalogTable.of(
- tableIdentifier,
- builder.build(),
- buildConnectorOptions(tablePath),
- Collections.emptyList(),
- "");
- } catch (Exception e) {
- throw new CatalogException(
- String.format("Failed getting table %s", tablePath.getFullName()), e);
- }
+ protected boolean createTableInternal(TablePath tablePath, CatalogTable table)
+ throws CatalogException {
+ throw new UnsupportedOperationException("Unsupported create table");
}
@Override
@@ -209,6 +107,11 @@ public class StarRocksCatalog implements Catalog {
throw new UnsupportedOperationException();
}
+ @Override
+ protected boolean dropTableInternal(TablePath tablePath) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
throws DatabaseAlreadyExistException, CatalogException {
@@ -229,6 +132,11 @@ public class StarRocksCatalog implements Catalog {
}
}
+ @Override
+ protected boolean createDatabaseInternal(String databaseName) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
throws DatabaseNotExistException, CatalogException {
@@ -246,8 +154,14 @@ public class StarRocksCatalog implements Catalog {
}
}
+ @Override
+ protected boolean dropDatabaseInternal(String databaseName) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
/** @see com.mysql.cj.MysqlType */
- private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
+ @Override
+ public SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
throws SQLException {
MysqlType starrocksType = MysqlType.getByName(metadata.getColumnTypeName(colIndex));
switch (starrocksType) {
@@ -315,7 +229,8 @@ public class StarRocksCatalog implements Catalog {
}
@SuppressWarnings("MagicNumber")
- private Map<String, String> buildConnectorOptions(TablePath tablePath) {
+ @Override
+ public Map<String, String> buildConnectorOptions(TablePath tablePath) {
Map<String, String> options = new HashMap<>(8);
options.put("connector", "starrocks");
options.put("url", baseUrl + tablePath.getDatabaseName());
@@ -369,29 +284,6 @@ public class StarRocksCatalog implements Catalog {
return res;
}
- @Override
- public String getDefaultDatabase() {
- return defaultDatabase;
- }
-
- @Override
- public void open() throws CatalogException {
- try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
- // test connection, fail early if we cannot connect to database
- conn.getCatalog();
- } catch (SQLException e) {
- throw new CatalogException(
- String.format("Failed connecting to %s via JDBC.", defaultUrl), e);
- }
-
- LOG.info("Catalog {} established connection to {}", catalogName, defaultUrl);
- }
-
- @Override
- public void close() throws CatalogException {
- LOG.info("Catalog {} closing", catalogName);
- }
-
protected Optional<PrimaryKey> getPrimaryKey(String schema, String table) throws SQLException {
List<String> pkFields = new ArrayList<>();
@@ -414,21 +306,4 @@ public class StarRocksCatalog implements Catalog {
}
return Optional.empty();
}
-
- @Override
- public boolean databaseExists(String databaseName) throws CatalogException {
- checkArgument(StringUtils.isNotBlank(databaseName));
-
- return listDatabases().contains(databaseName);
- }
-
- @Override
- public boolean tableExists(TablePath tablePath) throws CatalogException {
- try {
- return databaseExists(tablePath.getDatabaseName())
- && listTables(tablePath.getDatabaseName()).contains(tablePath.getTableName());
- } catch (DatabaseNotExistException e) {
- return false;
- }
- }
}
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDialect.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDialect.java
new file mode 100644
index 000000000..9286e628f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDialect.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.catalog;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlDialect;
+
+public class StarRocksDialect extends MysqlDialect {
+ @Override
+ public String dialectName() {
+ return "StarRocks";
+ }
+}