You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2023/12/05 03:44:26 UTC
(seatunnel) branch dev updated: [Improve] Replace SeaTunnelRowType with TableSchema in the JdbcRowConverter
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1cc1b1b8cd [Improve] Replace SeaTunnelRowType with TableSchema in the JdbcRowConverter
1cc1b1b8cd is described below
commit 1cc1b1b8cdff8072ad51dcf354781d5a44b2f0b9
Author: ic4y <83...@users.noreply.github.com>
AuthorDate: Tue Dec 5 11:44:21 2023 +0800
[Improve] Replace SeaTunnelRowType with TableSchema in the JdbcRowConverter
---
.../seatunnel/api/table/catalog/TableSchema.java | 4 +
.../seatunnel/jdbc/internal/JdbcInputFormat.java | 13 ++-
.../jdbc/internal/JdbcOutputFormatBuilder.java | 124 +++++++++++----------
.../converter/AbstractJdbcRowConverter.java | 7 +-
.../jdbc/internal/converter/JdbcRowConverter.java | 6 +-
.../dialect/hive/HiveJdbcRowConverter.java | 4 +-
.../dialect/kingbase/KingbaseJdbcRowConverter.java | 7 +-
.../dialect/psql/PostgresJdbcRowConverter.java | 4 +-
.../InsertOrUpdateBatchStatementExecutor.java | 23 ++--
.../executor/SimpleBatchStatementExecutor.java | 6 +-
.../jdbc/sink/JdbcExactlyOnceSinkWriter.java | 6 +-
.../connectors/seatunnel/jdbc/sink/JdbcSink.java | 21 ++--
.../seatunnel/jdbc/sink/JdbcSinkWriter.java | 14 ++-
.../seatunnel/jdbc/source/JdbcSource.java | 11 +-
.../seatunnel/jdbc/source/JdbcSourceReader.java | 4 +-
15 files changed, 134 insertions(+), 120 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
index 7fd277d2b0..d327a0668b 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
@@ -57,6 +57,10 @@ public final class TableSchema implements Serializable {
return new SeaTunnelRowType(fields, fieldTypes);
}
+ public String[] getFieldNames() {
+ return columns.stream().map(Column::getName).toArray(String[]::new);
+ }
+
public static final class Builder {
private final List<Column> columns = new ArrayList<>();
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java
index c73df90ce9..c2fec61341 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java
@@ -18,10 +18,11 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
@@ -54,16 +55,16 @@ public class JdbcInputFormat implements Serializable {
private final JdbcDialect jdbcDialect;
private final JdbcRowConverter jdbcRowConverter;
- private final Map<TablePath, SeaTunnelRowType> tables;
+ private final Map<TablePath, CatalogTable> tables;
private final ChunkSplitter chunkSplitter;
private transient String splitTableId;
- private transient SeaTunnelRowType splitRowType;
+ private transient TableSchema splitTableSchema;
private transient PreparedStatement statement;
private transient ResultSet resultSet;
private volatile boolean hasNext;
- public JdbcInputFormat(JdbcSourceConfig config, Map<TablePath, SeaTunnelRowType> tables) {
+ public JdbcInputFormat(JdbcSourceConfig config, Map<TablePath, CatalogTable> tables) {
this.jdbcDialect =
JdbcDialectLoader.load(
config.getJdbcConnectionConfig().getUrl(), config.getCompatibleMode());
@@ -91,7 +92,7 @@ public class JdbcInputFormat implements Serializable {
*/
public void open(JdbcSourceSplit inputSplit) throws IOException {
try {
- splitRowType = tables.get(inputSplit.getTablePath());
+ splitTableSchema = tables.get(inputSplit.getTablePath()).getTableSchema();
splitTableId = inputSplit.getTablePath().toString();
statement = chunkSplitter.generateSplitStatement(inputSplit);
@@ -142,7 +143,7 @@ public class JdbcInputFormat implements Serializable {
if (!hasNext) {
return null;
}
- SeaTunnelRow seaTunnelRow = jdbcRowConverter.toInternal(resultSet, splitRowType);
+ SeaTunnelRow seaTunnelRow = jdbcRowConverter.toInternal(resultSet, splitTableSchema);
seaTunnelRow.setTableId(splitTableId);
seaTunnelRow.setRowKind(RowKind.INSERT);
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
index 5c05c2feff..4a296f9b4d 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
@@ -17,10 +17,10 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal;
+import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
@@ -43,6 +43,7 @@ import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.IntFunction;
+import java.util.stream.Collectors;
@Slf4j
@RequiredArgsConstructor
@@ -50,7 +51,7 @@ public class JdbcOutputFormatBuilder {
@NonNull private final JdbcDialect dialect;
@NonNull private final JdbcConnectionProvider connectionProvider;
@NonNull private final JdbcSinkConfig jdbcSinkConfig;
- @NonNull private final SeaTunnelRowType seaTunnelRowType;
+ @NonNull private final TableSchema tableSchema;
public JdbcOutputFormat build() {
JdbcOutputFormat.StatementExecutorFactory statementExecutorFactory;
@@ -67,11 +68,11 @@ public class JdbcOutputFormatBuilder {
() ->
createSimpleBufferedExecutor(
jdbcSinkConfig.getSimpleSql(),
- seaTunnelRowType,
+ tableSchema,
dialect.getRowConverter());
} else if (primaryKeys == null || primaryKeys.isEmpty()) {
statementExecutorFactory =
- () -> createSimpleBufferedExecutor(dialect, database, table, seaTunnelRowType);
+ () -> createSimpleBufferedExecutor(dialect, database, table, tableSchema);
} else {
statementExecutorFactory =
() ->
@@ -79,7 +80,7 @@ public class JdbcOutputFormatBuilder {
dialect,
database,
table,
- seaTunnelRowType,
+ tableSchema,
primaryKeys.toArray(new String[0]),
jdbcSinkConfig.isEnableUpsert(),
jdbcSinkConfig.isPrimaryKeyUpdated(),
@@ -93,15 +94,16 @@ public class JdbcOutputFormatBuilder {
}
private static JdbcBatchStatementExecutor<SeaTunnelRow> createSimpleBufferedExecutor(
- JdbcDialect dialect, String database, String table, SeaTunnelRowType rowType) {
- String insertSQL = dialect.getInsertIntoStatement(database, table, rowType.getFieldNames());
- return createSimpleBufferedExecutor(insertSQL, rowType, dialect.getRowConverter());
+ JdbcDialect dialect, String database, String table, TableSchema tableSchema) {
+ String insertSQL =
+ dialect.getInsertIntoStatement(database, table, tableSchema.getFieldNames());
+ return createSimpleBufferedExecutor(insertSQL, tableSchema, dialect.getRowConverter());
}
private static JdbcBatchStatementExecutor<SeaTunnelRow> createSimpleBufferedExecutor(
- String sql, SeaTunnelRowType rowType, JdbcRowConverter rowConverter) {
+ String sql, TableSchema tableSchema, JdbcRowConverter rowConverter) {
JdbcBatchStatementExecutor<SeaTunnelRow> simpleRowExecutor =
- createSimpleExecutor(sql, rowType, rowConverter);
+ createSimpleExecutor(sql, tableSchema, rowConverter);
return new BufferedBatchStatementExecutor(simpleRowExecutor, Function.identity());
}
@@ -109,28 +111,36 @@ public class JdbcOutputFormatBuilder {
JdbcDialect dialect,
String database,
String table,
- SeaTunnelRowType rowType,
+ TableSchema tableSchema,
String[] pkNames,
boolean enableUpsert,
boolean isPrimaryKeyUpdated,
boolean supportUpsertByInsertOnly) {
- int[] pkFields = Arrays.stream(pkNames).mapToInt(rowType::indexOf).toArray();
- SeaTunnelDataType[] pkTypes =
- Arrays.stream(pkFields)
- .mapToObj((IntFunction<SeaTunnelDataType>) rowType::getFieldType)
- .toArray(SeaTunnelDataType[]::new);
+ int[] pkFields =
+ Arrays.stream(pkNames)
+ .mapToInt(tableSchema.toPhysicalRowDataType()::indexOf)
+ .toArray();
+
+ TableSchema pkSchema =
+ TableSchema.builder()
+ .columns(
+ Arrays.stream(pkFields)
+ .mapToObj(
+ (IntFunction<Column>) tableSchema.getColumns()::get)
+ .collect(Collectors.toList()))
+ .build();
Function<SeaTunnelRow, SeaTunnelRow> keyExtractor = createKeyExtractor(pkFields);
JdbcBatchStatementExecutor<SeaTunnelRow> deleteExecutor =
- createDeleteExecutor(dialect, database, table, pkNames, pkTypes);
+ createDeleteExecutor(dialect, database, table, pkNames, pkSchema);
JdbcBatchStatementExecutor<SeaTunnelRow> upsertExecutor =
createUpsertExecutor(
dialect,
database,
table,
- rowType,
+ tableSchema,
pkNames,
- pkTypes,
+ pkSchema,
keyExtractor,
enableUpsert,
isPrimaryKeyUpdated,
@@ -143,47 +153,49 @@ public class JdbcOutputFormatBuilder {
JdbcDialect dialect,
String database,
String table,
- SeaTunnelRowType rowType,
+ TableSchema tableSchema,
String[] pkNames,
- SeaTunnelDataType[] pkTypes,
+ TableSchema pkTableSchema,
Function<SeaTunnelRow, SeaTunnelRow> keyExtractor,
boolean enableUpsert,
boolean isPrimaryKeyUpdated,
boolean supportUpsertByInsertOnly) {
if (supportUpsertByInsertOnly) {
- return createInsertOnlyExecutor(dialect, database, table, rowType);
+ return createInsertOnlyExecutor(dialect, database, table, tableSchema);
}
if (enableUpsert) {
Optional<String> upsertSQL =
- dialect.getUpsertStatement(database, table, rowType.getFieldNames(), pkNames);
+ dialect.getUpsertStatement(
+ database, table, tableSchema.getFieldNames(), pkNames);
if (upsertSQL.isPresent()) {
- return createSimpleExecutor(upsertSQL.get(), rowType, dialect.getRowConverter());
+ return createSimpleExecutor(
+ upsertSQL.get(), tableSchema, dialect.getRowConverter());
}
return createInsertOrUpdateByQueryExecutor(
dialect,
database,
table,
- rowType,
+ tableSchema,
pkNames,
- pkTypes,
+ pkTableSchema,
keyExtractor,
isPrimaryKeyUpdated);
}
return createInsertOrUpdateExecutor(
- dialect, database, table, rowType, pkNames, isPrimaryKeyUpdated);
+ dialect, database, table, tableSchema, pkNames, isPrimaryKeyUpdated);
}
private static JdbcBatchStatementExecutor<SeaTunnelRow> createInsertOnlyExecutor(
- JdbcDialect dialect, String database, String table, SeaTunnelRowType rowType) {
+ JdbcDialect dialect, String database, String table, TableSchema tableSchema) {
return new SimpleBatchStatementExecutor(
connection ->
FieldNamedPreparedStatement.prepareStatement(
connection,
dialect.getInsertIntoStatement(
- database, table, rowType.getFieldNames()),
- rowType.getFieldNames()),
- rowType,
+ database, table, tableSchema.getFieldNames()),
+ tableSchema.getFieldNames()),
+ tableSchema,
dialect.getRowConverter());
}
@@ -191,7 +203,7 @@ public class JdbcOutputFormatBuilder {
JdbcDialect dialect,
String database,
String table,
- SeaTunnelRowType rowType,
+ TableSchema tableSchema,
String[] pkNames,
boolean isPrimaryKeyUpdated) {
@@ -200,19 +212,19 @@ public class JdbcOutputFormatBuilder {
FieldNamedPreparedStatement.prepareStatement(
connection,
dialect.getInsertIntoStatement(
- database, table, rowType.getFieldNames()),
- rowType.getFieldNames()),
+ database, table, tableSchema.getFieldNames()),
+ tableSchema.getFieldNames()),
connection ->
FieldNamedPreparedStatement.prepareStatement(
connection,
dialect.getUpdateStatement(
database,
table,
- rowType.getFieldNames(),
+ tableSchema.getFieldNames(),
pkNames,
isPrimaryKeyUpdated),
- rowType.getFieldNames()),
- rowType,
+ tableSchema.getFieldNames()),
+ tableSchema,
dialect.getRowConverter());
}
@@ -220,12 +232,11 @@ public class JdbcOutputFormatBuilder {
JdbcDialect dialect,
String database,
String table,
- SeaTunnelRowType rowType,
+ TableSchema tableSchema,
String[] pkNames,
- SeaTunnelDataType[] pkTypes,
+ TableSchema pkTableSchema,
Function<SeaTunnelRow, SeaTunnelRow> keyExtractor,
boolean isPrimaryKeyUpdated) {
- SeaTunnelRowType keyRowType = new SeaTunnelRowType(pkNames, pkTypes);
return new InsertOrUpdateBatchStatementExecutor(
connection ->
FieldNamedPreparedStatement.prepareStatement(
@@ -236,21 +247,21 @@ public class JdbcOutputFormatBuilder {
FieldNamedPreparedStatement.prepareStatement(
connection,
dialect.getInsertIntoStatement(
- database, table, rowType.getFieldNames()),
- rowType.getFieldNames()),
+ database, table, tableSchema.getFieldNames()),
+ tableSchema.getFieldNames()),
connection ->
FieldNamedPreparedStatement.prepareStatement(
connection,
dialect.getUpdateStatement(
database,
table,
- rowType.getFieldNames(),
+ tableSchema.getFieldNames(),
pkNames,
isPrimaryKeyUpdated),
- rowType.getFieldNames()),
- keyRowType,
+ tableSchema.getFieldNames()),
+ pkTableSchema,
keyExtractor,
- rowType,
+ tableSchema,
dialect.getRowConverter());
}
@@ -259,27 +270,18 @@ public class JdbcOutputFormatBuilder {
String database,
String table,
String[] pkNames,
- SeaTunnelDataType[] pkTypes) {
+ TableSchema pkTableSchema) {
String deleteSQL = dialect.getDeleteStatement(database, table, pkNames);
- return createSimpleExecutor(deleteSQL, pkNames, pkTypes, dialect.getRowConverter());
- }
-
- private static JdbcBatchStatementExecutor<SeaTunnelRow> createSimpleExecutor(
- String sql,
- String[] fieldNames,
- SeaTunnelDataType[] fieldTypes,
- JdbcRowConverter rowConverter) {
- SeaTunnelRowType rowType = new SeaTunnelRowType(fieldNames, fieldTypes);
- return createSimpleExecutor(sql, rowType, rowConverter);
+ return createSimpleExecutor(deleteSQL, pkTableSchema, dialect.getRowConverter());
}
private static JdbcBatchStatementExecutor<SeaTunnelRow> createSimpleExecutor(
- String sql, SeaTunnelRowType rowType, JdbcRowConverter rowConverter) {
+ String sql, TableSchema tableSchema, JdbcRowConverter rowConverter) {
return new SimpleBatchStatementExecutor(
connection ->
FieldNamedPreparedStatement.prepareStatement(
- connection, sql, rowType.getFieldNames()),
- rowType,
+ connection, sql, tableSchema.getFieldNames()),
+ tableSchema,
rowConverter);
}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
index 8dda7c362d..e18a16bdd9 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -47,7 +48,8 @@ public abstract class AbstractJdbcRowConverter implements JdbcRowConverter {
public AbstractJdbcRowConverter() {}
@Override
- public SeaTunnelRow toInternal(ResultSet rs, SeaTunnelRowType typeInfo) throws SQLException {
+ public SeaTunnelRow toInternal(ResultSet rs, TableSchema tableSchema) throws SQLException {
+ SeaTunnelRowType typeInfo = tableSchema.toPhysicalRowDataType();
Object[] fields = new Object[typeInfo.getTotalFields()];
for (int fieldIndex = 0; fieldIndex < typeInfo.getTotalFields(); fieldIndex++) {
SeaTunnelDataType<?> seaTunnelDataType = typeInfo.getFieldType(fieldIndex);
@@ -117,8 +119,9 @@ public abstract class AbstractJdbcRowConverter implements JdbcRowConverter {
@Override
public PreparedStatement toExternal(
- SeaTunnelRowType rowType, SeaTunnelRow row, PreparedStatement statement)
+ TableSchema tableSchema, SeaTunnelRow row, PreparedStatement statement)
throws SQLException {
+ SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields(); fieldIndex++) {
SeaTunnelDataType<?> seaTunnelDataType = rowType.getFieldType(fieldIndex);
int statementIndex = fieldIndex + 1;
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/JdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/JdbcRowConverter.java
index 1ce18732e2..a8c7c079d3 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/JdbcRowConverter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/JdbcRowConverter.java
@@ -17,8 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import java.io.Serializable;
import java.sql.PreparedStatement;
@@ -36,9 +36,9 @@ public interface JdbcRowConverter extends Serializable {
*
* @param rs ResultSet from JDBC
*/
- SeaTunnelRow toInternal(ResultSet rs, SeaTunnelRowType typeInfo) throws SQLException;
+ SeaTunnelRow toInternal(ResultSet rs, TableSchema tableSchema) throws SQLException;
PreparedStatement toExternal(
- SeaTunnelRowType rowType, SeaTunnelRow row, PreparedStatement statement)
+ TableSchema tableSchema, SeaTunnelRow row, PreparedStatement statement)
throws SQLException;
}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcRowConverter.java
index 91ed90105c..28f7fdf425 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcRowConverter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcRowConverter.java
@@ -17,8 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.hive;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
@@ -35,7 +35,7 @@ public class HiveJdbcRowConverter extends AbstractJdbcRowConverter {
@Override
public PreparedStatement toExternal(
- SeaTunnelRowType rowType, SeaTunnelRow row, PreparedStatement statement) {
+ TableSchema tableSchema, SeaTunnelRow row, PreparedStatement statement) {
throw new JdbcConnectorException(
JdbcConnectorErrorCode.DONT_SUPPORT_SINK,
"The Hive jdbc connector don't support sink");
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java
index 23f71271a4..4aa41c0f4c 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.kingbase;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -47,7 +48,8 @@ public class KingbaseJdbcRowConverter extends AbstractJdbcRowConverter {
@Override
@SuppressWarnings("checkstyle:Indentation")
- public SeaTunnelRow toInternal(ResultSet rs, SeaTunnelRowType typeInfo) throws SQLException {
+ public SeaTunnelRow toInternal(ResultSet rs, TableSchema tableSchema) throws SQLException {
+ SeaTunnelRowType typeInfo = tableSchema.toPhysicalRowDataType();
Object[] fields = new Object[typeInfo.getTotalFields()];
for (int fieldIndex = 0; fieldIndex < typeInfo.getTotalFields(); fieldIndex++) {
SeaTunnelDataType<?> seaTunnelDataType = typeInfo.getFieldType(fieldIndex);
@@ -117,8 +119,9 @@ public class KingbaseJdbcRowConverter extends AbstractJdbcRowConverter {
@Override
public PreparedStatement toExternal(
- SeaTunnelRowType rowType, SeaTunnelRow row, PreparedStatement statement)
+ TableSchema tableSchema, SeaTunnelRow row, PreparedStatement statement)
throws SQLException {
+ SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields(); fieldIndex++) {
SeaTunnelDataType<?> seaTunnelDataType = rowType.getFieldType(fieldIndex);
int statementIndex = fieldIndex + 1;
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
index 8ddc0b2000..877f4fdd9a 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -45,7 +46,8 @@ public class PostgresJdbcRowConverter extends AbstractJdbcRowConverter {
}
@Override
- public SeaTunnelRow toInternal(ResultSet rs, SeaTunnelRowType typeInfo) throws SQLException {
+ public SeaTunnelRow toInternal(ResultSet rs, TableSchema tableSchema) throws SQLException {
+ SeaTunnelRowType typeInfo = tableSchema.toPhysicalRowDataType();
Object[] fields = new Object[typeInfo.getTotalFields()];
for (int fieldIndex = 0; fieldIndex < typeInfo.getTotalFields(); fieldIndex++) {
SeaTunnelDataType<?> seaTunnelDataType = typeInfo.getFieldType(fieldIndex);
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/InsertOrUpdateBatchStatementExecutor.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/InsertOrUpdateBatchStatementExecutor.java
index 22cb784efe..8a4b2a579c 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/InsertOrUpdateBatchStatementExecutor.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/InsertOrUpdateBatchStatementExecutor.java
@@ -17,8 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
@@ -39,9 +39,9 @@ public class InsertOrUpdateBatchStatementExecutor
private final StatementFactory existStmtFactory;
@NonNull private final StatementFactory insertStmtFactory;
@NonNull private final StatementFactory updateStmtFactory;
- private final SeaTunnelRowType keyRowType;
+ private final TableSchema keyTableSchema;
private final Function<SeaTunnelRow, SeaTunnelRow> keyExtractor;
- @NonNull private final SeaTunnelRowType valueRowType;
+ @NonNull private final TableSchema valueTableSchema;
@NonNull private final JdbcRowConverter rowConverter;
private transient PreparedStatement existStatement;
private transient PreparedStatement insertStatement;
@@ -52,9 +52,16 @@ public class InsertOrUpdateBatchStatementExecutor
public InsertOrUpdateBatchStatementExecutor(
StatementFactory insertStmtFactory,
StatementFactory updateStmtFactory,
- SeaTunnelRowType valueRowType,
+ TableSchema valueTableSchema,
JdbcRowConverter rowConverter) {
- this(null, insertStmtFactory, updateStmtFactory, null, null, valueRowType, rowConverter);
+ this(
+ null,
+ insertStmtFactory,
+ updateStmtFactory,
+ null,
+ null,
+ valueTableSchema,
+ rowConverter);
}
@Override
@@ -74,14 +81,14 @@ public class InsertOrUpdateBatchStatementExecutor
insertStatement.executeBatch();
insertStatement.clearBatch();
}
- rowConverter.toExternal(valueRowType, record, updateStatement);
+ rowConverter.toExternal(valueTableSchema, record, updateStatement);
updateStatement.addBatch();
} else {
if (preExistFlag != null && preExistFlag) {
updateStatement.executeBatch();
updateStatement.clearBatch();
}
- rowConverter.toExternal(valueRowType, record, insertStatement);
+ rowConverter.toExternal(valueTableSchema, record, insertStatement);
insertStatement.addBatch();
}
@@ -137,7 +144,7 @@ public class InsertOrUpdateBatchStatementExecutor
}
private boolean exist(SeaTunnelRow pk) throws SQLException {
- rowConverter.toExternal(keyRowType, pk, existStatement);
+ rowConverter.toExternal(keyTableSchema, pk, existStatement);
try (ResultSet resultSet = existStatement.executeQuery()) {
return resultSet.next();
}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/SimpleBatchStatementExecutor.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/SimpleBatchStatementExecutor.java
index 08b436eee6..a2f0add260 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/SimpleBatchStatementExecutor.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/SimpleBatchStatementExecutor.java
@@ -17,8 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import lombok.NonNull;
@@ -31,7 +31,7 @@ import java.sql.SQLException;
@RequiredArgsConstructor
public class SimpleBatchStatementExecutor implements JdbcBatchStatementExecutor<SeaTunnelRow> {
@NonNull private final StatementFactory statementFactory;
- @NonNull private final SeaTunnelRowType rowType;
+ @NonNull private final TableSchema tableSchema;
@NonNull private final JdbcRowConverter converter;
private transient PreparedStatement statement;
@@ -42,7 +42,7 @@ public class SimpleBatchStatementExecutor implements JdbcBatchStatementExecutor<
@Override
public void addToBatch(SeaTunnelRow record) throws SQLException {
- converter.toExternal(rowType, record, statement);
+ converter.toExternal(tableSchema, record, statement);
statement.addBatch();
}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
index 854e60e8c9..77e673ae6b 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
@@ -20,8 +20,8 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
@@ -84,7 +84,7 @@ public class JdbcExactlyOnceSinkWriter
JobContext context,
JdbcDialect dialect,
JdbcSinkConfig jdbcSinkConfig,
- SeaTunnelRowType rowType,
+ TableSchema tableSchema,
List<JdbcSinkState> states) {
checkArgument(
jdbcSinkConfig.getJdbcConnectionConfig().getMaxRetries() == 0,
@@ -99,7 +99,7 @@ public class JdbcExactlyOnceSinkWriter
this.xaFacade =
XaFacade.fromJdbcConnectionOptions(jdbcSinkConfig.getJdbcConnectionConfig());
this.outputFormat =
- new JdbcOutputFormatBuilder(dialect, xaFacade, jdbcSinkConfig, rowType).build();
+ new JdbcOutputFormatBuilder(dialect, xaFacade, jdbcSinkConfig, tableSchema).build();
this.xaGroupOps = new XaGroupOpsImpl(xaFacade);
}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index 4234503a09..3fcb1fc832 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -33,8 +33,8 @@ import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
@@ -60,7 +60,7 @@ public class JdbcSink
SupportSaveMode,
SupportMultiTableSink {
- private final SeaTunnelRowType seaTunnelRowType;
+ private final TableSchema tableSchema;
private JobContext jobContext;
@@ -89,7 +89,7 @@ public class JdbcSink
this.schemaSaveMode = schemaSaveMode;
this.dataSaveMode = dataSaveMode;
this.catalogTable = catalogTable;
- this.seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType();
+ this.tableSchema = catalogTable.getTableSchema();
}
@Override
@@ -108,20 +108,17 @@ public class JdbcSink
jobContext,
dialect,
jdbcSinkConfig,
- seaTunnelRowType,
+ tableSchema,
new ArrayList<>());
} else {
if (catalogTable != null && catalogTable.getTableSchema().getPrimaryKey() != null) {
- String keyName =
- catalogTable.getTableSchema().getPrimaryKey().getColumnNames().get(0);
- int index = seaTunnelRowType.indexOf(keyName);
+ String keyName = tableSchema.getPrimaryKey().getColumnNames().get(0);
+ int index = tableSchema.toPhysicalRowDataType().indexOf(keyName);
if (index > -1) {
- return new JdbcSinkWriter(
- context, dialect, jdbcSinkConfig, seaTunnelRowType, index);
+ return new JdbcSinkWriter(context, dialect, jdbcSinkConfig, tableSchema, index);
}
}
- sinkWriter =
- new JdbcSinkWriter(context, dialect, jdbcSinkConfig, seaTunnelRowType, null);
+ sinkWriter = new JdbcSinkWriter(context, dialect, jdbcSinkConfig, tableSchema, null);
}
return sinkWriter;
}
@@ -131,7 +128,7 @@ public class JdbcSink
SinkWriter.Context context, List<JdbcSinkState> states) throws IOException {
if (jdbcSinkConfig.isExactlyOnce()) {
return new JdbcExactlyOnceSinkWriter(
- context, jobContext, dialect, jdbcSinkConfig, seaTunnelRowType, states);
+ context, jobContext, dialect, jdbcSinkConfig, tableSchema, states);
}
return SeaTunnelSink.super.restoreWriter(context, states);
}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
index 1fbcbe2ead..abb447c085 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
@@ -20,8 +20,8 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
import org.apache.seatunnel.api.sink.MultiTableResourceManager;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
@@ -49,7 +49,7 @@ public class JdbcSinkWriter
private JdbcOutputFormat<SeaTunnelRow, JdbcBatchStatementExecutor<SeaTunnelRow>> outputFormat;
private final SinkWriter.Context context;
private final JdbcDialect dialect;
- private final SeaTunnelRowType rowType;
+ private final TableSchema tableSchema;
private JdbcConnectionProvider connectionProvider;
private transient boolean isOpen;
private final Integer primaryKeyIndex;
@@ -59,17 +59,18 @@ public class JdbcSinkWriter
SinkWriter.Context context,
JdbcDialect dialect,
JdbcSinkConfig jdbcSinkConfig,
- SeaTunnelRowType rowType,
+ TableSchema tableSchema,
Integer primaryKeyIndex) {
this.context = context;
this.jdbcSinkConfig = jdbcSinkConfig;
this.dialect = dialect;
- this.rowType = rowType;
+ this.tableSchema = tableSchema;
this.primaryKeyIndex = primaryKeyIndex;
this.connectionProvider =
dialect.getJdbcConnectionProvider(jdbcSinkConfig.getJdbcConnectionConfig());
this.outputFormat =
- new JdbcOutputFormatBuilder(dialect, connectionProvider, jdbcSinkConfig, rowType)
+ new JdbcOutputFormatBuilder(
+ dialect, connectionProvider, jdbcSinkConfig, tableSchema)
.build();
}
@@ -101,7 +102,8 @@ public class JdbcSinkWriter
jdbcSinkConfig.getJdbcConnectionConfig(),
queueIndex);
this.outputFormat =
- new JdbcOutputFormatBuilder(dialect, connectionProvider, jdbcSinkConfig, rowType)
+ new JdbcOutputFormatBuilder(
+ dialect, connectionProvider, jdbcSinkConfig, tableSchema)
.build();
}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
index 1e3797c476..8443aeb044 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
@@ -27,7 +27,6 @@ import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSourceState;
import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcCatalogUtils;
@@ -80,15 +79,9 @@ public class JdbcSource
@Override
public SourceReader<SeaTunnelRow, JdbcSourceSplit> createReader(
SourceReader.Context readerContext) throws Exception {
- Map<TablePath, SeaTunnelRowType> tables = new HashMap<>();
+ Map<TablePath, CatalogTable> tables = new HashMap<>();
for (TablePath tablePath : jdbcSourceTables.keySet()) {
- SeaTunnelRowType rowType =
- jdbcSourceTables
- .get(tablePath)
- .getCatalogTable()
- .getTableSchema()
- .toPhysicalRowDataType();
- tables.put(tablePath, rowType);
+ tables.put(tablePath, jdbcSourceTables.get(tablePath).getCatalogTable());
}
return new JdbcSourceReader(readerContext, jdbcSourceConfig, tables);
}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceReader.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceReader.java
index 2139412aac..f1b85dbfc7 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceReader.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceReader.java
@@ -19,9 +19,9 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcInputFormat;
@@ -42,7 +42,7 @@ public class JdbcSourceReader implements SourceReader<SeaTunnelRow, JdbcSourceSp
private volatile boolean noMoreSplit;
public JdbcSourceReader(
- Context context, JdbcSourceConfig config, Map<TablePath, SeaTunnelRowType> tables) {
+ Context context, JdbcSourceConfig config, Map<TablePath, CatalogTable> tables) {
this.inputFormat = new JdbcInputFormat(config, tables);
this.context = context;
}