You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/05 01:59:09 UTC

[flink] branch master updated: [FLINK-12956][jdbc] Introduce upsert table sink for JDBC

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a64a44e  [FLINK-12956][jdbc] Introduce upsert table sink for JDBC
a64a44e is described below

commit a64a44ee1a5b63731fc82ebbb9443e276e359821
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Tue Jun 25 15:07:26 2019 +0800

    [FLINK-12956][jdbc] Introduce upsert table sink for JDBC
    
    This closes #8867
---
 flink-connectors/flink-jdbc/pom.xml                |   7 +
 .../api/java/io/jdbc/AbstractJDBCOutputFormat.java |  85 +++++++
 .../java/io/jdbc/JDBCAppendTableSinkBuilder.java   |   4 +-
 .../apache/flink/api/java/io/jdbc/JDBCOptions.java | 140 ++++++++++
 .../flink/api/java/io/jdbc/JDBCOutputFormat.java   | 211 +++-------------
 .../api/java/io/jdbc/JDBCUpsertOutputFormat.java   | 281 +++++++++++++++++++++
 .../api/java/io/jdbc/JDBCUpsertSinkFunction.java   |  63 +++++
 .../api/java/io/jdbc/JDBCUpsertTableSink.java      | 193 ++++++++++++++
 .../apache/flink/api/java/io/jdbc/JDBCUtils.java   | 153 +++++++++++
 .../api/java/io/jdbc/dialect/JDBCDialect.java      | 118 +++++++++
 .../api/java/io/jdbc/dialect/JDBCDialects.java     | 137 ++++++++++
 .../api/java/io/jdbc/writer/AppendOnlyWriter.java  |  72 ++++++
 .../flink/api/java/io/jdbc/writer/JDBCWriter.java  |  52 ++++
 .../api/java/io/jdbc/writer/UpsertWriter.java      | 264 +++++++++++++++++++
 .../api/java/io/jdbc/JDBCOutputFormatTest.java     |   2 +-
 .../java/io/jdbc/JDBCUpsertOutputFormatTest.java   | 144 +++++++++++
 .../java/io/jdbc/JDBCUpsertTableSinkITCase.java    | 186 ++++++++++++++
 17 files changed, 1939 insertions(+), 173 deletions(-)

diff --git a/flink-connectors/flink-jdbc/pom.xml b/flink-connectors/flink-jdbc/pom.xml
index 6642915..acb3189 100644
--- a/flink-connectors/flink-jdbc/pom.xml
+++ b/flink-connectors/flink-jdbc/pom.xml
@@ -67,5 +67,12 @@ under the License.
 			<version>10.14.2.0</version>
 			<scope>test</scope>
 		</dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+        </dependency>
 	</dependencies>
 </project>
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/AbstractJDBCOutputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/AbstractJDBCOutputFormat.java
new file mode 100644
index 0000000..451baff
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/AbstractJDBCOutputFormat.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+/**
+ * OutputFormat to write Rows into a JDBC database.
+ *
+ * @see Row
+ * @see DriverManager
+ */
+public abstract class AbstractJDBCOutputFormat<T> extends RichOutputFormat<T> {
+
+	private static final long serialVersionUID = 1L;
+	static final int DEFAULT_FLUSH_MAX_SIZE = 5000;
+	static final long DEFAULT_FLUSH_INTERVAL_MILLS = 0;
+
+	private static final Logger LOG = LoggerFactory.getLogger(AbstractJDBCOutputFormat.class);
+
+	private final String username;
+	private final String password;
+	private final String drivername;
+	protected final String dbURL;
+
+	protected transient Connection connection;
+
+	public AbstractJDBCOutputFormat(String username, String password, String drivername, String dbURL) {
+		this.username = username;
+		this.password = password;
+		this.drivername = drivername;
+		this.dbURL = dbURL;
+	}
+
+	@Override
+	public void configure(Configuration parameters) {
+	}
+
+	protected void establishConnection() throws SQLException, ClassNotFoundException {
+		Class.forName(drivername);
+		if (username == null) {
+			connection = DriverManager.getConnection(dbURL);
+		} else {
+			connection = DriverManager.getConnection(dbURL, username, password);
+		}
+	}
+
+	protected void closeDbConnection() throws IOException {
+		if (connection != null) {
+			try {
+				connection.close();
+			} catch (SQLException se) {
+				LOG.warn("JDBC connection could not be closed: " + se.getMessage());
+			} finally {
+				connection = null;
+			}
+		}
+	}
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java
index da00d74..023c624 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java
@@ -21,7 +21,7 @@ package org.apache.flink.api.java.io.jdbc;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.util.Preconditions;
 
-import static org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.DEFAULT_BATCH_INTERVAL;
+import static org.apache.flink.api.java.io.jdbc.AbstractJDBCOutputFormat.DEFAULT_FLUSH_MAX_SIZE;
 
 /**
  * A builder to configure and build the JDBCAppendTableSink.
@@ -32,7 +32,7 @@ public class JDBCAppendTableSinkBuilder {
 	private String driverName;
 	private String dbURL;
 	private String query;
-	private int batchSize = DEFAULT_BATCH_INTERVAL;
+	private int batchSize = DEFAULT_FLUSH_MAX_SIZE;
 	private int[] parameterTypes;
 
 	/**
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOptions.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOptions.java
new file mode 100644
index 0000000..45e070d
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOptions.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects;
+
+import java.util.Optional;
+
+/**
+ * Options for the JDBC connector.
+ */
+public class JDBCOptions {
+
+	private String dbURL;
+	private String tableName;
+	private String driverName;
+	private String username;
+	private String password;
+	private JDBCDialect dialect;
+
+	public JDBCOptions(String dbURL, String tableName, String driverName, String username,
+			String password, JDBCDialect dialect) {
+		this.dbURL = dbURL;
+		this.tableName = tableName;
+		this.driverName = driverName;
+		this.username = username;
+		this.password = password;
+		this.dialect = dialect;
+	}
+
+	public String getDbURL() {
+		return dbURL;
+	}
+
+	public String getTableName() {
+		return tableName;
+	}
+
+	public String getDriverName() {
+		return driverName;
+	}
+
+	public String getUsername() {
+		return username;
+	}
+
+	public String getPassword() {
+		return password;
+	}
+
+	public JDBCDialect getDialect() {
+		return dialect;
+	}
+
+	public static Builder builder() {
+		return new Builder();
+	}
+
+	/**
+	 * Builder of {@link JDBCOptions}.
+	 */
+	public static class Builder {
+		private String dbURL;
+		private String tableName;
+		private String driverName;
+		private String username;
+		private String password;
+		private JDBCDialect dialect;
+
+		public Builder setTableName(String tableName) {
+			this.tableName = tableName;
+			return this;
+		}
+
+		public Builder setUsername(String username) {
+			this.username = username;
+			return this;
+		}
+
+		public Builder setPassword(String password) {
+			this.password = password;
+			return this;
+		}
+
+		public Builder setDriverName(String driverName) {
+			this.driverName = driverName;
+			return this;
+		}
+
+		public Builder setDBUrl(String dbURL) {
+			this.dbURL = dbURL;
+			return this;
+		}
+
+		public Builder setDialect(JDBCDialect dialect) {
+			this.dialect = dialect;
+			return this;
+		}
+
+		public JDBCOptions build() {
+			if (this.dbURL == null) {
+				throw new IllegalArgumentException("No database URL supplied.");
+			}
+			if (this.tableName == null) {
+				throw new IllegalArgumentException("No tableName supplied.");
+			}
+			if (this.dialect == null) {
+				Optional<JDBCDialect> optional = JDBCDialects.get(dbURL);
+				this.dialect = optional.orElseGet(() -> {
+					throw new IllegalArgumentException("No dialect supplied.");
+				});
+			}
+			if (this.driverName == null) {
+				Optional<String> optional = dialect.defaultDriverName();
+				this.driverName = optional.orElseGet(() -> {
+					throw new IllegalArgumentException("No driverName supplied.");
+				});
+			}
+
+			return new JDBCOptions(dbURL, tableName, driverName, username, password, dialect);
+		}
+	}
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
index f773635..0893d7f 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
@@ -18,19 +18,18 @@
 
 package org.apache.flink.api.java.io.jdbc;
 
-import org.apache.flink.api.common.io.RichOutputFormat;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.types.Row;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 
+import static org.apache.flink.api.java.io.jdbc.JDBCUtils.setRecordToStatement;
+
 /**
  * OutputFormat to write Rows into a JDBC database.
  * The OutputFormat has to be configured using the supplied OutputFormatBuilder.
@@ -38,31 +37,25 @@ import java.sql.SQLException;
  * @see Row
  * @see DriverManager
  */
-public class JDBCOutputFormat extends RichOutputFormat<Row> {
+public class JDBCOutputFormat extends AbstractJDBCOutputFormat<Row> {
+
 	private static final long serialVersionUID = 1L;
-	static final int DEFAULT_BATCH_INTERVAL = 5000;
 
 	private static final Logger LOG = LoggerFactory.getLogger(JDBCOutputFormat.class);
 
-	private String username;
-	private String password;
-	private String drivername;
-	private String dbURL;
-	private String query;
-	private int batchInterval = DEFAULT_BATCH_INTERVAL;
+	private final String query;
+	private final int batchInterval;
+	private final int[] typesArray;
 
-	private Connection dbConn;
 	private PreparedStatement upload;
-
 	private int batchCount = 0;
 
-	private int[] typesArray;
-
-	public JDBCOutputFormat() {
-	}
-
-	@Override
-	public void configure(Configuration parameters) {
+	public JDBCOutputFormat(String username, String password, String drivername,
+			String dbURL, String query, int batchInterval, int[] typesArray) {
+		super(username, password, drivername, dbURL);
+		this.query = query;
+		this.batchInterval = batchInterval;
+		this.typesArray = typesArray;
 	}
 
 	/**
@@ -76,7 +69,7 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
 	public void open(int taskNumber, int numTasks) throws IOException {
 		try {
 			establishConnection();
-			upload = dbConn.prepareStatement(query);
+			upload = connection.prepareStatement(query);
 		} catch (SQLException sqe) {
 			throw new IllegalArgumentException("open() failed.", sqe);
 		} catch (ClassNotFoundException cnfe) {
@@ -84,136 +77,17 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
 		}
 	}
 
-	private void establishConnection() throws SQLException, ClassNotFoundException {
-		Class.forName(drivername);
-		if (username == null) {
-			dbConn = DriverManager.getConnection(dbURL);
-		} else {
-			dbConn = DriverManager.getConnection(dbURL, username, password);
-		}
-	}
-
-	/**
-	 * Adds a record to the prepared statement.
-	 *
-	 * <p>When this method is called, the output format is guaranteed to be opened.
-	 *
-	 * <p>WARNING: this may fail when no column types specified (because a best effort approach is attempted in order to
-	 * insert a null value but it's not guaranteed that the JDBC driver handles PreparedStatement.setObject(pos, null))
-	 *
-	 * @param row The records to add to the output.
-	 * @see PreparedStatement
-	 * @throws IOException Thrown, if the records could not be added due to an I/O problem.
-	 */
 	@Override
 	public void writeRecord(Row row) throws IOException {
-
-		if (typesArray != null && typesArray.length > 0 && typesArray.length != row.getArity()) {
-			LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
-		}
 		try {
-
-			if (typesArray == null) {
-				// no types provided
-				for (int index = 0; index < row.getArity(); index++) {
-					LOG.warn("Unknown column type for column {}. Best effort approach to set its value: {}.", index + 1, row.getField(index));
-					upload.setObject(index + 1, row.getField(index));
-				}
-			} else {
-				// types provided
-				for (int index = 0; index < row.getArity(); index++) {
-
-					if (row.getField(index) == null) {
-						upload.setNull(index + 1, typesArray[index]);
-					} else {
-						try {
-							// casting values as suggested by http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html
-							switch (typesArray[index]) {
-								case java.sql.Types.NULL:
-									upload.setNull(index + 1, typesArray[index]);
-									break;
-								case java.sql.Types.BOOLEAN:
-								case java.sql.Types.BIT:
-									upload.setBoolean(index + 1, (boolean) row.getField(index));
-									break;
-								case java.sql.Types.CHAR:
-								case java.sql.Types.NCHAR:
-								case java.sql.Types.VARCHAR:
-								case java.sql.Types.LONGVARCHAR:
-								case java.sql.Types.LONGNVARCHAR:
-									upload.setString(index + 1, (String) row.getField(index));
-									break;
-								case java.sql.Types.TINYINT:
-									upload.setByte(index + 1, (byte) row.getField(index));
-									break;
-								case java.sql.Types.SMALLINT:
-									upload.setShort(index + 1, (short) row.getField(index));
-									break;
-								case java.sql.Types.INTEGER:
-									upload.setInt(index + 1, (int) row.getField(index));
-									break;
-								case java.sql.Types.BIGINT:
-									upload.setLong(index + 1, (long) row.getField(index));
-									break;
-								case java.sql.Types.REAL:
-									upload.setFloat(index + 1, (float) row.getField(index));
-									break;
-								case java.sql.Types.FLOAT:
-								case java.sql.Types.DOUBLE:
-									upload.setDouble(index + 1, (double) row.getField(index));
-									break;
-								case java.sql.Types.DECIMAL:
-								case java.sql.Types.NUMERIC:
-									upload.setBigDecimal(index + 1, (java.math.BigDecimal) row.getField(index));
-									break;
-								case java.sql.Types.DATE:
-									upload.setDate(index + 1, (java.sql.Date) row.getField(index));
-									break;
-								case java.sql.Types.TIME:
-									upload.setTime(index + 1, (java.sql.Time) row.getField(index));
-									break;
-								case java.sql.Types.TIMESTAMP:
-									upload.setTimestamp(index + 1, (java.sql.Timestamp) row.getField(index));
-									break;
-								case java.sql.Types.BINARY:
-								case java.sql.Types.VARBINARY:
-								case java.sql.Types.LONGVARBINARY:
-									upload.setBytes(index + 1, (byte[]) row.getField(index));
-									break;
-								default:
-									upload.setObject(index + 1, row.getField(index));
-									LOG.warn("Unmanaged sql type ({}) for column {}. Best effort approach to set its value: {}.",
-										typesArray[index], index + 1, row.getField(index));
-									// case java.sql.Types.SQLXML
-									// case java.sql.Types.ARRAY:
-									// case java.sql.Types.JAVA_OBJECT:
-									// case java.sql.Types.BLOB:
-									// case java.sql.Types.CLOB:
-									// case java.sql.Types.NCLOB:
-									// case java.sql.Types.DATALINK:
-									// case java.sql.Types.DISTINCT:
-									// case java.sql.Types.OTHER:
-									// case java.sql.Types.REF:
-									// case java.sql.Types.ROWID:
-									// case java.sql.Types.STRUC
-							}
-						} catch (ClassCastException e) {
-							// enrich the exception with detailed information.
-							String errorMessage = String.format(
-								"%s, field index: %s, field value: %s.", e.getMessage(), index, row.getField(index));
-							ClassCastException enrichedException = new ClassCastException(errorMessage);
-							enrichedException.setStackTrace(e.getStackTrace());
-							throw enrichedException;
-						}
-					}
-				}
-			}
+			setRecordToStatement(upload, typesArray, row);
 			upload.addBatch();
-			batchCount++;
 		} catch (SQLException e) {
 			throw new RuntimeException("Preparation of JDBC statement failed.", e);
 		}
 
+		batchCount++;
+
 		if (batchCount >= batchInterval) {
 			// execute batch
 			flush();
@@ -242,7 +116,6 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
 	public void close() throws IOException {
 		if (upload != null) {
 			flush();
-			// close the connection
 			try {
 				upload.close();
 			} catch (SQLException e) {
@@ -252,15 +125,7 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
 			}
 		}
 
-		if (dbConn != null) {
-			try {
-				dbConn.close();
-			} catch (SQLException se) {
-				LOG.info("JDBC connection could not be closed: " + se.getMessage());
-			} finally {
-				dbConn = null;
-			}
-		}
+		closeDbConnection();
 	}
 
 	public static JDBCOutputFormatBuilder buildJDBCOutputFormat() {
@@ -271,44 +136,48 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
 	 * Builder for a {@link JDBCOutputFormat}.
 	 */
 	public static class JDBCOutputFormatBuilder {
-		private final JDBCOutputFormat format;
+		private String username;
+		private String password;
+		private String drivername;
+		private String dbURL;
+		private String query;
+		private int batchInterval = DEFAULT_FLUSH_MAX_SIZE;
+		private int[] typesArray;
 
-		protected JDBCOutputFormatBuilder() {
-			this.format = new JDBCOutputFormat();
-		}
+		protected JDBCOutputFormatBuilder() {}
 
 		public JDBCOutputFormatBuilder setUsername(String username) {
-			format.username = username;
+			this.username = username;
 			return this;
 		}
 
 		public JDBCOutputFormatBuilder setPassword(String password) {
-			format.password = password;
+			this.password = password;
 			return this;
 		}
 
 		public JDBCOutputFormatBuilder setDrivername(String drivername) {
-			format.drivername = drivername;
+			this.drivername = drivername;
 			return this;
 		}
 
 		public JDBCOutputFormatBuilder setDBUrl(String dbURL) {
-			format.dbURL = dbURL;
+			this.dbURL = dbURL;
 			return this;
 		}
 
 		public JDBCOutputFormatBuilder setQuery(String query) {
-			format.query = query;
+			this.query = query;
 			return this;
 		}
 
 		public JDBCOutputFormatBuilder setBatchInterval(int batchInterval) {
-			format.batchInterval = batchInterval;
+			this.batchInterval = batchInterval;
 			return this;
 		}
 
 		public JDBCOutputFormatBuilder setSqlTypes(int[] typesArray) {
-			format.typesArray = typesArray;
+			this.typesArray = typesArray;
 			return this;
 		}
 
@@ -318,23 +187,25 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
 		 * @return Configured JDBCOutputFormat
 		 */
 		public JDBCOutputFormat finish() {
-			if (format.username == null) {
+			if (this.username == null) {
 				LOG.info("Username was not supplied.");
 			}
-			if (format.password == null) {
+			if (this.password == null) {
 				LOG.info("Password was not supplied.");
 			}
-			if (format.dbURL == null) {
+			if (this.dbURL == null) {
 				throw new IllegalArgumentException("No database URL supplied.");
 			}
-			if (format.query == null) {
+			if (this.query == null) {
 				throw new IllegalArgumentException("No query supplied.");
 			}
-			if (format.drivername == null) {
+			if (this.drivername == null) {
 				throw new IllegalArgumentException("No driver supplied.");
 			}
 
-			return format;
+			return new JDBCOutputFormat(
+					username, password, drivername, dbURL,
+					query, batchInterval, typesArray);
 		}
 	}
 
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertOutputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertOutputFormat.java
new file mode 100644
index 0000000..97989ff
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertOutputFormat.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
+import org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter;
+import org.apache.flink.api.java.io.jdbc.writer.JDBCWriter;
+import org.apache.flink.api.java.io.jdbc.writer.UpsertWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An upsert OutputFormat for JDBC.
+ */
+public class JDBCUpsertOutputFormat extends AbstractJDBCOutputFormat<Tuple2<Boolean, Row>> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(JDBCUpsertOutputFormat.class);
+
+	static final int DEFAULT_MAX_RETRY_TIMES = 3;
+
+	private final String tableName;
+	private final JDBCDialect dialect;
+	private final String[] fieldNames;
+	private final String[] keyFields;
+	private final int[] fieldTypes;
+
+	private final int flushMaxSize;
+	private final long flushIntervalMills;
+	private final int maxRetryTimes;
+
+	private transient JDBCWriter jdbcWriter;
+	private transient int batchCount = 0;
+	private transient volatile boolean closed = false;
+
+	private transient ScheduledExecutorService scheduler;
+	private transient ScheduledFuture scheduledFuture;
+	private transient volatile Exception flushException;
+
+	public JDBCUpsertOutputFormat(
+			JDBCOptions options,
+			String[] fieldNames,
+			String[] keyFields,
+			int[] fieldTypes,
+			int flushMaxSize,
+			long flushIntervalMills,
+			int maxRetryTimes) {
+		super(options.getUsername(), options.getPassword(), options.getDriverName(), options.getDbURL());
+		this.tableName = options.getTableName();
+		this.dialect = options.getDialect();
+		this.fieldNames = fieldNames;
+		this.keyFields = keyFields;
+		this.fieldTypes = fieldTypes;
+		this.flushMaxSize = flushMaxSize;
+		this.flushIntervalMills = flushIntervalMills;
+		this.maxRetryTimes = maxRetryTimes;
+	}
+
+	/**
+	 * Connects to the target database and initializes the prepared statement.
+	 *
+	 * @param taskNumber The number of the parallel instance.
+	 * @throws IOException Thrown, if the output could not be opened due to an
+	 * I/O problem.
+	 */
+	@Override
+	public void open(int taskNumber, int numTasks) throws IOException {
+		try {
+			establishConnection();
+			if (keyFields == null || keyFields.length == 0) {
+				String insertSQL = dialect.getInsertIntoStatement(tableName, fieldNames);
+				jdbcWriter = new AppendOnlyWriter(insertSQL, fieldTypes);
+			} else {
+				jdbcWriter = UpsertWriter.create(
+					dialect, tableName, fieldNames, fieldTypes, keyFields,
+					getRuntimeContext().getExecutionConfig().isObjectReuseEnabled());
+			}
+			jdbcWriter.open(connection);
+		} catch (SQLException sqe) {
+			throw new IllegalArgumentException("open() failed.", sqe);
+		} catch (ClassNotFoundException cnfe) {
+			throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
+		}
+
+		if (flushIntervalMills != 0) {
+			this.scheduler = Executors.newScheduledThreadPool(
+					1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
+			this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
+				if (closed) {
+					return;
+				}
+				try {
+					flush();
+				} catch (Exception e) {
+					flushException = e;
+				}
+			}, flushIntervalMills, flushIntervalMills, TimeUnit.MILLISECONDS);
+		}
+	}
+
+	private void checkFlushException() {
+		if (flushException != null) {
+			throw new RuntimeException("Writing records to JDBC failed.", flushException);
+		}
+	}
+
+	@Override
+	public synchronized void writeRecord(Tuple2<Boolean, Row> tuple2) throws IOException {
+		checkFlushException();
+
+		try {
+			jdbcWriter.addRecord(tuple2);
+			batchCount++;
+			if (batchCount >= flushMaxSize) {
+				flush();
+			}
+		} catch (Exception e) {
+			throw new RuntimeException("Writing records to JDBC failed.", e);
+		}
+	}
+
+	public synchronized void flush() throws Exception {
+		checkFlushException();
+
+		for (int i = 1; i <= maxRetryTimes; i++) {
+			try {
+				jdbcWriter.executeBatch();
+				batchCount = 0;
+				break;
+			} catch (SQLException e) {
+				LOG.error("JDBC executeBatch error, retry times = {}", i, e);
+				if (i >= maxRetryTimes) {
+					throw e;
+				}
+				Thread.sleep(1000 * i);
+			}
+		}
+	}
+
+	/**
+	 * Executes prepared statement and closes all resources of this instance.
+	 *
+	 * @throws IOException Thrown, if the input could not be closed properly.
+	 */
+	@Override
+	public synchronized void close() throws IOException {
+		if (closed) {
+			return;
+		}
+		closed = true;
+
+		checkFlushException();
+
+		if (this.scheduledFuture != null) {
+			scheduledFuture.cancel(false);
+			this.scheduler.shutdown();
+
+			try {
+				if (!scheduler.awaitTermination(10, TimeUnit.MINUTES)) {
+					throw new RuntimeException(
+							"The scheduled executor service can not properly terminate.");
+				}
+			} catch (InterruptedException e) {
+				throw new RuntimeException(e);
+			}
+		}
+
+		if (batchCount > 0) {
+			try {
+				flush();
+			} catch (Exception e) {
+				throw new RuntimeException("Writing records to JDBC failed.", e);
+			}
+		}
+
+		try {
+			jdbcWriter.close();
+		} catch (SQLException e) {
+			LOG.warn("Close JDBC writer failed.", e);
+		}
+
+		closeDbConnection();
+	}
+
+	public static Builder builder() {
+		return new Builder();
+	}
+
+	/**
+	 * Builder for a {@link JDBCUpsertOutputFormat}.
+	 */
+	public static class Builder {
+		private JDBCOptions options;
+		private String[] fieldNames;
+		private String[] keyFields;
+		private int[] fieldTypes;
+		private int flushMaxSize = DEFAULT_FLUSH_MAX_SIZE;
+		private long flushIntervalMills = DEFAULT_FLUSH_INTERVAL_MILLS;
+		private int maxRetryTimes = DEFAULT_MAX_RETRY_TIMES;
+
+		public Builder setOptions(JDBCOptions options) {
+			this.options = options;
+			return this;
+		}
+
+		public Builder setFieldNames(String[] fieldNames) {
+			this.fieldNames = fieldNames;
+			return this;
+		}
+
+		public Builder setKeyFields(String[] keyFields) {
+			this.keyFields = keyFields;
+			return this;
+		}
+
+		public Builder setFieldTypes(int[] fieldTypes) {
+			this.fieldTypes = fieldTypes;
+			return this;
+		}
+
+		public Builder setFlushMaxSize(int flushMaxSize) {
+			this.flushMaxSize = flushMaxSize;
+			return this;
+		}
+
+		public Builder setFlushIntervalMills(long flushIntervalMills) {
+			this.flushIntervalMills = flushIntervalMills;
+			return this;
+		}
+
+		public Builder setMaxRetryTimes(int maxRetryTimes) {
+			this.maxRetryTimes = maxRetryTimes;
+			return this;
+		}
+
+		/**
+		 * Finalizes the configuration and checks validity.
+		 *
+		 * @return Configured JDBCUpsertOutputFormat
+		 */
+		public JDBCUpsertOutputFormat build() {
+			if (options == null) {
+				throw new IllegalArgumentException("No options supplied.");
+			}
+			if (fieldNames == null) {
+				throw new IllegalArgumentException("No fieldNames supplied.");
+			}
+
+			return new JDBCUpsertOutputFormat(
+				options, fieldNames, keyFields, fieldTypes, flushMaxSize, flushIntervalMills, maxRetryTimes);
+		}
+	}
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertSinkFunction.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertSinkFunction.java
new file mode 100644
index 0000000..3586008
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertSinkFunction.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.types.Row;
+
+class JDBCUpsertSinkFunction extends RichSinkFunction<Tuple2<Boolean, Row>> implements CheckpointedFunction {
+	private final JDBCUpsertOutputFormat outputFormat;
+
+	JDBCUpsertSinkFunction(JDBCUpsertOutputFormat outputFormat) {
+		this.outputFormat = outputFormat;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		RuntimeContext ctx = getRuntimeContext();
+		outputFormat.setRuntimeContext(ctx);
+		outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
+	}
+
+	@Override
+	public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
+		outputFormat.writeRecord(value);
+	}
+
+	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+	}
+
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		outputFormat.flush();
+	}
+
+	@Override
+	public void close() throws Exception {
+		outputFormat.close();
+	}
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java
new file mode 100644
index 0000000..58ce1ec
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sinks.UpsertStreamTableSink;
+import org.apache.flink.table.utils.TableConnectorUtils;
+import org.apache.flink.types.Row;
+
+import java.util.Arrays;
+
+import static org.apache.flink.api.java.io.jdbc.AbstractJDBCOutputFormat.DEFAULT_FLUSH_INTERVAL_MILLS;
+import static org.apache.flink.api.java.io.jdbc.AbstractJDBCOutputFormat.DEFAULT_FLUSH_MAX_SIZE;
+import static org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.DEFAULT_MAX_RETRY_TIMES;
+
+/**
+ * An upsert {@link UpsertStreamTableSink} for JDBC.
+ */
+public class JDBCUpsertTableSink implements UpsertStreamTableSink<Row> {
+
+	private final TableSchema schema;
+	private final JDBCOptions options;
+	private final int flushMaxSize;
+	private final long flushIntervalMills;
+	private final int maxRetryTime;
+
+	private String[] keyFields;
+	private boolean isAppendOnly;
+
+	private JDBCUpsertTableSink(
+		TableSchema schema,
+		JDBCOptions options,
+		int flushMaxSize,
+		long flushIntervalMills,
+		int maxRetryTime) {
+		this.schema = schema;
+		this.options = options;
+		this.flushMaxSize = flushMaxSize;
+		this.flushIntervalMills = flushIntervalMills;
+		this.maxRetryTime = maxRetryTime;
+	}
+
+	private JDBCUpsertOutputFormat newFormat() {
+		if (!isAppendOnly && (keyFields == null || keyFields.length == 0)) {
+			throw new UnsupportedOperationException("JDBCUpsertTableSink can not support ");
+		}
+
+		// sql types
+		int[] jdbcSqlTypes = Arrays.stream(schema.getFieldTypes())
+				.mapToInt(JDBCTypeUtil::typeInformationToSqlType).toArray();
+
+		return JDBCUpsertOutputFormat.builder()
+			.setOptions(options)
+			.setFieldNames(schema.getFieldNames())
+			.setFlushMaxSize(flushMaxSize)
+			.setFlushIntervalMills(flushIntervalMills)
+			.setMaxRetryTimes(maxRetryTime)
+			.setFieldTypes(jdbcSqlTypes)
+			.setKeyFields(keyFields)
+			.build();
+	}
+
+	@Override
+	public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
+		return dataStream
+				.addSink(new JDBCUpsertSinkFunction(newFormat()))
+				.name(TableConnectorUtils.generateRuntimeName(this.getClass(), schema.getFieldNames()));
+	}
+
+	@Override
+	public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
+		consumeDataStream(dataStream);
+	}
+
+	@Override
+	public void setKeyFields(String[] keys) {
+		this.keyFields = keys;
+	}
+
+	@Override
+	public void setIsAppendOnly(Boolean isAppendOnly) {
+		this.isAppendOnly = isAppendOnly;
+	}
+
+	@Override
+	public TypeInformation<Tuple2<Boolean, Row>> getOutputType() {
+		return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType());
+	}
+
+	@Override
+	public TypeInformation<Row> getRecordType() {
+		return new RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames());
+	}
+
+	@Override
+	public String[] getFieldNames() {
+		return schema.getFieldNames();
+	}
+
+	@Override
+	public TypeInformation<?>[] getFieldTypes() {
+		return schema.getFieldTypes();
+	}
+
+	@Override
+	public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+		if (!Arrays.equals(getFieldNames(), fieldNames) || !Arrays.equals(getFieldTypes(), fieldTypes)) {
+			throw new ValidationException("Reconfiguration with different fields is not allowed. " +
+					"Expected: " + Arrays.toString(getFieldNames()) + " / " + Arrays.toString(getFieldTypes()) + ". " +
+					"But was: " + Arrays.toString(fieldNames) + " / " + Arrays.toString(fieldTypes));
+		}
+
+		JDBCUpsertTableSink copy = new JDBCUpsertTableSink(schema, options, flushMaxSize, flushIntervalMills, maxRetryTime);
+		copy.keyFields = keyFields;
+		return copy;
+	}
+
+	public static Builder builder() {
+		return new Builder();
+	}
+
+	/**
+	 * Builder for a {@link JDBCUpsertTableSink}.
+	 */
+	public static class Builder {
+		private TableSchema schema;
+		private JDBCOptions options;
+		private int flushMaxSize = DEFAULT_FLUSH_MAX_SIZE;
+		private long flushIntervalMills = DEFAULT_FLUSH_INTERVAL_MILLS;
+		private int maxRetryTimes = DEFAULT_MAX_RETRY_TIMES;
+
+		public Builder setTableSchema(TableSchema schema) {
+			this.schema = schema;
+			return this;
+		}
+
+		public Builder setOptions(JDBCOptions options) {
+			this.options = options;
+			return this;
+		}
+
+		public Builder setFlushMaxSize(int flushMaxSize) {
+			this.flushMaxSize = flushMaxSize;
+			return this;
+		}
+
+		public Builder setFlushIntervalMills(long flushIntervalMills) {
+			this.flushIntervalMills = flushIntervalMills;
+			return this;
+		}
+
+		public Builder setMaxRetryTimes(int maxRetryTimes) {
+			this.maxRetryTimes = maxRetryTimes;
+			return this;
+		}
+
+		public JDBCUpsertTableSink build() {
+			if (schema == null) {
+				throw new IllegalArgumentException("No schema supplied.");
+			}
+			if (options == null) {
+				throw new IllegalArgumentException("No options supplied.");
+			}
+
+			return new JDBCUpsertTableSink(schema, options, flushMaxSize, flushIntervalMills, maxRetryTimes);
+		}
+	}
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUtils.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUtils.java
new file mode 100644
index 0000000..95a0f8d
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUtils.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Utils for jdbc connectors.
+ */
+public class JDBCUtils {
+
+	private static final Logger LOG = LoggerFactory.getLogger(JDBCUtils.class);
+
+	/**
+	 * Adds a record to the prepared statement.
+	 *
+	 * <p>When this method is called, the output format is guaranteed to be opened.
+	 *
+	 * <p>WARNING: this may fail when no column types specified (because a best effort approach is attempted in order to
+	 * insert a null value but it's not guaranteed that the JDBC driver handles PreparedStatement.setObject(pos, null))
+	 *
+	 * @param upload The prepared statement.
+	 * @param typesArray The jdbc types of the row.
+	 * @param row The records to add to the output.
+	 * @see PreparedStatement
+	 */
+	public static void setRecordToStatement(PreparedStatement upload, int[] typesArray, Row row) throws SQLException {
+		if (typesArray != null && typesArray.length > 0 && typesArray.length != row.getArity()) {
+			LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
+		}
+		if (typesArray == null) {
+			// no types provided
+			for (int index = 0; index < row.getArity(); index++) {
+				LOG.warn("Unknown column type for column {}. Best effort approach to set its value: {}.", index + 1, row.getField(index));
+				upload.setObject(index + 1, row.getField(index));
+			}
+		} else {
+			// types provided
+			for (int i = 0; i < row.getArity(); i++) {
+				setField(upload, typesArray[i], row, i);
+			}
+		}
+	}
+
+	public static void setField(PreparedStatement upload, int type, Row row, int index) throws SQLException {
+		Object field = row.getField(index);
+		if (field == null) {
+			upload.setNull(index + 1, type);
+		} else {
+			try {
+				// casting values as suggested by http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html
+				switch (type) {
+					case java.sql.Types.NULL:
+						upload.setNull(index + 1, type);
+						break;
+					case java.sql.Types.BOOLEAN:
+					case java.sql.Types.BIT:
+						upload.setBoolean(index + 1, (boolean) field);
+						break;
+					case java.sql.Types.CHAR:
+					case java.sql.Types.NCHAR:
+					case java.sql.Types.VARCHAR:
+					case java.sql.Types.LONGVARCHAR:
+					case java.sql.Types.LONGNVARCHAR:
+						upload.setString(index + 1, (String) field);
+						break;
+					case java.sql.Types.TINYINT:
+						upload.setByte(index + 1, (byte) field);
+						break;
+					case java.sql.Types.SMALLINT:
+						upload.setShort(index + 1, (short) field);
+						break;
+					case java.sql.Types.INTEGER:
+						upload.setInt(index + 1, (int) field);
+						break;
+					case java.sql.Types.BIGINT:
+						upload.setLong(index + 1, (long) field);
+						break;
+					case java.sql.Types.REAL:
+						upload.setFloat(index + 1, (float) field);
+						break;
+					case java.sql.Types.FLOAT:
+					case java.sql.Types.DOUBLE:
+						upload.setDouble(index + 1, (double) field);
+						break;
+					case java.sql.Types.DECIMAL:
+					case java.sql.Types.NUMERIC:
+						upload.setBigDecimal(index + 1, (java.math.BigDecimal) field);
+						break;
+					case java.sql.Types.DATE:
+						upload.setDate(index + 1, (java.sql.Date) field);
+						break;
+					case java.sql.Types.TIME:
+						upload.setTime(index + 1, (java.sql.Time) field);
+						break;
+					case java.sql.Types.TIMESTAMP:
+						upload.setTimestamp(index + 1, (java.sql.Timestamp) field);
+						break;
+					case java.sql.Types.BINARY:
+					case java.sql.Types.VARBINARY:
+					case java.sql.Types.LONGVARBINARY:
+						upload.setBytes(index + 1, (byte[]) field);
+						break;
+					default:
+						upload.setObject(index + 1, field);
+						LOG.warn("Unmanaged sql type ({}) for column {}. Best effort approach to set its value: {}.",
+							type, index + 1, field);
+						// case java.sql.Types.SQLXML
+						// case java.sql.Types.ARRAY:
+						// case java.sql.Types.JAVA_OBJECT:
+						// case java.sql.Types.BLOB:
+						// case java.sql.Types.CLOB:
+						// case java.sql.Types.NCLOB:
+						// case java.sql.Types.DATALINK:
+						// case java.sql.Types.DISTINCT:
+						// case java.sql.Types.OTHER:
+						// case java.sql.Types.REF:
+						// case java.sql.Types.ROWID:
+						// case java.sql.Types.STRUC
+				}
+			} catch (ClassCastException e) {
+				// enrich the exception with detailed information.
+				String errorMessage = String.format(
+					"%s, field index: %s, field value: %s.", e.getMessage(), index, field);
+				ClassCastException enrichedException = new ClassCastException(errorMessage);
+				enrichedException.setStackTrace(e.getStackTrace());
+				throw enrichedException;
+			}
+		}
+	}
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialect.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialect.java
new file mode 100644
index 0000000..5a55a87
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialect.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.dialect;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Handle the SQL dialect of jdbc driver.
+ */
+public interface JDBCDialect extends Serializable {
+
+	/**
+	 * Check if this dialect instance can handle a certain jdbc url.
+	 * @param url the jdbc url.
+	 * @return True if the dialect can be applied on the given jdbc url.
+	 */
+	boolean canHandle(String url);
+
+	/**
+	 * @return the default driver class name, if user not configure the driver class name,
+	 * then will use this one.
+	 */
+	default Optional<String> defaultDriverName() {
+		return Optional.empty();
+	}
+
+	/**
+	 * Quotes the identifier. This is used to put quotes around the identifier in case the column
+	 * name is a reserved keyword, or in case it contains characters that require quotes (e.g. space).
+	 * Default using double quotes {@code "} to quote.
+	 */
+	default String quoteIdentifier(String identifier) {
+		return "\"" + identifier + "\"";
+	}
+
+	/**
+	 * Get dialect upsert statement, the database has its own upsert syntax, such as Mysql
+	 * using DUPLICATE KEY UPDATE, and PostgresSQL using ON CONFLICT... DO UPDATE SET..
+	 *
+	 * @return None if dialect does not support upsert statement, the writer will degrade to
+	 * the use of select + update/insert, this performance is poor.
+	 */
+	default Optional<String> getUpsertStatement(
+			String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+		return Optional.empty();
+	}
+
+	/**
+	 * Get row exists statement by condition fields. Default use SELECT.
+	 */
+	default String getRowExistsStatement(String tableName, String[] conditionFields) {
+		String fieldExpressions = Arrays.stream(conditionFields)
+			.map(f -> quoteIdentifier(f) + "=?")
+			.collect(Collectors.joining(" AND "));
+		return "SELECT 1 FROM " + quoteIdentifier(tableName) + " WHERE " + fieldExpressions;
+	}
+
+	/**
+	 * Get insert into statement.
+	 */
+	default String getInsertIntoStatement(String tableName, String[] fieldNames) {
+		String columns = Arrays.stream(fieldNames)
+			.map(this::quoteIdentifier)
+			.collect(Collectors.joining(", "));
+		String placeholders = Arrays.stream(fieldNames)
+			.map(f -> "?")
+			.collect(Collectors.joining(", "));
+		return "INSERT INTO " + quoteIdentifier(tableName) +
+			"(" + columns + ")" + " VALUES (" + placeholders + ")";
+	}
+
+	/**
+	 * Get update one row statement by condition fields, default not use limit 1,
+	 * because limit 1 is a sql dialect.
+	 */
+	default String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
+		String setClause = Arrays.stream(fieldNames)
+			.map(f -> quoteIdentifier(f) + "=?")
+			.collect(Collectors.joining(", "));
+		String conditionClause = Arrays.stream(conditionFields)
+			.map(f -> quoteIdentifier(f) + "=?")
+			.collect(Collectors.joining(" AND "));
+		return "UPDATE " + quoteIdentifier(tableName) +
+			" SET " + setClause +
+			" WHERE " + conditionClause;
+	}
+
+	/**
+	 * Get delete one row statement by condition fields, default not use limit 1,
+	 * because limit 1 is a sql dialect.
+	 */
+	default String getDeleteStatement(String tableName, String[] conditionFields) {
+		String conditionClause = Arrays.stream(conditionFields)
+			.map(f -> quoteIdentifier(f) + "=?")
+			.collect(Collectors.joining(" AND "));
+		return "DELETE FROM " + quoteIdentifier(tableName) + " WHERE " + conditionClause;
+	}
+
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java
new file mode 100644
index 0000000..12e70db
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.dialect;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Default Jdbc dialects.
+ */
+public final class JDBCDialects {
+
+	private static final List<JDBCDialect> DIALECTS = Arrays.asList(
+		new DerbyDialect(),
+		new MySQLDialect(),
+		new PostgresDialect()
+	);
+
+	/**
+	 * Fetch the JDBCDialect class corresponding to a given database url.
+	 */
+	public static Optional<JDBCDialect> get(String url) {
+		for (JDBCDialect dialect : DIALECTS) {
+			if (dialect.canHandle(url)) {
+				return Optional.of(dialect);
+			}
+		}
+		return Optional.empty();
+	}
+
+	private static class DerbyDialect implements JDBCDialect {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public boolean canHandle(String url) {
+			return url.startsWith("jdbc:derby:");
+		}
+
+		@Override
+		public Optional<String> defaultDriverName() {
+			return Optional.of("org.apache.derby.jdbc.EmbeddedDriver");
+		}
+
+		@Override
+		public String quoteIdentifier(String identifier) {
+			return identifier;
+		}
+	}
+
+	private static class MySQLDialect implements JDBCDialect {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public boolean canHandle(String url) {
+			return url.startsWith("jdbc:mysql:");
+		}
+
+		@Override
+		public Optional<String> defaultDriverName() {
+			return Optional.of("com.mysql.jdbc.Driver");
+		}
+
+		@Override
+		public String quoteIdentifier(String identifier) {
+			return "`" + identifier + "`";
+		}
+
+		/**
+		 * Mysql upsert query use DUPLICATE KEY UPDATE.
+		 *
+		 * <p>NOTE: It requires Mysql's primary key to be consistent with pkFields.
+		 *
+		 * <p>We don't use REPLACE INTO, if there are other fields, we can keep their previous values.
+		 */
+		@Override
+		public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+			String updateClause = Arrays.stream(fieldNames)
+					.map(f -> quoteIdentifier(f) + "=VALUES(" + quoteIdentifier(f) + ")")
+					.collect(Collectors.joining(", "));
+			return Optional.of(getInsertIntoStatement(tableName, fieldNames) +
+					" ON DUPLICATE KEY UPDATE " + updateClause
+			);
+		}
+	}
+
+	private static class PostgresDialect implements JDBCDialect {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public boolean canHandle(String url) {
+			return url.startsWith("jdbc:postgresql:");
+		}
+
+		@Override
+		public Optional<String> defaultDriverName() {
+			return Optional.of("org.postgresql.Driver");
+		}
+
+		/**
+		 * Postgres upsert query. It use ON CONFLICT ... DO UPDATE SET.. to replace into Postgres.
+		 */
+		@Override
+		public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+			String uniqueColumns = Arrays.stream(uniqueKeyFields)
+					.map(this::quoteIdentifier)
+					.collect(Collectors.joining(", "));
+			String updateClause = Arrays.stream(fieldNames)
+					.map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
+					.collect(Collectors.joining(", "));
+			return Optional.of(getInsertIntoStatement(tableName, fieldNames) +
+							" ON CONFLICT (" + uniqueColumns +
+							" DO UPDATE SET " + updateClause
+			);
+		}
+	}
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/writer/AppendOnlyWriter.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/writer/AppendOnlyWriter.java
new file mode 100644
index 0000000..899a22f
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/writer/AppendOnlyWriter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.writer;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.types.Row;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import static org.apache.flink.api.java.io.jdbc.JDBCUtils.setRecordToStatement;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Just append record to jdbc, can not receive retract/delete message.
+ */
+public class AppendOnlyWriter implements JDBCWriter {
+
+	private static final long serialVersionUID = 1L;
+
+	private final String insertSQL;
+	private final int[] fieldTypes;
+
+	private transient PreparedStatement statement;
+
+	public AppendOnlyWriter(String insertSQL, int[] fieldTypes) {
+		this.insertSQL = insertSQL;
+		this.fieldTypes = fieldTypes;
+	}
+
+	@Override
+	public void open(Connection connection) throws SQLException {
+		this.statement = connection.prepareStatement(insertSQL);
+	}
+
+	@Override
+	public void addRecord(Tuple2<Boolean, Row> record) throws SQLException {
+		checkArgument(record.f0, "Append mode can not receive retract/delete message.");
+		setRecordToStatement(statement, fieldTypes, record.f1);
+		statement.addBatch();
+	}
+
+	@Override
+	public void executeBatch() throws SQLException {
+		statement.executeBatch();
+	}
+
+	@Override
+	public void close() throws SQLException {
+		if (statement != null) {
+			statement.close();
+			statement = null;
+		}
+	}
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/writer/JDBCWriter.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/writer/JDBCWriter.java
new file mode 100644
index 0000000..8ce4759
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/writer/JDBCWriter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.writer;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.types.Row;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+/**
+ * JDBCWriter used to execute statements (e.g. INSERT, UPSERT, DELETE).
+ */
+public interface JDBCWriter extends Serializable {
+
+	/**
+	 * Open the writer by JDBC Connection. It can create Statement from Connection.
+	 */
+	void open(Connection connection) throws SQLException;
+
+	/**
+	 * Add record to writer, the writer may cache the data.
+	 */
+	void addRecord(Tuple2<Boolean, Row> record) throws SQLException;
+
+	/**
+	 * Submits a batch of commands to the database for execution.
+	 */
+	void executeBatch() throws SQLException;
+
+	/**
+	 * Close JDBC related statements and other classes.
+	 */
+	void close() throws SQLException;
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/writer/UpsertWriter.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/writer/UpsertWriter.java
new file mode 100644
index 0000000..dcc807e
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/writer/UpsertWriter.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.writer;
+
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.types.Row;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+
+import static org.apache.flink.api.java.io.jdbc.JDBCUtils.setRecordToStatement;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Upsert writer to deal with upsert, delete message.
+ */
+public abstract class UpsertWriter implements JDBCWriter {
+
+	private static final long serialVersionUID = 1L;
+
+	public static UpsertWriter create(
+		JDBCDialect dialect,
+		String tableName,
+		String[] fieldNames,
+		int[] fieldTypes,
+		String[] keyFields,
+		boolean objectReuse) {
+
+		checkNotNull(keyFields);
+
+		List<String> nameList = Arrays.asList(fieldNames);
+		int[] pkFields = Arrays.stream(keyFields).mapToInt(nameList::indexOf).toArray();
+		int[] pkTypes = fieldTypes == null ? null :
+			Arrays.stream(pkFields).map(f -> fieldTypes[f]).toArray();
+		String deleteSQL = dialect.getDeleteStatement(tableName, keyFields);
+
+		Optional<String> upsertSQL = dialect.getUpsertStatement(tableName, fieldNames, keyFields);
+		return upsertSQL.map((Function<String, UpsertWriter>) sql ->
+				new UpsertWriterUsingUpsertStatement(
+						fieldTypes, pkFields, pkTypes, objectReuse, deleteSQL, sql))
+				.orElseGet(() ->
+						new UpsertWriterUsingInsertUpdateStatement(
+								fieldTypes, pkFields, pkTypes, objectReuse, deleteSQL,
+								dialect.getRowExistsStatement(tableName, keyFields),
+								dialect.getInsertIntoStatement(tableName, fieldNames),
+								dialect.getUpdateStatement(tableName, fieldNames, keyFields)));
+	}
+
+	final int[] fieldTypes;
+	final int[] pkTypes;
+	private final int[] pkFields;
+	private final String deleteSQL;
+	private final boolean objectReuse;
+
+	private transient Map<Row, Tuple2<Boolean, Row>> keyToRows;
+	private transient PreparedStatement deleteStatement;
+
+	private UpsertWriter(int[] fieldTypes, int[] pkFields, int[] pkTypes, String deleteSQL, boolean objectReuse) {
+		this.fieldTypes = fieldTypes;
+		this.pkFields = pkFields;
+		this.pkTypes = pkTypes;
+		this.deleteSQL = deleteSQL;
+		this.objectReuse = objectReuse;
+	}
+
+	@Override
+	public void open(Connection connection) throws SQLException {
+		this.keyToRows = new HashMap<>();
+		this.deleteStatement = connection.prepareStatement(deleteSQL);
+	}
+
+	public void addRecord(Tuple2<Boolean, Row> record) throws SQLException {
+		// we don't need perform a deep copy, because jdbc field are immutable object.
+		Tuple2<Boolean, Row> tuple2 = objectReuse ? new Tuple2<>(record.f0, Row.copy(record.f1)) : record;
+		// add records to buffer
+		keyToRows.put(getPrimaryKey(tuple2.f1), tuple2);
+	}
+
+	@Override
+	public void executeBatch() throws SQLException {
+		if (keyToRows.size() > 0) {
+			for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) {
+				Row pk = entry.getKey();
+				Tuple2<Boolean, Row> tuple = entry.getValue();
+				if (tuple.f0) {
+					processOneRowInBatch(pk, tuple.f1);
+				} else {
+					setRecordToStatement(deleteStatement, pkTypes, pk);
+					deleteStatement.addBatch();
+				}
+			}
+			internalExecuteBatch();
+			deleteStatement.executeBatch();
+			keyToRows.clear();
+		}
+	}
+
+	abstract void processOneRowInBatch(Row pk, Row row) throws SQLException;
+
+	abstract void internalExecuteBatch() throws SQLException;
+
+	@Override
+	public void close() throws SQLException {
+		if (deleteStatement != null) {
+			deleteStatement.close();
+			deleteStatement = null;
+		}
+	}
+
+	private Row getPrimaryKey(Row row) {
+		Row pks = new Row(pkFields.length);
+		for (int i = 0; i < pkFields.length; i++) {
+			pks.setField(i, row.getField(pkFields[i]));
+		}
+		return pks;
+	}
+
+	// ----------------------------------------------------------------------------------------
+
+	private static final class UpsertWriterUsingUpsertStatement extends UpsertWriter {
+
+		private static final long serialVersionUID = 1L;
+		private final String upsertSQL;
+
+		private transient PreparedStatement upsertStatement;
+
+		private UpsertWriterUsingUpsertStatement(
+			int[] fieldTypes,
+			int[] pkFields,
+			int[] pkTypes,
+			boolean objectReuse,
+			String deleteSQL,
+			String upsertSQL) {
+			super(fieldTypes, pkFields, pkTypes, deleteSQL, objectReuse);
+			this.upsertSQL = upsertSQL;
+		}
+
+		@Override
+		public void open(Connection connection) throws SQLException {
+			super.open(connection);
+			upsertStatement = connection.prepareStatement(upsertSQL);
+		}
+
+		@Override
+		void processOneRowInBatch(Row pk, Row row) throws SQLException {
+			setRecordToStatement(upsertStatement, fieldTypes, row);
+			upsertStatement.addBatch();
+		}
+
+		@Override
+		void internalExecuteBatch() throws SQLException {
+			upsertStatement.executeBatch();
+		}
+
+		@Override
+		public void close() throws SQLException {
+			super.close();
+			if (upsertStatement != null) {
+				upsertStatement.close();
+				upsertStatement = null;
+			}
+		}
+	}
+
+	private static final class UpsertWriterUsingInsertUpdateStatement extends UpsertWriter {
+
+		private static final long serialVersionUID = 1L;
+		private final String existSQL;
+		private final String insertSQL;
+		private final String updateSQL;
+
+		private transient PreparedStatement existStatement;
+		private transient PreparedStatement insertStatement;
+		private transient PreparedStatement updateStatement;
+
+		private UpsertWriterUsingInsertUpdateStatement(
+			int[] fieldTypes,
+			int[] pkFields,
+			int[] pkTypes,
+			boolean objectReuse,
+			String deleteSQL,
+			String existSQL,
+			String insertSQL,
+			String updateSQL) {
+			super(fieldTypes, pkFields, pkTypes, deleteSQL, objectReuse);
+			this.existSQL = existSQL;
+			this.insertSQL = insertSQL;
+			this.updateSQL = updateSQL;
+		}
+
+		@Override
+		public void open(Connection connection) throws SQLException {
+			super.open(connection);
+			existStatement = connection.prepareStatement(existSQL);
+			insertStatement = connection.prepareStatement(insertSQL);
+			updateStatement = connection.prepareStatement(updateSQL);
+		}
+
+		@Override
+		void processOneRowInBatch(Row pk, Row row) throws SQLException {
+			setRecordToStatement(existStatement, pkTypes, pk);
+			ResultSet resultSet = existStatement.executeQuery();
+			boolean exist = resultSet.next();
+			resultSet.close();
+			if (exist) {
+				// do update
+				setRecordToStatement(updateStatement, fieldTypes, row);
+				updateStatement.addBatch();
+			} else {
+				// do insert
+				setRecordToStatement(insertStatement, fieldTypes, row);
+				insertStatement.addBatch();
+			}
+		}
+
+		@Override
+		void internalExecuteBatch() throws SQLException {
+			updateStatement.executeBatch();
+			insertStatement.executeBatch();
+		}
+
+		@Override
+		public void close() throws SQLException {
+			super.close();
+			if (existStatement != null) {
+				existStatement.close();
+				existStatement = null;
+			}
+			if (insertStatement != null) {
+				insertStatement.close();
+				insertStatement = null;
+			}
+			if (updateStatement != null) {
+				updateStatement.close();
+				updateStatement = null;
+			}
+		}
+	}
+}
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
index 8582387..92d5d5a 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
@@ -246,7 +246,7 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
 		}
 	}
 
-	private static Row toRow(TestEntry entry) {
+	static Row toRow(TestEntry entry) {
 		Row row = new Row(5);
 		row.setField(0, entry.id);
 		row.setField(1, entry.title);
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertOutputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertOutputFormatTest.java
new file mode 100644
index 0000000..225ab1a
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertOutputFormatTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.types.Row;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.api.java.io.jdbc.JDBCOutputFormatTest.toRow;
+import static org.junit.Assert.assertArrayEquals;
+import static org.mockito.Mockito.doReturn;
+
+/**
+ * Tests for the {@link JDBCUpsertOutputFormat}.
+ */
+public class JDBCUpsertOutputFormatTest extends JDBCTestBase {
+
+	private JDBCUpsertOutputFormat format;
+	private String[] fieldNames;
+	private String[] keyFields;
+
+	@Before
+	public void setup() {
+		fieldNames = new String[]{"id", "title", "author", "price", "qty"};
+		keyFields = new String[]{"id"};
+	}
+
+	@Test
+	public void testJDBCOutputFormat() throws Exception {
+		format = JDBCUpsertOutputFormat.builder()
+				.setOptions(JDBCOptions.builder()
+						.setDBUrl(DB_URL)
+						.setTableName(OUTPUT_TABLE)
+						.build())
+				.setFieldNames(fieldNames)
+				.setKeyFields(keyFields)
+				.build();
+		RuntimeContext context = Mockito.mock(RuntimeContext.class);
+		ExecutionConfig config = Mockito.mock(ExecutionConfig.class);
+		doReturn(config).when(context).getExecutionConfig();
+		doReturn(true).when(config).isObjectReuseEnabled();
+		format.setRuntimeContext(context);
+		format.open(0, 1);
+
+		for (TestEntry entry : TEST_DATA) {
+			format.writeRecord(Tuple2.of(true, toRow(entry)));
+		}
+		format.flush();
+		check(Arrays.stream(TEST_DATA).map(JDBCOutputFormatTest::toRow).toArray(Row[]::new));
+
+		// override
+		for (TestEntry entry : TEST_DATA) {
+			format.writeRecord(Tuple2.of(true, toRow(entry)));
+		}
+		format.flush();
+		check(Arrays.stream(TEST_DATA).map(JDBCOutputFormatTest::toRow).toArray(Row[]::new));
+
+		// delete
+		for (int i = 0; i < TEST_DATA.length / 2; i++) {
+			format.writeRecord(Tuple2.of(false, toRow(TEST_DATA[i])));
+		}
+		Row[] expected = new Row[TEST_DATA.length - TEST_DATA.length / 2];
+		for (int i = TEST_DATA.length / 2; i < TEST_DATA.length; i++) {
+			expected[i - TEST_DATA.length / 2] = toRow(TEST_DATA[i]);
+		}
+		format.flush();
+		check(expected);
+	}
+
+	private void check(Row[] rows) throws SQLException {
+		check(rows, DB_URL, OUTPUT_TABLE, fieldNames);
+	}
+
+	static void check(Row[] rows, String url, String table, String[] fields) throws SQLException {
+		try (
+				Connection dbConn = DriverManager.getConnection(url);
+				PreparedStatement statement = dbConn.prepareStatement("select * from " + table);
+				ResultSet resultSet = statement.executeQuery()
+		) {
+			List<String> results = new ArrayList<>();
+			while (resultSet.next()) {
+				Row row = new Row(fields.length);
+				for (int i = 0; i < fields.length; i++) {
+					row.setField(i, resultSet.getObject(fields[i]));
+				}
+				results.add(row.toString());
+			}
+			String[] sortedExpect = Arrays.stream(rows).map(Row::toString).toArray(String[]::new);
+			String[] sortedResult = results.toArray(new String[0]);
+			Arrays.sort(sortedExpect);
+			Arrays.sort(sortedResult);
+			assertArrayEquals(sortedExpect, sortedResult);
+		}
+	}
+
+	@After
+	public void clearOutputTable() throws Exception {
+		if (format != null) {
+			format.close();
+		}
+		format = null;
+		Class.forName(DRIVER_CLASS);
+		try (
+			Connection conn = DriverManager.getConnection(DB_URL);
+			Statement stat = conn.createStatement()) {
+			stat.execute("DELETE FROM " + OUTPUT_TABLE);
+
+			stat.close();
+			conn.close();
+		}
+	}
+}
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSinkITCase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSinkITCase.java
new file mode 100644
index 0000000..f4f0e25
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSinkITCase.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.api.java.io.jdbc.JDBCTestBase.DRIVER_CLASS;
+import static org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormatTest.check;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.INT;
+
+/**
+ * IT case for {@link JDBCUpsertTableSink}.
+ */
+public class JDBCUpsertTableSinkITCase extends AbstractTestBase {
+
+	public static final String DB_URL = "jdbc:derby:memory:upsert";
+	public static final String OUTPUT_TABLE1 = "upsertSink";
+	public static final String OUTPUT_TABLE2 = "appendSink";
+
+	@Before
+	public void before() throws ClassNotFoundException, SQLException {
+		System.setProperty("derby.stream.error.field", JDBCTestBase.class.getCanonicalName() + ".DEV_NULL");
+
+		Class.forName(DRIVER_CLASS);
+		try (
+			Connection conn = DriverManager.getConnection(DB_URL + ";create=true");
+			Statement stat = conn.createStatement()) {
+			stat.executeUpdate("CREATE TABLE " + OUTPUT_TABLE1 + " (" +
+					"cnt BIGINT NOT NULL DEFAULT 0," +
+					"lencnt BIGINT NOT NULL DEFAULT 0," +
+					"cTag INT NOT NULL DEFAULT 0," +
+					"PRIMARY KEY (cnt, cTag))");
+
+			stat.executeUpdate("CREATE TABLE " + OUTPUT_TABLE2 + " (" +
+					"id INT NOT NULL DEFAULT 0," +
+					"num BIGINT NOT NULL DEFAULT 0)");
+		}
+	}
+
+	@After
+	public void clearOutputTable() throws Exception {
+		Class.forName(DRIVER_CLASS);
+		try (
+			Connection conn = DriverManager.getConnection(DB_URL);
+			Statement stat = conn.createStatement()) {
+			stat.execute("DROP TABLE " + OUTPUT_TABLE1);
+			stat.execute("DROP TABLE " + OUTPUT_TABLE2);
+		}
+	}
+
+	public static DataStream<Tuple3<Integer, Long, String>> get3TupleDataStream(StreamExecutionEnvironment env) {
+		List<Tuple3<Integer, Long, String>> data = new ArrayList<>();
+		data.add(new Tuple3<>(1, 1L, "Hi"));
+		data.add(new Tuple3<>(2, 2L, "Hello"));
+		data.add(new Tuple3<>(3, 2L, "Hello world"));
+		data.add(new Tuple3<>(4, 3L, "Hello world, how are you?"));
+		data.add(new Tuple3<>(5, 3L, "I am fine."));
+		data.add(new Tuple3<>(6, 3L, "Luke Skywalker"));
+		data.add(new Tuple3<>(7, 4L, "Comment#1"));
+		data.add(new Tuple3<>(8, 4L, "Comment#2"));
+		data.add(new Tuple3<>(9, 4L, "Comment#3"));
+		data.add(new Tuple3<>(10, 4L, "Comment#4"));
+		data.add(new Tuple3<>(11, 5L, "Comment#5"));
+		data.add(new Tuple3<>(12, 5L, "Comment#6"));
+		data.add(new Tuple3<>(13, 5L, "Comment#7"));
+		data.add(new Tuple3<>(14, 5L, "Comment#8"));
+		data.add(new Tuple3<>(15, 5L, "Comment#9"));
+		data.add(new Tuple3<>(16, 6L, "Comment#10"));
+		data.add(new Tuple3<>(17, 6L, "Comment#11"));
+		data.add(new Tuple3<>(18, 6L, "Comment#12"));
+		data.add(new Tuple3<>(19, 6L, "Comment#13"));
+		data.add(new Tuple3<>(20, 6L, "Comment#14"));
+		data.add(new Tuple3<>(21, 6L, "Comment#15"));
+
+		Collections.shuffle(data);
+		return env.fromCollection(data);
+	}
+
+	@Test
+	public void testUpsert() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().enableObjectReuse();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+		Table t = tEnv.fromDataStream(get3TupleDataStream(env).assignTimestampsAndWatermarks(
+				new AscendingTimestampExtractor<Tuple3<Integer, Long, String>>() {
+					@Override
+					public long extractAscendingTimestamp(Tuple3<Integer, Long, String> element) {
+						return element.f0;
+					}}), "id, num, text");
+
+		tEnv.registerTable("T", t);
+
+		String[] fields = {"cnt", "lencnt", "cTag"};
+		tEnv.registerTableSink("upsertSink", JDBCUpsertTableSink.builder()
+				.setOptions(JDBCOptions.builder()
+						.setDBUrl(DB_URL)
+						.setTableName(OUTPUT_TABLE1)
+						.build())
+				.setTableSchema(TableSchema.builder().fields(
+						fields, new DataType[] {BIGINT(), BIGINT(), INT()}).build())
+				.build());
+
+		tEnv.sqlUpdate("INSERT INTO upsertSink SELECT cnt, COUNT(len) AS lencnt, cTag FROM" +
+				" (SELECT len, COUNT(id) as cnt, cTag FROM" +
+				" (SELECT id, CHAR_LENGTH(text) AS len, (CASE WHEN id > 0 THEN 1 ELSE 0 END) cTag FROM T)" +
+				" GROUP BY len, cTag)" +
+				" GROUP BY cnt, cTag");
+		env.execute();
+		check(new Row[] {
+				Row.of(1, 5, 1),
+				Row.of(7, 1, 1),
+				Row.of(9, 1, 1)
+		}, DB_URL, OUTPUT_TABLE1, fields);
+	}
+
+	@Test
+	public void testAppend() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().enableObjectReuse();
+		env.getConfig().setParallelism(1);
+		StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+		Table t = tEnv.fromDataStream(get3TupleDataStream(env), "id, num, text");
+
+		tEnv.registerTable("T", t);
+
+		String[] fields = {"id", "num"};
+		tEnv.registerTableSink("upsertSink", JDBCUpsertTableSink.builder()
+				.setOptions(JDBCOptions.builder()
+						.setDBUrl(DB_URL)
+						.setTableName(OUTPUT_TABLE2)
+						.build())
+				.setTableSchema(TableSchema.builder().fields(
+						fields, new DataType[] {INT(), BIGINT()}).build())
+				.build());
+
+		tEnv.sqlUpdate("INSERT INTO upsertSink SELECT id, num FROM T WHERE id IN (2, 10, 20)");
+		env.execute();
+		check(new Row[] {
+				Row.of(2, 2),
+				Row.of(10, 4),
+				Row.of(20, 6)
+		}, DB_URL, OUTPUT_TABLE2, fields);
+	}
+}