Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/16 08:43:48 UTC

[GitHub] [flink] wuchong commented on a change in pull request #12176: [FLINK-17029][jdbc]Introduce a new JDBC connector with new property keys

wuchong commented on a change in pull request #12176:
URL: https://github.com/apache/flink/pull/12176#discussion_r426125630



##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicInputFormat.java
##########
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.converter.JdbcToRowConverter;
+import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Array;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Arrays;
+
+/**
+ * InputFormat for {@link JdbcDynamicTableSource}.
+ */
+@Internal
+public class JdbcDynamicInputFormat extends RichInputFormat<RowData, InputSplit> implements ResultTypeQueryable<RowData> {

Review comment:
       Rename to `JdbcRowDataInputFormat`. `Dynamic` is a logical concept in SQL, and we should avoid using this word in runtime code.

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicLookupFunction.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialects;
+import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.connector.jdbc.utils.JdbcTypeUtil;
+import org.apache.flink.connector.jdbc.utils.JdbcUtils;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A lookup function for {@link JdbcDynamicTableSource}.
+ */
+@Internal
+public class JdbcDynamicLookupFunction extends TableFunction<RowData> {

Review comment:
       Rename to `JdbcRowDataLookupFunction`.

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicOutputFormat.java
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.executor.InsertOrUpdateJdbcExecutor;
+import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.connector.jdbc.utils.JdbcUtils;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * OutputFormat for {@link JdbcDynamicTableSource}.
+ */
+@Internal
+public class JdbcDynamicOutputFormat extends JdbcBatchingOutputFormat<RowData, RowData, JdbcBatchStatementExecutor<RowData>> {

Review comment:
       Rename to `JdbcRowDataOutputFormat`.

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicInputFormat.java
##########
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.converter.JdbcToRowConverter;
+import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Array;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Arrays;
+
+/**
+ * InputFormat for {@link JdbcDynamicTableSource}.
+ */
+@Internal
+public class JdbcDynamicInputFormat extends RichInputFormat<RowData, InputSplit> implements ResultTypeQueryable<RowData> {
+
+	private static final long serialVersionUID = 1L;
+	private static final Logger LOG = LoggerFactory.getLogger(JdbcDynamicInputFormat.class);
+
+	private JdbcConnectionOptions connectionOptions;
+	private int fetchSize;
+	private Boolean autoCommit;
+	private Object[][] parameterValues;
+	private String queryTemplate;
+	private int resultSetType;
+	private int resultSetConcurrency;
+	private JdbcToRowConverter rowConverter;
+
+	private transient Connection dbConn;
+	private transient PreparedStatement statement;
+	private transient ResultSet resultSet;
+	private transient boolean hasNext;
+
+	private JdbcDynamicInputFormat(
+		JdbcConnectionOptions connectionOptions,
+		int fetchSize,
+		Boolean autoCommit,
+		Object[][] parameterValues,
+		String queryTemplate,
+		int resultSetType,
+		int resultSetConcurrency,
+		JdbcToRowConverter rowConverter) {
+		this.connectionOptions = connectionOptions;
+		this.fetchSize = fetchSize;
+		this.autoCommit = autoCommit;
+		this.parameterValues = parameterValues;
+		this.queryTemplate = queryTemplate;
+		this.resultSetType = resultSetType;
+		this.resultSetConcurrency = resultSetConcurrency;
+		this.rowConverter = rowConverter;
+	}
+
+	@Override
+	public void configure(Configuration parameters) {
+		//do nothing here
+	}
+
+	@Override
+	public void openInputFormat() {
+		//called once per inputFormat (on open)
+		try {
+			dbConn = new SimpleJdbcConnectionProvider(connectionOptions).getConnection();
+			// set autoCommit mode only if it was explicitly configured.
+			// keep connection default otherwise.
+			if (autoCommit != null) {
+				dbConn.setAutoCommit(autoCommit);
+			}
+			statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);
+			if (fetchSize == Integer.MIN_VALUE || fetchSize > 0) {
+				statement.setFetchSize(fetchSize);
+			}
+		} catch (SQLException se) {
+			throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
+		} catch (ClassNotFoundException cnfe) {
+			throw new IllegalArgumentException("JDBC-Class not found. - " + cnfe.getMessage(), cnfe);
+		}
+	}
+
+	@Override
+	public void closeInputFormat() {
+		//called once per inputFormat (on close)
+		try {
+			if (statement != null) {
+				statement.close();
+			}
+		} catch (SQLException se) {
+			LOG.info("Inputformat Statement couldn't be closed - " + se.getMessage());
+		} finally {
+			statement = null;
+		}
+
+		try {
+			if (dbConn != null) {
+				dbConn.close();
+			}
+		} catch (SQLException se) {
+			LOG.info("Inputformat couldn't be closed - " + se.getMessage());
+		} finally {
+			dbConn = null;
+		}
+
+		parameterValues = null;
+	}
+
+	/**
+	 * Connects to the source database and executes the query in a <b>parallel
+	 * fashion</b> if
+	 * this {@link InputFormat} is built using a parameterized query (i.e. using
+	 * a {@link PreparedStatement})
+	 * and a proper {@link JdbcParameterValuesProvider}, in a <b>non-parallel
+	 * fashion</b> otherwise.
+	 *
+	 * @param inputSplit which is ignored if this InputFormat is executed as a
+	 *                   non-parallel source,
+	 *                   a "hook" to the query parameters otherwise (using its
+	 *                   <i>splitNumber</i>)
+	 * @throws IOException if there's an error during the execution of the query
+	 */
+	@Override
+	public void open(InputSplit inputSplit) throws IOException {
+		try {
+			if (inputSplit != null && parameterValues != null) {
+				for (int i = 0; i < parameterValues[inputSplit.getSplitNumber()].length; i++) {
+					Object param = parameterValues[inputSplit.getSplitNumber()][i];
+					if (param instanceof String) {
+						statement.setString(i + 1, (String) param);
+					} else if (param instanceof Long) {
+						statement.setLong(i + 1, (Long) param);
+					} else if (param instanceof Integer) {
+						statement.setInt(i + 1, (Integer) param);
+					} else if (param instanceof Double) {
+						statement.setDouble(i + 1, (Double) param);
+					} else if (param instanceof Boolean) {
+						statement.setBoolean(i + 1, (Boolean) param);
+					} else if (param instanceof Float) {
+						statement.setFloat(i + 1, (Float) param);
+					} else if (param instanceof BigDecimal) {
+						statement.setBigDecimal(i + 1, (BigDecimal) param);
+					} else if (param instanceof Byte) {
+						statement.setByte(i + 1, (Byte) param);
+					} else if (param instanceof Short) {
+						statement.setShort(i + 1, (Short) param);
+					} else if (param instanceof Date) {
+						statement.setDate(i + 1, (Date) param);
+					} else if (param instanceof Time) {
+						statement.setTime(i + 1, (Time) param);
+					} else if (param instanceof Timestamp) {
+						statement.setTimestamp(i + 1, (Timestamp) param);
+					} else if (param instanceof Array) {
+						statement.setArray(i + 1, (Array) param);
+					} else {
+						//extends with other types if needed
+						throw new IllegalArgumentException("open() failed. Parameter " + i + " of type " + param.getClass() + " is not handled (yet).");
+					}
+				}
+				if (LOG.isDebugEnabled()) {
+					LOG.debug(String.format("Executing '%s' with parameters %s", queryTemplate, Arrays.deepToString(parameterValues[inputSplit.getSplitNumber()])));
+				}
+			}
+			resultSet = statement.executeQuery();
+			hasNext = resultSet.next();
+		} catch (SQLException se) {
+			throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
+		}
+	}
+
+	/**
+	 * Closes all resources used.
+	 *
+	 * @throws IOException Indicates that a resource could not be closed.
+	 */
+	@Override
+	public void close() throws IOException {
+		if (resultSet == null) {
+			return;
+		}
+		try {
+			resultSet.close();
+		} catch (SQLException se) {
+			LOG.info("Inputformat ResultSet couldn't be closed - " + se.getMessage());
+		}
+	}
+
+	@Override
+	public TypeInformation<RowData> getProducedType() {
+		return new GenericTypeInfo<RowData>(RowData.class);

Review comment:
       We can pass in the RowData TypeInformation as a constructor parameter, which can be obtained from `ScanTableSource#Context#createTypeInformation`.
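
   Something along these lines (just a sketch; the field and constructor parameter names are illustrative):

```java
// Sketch: keep the TypeInformation handed over by the planner instead of
// creating a GenericTypeInfo, which falls back to generic (Kryo) serialization.
private final TypeInformation<RowData> rowDataTypeInfo;

private JdbcRowDataInputFormat(/* other arguments, */ TypeInformation<RowData> rowDataTypeInfo) {
	this.rowDataTypeInfo = rowDataTypeInfo;
}

@Override
public TypeInformation<RowData> getProducedType() {
	return rowDataTypeInfo;
}
```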

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicLookupFunction.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialects;
+import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.connector.jdbc.utils.JdbcTypeUtil;
+import org.apache.flink.connector.jdbc.utils.JdbcUtils;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A lookup function for {@link JdbcDynamicTableSource}.
+ */
+@Internal
+public class JdbcDynamicLookupFunction extends TableFunction<RowData> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(JdbcDynamicLookupFunction.class);
+	private static final long serialVersionUID = 1L;
+
+	private final String query;
+	private final String drivername;
+	private final String dbURL;
+	private final String username;
+	private final String password;
+	private final DataType[] keyTypes;
+	private final int[] keySqlTypes;
+	private final long cacheMaxSize;
+	private final long cacheExpireMs;
+	private final int maxRetryTimes;
+	private final JdbcDialect jdbcDialect;
+	private final RowType rowType;
+
+	private transient Connection dbConn;
+	private transient PreparedStatement statement;
+	private transient Cache<RowData, List<RowData>> cache;
+
+	private JdbcDynamicLookupFunction(
+		JdbcOptions options, JdbcLookupOptions lookupOptions,
+		String[] fieldNames, DataType[] fieldTypes, String[] keyNames, RowType rowType) {

Review comment:
       Indent.

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicOutputFormat.java
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.executor.InsertOrUpdateJdbcExecutor;
+import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.connector.jdbc.utils.JdbcUtils;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * OutputFormat for {@link JdbcDynamicTableSource}.
+ */
+@Internal
+public class JdbcDynamicOutputFormat extends JdbcBatchingOutputFormat<RowData, RowData, JdbcBatchStatementExecutor<RowData>> {
+	private static final Logger LOG = LoggerFactory.getLogger(JdbcDynamicOutputFormat.class);
+
+	private JdbcBatchStatementExecutor<RowData> deleteExecutor;
+	private final JdbcDmlOptions dmlOptions;
+	private final LogicalType[] logicalTypes;
+
+	private JdbcDynamicOutputFormat(JdbcConnectionProvider connectionProvider, JdbcDmlOptions dmlOptions, JdbcExecutionOptions batchOptions, TypeInformation<RowData> rowDataTypeInfo, LogicalType[] logicalTypes) {
+		super(connectionProvider, batchOptions, ctx -> createUpsertRowExecutor(dmlOptions, ctx, rowDataTypeInfo, logicalTypes), RecordExtractor.identity());
+		this.dmlOptions = dmlOptions;
+		this.logicalTypes = logicalTypes;
+	}
+
+	private JdbcDynamicOutputFormat(JdbcConnectionProvider connectionProvider, JdbcDmlOptions dmlOptions, JdbcExecutionOptions batchOptions, TypeInformation<RowData> rowDataTypeInfo, LogicalType[] logicalTypes, String sql) {
+		super(connectionProvider, batchOptions,
+			ctx -> createSimpleRowDataExecutor(dmlOptions.getDialect(), sql, logicalTypes, ctx, rowDataTypeInfo),
+			RecordExtractor.identity());
+		this.dmlOptions = dmlOptions;
+		this.logicalTypes = logicalTypes;
+	}
+
+	@Override
+	public void open(int taskNumber, int numTasks) throws IOException {
+		super.open(taskNumber, numTasks);
+		deleteExecutor = createDeleteExecutor();
+		try {
+			deleteExecutor.open(connection);
+		} catch (SQLException e) {
+			throw new IOException(e);
+		}
+	}
+
+	private JdbcBatchStatementExecutor<RowData> createDeleteExecutor() {
+		int[] pkFields = Arrays.stream(dmlOptions.getFieldNames()).mapToInt(Arrays.asList(dmlOptions.getFieldNames())::indexOf).toArray();
+		LogicalType[] pkTypes = Arrays.stream(pkFields).mapToObj(f -> logicalTypes[f]).toArray(LogicalType[]::new);
+		String deleteSql = dmlOptions.getDialect().getDeleteStatement(dmlOptions.getTableName(), dmlOptions.getFieldNames());
+		return createKeyedRowExecutor(dmlOptions.getDialect(), pkFields, pkTypes, deleteSql, logicalTypes);
+	}
+
+	@Override
+	protected void addToBatch(RowData original, RowData extracted) throws SQLException {
+		switch (original.getRowKind()) {
+			case DELETE:
+				deleteExecutor.addToBatch(extracted);
+				break;
+			case INSERT:
+			case UPDATE_AFTER:
+				super.addToBatch(original, extracted);
+				break;
+			default:
+				return;

Review comment:
       Remove this? We don't have other RowKinds for now, and there is a warning on the `return`.

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicOutputFormat.java
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.executor.InsertOrUpdateJdbcExecutor;
+import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.connector.jdbc.utils.JdbcUtils;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * OutputFormat for {@link JdbcDynamicTableSource}.
+ */
+@Internal
+public class JdbcDynamicOutputFormat extends JdbcBatchingOutputFormat<RowData, RowData, JdbcBatchStatementExecutor<RowData>> {

Review comment:
       Add `serialVersionUID`.
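
   For example:

```java
private static final long serialVersionUID = 1L;
```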

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSink.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.OutputFormatProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+import static org.apache.flink.connector.jdbc.table.JdbcDynamicOutputFormat.DynamicOutputFormatBuilder;
+
+/**
+ * A {@link DynamicTableSink} for JDBC.
+ */
+@Internal
+public class JdbcDynamicTableSink implements DynamicTableSink {
+
+	private static final String name = "JdbcTableSink";
+	private final JdbcOptions jdbcOptions;
+	private final JdbcExecutionOptions executionOptions;
+	private final JdbcDmlOptions dmlOptions;
+	private final DataType rowDataType;
+	private final DataType[] fieldTypes;
+
+	public JdbcDynamicTableSink(JdbcOptions jdbcOptions, JdbcExecutionOptions executionOptions, JdbcDmlOptions dmlOptions, DataType rowDataType, DataType[] fieldTypes) {
+		this.jdbcOptions = jdbcOptions;
+		this.executionOptions = executionOptions;
+		this.dmlOptions = dmlOptions;
+		this.rowDataType = rowDataType;
+		this.fieldTypes = fieldTypes;
+	}
+
+	@Override
+	public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+		return ChangelogMode.newBuilder()
+			.addContainedKind(RowKind.INSERT)
+			.addContainedKind(RowKind.DELETE)
+			.addContainedKind(RowKind.UPDATE_AFTER)
+			.build();
+	}
+
+	@Override
+	public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+		final TypeInformation<RowData> rowDataTypeInformation = (TypeInformation<RowData>) context.createTypeInformation(rowDataType);
+		final DynamicOutputFormatBuilder builder = JdbcDynamicOutputFormat.dynamicOutputFormatBuilder();
+
+		builder.setJdbcOptions(jdbcOptions);
+		builder.setJdbcDmlOptions(dmlOptions);
+		builder.setJdbcExecutionOptions(executionOptions);
+		builder.setRowDataTypeInfo(rowDataTypeInformation);
+		builder.setFieldDataTypes(fieldTypes);
+		return OutputFormatProvider.of(builder.build());
+	}
+
+	@Override
+	public DynamicTableSink copy() {
+		return new JdbcDynamicTableSink(jdbcOptions, executionOptions, dmlOptions, rowDataType, fieldTypes);
+	}
+
+	@Override
+	public String asSummaryString() {
+		return name;

Review comment:
       It would be better to return `"JDBC:" + dialectName`.
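
   A rough sketch, assuming the dialect name can be reached through the DML options held by this sink (the exact accessor is illustrative):

```java
@Override
public String asSummaryString() {
	// assumption: JdbcDialect exposes its name, e.g. via a dialectName() accessor
	return "JDBC:" + dmlOptions.getDialect().dialectName();
}
```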

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicOutputFormat.java
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.executor.InsertOrUpdateJdbcExecutor;
+import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.connector.jdbc.utils.JdbcUtils;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * OutputFormat for {@link JdbcDynamicTableSource}.
+ */
+@Internal
+public class JdbcDynamicOutputFormat extends JdbcBatchingOutputFormat<RowData, RowData, JdbcBatchStatementExecutor<RowData>> {
+	private static final Logger LOG = LoggerFactory.getLogger(JdbcDynamicOutputFormat.class);
+
+	private JdbcBatchStatementExecutor<RowData> deleteExecutor;
+	private final JdbcDmlOptions dmlOptions;
+	private final LogicalType[] logicalTypes;
+
+	private JdbcDynamicOutputFormat(JdbcConnectionProvider connectionProvider, JdbcDmlOptions dmlOptions, JdbcExecutionOptions batchOptions, TypeInformation<RowData> rowDataTypeInfo, LogicalType[] logicalTypes) {
+		super(connectionProvider, batchOptions, ctx -> createUpsertRowExecutor(dmlOptions, ctx, rowDataTypeInfo, logicalTypes), RecordExtractor.identity());
+		this.dmlOptions = dmlOptions;
+		this.logicalTypes = logicalTypes;
+	}
+
+	private JdbcDynamicOutputFormat(JdbcConnectionProvider connectionProvider, JdbcDmlOptions dmlOptions, JdbcExecutionOptions batchOptions, TypeInformation<RowData> rowDataTypeInfo, LogicalType[] logicalTypes, String sql) {
+		super(connectionProvider, batchOptions,
+			ctx -> createSimpleRowDataExecutor(dmlOptions.getDialect(), sql, logicalTypes, ctx, rowDataTypeInfo),
+			RecordExtractor.identity());
+		this.dmlOptions = dmlOptions;
+		this.logicalTypes = logicalTypes;
+	}
+
+	@Override
+	public void open(int taskNumber, int numTasks) throws IOException {
+		super.open(taskNumber, numTasks);
+		deleteExecutor = createDeleteExecutor();
+		try {
+			deleteExecutor.open(connection);
+		} catch (SQLException e) {
+			throw new IOException(e);
+		}
+	}
+
+	private JdbcBatchStatementExecutor<RowData> createDeleteExecutor() {
+		int[] pkFields = Arrays.stream(dmlOptions.getFieldNames()).mapToInt(Arrays.asList(dmlOptions.getFieldNames())::indexOf).toArray();
+		LogicalType[] pkTypes = Arrays.stream(pkFields).mapToObj(f -> logicalTypes[f]).toArray(LogicalType[]::new);
+		String deleteSql = dmlOptions.getDialect().getDeleteStatement(dmlOptions.getTableName(), dmlOptions.getFieldNames());
+		return createKeyedRowExecutor(dmlOptions.getDialect(), pkFields, pkTypes, deleteSql, logicalTypes);
+	}
+
+	@Override
+	protected void addToBatch(RowData original, RowData extracted) throws SQLException {
+		switch (original.getRowKind()) {
+			case DELETE:
+				deleteExecutor.addToBatch(extracted);
+				break;
+			case INSERT:
+			case UPDATE_AFTER:
+				super.addToBatch(original, extracted);
+				break;
+			default:
+				return;
+		}
+	}
+
+	@Override
+	public synchronized void close() {
+		try {
+			super.close();
+		} finally {
+			try {
+				if (deleteExecutor != null) {
+					deleteExecutor.close();
+				}
+			} catch (SQLException e) {
+				LOG.warn("unable to close delete statement runner", e);
+			}
+		}
+	}
+
+	@Override
+	protected void attemptFlush() throws SQLException {
+		super.attemptFlush();
+		deleteExecutor.executeBatch();
+	}
+
+	private static JdbcBatchStatementExecutor<RowData> createKeyedRowExecutor(JdbcDialect dialect, int[] pkFields, LogicalType[] pkTypes, String sql, LogicalType[] logicalTypes) {
+		return JdbcBatchStatementExecutor.keyed(
+			sql,
+			createRowKeyExtractor(pkFields, logicalTypes),
+			(st, record) -> dialect
+				.getOutputConverter(logicalTypes)
+				.toExternal(createRowKeyExtractor(pkFields, logicalTypes).apply(record), st));
+	}
+
+	private static JdbcBatchStatementExecutor<RowData> createUpsertRowExecutor(JdbcDmlOptions opt, RuntimeContext ctx, TypeInformation<RowData> rowDataTypeInfo, LogicalType[] logicalTypes) {
+		checkArgument(opt.getKeyFields().isPresent());
+
+		int[] pkFields = Arrays.stream(opt.getKeyFields().get()).mapToInt(Arrays.asList(opt.getFieldNames())::indexOf).toArray();
+		LogicalType[] pkTypes = Arrays.stream(pkFields).mapToObj(f -> logicalTypes[f]).toArray(LogicalType[]::new);
+		JdbcDialect dialect = opt.getDialect();
+		final TypeSerializer<RowData> typeSerializer = rowDataTypeInfo.createSerializer(ctx.getExecutionConfig());
+		return opt.getDialect()
+			.getUpsertStatement(opt.getTableName(), opt.getFieldNames(), opt.getKeyFields().get())
+			.map(sql -> createSimpleRowDataExecutor(dialect, sql, logicalTypes, ctx, rowDataTypeInfo))
+			.orElseGet(() ->
+				new InsertOrUpdateJdbcExecutor<>(
+					opt.getDialect().getRowExistsStatement(opt.getTableName(), opt.getKeyFields().get()),
+					opt.getDialect().getInsertIntoStatement(opt.getTableName(), opt.getFieldNames()),
+					opt.getDialect().getUpdateStatement(opt.getTableName(), opt.getFieldNames(), opt.getKeyFields().get()),
+					createRowDataJdbcStatementBuilder(dialect, pkTypes),
+					createRowDataJdbcStatementBuilder(dialect, logicalTypes),
+					createRowDataJdbcStatementBuilder(dialect, logicalTypes),
+					createRowKeyExtractor(pkFields, logicalTypes),
+					ctx.getExecutionConfig().isObjectReuseEnabled() ? typeSerializer::copy : r -> r));
+	}
+
+	private static Function<RowData, RowData> createRowKeyExtractor(int[] pkFields, LogicalType[] logicalTypes) {
+		return row -> getPrimaryKey(row, pkFields, logicalTypes);
+	}
+
+	private static JdbcBatchStatementExecutor<RowData> createSimpleRowDataExecutor(JdbcDialect dialect, String sql, LogicalType[] fieldTypes, RuntimeContext ctx, TypeInformation<RowData> rowDataTypeInfo) {
+		final TypeSerializer<RowData> typeSerializer = rowDataTypeInfo.createSerializer(ctx.getExecutionConfig());
+		return JdbcBatchStatementExecutor.simple(
+			sql,
+			createRowDataJdbcStatementBuilder(dialect, fieldTypes),
+			ctx.getExecutionConfig().isObjectReuseEnabled() ? typeSerializer::copy : Function.identity());
+	}
+
+	/**
+	 * Creates a {@link JdbcStatementBuilder} for {@link Row} using the provided SQL types array.
+	 * Uses {@link JdbcUtils#setRecordToStatement}
+	 */
+	private static JdbcStatementBuilder<RowData> createRowDataJdbcStatementBuilder(JdbcDialect dialect, LogicalType[] types) {
+		return (st, record) -> dialect.getOutputConverter(types).toExternal(record, st);
+	}
+
+	private static RowData getPrimaryKey(RowData row, int[] pkFields, LogicalType[] logicalTypes) {
+		GenericRowData pkRow = new GenericRowData(pkFields.length);
+		for (int i = 0; i < pkFields.length; i++) {
+			pkRow.setField(i, RowData.get(row, pkFields[i], logicalTypes[pkFields[i]]));

Review comment:
       Please use RowData#FieldGetter.
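
   Something like this (sketch only), creating the getters once and reusing them for every record:

```java
// Create the field getters up front, e.g. where the key extractor is built:
RowData.FieldGetter[] pkGetters = new RowData.FieldGetter[pkFields.length];
for (int i = 0; i < pkFields.length; i++) {
	pkGetters[i] = RowData.createFieldGetter(logicalTypes[pkFields[i]], pkFields[i]);
}

// Then per record:
GenericRowData pkRow = new GenericRowData(pkFields.length);
for (int i = 0; i < pkFields.length; i++) {
	pkRow.setField(i, pkGetters[i].getFieldOrNull(row));
}
```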

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicOutputFormat.java
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.executor.InsertOrUpdateJdbcExecutor;
+import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.connector.jdbc.utils.JdbcUtils;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * OutputFormat for {@link JdbcDynamicTableSource}.
+ */
+@Internal
+public class JdbcDynamicOutputFormat extends JdbcBatchingOutputFormat<RowData, RowData, JdbcBatchStatementExecutor<RowData>> {
+	private static final Logger LOG = LoggerFactory.getLogger(JdbcDynamicOutputFormat.class);
+
+	private JdbcBatchStatementExecutor<RowData> deleteExecutor;
+	private final JdbcDmlOptions dmlOptions;
+	private final LogicalType[] logicalTypes;
+
+	private JdbcDynamicOutputFormat(JdbcConnectionProvider connectionProvider, JdbcDmlOptions dmlOptions, JdbcExecutionOptions batchOptions, TypeInformation<RowData> rowDataTypeInfo, LogicalType[] logicalTypes) {
+		super(connectionProvider, batchOptions, ctx -> createUpsertRowExecutor(dmlOptions, ctx, rowDataTypeInfo, logicalTypes), RecordExtractor.identity());
+		this.dmlOptions = dmlOptions;
+		this.logicalTypes = logicalTypes;
+	}
+
+	private JdbcDynamicOutputFormat(JdbcConnectionProvider connectionProvider, JdbcDmlOptions dmlOptions, JdbcExecutionOptions batchOptions, TypeInformation<RowData> rowDataTypeInfo, LogicalType[] logicalTypes, String sql) {
+		super(connectionProvider, batchOptions,
+			ctx -> createSimpleRowDataExecutor(dmlOptions.getDialect(), sql, logicalTypes, ctx, rowDataTypeInfo),
+			RecordExtractor.identity());
+		this.dmlOptions = dmlOptions;
+		this.logicalTypes = logicalTypes;
+	}
+
+	@Override
+	public void open(int taskNumber, int numTasks) throws IOException {
+		super.open(taskNumber, numTasks);
+		deleteExecutor = createDeleteExecutor();
+		try {
+			deleteExecutor.open(connection);
+		} catch (SQLException e) {
+			throw new IOException(e);
+		}
+	}
+
+	private JdbcBatchStatementExecutor<RowData> createDeleteExecutor() {
+		int[] pkFields = Arrays.stream(dmlOptions.getFieldNames()).mapToInt(Arrays.asList(dmlOptions.getFieldNames())::indexOf).toArray();
+		LogicalType[] pkTypes = Arrays.stream(pkFields).mapToObj(f -> logicalTypes[f]).toArray(LogicalType[]::new);
+		String deleteSql = dmlOptions.getDialect().getDeleteStatement(dmlOptions.getTableName(), dmlOptions.getFieldNames());
+		return createKeyedRowExecutor(dmlOptions.getDialect(), pkFields, pkTypes, deleteSql, logicalTypes);
+	}
+
+	@Override
+	protected void addToBatch(RowData original, RowData extracted) throws SQLException {
+		switch (original.getRowKind()) {
+			case DELETE:
+				deleteExecutor.addToBatch(extracted);
+				break;
+			case INSERT:
+			case UPDATE_AFTER:
+				super.addToBatch(original, extracted);
+				break;
+			default:
+				return;
+		}
+	}
+
+	@Override
+	public synchronized void close() {
+		try {
+			super.close();
+		} finally {
+			try {
+				if (deleteExecutor != null) {
+					deleteExecutor.close();
+				}
+			} catch (SQLException e) {
+				LOG.warn("unable to close delete statement runner", e);
+			}
+		}
+	}
+
+	@Override
+	protected void attemptFlush() throws SQLException {
+		super.attemptFlush();
+		deleteExecutor.executeBatch();
+	}
+
+	private static JdbcBatchStatementExecutor<RowData> createKeyedRowExecutor(JdbcDialect dialect, int[] pkFields, LogicalType[] pkTypes, String sql, LogicalType[] logicalTypes) {
+		return JdbcBatchStatementExecutor.keyed(
+			sql,
+			createRowKeyExtractor(pkFields, logicalTypes),
+			(st, record) -> dialect
+				.getOutputConverter(logicalTypes)

Review comment:
       Hot path! The output converter is created here for every record.
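
   A minimal sketch of what I mean (keeping the current arguments, just hoisting the per-record work out of the lambda):

```java
private static JdbcBatchStatementExecutor<RowData> createKeyedRowExecutor(JdbcDialect dialect, int[] pkFields, LogicalType[] pkTypes, String sql, LogicalType[] logicalTypes) {
	// build the converter and key extractor once, not per record
	RowToJdbcConverter converter = dialect.getOutputConverter(logicalTypes);
	Function<RowData, RowData> keyExtractor = createRowKeyExtractor(pkFields, logicalTypes);
	return JdbcBatchStatementExecutor.keyed(
		sql,
		keyExtractor,
		(st, record) -> converter.toExternal(keyExtractor.apply(record), st));
}
```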

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicOutputFormat.java
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.executor.InsertOrUpdateJdbcExecutor;
+import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.connector.jdbc.utils.JdbcUtils;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * OutputFormat for {@link JdbcDynamicTableSource}.
+ */
+@Internal
+public class JdbcDynamicOutputFormat extends JdbcBatchingOutputFormat<RowData, RowData, JdbcBatchStatementExecutor<RowData>> {
+	private static final Logger LOG = LoggerFactory.getLogger(JdbcDynamicOutputFormat.class);
+
+	private JdbcBatchStatementExecutor<RowData> deleteExecutor;
+	private final JdbcDmlOptions dmlOptions;
+	private final LogicalType[] logicalTypes;
+
+	private JdbcDynamicOutputFormat(JdbcConnectionProvider connectionProvider, JdbcDmlOptions dmlOptions, JdbcExecutionOptions batchOptions, TypeInformation<RowData> rowDataTypeInfo, LogicalType[] logicalTypes) {
+		super(connectionProvider, batchOptions, ctx -> createUpsertRowExecutor(dmlOptions, ctx, rowDataTypeInfo, logicalTypes), RecordExtractor.identity());
+		this.dmlOptions = dmlOptions;
+		this.logicalTypes = logicalTypes;
+	}
+
+	private JdbcDynamicOutputFormat(JdbcConnectionProvider connectionProvider, JdbcDmlOptions dmlOptions, JdbcExecutionOptions batchOptions, TypeInformation<RowData> rowDataTypeInfo, LogicalType[] logicalTypes, String sql) {
+		super(connectionProvider, batchOptions,
+			ctx -> createSimpleRowDataExecutor(dmlOptions.getDialect(), sql, logicalTypes, ctx, rowDataTypeInfo),
+			RecordExtractor.identity());
+		this.dmlOptions = dmlOptions;
+		this.logicalTypes = logicalTypes;
+	}
+
+	@Override
+	public void open(int taskNumber, int numTasks) throws IOException {
+		super.open(taskNumber, numTasks);
+		deleteExecutor = createDeleteExecutor();
+		try {
+			deleteExecutor.open(connection);
+		} catch (SQLException e) {
+			throw new IOException(e);
+		}
+	}
+
+	private JdbcBatchStatementExecutor<RowData> createDeleteExecutor() {
+		int[] pkFields = Arrays.stream(dmlOptions.getFieldNames()).mapToInt(Arrays.asList(dmlOptions.getFieldNames())::indexOf).toArray();
+		LogicalType[] pkTypes = Arrays.stream(pkFields).mapToObj(f -> logicalTypes[f]).toArray(LogicalType[]::new);
+		String deleteSql = dmlOptions.getDialect().getDeleteStatement(dmlOptions.getTableName(), dmlOptions.getFieldNames());
+		return createKeyedRowExecutor(dmlOptions.getDialect(), pkFields, pkTypes, deleteSql, logicalTypes);
+	}
+
+	@Override
+	protected void addToBatch(RowData original, RowData extracted) throws SQLException {
+		switch (original.getRowKind()) {
+			case DELETE:
+				deleteExecutor.addToBatch(extracted);
+				break;
+			case INSERT:
+			case UPDATE_AFTER:

Review comment:
       Move this before DELETE, because insertions are usually much more frequent than deletions.
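
   Combined with the earlier point about the bare `return`, the switch could look roughly like this (sketch):

```java
switch (original.getRowKind()) {
	case INSERT:
	case UPDATE_AFTER:
		super.addToBatch(original, extracted);
		break;
	case DELETE:
		deleteExecutor.addToBatch(extracted);
		break;
	default:
		// silently ignore other kinds (e.g. UPDATE_BEFORE)
		break;
}
```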

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
##########
@@ -43,11 +45,18 @@
 	boolean canHandle(String url);
 
 	/**
-	 * Get a row converter for the database according to the given row type.
+	 * Get converter that convert the jdbc object to flink internal object according to the given row type.
 	 * @param rowType the given row type
 	 * @return a row converter for the database
 	 */
-	JdbcRowConverter getRowConverter(RowType rowType);
+	JdbcToRowConverter getInputConverter(RowType rowType);
+
+	/**
+	 * Get converter that convert the flink internal object to jdbc object according to the given jdbc type.
+	 * @param jdbcTypes the given jdbc type
+	 * @return a row converter for the database
+	 */
+	RowToJdbcConverter getOutputConverter(LogicalType[] jdbcTypes);

Review comment:
       Could we merge these two interfaces into one? IMO, a dialect must implement both of them anyway.
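
   For example, a single interface along these lines (just a sketch; method names and signatures are illustrative, based on how the converters are used in this PR):

```java
/** Converts between JDBC objects and Flink's internal RowData for a specific dialect. */
public interface JdbcRowConverter extends Serializable {

	/** Converts the current row of the given ResultSet into a RowData. */
	RowData toInternal(ResultSet resultSet) throws SQLException;

	/** Sets the fields of the given RowData on the PreparedStatement. */
	PreparedStatement toExternal(RowData rowData, PreparedStatement statement) throws SQLException;
}
```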

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceSinkFactory.java
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialects;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.TableSchemaUtils;
+
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Factory for creating configured instances of {@link JdbcDynamicTableSource}
+ * and {@link JdbcDynamicTableSink}.
+ */
+@Internal
+public class JdbcDynamicTableSourceSinkFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+	public static final ConfigOption<String> IDENTIFIER = ConfigOptions
+		.key("connector")
+		.stringType()
+		.defaultValue("jdbc")
+		.withDescription("-- required: specify this table type is jdbc.");
+	public static final ConfigOption<String> URL = ConfigOptions
+		.key("url")
+		.stringType()
+		.noDefaultValue()
+		.withDescription("-- required: the jdbc database url.");
+	public static final ConfigOption<String> TABLE = ConfigOptions
+		.key("table")

Review comment:
       Use `table-name`, see https://issues.apache.org/jira/browse/FLINK-17029
   Please update other option keys according to the JIRA description. 
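   For example (a sketch, following the key naming in the JIRA description):

```java
public static final ConfigOption<String> TABLE_NAME = ConfigOptions
	.key("table-name")
	.stringType()
	.noDefaultValue()
	.withDescription("Required. The name of the JDBC table to connect to.");
```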

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicLookupFunction.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialects;
+import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.connector.jdbc.utils.JdbcTypeUtil;
+import org.apache.flink.connector.jdbc.utils.JdbcUtils;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A lookup function for {@link JdbcDynamicTableSource}.
+ */
+@Internal
+public class JdbcDynamicLookupFunction extends TableFunction<RowData> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(JdbcDynamicLookupFunction.class);
+	private static final long serialVersionUID = 1L;
+
+	private final String query;
+	private final String drivername;
+	private final String dbURL;
+	private final String username;
+	private final String password;
+	private final DataType[] keyTypes;
+	private final int[] keySqlTypes;
+	private final long cacheMaxSize;
+	private final long cacheExpireMs;
+	private final int maxRetryTimes;
+	private final JdbcDialect jdbcDialect;
+	private final RowType rowType;
+
+	private transient Connection dbConn;
+	private transient PreparedStatement statement;
+	private transient Cache<RowData, List<RowData>> cache;
+
+	private JdbcDynamicLookupFunction(
+		JdbcOptions options, JdbcLookupOptions lookupOptions,
+		String[] fieldNames, DataType[] fieldTypes, String[] keyNames, RowType rowType) {
+		this.drivername = options.getDriverName();
+		this.dbURL = options.getDbURL();
+		this.username = options.getUsername().orElse(null);
+		this.password = options.getPassword().orElse(null);
+		List<String> nameList = Arrays.asList(fieldNames);
+		this.keyTypes = Arrays.stream(keyNames)
+			.map(s -> {
+				checkArgument(nameList.contains(s),
+					"keyName %s can't find in fieldNames %s.", s, nameList);
+				return fieldTypes[nameList.indexOf(s)];
+			})
+			.toArray(DataType[]::new);
+		this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+		this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+		this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+		this.keySqlTypes = Arrays.stream(keyTypes)
+			.map(TypeConversions::fromDataTypeToLegacyInfo)
+			.mapToInt(JdbcTypeUtil::typeInformationToSqlType).toArray();
+		this.query = options.getDialect().getSelectFromStatement(
+			options.getTableName(), fieldNames, keyNames);
+		this.jdbcDialect = JdbcDialects.get(dbURL)
+			.orElseThrow(() -> new UnsupportedOperationException(String.format("Unknown dbUrl:%s", dbURL)));
+		this.rowType = rowType;
+	}
+
+	public static Builder builder() {
+		return new Builder();
+	}
+
+	@Override
+	public void open(FunctionContext context) throws Exception {
+		try {
+			establishConnection();
+			statement = dbConn.prepareStatement(query);
+			this.cache = cacheMaxSize == -1 || cacheExpireMs == -1 ? null : CacheBuilder.newBuilder()
+				.expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
+				.maximumSize(cacheMaxSize)
+				.build();
+		} catch (SQLException sqe) {
+			throw new IllegalArgumentException("open() failed.", sqe);
+		} catch (ClassNotFoundException cnfe) {
+			throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
+		}
+	}
+
+	public void eval(Object... keys) {
+		RowData keyRow = GenericRowData.of(keys);
+		if (cache != null) {
+			List<RowData> cachedRows = cache.getIfPresent(keyRow);
+			if (cachedRows != null) {
+				for (RowData cachedRow : cachedRows) {
+					collect(cachedRow);
+				}
+				return;
+			}
+		}
+
+		for (int retry = 1; retry <= maxRetryTimes; retry++) {
+			try {
+				statement.clearParameters();
+				for (int i = 0; i < keys.length; i++) {
+					JdbcUtils.setField(statement, keySqlTypes[i], keys[i], i);
+				}
+				try (ResultSet resultSet = statement.executeQuery()) {
+					if (cache == null) {
+						while (resultSet.next()) {
+							collect(convertToRowFromResultSet(resultSet));
+						}
+					} else {
+						ArrayList<RowData> rows = new ArrayList<>();
+						while (resultSet.next()) {
+							RowData row = convertToRowFromResultSet(resultSet);
+							rows.add(row);
+							collect(row);
+						}
+						rows.trimToSize();
+						cache.put(keyRow, rows);
+					}
+				}
+				break;
+			} catch (SQLException e) {
+				LOG.error(String.format("JDBC executeQuery error, retry times = %d", retry), e);
+				if (retry >= maxRetryTimes) {
+					throw new RuntimeException("Execution of JDBC statement failed.", e);
+				}
+
+				try {
+					Thread.sleep(1000 * retry);
+				} catch (InterruptedException e1) {
+					throw new RuntimeException(e1);
+				}
+			}
+		}
+	}
+
+	private RowData convertToRowFromResultSet(ResultSet resultSet) throws SQLException {
+		return jdbcDialect.getInputConverter(rowType).toInternal(resultSet);

Review comment:
       This is a hot path; we shouldn't call `getInputConverter` for every record, because it is a very heavy operation.
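   For example, the converter could be created once in `open()` and then reused for every record (a rough sketch):

```java
private transient JdbcToRowConverter converter;

@Override
public void open(FunctionContext context) throws Exception {
	// ... existing connection, statement and cache setup ...
	this.converter = jdbcDialect.getInputConverter(rowType);
}

private RowData convertToRowFromResultSet(ResultSet resultSet) throws SQLException {
	return converter.toInternal(resultSet);
}
```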

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicInputFormat.java
##########
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.converter.JdbcToRowConverter;
+import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Array;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Arrays;
+
+/**
+ * InputFormat for {@link JdbcDynamicTableSource}.
+ */
+@Internal
+public class JdbcDynamicInputFormat extends RichInputFormat<RowData, InputSplit> implements ResultTypeQueryable<RowData> {

Review comment:
       Can we reuse `JdbcInputFormat` here?

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicInputFormat.java
##########
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.converter.JdbcToRowConverter;
+import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Array;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Arrays;
+
+/**
+ * InputFormat for {@link JdbcDynamicTableSource}.
+ */
+@Internal
+public class JdbcDynamicInputFormat extends RichInputFormat<RowData, InputSplit> implements ResultTypeQueryable<RowData> {
+
+	private static final long serialVersionUID = 1L;
+	private static final Logger LOG = LoggerFactory.getLogger(JdbcDynamicInputFormat.class);
+
+	private JdbcConnectionOptions connectionOptions;
+	private int fetchSize;
+	private Boolean autoCommit;
+	private Object[][] parameterValues;
+	private String queryTemplate;
+	private int resultSetType;
+	private int resultSetConcurrency;
+	private JdbcToRowConverter rowConverter;
+
+	private transient Connection dbConn;
+	private transient PreparedStatement statement;
+	private transient ResultSet resultSet;
+	private transient boolean hasNext;
+
+	private JdbcDynamicInputFormat(
+		JdbcConnectionOptions connectionOptions,
+		int fetchSize,
+		Boolean autoCommit,
+		Object[][] parameterValues,
+		String queryTemplate,
+		int resultSetType,
+		int resultSetConcurrency,
+		JdbcToRowConverter rowConverter) {

Review comment:
       Indent. 

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicLookupFunction.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialects;
+import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.connector.jdbc.utils.JdbcTypeUtil;
+import org.apache.flink.connector.jdbc.utils.JdbcUtils;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A lookup function for {@link JdbcDynamicTableSource}.
+ */
+@Internal
+public class JdbcDynamicLookupFunction extends TableFunction<RowData> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(JdbcDynamicLookupFunction.class);
+	private static final long serialVersionUID = 1L;
+
+	private final String query;
+	private final String drivername;
+	private final String dbURL;
+	private final String username;
+	private final String password;
+	private final DataType[] keyTypes;
+	private final int[] keySqlTypes;
+	private final long cacheMaxSize;
+	private final long cacheExpireMs;
+	private final int maxRetryTimes;
+	private final JdbcDialect jdbcDialect;
+	private final RowType rowType;
+
+	private transient Connection dbConn;
+	private transient PreparedStatement statement;
+	private transient Cache<RowData, List<RowData>> cache;
+
+	private JdbcDynamicLookupFunction(
+		JdbcOptions options, JdbcLookupOptions lookupOptions,
+		String[] fieldNames, DataType[] fieldTypes, String[] keyNames, RowType rowType) {
+		this.drivername = options.getDriverName();
+		this.dbURL = options.getDbURL();
+		this.username = options.getUsername().orElse(null);
+		this.password = options.getPassword().orElse(null);
+		List<String> nameList = Arrays.asList(fieldNames);
+		this.keyTypes = Arrays.stream(keyNames)
+			.map(s -> {
+				checkArgument(nameList.contains(s),
+					"keyName %s can't find in fieldNames %s.", s, nameList);
+				return fieldTypes[nameList.indexOf(s)];
+			})
+			.toArray(DataType[]::new);
+		this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+		this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+		this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+		this.keySqlTypes = Arrays.stream(keyTypes)
+			.map(TypeConversions::fromDataTypeToLegacyInfo)
+			.mapToInt(JdbcTypeUtil::typeInformationToSqlType).toArray();
+		this.query = options.getDialect().getSelectFromStatement(
+			options.getTableName(), fieldNames, keyNames);
+		this.jdbcDialect = JdbcDialects.get(dbURL)
+			.orElseThrow(() -> new UnsupportedOperationException(String.format("Unknown dbUrl:%s", dbURL)));
+		this.rowType = rowType;
+	}
+
+	public static Builder builder() {
+		return new Builder();
+	}
+
+	@Override
+	public void open(FunctionContext context) throws Exception {
+		try {
+			establishConnection();
+			statement = dbConn.prepareStatement(query);
+			this.cache = cacheMaxSize == -1 || cacheExpireMs == -1 ? null : CacheBuilder.newBuilder()
+				.expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
+				.maximumSize(cacheMaxSize)
+				.build();
+		} catch (SQLException sqe) {
+			throw new IllegalArgumentException("open() failed.", sqe);
+		} catch (ClassNotFoundException cnfe) {
+			throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
+		}
+	}
+
+	public void eval(Object... keys) {
+		RowData keyRow = GenericRowData.of(keys);
+		if (cache != null) {
+			List<RowData> cachedRows = cache.getIfPresent(keyRow);
+			if (cachedRows != null) {
+				for (RowData cachedRow : cachedRows) {
+					collect(cachedRow);
+				}
+				return;
+			}
+		}
+
+		for (int retry = 1; retry <= maxRetryTimes; retry++) {
+			try {
+				statement.clearParameters();
+				for (int i = 0; i < keys.length; i++) {
+					JdbcUtils.setField(statement, keySqlTypes[i], keys[i], i);

Review comment:
       Why not use the JDBC converter from the dialect?
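   For example, something along these lines (a sketch; `keyLogicalTypes` is assumed to be derived from `keyTypes` in the constructor):

```java
// created once, e.g. in open(), instead of resolving SQL types per field
RowToJdbcConverter keyConverter = jdbcDialect.getOutputConverter(keyLogicalTypes);

// in eval(), instead of JdbcUtils.setField per key field
keyConverter.toExternal(GenericRowData.of(keys), statement);
```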

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicLookupFunction.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialects;
+import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.connector.jdbc.utils.JdbcTypeUtil;
+import org.apache.flink.connector.jdbc.utils.JdbcUtils;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A lookup function for {@link JdbcDynamicTableSource}.
+ */
+@Internal
+public class JdbcDynamicLookupFunction extends TableFunction<RowData> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(JdbcDynamicLookupFunction.class);
+	private static final long serialVersionUID = 1L;
+
+	private final String query;
+	private final String drivername;
+	private final String dbURL;
+	private final String username;
+	private final String password;
+	private final DataType[] keyTypes;
+	private final int[] keySqlTypes;
+	private final long cacheMaxSize;
+	private final long cacheExpireMs;
+	private final int maxRetryTimes;
+	private final JdbcDialect jdbcDialect;
+	private final RowType rowType;
+
+	private transient Connection dbConn;
+	private transient PreparedStatement statement;
+	private transient Cache<RowData, List<RowData>> cache;
+
+	private JdbcDynamicLookupFunction(
+		JdbcOptions options, JdbcLookupOptions lookupOptions,
+		String[] fieldNames, DataType[] fieldTypes, String[] keyNames, RowType rowType) {
+		this.drivername = options.getDriverName();
+		this.dbURL = options.getDbURL();
+		this.username = options.getUsername().orElse(null);
+		this.password = options.getPassword().orElse(null);
+		List<String> nameList = Arrays.asList(fieldNames);
+		this.keyTypes = Arrays.stream(keyNames)
+			.map(s -> {
+				checkArgument(nameList.contains(s),
+					"keyName %s can't find in fieldNames %s.", s, nameList);
+				return fieldTypes[nameList.indexOf(s)];
+			})
+			.toArray(DataType[]::new);
+		this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+		this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+		this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+		this.keySqlTypes = Arrays.stream(keyTypes)
+			.map(TypeConversions::fromDataTypeToLegacyInfo)
+			.mapToInt(JdbcTypeUtil::typeInformationToSqlType).toArray();
+		this.query = options.getDialect().getSelectFromStatement(
+			options.getTableName(), fieldNames, keyNames);
+		this.jdbcDialect = JdbcDialects.get(dbURL)
+			.orElseThrow(() -> new UnsupportedOperationException(String.format("Unknown dbUrl:%s", dbURL)));
+		this.rowType = rowType;
+	}
+
+	public static Builder builder() {
+		return new Builder();
+	}
+
+	@Override
+	public void open(FunctionContext context) throws Exception {
+		try {
+			establishConnection();
+			statement = dbConn.prepareStatement(query);
+			this.cache = cacheMaxSize == -1 || cacheExpireMs == -1 ? null : CacheBuilder.newBuilder()
+				.expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
+				.maximumSize(cacheMaxSize)
+				.build();
+		} catch (SQLException sqe) {
+			throw new IllegalArgumentException("open() failed.", sqe);
+		} catch (ClassNotFoundException cnfe) {
+			throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
+		}
+	}
+
+	public void eval(Object... keys) {

Review comment:
       Add a Javadoc to this method. This is a lookup method which is called by the Flink framework at runtime.
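   Something like this would do (wording is only a suggestion):

```java
/**
 * This is a lookup method which is called by the Flink framework at runtime.
 *
 * @param keys the lookup key values
 */
public void eval(Object... keys) {
```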

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicLookupFunction.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialects;
+import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.connector.jdbc.utils.JdbcTypeUtil;
+import org.apache.flink.connector.jdbc.utils.JdbcUtils;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A lookup function for {@link JdbcDynamicTableSource}.
+ */
+@Internal
+public class JdbcDynamicLookupFunction extends TableFunction<RowData> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(JdbcDynamicLookupFunction.class);
+	private static final long serialVersionUID = 1L;
+
+	private final String query;
+	private final String drivername;
+	private final String dbURL;
+	private final String username;
+	private final String password;
+	private final DataType[] keyTypes;
+	private final int[] keySqlTypes;
+	private final long cacheMaxSize;
+	private final long cacheExpireMs;
+	private final int maxRetryTimes;
+	private final JdbcDialect jdbcDialect;
+	private final RowType rowType;
+
+	private transient Connection dbConn;
+	private transient PreparedStatement statement;
+	private transient Cache<RowData, List<RowData>> cache;
+
+	private JdbcDynamicLookupFunction(
+		JdbcOptions options, JdbcLookupOptions lookupOptions,
+		String[] fieldNames, DataType[] fieldTypes, String[] keyNames, RowType rowType) {
+		this.drivername = options.getDriverName();
+		this.dbURL = options.getDbURL();
+		this.username = options.getUsername().orElse(null);
+		this.password = options.getPassword().orElse(null);
+		List<String> nameList = Arrays.asList(fieldNames);
+		this.keyTypes = Arrays.stream(keyNames)
+			.map(s -> {
+				checkArgument(nameList.contains(s),
+					"keyName %s can't find in fieldNames %s.", s, nameList);
+				return fieldTypes[nameList.indexOf(s)];
+			})
+			.toArray(DataType[]::new);
+		this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+		this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+		this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+		this.keySqlTypes = Arrays.stream(keyTypes)
+			.map(TypeConversions::fromDataTypeToLegacyInfo)
+			.mapToInt(JdbcTypeUtil::typeInformationToSqlType).toArray();
+		this.query = options.getDialect().getSelectFromStatement(
+			options.getTableName(), fieldNames, keyNames);
+		this.jdbcDialect = JdbcDialects.get(dbURL)
+			.orElseThrow(() -> new UnsupportedOperationException(String.format("Unknown dbUrl:%s", dbURL)));
+		this.rowType = rowType;
+	}
+
+	public static Builder builder() {
+		return new Builder();
+	}
+
+	@Override
+	public void open(FunctionContext context) throws Exception {
+		try {
+			establishConnection();
+			statement = dbConn.prepareStatement(query);
+			this.cache = cacheMaxSize == -1 || cacheExpireMs == -1 ? null : CacheBuilder.newBuilder()
+				.expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
+				.maximumSize(cacheMaxSize)
+				.build();
+		} catch (SQLException sqe) {
+			throw new IllegalArgumentException("open() failed.", sqe);
+		} catch (ClassNotFoundException cnfe) {
+			throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
+		}
+	}
+
+	public void eval(Object... keys) {
+		RowData keyRow = GenericRowData.of(keys);
+		if (cache != null) {
+			List<RowData> cachedRows = cache.getIfPresent(keyRow);
+			if (cachedRows != null) {
+				for (RowData cachedRow : cachedRows) {
+					collect(cachedRow);
+				}
+				return;
+			}
+		}
+
+		for (int retry = 1; retry <= maxRetryTimes; retry++) {
+			try {
+				statement.clearParameters();
+				for (int i = 0; i < keys.length; i++) {
+					JdbcUtils.setField(statement, keySqlTypes[i], keys[i], i);
+				}
+				try (ResultSet resultSet = statement.executeQuery()) {
+					if (cache == null) {
+						while (resultSet.next()) {
+							collect(convertToRowFromResultSet(resultSet));
+						}
+					} else {
+						ArrayList<RowData> rows = new ArrayList<>();
+						while (resultSet.next()) {
+							RowData row = convertToRowFromResultSet(resultSet);
+							rows.add(row);
+							collect(row);
+						}
+						rows.trimToSize();
+						cache.put(keyRow, rows);
+					}
+				}
+				break;
+			} catch (SQLException e) {
+				LOG.error(String.format("JDBC executeQuery error, retry times = %d", retry), e);
+				if (retry >= maxRetryTimes) {
+					throw new RuntimeException("Execution of JDBC statement failed.", e);
+				}
+
+				try {
+					Thread.sleep(1000 * retry);
+				} catch (InterruptedException e1) {
+					throw new RuntimeException(e1);
+				}
+			}
+		}
+	}
+
+	private RowData convertToRowFromResultSet(ResultSet resultSet) throws SQLException {
+		return jdbcDialect.getInputConverter(rowType).toInternal(resultSet);
+	}
+
+	private void establishConnection() throws SQLException, ClassNotFoundException {
+		Class.forName(drivername);
+		if (username == null) {
+			dbConn = DriverManager.getConnection(dbURL);
+		} else {
+			dbConn = DriverManager.getConnection(dbURL, username, password);
+		}
+	}
+
+	@Override
+	public void close() throws IOException {
+		if (cache != null) {
+			cache.cleanUp();
+			cache = null;
+		}
+		if (statement != null) {
+			try {
+				statement.close();
+			} catch (SQLException e) {
+				LOG.info("JDBC statement could not be closed: " + e.getMessage());
+			} finally {
+				statement = null;
+			}
+		}
+
+		if (dbConn != null) {
+			try {
+				dbConn.close();
+			} catch (SQLException se) {
+				LOG.info("JDBC connection could not be closed: " + se.getMessage());
+			} finally {
+				dbConn = null;
+			}
+		}
+	}
+
+	/**
+	 * Builder for a {@link JdbcDynamicLookupFunction}.
+	 */
+	public static class Builder {

Review comment:
       We don't need a Builder for this lookup function, because this is an internal class which is only called by the factory.
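   The factory can call a constructor directly instead, e.g. (illustrative; the exact call site depends on the table source implementation):

```java
JdbcDynamicLookupFunction lookupFunction = new JdbcDynamicLookupFunction(
	options, lookupOptions, fieldNames, fieldTypes, keyNames, rowType);
```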

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicOutputFormat.java
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.executor.InsertOrUpdateJdbcExecutor;
+import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.connector.jdbc.utils.JdbcUtils;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * OutputFormat for {@link JdbcDynamicTableSink}.
+ */
+@Internal
+public class JdbcDynamicOutputFormat extends JdbcBatchingOutputFormat<RowData, RowData, JdbcBatchStatementExecutor<RowData>> {
+	private static final Logger LOG = LoggerFactory.getLogger(JdbcDynamicOutputFormat.class);
+
+	private JdbcBatchStatementExecutor<RowData> deleteExecutor;
+	private final JdbcDmlOptions dmlOptions;
+	private final LogicalType[] logicalTypes;
+
+	private JdbcDynamicOutputFormat(JdbcConnectionProvider connectionProvider, JdbcDmlOptions dmlOptions, JdbcExecutionOptions batchOptions, TypeInformation<RowData> rowDataTypeInfo, LogicalType[] logicalTypes) {
+		super(connectionProvider, batchOptions, ctx -> createUpsertRowExecutor(dmlOptions, ctx, rowDataTypeInfo, logicalTypes), RecordExtractor.identity());
+		this.dmlOptions = dmlOptions;
+		this.logicalTypes = logicalTypes;
+	}
+
+	private JdbcDynamicOutputFormat(JdbcConnectionProvider connectionProvider, JdbcDmlOptions dmlOptions, JdbcExecutionOptions batchOptions, TypeInformation<RowData> rowDataTypeInfo, LogicalType[] logicalTypes, String sql) {
+		super(connectionProvider, batchOptions,
+			ctx -> createSimpleRowDataExecutor(dmlOptions.getDialect(), sql, logicalTypes, ctx, rowDataTypeInfo),
+			RecordExtractor.identity());
+		this.dmlOptions = dmlOptions;
+		this.logicalTypes = logicalTypes;
+	}
+
+	@Override
+	public void open(int taskNumber, int numTasks) throws IOException {
+		super.open(taskNumber, numTasks);
+		deleteExecutor = createDeleteExecutor();
+		try {
+			deleteExecutor.open(connection);
+		} catch (SQLException e) {
+			throw new IOException(e);
+		}
+	}
+
+	private JdbcBatchStatementExecutor<RowData> createDeleteExecutor() {
+		int[] pkFields = Arrays.stream(dmlOptions.getFieldNames()).mapToInt(Arrays.asList(dmlOptions.getFieldNames())::indexOf).toArray();
+		LogicalType[] pkTypes = Arrays.stream(pkFields).mapToObj(f -> logicalTypes[f]).toArray(LogicalType[]::new);
+		String deleteSql = dmlOptions.getDialect().getDeleteStatement(dmlOptions.getTableName(), dmlOptions.getFieldNames());
+		return createKeyedRowExecutor(dmlOptions.getDialect(), pkFields, pkTypes, deleteSql, logicalTypes);
+	}
+
+	@Override
+	protected void addToBatch(RowData original, RowData extracted) throws SQLException {
+		switch (original.getRowKind()) {
+			case DELETE:
+				deleteExecutor.addToBatch(extracted);
+				break;
+			case INSERT:
+			case UPDATE_AFTER:
+				super.addToBatch(original, extracted);
+				break;
+			default:
+				return;
+		}
+	}
+
+	@Override
+	public synchronized void close() {
+		try {
+			super.close();
+		} finally {
+			try {
+				if (deleteExecutor != null) {
+					deleteExecutor.close();
+				}
+			} catch (SQLException e) {
+				LOG.warn("unable to close delete statement runner", e);
+			}
+		}
+	}
+
+	@Override
+	protected void attemptFlush() throws SQLException {
+		super.attemptFlush();
+		deleteExecutor.executeBatch();
+	}
+
+	private static JdbcBatchStatementExecutor<RowData> createKeyedRowExecutor(JdbcDialect dialect, int[] pkFields, LogicalType[] pkTypes, String sql, LogicalType[] logicalTypes) {
+		return JdbcBatchStatementExecutor.keyed(
+			sql,
+			createRowKeyExtractor(pkFields, logicalTypes),
+			(st, record) -> dialect
+				.getOutputConverter(logicalTypes)
+				.toExternal(createRowKeyExtractor(pkFields, logicalTypes).apply(record), st));
+	}
+
+	private static JdbcBatchStatementExecutor<RowData> createUpsertRowExecutor(JdbcDmlOptions opt, RuntimeContext ctx, TypeInformation<RowData> rowDataTypeInfo, LogicalType[] logicalTypes) {
+		checkArgument(opt.getKeyFields().isPresent());
+
+		int[] pkFields = Arrays.stream(opt.getKeyFields().get()).mapToInt(Arrays.asList(opt.getFieldNames())::indexOf).toArray();
+		LogicalType[] pkTypes = Arrays.stream(pkFields).mapToObj(f -> logicalTypes[f]).toArray(LogicalType[]::new);
+		JdbcDialect dialect = opt.getDialect();
+		final TypeSerializer<RowData> typeSerializer = rowDataTypeInfo.createSerializer(ctx.getExecutionConfig());
+		return opt.getDialect()
+			.getUpsertStatement(opt.getTableName(), opt.getFieldNames(), opt.getKeyFields().get())
+			.map(sql -> createSimpleRowDataExecutor(dialect, sql, logicalTypes, ctx, rowDataTypeInfo))
+			.orElseGet(() ->
+				new InsertOrUpdateJdbcExecutor<>(
+					opt.getDialect().getRowExistsStatement(opt.getTableName(), opt.getKeyFields().get()),
+					opt.getDialect().getInsertIntoStatement(opt.getTableName(), opt.getFieldNames()),
+					opt.getDialect().getUpdateStatement(opt.getTableName(), opt.getFieldNames(), opt.getKeyFields().get()),
+					createRowDataJdbcStatementBuilder(dialect, pkTypes),
+					createRowDataJdbcStatementBuilder(dialect, logicalTypes),
+					createRowDataJdbcStatementBuilder(dialect, logicalTypes),
+					createRowKeyExtractor(pkFields, logicalTypes),
+					ctx.getExecutionConfig().isObjectReuseEnabled() ? typeSerializer::copy : r -> r));
+	}
+
+	private static Function<RowData, RowData> createRowKeyExtractor(int[] pkFields, LogicalType[] logicalTypes) {
+		return row -> getPrimaryKey(row, pkFields, logicalTypes);
+	}
+
+	private static JdbcBatchStatementExecutor<RowData> createSimpleRowDataExecutor(JdbcDialect dialect, String sql, LogicalType[] fieldTypes, RuntimeContext ctx, TypeInformation<RowData> rowDataTypeInfo) {
+		final TypeSerializer<RowData> typeSerializer = rowDataTypeInfo.createSerializer(ctx.getExecutionConfig());
+		return JdbcBatchStatementExecutor.simple(
+			sql,
+			createRowDataJdbcStatementBuilder(dialect, fieldTypes),
+			ctx.getExecutionConfig().isObjectReuseEnabled() ? typeSerializer::copy : Function.identity());
+	}
+
+	/**
+	 * Creates a {@link JdbcStatementBuilder} for {@link Row} using the provided SQL types array.
+	 * Uses {@link JdbcUtils#setRecordToStatement}
+	 */
+	private static JdbcStatementBuilder<RowData> createRowDataJdbcStatementBuilder(JdbcDialect dialect, LogicalType[] types) {
+		return (st, record) -> dialect.getOutputConverter(types).toExternal(record, st);
+	}
+
+	private static RowData getPrimaryKey(RowData row, int[] pkFields, LogicalType[] logicalTypes) {
+		GenericRowData pkRow = new GenericRowData(pkFields.length);
+		for (int i = 0; i < pkFields.length; i++) {
+			pkRow.setField(i, RowData.get(row, pkFields[i], logicalTypes[pkFields[i]]));
+		}
+		return pkRow;
+	}
+
+	public static DynamicOutputFormatBuilder dynamicOutputFormatBuilder() {
+		return new DynamicOutputFormatBuilder();
+	}
+
+	/**
+	 * Builder for {@link JdbcDynamicOutputFormat}.
+	 */
+	public static class DynamicOutputFormatBuilder {

Review comment:
       We don't need a Builder for this class.

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicOutputFormat.java
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.executor.InsertOrUpdateJdbcExecutor;
+import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.connector.jdbc.utils.JdbcUtils;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * OutputFormat for {@link JdbcDynamicTableSink}.
+ */
+@Internal
+public class JdbcDynamicOutputFormat extends JdbcBatchingOutputFormat<RowData, RowData, JdbcBatchStatementExecutor<RowData>> {
+	private static final Logger LOG = LoggerFactory.getLogger(JdbcDynamicOutputFormat.class);
+
+	private JdbcBatchStatementExecutor<RowData> deleteExecutor;
+	private final JdbcDmlOptions dmlOptions;
+	private final LogicalType[] logicalTypes;
+
+	private JdbcDynamicOutputFormat(JdbcConnectionProvider connectionProvider, JdbcDmlOptions dmlOptions, JdbcExecutionOptions batchOptions, TypeInformation<RowData> rowDataTypeInfo, LogicalType[] logicalTypes) {
+		super(connectionProvider, batchOptions, ctx -> createUpsertRowExecutor(dmlOptions, ctx, rowDataTypeInfo, logicalTypes), RecordExtractor.identity());
+		this.dmlOptions = dmlOptions;
+		this.logicalTypes = logicalTypes;
+	}
+
+	private JdbcDynamicOutputFormat(JdbcConnectionProvider connectionProvider, JdbcDmlOptions dmlOptions, JdbcExecutionOptions batchOptions, TypeInformation<RowData> rowDataTypeInfo, LogicalType[] logicalTypes, String sql) {

Review comment:
       Break the parameters into separate lines if long. 
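   For example:

```java
private JdbcDynamicOutputFormat(
		JdbcConnectionProvider connectionProvider,
		JdbcDmlOptions dmlOptions,
		JdbcExecutionOptions batchOptions,
		TypeInformation<RowData> rowDataTypeInfo,
		LogicalType[] logicalTypes,
		String sql) {
```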

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicOutputFormat.java
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.executor.InsertOrUpdateJdbcExecutor;
+import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.connector.jdbc.utils.JdbcUtils;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * OutputFormat for {@link JdbcDynamicTableSink}.
+ */
+@Internal
+public class JdbcDynamicOutputFormat extends JdbcBatchingOutputFormat<RowData, RowData, JdbcBatchStatementExecutor<RowData>> {
+	private static final Logger LOG = LoggerFactory.getLogger(JdbcDynamicOutputFormat.class);
+
+	private JdbcBatchStatementExecutor<RowData> deleteExecutor;
+	private final JdbcDmlOptions dmlOptions;
+	private final LogicalType[] logicalTypes;
+
+	private JdbcDynamicOutputFormat(JdbcConnectionProvider connectionProvider, JdbcDmlOptions dmlOptions, JdbcExecutionOptions batchOptions, TypeInformation<RowData> rowDataTypeInfo, LogicalType[] logicalTypes) {
+		super(connectionProvider, batchOptions, ctx -> createUpsertRowExecutor(dmlOptions, ctx, rowDataTypeInfo, logicalTypes), RecordExtractor.identity());
+		this.dmlOptions = dmlOptions;
+		this.logicalTypes = logicalTypes;
+	}
+
+	private JdbcDynamicOutputFormat(JdbcConnectionProvider connectionProvider, JdbcDmlOptions dmlOptions, JdbcExecutionOptions batchOptions, TypeInformation<RowData> rowDataTypeInfo, LogicalType[] logicalTypes, String sql) {
+		super(connectionProvider, batchOptions,
+			ctx -> createSimpleRowDataExecutor(dmlOptions.getDialect(), sql, logicalTypes, ctx, rowDataTypeInfo),
+			RecordExtractor.identity());
+		this.dmlOptions = dmlOptions;
+		this.logicalTypes = logicalTypes;
+	}
+
+	@Override
+	public void open(int taskNumber, int numTasks) throws IOException {
+		super.open(taskNumber, numTasks);
+		deleteExecutor = createDeleteExecutor();
+		try {
+			deleteExecutor.open(connection);
+		} catch (SQLException e) {
+			throw new IOException(e);
+		}
+	}
+
+	private JdbcBatchStatementExecutor<RowData> createDeleteExecutor() {
+		int[] pkFields = Arrays.stream(dmlOptions.getFieldNames()).mapToInt(Arrays.asList(dmlOptions.getFieldNames())::indexOf).toArray();
+		LogicalType[] pkTypes = Arrays.stream(pkFields).mapToObj(f -> logicalTypes[f]).toArray(LogicalType[]::new);
+		String deleteSql = dmlOptions.getDialect().getDeleteStatement(dmlOptions.getTableName(), dmlOptions.getFieldNames());
+		return createKeyedRowExecutor(dmlOptions.getDialect(), pkFields, pkTypes, deleteSql, logicalTypes);
+	}
+
+	@Override
+	protected void addToBatch(RowData original, RowData extracted) throws SQLException {
+		switch (original.getRowKind()) {
+			case DELETE:

Review comment:
       Add `case UPDATE_BEFORE:`; it is still possible to receive an `UPDATE_BEFORE` because ignoring the before record is just an optimization.
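   For example (a sketch; combined with the ordering suggestion above, `INSERT`/`UPDATE_AFTER` could still be placed first):

```java
switch (original.getRowKind()) {
	case DELETE:
	case UPDATE_BEFORE:
		// an UPDATE_BEFORE may still arrive, so treat it like a delete of the old row
		deleteExecutor.addToBatch(extracted);
		break;
	case INSERT:
	case UPDATE_AFTER:
		super.addToBatch(original, extracted);
		break;
	default:
		return;
}
```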

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicOutputFormat.java
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.executor.InsertOrUpdateJdbcExecutor;
+import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.connector.jdbc.utils.JdbcUtils;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * OutputFormat for {@link JdbcDynamicTableSource}.
+ */
+@Internal
+public class JdbcDynamicOutputFormat extends JdbcBatchingOutputFormat<RowData, RowData, JdbcBatchStatementExecutor<RowData>> {
+	private static final Logger LOG = LoggerFactory.getLogger(JdbcDynamicOutputFormat.class);
+
+	private JdbcBatchStatementExecutor<RowData> deleteExecutor;
+	private final JdbcDmlOptions dmlOptions;
+	private final LogicalType[] logicalTypes;
+
+	private JdbcDynamicOutputFormat(JdbcConnectionProvider connectionProvider, JdbcDmlOptions dmlOptions, JdbcExecutionOptions batchOptions, TypeInformation<RowData> rowDataTypeInfo, LogicalType[] logicalTypes) {

Review comment:
       Break the parameters into separate lines if long. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org