You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2023/04/20 07:03:06 UTC

[incubator-seatunnel] branch revert-4540-refactor-catalog created (now 4c9d4bb3b)

This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a change to branch revert-4540-refactor-catalog
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


      at 4c9d4bb3b Revert "[Improve][Catalog] refactor catalog (#4540)"

This branch includes the following new commits:

     new 4c9d4bb3b Revert "[Improve][Catalog] refactor catalog (#4540)"

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-seatunnel] 01/01: Revert "[Improve][Catalog] refactor catalog (#4540)"

Posted by wa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch revert-4540-refactor-catalog
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git

commit 4c9d4bb3b5a70f0e5493b27ccce1addf168660ee
Author: hailin0 <ha...@gmail.com>
AuthorDate: Thu Apr 20 15:03:00 2023 +0800

    Revert "[Improve][Catalog] refactor catalog (#4540)"
    
    This reverts commit b0a701cb83a2353da3b1f93a483c19cddd55747c.
---
 .../jdbc/catalog/AbstractJdbcCatalog.java          | 234 +--------------------
 .../seatunnel/jdbc/catalog/mysql/MySqlCatalog.java | 219 ++++++++++++++++++-
 .../catalog/mysql/MysqlCreateTableSqlBuilder.java  |   2 +-
 .../jdbc/catalog/sqlserver/SqlServerCatalog.java   | 228 +++++++++++++++++++-
 .../jdbc/catalog/sqlserver/SqlServerType.java      |   2 +-
 .../jdbc/internal/dialect/JdbcDialect.java         |  38 ----
 .../jdbc/internal/dialect/mysql/MysqlDialect.java  |   8 -
 .../dialect/sqlserver/SqlServerDialect.java        |  30 ---
 .../connector-starrocks/pom.xml                    |   5 -
 .../starrocks/catalog/StarRocksCatalog.java        | 177 +++++++++++++---
 .../starrocks/catalog/StarRocksDialect.java        |  27 ---
 11 files changed, 605 insertions(+), 365 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index 4fb68775f..bd516e325 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -21,20 +21,14 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.ConstraintKey;
-import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.catalog.PrimaryKey;
-import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
 import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
 import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
 import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
 import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
-import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
-import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -46,19 +40,14 @@ import org.slf4j.LoggerFactory;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.DriverManager;
-import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -66,6 +55,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 public abstract class AbstractJdbcCatalog implements Catalog {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcCatalog.class);
+
     protected final String catalogName;
     protected final String defaultDatabase;
     protected final String username;
@@ -73,8 +63,6 @@ public abstract class AbstractJdbcCatalog implements Catalog {
     protected final String baseUrl;
     protected final String suffix;
     protected final String defaultUrl;
-    protected final JdbcDialect jdbcDialect;
-    protected static final Set<String> SYS_DATABASES = new HashSet<>();
 
     public AbstractJdbcCatalog(
             String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo) {
@@ -92,7 +80,6 @@ public abstract class AbstractJdbcCatalog implements Catalog {
         this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
         this.defaultUrl = urlInfo.getOrigin();
         this.suffix = urlInfo.getSuffix();
-        this.jdbcDialect = JdbcDialectLoader.load(this.baseUrl);
     }
 
     @Override
@@ -120,7 +107,6 @@ public abstract class AbstractJdbcCatalog implements Catalog {
     public void open() throws CatalogException {
         try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
             // test connection, fail early if we cannot connect to database
-            conn.getCatalog();
         } catch (SQLException e) {
             throw new CatalogException(
                     String.format("Failed connecting to %s via JDBC.", defaultUrl), e);
@@ -134,56 +120,6 @@ public abstract class AbstractJdbcCatalog implements Catalog {
         LOG.info("Catalog {} closing", catalogName);
     }
 
-    @Override
-    public List<String> listDatabases() throws CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
-
-            PreparedStatement ps = conn.prepareStatement(jdbcDialect.listDatabases());
-
-            List<String> databases = new ArrayList<>();
-            ResultSet rs = ps.executeQuery();
-
-            while (rs.next()) {
-                String databaseName = rs.getString(1);
-                if (!getSysDatabases().contains(databaseName)) {
-                    databases.add(rs.getString(1));
-                }
-            }
-
-            return databases;
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed listing database in catalog %s", this.catalogName), e);
-        }
-    }
-
-    @Override
-    public List<String> listTables(String databaseName)
-            throws CatalogException, DatabaseNotExistException {
-        if (!databaseExists(databaseName)) {
-            throw new DatabaseNotExistException(this.catalogName, databaseName);
-        }
-
-        String dbUrl = jdbcDialect.getUrlFromDatabaseName(baseUrl, databaseName, suffix);
-        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
-                PreparedStatement ps =
-                        conn.prepareStatement(jdbcDialect.listTableSql(databaseName))) {
-
-            ResultSet rs = ps.executeQuery();
-
-            List<String> tables = new ArrayList<>();
-
-            while (rs.next()) {
-                tables.add(jdbcDialect.getTableName(rs));
-            }
-
-            return tables;
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed listing database in catalog %s", catalogName), e);
-        }
-    }
-
     protected Optional<PrimaryKey> getPrimaryKey(
             DatabaseMetaData metaData, String database, String table) throws SQLException {
         return getPrimaryKey(metaData, database, table, table);
@@ -290,8 +226,7 @@ public abstract class AbstractJdbcCatalog implements Catalog {
     public boolean tableExists(TablePath tablePath) throws CatalogException {
         try {
             return databaseExists(tablePath.getDatabaseName())
-                    && listTables(tablePath.getDatabaseName())
-                            .contains(jdbcDialect.getTableName(tablePath));
+                    && listTables(tablePath.getDatabaseName()).contains(tablePath.getTableName());
         } catch (DatabaseNotExistException e) {
             return false;
         }
@@ -310,86 +245,8 @@ public abstract class AbstractJdbcCatalog implements Catalog {
         }
     }
 
-    public CatalogTable getTable(TablePath tablePath)
-            throws CatalogException, TableNotExistException {
-        if (!tableExists(tablePath)) {
-            throw new TableNotExistException(catalogName, tablePath);
-        }
-
-        String dbUrl =
-                jdbcDialect.getUrlFromDatabaseName(baseUrl, tablePath.getDatabaseName(), suffix);
-        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
-            DatabaseMetaData metaData = conn.getMetaData();
-            Optional<PrimaryKey> primaryKey =
-                    getPrimaryKey(
-                            metaData,
-                            tablePath.getDatabaseName(),
-                            tablePath.getSchemaName(),
-                            tablePath.getTableName());
-            List<ConstraintKey> constraintKeys =
-                    getConstraintKeys(
-                            metaData,
-                            tablePath.getDatabaseName(),
-                            tablePath.getSchemaName(),
-                            tablePath.getTableName());
-
-            try (PreparedStatement ps =
-                    conn.prepareStatement(
-                            String.format(
-                                    "SELECT * FROM %s WHERE 1 = 0;",
-                                    tablePath.getFullNameWithQuoted("\"")))) {
-                ResultSetMetaData tableMetaData = ps.getMetaData();
-                TableSchema.Builder builder = TableSchema.builder();
-                // add column
-                for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
-                    String columnName = tableMetaData.getColumnName(i);
-                    SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i);
-                    int columnDisplaySize = tableMetaData.getColumnDisplaySize(i);
-                    String comment = tableMetaData.getColumnLabel(i);
-                    boolean isNullable =
-                            tableMetaData.isNullable(i) == ResultSetMetaData.columnNullable;
-                    Object defaultValue =
-                            getColumnDefaultValue(
-                                            metaData,
-                                            tablePath.getDatabaseName(),
-                                            tablePath.getSchemaName(),
-                                            tablePath.getTableName(),
-                                            columnName)
-                                    .orElse(null);
-
-                    PhysicalColumn physicalColumn =
-                            PhysicalColumn.of(
-                                    columnName,
-                                    type,
-                                    columnDisplaySize,
-                                    isNullable,
-                                    defaultValue,
-                                    comment);
-                    builder.column(physicalColumn);
-                }
-                // add primary key
-                primaryKey.ifPresent(builder::primaryKey);
-                // add constraint key
-                constraintKeys.forEach(builder::constraintKey);
-                TableIdentifier tableIdentifier =
-                        TableIdentifier.of(
-                                catalogName,
-                                tablePath.getDatabaseName(),
-                                tablePath.getSchemaName(),
-                                tablePath.getTableName());
-                return CatalogTable.of(
-                        tableIdentifier,
-                        builder.build(),
-                        buildConnectorOptions(tablePath),
-                        Collections.emptyList(),
-                        "");
-            }
-
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed getting table %s", tablePath.getFullName()), e);
-        }
-    }
+    protected abstract boolean createTableInternal(TablePath tablePath, CatalogTable table)
+            throws CatalogException;
 
     @Override
     public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
@@ -400,20 +257,7 @@ public abstract class AbstractJdbcCatalog implements Catalog {
         }
     }
 
-    protected boolean dropTableInternal(TablePath tablePath) throws CatalogException {
-        String dbUrl =
-                jdbcDialect.getUrlFromDatabaseName(baseUrl, tablePath.getDatabaseName(), suffix);
-        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
-                PreparedStatement ps =
-                        conn.prepareStatement(
-                                jdbcDialect.getDropTableSql(tablePath.getFullName()))) {
-            // Will there exist concurrent drop for one table?
-            return ps.execute();
-        } catch (SQLException e) {
-            throw new CatalogException(
-                    String.format("Failed dropping table %s", tablePath.getFullName()), e);
-        }
-    }
+    protected abstract boolean dropTableInternal(TablePath tablePath) throws CatalogException;
 
     @Override
     public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
@@ -429,6 +273,8 @@ public abstract class AbstractJdbcCatalog implements Catalog {
         }
     }
 
+    protected abstract boolean createDatabaseInternal(String databaseName);
+
     @Override
     public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
             throws DatabaseNotExistException, CatalogException {
@@ -440,69 +286,5 @@ public abstract class AbstractJdbcCatalog implements Catalog {
         }
     }
 
-    protected SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
-            throws SQLException {
-        return null;
-    }
-
-    protected Set<String> getSysDatabases() {
-        return SYS_DATABASES;
-    }
-
-    protected Map<String, String> buildConnectorOptions(TablePath tablePath) {
-        Map<String, String> options = new HashMap<>(8);
-        options.put("connector", "jdbc");
-        options.put(
-                "url",
-                jdbcDialect.getUrlFromDatabaseName(baseUrl, tablePath.getDatabaseName(), suffix));
-        options.put("table-name", tablePath.getFullName());
-        options.put("username", username);
-        options.put("password", pwd);
-        return options;
-    }
-
-    protected boolean createDatabaseInternal(String databaseName) {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
-                PreparedStatement ps =
-                        conn.prepareStatement(
-                                String.format(jdbcDialect.createDatabaseSql(databaseName)))) {
-            return ps.execute();
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format(
-                            "Failed creating database %s in catalog %s",
-                            databaseName, this.catalogName),
-                    e);
-        }
-    }
-
-    protected boolean dropDatabaseInternal(String databaseName) throws CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
-                PreparedStatement ps =
-                        conn.prepareStatement(jdbcDialect.dropDatabaseSql(databaseName))) {
-            return ps.execute();
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format(
-                            "Failed dropping database %s in catalog %s",
-                            databaseName, this.catalogName),
-                    e);
-        }
-    }
-
-    // todo: If the origin source is mysql, we can directly use create table like to create the
-    // target table?
-    protected boolean createTableInternal(TablePath tablePath, CatalogTable table)
-            throws CatalogException {
-        String dbUrl =
-                jdbcDialect.getUrlFromDatabaseName(baseUrl, tablePath.getDatabaseName(), suffix);
-        String createTableSql = jdbcDialect.createTableSql(tablePath, table);
-        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
-                PreparedStatement ps = conn.prepareStatement(createTableSql)) {
-            return ps.execute();
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed creating table %s", tablePath.getFullName()), e);
-        }
-    }
+    protected abstract boolean dropDatabaseInternal(String databaseName) throws CatalogException;
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
index f6c9cc679..d8534d5df 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
@@ -18,6 +18,16 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql;
 
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
@@ -25,13 +35,26 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalo
 import com.mysql.cj.MysqlType;
 import com.mysql.cj.jdbc.result.ResultSetImpl;
 
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 
 public class MySqlCatalog extends AbstractJdbcCatalog {
 
+    protected static final Set<String> SYS_DATABASES = new HashSet<>(4);
+
     static {
         SYS_DATABASES.add("information_schema");
         SYS_DATABASES.add("mysql");
@@ -44,12 +67,189 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
         super(catalogName, username, pwd, urlInfo);
     }
 
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
+                PreparedStatement ps = conn.prepareStatement("SHOW DATABASES;")) {
+
+            List<String> databases = new ArrayList<>();
+            ResultSet rs = ps.executeQuery();
+
+            while (rs.next()) {
+                String databaseName = rs.getString(1);
+                if (!SYS_DATABASES.contains(databaseName)) {
+                    databases.add(rs.getString(1));
+                }
+            }
+
+            return databases;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed listing database in catalog %s", this.catalogName), e);
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(this.catalogName, databaseName);
+        }
+
+        String dbUrl = getUrlFromDatabaseName(databaseName);
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
+                PreparedStatement ps = conn.prepareStatement("SHOW TABLES;")) {
+
+            ResultSet rs = ps.executeQuery();
+
+            List<String> tables = new ArrayList<>();
+
+            while (rs.next()) {
+                tables.add(rs.getString(1));
+            }
+
+            return tables;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed listing database in catalog %s", catalogName), e);
+        }
+    }
+
+    @Override
+    public CatalogTable getTable(TablePath tablePath)
+            throws CatalogException, TableNotExistException {
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(catalogName, tablePath);
+        }
+
+        String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
+            DatabaseMetaData metaData = conn.getMetaData();
+            Optional<PrimaryKey> primaryKey =
+                    getPrimaryKey(metaData, tablePath.getDatabaseName(), tablePath.getTableName());
+            List<ConstraintKey> constraintKeys =
+                    getConstraintKeys(
+                            metaData, tablePath.getDatabaseName(), tablePath.getTableName());
+
+            try (PreparedStatement ps =
+                    conn.prepareStatement(
+                            String.format(
+                                    "SELECT * FROM %s WHERE 1 = 0;",
+                                    tablePath.getFullNameWithQuoted()))) {
+                ResultSetMetaData tableMetaData = ps.getMetaData();
+                TableSchema.Builder builder = TableSchema.builder();
+                // add column
+                for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
+                    String columnName = tableMetaData.getColumnName(i);
+                    SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i);
+                    int columnDisplaySize = tableMetaData.getColumnDisplaySize(i);
+                    String comment = tableMetaData.getColumnLabel(i);
+                    boolean isNullable =
+                            tableMetaData.isNullable(i) == ResultSetMetaData.columnNullable;
+                    Object defaultValue =
+                            getColumnDefaultValue(metaData, tablePath.getTableName(), columnName)
+                                    .orElse(null);
+
+                    PhysicalColumn physicalColumn =
+                            PhysicalColumn.of(
+                                    columnName,
+                                    type,
+                                    columnDisplaySize,
+                                    isNullable,
+                                    defaultValue,
+                                    comment);
+                    builder.column(physicalColumn);
+                }
+                // add primary key
+                primaryKey.ifPresent(builder::primaryKey);
+                // add constraint key
+                constraintKeys.forEach(builder::constraintKey);
+                TableIdentifier tableIdentifier =
+                        TableIdentifier.of(
+                                catalogName, tablePath.getDatabaseName(), tablePath.getTableName());
+                return CatalogTable.of(
+                        tableIdentifier,
+                        builder.build(),
+                        buildConnectorOptions(tablePath),
+                        Collections.emptyList(),
+                        "");
+            }
+
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed getting table %s", tablePath.getFullName()), e);
+        }
+    }
+
+    // todo: If the origin source is mysql, we can directly use create table like to create the
+    // target table?
+    @Override
+    protected boolean createTableInternal(TablePath tablePath, CatalogTable table)
+            throws CatalogException {
+        String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
+        String createTableSql = MysqlCreateTableSqlBuilder.builder(tablePath, table).build();
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
+                PreparedStatement ps = conn.prepareStatement(createTableSql)) {
+            return ps.execute();
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed creating table %s", tablePath.getFullName()), e);
+        }
+    }
+
+    @Override
+    protected boolean dropTableInternal(TablePath tablePath) throws CatalogException {
+        String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement(
+                                String.format(
+                                        "DROP TABLE %s IF EXIST;", tablePath.getFullName()))) {
+            // Will there exist concurrent drop for one table?
+            return ps.execute();
+        } catch (SQLException e) {
+            throw new CatalogException(
+                    String.format("Failed dropping table %s", tablePath.getFullName()), e);
+        }
+    }
+
+    @Override
+    protected boolean createDatabaseInternal(String databaseName) throws CatalogException {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement(
+                                String.format("CREATE DATABASE `%s`;", databaseName))) {
+            return ps.execute();
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "Failed creating database %s in catalog %s",
+                            databaseName, this.catalogName),
+                    e);
+        }
+    }
+
+    @Override
+    protected boolean dropDatabaseInternal(String databaseName) throws CatalogException {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement(String.format("DROP DATABASE `%s`;", databaseName))) {
+            return ps.execute();
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "Failed dropping database %s in catalog %s",
+                            databaseName, this.catalogName),
+                    e);
+        }
+    }
+
     /**
      * @see com.mysql.cj.MysqlType
      * @see ResultSetImpl#getObjectStoredProc(int, int)
      */
-    @Override
-    public SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
+    @SuppressWarnings("unchecked")
+    private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
             throws SQLException {
         MysqlType mysqlType = MysqlType.getByName(metadata.getColumnTypeName(colIndex));
         Map<String, Object> dataTypeProperties = new HashMap<>();
@@ -57,4 +257,19 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
         dataTypeProperties.put(MysqlDataTypeConvertor.SCALE, metadata.getScale(colIndex));
         return new MysqlDataTypeConvertor().toSeaTunnelType(mysqlType, dataTypeProperties);
     }
+
+    @SuppressWarnings("MagicNumber")
+    private Map<String, String> buildConnectorOptions(TablePath tablePath) {
+        Map<String, String> options = new HashMap<>(8);
+        options.put("connector", "jdbc");
+        options.put("url", baseUrl + tablePath.getDatabaseName());
+        options.put("table-name", tablePath.getFullName());
+        options.put("username", username);
+        options.put("password", pwd);
+        return options;
+    }
+
+    private String getUrlFromDatabaseName(String databaseName) {
+        return baseUrl + databaseName + suffix;
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
index c2e300516..9a015ca73 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
@@ -49,7 +49,7 @@ public class MysqlCreateTableSqlBuilder {
 
     private List<ConstraintKey> constraintKeys;
 
-    private final MysqlDataTypeConvertor mysqlDataTypeConvertor;
+    private MysqlDataTypeConvertor mysqlDataTypeConvertor;
 
     private MysqlCreateTableSqlBuilder(String tableName) {
         checkNotNull(tableName, "tableName must not be null");
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
index f15a0e746..25c02e6b1 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
@@ -19,21 +19,41 @@
 package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver;
 
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
 
 import org.apache.commons.lang3.tuple.Pair;
 
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 
 public class SqlServerCatalog extends AbstractJdbcCatalog {
 
+    private static final Set<String> SYS_DATABASES = new HashSet<>(4);
+
     static {
         SYS_DATABASES.add("master");
         SYS_DATABASES.add("tempdb");
@@ -46,6 +66,150 @@ public class SqlServerCatalog extends AbstractJdbcCatalog {
         super(catalogName, username, pwd, urlInfo);
     }
 
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
+                PreparedStatement ps = conn.prepareStatement("SELECT NAME FROM SYS.DATABASES")) {
+
+            List<String> databases = new ArrayList<>();
+            ResultSet rs = ps.executeQuery();
+
+            while (rs.next()) {
+                String databaseName = rs.getString(1);
+                if (!SYS_DATABASES.contains(databaseName)) {
+                    databases.add(databaseName);
+                }
+            }
+
+            return databases;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed listing database in catalog %s", this.catalogName), e);
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(this.catalogName, databaseName);
+        }
+
+        String dbUrl = getUrlFromDatabaseName(databaseName);
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement(
+                                "SELECT TABLE_SCHEMA, TABLE_NAME FROM "
+                                        + databaseName
+                                        + ".INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'")) {
+
+            ResultSet rs = ps.executeQuery();
+
+            List<String> tables = new ArrayList<>();
+
+            while (rs.next()) {
+                tables.add(rs.getString(1) + "." + rs.getString(2));
+            }
+
+            return tables;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed listing database in catalog %s", catalogName), e);
+        }
+    }
+
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        try {
+            return databaseExists(tablePath.getDatabaseName())
+                    && listTables(tablePath.getDatabaseName())
+                            .contains(tablePath.getSchemaAndTableName());
+        } catch (DatabaseNotExistException e) {
+            return false;
+        }
+    }
+
+    @Override
+    public CatalogTable getTable(TablePath tablePath)
+            throws CatalogException, TableNotExistException {
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(catalogName, tablePath);
+        }
+
+        String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
+            DatabaseMetaData metaData = conn.getMetaData();
+            Optional<PrimaryKey> primaryKey =
+                    getPrimaryKey(
+                            metaData,
+                            tablePath.getDatabaseName(),
+                            tablePath.getSchemaName(),
+                            tablePath.getTableName());
+            List<ConstraintKey> constraintKeys =
+                    getConstraintKeys(
+                            metaData,
+                            tablePath.getDatabaseName(),
+                            tablePath.getSchemaName(),
+                            tablePath.getTableName());
+
+            try (PreparedStatement ps =
+                    conn.prepareStatement(
+                            String.format(
+                                    "SELECT * FROM %s WHERE 1 = 0;",
+                                    tablePath.getFullNameWithQuoted("\"")))) {
+                ResultSetMetaData tableMetaData = ps.getMetaData();
+                TableSchema.Builder builder = TableSchema.builder();
+                // add column
+                for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
+                    String columnName = tableMetaData.getColumnName(i);
+                    SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i);
+                    int columnDisplaySize = tableMetaData.getColumnDisplaySize(i);
+                    String comment = tableMetaData.getColumnLabel(i);
+                    boolean isNullable =
+                            tableMetaData.isNullable(i) == ResultSetMetaData.columnNullable;
+                    Object defaultValue =
+                            getColumnDefaultValue(
+                                            metaData,
+                                            tablePath.getDatabaseName(),
+                                            tablePath.getSchemaName(),
+                                            tablePath.getTableName(),
+                                            columnName)
+                                    .orElse(null);
+
+                    PhysicalColumn physicalColumn =
+                            PhysicalColumn.of(
+                                    columnName,
+                                    type,
+                                    columnDisplaySize,
+                                    isNullable,
+                                    defaultValue,
+                                    comment);
+                    builder.column(physicalColumn);
+                }
+                // add primary key
+                primaryKey.ifPresent(builder::primaryKey);
+                // add constraint key
+                constraintKeys.forEach(builder::constraintKey);
+                TableIdentifier tableIdentifier =
+                        TableIdentifier.of(
+                                catalogName,
+                                tablePath.getDatabaseName(),
+                                tablePath.getSchemaName(),
+                                tablePath.getTableName());
+                return CatalogTable.of(
+                        tableIdentifier,
+                        builder.build(),
+                        buildConnectorOptions(tablePath),
+                        Collections.emptyList(),
+                        "");
+            }
+
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed getting table %s", tablePath.getFullName()), e);
+        }
+    }
+
     @Override
     protected boolean createTableInternal(TablePath tablePath, CatalogTable table)
             throws CatalogException {
@@ -53,7 +217,54 @@ public class SqlServerCatalog extends AbstractJdbcCatalog {
     }
 
     @Override
-    public SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
+    protected boolean dropTableInternal(TablePath tablePath) throws CatalogException {
+        String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement(
+                                String.format("DROP TABLE IF EXIST %s", tablePath.getFullName()))) {
+            // Will there exist concurrent drop for one table?
+            return ps.execute();
+        } catch (SQLException e) {
+            throw new CatalogException(
+                    String.format("Failed dropping table %s", tablePath.getFullName()), e);
+        }
+    }
+
+    @Override
+    protected boolean createDatabaseInternal(String databaseName) throws CatalogException {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement(
+                                String.format("CREATE DATABASE `%s`", databaseName))) {
+            return ps.execute();
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "Failed creating database %s in catalog %s",
+                            databaseName, this.catalogName),
+                    e);
+        }
+    }
+
+    @Override
+    protected boolean dropDatabaseInternal(String databaseName) throws CatalogException {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement(
+                                String.format("DROP DATABASE IF EXISTS `%s`;", databaseName))) {
+            return ps.execute();
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "Failed dropping database %s in catalog %s",
+                            databaseName, this.catalogName),
+                    e);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
             throws SQLException {
         Pair<SqlServerType, Map<String, Object>> pair =
                 SqlServerType.parse(metadata.getColumnTypeName(colIndex));
@@ -63,4 +274,19 @@ public class SqlServerCatalog extends AbstractJdbcCatalog {
         dataTypeProperties.put(SqlServerDataTypeConvertor.SCALE, metadata.getScale(colIndex));
         return new SqlServerDataTypeConvertor().toSeaTunnelType(pair.getLeft(), dataTypeProperties);
     }
+
+    @SuppressWarnings("MagicNumber")
+    private Map<String, String> buildConnectorOptions(TablePath tablePath) {
+        Map<String, String> options = new HashMap<>(8);
+        options.put("connector", "jdbc");
+        options.put("url", getUrlFromDatabaseName(tablePath.getDatabaseName()));
+        options.put("table-name", tablePath.getFullName());
+        options.put("username", username);
+        options.put("password", pwd);
+        return options;
+    }
+
+    private String getUrlFromDatabaseName(String databaseName) {
+        return baseUrl + ";databaseName=" + databaseName + ";" + suffix;
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java
index 0b0fa91f2..e848498c9 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java
@@ -140,7 +140,7 @@ public enum SqlServerType implements SQLType {
     public static Pair<SqlServerType, Map<String, Object>> parse(String fullTypeName) {
         Map<String, Object> params = new HashMap<>();
         String typeName = fullTypeName;
-        if (fullTypeName.contains("(")) {
+        if (fullTypeName.indexOf("(") != -1) {
             typeName = fullTypeName.substring(0, fullTypeName.indexOf("(")).trim();
             String paramsStr =
                     fullTypeName.substring(
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index 36e47eda1..64ed388df 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -17,8 +17,6 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
 
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
 
@@ -195,40 +193,4 @@ public interface JdbcDialect extends Serializable {
         PreparedStatement ps = conn.prepareStatement(jdbcSourceConfig.getQuery());
         return ps.getMetaData();
     }
-
-    default String listDatabases() {
-        return "SHOW DATABASES;";
-    }
-
-    default String getUrlFromDatabaseName(String baseUrl, String databaseName, String suffix) {
-        return baseUrl + databaseName + suffix;
-    }
-
-    default String createDatabaseSql(String databaseName) {
-        return String.format("CREATE DATABASE IF NOT EXISTS %s;", quoteIdentifier(databaseName));
-    }
-
-    default String dropDatabaseSql(String databaseName) {
-        return String.format("DROP DATABASE IF EXISTS %s;", quoteIdentifier(databaseName));
-    }
-
-    default String getTableName(ResultSet rs) throws SQLException {
-        return rs.getString(1);
-    }
-
-    default String getTableName(TablePath tablePath) {
-        return tablePath.getTableName();
-    }
-
-    default String listTableSql(String databaseName) {
-        return "SHOW TABLES;";
-    }
-
-    default String getDropTableSql(String tableName) {
-        return String.format("DROP TABLE %s IF EXIST;", tableName);
-    }
-
-    default String createTableSql(TablePath tablePath, CatalogTable catalogTable) {
-        return "";
-    }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
index 63584d8ab..128b8ae4b 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
@@ -17,9 +17,6 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql;
 
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MysqlCreateTableSqlBuilder;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
@@ -81,9 +78,4 @@ public class MysqlDialect implements JdbcDialect {
         statement.setFetchSize(Integer.MIN_VALUE);
         return statement;
     }
-
-    @Override
-    public String createTableSql(TablePath tablePath, CatalogTable catalogTable) {
-        return MysqlCreateTableSqlBuilder.builder(tablePath, catalogTable).build();
-    }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
index 90b376624..697d2d2dc 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
@@ -17,13 +17,10 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver;
 
-import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
 
-import java.sql.ResultSet;
-import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
@@ -103,31 +100,4 @@ public class SqlServerDialect implements JdbcDialect {
 
         return Optional.of(upsertSQL);
     }
-
-    @Override
-    public String listDatabases() {
-        return "SELECT NAME FROM SYS.DATABASES";
-    }
-
-    @Override
-    public String listTableSql(String databaseName) {
-        return "SELECT TABLE_SCHEMA, TABLE_NAME FROM "
-                + databaseName
-                + ".INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'";
-    }
-
-    @Override
-    public String getTableName(ResultSet rs) throws SQLException {
-        return rs.getString(1) + "." + rs.getString(2);
-    }
-
-    @Override
-    public String getTableName(TablePath tablePath) {
-        return tablePath.getSchemaName() + "." + tablePath.getTableName();
-    }
-
-    @Override
-    public String getUrlFromDatabaseName(String baseUrl, String databaseName, String suffix) {
-        return baseUrl + ";databaseName=" + databaseName + ";" + suffix;
-    }
 }
diff --git a/seatunnel-connectors-v2/connector-starrocks/pom.xml b/seatunnel-connectors-v2/connector-starrocks/pom.xml
index 44b899885..08e49bc0f 100644
--- a/seatunnel-connectors-v2/connector-starrocks/pom.xml
+++ b/seatunnel-connectors-v2/connector-starrocks/pom.xml
@@ -49,11 +49,6 @@
             <artifactId>connector-common</artifactId>
             <version>${project.version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-jdbc</artifactId>
-            <version>${project.version}</version>
-        </dependency>
         <dependency>
             <groupId>mysql</groupId>
             <artifactId>mysql-connector-java</artifactId>
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
index 79d9bf901..7bf308b1c 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
@@ -17,9 +17,13 @@
 
 package org.apache.seatunnel.connectors.seatunnel.starrocks.catalog;
 
+import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
 import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
 import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
@@ -32,7 +36,6 @@ import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
-import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
 import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
 
 import org.apache.commons.lang3.StringUtils;
@@ -44,18 +47,22 @@ import com.mysql.cj.MysqlType;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-public class StarRocksCatalog extends AbstractJdbcCatalog {
+public class StarRocksCatalog implements Catalog {
 
     protected final String catalogName;
     protected String defaultDatabase = "information_schema";
@@ -64,6 +71,8 @@ public class StarRocksCatalog extends AbstractJdbcCatalog {
     protected final String baseUrl;
     protected String defaultUrl;
     private final JdbcUrlUtil.UrlInfo urlInfo;
+
+    private static final Set<String> SYS_DATABASES = new HashSet<>();
     private static final Logger LOG = LoggerFactory.getLogger(StarRocksCatalog.class);
 
     static {
@@ -73,8 +82,6 @@ public class StarRocksCatalog extends AbstractJdbcCatalog {
 
     public StarRocksCatalog(String catalogName, String username, String pwd, String defaultUrl) {
 
-        super(catalogName, username, pwd, JdbcUrlUtil.getUrlInfo(defaultUrl));
-
         checkArgument(StringUtils.isNotBlank(username));
         checkArgument(StringUtils.isNotBlank(pwd));
         checkArgument(StringUtils.isNotBlank(defaultUrl));
@@ -90,9 +97,104 @@ public class StarRocksCatalog extends AbstractJdbcCatalog {
     }
 
     @Override
-    protected boolean createTableInternal(TablePath tablePath, CatalogTable table)
-            throws CatalogException {
-        throw new UnsupportedOperationException("Unsupported create table");
+    public List<String> listDatabases() throws CatalogException {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+
+            PreparedStatement ps = conn.prepareStatement("SHOW DATABASES;");
+
+            List<String> databases = new ArrayList<>();
+            ResultSet rs = ps.executeQuery();
+
+            while (rs.next()) {
+                String databaseName = rs.getString(1);
+                if (!SYS_DATABASES.contains(databaseName)) {
+                    databases.add(rs.getString(1));
+                }
+            }
+
+            return databases;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed listing database in catalog %s", this.catalogName), e);
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(this.catalogName, databaseName);
+        }
+
+        try (Connection conn =
+                DriverManager.getConnection(
+                        urlInfo.getUrlWithDatabase(databaseName), username, pwd)) {
+            PreparedStatement ps = conn.prepareStatement("SHOW TABLES;");
+
+            ResultSet rs = ps.executeQuery();
+
+            List<String> tables = new ArrayList<>();
+
+            while (rs.next()) {
+                tables.add(rs.getString(1));
+            }
+
+            return tables;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed listing database in catalog %s", catalogName), e);
+        }
+    }
+
+    @Override
+    public CatalogTable getTable(TablePath tablePath)
+            throws CatalogException, TableNotExistException {
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(catalogName, tablePath);
+        }
+
+        String dbUrl = urlInfo.getUrlWithDatabase(tablePath.getDatabaseName());
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
+            Optional<PrimaryKey> primaryKey =
+                    getPrimaryKey(tablePath.getDatabaseName(), tablePath.getTableName());
+
+            PreparedStatement ps =
+                    conn.prepareStatement(
+                            String.format(
+                                    "SELECT * FROM %s WHERE 1 = 0;",
+                                    tablePath.getFullNameWithQuoted()));
+
+            ResultSetMetaData tableMetaData = ps.getMetaData();
+
+            TableSchema.Builder builder = TableSchema.builder();
+            for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
+                SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i);
+                // TODO add default value and test it
+                builder.column(
+                        PhysicalColumn.of(
+                                tableMetaData.getColumnName(i),
+                                type,
+                                tableMetaData.getColumnDisplaySize(i),
+                                tableMetaData.isNullable(i) == ResultSetMetaData.columnNullable,
+                                null,
+                                tableMetaData.getColumnLabel(i)));
+            }
+
+            primaryKey.ifPresent(builder::primaryKey);
+
+            TableIdentifier tableIdentifier =
+                    TableIdentifier.of(
+                            catalogName, tablePath.getDatabaseName(), tablePath.getTableName());
+            return CatalogTable.of(
+                    tableIdentifier,
+                    builder.build(),
+                    buildConnectorOptions(tablePath),
+                    Collections.emptyList(),
+                    "");
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed getting table %s", tablePath.getFullName()), e);
+        }
     }
 
     @Override
@@ -107,11 +209,6 @@ public class StarRocksCatalog extends AbstractJdbcCatalog {
         throw new UnsupportedOperationException();
     }
 
-    @Override
-    protected boolean dropTableInternal(TablePath tablePath) throws CatalogException {
-        throw new UnsupportedOperationException();
-    }
-
     @Override
     public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
             throws DatabaseAlreadyExistException, CatalogException {
@@ -132,11 +229,6 @@ public class StarRocksCatalog extends AbstractJdbcCatalog {
         }
     }
 
-    @Override
-    protected boolean createDatabaseInternal(String databaseName) {
-        throw new UnsupportedOperationException();
-    }
-
     @Override
     public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
             throws DatabaseNotExistException, CatalogException {
@@ -154,14 +246,8 @@ public class StarRocksCatalog extends AbstractJdbcCatalog {
         }
     }
 
-    @Override
-    protected boolean dropDatabaseInternal(String databaseName) throws CatalogException {
-        throw new UnsupportedOperationException();
-    }
-
     /** @see com.mysql.cj.MysqlType */
-    @Override
-    public SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
+    private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
             throws SQLException {
         MysqlType starrocksType = MysqlType.getByName(metadata.getColumnTypeName(colIndex));
         switch (starrocksType) {
@@ -229,8 +315,7 @@ public class StarRocksCatalog extends AbstractJdbcCatalog {
     }
 
     @SuppressWarnings("MagicNumber")
-    @Override
-    public Map<String, String> buildConnectorOptions(TablePath tablePath) {
+    private Map<String, String> buildConnectorOptions(TablePath tablePath) {
         Map<String, String> options = new HashMap<>(8);
         options.put("connector", "starrocks");
         options.put("url", baseUrl + tablePath.getDatabaseName());
@@ -284,6 +369,29 @@ public class StarRocksCatalog extends AbstractJdbcCatalog {
         return res;
     }
 
+    @Override
+    public String getDefaultDatabase() {
+        return defaultDatabase;
+    }
+
+    @Override
+    public void open() throws CatalogException {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+            // test connection, fail early if we cannot connect to database
+            conn.getCatalog();
+        } catch (SQLException e) {
+            throw new CatalogException(
+                    String.format("Failed connecting to %s via JDBC.", defaultUrl), e);
+        }
+
+        LOG.info("Catalog {} established connection to {}", catalogName, defaultUrl);
+    }
+
+    @Override
+    public void close() throws CatalogException {
+        LOG.info("Catalog {} closing", catalogName);
+    }
+
     protected Optional<PrimaryKey> getPrimaryKey(String schema, String table) throws SQLException {
 
         List<String> pkFields = new ArrayList<>();
@@ -306,4 +414,21 @@ public class StarRocksCatalog extends AbstractJdbcCatalog {
         }
         return Optional.empty();
     }
+
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException {
+        checkArgument(StringUtils.isNotBlank(databaseName));
+
+        return listDatabases().contains(databaseName);
+    }
+
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        try {
+            return databaseExists(tablePath.getDatabaseName())
+                    && listTables(tablePath.getDatabaseName()).contains(tablePath.getTableName());
+        } catch (DatabaseNotExistException e) {
+            return false;
+        }
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDialect.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDialect.java
deleted file mode 100644
index 9286e628f..000000000
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDialect.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.starrocks.catalog;
-
-import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlDialect;
-
-public class StarRocksDialect extends MysqlDialect {
-    @Override
-    public String dialectName() {
-        return "StarRocks";
-    }
-}