You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/08 02:35:58 UTC
[inlong] branch master updated: [INLONG-5315][Sort] Import all changelog mode data ingest into JDBC (#5316)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6ee93e746 [INLONG-5315][Sort] Import all changelog mode data ingest into JDBC (#5316)
6ee93e746 is described below
commit 6ee93e74656dafd3a83ea2b607bb4b7d07f8dc2f
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Mon Aug 8 10:35:54 2022 +0800
[INLONG-5315][Sort] Import all changelog mode data ingest into JDBC (#5316)
Co-authored-by: thesumery <15...@qq.com>
---
.../jdbc/table/JdbcDynamicOutputFormatBuilder.java | 277 +++++++++++++++++++++
.../sort/jdbc/table/JdbcDynamicTableFactory.java | 16 +-
.../sort/jdbc/table/JdbcDynamicTableSink.java | 132 ++++++++++
licenses/inlong-sort-connectors/LICENSE | 8 +
4 files changed, 430 insertions(+), 3 deletions(-)
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
new file mode 100644
index 000000000..dd49d0215
--- /dev/null
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.table;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat;
+import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
+import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.executor.TableBufferedStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.executor.TableInsertOrUpdateStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.function.Function;
+
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Copy from org.apache.flink:flink-connector-jdbc_2.11:1.13.5
+ *
+ * Builder for {@link JdbcBatchingOutputFormat} for Table/SQL.
+ * Add an option `sink.ignore.changelog` to support insert-only mode without primaryKey.
+ *
+ * <p>All setters must be called before {@link #build()}; the builder is mutable and
+ * not thread-safe. Serializable so it can be shipped inside the sink provider.</p>
+ */
+public class JdbcDynamicOutputFormatBuilder implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ // Connection options (URL, driver, table name); required by build().
+ private JdbcOptions jdbcOptions;
+ // Batching options (flush interval, batch size, retries); required by build().
+ private JdbcExecutionOptions executionOptions;
+ // DML options (dialect, field names, optional key fields); required by build().
+ private JdbcDmlOptions dmlOptions;
+ // When true, forces the append-only INSERT path even if key fields are present
+ // (backs the `sink.ignore.changelog` option).
+ private boolean appendMode;
+ // Type info of the sink rows; used to create a serializer for defensive copies.
+ private TypeInformation<RowData> rowDataTypeInformation;
+ // Physical field types of the sink table, in declaration order.
+ private DataType[] fieldDataTypes;
+
+ public JdbcDynamicOutputFormatBuilder() {
+
+ }
+
+ public JdbcDynamicOutputFormatBuilder setAppendMode(boolean appendMode) {
+ this.appendMode = appendMode;
+ return this;
+ }
+
+ public JdbcDynamicOutputFormatBuilder setJdbcOptions(JdbcOptions jdbcOptions) {
+ this.jdbcOptions = jdbcOptions;
+ return this;
+ }
+
+ public JdbcDynamicOutputFormatBuilder setJdbcExecutionOptions(
+ JdbcExecutionOptions executionOptions) {
+ this.executionOptions = executionOptions;
+ return this;
+ }
+
+ public JdbcDynamicOutputFormatBuilder setJdbcDmlOptions(JdbcDmlOptions dmlOptions) {
+ this.dmlOptions = dmlOptions;
+ return this;
+ }
+
+ public JdbcDynamicOutputFormatBuilder setRowDataTypeInfo(
+ TypeInformation<RowData> rowDataTypeInfo) {
+ this.rowDataTypeInformation = rowDataTypeInfo;
+ return this;
+ }
+
+ public JdbcDynamicOutputFormatBuilder setFieldDataTypes(DataType[] fieldDataTypes) {
+ this.fieldDataTypes = fieldDataTypes;
+ return this;
+ }
+
+ /**
+ * Builds the batching output format.
+ *
+ * <p>Chooses the upsert/delete (changelog) executor when key fields are present AND
+ * appendMode is false; otherwise builds a plain INSERT (append-only) executor.</p>
+ *
+ * NOTE(review): rowDataTypeInformation and fieldDataTypes are dereferenced below but,
+ * unlike the other options, are not null-checked here — confirm all callers set them.
+ */
+ public JdbcBatchingOutputFormat<RowData, ?, ?> build() {
+ checkNotNull(jdbcOptions, "jdbc options can not be null");
+ checkNotNull(dmlOptions, "jdbc dml options can not be null");
+ checkNotNull(executionOptions, "jdbc execution options can not be null");
+
+ final LogicalType[] logicalTypes =
+ Arrays.stream(fieldDataTypes)
+ .map(DataType::getLogicalType)
+ .toArray(LogicalType[]::new);
+ if (dmlOptions.getKeyFields().isPresent() && dmlOptions.getKeyFields().get().length > 0 && !appendMode) {
+ // upsert query
+ return new JdbcBatchingOutputFormat<>(
+ new SimpleJdbcConnectionProvider(jdbcOptions),
+ executionOptions,
+ ctx ->
+ createBufferReduceExecutor(
+ dmlOptions, ctx, rowDataTypeInformation, logicalTypes),
+ JdbcBatchingOutputFormat.RecordExtractor.identity());
+ } else {
+ // append only query
+ final String sql =
+ dmlOptions
+ .getDialect()
+ .getInsertIntoStatement(
+ dmlOptions.getTableName(), dmlOptions.getFieldNames());
+ return new JdbcBatchingOutputFormat<>(
+ new SimpleJdbcConnectionProvider(jdbcOptions),
+ executionOptions,
+ ctx ->
+ createSimpleBufferedExecutor(
+ ctx,
+ dmlOptions.getDialect(),
+ dmlOptions.getFieldNames(),
+ logicalTypes,
+ sql,
+ rowDataTypeInformation),
+ JdbcBatchingOutputFormat.RecordExtractor.identity());
+ }
+ }
+
+ /**
+ * Creates the changelog-mode executor: buffers rows, reduces them per primary key,
+ * then emits upserts for surviving rows and deletes for retracted keys.
+ * Rows are defensively copied when object reuse is enabled, since buffered
+ * references must stay valid until the batch is flushed.
+ */
+ private static JdbcBatchStatementExecutor<RowData> createBufferReduceExecutor(
+ JdbcDmlOptions opt,
+ RuntimeContext ctx,
+ TypeInformation<RowData> rowDataTypeInfo,
+ LogicalType[] fieldTypes) {
+ checkArgument(opt.getKeyFields().isPresent());
+ JdbcDialect dialect = opt.getDialect();
+ String tableName = opt.getTableName();
+ String[] pkNames = opt.getKeyFields().get();
+ // Map each key-field name to its position in the full field list.
+ int[] pkFields =
+ Arrays.stream(pkNames)
+ .mapToInt(Arrays.asList(opt.getFieldNames())::indexOf)
+ .toArray();
+ LogicalType[] pkTypes =
+ Arrays.stream(pkFields).mapToObj(f -> fieldTypes[f]).toArray(LogicalType[]::new);
+ final TypeSerializer<RowData> typeSerializer =
+ rowDataTypeInfo.createSerializer(ctx.getExecutionConfig());
+ final Function<RowData, RowData> valueTransform =
+ ctx.getExecutionConfig().isObjectReuseEnabled()
+ ? typeSerializer::copy
+ : Function.identity();
+
+ return new TableBufferReducedStatementExecutor(
+ createUpsertRowExecutor(
+ dialect,
+ tableName,
+ opt.getFieldNames(),
+ fieldTypes,
+ pkFields,
+ pkNames,
+ pkTypes),
+ createDeleteExecutor(dialect, tableName, pkNames, pkTypes),
+ createRowKeyExtractor(fieldTypes, pkFields),
+ valueTransform);
+ }
+
+ /**
+ * Creates the append-only executor: buffers rows and replays them through a single
+ * prepared INSERT statement. Applies the same object-reuse copy guard as above.
+ */
+ private static JdbcBatchStatementExecutor<RowData> createSimpleBufferedExecutor(
+ RuntimeContext ctx,
+ JdbcDialect dialect,
+ String[] fieldNames,
+ LogicalType[] fieldTypes,
+ String sql,
+ TypeInformation<RowData> rowDataTypeInfo) {
+ final TypeSerializer<RowData> typeSerializer =
+ rowDataTypeInfo.createSerializer(ctx.getExecutionConfig());
+ return new TableBufferedStatementExecutor(
+ createSimpleRowExecutor(dialect, fieldNames, fieldTypes, sql),
+ ctx.getExecutionConfig().isObjectReuseEnabled()
+ ? typeSerializer::copy
+ : Function.identity());
+ }
+
+ /**
+ * Prefers the dialect's native upsert statement when it provides one; otherwise
+ * falls back to a row-exists check followed by INSERT or UPDATE.
+ */
+ private static JdbcBatchStatementExecutor<RowData> createUpsertRowExecutor(
+ JdbcDialect dialect,
+ String tableName,
+ String[] fieldNames,
+ LogicalType[] fieldTypes,
+ int[] pkFields,
+ String[] pkNames,
+ LogicalType[] pkTypes) {
+ return dialect.getUpsertStatement(tableName, fieldNames, pkNames)
+ .map(sql -> createSimpleRowExecutor(dialect, fieldNames, fieldTypes, sql))
+ .orElseGet(
+ () ->
+ createInsertOrUpdateExecutor(
+ dialect,
+ tableName,
+ fieldNames,
+ fieldTypes,
+ pkFields,
+ pkNames,
+ pkTypes));
+ }
+
+ /** Creates an executor that deletes rows by primary key. */
+ private static JdbcBatchStatementExecutor<RowData> createDeleteExecutor(
+ JdbcDialect dialect, String tableName, String[] pkNames, LogicalType[] pkTypes) {
+ String deleteSql = dialect.getDeleteStatement(tableName, pkNames);
+ return createSimpleRowExecutor(dialect, pkNames, pkTypes, deleteSql);
+ }
+
+ /**
+ * Creates an executor that binds each row to the given prepared statement using the
+ * dialect's row converter.
+ */
+ private static JdbcBatchStatementExecutor<RowData> createSimpleRowExecutor(
+ JdbcDialect dialect, String[] fieldNames, LogicalType[] fieldTypes, final String sql) {
+ final JdbcRowConverter rowConverter = dialect.getRowConverter(RowType.of(fieldTypes));
+ return new TableSimpleStatementExecutor(
+ connection ->
+ FieldNamedPreparedStatement.prepareStatement(connection, sql, fieldNames),
+ rowConverter);
+ }
+
+ /**
+ * Fallback upsert for dialects without a native upsert: SELECT-exists on the key,
+ * then INSERT or UPDATE accordingly. Note: exists-then-write is not atomic across
+ * statements; the native upsert path above is preferred when available.
+ */
+ private static JdbcBatchStatementExecutor<RowData> createInsertOrUpdateExecutor(
+ JdbcDialect dialect,
+ String tableName,
+ String[] fieldNames,
+ LogicalType[] fieldTypes,
+ int[] pkFields,
+ String[] pkNames,
+ LogicalType[] pkTypes) {
+ final String existStmt = dialect.getRowExistsStatement(tableName, pkNames);
+ final String insertStmt = dialect.getInsertIntoStatement(tableName, fieldNames);
+ final String updateStmt = dialect.getUpdateStatement(tableName, fieldNames, pkNames);
+ return new TableInsertOrUpdateStatementExecutor(
+ connection ->
+ FieldNamedPreparedStatement.prepareStatement(
+ connection, existStmt, pkNames),
+ connection ->
+ FieldNamedPreparedStatement.prepareStatement(
+ connection, insertStmt, fieldNames),
+ connection ->
+ FieldNamedPreparedStatement.prepareStatement(
+ connection, updateStmt, fieldNames),
+ dialect.getRowConverter(RowType.of(pkTypes)),
+ dialect.getRowConverter(RowType.of(fieldTypes)),
+ dialect.getRowConverter(RowType.of(fieldTypes)),
+ createRowKeyExtractor(fieldTypes, pkFields));
+ }
+
+ /**
+ * Returns a function projecting a full row onto its primary-key fields; used as the
+ * reduce key when buffering changelog records.
+ */
+ private static Function<RowData, RowData> createRowKeyExtractor(
+ LogicalType[] logicalTypes, int[] pkFields) {
+ final RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[pkFields.length];
+ for (int i = 0; i < pkFields.length; i++) {
+ fieldGetters[i] = createFieldGetter(logicalTypes[pkFields[i]], pkFields[i]);
+ }
+ return row -> getPrimaryKey(row, fieldGetters);
+ }
+
+ /** Materializes the key fields of {@code row} into a new {@link GenericRowData}. */
+ private static RowData getPrimaryKey(RowData row, RowData.FieldGetter[] fieldGetters) {
+ GenericRowData pkRow = new GenericRowData(fieldGetters.length);
+ for (int i = 0; i < fieldGetters.length; i++) {
+ pkRow.setField(i, fieldGetters[i].getFieldOrNull(row));
+ }
+ return pkRow;
+ }
+}
+
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
index 1aa99c21b..8712df945 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
@@ -27,7 +27,6 @@ import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
-import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSink;
import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -50,7 +49,10 @@ import static org.apache.flink.util.Preconditions.checkState;
* Copy from org.apache.flink:flink-connector-jdbc_2.11:1.13.5
* <p>
* Factory for creating configured instances of {@link JdbcDynamicTableSource} and {@link
- * JdbcDynamicTableSink}.We modify it to strengthen capacity of registering other dialect.</p>
+ * JdbcDynamicTableSink}.
+ * We modify it to strengthen capacity of registering other dialect.
+ * Add an option `sink.ignore.changelog` to support insert-only mode without primaryKey.
+ * </p>
*/
public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
@@ -166,6 +168,11 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
.intType()
.defaultValue(3)
.withDescription("The max retry times if writing records to database failed.");
+ private static final ConfigOption<Boolean> SINK_APPEND_MODE =
+ ConfigOptions.key("sink.ignore.changelog")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether to support sink update/delete data without primaryKey.");
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
@@ -178,12 +185,14 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
JdbcOptions jdbcOptions = getJdbcOptions(config);
TableSchema physicalSchema =
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+ boolean appendMode = config.get(SINK_APPEND_MODE);
return new JdbcDynamicTableSink(
jdbcOptions,
getJdbcExecutionOptions(config),
getJdbcDmlOptions(jdbcOptions, physicalSchema),
- physicalSchema);
+ physicalSchema,
+ appendMode);
}
@Override
@@ -305,6 +314,7 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
optionalOptions.add(SINK_BUFFER_FLUSH_MAX_ROWS);
optionalOptions.add(SINK_BUFFER_FLUSH_INTERVAL);
optionalOptions.add(SINK_MAX_RETRIES);
+ optionalOptions.add(SINK_APPEND_MODE);
optionalOptions.add(FactoryUtil.SINK_PARALLELISM);
optionalOptions.add(MAX_RETRY_TIMEOUT);
optionalOptions.add(DIALECT_IMPL);
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
new file mode 100644
index 000000000..7ed1bcd59
--- /dev/null
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Copy from org.apache.flink:flink-connector-jdbc_2.11:1.13.5
+ *
+ * A {@link DynamicTableSink} for JDBC.
+ * Add an option `sink.ignore.changelog` to support insert-only mode without primaryKey.
+ *
+ * <p>Immutable; {@link #copy()} returns an equivalent instance carrying the same options.</p>
+ */
+@Internal
+public class JdbcDynamicTableSink implements DynamicTableSink {
+
+ private final JdbcOptions jdbcOptions;
+ private final JdbcExecutionOptions executionOptions;
+ private final JdbcDmlOptions dmlOptions;
+ private final TableSchema tableSchema;
+ // Derived from dmlOptions; kept for asSummaryString().
+ private final String dialectName;
+ // Backs `sink.ignore.changelog`: when true, changelog records are accepted without a primary key.
+ private final boolean appendMode;
+
+ public JdbcDynamicTableSink(
+ JdbcOptions jdbcOptions,
+ JdbcExecutionOptions executionOptions,
+ JdbcDmlOptions dmlOptions,
+ TableSchema tableSchema,
+ boolean appendMode) {
+ this.jdbcOptions = jdbcOptions;
+ this.executionOptions = executionOptions;
+ this.dmlOptions = dmlOptions;
+ this.tableSchema = tableSchema;
+ this.dialectName = dmlOptions.getDialect().dialectName();
+ this.appendMode = appendMode;
+ }
+
+ /** Accepts INSERT/DELETE/UPDATE_AFTER after validating a key (or appendMode) is declared. */
+ @Override
+ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+ validatePrimaryKey(requestedMode);
+ return ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.INSERT)
+ .addContainedKind(RowKind.DELETE)
+ .addContainedKind(RowKind.UPDATE_AFTER)
+ .build();
+ }
+
+ /** Rejects changelog input unless a primary key is declared or appendMode is on. */
+ private void validatePrimaryKey(ChangelogMode requestedMode) {
+ checkState(
+ ChangelogMode.insertOnly().equals(requestedMode)
+ || dmlOptions.getKeyFields().isPresent() || appendMode,
+ "please declare primary key or appendMode for sink table when query contains update/delete record.");
+ }
+
+ @Override
+ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+ final TypeInformation<RowData> rowDataTypeInformation =
+ context.createTypeInformation(tableSchema.toRowDataType());
+ final JdbcDynamicOutputFormatBuilder builder = new JdbcDynamicOutputFormatBuilder();
+
+ builder.setAppendMode(appendMode);
+ builder.setJdbcOptions(jdbcOptions);
+ builder.setJdbcDmlOptions(dmlOptions);
+ builder.setJdbcExecutionOptions(executionOptions);
+ builder.setRowDataTypeInfo(rowDataTypeInformation);
+ builder.setFieldDataTypes(tableSchema.getFieldDataTypes());
+
+ return SinkFunctionProvider.of(
+ new GenericJdbcSinkFunction<>(builder.build()), jdbcOptions.getParallelism());
+ }
+
+ @Override
+ public DynamicTableSink copy() {
+ return new JdbcDynamicTableSink(jdbcOptions, executionOptions, dmlOptions, tableSchema, appendMode);
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "JDBC:" + dialectName;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof JdbcDynamicTableSink)) {
+ return false;
+ }
+ JdbcDynamicTableSink that = (JdbcDynamicTableSink) o;
+ // Fix: include appendMode so two sinks differing only in `sink.ignore.changelog`
+ // do not compare equal (the field was added to this copy but omitted here).
+ return Objects.equals(jdbcOptions, that.jdbcOptions)
+ && Objects.equals(executionOptions, that.executionOptions)
+ && Objects.equals(dmlOptions, that.dmlOptions)
+ && Objects.equals(tableSchema, that.tableSchema)
+ && Objects.equals(dialectName, that.dialectName)
+ && appendMode == that.appendMode;
+ }
+
+ @Override
+ public int hashCode() {
+ // Keep consistent with equals(): appendMode participates in both.
+ return Objects.hash(jdbcOptions, executionOptions, dmlOptions, tableSchema, dialectName, appendMode);
+ }
+}
\ No newline at end of file
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 73bf1dcdd..db56dcfee 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -502,6 +502,14 @@
Source : iceberg-flink-runtime-1.13 1.13.5 (Please note that the software have been modified.)
License : https://github.com/apache/iceberg/LICENSE
+ 1.3.7 inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/AbstractJdbcDialect.java
+ inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDialects.java
+ inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
+ inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
+ inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
+ Source : org.apache.flink:flink-connector-jdbc_2.11:1.13.5 (Please note that the software have been modified.)
+ License : https://github.com/apache/flink/blob/master/LICENSE
+
=======================================================================
Apache InLong Subcomponents: