You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/04/19 01:52:14 UTC

[incubator-seatunnel] branch dev updated: [Improve][Catalog] refactor catalog (#4540)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b0a701cb8 [Improve][Catalog] refactor catalog (#4540)
b0a701cb8 is described below

commit b0a701cb83a2353da3b1f93a483c19cddd55747c
Author: Guangdong Liu <80...@qq.com>
AuthorDate: Wed Apr 19 09:52:06 2023 +0800

    [Improve][Catalog] refactor catalog (#4540)
---
 .../jdbc/catalog/AbstractJdbcCatalog.java          | 234 ++++++++++++++++++++-
 .../seatunnel/jdbc/catalog/mysql/MySqlCatalog.java | 219 +------------------
 .../catalog/mysql/MysqlCreateTableSqlBuilder.java  |   2 +-
 .../jdbc/catalog/sqlserver/SqlServerCatalog.java   | 228 +-------------------
 .../jdbc/catalog/sqlserver/SqlServerType.java      |   2 +-
 .../jdbc/internal/dialect/JdbcDialect.java         |  38 ++++
 .../jdbc/internal/dialect/mysql/MysqlDialect.java  |   8 +
 .../dialect/sqlserver/SqlServerDialect.java        |  30 +++
 .../connector-starrocks/pom.xml                    |   5 +
 .../starrocks/catalog/StarRocksCatalog.java        | 177 +++-------------
 .../starrocks/catalog/StarRocksDialect.java        |  27 +++
 11 files changed, 365 insertions(+), 605 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index bd516e325..4fb68775f 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -21,14 +21,20 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
 import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
 import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
 import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
 import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -40,14 +46,19 @@ import org.slf4j.LoggerFactory;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.DriverManager;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -55,7 +66,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 public abstract class AbstractJdbcCatalog implements Catalog {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcCatalog.class);
-
     protected final String catalogName;
     protected final String defaultDatabase;
     protected final String username;
@@ -63,6 +73,8 @@ public abstract class AbstractJdbcCatalog implements Catalog {
     protected final String baseUrl;
     protected final String suffix;
     protected final String defaultUrl;
+    protected final JdbcDialect jdbcDialect;
+    protected static final Set<String> SYS_DATABASES = new HashSet<>();
 
     public AbstractJdbcCatalog(
             String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo) {
@@ -80,6 +92,7 @@ public abstract class AbstractJdbcCatalog implements Catalog {
         this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
         this.defaultUrl = urlInfo.getOrigin();
         this.suffix = urlInfo.getSuffix();
+        this.jdbcDialect = JdbcDialectLoader.load(this.baseUrl);
     }
 
     @Override
@@ -107,6 +120,7 @@ public abstract class AbstractJdbcCatalog implements Catalog {
     public void open() throws CatalogException {
         try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
             // test connection, fail early if we cannot connect to database
+            conn.getCatalog();
         } catch (SQLException e) {
             throw new CatalogException(
                     String.format("Failed connecting to %s via JDBC.", defaultUrl), e);
@@ -120,6 +134,56 @@ public abstract class AbstractJdbcCatalog implements Catalog {
         LOG.info("Catalog {} closing", catalogName);
     }
 
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+
+            PreparedStatement ps = conn.prepareStatement(jdbcDialect.listDatabases());
+
+            List<String> databases = new ArrayList<>();
+            ResultSet rs = ps.executeQuery();
+
+            while (rs.next()) {
+                String databaseName = rs.getString(1);
+                if (!getSysDatabases().contains(databaseName)) {
+                    databases.add(rs.getString(1));
+                }
+            }
+
+            return databases;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed listing database in catalog %s", this.catalogName), e);
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(this.catalogName, databaseName);
+        }
+
+        String dbUrl = jdbcDialect.getUrlFromDatabaseName(baseUrl, databaseName, suffix);
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement(jdbcDialect.listTableSql(databaseName))) {
+
+            ResultSet rs = ps.executeQuery();
+
+            List<String> tables = new ArrayList<>();
+
+            while (rs.next()) {
+                tables.add(jdbcDialect.getTableName(rs));
+            }
+
+            return tables;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed listing database in catalog %s", catalogName), e);
+        }
+    }
+
     protected Optional<PrimaryKey> getPrimaryKey(
             DatabaseMetaData metaData, String database, String table) throws SQLException {
         return getPrimaryKey(metaData, database, table, table);
@@ -226,7 +290,8 @@ public abstract class AbstractJdbcCatalog implements Catalog {
     public boolean tableExists(TablePath tablePath) throws CatalogException {
         try {
             return databaseExists(tablePath.getDatabaseName())
-                    && listTables(tablePath.getDatabaseName()).contains(tablePath.getTableName());
+                    && listTables(tablePath.getDatabaseName())
+                            .contains(jdbcDialect.getTableName(tablePath));
         } catch (DatabaseNotExistException e) {
             return false;
         }
@@ -245,8 +310,86 @@ public abstract class AbstractJdbcCatalog implements Catalog {
         }
     }
 
-    protected abstract boolean createTableInternal(TablePath tablePath, CatalogTable table)
-            throws CatalogException;
+    public CatalogTable getTable(TablePath tablePath)
+            throws CatalogException, TableNotExistException {
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(catalogName, tablePath);
+        }
+
+        String dbUrl =
+                jdbcDialect.getUrlFromDatabaseName(baseUrl, tablePath.getDatabaseName(), suffix);
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
+            DatabaseMetaData metaData = conn.getMetaData();
+            Optional<PrimaryKey> primaryKey =
+                    getPrimaryKey(
+                            metaData,
+                            tablePath.getDatabaseName(),
+                            tablePath.getSchemaName(),
+                            tablePath.getTableName());
+            List<ConstraintKey> constraintKeys =
+                    getConstraintKeys(
+                            metaData,
+                            tablePath.getDatabaseName(),
+                            tablePath.getSchemaName(),
+                            tablePath.getTableName());
+
+            try (PreparedStatement ps =
+                    conn.prepareStatement(
+                            String.format(
+                                    "SELECT * FROM %s WHERE 1 = 0;",
+                                    tablePath.getFullNameWithQuoted("\"")))) {
+                ResultSetMetaData tableMetaData = ps.getMetaData();
+                TableSchema.Builder builder = TableSchema.builder();
+                // add column
+                for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
+                    String columnName = tableMetaData.getColumnName(i);
+                    SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i);
+                    int columnDisplaySize = tableMetaData.getColumnDisplaySize(i);
+                    String comment = tableMetaData.getColumnLabel(i);
+                    boolean isNullable =
+                            tableMetaData.isNullable(i) == ResultSetMetaData.columnNullable;
+                    Object defaultValue =
+                            getColumnDefaultValue(
+                                            metaData,
+                                            tablePath.getDatabaseName(),
+                                            tablePath.getSchemaName(),
+                                            tablePath.getTableName(),
+                                            columnName)
+                                    .orElse(null);
+
+                    PhysicalColumn physicalColumn =
+                            PhysicalColumn.of(
+                                    columnName,
+                                    type,
+                                    columnDisplaySize,
+                                    isNullable,
+                                    defaultValue,
+                                    comment);
+                    builder.column(physicalColumn);
+                }
+                // add primary key
+                primaryKey.ifPresent(builder::primaryKey);
+                // add constraint key
+                constraintKeys.forEach(builder::constraintKey);
+                TableIdentifier tableIdentifier =
+                        TableIdentifier.of(
+                                catalogName,
+                                tablePath.getDatabaseName(),
+                                tablePath.getSchemaName(),
+                                tablePath.getTableName());
+                return CatalogTable.of(
+                        tableIdentifier,
+                        builder.build(),
+                        buildConnectorOptions(tablePath),
+                        Collections.emptyList(),
+                        "");
+            }
+
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed getting table %s", tablePath.getFullName()), e);
+        }
+    }
 
     @Override
     public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
@@ -257,7 +400,20 @@ public abstract class AbstractJdbcCatalog implements Catalog {
         }
     }
 
-    protected abstract boolean dropTableInternal(TablePath tablePath) throws CatalogException;
+    protected boolean dropTableInternal(TablePath tablePath) throws CatalogException {
+        String dbUrl =
+                jdbcDialect.getUrlFromDatabaseName(baseUrl, tablePath.getDatabaseName(), suffix);
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement(
+                                jdbcDialect.getDropTableSql(tablePath.getFullName()))) {
+            // Will there exist concurrent drop for one table?
+            return ps.execute();
+        } catch (SQLException e) {
+            throw new CatalogException(
+                    String.format("Failed dropping table %s", tablePath.getFullName()), e);
+        }
+    }
 
     @Override
     public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
@@ -273,8 +429,6 @@ public abstract class AbstractJdbcCatalog implements Catalog {
         }
     }
 
-    protected abstract boolean createDatabaseInternal(String databaseName);
-
     @Override
     public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
             throws DatabaseNotExistException, CatalogException {
@@ -286,5 +440,69 @@ public abstract class AbstractJdbcCatalog implements Catalog {
         }
     }
 
-    protected abstract boolean dropDatabaseInternal(String databaseName) throws CatalogException;
+    protected SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
+            throws SQLException {
+        return null;
+    }
+
+    protected Set<String> getSysDatabases() {
+        return SYS_DATABASES;
+    }
+
+    protected Map<String, String> buildConnectorOptions(TablePath tablePath) {
+        Map<String, String> options = new HashMap<>(8);
+        options.put("connector", "jdbc");
+        options.put(
+                "url",
+                jdbcDialect.getUrlFromDatabaseName(baseUrl, tablePath.getDatabaseName(), suffix));
+        options.put("table-name", tablePath.getFullName());
+        options.put("username", username);
+        options.put("password", pwd);
+        return options;
+    }
+
+    protected boolean createDatabaseInternal(String databaseName) {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement(
+                                String.format(jdbcDialect.createDatabaseSql(databaseName)))) {
+            return ps.execute();
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "Failed creating database %s in catalog %s",
+                            databaseName, this.catalogName),
+                    e);
+        }
+    }
+
+    protected boolean dropDatabaseInternal(String databaseName) throws CatalogException {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement(jdbcDialect.dropDatabaseSql(databaseName))) {
+            return ps.execute();
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "Failed dropping database %s in catalog %s",
+                            databaseName, this.catalogName),
+                    e);
+        }
+    }
+
+    // todo: If the origin source is mysql, we can directly use create table like to create the
+    // target table?
+    protected boolean createTableInternal(TablePath tablePath, CatalogTable table)
+            throws CatalogException {
+        String dbUrl =
+                jdbcDialect.getUrlFromDatabaseName(baseUrl, tablePath.getDatabaseName(), suffix);
+        String createTableSql = jdbcDialect.createTableSql(tablePath, table);
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
+                PreparedStatement ps = conn.prepareStatement(createTableSql)) {
+            return ps.execute();
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed creating table %s", tablePath.getFullName()), e);
+        }
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
index d8534d5df..f6c9cc679 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
@@ -18,16 +18,6 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql;
 
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.ConstraintKey;
-import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
-import org.apache.seatunnel.api.table.catalog.PrimaryKey;
-import org.apache.seatunnel.api.table.catalog.TableIdentifier;
-import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
-import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
-import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
-import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
@@ -35,26 +25,13 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalo
 import com.mysql.cj.MysqlType;
 import com.mysql.cj.jdbc.result.ResultSetImpl;
 
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
 
 public class MySqlCatalog extends AbstractJdbcCatalog {
 
-    protected static final Set<String> SYS_DATABASES = new HashSet<>(4);
-
     static {
         SYS_DATABASES.add("information_schema");
         SYS_DATABASES.add("mysql");
@@ -67,189 +44,12 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
         super(catalogName, username, pwd, urlInfo);
     }
 
-    @Override
-    public List<String> listDatabases() throws CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
-                PreparedStatement ps = conn.prepareStatement("SHOW DATABASES;")) {
-
-            List<String> databases = new ArrayList<>();
-            ResultSet rs = ps.executeQuery();
-
-            while (rs.next()) {
-                String databaseName = rs.getString(1);
-                if (!SYS_DATABASES.contains(databaseName)) {
-                    databases.add(rs.getString(1));
-                }
-            }
-
-            return databases;
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed listing database in catalog %s", this.catalogName), e);
-        }
-    }
-
-    @Override
-    public List<String> listTables(String databaseName)
-            throws CatalogException, DatabaseNotExistException {
-        if (!databaseExists(databaseName)) {
-            throw new DatabaseNotExistException(this.catalogName, databaseName);
-        }
-
-        String dbUrl = getUrlFromDatabaseName(databaseName);
-        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
-                PreparedStatement ps = conn.prepareStatement("SHOW TABLES;")) {
-
-            ResultSet rs = ps.executeQuery();
-
-            List<String> tables = new ArrayList<>();
-
-            while (rs.next()) {
-                tables.add(rs.getString(1));
-            }
-
-            return tables;
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed listing database in catalog %s", catalogName), e);
-        }
-    }
-
-    @Override
-    public CatalogTable getTable(TablePath tablePath)
-            throws CatalogException, TableNotExistException {
-        if (!tableExists(tablePath)) {
-            throw new TableNotExistException(catalogName, tablePath);
-        }
-
-        String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
-        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
-            DatabaseMetaData metaData = conn.getMetaData();
-            Optional<PrimaryKey> primaryKey =
-                    getPrimaryKey(metaData, tablePath.getDatabaseName(), tablePath.getTableName());
-            List<ConstraintKey> constraintKeys =
-                    getConstraintKeys(
-                            metaData, tablePath.getDatabaseName(), tablePath.getTableName());
-
-            try (PreparedStatement ps =
-                    conn.prepareStatement(
-                            String.format(
-                                    "SELECT * FROM %s WHERE 1 = 0;",
-                                    tablePath.getFullNameWithQuoted()))) {
-                ResultSetMetaData tableMetaData = ps.getMetaData();
-                TableSchema.Builder builder = TableSchema.builder();
-                // add column
-                for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
-                    String columnName = tableMetaData.getColumnName(i);
-                    SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i);
-                    int columnDisplaySize = tableMetaData.getColumnDisplaySize(i);
-                    String comment = tableMetaData.getColumnLabel(i);
-                    boolean isNullable =
-                            tableMetaData.isNullable(i) == ResultSetMetaData.columnNullable;
-                    Object defaultValue =
-                            getColumnDefaultValue(metaData, tablePath.getTableName(), columnName)
-                                    .orElse(null);
-
-                    PhysicalColumn physicalColumn =
-                            PhysicalColumn.of(
-                                    columnName,
-                                    type,
-                                    columnDisplaySize,
-                                    isNullable,
-                                    defaultValue,
-                                    comment);
-                    builder.column(physicalColumn);
-                }
-                // add primary key
-                primaryKey.ifPresent(builder::primaryKey);
-                // add constraint key
-                constraintKeys.forEach(builder::constraintKey);
-                TableIdentifier tableIdentifier =
-                        TableIdentifier.of(
-                                catalogName, tablePath.getDatabaseName(), tablePath.getTableName());
-                return CatalogTable.of(
-                        tableIdentifier,
-                        builder.build(),
-                        buildConnectorOptions(tablePath),
-                        Collections.emptyList(),
-                        "");
-            }
-
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed getting table %s", tablePath.getFullName()), e);
-        }
-    }
-
-    // todo: If the origin source is mysql, we can directly use create table like to create the
-    // target table?
-    @Override
-    protected boolean createTableInternal(TablePath tablePath, CatalogTable table)
-            throws CatalogException {
-        String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
-        String createTableSql = MysqlCreateTableSqlBuilder.builder(tablePath, table).build();
-        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
-                PreparedStatement ps = conn.prepareStatement(createTableSql)) {
-            return ps.execute();
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed creating table %s", tablePath.getFullName()), e);
-        }
-    }
-
-    @Override
-    protected boolean dropTableInternal(TablePath tablePath) throws CatalogException {
-        String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
-        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
-                PreparedStatement ps =
-                        conn.prepareStatement(
-                                String.format(
-                                        "DROP TABLE %s IF EXIST;", tablePath.getFullName()))) {
-            // Will there exist concurrent drop for one table?
-            return ps.execute();
-        } catch (SQLException e) {
-            throw new CatalogException(
-                    String.format("Failed dropping table %s", tablePath.getFullName()), e);
-        }
-    }
-
-    @Override
-    protected boolean createDatabaseInternal(String databaseName) throws CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
-                PreparedStatement ps =
-                        conn.prepareStatement(
-                                String.format("CREATE DATABASE `%s`;", databaseName))) {
-            return ps.execute();
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format(
-                            "Failed creating database %s in catalog %s",
-                            databaseName, this.catalogName),
-                    e);
-        }
-    }
-
-    @Override
-    protected boolean dropDatabaseInternal(String databaseName) throws CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
-                PreparedStatement ps =
-                        conn.prepareStatement(String.format("DROP DATABASE `%s`;", databaseName))) {
-            return ps.execute();
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format(
-                            "Failed dropping database %s in catalog %s",
-                            databaseName, this.catalogName),
-                    e);
-        }
-    }
-
     /**
      * @see com.mysql.cj.MysqlType
      * @see ResultSetImpl#getObjectStoredProc(int, int)
      */
-    @SuppressWarnings("unchecked")
-    private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
+    @Override
+    public SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
             throws SQLException {
         MysqlType mysqlType = MysqlType.getByName(metadata.getColumnTypeName(colIndex));
         Map<String, Object> dataTypeProperties = new HashMap<>();
@@ -257,19 +57,4 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
         dataTypeProperties.put(MysqlDataTypeConvertor.SCALE, metadata.getScale(colIndex));
         return new MysqlDataTypeConvertor().toSeaTunnelType(mysqlType, dataTypeProperties);
     }
-
-    @SuppressWarnings("MagicNumber")
-    private Map<String, String> buildConnectorOptions(TablePath tablePath) {
-        Map<String, String> options = new HashMap<>(8);
-        options.put("connector", "jdbc");
-        options.put("url", baseUrl + tablePath.getDatabaseName());
-        options.put("table-name", tablePath.getFullName());
-        options.put("username", username);
-        options.put("password", pwd);
-        return options;
-    }
-
-    private String getUrlFromDatabaseName(String databaseName) {
-        return baseUrl + databaseName + suffix;
-    }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
index 9a015ca73..c2e300516 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
@@ -49,7 +49,7 @@ public class MysqlCreateTableSqlBuilder {
 
     private List<ConstraintKey> constraintKeys;
 
-    private MysqlDataTypeConvertor mysqlDataTypeConvertor;
+    private final MysqlDataTypeConvertor mysqlDataTypeConvertor;
 
     private MysqlCreateTableSqlBuilder(String tableName) {
         checkNotNull(tableName, "tableName must not be null");
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
index 25c02e6b1..f15a0e746 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
@@ -19,41 +19,21 @@
 package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver;
 
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.ConstraintKey;
-import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
-import org.apache.seatunnel.api.table.catalog.PrimaryKey;
-import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
-import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
-import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
 
 import org.apache.commons.lang3.tuple.Pair;
 
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
 
 public class SqlServerCatalog extends AbstractJdbcCatalog {
 
-    private static final Set<String> SYS_DATABASES = new HashSet<>(4);
-
     static {
         SYS_DATABASES.add("master");
         SYS_DATABASES.add("tempdb");
@@ -66,150 +46,6 @@ public class SqlServerCatalog extends AbstractJdbcCatalog {
         super(catalogName, username, pwd, urlInfo);
     }
 
-    @Override
-    public List<String> listDatabases() throws CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
-                PreparedStatement ps = conn.prepareStatement("SELECT NAME FROM SYS.DATABASES")) {
-
-            List<String> databases = new ArrayList<>();
-            ResultSet rs = ps.executeQuery();
-
-            while (rs.next()) {
-                String databaseName = rs.getString(1);
-                if (!SYS_DATABASES.contains(databaseName)) {
-                    databases.add(databaseName);
-                }
-            }
-
-            return databases;
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed listing database in catalog %s", this.catalogName), e);
-        }
-    }
-
-    @Override
-    public List<String> listTables(String databaseName)
-            throws CatalogException, DatabaseNotExistException {
-        if (!databaseExists(databaseName)) {
-            throw new DatabaseNotExistException(this.catalogName, databaseName);
-        }
-
-        String dbUrl = getUrlFromDatabaseName(databaseName);
-        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
-                PreparedStatement ps =
-                        conn.prepareStatement(
-                                "SELECT TABLE_SCHEMA, TABLE_NAME FROM "
-                                        + databaseName
-                                        + ".INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'")) {
-
-            ResultSet rs = ps.executeQuery();
-
-            List<String> tables = new ArrayList<>();
-
-            while (rs.next()) {
-                tables.add(rs.getString(1) + "." + rs.getString(2));
-            }
-
-            return tables;
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed listing database in catalog %s", catalogName), e);
-        }
-    }
-
-    @Override
-    public boolean tableExists(TablePath tablePath) throws CatalogException {
-        try {
-            return databaseExists(tablePath.getDatabaseName())
-                    && listTables(tablePath.getDatabaseName())
-                            .contains(tablePath.getSchemaAndTableName());
-        } catch (DatabaseNotExistException e) {
-            return false;
-        }
-    }
-
-    @Override
-    public CatalogTable getTable(TablePath tablePath)
-            throws CatalogException, TableNotExistException {
-        if (!tableExists(tablePath)) {
-            throw new TableNotExistException(catalogName, tablePath);
-        }
-
-        String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
-        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
-            DatabaseMetaData metaData = conn.getMetaData();
-            Optional<PrimaryKey> primaryKey =
-                    getPrimaryKey(
-                            metaData,
-                            tablePath.getDatabaseName(),
-                            tablePath.getSchemaName(),
-                            tablePath.getTableName());
-            List<ConstraintKey> constraintKeys =
-                    getConstraintKeys(
-                            metaData,
-                            tablePath.getDatabaseName(),
-                            tablePath.getSchemaName(),
-                            tablePath.getTableName());
-
-            try (PreparedStatement ps =
-                    conn.prepareStatement(
-                            String.format(
-                                    "SELECT * FROM %s WHERE 1 = 0;",
-                                    tablePath.getFullNameWithQuoted("\"")))) {
-                ResultSetMetaData tableMetaData = ps.getMetaData();
-                TableSchema.Builder builder = TableSchema.builder();
-                // add column
-                for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
-                    String columnName = tableMetaData.getColumnName(i);
-                    SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i);
-                    int columnDisplaySize = tableMetaData.getColumnDisplaySize(i);
-                    String comment = tableMetaData.getColumnLabel(i);
-                    boolean isNullable =
-                            tableMetaData.isNullable(i) == ResultSetMetaData.columnNullable;
-                    Object defaultValue =
-                            getColumnDefaultValue(
-                                            metaData,
-                                            tablePath.getDatabaseName(),
-                                            tablePath.getSchemaName(),
-                                            tablePath.getTableName(),
-                                            columnName)
-                                    .orElse(null);
-
-                    PhysicalColumn physicalColumn =
-                            PhysicalColumn.of(
-                                    columnName,
-                                    type,
-                                    columnDisplaySize,
-                                    isNullable,
-                                    defaultValue,
-                                    comment);
-                    builder.column(physicalColumn);
-                }
-                // add primary key
-                primaryKey.ifPresent(builder::primaryKey);
-                // add constraint key
-                constraintKeys.forEach(builder::constraintKey);
-                TableIdentifier tableIdentifier =
-                        TableIdentifier.of(
-                                catalogName,
-                                tablePath.getDatabaseName(),
-                                tablePath.getSchemaName(),
-                                tablePath.getTableName());
-                return CatalogTable.of(
-                        tableIdentifier,
-                        builder.build(),
-                        buildConnectorOptions(tablePath),
-                        Collections.emptyList(),
-                        "");
-            }
-
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed getting table %s", tablePath.getFullName()), e);
-        }
-    }
-
     @Override
     protected boolean createTableInternal(TablePath tablePath, CatalogTable table)
             throws CatalogException {
@@ -217,54 +53,7 @@ public class SqlServerCatalog extends AbstractJdbcCatalog {
     }
 
     @Override
-    protected boolean dropTableInternal(TablePath tablePath) throws CatalogException {
-        String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
-        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
-                PreparedStatement ps =
-                        conn.prepareStatement(
-                                String.format("DROP TABLE IF EXIST %s", tablePath.getFullName()))) {
-            // Will there exist concurrent drop for one table?
-            return ps.execute();
-        } catch (SQLException e) {
-            throw new CatalogException(
-                    String.format("Failed dropping table %s", tablePath.getFullName()), e);
-        }
-    }
-
-    @Override
-    protected boolean createDatabaseInternal(String databaseName) throws CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
-                PreparedStatement ps =
-                        conn.prepareStatement(
-                                String.format("CREATE DATABASE `%s`", databaseName))) {
-            return ps.execute();
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format(
-                            "Failed creating database %s in catalog %s",
-                            databaseName, this.catalogName),
-                    e);
-        }
-    }
-
-    @Override
-    protected boolean dropDatabaseInternal(String databaseName) throws CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
-                PreparedStatement ps =
-                        conn.prepareStatement(
-                                String.format("DROP DATABASE IF EXISTS `%s`;", databaseName))) {
-            return ps.execute();
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format(
-                            "Failed dropping database %s in catalog %s",
-                            databaseName, this.catalogName),
-                    e);
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
+    public SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
             throws SQLException {
         Pair<SqlServerType, Map<String, Object>> pair =
                 SqlServerType.parse(metadata.getColumnTypeName(colIndex));
@@ -274,19 +63,4 @@ public class SqlServerCatalog extends AbstractJdbcCatalog {
         dataTypeProperties.put(SqlServerDataTypeConvertor.SCALE, metadata.getScale(colIndex));
         return new SqlServerDataTypeConvertor().toSeaTunnelType(pair.getLeft(), dataTypeProperties);
     }
-
-    @SuppressWarnings("MagicNumber")
-    private Map<String, String> buildConnectorOptions(TablePath tablePath) {
-        Map<String, String> options = new HashMap<>(8);
-        options.put("connector", "jdbc");
-        options.put("url", getUrlFromDatabaseName(tablePath.getDatabaseName()));
-        options.put("table-name", tablePath.getFullName());
-        options.put("username", username);
-        options.put("password", pwd);
-        return options;
-    }
-
-    private String getUrlFromDatabaseName(String databaseName) {
-        return baseUrl + ";databaseName=" + databaseName + ";" + suffix;
-    }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java
index e848498c9..0b0fa91f2 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java
@@ -140,7 +140,7 @@ public enum SqlServerType implements SQLType {
     public static Pair<SqlServerType, Map<String, Object>> parse(String fullTypeName) {
         Map<String, Object> params = new HashMap<>();
         String typeName = fullTypeName;
-        if (fullTypeName.indexOf("(") != -1) {
+        if (fullTypeName.contains("(")) {
             typeName = fullTypeName.substring(0, fullTypeName.indexOf("(")).trim();
             String paramsStr =
                     fullTypeName.substring(
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index 64ed388df..36e47eda1 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
 
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
 
@@ -193,4 +195,40 @@ public interface JdbcDialect extends Serializable {
         PreparedStatement ps = conn.prepareStatement(jdbcSourceConfig.getQuery());
         return ps.getMetaData();
     }
+
+    default String listDatabases() {
+        return "SHOW DATABASES;";
+    }
+
+    default String getUrlFromDatabaseName(String baseUrl, String databaseName, String suffix) {
+        return baseUrl + databaseName + suffix;
+    }
+
+    default String createDatabaseSql(String databaseName) {
+        return String.format("CREATE DATABASE IF NOT EXISTS %s;", quoteIdentifier(databaseName));
+    }
+
+    default String dropDatabaseSql(String databaseName) {
+        return String.format("DROP DATABASE IF EXISTS %s;", quoteIdentifier(databaseName));
+    }
+
+    default String getTableName(ResultSet rs) throws SQLException {
+        return rs.getString(1);
+    }
+
+    default String getTableName(TablePath tablePath) {
+        return tablePath.getTableName();
+    }
+
+    default String listTableSql(String databaseName) {
+        return "SHOW TABLES;";
+    }
+
+    default String getDropTableSql(String tableName) {
+        return String.format("DROP TABLE %s IF EXIST;", tableName);
+    }
+
+    default String createTableSql(TablePath tablePath, CatalogTable catalogTable) {
+        return "";
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
index 128b8ae4b..63584d8ab 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql;
 
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MysqlCreateTableSqlBuilder;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
@@ -78,4 +81,9 @@ public class MysqlDialect implements JdbcDialect {
         statement.setFetchSize(Integer.MIN_VALUE);
         return statement;
     }
+
+    @Override
+    public String createTableSql(TablePath tablePath, CatalogTable catalogTable) {
+        return MysqlCreateTableSqlBuilder.builder(tablePath, catalogTable).build();
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
index 697d2d2dc..90b376624 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
@@ -17,10 +17,13 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver;
 
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
 
+import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
@@ -100,4 +103,31 @@ public class SqlServerDialect implements JdbcDialect {
 
         return Optional.of(upsertSQL);
     }
+
+    @Override
+    public String listDatabases() {
+        return "SELECT NAME FROM SYS.DATABASES";
+    }
+
+    @Override
+    public String listTableSql(String databaseName) {
+        return "SELECT TABLE_SCHEMA, TABLE_NAME FROM "
+                + databaseName
+                + ".INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'";
+    }
+
+    @Override
+    public String getTableName(ResultSet rs) throws SQLException {
+        return rs.getString(1) + "." + rs.getString(2);
+    }
+
+    @Override
+    public String getTableName(TablePath tablePath) {
+        return tablePath.getSchemaName() + "." + tablePath.getTableName();
+    }
+
+    @Override
+    public String getUrlFromDatabaseName(String baseUrl, String databaseName, String suffix) {
+        return baseUrl + ";databaseName=" + databaseName + ";" + suffix;
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-starrocks/pom.xml b/seatunnel-connectors-v2/connector-starrocks/pom.xml
index 08e49bc0f..44b899885 100644
--- a/seatunnel-connectors-v2/connector-starrocks/pom.xml
+++ b/seatunnel-connectors-v2/connector-starrocks/pom.xml
@@ -49,6 +49,11 @@
             <artifactId>connector-common</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-jdbc</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>mysql</groupId>
             <artifactId>mysql-connector-java</artifactId>
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
index 7bf308b1c..79d9bf901 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
@@ -17,13 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.starrocks.catalog;
 
-import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.catalog.PrimaryKey;
-import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
 import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
 import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
@@ -36,6 +32,7 @@ import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
 import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
 
 import org.apache.commons.lang3.StringUtils;
@@ -47,22 +44,18 @@ import com.mysql.cj.MysqlType;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-public class StarRocksCatalog implements Catalog {
+public class StarRocksCatalog extends AbstractJdbcCatalog {
 
     protected final String catalogName;
     protected String defaultDatabase = "information_schema";
@@ -71,8 +64,6 @@ public class StarRocksCatalog implements Catalog {
     protected final String baseUrl;
     protected String defaultUrl;
     private final JdbcUrlUtil.UrlInfo urlInfo;
-
-    private static final Set<String> SYS_DATABASES = new HashSet<>();
     private static final Logger LOG = LoggerFactory.getLogger(StarRocksCatalog.class);
 
     static {
@@ -82,6 +73,8 @@ public class StarRocksCatalog implements Catalog {
 
     public StarRocksCatalog(String catalogName, String username, String pwd, String defaultUrl) {
 
+        super(catalogName, username, pwd, JdbcUrlUtil.getUrlInfo(defaultUrl));
+
         checkArgument(StringUtils.isNotBlank(username));
         checkArgument(StringUtils.isNotBlank(pwd));
         checkArgument(StringUtils.isNotBlank(defaultUrl));
@@ -97,104 +90,9 @@ public class StarRocksCatalog implements Catalog {
     }
 
     @Override
-    public List<String> listDatabases() throws CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
-
-            PreparedStatement ps = conn.prepareStatement("SHOW DATABASES;");
-
-            List<String> databases = new ArrayList<>();
-            ResultSet rs = ps.executeQuery();
-
-            while (rs.next()) {
-                String databaseName = rs.getString(1);
-                if (!SYS_DATABASES.contains(databaseName)) {
-                    databases.add(rs.getString(1));
-                }
-            }
-
-            return databases;
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed listing database in catalog %s", this.catalogName), e);
-        }
-    }
-
-    @Override
-    public List<String> listTables(String databaseName)
-            throws CatalogException, DatabaseNotExistException {
-        if (!databaseExists(databaseName)) {
-            throw new DatabaseNotExistException(this.catalogName, databaseName);
-        }
-
-        try (Connection conn =
-                DriverManager.getConnection(
-                        urlInfo.getUrlWithDatabase(databaseName), username, pwd)) {
-            PreparedStatement ps = conn.prepareStatement("SHOW TABLES;");
-
-            ResultSet rs = ps.executeQuery();
-
-            List<String> tables = new ArrayList<>();
-
-            while (rs.next()) {
-                tables.add(rs.getString(1));
-            }
-
-            return tables;
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed listing database in catalog %s", catalogName), e);
-        }
-    }
-
-    @Override
-    public CatalogTable getTable(TablePath tablePath)
-            throws CatalogException, TableNotExistException {
-        if (!tableExists(tablePath)) {
-            throw new TableNotExistException(catalogName, tablePath);
-        }
-
-        String dbUrl = urlInfo.getUrlWithDatabase(tablePath.getDatabaseName());
-        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
-            Optional<PrimaryKey> primaryKey =
-                    getPrimaryKey(tablePath.getDatabaseName(), tablePath.getTableName());
-
-            PreparedStatement ps =
-                    conn.prepareStatement(
-                            String.format(
-                                    "SELECT * FROM %s WHERE 1 = 0;",
-                                    tablePath.getFullNameWithQuoted()));
-
-            ResultSetMetaData tableMetaData = ps.getMetaData();
-
-            TableSchema.Builder builder = TableSchema.builder();
-            for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
-                SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i);
-                // TODO add default value and test it
-                builder.column(
-                        PhysicalColumn.of(
-                                tableMetaData.getColumnName(i),
-                                type,
-                                tableMetaData.getColumnDisplaySize(i),
-                                tableMetaData.isNullable(i) == ResultSetMetaData.columnNullable,
-                                null,
-                                tableMetaData.getColumnLabel(i)));
-            }
-
-            primaryKey.ifPresent(builder::primaryKey);
-
-            TableIdentifier tableIdentifier =
-                    TableIdentifier.of(
-                            catalogName, tablePath.getDatabaseName(), tablePath.getTableName());
-            return CatalogTable.of(
-                    tableIdentifier,
-                    builder.build(),
-                    buildConnectorOptions(tablePath),
-                    Collections.emptyList(),
-                    "");
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed getting table %s", tablePath.getFullName()), e);
-        }
+    protected boolean createTableInternal(TablePath tablePath, CatalogTable table)
+            throws CatalogException {
+        throw new UnsupportedOperationException("Unsupported create table");
     }
 
     @Override
@@ -209,6 +107,11 @@ public class StarRocksCatalog implements Catalog {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    protected boolean dropTableInternal(TablePath tablePath) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
             throws DatabaseAlreadyExistException, CatalogException {
@@ -229,6 +132,11 @@ public class StarRocksCatalog implements Catalog {
         }
     }
 
+    @Override
+    protected boolean createDatabaseInternal(String databaseName) {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
             throws DatabaseNotExistException, CatalogException {
@@ -246,8 +154,14 @@ public class StarRocksCatalog implements Catalog {
         }
     }
 
+    @Override
+    protected boolean dropDatabaseInternal(String databaseName) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
     /** @see com.mysql.cj.MysqlType */
-    private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
+    @Override
+    public SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
             throws SQLException {
         MysqlType starrocksType = MysqlType.getByName(metadata.getColumnTypeName(colIndex));
         switch (starrocksType) {
@@ -315,7 +229,8 @@ public class StarRocksCatalog implements Catalog {
     }
 
     @SuppressWarnings("MagicNumber")
-    private Map<String, String> buildConnectorOptions(TablePath tablePath) {
+    @Override
+    public Map<String, String> buildConnectorOptions(TablePath tablePath) {
         Map<String, String> options = new HashMap<>(8);
         options.put("connector", "starrocks");
         options.put("url", baseUrl + tablePath.getDatabaseName());
@@ -369,29 +284,6 @@ public class StarRocksCatalog implements Catalog {
         return res;
     }
 
-    @Override
-    public String getDefaultDatabase() {
-        return defaultDatabase;
-    }
-
-    @Override
-    public void open() throws CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
-            // test connection, fail early if we cannot connect to database
-            conn.getCatalog();
-        } catch (SQLException e) {
-            throw new CatalogException(
-                    String.format("Failed connecting to %s via JDBC.", defaultUrl), e);
-        }
-
-        LOG.info("Catalog {} established connection to {}", catalogName, defaultUrl);
-    }
-
-    @Override
-    public void close() throws CatalogException {
-        LOG.info("Catalog {} closing", catalogName);
-    }
-
     protected Optional<PrimaryKey> getPrimaryKey(String schema, String table) throws SQLException {
 
         List<String> pkFields = new ArrayList<>();
@@ -414,21 +306,4 @@ public class StarRocksCatalog implements Catalog {
         }
         return Optional.empty();
     }
-
-    @Override
-    public boolean databaseExists(String databaseName) throws CatalogException {
-        checkArgument(StringUtils.isNotBlank(databaseName));
-
-        return listDatabases().contains(databaseName);
-    }
-
-    @Override
-    public boolean tableExists(TablePath tablePath) throws CatalogException {
-        try {
-            return databaseExists(tablePath.getDatabaseName())
-                    && listTables(tablePath.getDatabaseName()).contains(tablePath.getTableName());
-        } catch (DatabaseNotExistException e) {
-            return false;
-        }
-    }
 }
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDialect.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDialect.java
new file mode 100644
index 000000000..9286e628f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDialect.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.catalog;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlDialect;
+
+public class StarRocksDialect extends MysqlDialect {
+    @Override
+    public String dialectName() {
+        return "StarRocks";
+    }
+}