You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/08 02:35:58 UTC

[inlong] branch master updated: [INLONG-5315][Sort] Import all changelog mode data ingest into JDBC (#5316)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ee93e746 [INLONG-5315][Sort] Import all changelog mode data ingest into JDBC (#5316)
6ee93e746 is described below

commit 6ee93e74656dafd3a83ea2b607bb4b7d07f8dc2f
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Mon Aug 8 10:35:54 2022 +0800

    [INLONG-5315][Sort] Import all changelog mode data ingest into JDBC (#5316)
    
    Co-authored-by: thesumery <15...@qq.com>
---
 .../jdbc/table/JdbcDynamicOutputFormatBuilder.java | 277 +++++++++++++++++++++
 .../sort/jdbc/table/JdbcDynamicTableFactory.java   |  16 +-
 .../sort/jdbc/table/JdbcDynamicTableSink.java      | 132 ++++++++++
 licenses/inlong-sort-connectors/LICENSE            |   8 +
 4 files changed, 430 insertions(+), 3 deletions(-)

diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
new file mode 100644
index 000000000..dd49d0215
--- /dev/null
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
@@ -0,0 +1,277 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.table;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat;
+import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
+import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.executor.TableBufferedStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.executor.TableInsertOrUpdateStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.function.Function;
+
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Copy from org.apache.flink:flink-connector-jdbc_2.11:1.13.5
+ *
+ * Builder for {@link JdbcBatchingOutputFormat} for Table/SQL.
+ * Add an option `sink.ignore.changelog` to support insert-only mode without primaryKey.
+ */
+public class JdbcDynamicOutputFormatBuilder implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private JdbcOptions jdbcOptions;
+    private JdbcExecutionOptions executionOptions;
+    private JdbcDmlOptions dmlOptions;
+    private boolean appendMode;
+    private TypeInformation<RowData> rowDataTypeInformation;
+    private DataType[] fieldDataTypes;
+
+    public JdbcDynamicOutputFormatBuilder() {
+
+    }
+
+    public JdbcDynamicOutputFormatBuilder setAppendMode(boolean appendMode) {
+        this.appendMode = appendMode;
+        return this;
+    }
+
+    public JdbcDynamicOutputFormatBuilder setJdbcOptions(JdbcOptions jdbcOptions) {
+        this.jdbcOptions = jdbcOptions;
+        return this;
+    }
+
+    public JdbcDynamicOutputFormatBuilder setJdbcExecutionOptions(
+            JdbcExecutionOptions executionOptions) {
+        this.executionOptions = executionOptions;
+        return this;
+    }
+
+    public JdbcDynamicOutputFormatBuilder setJdbcDmlOptions(JdbcDmlOptions dmlOptions) {
+        this.dmlOptions = dmlOptions;
+        return this;
+    }
+
+    public JdbcDynamicOutputFormatBuilder setRowDataTypeInfo(
+            TypeInformation<RowData> rowDataTypeInfo) {
+        this.rowDataTypeInformation = rowDataTypeInfo;
+        return this;
+    }
+
+    public JdbcDynamicOutputFormatBuilder setFieldDataTypes(DataType[] fieldDataTypes) {
+        this.fieldDataTypes = fieldDataTypes;
+        return this;
+    }
+
+    /**
+     * Builds the JDBC output format from the configured options.
+     *
+     * <p>When key fields are declared and append mode is off, rows go through the buffer-reduce
+     * executor (upsert and delete statements); otherwise only the dialect's INSERT INTO statement is used.
+     *
+     * @return the configured {@link JdbcBatchingOutputFormat}
+     * @throws NullPointerException if an option, the row type information or the field data types is unset
+     */
+    public JdbcBatchingOutputFormat<RowData, ?, ?> build() {
+        checkNotNull(jdbcOptions, "jdbc options can not be null");
+        checkNotNull(dmlOptions, "jdbc dml options can not be null");
+        checkNotNull(executionOptions, "jdbc execution options can not be null");
+        // Fail here with a clear message instead of a bare NPE below or later inside the executor factory.
+        checkNotNull(rowDataTypeInformation, "row data type information can not be null");
+        checkNotNull(fieldDataTypes, "field data types can not be null");
+
+        final LogicalType[] logicalTypes =
+                Arrays.stream(fieldDataTypes).map(DataType::getLogicalType).toArray(LogicalType[]::new);
+        if (dmlOptions.getKeyFields().isPresent() && dmlOptions.getKeyFields().get().length > 0 && !appendMode) {
+            // upsert query
+            return new JdbcBatchingOutputFormat<>(
+                    new SimpleJdbcConnectionProvider(jdbcOptions),
+                    executionOptions,
+                    ctx -> createBufferReduceExecutor(dmlOptions, ctx, rowDataTypeInformation, logicalTypes),
+                    JdbcBatchingOutputFormat.RecordExtractor.identity());
+        } else {
+            // append only query
+            final String sql = dmlOptions.getDialect()
+                    .getInsertIntoStatement(dmlOptions.getTableName(), dmlOptions.getFieldNames());
+            return new JdbcBatchingOutputFormat<>(
+                    new SimpleJdbcConnectionProvider(jdbcOptions),
+                    executionOptions,
+                    ctx -> createSimpleBufferedExecutor(
+                            ctx, dmlOptions.getDialect(), dmlOptions.getFieldNames(),
+                            logicalTypes, sql, rowDataTypeInformation),
+                    JdbcBatchingOutputFormat.RecordExtractor.identity());
+        }
+    }
+
+    private static JdbcBatchStatementExecutor<RowData> createBufferReduceExecutor(
+            JdbcDmlOptions opt,
+            RuntimeContext ctx,
+            TypeInformation<RowData> rowDataTypeInfo,
+            LogicalType[] fieldTypes) {
+        checkArgument(opt.getKeyFields().isPresent());
+        JdbcDialect dialect = opt.getDialect();
+        String tableName = opt.getTableName();
+        String[] pkNames = opt.getKeyFields().get();
+        int[] pkFields =
+                Arrays.stream(pkNames)
+                        .mapToInt(Arrays.asList(opt.getFieldNames())::indexOf)
+                        .toArray();
+        LogicalType[] pkTypes =
+                Arrays.stream(pkFields).mapToObj(f -> fieldTypes[f]).toArray(LogicalType[]::new);
+        final TypeSerializer<RowData> typeSerializer =
+                rowDataTypeInfo.createSerializer(ctx.getExecutionConfig());
+        final Function<RowData, RowData> valueTransform =
+                ctx.getExecutionConfig().isObjectReuseEnabled()
+                        ? typeSerializer::copy
+                        : Function.identity();
+
+        return new TableBufferReducedStatementExecutor(
+                createUpsertRowExecutor(
+                        dialect,
+                        tableName,
+                        opt.getFieldNames(),
+                        fieldTypes,
+                        pkFields,
+                        pkNames,
+                        pkTypes),
+                createDeleteExecutor(dialect, tableName, pkNames, pkTypes),
+                createRowKeyExtractor(fieldTypes, pkFields),
+                valueTransform);
+    }
+
+    private static JdbcBatchStatementExecutor<RowData> createSimpleBufferedExecutor(
+            RuntimeContext ctx,
+            JdbcDialect dialect,
+            String[] fieldNames,
+            LogicalType[] fieldTypes,
+            String sql,
+            TypeInformation<RowData> rowDataTypeInfo) {
+        final TypeSerializer<RowData> typeSerializer =
+                rowDataTypeInfo.createSerializer(ctx.getExecutionConfig());
+        return new TableBufferedStatementExecutor(
+                createSimpleRowExecutor(dialect, fieldNames, fieldTypes, sql),
+                ctx.getExecutionConfig().isObjectReuseEnabled()
+                        ? typeSerializer::copy
+                        : Function.identity());
+    }
+
+    private static JdbcBatchStatementExecutor<RowData> createUpsertRowExecutor(
+            JdbcDialect dialect,
+            String tableName,
+            String[] fieldNames,
+            LogicalType[] fieldTypes,
+            int[] pkFields,
+            String[] pkNames,
+            LogicalType[] pkTypes) {
+        return dialect.getUpsertStatement(tableName, fieldNames, pkNames)
+                .map(sql -> createSimpleRowExecutor(dialect, fieldNames, fieldTypes, sql))
+                .orElseGet(
+                        () ->
+                                createInsertOrUpdateExecutor(
+                                        dialect,
+                                        tableName,
+                                        fieldNames,
+                                        fieldTypes,
+                                        pkFields,
+                                        pkNames,
+                                        pkTypes));
+    }
+
+    private static JdbcBatchStatementExecutor<RowData> createDeleteExecutor(
+            JdbcDialect dialect, String tableName, String[] pkNames, LogicalType[] pkTypes) {
+        String deleteSql = dialect.getDeleteStatement(tableName, pkNames);
+        return createSimpleRowExecutor(dialect, pkNames, pkTypes, deleteSql);
+    }
+
+    private static JdbcBatchStatementExecutor<RowData> createSimpleRowExecutor(
+            JdbcDialect dialect, String[] fieldNames, LogicalType[] fieldTypes, final String sql) {
+        final JdbcRowConverter rowConverter = dialect.getRowConverter(RowType.of(fieldTypes));
+        return new TableSimpleStatementExecutor(
+                connection ->
+                        FieldNamedPreparedStatement.prepareStatement(connection, sql, fieldNames),
+                rowConverter);
+    }
+
+    private static JdbcBatchStatementExecutor<RowData> createInsertOrUpdateExecutor(
+            JdbcDialect dialect,
+            String tableName,
+            String[] fieldNames,
+            LogicalType[] fieldTypes,
+            int[] pkFields,
+            String[] pkNames,
+            LogicalType[] pkTypes) {
+        final String existStmt = dialect.getRowExistsStatement(tableName, pkNames);
+        final String insertStmt = dialect.getInsertIntoStatement(tableName, fieldNames);
+        final String updateStmt = dialect.getUpdateStatement(tableName, fieldNames, pkNames);
+        return new TableInsertOrUpdateStatementExecutor(
+                connection ->
+                        FieldNamedPreparedStatement.prepareStatement(
+                                connection, existStmt, pkNames),
+                connection ->
+                        FieldNamedPreparedStatement.prepareStatement(
+                                connection, insertStmt, fieldNames),
+                connection ->
+                        FieldNamedPreparedStatement.prepareStatement(
+                                connection, updateStmt, fieldNames),
+                dialect.getRowConverter(RowType.of(pkTypes)),
+                dialect.getRowConverter(RowType.of(fieldTypes)),
+                dialect.getRowConverter(RowType.of(fieldTypes)),
+                createRowKeyExtractor(fieldTypes, pkFields));
+    }
+
+    private static Function<RowData, RowData> createRowKeyExtractor(
+            LogicalType[] logicalTypes, int[] pkFields) {
+        final RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[pkFields.length];
+        for (int i = 0; i < pkFields.length; i++) {
+            fieldGetters[i] = createFieldGetter(logicalTypes[pkFields[i]], pkFields[i]);
+        }
+        return row -> getPrimaryKey(row, fieldGetters);
+    }
+
+    private static RowData getPrimaryKey(RowData row, RowData.FieldGetter[] fieldGetters) {
+        GenericRowData pkRow = new GenericRowData(fieldGetters.length);
+        for (int i = 0; i < fieldGetters.length; i++) {
+            pkRow.setField(i, fieldGetters[i].getFieldOrNull(row));
+        }
+        return pkRow;
+    }
+}
+
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
index 1aa99c21b..8712df945 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
@@ -27,7 +27,6 @@ import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
 import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
 import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
 import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
-import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSink;
 import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -50,7 +49,10 @@ import static org.apache.flink.util.Preconditions.checkState;
  * Copy from org.apache.flink:flink-connector-jdbc_2.11:1.13.5
  * <p>
  * Factory for creating configured instances of {@link JdbcDynamicTableSource} and {@link
- * JdbcDynamicTableSink}.We modify it to strengthen capacity of registering other dialect.</p>
+ * JdbcDynamicTableSink}.
+ * We modify it to strengthen capacity of registering other dialect.
+ * Add an option `sink.ignore.changelog` to support insert-only mode without primaryKey.
+ * </p>
  */
 public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
 
@@ -166,6 +168,11 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
                     .intType()
                     .defaultValue(3)
                     .withDescription("The max retry times if writing records to database failed.");
+    private static final ConfigOption<Boolean> SINK_APPEND_MODE =
+            ConfigOptions.key("sink.ignore.changelog")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether to support sink update/delete data without primaryKey.");
 
     @Override
     public DynamicTableSink createDynamicTableSink(Context context) {
@@ -178,12 +185,14 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
         JdbcOptions jdbcOptions = getJdbcOptions(config);
         TableSchema physicalSchema =
                 TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+        boolean appendMode = config.get(SINK_APPEND_MODE);
 
         return new JdbcDynamicTableSink(
                 jdbcOptions,
                 getJdbcExecutionOptions(config),
                 getJdbcDmlOptions(jdbcOptions, physicalSchema),
-                physicalSchema);
+                physicalSchema,
+                appendMode);
     }
 
     @Override
@@ -305,6 +314,7 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
         optionalOptions.add(SINK_BUFFER_FLUSH_MAX_ROWS);
         optionalOptions.add(SINK_BUFFER_FLUSH_INTERVAL);
         optionalOptions.add(SINK_MAX_RETRIES);
+        optionalOptions.add(SINK_APPEND_MODE);
         optionalOptions.add(FactoryUtil.SINK_PARALLELISM);
         optionalOptions.add(MAX_RETRY_TIMEOUT);
         optionalOptions.add(DIALECT_IMPL);
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
new file mode 100644
index 000000000..7ed1bcd59
--- /dev/null
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
@@ -0,0 +1,132 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Copy from org.apache.flink:flink-connector-jdbc_2.11:1.13.5
+ *
+ * A {@link DynamicTableSink} for JDBC.
+ * Add an option `sink.ignore.changelog` to support insert-only mode without primaryKey. 
+ */
+@Internal
+public class JdbcDynamicTableSink implements DynamicTableSink {
+
+    private final JdbcOptions jdbcOptions;
+    private final JdbcExecutionOptions executionOptions;
+    private final JdbcDmlOptions dmlOptions;
+    private final TableSchema tableSchema;
+    private final String dialectName;
+    private final boolean appendMode;
+
+    public JdbcDynamicTableSink(
+            JdbcOptions jdbcOptions,
+            JdbcExecutionOptions executionOptions,
+            JdbcDmlOptions dmlOptions,
+            TableSchema tableSchema,
+            boolean appendMode) {
+        this.jdbcOptions = jdbcOptions;
+        this.executionOptions = executionOptions;
+        this.dmlOptions = dmlOptions;
+        this.tableSchema = tableSchema;
+        this.dialectName = dmlOptions.getDialect().dialectName();
+        this.appendMode = appendMode;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        validatePrimaryKey(requestedMode);
+        return ChangelogMode.newBuilder()
+                .addContainedKind(RowKind.INSERT)
+                .addContainedKind(RowKind.DELETE)
+                .addContainedKind(RowKind.UPDATE_AFTER)
+                .build();
+    }
+
+    /** Requires a primary key for update/delete input unless 'sink.ignore.changelog' (appendMode) is set. */
+    private void validatePrimaryKey(ChangelogMode requestedMode) {
+        checkState(
+                ChangelogMode.insertOnly().equals(requestedMode) || dmlOptions.getKeyFields().isPresent() || appendMode,
+                "please declare primary key or set 'sink.ignore.changelog' when query contains update/delete record.");
+    }
+
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        final TypeInformation<RowData> rowDataTypeInformation =
+                context.createTypeInformation(tableSchema.toRowDataType());
+        final JdbcDynamicOutputFormatBuilder builder = new JdbcDynamicOutputFormatBuilder();
+
+        builder.setAppendMode(appendMode);
+        builder.setJdbcOptions(jdbcOptions);
+        builder.setJdbcDmlOptions(dmlOptions);
+        builder.setJdbcExecutionOptions(executionOptions);
+        builder.setRowDataTypeInfo(rowDataTypeInformation);
+        builder.setFieldDataTypes(tableSchema.getFieldDataTypes());
+
+        return SinkFunctionProvider.of(
+                new GenericJdbcSinkFunction<>(builder.build()), jdbcOptions.getParallelism());
+    }
+
+    @Override
+    public DynamicTableSink copy() {
+        return new JdbcDynamicTableSink(jdbcOptions, executionOptions, dmlOptions, tableSchema, appendMode);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "JDBC:" + dialectName;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof JdbcDynamicTableSink)) {
+            return false;
+        }
+        JdbcDynamicTableSink that = (JdbcDynamicTableSink) o;
+        // appendMode is handed to the output format builder, so it has to take part in equality as well.
+        return appendMode == that.appendMode && Objects.equals(jdbcOptions, that.jdbcOptions)
+                && Objects.equals(executionOptions, that.executionOptions)
+                && Objects.equals(dmlOptions, that.dmlOptions) && Objects.equals(tableSchema, that.tableSchema)
+                && Objects.equals(dialectName, that.dialectName);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(jdbcOptions, executionOptions, dmlOptions, tableSchema, dialectName, appendMode);
+    }
+}
\ No newline at end of file
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 73bf1dcdd..db56dcfee 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -502,6 +502,14 @@
   Source  : iceberg-flink-runtime-1.13 1.13.5 (Please note that the software have been modified.)
   License : https://github.com/apache/iceberg/LICENSE
 
+ 1.3.7 inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/AbstractJdbcDialect.java
+       inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDialects.java
+       inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
+       inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
+       inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
+  Source  : org.apache.flink:flink-connector-jdbc_2.11:1.13.5 (Please note that the software have been modified.)
+  License : https://github.com/apache/flink/blob/master/LICENSE
+
 =======================================================================
 Apache InLong Subcomponents: