You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2023/04/20 07:03:07 UTC

[incubator-seatunnel] 01/01: Revert "[Improve][Catalog] refactor catalog (#4540)"

This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch revert-4540-refactor-catalog
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git

commit 4c9d4bb3b5a70f0e5493b27ccce1addf168660ee
Author: hailin0 <ha...@gmail.com>
AuthorDate: Thu Apr 20 15:03:00 2023 +0800

    Revert "[Improve][Catalog] refactor catalog (#4540)"
    
    This reverts commit b0a701cb83a2353da3b1f93a483c19cddd55747c.
---
 .../jdbc/catalog/AbstractJdbcCatalog.java          | 234 +--------------------
 .../seatunnel/jdbc/catalog/mysql/MySqlCatalog.java | 219 ++++++++++++++++++-
 .../catalog/mysql/MysqlCreateTableSqlBuilder.java  |   2 +-
 .../jdbc/catalog/sqlserver/SqlServerCatalog.java   | 228 +++++++++++++++++++-
 .../jdbc/catalog/sqlserver/SqlServerType.java      |   2 +-
 .../jdbc/internal/dialect/JdbcDialect.java         |  38 ----
 .../jdbc/internal/dialect/mysql/MysqlDialect.java  |   8 -
 .../dialect/sqlserver/SqlServerDialect.java        |  30 ---
 .../connector-starrocks/pom.xml                    |   5 -
 .../starrocks/catalog/StarRocksCatalog.java        | 177 +++++++++++++---
 .../starrocks/catalog/StarRocksDialect.java        |  27 ---
 11 files changed, 605 insertions(+), 365 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index 4fb68775f..bd516e325 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -21,20 +21,14 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.ConstraintKey;
-import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.catalog.PrimaryKey;
-import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
 import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
 import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
 import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
 import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
-import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
-import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -46,19 +40,14 @@ import org.slf4j.LoggerFactory;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.DriverManager;
-import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -66,6 +55,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 public abstract class AbstractJdbcCatalog implements Catalog {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcCatalog.class);
+
     protected final String catalogName;
     protected final String defaultDatabase;
     protected final String username;
@@ -73,8 +63,6 @@ public abstract class AbstractJdbcCatalog implements Catalog {
     protected final String baseUrl;
     protected final String suffix;
     protected final String defaultUrl;
-    protected final JdbcDialect jdbcDialect;
-    protected static final Set<String> SYS_DATABASES = new HashSet<>();
 
     public AbstractJdbcCatalog(
             String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo) {
@@ -92,7 +80,6 @@ public abstract class AbstractJdbcCatalog implements Catalog {
         this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
         this.defaultUrl = urlInfo.getOrigin();
         this.suffix = urlInfo.getSuffix();
-        this.jdbcDialect = JdbcDialectLoader.load(this.baseUrl);
     }
 
     @Override
@@ -120,7 +107,6 @@ public abstract class AbstractJdbcCatalog implements Catalog {
     public void open() throws CatalogException {
         try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
             // test connection, fail early if we cannot connect to database
-            conn.getCatalog();
         } catch (SQLException e) {
             throw new CatalogException(
                     String.format("Failed connecting to %s via JDBC.", defaultUrl), e);
@@ -134,56 +120,6 @@ public abstract class AbstractJdbcCatalog implements Catalog {
         LOG.info("Catalog {} closing", catalogName);
     }
 
-    @Override
-    public List<String> listDatabases() throws CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
-
-            PreparedStatement ps = conn.prepareStatement(jdbcDialect.listDatabases());
-
-            List<String> databases = new ArrayList<>();
-            ResultSet rs = ps.executeQuery();
-
-            while (rs.next()) {
-                String databaseName = rs.getString(1);
-                if (!getSysDatabases().contains(databaseName)) {
-                    databases.add(rs.getString(1));
-                }
-            }
-
-            return databases;
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed listing database in catalog %s", this.catalogName), e);
-        }
-    }
-
-    @Override
-    public List<String> listTables(String databaseName)
-            throws CatalogException, DatabaseNotExistException {
-        if (!databaseExists(databaseName)) {
-            throw new DatabaseNotExistException(this.catalogName, databaseName);
-        }
-
-        String dbUrl = jdbcDialect.getUrlFromDatabaseName(baseUrl, databaseName, suffix);
-        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
-                PreparedStatement ps =
-                        conn.prepareStatement(jdbcDialect.listTableSql(databaseName))) {
-
-            ResultSet rs = ps.executeQuery();
-
-            List<String> tables = new ArrayList<>();
-
-            while (rs.next()) {
-                tables.add(jdbcDialect.getTableName(rs));
-            }
-
-            return tables;
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed listing database in catalog %s", catalogName), e);
-        }
-    }
-
     protected Optional<PrimaryKey> getPrimaryKey(
             DatabaseMetaData metaData, String database, String table) throws SQLException {
         return getPrimaryKey(metaData, database, table, table);
@@ -290,8 +226,7 @@ public abstract class AbstractJdbcCatalog implements Catalog {
     public boolean tableExists(TablePath tablePath) throws CatalogException {
         try {
             return databaseExists(tablePath.getDatabaseName())
-                    && listTables(tablePath.getDatabaseName())
-                            .contains(jdbcDialect.getTableName(tablePath));
+                    && listTables(tablePath.getDatabaseName()).contains(tablePath.getTableName());
         } catch (DatabaseNotExistException e) {
             return false;
         }
@@ -310,86 +245,8 @@ public abstract class AbstractJdbcCatalog implements Catalog {
         }
     }
 
-    public CatalogTable getTable(TablePath tablePath)
-            throws CatalogException, TableNotExistException {
-        if (!tableExists(tablePath)) {
-            throw new TableNotExistException(catalogName, tablePath);
-        }
-
-        String dbUrl =
-                jdbcDialect.getUrlFromDatabaseName(baseUrl, tablePath.getDatabaseName(), suffix);
-        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
-            DatabaseMetaData metaData = conn.getMetaData();
-            Optional<PrimaryKey> primaryKey =
-                    getPrimaryKey(
-                            metaData,
-                            tablePath.getDatabaseName(),
-                            tablePath.getSchemaName(),
-                            tablePath.getTableName());
-            List<ConstraintKey> constraintKeys =
-                    getConstraintKeys(
-                            metaData,
-                            tablePath.getDatabaseName(),
-                            tablePath.getSchemaName(),
-                            tablePath.getTableName());
-
-            try (PreparedStatement ps =
-                    conn.prepareStatement(
-                            String.format(
-                                    "SELECT * FROM %s WHERE 1 = 0;",
-                                    tablePath.getFullNameWithQuoted("\"")))) {
-                ResultSetMetaData tableMetaData = ps.getMetaData();
-                TableSchema.Builder builder = TableSchema.builder();
-                // add column
-                for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
-                    String columnName = tableMetaData.getColumnName(i);
-                    SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i);
-                    int columnDisplaySize = tableMetaData.getColumnDisplaySize(i);
-                    String comment = tableMetaData.getColumnLabel(i);
-                    boolean isNullable =
-                            tableMetaData.isNullable(i) == ResultSetMetaData.columnNullable;
-                    Object defaultValue =
-                            getColumnDefaultValue(
-                                            metaData,
-                                            tablePath.getDatabaseName(),
-                                            tablePath.getSchemaName(),
-                                            tablePath.getTableName(),
-                                            columnName)
-                                    .orElse(null);
-
-                    PhysicalColumn physicalColumn =
-                            PhysicalColumn.of(
-                                    columnName,
-                                    type,
-                                    columnDisplaySize,
-                                    isNullable,
-                                    defaultValue,
-                                    comment);
-                    builder.column(physicalColumn);
-                }
-                // add primary key
-                primaryKey.ifPresent(builder::primaryKey);
-                // add constraint key
-                constraintKeys.forEach(builder::constraintKey);
-                TableIdentifier tableIdentifier =
-                        TableIdentifier.of(
-                                catalogName,
-                                tablePath.getDatabaseName(),
-                                tablePath.getSchemaName(),
-                                tablePath.getTableName());
-                return CatalogTable.of(
-                        tableIdentifier,
-                        builder.build(),
-                        buildConnectorOptions(tablePath),
-                        Collections.emptyList(),
-                        "");
-            }
-
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed getting table %s", tablePath.getFullName()), e);
-        }
-    }
+    protected abstract boolean createTableInternal(TablePath tablePath, CatalogTable table)
+            throws CatalogException;
 
     @Override
     public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
@@ -400,20 +257,7 @@ public abstract class AbstractJdbcCatalog implements Catalog {
         }
     }
 
-    protected boolean dropTableInternal(TablePath tablePath) throws CatalogException {
-        String dbUrl =
-                jdbcDialect.getUrlFromDatabaseName(baseUrl, tablePath.getDatabaseName(), suffix);
-        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
-                PreparedStatement ps =
-                        conn.prepareStatement(
-                                jdbcDialect.getDropTableSql(tablePath.getFullName()))) {
-            // Will there exist concurrent drop for one table?
-            return ps.execute();
-        } catch (SQLException e) {
-            throw new CatalogException(
-                    String.format("Failed dropping table %s", tablePath.getFullName()), e);
-        }
-    }
+    protected abstract boolean dropTableInternal(TablePath tablePath) throws CatalogException;
 
     @Override
     public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
@@ -429,6 +273,8 @@ public abstract class AbstractJdbcCatalog implements Catalog {
         }
     }
 
+    protected abstract boolean createDatabaseInternal(String databaseName);
+
     @Override
     public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
             throws DatabaseNotExistException, CatalogException {
@@ -440,69 +286,5 @@ public abstract class AbstractJdbcCatalog implements Catalog {
         }
     }
 
-    protected SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
-            throws SQLException {
-        return null;
-    }
-
-    protected Set<String> getSysDatabases() {
-        return SYS_DATABASES;
-    }
-
-    protected Map<String, String> buildConnectorOptions(TablePath tablePath) {
-        Map<String, String> options = new HashMap<>(8);
-        options.put("connector", "jdbc");
-        options.put(
-                "url",
-                jdbcDialect.getUrlFromDatabaseName(baseUrl, tablePath.getDatabaseName(), suffix));
-        options.put("table-name", tablePath.getFullName());
-        options.put("username", username);
-        options.put("password", pwd);
-        return options;
-    }
-
-    protected boolean createDatabaseInternal(String databaseName) {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
-                PreparedStatement ps =
-                        conn.prepareStatement(
-                                String.format(jdbcDialect.createDatabaseSql(databaseName)))) {
-            return ps.execute();
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format(
-                            "Failed creating database %s in catalog %s",
-                            databaseName, this.catalogName),
-                    e);
-        }
-    }
-
-    protected boolean dropDatabaseInternal(String databaseName) throws CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
-                PreparedStatement ps =
-                        conn.prepareStatement(jdbcDialect.dropDatabaseSql(databaseName))) {
-            return ps.execute();
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format(
-                            "Failed dropping database %s in catalog %s",
-                            databaseName, this.catalogName),
-                    e);
-        }
-    }
-
-    // todo: If the origin source is mysql, we can directly use create table like to create the
-    // target table?
-    protected boolean createTableInternal(TablePath tablePath, CatalogTable table)
-            throws CatalogException {
-        String dbUrl =
-                jdbcDialect.getUrlFromDatabaseName(baseUrl, tablePath.getDatabaseName(), suffix);
-        String createTableSql = jdbcDialect.createTableSql(tablePath, table);
-        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
-                PreparedStatement ps = conn.prepareStatement(createTableSql)) {
-            return ps.execute();
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed creating table %s", tablePath.getFullName()), e);
-        }
-    }
+    protected abstract boolean dropDatabaseInternal(String databaseName) throws CatalogException;
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
index f6c9cc679..d8534d5df 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
@@ -18,6 +18,16 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql;
 
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
@@ -25,13 +35,26 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalo
 import com.mysql.cj.MysqlType;
 import com.mysql.cj.jdbc.result.ResultSetImpl;
 
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 
 public class MySqlCatalog extends AbstractJdbcCatalog {
 
+    protected static final Set<String> SYS_DATABASES = new HashSet<>(4);
+
     static {
         SYS_DATABASES.add("information_schema");
         SYS_DATABASES.add("mysql");
@@ -44,12 +67,189 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
         super(catalogName, username, pwd, urlInfo);
     }
 
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
+                PreparedStatement ps = conn.prepareStatement("SHOW DATABASES;")) {
+
+            List<String> databases = new ArrayList<>();
+            ResultSet rs = ps.executeQuery();
+
+            while (rs.next()) {
+                String databaseName = rs.getString(1);
+                if (!SYS_DATABASES.contains(databaseName)) {
+                    databases.add(rs.getString(1));
+                }
+            }
+
+            return databases;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed listing database in catalog %s", this.catalogName), e);
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(this.catalogName, databaseName);
+        }
+
+        String dbUrl = getUrlFromDatabaseName(databaseName);
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
+                PreparedStatement ps = conn.prepareStatement("SHOW TABLES;")) {
+
+            ResultSet rs = ps.executeQuery();
+
+            List<String> tables = new ArrayList<>();
+
+            while (rs.next()) {
+                tables.add(rs.getString(1));
+            }
+
+            return tables;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed listing database in catalog %s", catalogName), e);
+        }
+    }
+
+    @Override
+    public CatalogTable getTable(TablePath tablePath)
+            throws CatalogException, TableNotExistException {
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(catalogName, tablePath);
+        }
+
+        String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
+            DatabaseMetaData metaData = conn.getMetaData();
+            Optional<PrimaryKey> primaryKey =
+                    getPrimaryKey(metaData, tablePath.getDatabaseName(), tablePath.getTableName());
+            List<ConstraintKey> constraintKeys =
+                    getConstraintKeys(
+                            metaData, tablePath.getDatabaseName(), tablePath.getTableName());
+
+            try (PreparedStatement ps =
+                    conn.prepareStatement(
+                            String.format(
+                                    "SELECT * FROM %s WHERE 1 = 0;",
+                                    tablePath.getFullNameWithQuoted()))) {
+                ResultSetMetaData tableMetaData = ps.getMetaData();
+                TableSchema.Builder builder = TableSchema.builder();
+                // add column
+                for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
+                    String columnName = tableMetaData.getColumnName(i);
+                    SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i);
+                    int columnDisplaySize = tableMetaData.getColumnDisplaySize(i);
+                    String comment = tableMetaData.getColumnLabel(i);
+                    boolean isNullable =
+                            tableMetaData.isNullable(i) == ResultSetMetaData.columnNullable;
+                    Object defaultValue =
+                            getColumnDefaultValue(metaData, tablePath.getTableName(), columnName)
+                                    .orElse(null);
+
+                    PhysicalColumn physicalColumn =
+                            PhysicalColumn.of(
+                                    columnName,
+                                    type,
+                                    columnDisplaySize,
+                                    isNullable,
+                                    defaultValue,
+                                    comment);
+                    builder.column(physicalColumn);
+                }
+                // add primary key
+                primaryKey.ifPresent(builder::primaryKey);
+                // add constraint key
+                constraintKeys.forEach(builder::constraintKey);
+                TableIdentifier tableIdentifier =
+                        TableIdentifier.of(
+                                catalogName, tablePath.getDatabaseName(), tablePath.getTableName());
+                return CatalogTable.of(
+                        tableIdentifier,
+                        builder.build(),
+                        buildConnectorOptions(tablePath),
+                        Collections.emptyList(),
+                        "");
+            }
+
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed getting table %s", tablePath.getFullName()), e);
+        }
+    }
+
+    // todo: If the origin source is mysql, we can directly use create table like to create the
+    // target table?
+    @Override
+    protected boolean createTableInternal(TablePath tablePath, CatalogTable table)
+            throws CatalogException {
+        String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
+        String createTableSql = MysqlCreateTableSqlBuilder.builder(tablePath, table).build();
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
+                PreparedStatement ps = conn.prepareStatement(createTableSql)) {
+            return ps.execute();
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed creating table %s", tablePath.getFullName()), e);
+        }
+    }
+
+    @Override
+    protected boolean dropTableInternal(TablePath tablePath) throws CatalogException {
+        String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement(
+                                String.format(
+                                        "DROP TABLE %s IF EXIST;", tablePath.getFullName()))) {
+            // Will there exist concurrent drop for one table?
+            return ps.execute();
+        } catch (SQLException e) {
+            throw new CatalogException(
+                    String.format("Failed dropping table %s", tablePath.getFullName()), e);
+        }
+    }
+
+    @Override
+    protected boolean createDatabaseInternal(String databaseName) throws CatalogException {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement(
+                                String.format("CREATE DATABASE `%s`;", databaseName))) {
+            return ps.execute();
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "Failed creating database %s in catalog %s",
+                            databaseName, this.catalogName),
+                    e);
+        }
+    }
+
+    @Override
+    protected boolean dropDatabaseInternal(String databaseName) throws CatalogException {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement(String.format("DROP DATABASE `%s`;", databaseName))) {
+            return ps.execute();
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "Failed dropping database %s in catalog %s",
+                            databaseName, this.catalogName),
+                    e);
+        }
+    }
+
     /**
      * @see com.mysql.cj.MysqlType
      * @see ResultSetImpl#getObjectStoredProc(int, int)
      */
-    @Override
-    public SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
+    @SuppressWarnings("unchecked")
+    private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
             throws SQLException {
         MysqlType mysqlType = MysqlType.getByName(metadata.getColumnTypeName(colIndex));
         Map<String, Object> dataTypeProperties = new HashMap<>();
@@ -57,4 +257,19 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
         dataTypeProperties.put(MysqlDataTypeConvertor.SCALE, metadata.getScale(colIndex));
         return new MysqlDataTypeConvertor().toSeaTunnelType(mysqlType, dataTypeProperties);
     }
+
+    @SuppressWarnings("MagicNumber")
+    private Map<String, String> buildConnectorOptions(TablePath tablePath) {
+        Map<String, String> options = new HashMap<>(8);
+        options.put("connector", "jdbc");
+        options.put("url", baseUrl + tablePath.getDatabaseName());
+        options.put("table-name", tablePath.getFullName());
+        options.put("username", username);
+        options.put("password", pwd);
+        return options;
+    }
+
+    private String getUrlFromDatabaseName(String databaseName) {
+        return baseUrl + databaseName + suffix;
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
index c2e300516..9a015ca73 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
@@ -49,7 +49,7 @@ public class MysqlCreateTableSqlBuilder {
 
     private List<ConstraintKey> constraintKeys;
 
-    private final MysqlDataTypeConvertor mysqlDataTypeConvertor;
+    private MysqlDataTypeConvertor mysqlDataTypeConvertor;
 
     private MysqlCreateTableSqlBuilder(String tableName) {
         checkNotNull(tableName, "tableName must not be null");
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
index f15a0e746..25c02e6b1 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
@@ -19,21 +19,41 @@
 package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver;
 
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
 
 import org.apache.commons.lang3.tuple.Pair;
 
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 
 public class SqlServerCatalog extends AbstractJdbcCatalog {
 
+    private static final Set<String> SYS_DATABASES = new HashSet<>(4);
+
     static {
         SYS_DATABASES.add("master");
         SYS_DATABASES.add("tempdb");
@@ -46,6 +66,150 @@ public class SqlServerCatalog extends AbstractJdbcCatalog {
         super(catalogName, username, pwd, urlInfo);
     }
 
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
+                PreparedStatement ps = conn.prepareStatement("SELECT NAME FROM SYS.DATABASES")) {
+
+            List<String> databases = new ArrayList<>();
+            ResultSet rs = ps.executeQuery();
+
+            while (rs.next()) {
+                String databaseName = rs.getString(1);
+                if (!SYS_DATABASES.contains(databaseName)) {
+                    databases.add(databaseName);
+                }
+            }
+
+            return databases;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed listing database in catalog %s", this.catalogName), e);
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(this.catalogName, databaseName);
+        }
+
+        String dbUrl = getUrlFromDatabaseName(databaseName);
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement(
+                                "SELECT TABLE_SCHEMA, TABLE_NAME FROM "
+                                        + databaseName
+                                        + ".INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'")) {
+
+            ResultSet rs = ps.executeQuery();
+
+            List<String> tables = new ArrayList<>();
+
+            while (rs.next()) {
+                tables.add(rs.getString(1) + "." + rs.getString(2));
+            }
+
+            return tables;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed listing database in catalog %s", catalogName), e);
+        }
+    }
+
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        try {
+            return databaseExists(tablePath.getDatabaseName())
+                    && listTables(tablePath.getDatabaseName())
+                            .contains(tablePath.getSchemaAndTableName());
+        } catch (DatabaseNotExistException e) {
+            return false;
+        }
+    }
+
+    @Override
+    public CatalogTable getTable(TablePath tablePath)
+            throws CatalogException, TableNotExistException {
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(catalogName, tablePath);
+        }
+
+        String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
+            DatabaseMetaData metaData = conn.getMetaData();
+            Optional<PrimaryKey> primaryKey =
+                    getPrimaryKey(
+                            metaData,
+                            tablePath.getDatabaseName(),
+                            tablePath.getSchemaName(),
+                            tablePath.getTableName());
+            List<ConstraintKey> constraintKeys =
+                    getConstraintKeys(
+                            metaData,
+                            tablePath.getDatabaseName(),
+                            tablePath.getSchemaName(),
+                            tablePath.getTableName());
+
+            try (PreparedStatement ps =
+                    conn.prepareStatement(
+                            String.format(
+                                    "SELECT * FROM %s WHERE 1 = 0;",
+                                    tablePath.getFullNameWithQuoted("\"")))) {
+                ResultSetMetaData tableMetaData = ps.getMetaData();
+                TableSchema.Builder builder = TableSchema.builder();
+                // add column
+                for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
+                    String columnName = tableMetaData.getColumnName(i);
+                    SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i);
+                    int columnDisplaySize = tableMetaData.getColumnDisplaySize(i);
+                    String comment = tableMetaData.getColumnLabel(i);
+                    boolean isNullable =
+                            tableMetaData.isNullable(i) == ResultSetMetaData.columnNullable;
+                    Object defaultValue =
+                            getColumnDefaultValue(
+                                            metaData,
+                                            tablePath.getDatabaseName(),
+                                            tablePath.getSchemaName(),
+                                            tablePath.getTableName(),
+                                            columnName)
+                                    .orElse(null);
+
+                    PhysicalColumn physicalColumn =
+                            PhysicalColumn.of(
+                                    columnName,
+                                    type,
+                                    columnDisplaySize,
+                                    isNullable,
+                                    defaultValue,
+                                    comment);
+                    builder.column(physicalColumn);
+                }
+                // add primary key
+                primaryKey.ifPresent(builder::primaryKey);
+                // add constraint key
+                constraintKeys.forEach(builder::constraintKey);
+                TableIdentifier tableIdentifier =
+                        TableIdentifier.of(
+                                catalogName,
+                                tablePath.getDatabaseName(),
+                                tablePath.getSchemaName(),
+                                tablePath.getTableName());
+                return CatalogTable.of(
+                        tableIdentifier,
+                        builder.build(),
+                        buildConnectorOptions(tablePath),
+                        Collections.emptyList(),
+                        "");
+            }
+
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed getting table %s", tablePath.getFullName()), e);
+        }
+    }
+
     @Override
     protected boolean createTableInternal(TablePath tablePath, CatalogTable table)
             throws CatalogException {
@@ -53,7 +217,54 @@ public class SqlServerCatalog extends AbstractJdbcCatalog {
     }
 
     @Override
-    public SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
+    protected boolean dropTableInternal(TablePath tablePath) throws CatalogException {
+        String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement(
+                                String.format("DROP TABLE IF EXIST %s", tablePath.getFullName()))) {
+            // Will there exist concurrent drop for one table?
+            return ps.execute();
+        } catch (SQLException e) {
+            throw new CatalogException(
+                    String.format("Failed dropping table %s", tablePath.getFullName()), e);
+        }
+    }
+
+    @Override
+    protected boolean createDatabaseInternal(String databaseName) throws CatalogException {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement(
+                                String.format("CREATE DATABASE `%s`", databaseName))) {
+            return ps.execute();
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "Failed creating database %s in catalog %s",
+                            databaseName, this.catalogName),
+                    e);
+        }
+    }
+
+    @Override
+    protected boolean dropDatabaseInternal(String databaseName) throws CatalogException {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement(
+                                String.format("DROP DATABASE IF EXISTS `%s`;", databaseName))) {
+            return ps.execute();
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "Failed dropping database %s in catalog %s",
+                            databaseName, this.catalogName),
+                    e);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
             throws SQLException {
         Pair<SqlServerType, Map<String, Object>> pair =
                 SqlServerType.parse(metadata.getColumnTypeName(colIndex));
@@ -63,4 +274,19 @@ public class SqlServerCatalog extends AbstractJdbcCatalog {
         dataTypeProperties.put(SqlServerDataTypeConvertor.SCALE, metadata.getScale(colIndex));
         return new SqlServerDataTypeConvertor().toSeaTunnelType(pair.getLeft(), dataTypeProperties);
     }
+
+    @SuppressWarnings("MagicNumber")
+    private Map<String, String> buildConnectorOptions(TablePath tablePath) {
+        Map<String, String> options = new HashMap<>(8);
+        options.put("connector", "jdbc");
+        options.put("url", getUrlFromDatabaseName(tablePath.getDatabaseName()));
+        options.put("table-name", tablePath.getFullName());
+        options.put("username", username);
+        options.put("password", pwd);
+        return options;
+    }
+
+    private String getUrlFromDatabaseName(String databaseName) {
+        return baseUrl + ";databaseName=" + databaseName + ";" + suffix;
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java
index 0b0fa91f2..e848498c9 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java
@@ -140,7 +140,7 @@ public enum SqlServerType implements SQLType {
     public static Pair<SqlServerType, Map<String, Object>> parse(String fullTypeName) {
         Map<String, Object> params = new HashMap<>();
         String typeName = fullTypeName;
-        if (fullTypeName.contains("(")) {
+        if (fullTypeName.indexOf("(") != -1) {
             typeName = fullTypeName.substring(0, fullTypeName.indexOf("(")).trim();
             String paramsStr =
                     fullTypeName.substring(
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index 36e47eda1..64ed388df 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -17,8 +17,6 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
 
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
 
@@ -195,40 +193,4 @@ public interface JdbcDialect extends Serializable {
         PreparedStatement ps = conn.prepareStatement(jdbcSourceConfig.getQuery());
         return ps.getMetaData();
     }
-
-    default String listDatabases() {
-        return "SHOW DATABASES;";
-    }
-
-    default String getUrlFromDatabaseName(String baseUrl, String databaseName, String suffix) {
-        return baseUrl + databaseName + suffix;
-    }
-
-    default String createDatabaseSql(String databaseName) {
-        return String.format("CREATE DATABASE IF NOT EXISTS %s;", quoteIdentifier(databaseName));
-    }
-
-    default String dropDatabaseSql(String databaseName) {
-        return String.format("DROP DATABASE IF EXISTS %s;", quoteIdentifier(databaseName));
-    }
-
-    default String getTableName(ResultSet rs) throws SQLException {
-        return rs.getString(1);
-    }
-
-    default String getTableName(TablePath tablePath) {
-        return tablePath.getTableName();
-    }
-
-    default String listTableSql(String databaseName) {
-        return "SHOW TABLES;";
-    }
-
-    default String getDropTableSql(String tableName) {
-        return String.format("DROP TABLE %s IF EXIST;", tableName);
-    }
-
-    default String createTableSql(TablePath tablePath, CatalogTable catalogTable) {
-        return "";
-    }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
index 63584d8ab..128b8ae4b 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
@@ -17,9 +17,6 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql;
 
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MysqlCreateTableSqlBuilder;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
@@ -81,9 +78,4 @@ public class MysqlDialect implements JdbcDialect {
         statement.setFetchSize(Integer.MIN_VALUE);
         return statement;
     }
-
-    @Override
-    public String createTableSql(TablePath tablePath, CatalogTable catalogTable) {
-        return MysqlCreateTableSqlBuilder.builder(tablePath, catalogTable).build();
-    }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
index 90b376624..697d2d2dc 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
@@ -17,13 +17,10 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver;
 
-import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
 
-import java.sql.ResultSet;
-import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
@@ -103,31 +100,4 @@ public class SqlServerDialect implements JdbcDialect {
 
         return Optional.of(upsertSQL);
     }
-
-    @Override
-    public String listDatabases() {
-        return "SELECT NAME FROM SYS.DATABASES";
-    }
-
-    @Override
-    public String listTableSql(String databaseName) {
-        return "SELECT TABLE_SCHEMA, TABLE_NAME FROM "
-                + databaseName
-                + ".INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'";
-    }
-
-    @Override
-    public String getTableName(ResultSet rs) throws SQLException {
-        return rs.getString(1) + "." + rs.getString(2);
-    }
-
-    @Override
-    public String getTableName(TablePath tablePath) {
-        return tablePath.getSchemaName() + "." + tablePath.getTableName();
-    }
-
-    @Override
-    public String getUrlFromDatabaseName(String baseUrl, String databaseName, String suffix) {
-        return baseUrl + ";databaseName=" + databaseName + ";" + suffix;
-    }
 }
diff --git a/seatunnel-connectors-v2/connector-starrocks/pom.xml b/seatunnel-connectors-v2/connector-starrocks/pom.xml
index 44b899885..08e49bc0f 100644
--- a/seatunnel-connectors-v2/connector-starrocks/pom.xml
+++ b/seatunnel-connectors-v2/connector-starrocks/pom.xml
@@ -49,11 +49,6 @@
             <artifactId>connector-common</artifactId>
             <version>${project.version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-jdbc</artifactId>
-            <version>${project.version}</version>
-        </dependency>
         <dependency>
             <groupId>mysql</groupId>
             <artifactId>mysql-connector-java</artifactId>
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
index 79d9bf901..7bf308b1c 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
@@ -17,9 +17,13 @@
 
 package org.apache.seatunnel.connectors.seatunnel.starrocks.catalog;
 
+import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
 import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
 import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
@@ -32,7 +36,6 @@ import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
-import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
 import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
 
 import org.apache.commons.lang3.StringUtils;
@@ -44,18 +47,22 @@ import com.mysql.cj.MysqlType;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-public class StarRocksCatalog extends AbstractJdbcCatalog {
+public class StarRocksCatalog implements Catalog {
 
     protected final String catalogName;
     protected String defaultDatabase = "information_schema";
@@ -64,6 +71,8 @@ public class StarRocksCatalog extends AbstractJdbcCatalog {
     protected final String baseUrl;
     protected String defaultUrl;
     private final JdbcUrlUtil.UrlInfo urlInfo;
+
+    private static final Set<String> SYS_DATABASES = new HashSet<>();
     private static final Logger LOG = LoggerFactory.getLogger(StarRocksCatalog.class);
 
     static {
@@ -73,8 +82,6 @@ public class StarRocksCatalog extends AbstractJdbcCatalog {
 
     public StarRocksCatalog(String catalogName, String username, String pwd, String defaultUrl) {
 
-        super(catalogName, username, pwd, JdbcUrlUtil.getUrlInfo(defaultUrl));
-
         checkArgument(StringUtils.isNotBlank(username));
         checkArgument(StringUtils.isNotBlank(pwd));
         checkArgument(StringUtils.isNotBlank(defaultUrl));
@@ -90,9 +97,104 @@ public class StarRocksCatalog extends AbstractJdbcCatalog {
     }
 
     @Override
-    protected boolean createTableInternal(TablePath tablePath, CatalogTable table)
-            throws CatalogException {
-        throw new UnsupportedOperationException("Unsupported create table");
+    public List<String> listDatabases() throws CatalogException {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+
+            PreparedStatement ps = conn.prepareStatement("SHOW DATABASES;");
+
+            List<String> databases = new ArrayList<>();
+            ResultSet rs = ps.executeQuery();
+
+            while (rs.next()) {
+                String databaseName = rs.getString(1);
+                if (!SYS_DATABASES.contains(databaseName)) {
+                    databases.add(rs.getString(1));
+                }
+            }
+
+            return databases;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed listing database in catalog %s", this.catalogName), e);
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(this.catalogName, databaseName);
+        }
+
+        try (Connection conn =
+                DriverManager.getConnection(
+                        urlInfo.getUrlWithDatabase(databaseName), username, pwd)) {
+            PreparedStatement ps = conn.prepareStatement("SHOW TABLES;");
+
+            ResultSet rs = ps.executeQuery();
+
+            List<String> tables = new ArrayList<>();
+
+            while (rs.next()) {
+                tables.add(rs.getString(1));
+            }
+
+            return tables;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed listing database in catalog %s", catalogName), e);
+        }
+    }
+
+    @Override
+    public CatalogTable getTable(TablePath tablePath)
+            throws CatalogException, TableNotExistException {
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(catalogName, tablePath);
+        }
+
+        String dbUrl = urlInfo.getUrlWithDatabase(tablePath.getDatabaseName());
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
+            Optional<PrimaryKey> primaryKey =
+                    getPrimaryKey(tablePath.getDatabaseName(), tablePath.getTableName());
+
+            PreparedStatement ps =
+                    conn.prepareStatement(
+                            String.format(
+                                    "SELECT * FROM %s WHERE 1 = 0;",
+                                    tablePath.getFullNameWithQuoted()));
+
+            ResultSetMetaData tableMetaData = ps.getMetaData();
+
+            TableSchema.Builder builder = TableSchema.builder();
+            for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
+                SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i);
+                // TODO add default value and test it
+                builder.column(
+                        PhysicalColumn.of(
+                                tableMetaData.getColumnName(i),
+                                type,
+                                tableMetaData.getColumnDisplaySize(i),
+                                tableMetaData.isNullable(i) == ResultSetMetaData.columnNullable,
+                                null,
+                                tableMetaData.getColumnLabel(i)));
+            }
+
+            primaryKey.ifPresent(builder::primaryKey);
+
+            TableIdentifier tableIdentifier =
+                    TableIdentifier.of(
+                            catalogName, tablePath.getDatabaseName(), tablePath.getTableName());
+            return CatalogTable.of(
+                    tableIdentifier,
+                    builder.build(),
+                    buildConnectorOptions(tablePath),
+                    Collections.emptyList(),
+                    "");
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed getting table %s", tablePath.getFullName()), e);
+        }
     }
 
     @Override
@@ -107,11 +209,6 @@ public class StarRocksCatalog extends AbstractJdbcCatalog {
         throw new UnsupportedOperationException();
     }
 
-    @Override
-    protected boolean dropTableInternal(TablePath tablePath) throws CatalogException {
-        throw new UnsupportedOperationException();
-    }
-
     @Override
     public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
             throws DatabaseAlreadyExistException, CatalogException {
@@ -132,11 +229,6 @@ public class StarRocksCatalog extends AbstractJdbcCatalog {
         }
     }
 
-    @Override
-    protected boolean createDatabaseInternal(String databaseName) {
-        throw new UnsupportedOperationException();
-    }
-
     @Override
     public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
             throws DatabaseNotExistException, CatalogException {
@@ -154,14 +246,8 @@ public class StarRocksCatalog extends AbstractJdbcCatalog {
         }
     }
 
-    @Override
-    protected boolean dropDatabaseInternal(String databaseName) throws CatalogException {
-        throw new UnsupportedOperationException();
-    }
-
     /** @see com.mysql.cj.MysqlType */
-    @Override
-    public SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
+    private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
             throws SQLException {
         MysqlType starrocksType = MysqlType.getByName(metadata.getColumnTypeName(colIndex));
         switch (starrocksType) {
@@ -229,8 +315,7 @@ public class StarRocksCatalog extends AbstractJdbcCatalog {
     }
 
     @SuppressWarnings("MagicNumber")
-    @Override
-    public Map<String, String> buildConnectorOptions(TablePath tablePath) {
+    private Map<String, String> buildConnectorOptions(TablePath tablePath) {
         Map<String, String> options = new HashMap<>(8);
         options.put("connector", "starrocks");
         options.put("url", baseUrl + tablePath.getDatabaseName());
@@ -284,6 +369,29 @@ public class StarRocksCatalog extends AbstractJdbcCatalog {
         return res;
     }
 
+    @Override
+    public String getDefaultDatabase() {
+        return defaultDatabase;
+    }
+
+    @Override
+    public void open() throws CatalogException {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+            // test connection, fail early if we cannot connect to database
+            conn.getCatalog();
+        } catch (SQLException e) {
+            throw new CatalogException(
+                    String.format("Failed connecting to %s via JDBC.", defaultUrl), e);
+        }
+
+        LOG.info("Catalog {} established connection to {}", catalogName, defaultUrl);
+    }
+
+    @Override
+    public void close() throws CatalogException {
+        LOG.info("Catalog {} closing", catalogName);
+    }
+
     protected Optional<PrimaryKey> getPrimaryKey(String schema, String table) throws SQLException {
 
         List<String> pkFields = new ArrayList<>();
@@ -306,4 +414,21 @@ public class StarRocksCatalog extends AbstractJdbcCatalog {
         }
         return Optional.empty();
     }
+
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException {
+        checkArgument(StringUtils.isNotBlank(databaseName));
+
+        return listDatabases().contains(databaseName);
+    }
+
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        try {
+            return databaseExists(tablePath.getDatabaseName())
+                    && listTables(tablePath.getDatabaseName()).contains(tablePath.getTableName());
+        } catch (DatabaseNotExistException e) {
+            return false;
+        }
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDialect.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDialect.java
deleted file mode 100644
index 9286e628f..000000000
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDialect.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.starrocks.catalog;
-
-import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlDialect;
-
-public class StarRocksDialect extends MysqlDialect {
-    @Override
-    public String dialectName() {
-        return "StarRocks";
-    }
-}