You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by li...@apache.org on 2023/04/20 07:07:33 UTC

[incubator-seatunnel] branch revert-4628-revert-4540-refactor-catalog created (now 063c6e4cd)

This is an automated email from the ASF dual-hosted git repository.

liugddx pushed a change to branch revert-4628-revert-4540-refactor-catalog
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


      at 063c6e4cd Revert "Revert "[Improve][Catalog] refactor catalog (#4540)" (#4628)"

This branch includes the following new commits:

     new 063c6e4cd Revert "Revert "[Improve][Catalog] refactor catalog (#4540)" (#4628)"

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-seatunnel] 01/01: Revert "Revert "[Improve][Catalog] refactor catalog (#4540)" (#4628)"

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

liugddx pushed a commit to branch revert-4628-revert-4540-refactor-catalog
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git

commit 063c6e4cdf57dbdee6f5d0ba7a1e823cbe814ccf
Author: Guangdong Liu <80...@qq.com>
AuthorDate: Thu Apr 20 15:07:29 2023 +0800

    Revert "Revert "[Improve][Catalog] refactor catalog (#4540)" (#4628)"
    
    This reverts commit 2d1933195d39271134bff04bd68e5723ba627745.
---
 .../jdbc/catalog/AbstractJdbcCatalog.java          | 234 ++++++++++++++++++++-
 .../seatunnel/jdbc/catalog/mysql/MySqlCatalog.java | 219 +------------------
 .../catalog/mysql/MysqlCreateTableSqlBuilder.java  |   2 +-
 .../jdbc/catalog/sqlserver/SqlServerCatalog.java   | 228 +-------------------
 .../jdbc/catalog/sqlserver/SqlServerType.java      |   2 +-
 .../jdbc/internal/dialect/JdbcDialect.java         |  38 ++++
 .../jdbc/internal/dialect/mysql/MysqlDialect.java  |   8 +
 .../dialect/sqlserver/SqlServerDialect.java        |  30 +++
 .../connector-starrocks/pom.xml                    |   5 +
 .../starrocks/catalog/StarRocksCatalog.java        | 177 +++-------------
 .../starrocks/catalog/StarRocksDialect.java        |  27 +++
 11 files changed, 365 insertions(+), 605 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index bd516e325..4fb68775f 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -21,14 +21,20 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
 import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
 import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
 import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
 import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -40,14 +46,19 @@ import org.slf4j.LoggerFactory;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.DriverManager;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -55,7 +66,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 public abstract class AbstractJdbcCatalog implements Catalog {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcCatalog.class);
-
     protected final String catalogName;
     protected final String defaultDatabase;
     protected final String username;
@@ -63,6 +73,8 @@ public abstract class AbstractJdbcCatalog implements Catalog {
     protected final String baseUrl;
     protected final String suffix;
     protected final String defaultUrl;
+    protected final JdbcDialect jdbcDialect;
+    protected static final Set<String> SYS_DATABASES = new HashSet<>();
 
     public AbstractJdbcCatalog(
             String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo) {
@@ -80,6 +92,7 @@ public abstract class AbstractJdbcCatalog implements Catalog {
         this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
         this.defaultUrl = urlInfo.getOrigin();
         this.suffix = urlInfo.getSuffix();
+        this.jdbcDialect = JdbcDialectLoader.load(this.baseUrl);
     }
 
     @Override
@@ -107,6 +120,7 @@ public abstract class AbstractJdbcCatalog implements Catalog {
     public void open() throws CatalogException {
         try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
             // test connection, fail early if we cannot connect to database
+            conn.getCatalog();
         } catch (SQLException e) {
             throw new CatalogException(
                     String.format("Failed connecting to %s via JDBC.", defaultUrl), e);
@@ -120,6 +134,56 @@ public abstract class AbstractJdbcCatalog implements Catalog {
         LOG.info("Catalog {} closing", catalogName);
     }
 
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+
+            PreparedStatement ps = conn.prepareStatement(jdbcDialect.listDatabases());
+
+            List<String> databases = new ArrayList<>();
+            ResultSet rs = ps.executeQuery();
+
+            while (rs.next()) {
+                String databaseName = rs.getString(1);
+                if (!getSysDatabases().contains(databaseName)) {
+                    databases.add(rs.getString(1));
+                }
+            }
+
+            return databases;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed listing database in catalog %s", this.catalogName), e);
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(this.catalogName, databaseName);
+        }
+
+        String dbUrl = jdbcDialect.getUrlFromDatabaseName(baseUrl, databaseName, suffix);
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement(jdbcDialect.listTableSql(databaseName))) {
+
+            ResultSet rs = ps.executeQuery();
+
+            List<String> tables = new ArrayList<>();
+
+            while (rs.next()) {
+                tables.add(jdbcDialect.getTableName(rs));
+            }
+
+            return tables;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed listing database in catalog %s", catalogName), e);
+        }
+    }
+
     protected Optional<PrimaryKey> getPrimaryKey(
             DatabaseMetaData metaData, String database, String table) throws SQLException {
         return getPrimaryKey(metaData, database, table, table);
@@ -226,7 +290,8 @@ public abstract class AbstractJdbcCatalog implements Catalog {
     public boolean tableExists(TablePath tablePath) throws CatalogException {
         try {
             return databaseExists(tablePath.getDatabaseName())
-                    && listTables(tablePath.getDatabaseName()).contains(tablePath.getTableName());
+                    && listTables(tablePath.getDatabaseName())
+                            .contains(jdbcDialect.getTableName(tablePath));
         } catch (DatabaseNotExistException e) {
             return false;
         }
@@ -245,8 +310,86 @@ public abstract class AbstractJdbcCatalog implements Catalog {
         }
     }
 
-    protected abstract boolean createTableInternal(TablePath tablePath, CatalogTable table)
-            throws CatalogException;
+    public CatalogTable getTable(TablePath tablePath)
+            throws CatalogException, TableNotExistException {
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(catalogName, tablePath);
+        }
+
+        String dbUrl =
+                jdbcDialect.getUrlFromDatabaseName(baseUrl, tablePath.getDatabaseName(), suffix);
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
+            DatabaseMetaData metaData = conn.getMetaData();
+            Optional<PrimaryKey> primaryKey =
+                    getPrimaryKey(
+                            metaData,
+                            tablePath.getDatabaseName(),
+                            tablePath.getSchemaName(),
+                            tablePath.getTableName());
+            List<ConstraintKey> constraintKeys =
+                    getConstraintKeys(
+                            metaData,
+                            tablePath.getDatabaseName(),
+                            tablePath.getSchemaName(),
+                            tablePath.getTableName());
+
+            try (PreparedStatement ps =
+                    conn.prepareStatement(
+                            String.format(
+                                    "SELECT * FROM %s WHERE 1 = 0;",
+                                    tablePath.getFullNameWithQuoted("\"")))) {
+                ResultSetMetaData tableMetaData = ps.getMetaData();
+                TableSchema.Builder builder = TableSchema.builder();
+                // add column
+                for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
+                    String columnName = tableMetaData.getColumnName(i);
+                    SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i);
+                    int columnDisplaySize = tableMetaData.getColumnDisplaySize(i);
+                    String comment = tableMetaData.getColumnLabel(i);
+                    boolean isNullable =
+                            tableMetaData.isNullable(i) == ResultSetMetaData.columnNullable;
+                    Object defaultValue =
+                            getColumnDefaultValue(
+                                            metaData,
+                                            tablePath.getDatabaseName(),
+                                            tablePath.getSchemaName(),
+                                            tablePath.getTableName(),
+                                            columnName)
+                                    .orElse(null);
+
+                    PhysicalColumn physicalColumn =
+                            PhysicalColumn.of(
+                                    columnName,
+                                    type,
+                                    columnDisplaySize,
+                                    isNullable,
+                                    defaultValue,
+                                    comment);
+                    builder.column(physicalColumn);
+                }
+                // add primary key
+                primaryKey.ifPresent(builder::primaryKey);
+                // add constraint key
+                constraintKeys.forEach(builder::constraintKey);
+                TableIdentifier tableIdentifier =
+                        TableIdentifier.of(
+                                catalogName,
+                                tablePath.getDatabaseName(),
+                                tablePath.getSchemaName(),
+                                tablePath.getTableName());
+                return CatalogTable.of(
+                        tableIdentifier,
+                        builder.build(),
+                        buildConnectorOptions(tablePath),
+                        Collections.emptyList(),
+                        "");
+            }
+
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed getting table %s", tablePath.getFullName()), e);
+        }
+    }
 
     @Override
     public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
@@ -257,7 +400,20 @@ public abstract class AbstractJdbcCatalog implements Catalog {
         }
     }
 
-    protected abstract boolean dropTableInternal(TablePath tablePath) throws CatalogException;
+    protected boolean dropTableInternal(TablePath tablePath) throws CatalogException {
+        String dbUrl =
+                jdbcDialect.getUrlFromDatabaseName(baseUrl, tablePath.getDatabaseName(), suffix);
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement(
+                                jdbcDialect.getDropTableSql(tablePath.getFullName()))) {
+            // Will there exist concurrent drop for one table?
+            return ps.execute();
+        } catch (SQLException e) {
+            throw new CatalogException(
+                    String.format("Failed dropping table %s", tablePath.getFullName()), e);
+        }
+    }
 
     @Override
     public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
@@ -273,8 +429,6 @@ public abstract class AbstractJdbcCatalog implements Catalog {
         }
     }
 
-    protected abstract boolean createDatabaseInternal(String databaseName);
-
     @Override
     public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
             throws DatabaseNotExistException, CatalogException {
@@ -286,5 +440,69 @@ public abstract class AbstractJdbcCatalog implements Catalog {
         }
     }
 
-    protected abstract boolean dropDatabaseInternal(String databaseName) throws CatalogException;
+    protected SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
+            throws SQLException {
+        return null;
+    }
+
+    protected Set<String> getSysDatabases() {
+        return SYS_DATABASES;
+    }
+
+    protected Map<String, String> buildConnectorOptions(TablePath tablePath) {
+        Map<String, String> options = new HashMap<>(8);
+        options.put("connector", "jdbc");
+        options.put(
+                "url",
+                jdbcDialect.getUrlFromDatabaseName(baseUrl, tablePath.getDatabaseName(), suffix));
+        options.put("table-name", tablePath.getFullName());
+        options.put("username", username);
+        options.put("password", pwd);
+        return options;
+    }
+
+    protected boolean createDatabaseInternal(String databaseName) {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement(
+                                String.format(jdbcDialect.createDatabaseSql(databaseName)))) {
+            return ps.execute();
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "Failed creating database %s in catalog %s",
+                            databaseName, this.catalogName),
+                    e);
+        }
+    }
+
+    protected boolean dropDatabaseInternal(String databaseName) throws CatalogException {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement(jdbcDialect.dropDatabaseSql(databaseName))) {
+            return ps.execute();
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "Failed dropping database %s in catalog %s",
+                            databaseName, this.catalogName),
+                    e);
+        }
+    }
+
+    // todo: If the origin source is mysql, we can directly use create table like to create the
+    // target table?
+    protected boolean createTableInternal(TablePath tablePath, CatalogTable table)
+            throws CatalogException {
+        String dbUrl =
+                jdbcDialect.getUrlFromDatabaseName(baseUrl, tablePath.getDatabaseName(), suffix);
+        String createTableSql = jdbcDialect.createTableSql(tablePath, table);
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
+                PreparedStatement ps = conn.prepareStatement(createTableSql)) {
+            return ps.execute();
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed creating table %s", tablePath.getFullName()), e);
+        }
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
index d8534d5df..f6c9cc679 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
@@ -18,16 +18,6 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql;
 
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.ConstraintKey;
-import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
-import org.apache.seatunnel.api.table.catalog.PrimaryKey;
-import org.apache.seatunnel.api.table.catalog.TableIdentifier;
-import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
-import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
-import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
-import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
@@ -35,26 +25,13 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalo
 import com.mysql.cj.MysqlType;
 import com.mysql.cj.jdbc.result.ResultSetImpl;
 
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
 
 public class MySqlCatalog extends AbstractJdbcCatalog {
 
-    protected static final Set<String> SYS_DATABASES = new HashSet<>(4);
-
     static {
         SYS_DATABASES.add("information_schema");
         SYS_DATABASES.add("mysql");
@@ -67,189 +44,12 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
         super(catalogName, username, pwd, urlInfo);
     }
 
-    @Override
-    public List<String> listDatabases() throws CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
-                PreparedStatement ps = conn.prepareStatement("SHOW DATABASES;")) {
-
-            List<String> databases = new ArrayList<>();
-            ResultSet rs = ps.executeQuery();
-
-            while (rs.next()) {
-                String databaseName = rs.getString(1);
-                if (!SYS_DATABASES.contains(databaseName)) {
-                    databases.add(rs.getString(1));
-                }
-            }
-
-            return databases;
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed listing database in catalog %s", this.catalogName), e);
-        }
-    }
-
-    @Override
-    public List<String> listTables(String databaseName)
-            throws CatalogException, DatabaseNotExistException {
-        if (!databaseExists(databaseName)) {
-            throw new DatabaseNotExistException(this.catalogName, databaseName);
-        }
-
-        String dbUrl = getUrlFromDatabaseName(databaseName);
-        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
-                PreparedStatement ps = conn.prepareStatement("SHOW TABLES;")) {
-
-            ResultSet rs = ps.executeQuery();
-
-            List<String> tables = new ArrayList<>();
-
-            while (rs.next()) {
-                tables.add(rs.getString(1));
-            }
-
-            return tables;
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed listing database in catalog %s", catalogName), e);
-        }
-    }
-
-    @Override
-    public CatalogTable getTable(TablePath tablePath)
-            throws CatalogException, TableNotExistException {
-        if (!tableExists(tablePath)) {
-            throw new TableNotExistException(catalogName, tablePath);
-        }
-
-        String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
-        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
-            DatabaseMetaData metaData = conn.getMetaData();
-            Optional<PrimaryKey> primaryKey =
-                    getPrimaryKey(metaData, tablePath.getDatabaseName(), tablePath.getTableName());
-            List<ConstraintKey> constraintKeys =
-                    getConstraintKeys(
-                            metaData, tablePath.getDatabaseName(), tablePath.getTableName());
-
-            try (PreparedStatement ps =
-                    conn.prepareStatement(
-                            String.format(
-                                    "SELECT * FROM %s WHERE 1 = 0;",
-                                    tablePath.getFullNameWithQuoted()))) {
-                ResultSetMetaData tableMetaData = ps.getMetaData();
-                TableSchema.Builder builder = TableSchema.builder();
-                // add column
-                for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
-                    String columnName = tableMetaData.getColumnName(i);
-                    SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i);
-                    int columnDisplaySize = tableMetaData.getColumnDisplaySize(i);
-                    String comment = tableMetaData.getColumnLabel(i);
-                    boolean isNullable =
-                            tableMetaData.isNullable(i) == ResultSetMetaData.columnNullable;
-                    Object defaultValue =
-                            getColumnDefaultValue(metaData, tablePath.getTableName(), columnName)
-                                    .orElse(null);
-
-                    PhysicalColumn physicalColumn =
-                            PhysicalColumn.of(
-                                    columnName,
-                                    type,
-                                    columnDisplaySize,
-                                    isNullable,
-                                    defaultValue,
-                                    comment);
-                    builder.column(physicalColumn);
-                }
-                // add primary key
-                primaryKey.ifPresent(builder::primaryKey);
-                // add constraint key
-                constraintKeys.forEach(builder::constraintKey);
-                TableIdentifier tableIdentifier =
-                        TableIdentifier.of(
-                                catalogName, tablePath.getDatabaseName(), tablePath.getTableName());
-                return CatalogTable.of(
-                        tableIdentifier,
-                        builder.build(),
-                        buildConnectorOptions(tablePath),
-                        Collections.emptyList(),
-                        "");
-            }
-
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed getting table %s", tablePath.getFullName()), e);
-        }
-    }
-
-    // todo: If the origin source is mysql, we can directly use create table like to create the
-    // target table?
-    @Override
-    protected boolean createTableInternal(TablePath tablePath, CatalogTable table)
-            throws CatalogException {
-        String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
-        String createTableSql = MysqlCreateTableSqlBuilder.builder(tablePath, table).build();
-        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
-                PreparedStatement ps = conn.prepareStatement(createTableSql)) {
-            return ps.execute();
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed creating table %s", tablePath.getFullName()), e);
-        }
-    }
-
-    @Override
-    protected boolean dropTableInternal(TablePath tablePath) throws CatalogException {
-        String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
-        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
-                PreparedStatement ps =
-                        conn.prepareStatement(
-                                String.format(
-                                        "DROP TABLE %s IF EXIST;", tablePath.getFullName()))) {
-            // Will there exist concurrent drop for one table?
-            return ps.execute();
-        } catch (SQLException e) {
-            throw new CatalogException(
-                    String.format("Failed dropping table %s", tablePath.getFullName()), e);
-        }
-    }
-
-    @Override
-    protected boolean createDatabaseInternal(String databaseName) throws CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
-                PreparedStatement ps =
-                        conn.prepareStatement(
-                                String.format("CREATE DATABASE `%s`;", databaseName))) {
-            return ps.execute();
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format(
-                            "Failed creating database %s in catalog %s",
-                            databaseName, this.catalogName),
-                    e);
-        }
-    }
-
-    @Override
-    protected boolean dropDatabaseInternal(String databaseName) throws CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
-                PreparedStatement ps =
-                        conn.prepareStatement(String.format("DROP DATABASE `%s`;", databaseName))) {
-            return ps.execute();
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format(
-                            "Failed dropping database %s in catalog %s",
-                            databaseName, this.catalogName),
-                    e);
-        }
-    }
-
     /**
      * @see com.mysql.cj.MysqlType
      * @see ResultSetImpl#getObjectStoredProc(int, int)
      */
-    @SuppressWarnings("unchecked")
-    private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
+    @Override
+    public SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
             throws SQLException {
         MysqlType mysqlType = MysqlType.getByName(metadata.getColumnTypeName(colIndex));
         Map<String, Object> dataTypeProperties = new HashMap<>();
@@ -257,19 +57,4 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
         dataTypeProperties.put(MysqlDataTypeConvertor.SCALE, metadata.getScale(colIndex));
         return new MysqlDataTypeConvertor().toSeaTunnelType(mysqlType, dataTypeProperties);
     }
-
-    @SuppressWarnings("MagicNumber")
-    private Map<String, String> buildConnectorOptions(TablePath tablePath) {
-        Map<String, String> options = new HashMap<>(8);
-        options.put("connector", "jdbc");
-        options.put("url", baseUrl + tablePath.getDatabaseName());
-        options.put("table-name", tablePath.getFullName());
-        options.put("username", username);
-        options.put("password", pwd);
-        return options;
-    }
-
-    private String getUrlFromDatabaseName(String databaseName) {
-        return baseUrl + databaseName + suffix;
-    }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
index 9a015ca73..c2e300516 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
@@ -49,7 +49,7 @@ public class MysqlCreateTableSqlBuilder {
 
     private List<ConstraintKey> constraintKeys;
 
-    private MysqlDataTypeConvertor mysqlDataTypeConvertor;
+    private final MysqlDataTypeConvertor mysqlDataTypeConvertor;
 
     private MysqlCreateTableSqlBuilder(String tableName) {
         checkNotNull(tableName, "tableName must not be null");
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
index 25c02e6b1..f15a0e746 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
@@ -19,41 +19,21 @@
 package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver;
 
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.ConstraintKey;
-import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
-import org.apache.seatunnel.api.table.catalog.PrimaryKey;
-import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
-import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
-import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
 
 import org.apache.commons.lang3.tuple.Pair;
 
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
 
 public class SqlServerCatalog extends AbstractJdbcCatalog {
 
-    private static final Set<String> SYS_DATABASES = new HashSet<>(4);
-
     static {
         SYS_DATABASES.add("master");
         SYS_DATABASES.add("tempdb");
@@ -66,150 +46,6 @@ public class SqlServerCatalog extends AbstractJdbcCatalog {
         super(catalogName, username, pwd, urlInfo);
     }
 
-    @Override
-    public List<String> listDatabases() throws CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
-                PreparedStatement ps = conn.prepareStatement("SELECT NAME FROM SYS.DATABASES")) {
-
-            List<String> databases = new ArrayList<>();
-            ResultSet rs = ps.executeQuery();
-
-            while (rs.next()) {
-                String databaseName = rs.getString(1);
-                if (!SYS_DATABASES.contains(databaseName)) {
-                    databases.add(databaseName);
-                }
-            }
-
-            return databases;
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed listing database in catalog %s", this.catalogName), e);
-        }
-    }
-
-    @Override
-    public List<String> listTables(String databaseName)
-            throws CatalogException, DatabaseNotExistException {
-        if (!databaseExists(databaseName)) {
-            throw new DatabaseNotExistException(this.catalogName, databaseName);
-        }
-
-        String dbUrl = getUrlFromDatabaseName(databaseName);
-        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
-                PreparedStatement ps =
-                        conn.prepareStatement(
-                                "SELECT TABLE_SCHEMA, TABLE_NAME FROM "
-                                        + databaseName
-                                        + ".INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'")) {
-
-            ResultSet rs = ps.executeQuery();
-
-            List<String> tables = new ArrayList<>();
-
-            while (rs.next()) {
-                tables.add(rs.getString(1) + "." + rs.getString(2));
-            }
-
-            return tables;
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed listing database in catalog %s", catalogName), e);
-        }
-    }
-
-    @Override
-    public boolean tableExists(TablePath tablePath) throws CatalogException {
-        try {
-            return databaseExists(tablePath.getDatabaseName())
-                    && listTables(tablePath.getDatabaseName())
-                            .contains(tablePath.getSchemaAndTableName());
-        } catch (DatabaseNotExistException e) {
-            return false;
-        }
-    }
-
-    @Override
-    public CatalogTable getTable(TablePath tablePath)
-            throws CatalogException, TableNotExistException {
-        if (!tableExists(tablePath)) {
-            throw new TableNotExistException(catalogName, tablePath);
-        }
-
-        String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
-        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
-            DatabaseMetaData metaData = conn.getMetaData();
-            Optional<PrimaryKey> primaryKey =
-                    getPrimaryKey(
-                            metaData,
-                            tablePath.getDatabaseName(),
-                            tablePath.getSchemaName(),
-                            tablePath.getTableName());
-            List<ConstraintKey> constraintKeys =
-                    getConstraintKeys(
-                            metaData,
-                            tablePath.getDatabaseName(),
-                            tablePath.getSchemaName(),
-                            tablePath.getTableName());
-
-            try (PreparedStatement ps =
-                    conn.prepareStatement(
-                            String.format(
-                                    "SELECT * FROM %s WHERE 1 = 0;",
-                                    tablePath.getFullNameWithQuoted("\"")))) {
-                ResultSetMetaData tableMetaData = ps.getMetaData();
-                TableSchema.Builder builder = TableSchema.builder();
-                // add column
-                for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
-                    String columnName = tableMetaData.getColumnName(i);
-                    SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i);
-                    int columnDisplaySize = tableMetaData.getColumnDisplaySize(i);
-                    String comment = tableMetaData.getColumnLabel(i);
-                    boolean isNullable =
-                            tableMetaData.isNullable(i) == ResultSetMetaData.columnNullable;
-                    Object defaultValue =
-                            getColumnDefaultValue(
-                                            metaData,
-                                            tablePath.getDatabaseName(),
-                                            tablePath.getSchemaName(),
-                                            tablePath.getTableName(),
-                                            columnName)
-                                    .orElse(null);
-
-                    PhysicalColumn physicalColumn =
-                            PhysicalColumn.of(
-                                    columnName,
-                                    type,
-                                    columnDisplaySize,
-                                    isNullable,
-                                    defaultValue,
-                                    comment);
-                    builder.column(physicalColumn);
-                }
-                // add primary key
-                primaryKey.ifPresent(builder::primaryKey);
-                // add constraint key
-                constraintKeys.forEach(builder::constraintKey);
-                TableIdentifier tableIdentifier =
-                        TableIdentifier.of(
-                                catalogName,
-                                tablePath.getDatabaseName(),
-                                tablePath.getSchemaName(),
-                                tablePath.getTableName());
-                return CatalogTable.of(
-                        tableIdentifier,
-                        builder.build(),
-                        buildConnectorOptions(tablePath),
-                        Collections.emptyList(),
-                        "");
-            }
-
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed getting table %s", tablePath.getFullName()), e);
-        }
-    }
-
     @Override
     protected boolean createTableInternal(TablePath tablePath, CatalogTable table)
             throws CatalogException {
@@ -217,54 +53,7 @@ public class SqlServerCatalog extends AbstractJdbcCatalog {
     }
 
     @Override
-    protected boolean dropTableInternal(TablePath tablePath) throws CatalogException {
-        String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
-        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
-                PreparedStatement ps =
-                        conn.prepareStatement(
-                                String.format("DROP TABLE IF EXIST %s", tablePath.getFullName()))) {
-            // Will there exist concurrent drop for one table?
-            return ps.execute();
-        } catch (SQLException e) {
-            throw new CatalogException(
-                    String.format("Failed dropping table %s", tablePath.getFullName()), e);
-        }
-    }
-
-    @Override
-    protected boolean createDatabaseInternal(String databaseName) throws CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
-                PreparedStatement ps =
-                        conn.prepareStatement(
-                                String.format("CREATE DATABASE `%s`", databaseName))) {
-            return ps.execute();
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format(
-                            "Failed creating database %s in catalog %s",
-                            databaseName, this.catalogName),
-                    e);
-        }
-    }
-
-    @Override
-    protected boolean dropDatabaseInternal(String databaseName) throws CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
-                PreparedStatement ps =
-                        conn.prepareStatement(
-                                String.format("DROP DATABASE IF EXISTS `%s`;", databaseName))) {
-            return ps.execute();
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format(
-                            "Failed dropping database %s in catalog %s",
-                            databaseName, this.catalogName),
-                    e);
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
+    public SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
             throws SQLException {
         Pair<SqlServerType, Map<String, Object>> pair =
                 SqlServerType.parse(metadata.getColumnTypeName(colIndex));
@@ -274,19 +63,4 @@ public class SqlServerCatalog extends AbstractJdbcCatalog {
         dataTypeProperties.put(SqlServerDataTypeConvertor.SCALE, metadata.getScale(colIndex));
         return new SqlServerDataTypeConvertor().toSeaTunnelType(pair.getLeft(), dataTypeProperties);
     }
-
-    @SuppressWarnings("MagicNumber")
-    private Map<String, String> buildConnectorOptions(TablePath tablePath) {
-        Map<String, String> options = new HashMap<>(8);
-        options.put("connector", "jdbc");
-        options.put("url", getUrlFromDatabaseName(tablePath.getDatabaseName()));
-        options.put("table-name", tablePath.getFullName());
-        options.put("username", username);
-        options.put("password", pwd);
-        return options;
-    }
-
-    private String getUrlFromDatabaseName(String databaseName) {
-        return baseUrl + ";databaseName=" + databaseName + ";" + suffix;
-    }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java
index e848498c9..0b0fa91f2 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java
@@ -140,7 +140,7 @@ public enum SqlServerType implements SQLType {
     public static Pair<SqlServerType, Map<String, Object>> parse(String fullTypeName) {
         Map<String, Object> params = new HashMap<>();
         String typeName = fullTypeName;
-        if (fullTypeName.indexOf("(") != -1) {
+        if (fullTypeName.contains("(")) {
             typeName = fullTypeName.substring(0, fullTypeName.indexOf("(")).trim();
             String paramsStr =
                     fullTypeName.substring(
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index 64ed388df..36e47eda1 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
 
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
 
@@ -193,4 +195,49 @@ public interface JdbcDialect extends Serializable {
         PreparedStatement ps = conn.prepareStatement(jdbcSourceConfig.getQuery());
         return ps.getMetaData();
     }
+
+    /** SQL that lists all databases; defaults to {@code SHOW DATABASES;}. */
+    default String listDatabases() {
+        return "SHOW DATABASES;";
+    }
+
+    /** Builds the JDBC URL for a database as {@code baseUrl + databaseName + suffix}. */
+    default String getUrlFromDatabaseName(String baseUrl, String databaseName, String suffix) {
+        return baseUrl + databaseName + suffix;
+    }
+
+    /** SQL that creates the database, with its name quoted, if it does not already exist. */
+    default String createDatabaseSql(String databaseName) {
+        return String.format("CREATE DATABASE IF NOT EXISTS %s;", quoteIdentifier(databaseName));
+    }
+
+    /** SQL that drops the database, with its name quoted, if it exists. */
+    default String dropDatabaseSql(String databaseName) {
+        return String.format("DROP DATABASE IF EXISTS %s;", quoteIdentifier(databaseName));
+    }
+
+    /** Reads a table name from the first column of the current row of {@code rs}. */
+    default String getTableName(ResultSet rs) throws SQLException {
+        return rs.getString(1);
+    }
+
+    /** Returns the table-name part of {@code tablePath}. */
+    default String getTableName(TablePath tablePath) {
+        return tablePath.getTableName();
+    }
+
+    /** SQL that lists tables; the default {@code SHOW TABLES;} ignores {@code databaseName}. */
+    default String listTableSql(String databaseName) {
+        return "SHOW TABLES;";
+    }
+
+    /** SQL that drops {@code tableName} if it exists; the name is inserted as given. */
+    default String getDropTableSql(String tableName) {
+        return String.format("DROP TABLE IF EXISTS %s;", tableName);
+    }
+
+    /** SQL that creates the table; the default is an empty string, dialects may override it. */
+    default String createTableSql(TablePath tablePath, CatalogTable catalogTable) {
+        return "";
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
index 128b8ae4b..63584d8ab 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql;
 
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MysqlCreateTableSqlBuilder;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
@@ -78,4 +81,9 @@ public class MysqlDialect implements JdbcDialect {
         statement.setFetchSize(Integer.MIN_VALUE);
         return statement;
     }
+
+    @Override
+    public String createTableSql(TablePath tablePath, CatalogTable catalogTable) {
+        return MysqlCreateTableSqlBuilder.builder(tablePath, catalogTable).build();
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
index 697d2d2dc..90b376624 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
@@ -17,10 +17,13 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver;
 
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
 
+import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
@@ -100,4 +103,31 @@ public class SqlServerDialect implements JdbcDialect {
 
         return Optional.of(upsertSQL);
     }
+
+    @Override
+    public String listDatabases() {
+        return "SELECT NAME FROM SYS.DATABASES";
+    }
+
+    @Override
+    public String listTableSql(String databaseName) {
+        return "SELECT TABLE_SCHEMA, TABLE_NAME FROM "
+                + databaseName
+                + ".INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'";
+    }
+
+    @Override
+    public String getTableName(ResultSet rs) throws SQLException {
+        return rs.getString(1) + "." + rs.getString(2);
+    }
+
+    @Override
+    public String getTableName(TablePath tablePath) {
+        return tablePath.getSchemaName() + "." + tablePath.getTableName();
+    }
+
+    @Override
+    public String getUrlFromDatabaseName(String baseUrl, String databaseName, String suffix) {
+        return baseUrl + ";databaseName=" + databaseName + ";" + suffix;
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-starrocks/pom.xml b/seatunnel-connectors-v2/connector-starrocks/pom.xml
index 08e49bc0f..44b899885 100644
--- a/seatunnel-connectors-v2/connector-starrocks/pom.xml
+++ b/seatunnel-connectors-v2/connector-starrocks/pom.xml
@@ -49,6 +49,11 @@
             <artifactId>connector-common</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-jdbc</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>mysql</groupId>
             <artifactId>mysql-connector-java</artifactId>
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
index 7bf308b1c..79d9bf901 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
@@ -17,13 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.starrocks.catalog;
 
-import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.catalog.PrimaryKey;
-import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
 import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
 import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
@@ -36,6 +32,7 @@ import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
 import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
 
 import org.apache.commons.lang3.StringUtils;
@@ -47,22 +44,18 @@ import com.mysql.cj.MysqlType;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-public class StarRocksCatalog implements Catalog {
+public class StarRocksCatalog extends AbstractJdbcCatalog {
 
     protected final String catalogName;
     protected String defaultDatabase = "information_schema";
@@ -71,8 +64,6 @@ public class StarRocksCatalog implements Catalog {
     protected final String baseUrl;
     protected String defaultUrl;
     private final JdbcUrlUtil.UrlInfo urlInfo;
-
-    private static final Set<String> SYS_DATABASES = new HashSet<>();
     private static final Logger LOG = LoggerFactory.getLogger(StarRocksCatalog.class);
 
     static {
@@ -82,6 +73,8 @@ public class StarRocksCatalog implements Catalog {
 
     public StarRocksCatalog(String catalogName, String username, String pwd, String defaultUrl) {
 
+        super(catalogName, username, pwd, JdbcUrlUtil.getUrlInfo(defaultUrl));
+
         checkArgument(StringUtils.isNotBlank(username));
         checkArgument(StringUtils.isNotBlank(pwd));
         checkArgument(StringUtils.isNotBlank(defaultUrl));
@@ -97,104 +90,9 @@ public class StarRocksCatalog implements Catalog {
     }
 
     @Override
-    public List<String> listDatabases() throws CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
-
-            PreparedStatement ps = conn.prepareStatement("SHOW DATABASES;");
-
-            List<String> databases = new ArrayList<>();
-            ResultSet rs = ps.executeQuery();
-
-            while (rs.next()) {
-                String databaseName = rs.getString(1);
-                if (!SYS_DATABASES.contains(databaseName)) {
-                    databases.add(rs.getString(1));
-                }
-            }
-
-            return databases;
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed listing database in catalog %s", this.catalogName), e);
-        }
-    }
-
-    @Override
-    public List<String> listTables(String databaseName)
-            throws CatalogException, DatabaseNotExistException {
-        if (!databaseExists(databaseName)) {
-            throw new DatabaseNotExistException(this.catalogName, databaseName);
-        }
-
-        try (Connection conn =
-                DriverManager.getConnection(
-                        urlInfo.getUrlWithDatabase(databaseName), username, pwd)) {
-            PreparedStatement ps = conn.prepareStatement("SHOW TABLES;");
-
-            ResultSet rs = ps.executeQuery();
-
-            List<String> tables = new ArrayList<>();
-
-            while (rs.next()) {
-                tables.add(rs.getString(1));
-            }
-
-            return tables;
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed listing database in catalog %s", catalogName), e);
-        }
-    }
-
-    @Override
-    public CatalogTable getTable(TablePath tablePath)
-            throws CatalogException, TableNotExistException {
-        if (!tableExists(tablePath)) {
-            throw new TableNotExistException(catalogName, tablePath);
-        }
-
-        String dbUrl = urlInfo.getUrlWithDatabase(tablePath.getDatabaseName());
-        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
-            Optional<PrimaryKey> primaryKey =
-                    getPrimaryKey(tablePath.getDatabaseName(), tablePath.getTableName());
-
-            PreparedStatement ps =
-                    conn.prepareStatement(
-                            String.format(
-                                    "SELECT * FROM %s WHERE 1 = 0;",
-                                    tablePath.getFullNameWithQuoted()));
-
-            ResultSetMetaData tableMetaData = ps.getMetaData();
-
-            TableSchema.Builder builder = TableSchema.builder();
-            for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
-                SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i);
-                // TODO add default value and test it
-                builder.column(
-                        PhysicalColumn.of(
-                                tableMetaData.getColumnName(i),
-                                type,
-                                tableMetaData.getColumnDisplaySize(i),
-                                tableMetaData.isNullable(i) == ResultSetMetaData.columnNullable,
-                                null,
-                                tableMetaData.getColumnLabel(i)));
-            }
-
-            primaryKey.ifPresent(builder::primaryKey);
-
-            TableIdentifier tableIdentifier =
-                    TableIdentifier.of(
-                            catalogName, tablePath.getDatabaseName(), tablePath.getTableName());
-            return CatalogTable.of(
-                    tableIdentifier,
-                    builder.build(),
-                    buildConnectorOptions(tablePath),
-                    Collections.emptyList(),
-                    "");
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed getting table %s", tablePath.getFullName()), e);
-        }
+    protected boolean createTableInternal(TablePath tablePath, CatalogTable table)
+            throws CatalogException {
+        throw new UnsupportedOperationException("Unsupported create table");
     }
 
     @Override
@@ -209,6 +107,11 @@ public class StarRocksCatalog implements Catalog {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    protected boolean dropTableInternal(TablePath tablePath) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
             throws DatabaseAlreadyExistException, CatalogException {
@@ -229,6 +132,11 @@ public class StarRocksCatalog implements Catalog {
         }
     }
 
+    @Override
+    protected boolean createDatabaseInternal(String databaseName) {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
             throws DatabaseNotExistException, CatalogException {
@@ -246,8 +154,14 @@ public class StarRocksCatalog implements Catalog {
         }
     }
 
+    @Override
+    protected boolean dropDatabaseInternal(String databaseName) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
     /** @see com.mysql.cj.MysqlType */
-    private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
+    @Override
+    public SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
             throws SQLException {
         MysqlType starrocksType = MysqlType.getByName(metadata.getColumnTypeName(colIndex));
         switch (starrocksType) {
@@ -315,7 +229,8 @@ public class StarRocksCatalog implements Catalog {
     }
 
     @SuppressWarnings("MagicNumber")
-    private Map<String, String> buildConnectorOptions(TablePath tablePath) {
+    @Override
+    public Map<String, String> buildConnectorOptions(TablePath tablePath) {
         Map<String, String> options = new HashMap<>(8);
         options.put("connector", "starrocks");
         options.put("url", baseUrl + tablePath.getDatabaseName());
@@ -369,29 +284,6 @@ public class StarRocksCatalog implements Catalog {
         return res;
     }
 
-    @Override
-    public String getDefaultDatabase() {
-        return defaultDatabase;
-    }
-
-    @Override
-    public void open() throws CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
-            // test connection, fail early if we cannot connect to database
-            conn.getCatalog();
-        } catch (SQLException e) {
-            throw new CatalogException(
-                    String.format("Failed connecting to %s via JDBC.", defaultUrl), e);
-        }
-
-        LOG.info("Catalog {} established connection to {}", catalogName, defaultUrl);
-    }
-
-    @Override
-    public void close() throws CatalogException {
-        LOG.info("Catalog {} closing", catalogName);
-    }
-
     protected Optional<PrimaryKey> getPrimaryKey(String schema, String table) throws SQLException {
 
         List<String> pkFields = new ArrayList<>();
@@ -414,21 +306,4 @@ public class StarRocksCatalog implements Catalog {
         }
         return Optional.empty();
     }
-
-    @Override
-    public boolean databaseExists(String databaseName) throws CatalogException {
-        checkArgument(StringUtils.isNotBlank(databaseName));
-
-        return listDatabases().contains(databaseName);
-    }
-
-    @Override
-    public boolean tableExists(TablePath tablePath) throws CatalogException {
-        try {
-            return databaseExists(tablePath.getDatabaseName())
-                    && listTables(tablePath.getDatabaseName()).contains(tablePath.getTableName());
-        } catch (DatabaseNotExistException e) {
-            return false;
-        }
-    }
 }
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDialect.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDialect.java
new file mode 100644
index 000000000..9286e628f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDialect.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.catalog;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlDialect;
+
+public class StarRocksDialect extends MysqlDialect {
+    @Override
+    public String dialectName() {
+        return "StarRocks";
+    }
+}