You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2023/12/05 03:44:26 UTC

(seatunnel) branch dev updated: [Improve] Replace SeaTunnelRowType with TableSchema in the JdbcRowConverter

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1cc1b1b8cd [Improve] Replace SeaTunnelRowType with TableSchema in the JdbcRowConverter
1cc1b1b8cd is described below

commit 1cc1b1b8cdff8072ad51dcf354781d5a44b2f0b9
Author: ic4y <83...@users.noreply.github.com>
AuthorDate: Tue Dec 5 11:44:21 2023 +0800

    [Improve] Replace SeaTunnelRowType with TableSchema in the JdbcRowConverter
---
 .../seatunnel/api/table/catalog/TableSchema.java   |   4 +
 .../seatunnel/jdbc/internal/JdbcInputFormat.java   |  13 ++-
 .../jdbc/internal/JdbcOutputFormatBuilder.java     | 124 +++++++++++----------
 .../converter/AbstractJdbcRowConverter.java        |   7 +-
 .../jdbc/internal/converter/JdbcRowConverter.java  |   6 +-
 .../dialect/hive/HiveJdbcRowConverter.java         |   4 +-
 .../dialect/kingbase/KingbaseJdbcRowConverter.java |   7 +-
 .../dialect/psql/PostgresJdbcRowConverter.java     |   4 +-
 .../InsertOrUpdateBatchStatementExecutor.java      |  23 ++--
 .../executor/SimpleBatchStatementExecutor.java     |   6 +-
 .../jdbc/sink/JdbcExactlyOnceSinkWriter.java       |   6 +-
 .../connectors/seatunnel/jdbc/sink/JdbcSink.java   |  21 ++--
 .../seatunnel/jdbc/sink/JdbcSinkWriter.java        |  14 ++-
 .../seatunnel/jdbc/source/JdbcSource.java          |  11 +-
 .../seatunnel/jdbc/source/JdbcSourceReader.java    |   4 +-
 15 files changed, 134 insertions(+), 120 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
index 7fd277d2b0..d327a0668b 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
@@ -57,6 +57,10 @@ public final class TableSchema implements Serializable {
         return new SeaTunnelRowType(fields, fieldTypes);
     }
 
+    public String[] getFieldNames() {
+        return columns.stream().map(Column::getName).toArray(String[]::new);
+    }
+
     public static final class Builder {
         private final List<Column> columns = new ArrayList<>();
 
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java
index c73df90ce9..c2fec61341 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java
@@ -18,10 +18,11 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal;
 
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
@@ -54,16 +55,16 @@ public class JdbcInputFormat implements Serializable {
 
     private final JdbcDialect jdbcDialect;
     private final JdbcRowConverter jdbcRowConverter;
-    private final Map<TablePath, SeaTunnelRowType> tables;
+    private final Map<TablePath, CatalogTable> tables;
     private final ChunkSplitter chunkSplitter;
 
     private transient String splitTableId;
-    private transient SeaTunnelRowType splitRowType;
+    private transient TableSchema splitTableSchema;
     private transient PreparedStatement statement;
     private transient ResultSet resultSet;
     private volatile boolean hasNext;
 
-    public JdbcInputFormat(JdbcSourceConfig config, Map<TablePath, SeaTunnelRowType> tables) {
+    public JdbcInputFormat(JdbcSourceConfig config, Map<TablePath, CatalogTable> tables) {
         this.jdbcDialect =
                 JdbcDialectLoader.load(
                         config.getJdbcConnectionConfig().getUrl(), config.getCompatibleMode());
@@ -91,7 +92,7 @@ public class JdbcInputFormat implements Serializable {
      */
     public void open(JdbcSourceSplit inputSplit) throws IOException {
         try {
-            splitRowType = tables.get(inputSplit.getTablePath());
+            splitTableSchema = tables.get(inputSplit.getTablePath()).getTableSchema();
             splitTableId = inputSplit.getTablePath().toString();
 
             statement = chunkSplitter.generateSplitStatement(inputSplit);
@@ -142,7 +143,7 @@ public class JdbcInputFormat implements Serializable {
             if (!hasNext) {
                 return null;
             }
-            SeaTunnelRow seaTunnelRow = jdbcRowConverter.toInternal(resultSet, splitRowType);
+            SeaTunnelRow seaTunnelRow = jdbcRowConverter.toInternal(resultSet, splitTableSchema);
             seaTunnelRow.setTableId(splitTableId);
             seaTunnelRow.setRowKind(RowKind.INSERT);
 
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
index 5c05c2feff..4a296f9b4d 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
@@ -17,10 +17,10 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal;
 
+import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
@@ -43,6 +43,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.function.Function;
 import java.util.function.IntFunction;
+import java.util.stream.Collectors;
 
 @Slf4j
 @RequiredArgsConstructor
@@ -50,7 +51,7 @@ public class JdbcOutputFormatBuilder {
     @NonNull private final JdbcDialect dialect;
     @NonNull private final JdbcConnectionProvider connectionProvider;
     @NonNull private final JdbcSinkConfig jdbcSinkConfig;
-    @NonNull private final SeaTunnelRowType seaTunnelRowType;
+    @NonNull private final TableSchema tableSchema;
 
     public JdbcOutputFormat build() {
         JdbcOutputFormat.StatementExecutorFactory statementExecutorFactory;
@@ -67,11 +68,11 @@ public class JdbcOutputFormatBuilder {
                     () ->
                             createSimpleBufferedExecutor(
                                     jdbcSinkConfig.getSimpleSql(),
-                                    seaTunnelRowType,
+                                    tableSchema,
                                     dialect.getRowConverter());
         } else if (primaryKeys == null || primaryKeys.isEmpty()) {
             statementExecutorFactory =
-                    () -> createSimpleBufferedExecutor(dialect, database, table, seaTunnelRowType);
+                    () -> createSimpleBufferedExecutor(dialect, database, table, tableSchema);
         } else {
             statementExecutorFactory =
                     () ->
@@ -79,7 +80,7 @@ public class JdbcOutputFormatBuilder {
                                     dialect,
                                     database,
                                     table,
-                                    seaTunnelRowType,
+                                    tableSchema,
                                     primaryKeys.toArray(new String[0]),
                                     jdbcSinkConfig.isEnableUpsert(),
                                     jdbcSinkConfig.isPrimaryKeyUpdated(),
@@ -93,15 +94,16 @@ public class JdbcOutputFormatBuilder {
     }
 
     private static JdbcBatchStatementExecutor<SeaTunnelRow> createSimpleBufferedExecutor(
-            JdbcDialect dialect, String database, String table, SeaTunnelRowType rowType) {
-        String insertSQL = dialect.getInsertIntoStatement(database, table, rowType.getFieldNames());
-        return createSimpleBufferedExecutor(insertSQL, rowType, dialect.getRowConverter());
+            JdbcDialect dialect, String database, String table, TableSchema tableSchema) {
+        String insertSQL =
+                dialect.getInsertIntoStatement(database, table, tableSchema.getFieldNames());
+        return createSimpleBufferedExecutor(insertSQL, tableSchema, dialect.getRowConverter());
     }
 
     private static JdbcBatchStatementExecutor<SeaTunnelRow> createSimpleBufferedExecutor(
-            String sql, SeaTunnelRowType rowType, JdbcRowConverter rowConverter) {
+            String sql, TableSchema tableSchema, JdbcRowConverter rowConverter) {
         JdbcBatchStatementExecutor<SeaTunnelRow> simpleRowExecutor =
-                createSimpleExecutor(sql, rowType, rowConverter);
+                createSimpleExecutor(sql, tableSchema, rowConverter);
         return new BufferedBatchStatementExecutor(simpleRowExecutor, Function.identity());
     }
 
@@ -109,28 +111,36 @@ public class JdbcOutputFormatBuilder {
             JdbcDialect dialect,
             String database,
             String table,
-            SeaTunnelRowType rowType,
+            TableSchema tableSchema,
             String[] pkNames,
             boolean enableUpsert,
             boolean isPrimaryKeyUpdated,
             boolean supportUpsertByInsertOnly) {
-        int[] pkFields = Arrays.stream(pkNames).mapToInt(rowType::indexOf).toArray();
-        SeaTunnelDataType[] pkTypes =
-                Arrays.stream(pkFields)
-                        .mapToObj((IntFunction<SeaTunnelDataType>) rowType::getFieldType)
-                        .toArray(SeaTunnelDataType[]::new);
+        int[] pkFields =
+                Arrays.stream(pkNames)
+                        .mapToInt(tableSchema.toPhysicalRowDataType()::indexOf)
+                        .toArray();
+
+        TableSchema pkSchema =
+                TableSchema.builder()
+                        .columns(
+                                Arrays.stream(pkFields)
+                                        .mapToObj(
+                                                (IntFunction<Column>) tableSchema.getColumns()::get)
+                                        .collect(Collectors.toList()))
+                        .build();
 
         Function<SeaTunnelRow, SeaTunnelRow> keyExtractor = createKeyExtractor(pkFields);
         JdbcBatchStatementExecutor<SeaTunnelRow> deleteExecutor =
-                createDeleteExecutor(dialect, database, table, pkNames, pkTypes);
+                createDeleteExecutor(dialect, database, table, pkNames, pkSchema);
         JdbcBatchStatementExecutor<SeaTunnelRow> upsertExecutor =
                 createUpsertExecutor(
                         dialect,
                         database,
                         table,
-                        rowType,
+                        tableSchema,
                         pkNames,
-                        pkTypes,
+                        pkSchema,
                         keyExtractor,
                         enableUpsert,
                         isPrimaryKeyUpdated,
@@ -143,47 +153,49 @@ public class JdbcOutputFormatBuilder {
             JdbcDialect dialect,
             String database,
             String table,
-            SeaTunnelRowType rowType,
+            TableSchema tableSchema,
             String[] pkNames,
-            SeaTunnelDataType[] pkTypes,
+            TableSchema pkTableSchema,
             Function<SeaTunnelRow, SeaTunnelRow> keyExtractor,
             boolean enableUpsert,
             boolean isPrimaryKeyUpdated,
             boolean supportUpsertByInsertOnly) {
         if (supportUpsertByInsertOnly) {
-            return createInsertOnlyExecutor(dialect, database, table, rowType);
+            return createInsertOnlyExecutor(dialect, database, table, tableSchema);
         }
         if (enableUpsert) {
             Optional<String> upsertSQL =
-                    dialect.getUpsertStatement(database, table, rowType.getFieldNames(), pkNames);
+                    dialect.getUpsertStatement(
+                            database, table, tableSchema.getFieldNames(), pkNames);
             if (upsertSQL.isPresent()) {
-                return createSimpleExecutor(upsertSQL.get(), rowType, dialect.getRowConverter());
+                return createSimpleExecutor(
+                        upsertSQL.get(), tableSchema, dialect.getRowConverter());
             }
             return createInsertOrUpdateByQueryExecutor(
                     dialect,
                     database,
                     table,
-                    rowType,
+                    tableSchema,
                     pkNames,
-                    pkTypes,
+                    pkTableSchema,
                     keyExtractor,
                     isPrimaryKeyUpdated);
         }
         return createInsertOrUpdateExecutor(
-                dialect, database, table, rowType, pkNames, isPrimaryKeyUpdated);
+                dialect, database, table, tableSchema, pkNames, isPrimaryKeyUpdated);
     }
 
     private static JdbcBatchStatementExecutor<SeaTunnelRow> createInsertOnlyExecutor(
-            JdbcDialect dialect, String database, String table, SeaTunnelRowType rowType) {
+            JdbcDialect dialect, String database, String table, TableSchema tableSchema) {
 
         return new SimpleBatchStatementExecutor(
                 connection ->
                         FieldNamedPreparedStatement.prepareStatement(
                                 connection,
                                 dialect.getInsertIntoStatement(
-                                        database, table, rowType.getFieldNames()),
-                                rowType.getFieldNames()),
-                rowType,
+                                        database, table, tableSchema.getFieldNames()),
+                                tableSchema.getFieldNames()),
+                tableSchema,
                 dialect.getRowConverter());
     }
 
@@ -191,7 +203,7 @@ public class JdbcOutputFormatBuilder {
             JdbcDialect dialect,
             String database,
             String table,
-            SeaTunnelRowType rowType,
+            TableSchema tableSchema,
             String[] pkNames,
             boolean isPrimaryKeyUpdated) {
 
@@ -200,19 +212,19 @@ public class JdbcOutputFormatBuilder {
                         FieldNamedPreparedStatement.prepareStatement(
                                 connection,
                                 dialect.getInsertIntoStatement(
-                                        database, table, rowType.getFieldNames()),
-                                rowType.getFieldNames()),
+                                        database, table, tableSchema.getFieldNames()),
+                                tableSchema.getFieldNames()),
                 connection ->
                         FieldNamedPreparedStatement.prepareStatement(
                                 connection,
                                 dialect.getUpdateStatement(
                                         database,
                                         table,
-                                        rowType.getFieldNames(),
+                                        tableSchema.getFieldNames(),
                                         pkNames,
                                         isPrimaryKeyUpdated),
-                                rowType.getFieldNames()),
-                rowType,
+                                tableSchema.getFieldNames()),
+                tableSchema,
                 dialect.getRowConverter());
     }
 
@@ -220,12 +232,11 @@ public class JdbcOutputFormatBuilder {
             JdbcDialect dialect,
             String database,
             String table,
-            SeaTunnelRowType rowType,
+            TableSchema tableSchema,
             String[] pkNames,
-            SeaTunnelDataType[] pkTypes,
+            TableSchema pkTableSchema,
             Function<SeaTunnelRow, SeaTunnelRow> keyExtractor,
             boolean isPrimaryKeyUpdated) {
-        SeaTunnelRowType keyRowType = new SeaTunnelRowType(pkNames, pkTypes);
         return new InsertOrUpdateBatchStatementExecutor(
                 connection ->
                         FieldNamedPreparedStatement.prepareStatement(
@@ -236,21 +247,21 @@ public class JdbcOutputFormatBuilder {
                         FieldNamedPreparedStatement.prepareStatement(
                                 connection,
                                 dialect.getInsertIntoStatement(
-                                        database, table, rowType.getFieldNames()),
-                                rowType.getFieldNames()),
+                                        database, table, tableSchema.getFieldNames()),
+                                tableSchema.getFieldNames()),
                 connection ->
                         FieldNamedPreparedStatement.prepareStatement(
                                 connection,
                                 dialect.getUpdateStatement(
                                         database,
                                         table,
-                                        rowType.getFieldNames(),
+                                        tableSchema.getFieldNames(),
                                         pkNames,
                                         isPrimaryKeyUpdated),
-                                rowType.getFieldNames()),
-                keyRowType,
+                                tableSchema.getFieldNames()),
+                pkTableSchema,
                 keyExtractor,
-                rowType,
+                tableSchema,
                 dialect.getRowConverter());
     }
 
@@ -259,27 +270,18 @@ public class JdbcOutputFormatBuilder {
             String database,
             String table,
             String[] pkNames,
-            SeaTunnelDataType[] pkTypes) {
+            TableSchema pkTableSchema) {
         String deleteSQL = dialect.getDeleteStatement(database, table, pkNames);
-        return createSimpleExecutor(deleteSQL, pkNames, pkTypes, dialect.getRowConverter());
-    }
-
-    private static JdbcBatchStatementExecutor<SeaTunnelRow> createSimpleExecutor(
-            String sql,
-            String[] fieldNames,
-            SeaTunnelDataType[] fieldTypes,
-            JdbcRowConverter rowConverter) {
-        SeaTunnelRowType rowType = new SeaTunnelRowType(fieldNames, fieldTypes);
-        return createSimpleExecutor(sql, rowType, rowConverter);
+        return createSimpleExecutor(deleteSQL, pkTableSchema, dialect.getRowConverter());
     }
 
     private static JdbcBatchStatementExecutor<SeaTunnelRow> createSimpleExecutor(
-            String sql, SeaTunnelRowType rowType, JdbcRowConverter rowConverter) {
+            String sql, TableSchema tableSchema, JdbcRowConverter rowConverter) {
         return new SimpleBatchStatementExecutor(
                 connection ->
                         FieldNamedPreparedStatement.prepareStatement(
-                                connection, sql, rowType.getFieldNames()),
-                rowType,
+                                connection, sql, tableSchema.getFieldNames()),
+                tableSchema,
                 rowConverter);
     }
 
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
index 8dda7c362d..e18a16bdd9 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter;
 
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -47,7 +48,8 @@ public abstract class AbstractJdbcRowConverter implements JdbcRowConverter {
     public AbstractJdbcRowConverter() {}
 
     @Override
-    public SeaTunnelRow toInternal(ResultSet rs, SeaTunnelRowType typeInfo) throws SQLException {
+    public SeaTunnelRow toInternal(ResultSet rs, TableSchema tableSchema) throws SQLException {
+        SeaTunnelRowType typeInfo = tableSchema.toPhysicalRowDataType();
         Object[] fields = new Object[typeInfo.getTotalFields()];
         for (int fieldIndex = 0; fieldIndex < typeInfo.getTotalFields(); fieldIndex++) {
             SeaTunnelDataType<?> seaTunnelDataType = typeInfo.getFieldType(fieldIndex);
@@ -117,8 +119,9 @@ public abstract class AbstractJdbcRowConverter implements JdbcRowConverter {
 
     @Override
     public PreparedStatement toExternal(
-            SeaTunnelRowType rowType, SeaTunnelRow row, PreparedStatement statement)
+            TableSchema tableSchema, SeaTunnelRow row, PreparedStatement statement)
             throws SQLException {
+        SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
         for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields(); fieldIndex++) {
             SeaTunnelDataType<?> seaTunnelDataType = rowType.getFieldType(fieldIndex);
             int statementIndex = fieldIndex + 1;
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/JdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/JdbcRowConverter.java
index 1ce18732e2..a8c7c079d3 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/JdbcRowConverter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/JdbcRowConverter.java
@@ -17,8 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter;
 
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 
 import java.io.Serializable;
 import java.sql.PreparedStatement;
@@ -36,9 +36,9 @@ public interface JdbcRowConverter extends Serializable {
      *
      * @param rs ResultSet from JDBC
      */
-    SeaTunnelRow toInternal(ResultSet rs, SeaTunnelRowType typeInfo) throws SQLException;
+    SeaTunnelRow toInternal(ResultSet rs, TableSchema tableSchema) throws SQLException;
 
     PreparedStatement toExternal(
-            SeaTunnelRowType rowType, SeaTunnelRow row, PreparedStatement statement)
+            TableSchema tableSchema, SeaTunnelRow row, PreparedStatement statement)
             throws SQLException;
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcRowConverter.java
index 91ed90105c..28f7fdf425 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcRowConverter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcRowConverter.java
@@ -17,8 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.hive;
 
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
@@ -35,7 +35,7 @@ public class HiveJdbcRowConverter extends AbstractJdbcRowConverter {
 
     @Override
     public PreparedStatement toExternal(
-            SeaTunnelRowType rowType, SeaTunnelRow row, PreparedStatement statement) {
+            TableSchema tableSchema, SeaTunnelRow row, PreparedStatement statement) {
         throw new JdbcConnectorException(
                 JdbcConnectorErrorCode.DONT_SUPPORT_SINK,
                 "The Hive jdbc connector don't support sink");
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java
index 23f71271a4..4aa41c0f4c 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.kingbase;
 
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -47,7 +48,8 @@ public class KingbaseJdbcRowConverter extends AbstractJdbcRowConverter {
 
     @Override
     @SuppressWarnings("checkstyle:Indentation")
-    public SeaTunnelRow toInternal(ResultSet rs, SeaTunnelRowType typeInfo) throws SQLException {
+    public SeaTunnelRow toInternal(ResultSet rs, TableSchema tableSchema) throws SQLException {
+        SeaTunnelRowType typeInfo = tableSchema.toPhysicalRowDataType();
         Object[] fields = new Object[typeInfo.getTotalFields()];
         for (int fieldIndex = 0; fieldIndex < typeInfo.getTotalFields(); fieldIndex++) {
             SeaTunnelDataType<?> seaTunnelDataType = typeInfo.getFieldType(fieldIndex);
@@ -117,8 +119,9 @@ public class KingbaseJdbcRowConverter extends AbstractJdbcRowConverter {
 
     @Override
     public PreparedStatement toExternal(
-            SeaTunnelRowType rowType, SeaTunnelRow row, PreparedStatement statement)
+            TableSchema tableSchema, SeaTunnelRow row, PreparedStatement statement)
             throws SQLException {
+        SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
         for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields(); fieldIndex++) {
             SeaTunnelDataType<?> seaTunnelDataType = rowType.getFieldType(fieldIndex);
             int statementIndex = fieldIndex + 1;
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
index 8ddc0b2000..877f4fdd9a 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql;
 
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -45,7 +46,8 @@ public class PostgresJdbcRowConverter extends AbstractJdbcRowConverter {
     }
 
     @Override
-    public SeaTunnelRow toInternal(ResultSet rs, SeaTunnelRowType typeInfo) throws SQLException {
+    public SeaTunnelRow toInternal(ResultSet rs, TableSchema tableSchema) throws SQLException {
+        SeaTunnelRowType typeInfo = tableSchema.toPhysicalRowDataType();
         Object[] fields = new Object[typeInfo.getTotalFields()];
         for (int fieldIndex = 0; fieldIndex < typeInfo.getTotalFields(); fieldIndex++) {
             SeaTunnelDataType<?> seaTunnelDataType = typeInfo.getFieldType(fieldIndex);
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/InsertOrUpdateBatchStatementExecutor.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/InsertOrUpdateBatchStatementExecutor.java
index 22cb784efe..8a4b2a579c 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/InsertOrUpdateBatchStatementExecutor.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/InsertOrUpdateBatchStatementExecutor.java
@@ -17,8 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;
 
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
@@ -39,9 +39,9 @@ public class InsertOrUpdateBatchStatementExecutor
     private final StatementFactory existStmtFactory;
     @NonNull private final StatementFactory insertStmtFactory;
     @NonNull private final StatementFactory updateStmtFactory;
-    private final SeaTunnelRowType keyRowType;
+    private final TableSchema keyTableSchema;
     private final Function<SeaTunnelRow, SeaTunnelRow> keyExtractor;
-    @NonNull private final SeaTunnelRowType valueRowType;
+    @NonNull private final TableSchema valueTableSchema;
     @NonNull private final JdbcRowConverter rowConverter;
     private transient PreparedStatement existStatement;
     private transient PreparedStatement insertStatement;
@@ -52,9 +52,16 @@ public class InsertOrUpdateBatchStatementExecutor
     public InsertOrUpdateBatchStatementExecutor(
             StatementFactory insertStmtFactory,
             StatementFactory updateStmtFactory,
-            SeaTunnelRowType valueRowType,
+            TableSchema valueTableSchema,
             JdbcRowConverter rowConverter) {
-        this(null, insertStmtFactory, updateStmtFactory, null, null, valueRowType, rowConverter);
+        this(
+                null,
+                insertStmtFactory,
+                updateStmtFactory,
+                null,
+                null,
+                valueTableSchema,
+                rowConverter);
     }
 
     @Override
@@ -74,14 +81,14 @@ public class InsertOrUpdateBatchStatementExecutor
                 insertStatement.executeBatch();
                 insertStatement.clearBatch();
             }
-            rowConverter.toExternal(valueRowType, record, updateStatement);
+            rowConverter.toExternal(valueTableSchema, record, updateStatement);
             updateStatement.addBatch();
         } else {
             if (preExistFlag != null && preExistFlag) {
                 updateStatement.executeBatch();
                 updateStatement.clearBatch();
             }
-            rowConverter.toExternal(valueRowType, record, insertStatement);
+            rowConverter.toExternal(valueTableSchema, record, insertStatement);
             insertStatement.addBatch();
         }
 
@@ -137,7 +144,7 @@ public class InsertOrUpdateBatchStatementExecutor
     }
 
     private boolean exist(SeaTunnelRow pk) throws SQLException {
-        rowConverter.toExternal(keyRowType, pk, existStatement);
+        rowConverter.toExternal(keyTableSchema, pk, existStatement);
         try (ResultSet resultSet = existStatement.executeQuery()) {
             return resultSet.next();
         }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/SimpleBatchStatementExecutor.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/SimpleBatchStatementExecutor.java
index 08b436eee6..a2f0add260 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/SimpleBatchStatementExecutor.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/SimpleBatchStatementExecutor.java
@@ -17,8 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;
 
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
 
 import lombok.NonNull;
@@ -31,7 +31,7 @@ import java.sql.SQLException;
 @RequiredArgsConstructor
 public class SimpleBatchStatementExecutor implements JdbcBatchStatementExecutor<SeaTunnelRow> {
     @NonNull private final StatementFactory statementFactory;
-    @NonNull private final SeaTunnelRowType rowType;
+    @NonNull private final TableSchema tableSchema;
     @NonNull private final JdbcRowConverter converter;
     private transient PreparedStatement statement;
 
@@ -42,7 +42,7 @@ public class SimpleBatchStatementExecutor implements JdbcBatchStatementExecutor<
 
     @Override
     public void addToBatch(SeaTunnelRow record) throws SQLException {
-        converter.toExternal(rowType, record, statement);
+        converter.toExternal(tableSchema, record, statement);
         statement.addBatch();
     }
 
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
index 854e60e8c9..77e673ae6b 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
@@ -20,8 +20,8 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
@@ -84,7 +84,7 @@ public class JdbcExactlyOnceSinkWriter
             JobContext context,
             JdbcDialect dialect,
             JdbcSinkConfig jdbcSinkConfig,
-            SeaTunnelRowType rowType,
+            TableSchema tableSchema,
             List<JdbcSinkState> states) {
         checkArgument(
                 jdbcSinkConfig.getJdbcConnectionConfig().getMaxRetries() == 0,
@@ -99,7 +99,7 @@ public class JdbcExactlyOnceSinkWriter
         this.xaFacade =
                 XaFacade.fromJdbcConnectionOptions(jdbcSinkConfig.getJdbcConnectionConfig());
         this.outputFormat =
-                new JdbcOutputFormatBuilder(dialect, xaFacade, jdbcSinkConfig, rowType).build();
+                new JdbcOutputFormatBuilder(dialect, xaFacade, jdbcSinkConfig, tableSchema).build();
         this.xaGroupOps = new XaGroupOpsImpl(xaFacade);
     }
 
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index 4234503a09..3fcb1fc832 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -33,8 +33,8 @@ import org.apache.seatunnel.api.sink.SupportSaveMode;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
@@ -60,7 +60,7 @@ public class JdbcSink
                 SupportSaveMode,
                 SupportMultiTableSink {
 
-    private final SeaTunnelRowType seaTunnelRowType;
+    private final TableSchema tableSchema;
 
     private JobContext jobContext;
 
@@ -89,7 +89,7 @@ public class JdbcSink
         this.schemaSaveMode = schemaSaveMode;
         this.dataSaveMode = dataSaveMode;
         this.catalogTable = catalogTable;
-        this.seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType();
+        this.tableSchema = catalogTable.getTableSchema();
     }
 
     @Override
@@ -108,20 +108,17 @@ public class JdbcSink
                             jobContext,
                             dialect,
                             jdbcSinkConfig,
-                            seaTunnelRowType,
+                            tableSchema,
                             new ArrayList<>());
         } else {
             if (catalogTable != null && catalogTable.getTableSchema().getPrimaryKey() != null) {
-                String keyName =
-                        catalogTable.getTableSchema().getPrimaryKey().getColumnNames().get(0);
-                int index = seaTunnelRowType.indexOf(keyName);
+                String keyName = tableSchema.getPrimaryKey().getColumnNames().get(0);
+                int index = tableSchema.toPhysicalRowDataType().indexOf(keyName);
                 if (index > -1) {
-                    return new JdbcSinkWriter(
-                            context, dialect, jdbcSinkConfig, seaTunnelRowType, index);
+                    return new JdbcSinkWriter(context, dialect, jdbcSinkConfig, tableSchema, index);
                 }
             }
-            sinkWriter =
-                    new JdbcSinkWriter(context, dialect, jdbcSinkConfig, seaTunnelRowType, null);
+            sinkWriter = new JdbcSinkWriter(context, dialect, jdbcSinkConfig, tableSchema, null);
         }
         return sinkWriter;
     }
@@ -131,7 +128,7 @@ public class JdbcSink
             SinkWriter.Context context, List<JdbcSinkState> states) throws IOException {
         if (jdbcSinkConfig.isExactlyOnce()) {
             return new JdbcExactlyOnceSinkWriter(
-                    context, jobContext, dialect, jdbcSinkConfig, seaTunnelRowType, states);
+                    context, jobContext, dialect, jdbcSinkConfig, tableSchema, states);
         }
         return SeaTunnelSink.super.restoreWriter(context, states);
     }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
index 1fbcbe2ead..abb447c085 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
@@ -20,8 +20,8 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
 import org.apache.seatunnel.api.sink.MultiTableResourceManager;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
@@ -49,7 +49,7 @@ public class JdbcSinkWriter
     private JdbcOutputFormat<SeaTunnelRow, JdbcBatchStatementExecutor<SeaTunnelRow>> outputFormat;
     private final SinkWriter.Context context;
     private final JdbcDialect dialect;
-    private final SeaTunnelRowType rowType;
+    private final TableSchema tableSchema;
     private JdbcConnectionProvider connectionProvider;
     private transient boolean isOpen;
     private final Integer primaryKeyIndex;
@@ -59,17 +59,18 @@ public class JdbcSinkWriter
             SinkWriter.Context context,
             JdbcDialect dialect,
             JdbcSinkConfig jdbcSinkConfig,
-            SeaTunnelRowType rowType,
+            TableSchema tableSchema,
             Integer primaryKeyIndex) {
         this.context = context;
         this.jdbcSinkConfig = jdbcSinkConfig;
         this.dialect = dialect;
-        this.rowType = rowType;
+        this.tableSchema = tableSchema;
         this.primaryKeyIndex = primaryKeyIndex;
         this.connectionProvider =
                 dialect.getJdbcConnectionProvider(jdbcSinkConfig.getJdbcConnectionConfig());
         this.outputFormat =
-                new JdbcOutputFormatBuilder(dialect, connectionProvider, jdbcSinkConfig, rowType)
+                new JdbcOutputFormatBuilder(
+                                dialect, connectionProvider, jdbcSinkConfig, tableSchema)
                         .build();
     }
 
@@ -101,7 +102,8 @@ public class JdbcSinkWriter
                         jdbcSinkConfig.getJdbcConnectionConfig(),
                         queueIndex);
         this.outputFormat =
-                new JdbcOutputFormatBuilder(dialect, connectionProvider, jdbcSinkConfig, rowType)
+                new JdbcOutputFormatBuilder(
+                                dialect, connectionProvider, jdbcSinkConfig, tableSchema)
                         .build();
     }
 
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
index 1e3797c476..8443aeb044 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
@@ -27,7 +27,6 @@ import org.apache.seatunnel.api.source.SupportParallelism;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSourceState;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcCatalogUtils;
@@ -80,15 +79,9 @@ public class JdbcSource
     @Override
     public SourceReader<SeaTunnelRow, JdbcSourceSplit> createReader(
             SourceReader.Context readerContext) throws Exception {
-        Map<TablePath, SeaTunnelRowType> tables = new HashMap<>();
+        Map<TablePath, CatalogTable> tables = new HashMap<>();
         for (TablePath tablePath : jdbcSourceTables.keySet()) {
-            SeaTunnelRowType rowType =
-                    jdbcSourceTables
-                            .get(tablePath)
-                            .getCatalogTable()
-                            .getTableSchema()
-                            .toPhysicalRowDataType();
-            tables.put(tablePath, rowType);
+            tables.put(tablePath, jdbcSourceTables.get(tablePath).getCatalogTable());
         }
         return new JdbcSourceReader(readerContext, jdbcSourceConfig, tables);
     }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceReader.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceReader.java
index 2139412aac..f1b85dbfc7 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceReader.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceReader.java
@@ -19,9 +19,9 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
 
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcInputFormat;
 
@@ -42,7 +42,7 @@ public class JdbcSourceReader implements SourceReader<SeaTunnelRow, JdbcSourceSp
     private volatile boolean noMoreSplit;
 
     public JdbcSourceReader(
-            Context context, JdbcSourceConfig config, Map<TablePath, SeaTunnelRowType> tables) {
+            Context context, JdbcSourceConfig config, Map<TablePath, CatalogTable> tables) {
         this.inputFormat = new JdbcInputFormat(config, tables);
         this.context = context;
     }