Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/05 01:59:09 UTC
[flink] branch master updated: [FLINK-12956][jdbc] Introduce upsert table sink for JDBC
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a64a44e [FLINK-12956][jdbc] Introduce upsert table sink for JDBC
a64a44e is described below
commit a64a44ee1a5b63731fc82ebbb9443e276e359821
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Tue Jun 25 15:07:26 2019 +0800
[FLINK-12956][jdbc] Introduce upsert table sink for JDBC
This closes #8867
---
flink-connectors/flink-jdbc/pom.xml | 7 +
.../api/java/io/jdbc/AbstractJDBCOutputFormat.java | 85 +++++++
.../java/io/jdbc/JDBCAppendTableSinkBuilder.java | 4 +-
.../apache/flink/api/java/io/jdbc/JDBCOptions.java | 140 ++++++++++
.../flink/api/java/io/jdbc/JDBCOutputFormat.java | 211 +++-------------
.../api/java/io/jdbc/JDBCUpsertOutputFormat.java | 281 +++++++++++++++++++++
.../api/java/io/jdbc/JDBCUpsertSinkFunction.java | 63 +++++
.../api/java/io/jdbc/JDBCUpsertTableSink.java | 193 ++++++++++++++
.../apache/flink/api/java/io/jdbc/JDBCUtils.java | 153 +++++++++++
.../api/java/io/jdbc/dialect/JDBCDialect.java | 118 +++++++++
.../api/java/io/jdbc/dialect/JDBCDialects.java | 137 ++++++++++
.../api/java/io/jdbc/writer/AppendOnlyWriter.java | 72 ++++++
.../flink/api/java/io/jdbc/writer/JDBCWriter.java | 52 ++++
.../api/java/io/jdbc/writer/UpsertWriter.java | 264 +++++++++++++++++++
.../api/java/io/jdbc/JDBCOutputFormatTest.java | 2 +-
.../java/io/jdbc/JDBCUpsertOutputFormatTest.java | 144 +++++++++++
.../java/io/jdbc/JDBCUpsertTableSinkITCase.java | 186 ++++++++++++++
17 files changed, 1939 insertions(+), 173 deletions(-)
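For orientation, a minimal usage sketch of the new sink (not part of this commit; the URL, credentials, table, schema, and input stream are hypothetical — the builder calls match the classes added below, while TableSchema and Types are existing Flink APIs):

    TableSchema schema = TableSchema.builder()
        .field("id", Types.INT)
        .field("num", Types.LONG)
        .build();

    // Dialect and driver are derived from the URL when not set explicitly.
    JDBCOptions options = JDBCOptions.builder()
        .setDBUrl("jdbc:mysql://localhost:3306/mydb")
        .setTableName("my_table")
        .setUsername("user")
        .setPassword("pass")
        .build();

    JDBCUpsertTableSink sink = JDBCUpsertTableSink.builder()
        .setTableSchema(schema)
        .setOptions(options)
        .setFlushMaxSize(1000)        // flush after 1000 buffered records ...
        .setFlushIntervalMills(2000)  // ... or every 2 seconds, whichever comes first
        .build();

    // With key fields the sink upserts; without them it can only append.
    sink.setKeyFields(new String[]{"id"});
    sink.setIsAppendOnly(false);

    // upsertStream: DataStream<Tuple2<Boolean, Row>>, f0 == true for upsert, false for delete
    sink.consumeDataStream(upsertStream);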
diff --git a/flink-connectors/flink-jdbc/pom.xml b/flink-connectors/flink-jdbc/pom.xml
index 6642915..acb3189 100644
--- a/flink-connectors/flink-jdbc/pom.xml
+++ b/flink-connectors/flink-jdbc/pom.xml
@@ -67,5 +67,12 @@ under the License.
<version>10.14.2.0</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/AbstractJDBCOutputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/AbstractJDBCOutputFormat.java
new file mode 100644
index 0000000..451baff
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/AbstractJDBCOutputFormat.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+/**
+ * OutputFormat to write Rows into a JDBC database.
+ *
+ * @see Row
+ * @see DriverManager
+ */
+public abstract class AbstractJDBCOutputFormat<T> extends RichOutputFormat<T> {
+
+ private static final long serialVersionUID = 1L;
+ static final int DEFAULT_FLUSH_MAX_SIZE = 5000;
+ static final long DEFAULT_FLUSH_INTERVAL_MILLS = 0;
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractJDBCOutputFormat.class);
+
+ private final String username;
+ private final String password;
+ private final String drivername;
+ protected final String dbURL;
+
+ protected transient Connection connection;
+
+ public AbstractJDBCOutputFormat(String username, String password, String drivername, String dbURL) {
+ this.username = username;
+ this.password = password;
+ this.drivername = drivername;
+ this.dbURL = dbURL;
+ }
+
+ @Override
+ public void configure(Configuration parameters) {
+ }
+
+ protected void establishConnection() throws SQLException, ClassNotFoundException {
+ Class.forName(drivername);
+ if (username == null) {
+ connection = DriverManager.getConnection(dbURL);
+ } else {
+ connection = DriverManager.getConnection(dbURL, username, password);
+ }
+ }
+
+ protected void closeDbConnection() throws IOException {
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (SQLException se) {
+ LOG.warn("JDBC connection could not be closed: " + se.getMessage());
+ } finally {
+ connection = null;
+ }
+ }
+ }
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java
index da00d74..023c624 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java
@@ -21,7 +21,7 @@ package org.apache.flink.api.java.io.jdbc;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Preconditions;
-import static org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.DEFAULT_BATCH_INTERVAL;
+import static org.apache.flink.api.java.io.jdbc.AbstractJDBCOutputFormat.DEFAULT_FLUSH_MAX_SIZE;
/**
* A builder to configure and build the JDBCAppendTableSink.
@@ -32,7 +32,7 @@ public class JDBCAppendTableSinkBuilder {
private String driverName;
private String dbURL;
private String query;
- private int batchSize = DEFAULT_BATCH_INTERVAL;
+ private int batchSize = DEFAULT_FLUSH_MAX_SIZE;
private int[] parameterTypes;
/**
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOptions.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOptions.java
new file mode 100644
index 0000000..45e070d
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOptions.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects;
+
+import java.util.Optional;
+
+/**
+ * Options for the JDBC connector.
+ */
+public class JDBCOptions {
+
+ private String dbURL;
+ private String tableName;
+ private String driverName;
+ private String username;
+ private String password;
+ private JDBCDialect dialect;
+
+ public JDBCOptions(String dbURL, String tableName, String driverName, String username,
+ String password, JDBCDialect dialect) {
+ this.dbURL = dbURL;
+ this.tableName = tableName;
+ this.driverName = driverName;
+ this.username = username;
+ this.password = password;
+ this.dialect = dialect;
+ }
+
+ public String getDbURL() {
+ return dbURL;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public String getDriverName() {
+ return driverName;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public JDBCDialect getDialect() {
+ return dialect;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder of {@link JDBCOptions}.
+ */
+ public static class Builder {
+ private String dbURL;
+ private String tableName;
+ private String driverName;
+ private String username;
+ private String password;
+ private JDBCDialect dialect;
+
+ public Builder setTableName(String tableName) {
+ this.tableName = tableName;
+ return this;
+ }
+
+ public Builder setUsername(String username) {
+ this.username = username;
+ return this;
+ }
+
+ public Builder setPassword(String password) {
+ this.password = password;
+ return this;
+ }
+
+ public Builder setDriverName(String driverName) {
+ this.driverName = driverName;
+ return this;
+ }
+
+ public Builder setDBUrl(String dbURL) {
+ this.dbURL = dbURL;
+ return this;
+ }
+
+ public Builder setDialect(JDBCDialect dialect) {
+ this.dialect = dialect;
+ return this;
+ }
+
+ public JDBCOptions build() {
+ if (this.dbURL == null) {
+ throw new IllegalArgumentException("No database URL supplied.");
+ }
+ if (this.tableName == null) {
+ throw new IllegalArgumentException("No tableName supplied.");
+ }
+ if (this.dialect == null) {
+ this.dialect = JDBCDialects.get(dbURL)
+ .orElseThrow(() -> new IllegalArgumentException("No dialect supplied."));
+ }
+ if (this.driverName == null) {
+ this.driverName = dialect.defaultDriverName()
+ .orElseThrow(() -> new IllegalArgumentException("No driverName supplied."));
+ }
+
+ return new JDBCOptions(dbURL, tableName, driverName, username, password, dialect);
+ }
+ }
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
index f773635..0893d7f 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
@@ -18,19 +18,18 @@
package org.apache.flink.api.java.io.jdbc;
-import org.apache.flink.api.common.io.RichOutputFormat;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
+import static org.apache.flink.api.java.io.jdbc.JDBCUtils.setRecordToStatement;
+
/**
* OutputFormat to write Rows into a JDBC database.
* The OutputFormat has to be configured using the supplied OutputFormatBuilder.
@@ -38,31 +37,25 @@ import java.sql.SQLException;
* @see Row
* @see DriverManager
*/
-public class JDBCOutputFormat extends RichOutputFormat<Row> {
+public class JDBCOutputFormat extends AbstractJDBCOutputFormat<Row> {
+
private static final long serialVersionUID = 1L;
- static final int DEFAULT_BATCH_INTERVAL = 5000;
private static final Logger LOG = LoggerFactory.getLogger(JDBCOutputFormat.class);
- private String username;
- private String password;
- private String drivername;
- private String dbURL;
- private String query;
- private int batchInterval = DEFAULT_BATCH_INTERVAL;
+ private final String query;
+ private final int batchInterval;
+ private final int[] typesArray;
- private Connection dbConn;
private PreparedStatement upload;
-
private int batchCount = 0;
- private int[] typesArray;
-
- public JDBCOutputFormat() {
- }
-
- @Override
- public void configure(Configuration parameters) {
+ public JDBCOutputFormat(String username, String password, String drivername,
+ String dbURL, String query, int batchInterval, int[] typesArray) {
+ super(username, password, drivername, dbURL);
+ this.query = query;
+ this.batchInterval = batchInterval;
+ this.typesArray = typesArray;
}
/**
@@ -76,7 +69,7 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
public void open(int taskNumber, int numTasks) throws IOException {
try {
establishConnection();
- upload = dbConn.prepareStatement(query);
+ upload = connection.prepareStatement(query);
} catch (SQLException sqe) {
throw new IllegalArgumentException("open() failed.", sqe);
} catch (ClassNotFoundException cnfe) {
@@ -84,136 +77,17 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
}
}
- private void establishConnection() throws SQLException, ClassNotFoundException {
- Class.forName(drivername);
- if (username == null) {
- dbConn = DriverManager.getConnection(dbURL);
- } else {
- dbConn = DriverManager.getConnection(dbURL, username, password);
- }
- }
-
- /**
- * Adds a record to the prepared statement.
- *
- * <p>When this method is called, the output format is guaranteed to be opened.
- *
- * <p>WARNING: this may fail when no column types specified (because a best effort approach is attempted in order to
- * insert a null value but it's not guaranteed that the JDBC driver handles PreparedStatement.setObject(pos, null))
- *
- * @param row The records to add to the output.
- * @see PreparedStatement
- * @throws IOException Thrown, if the records could not be added due to an I/O problem.
- */
@Override
public void writeRecord(Row row) throws IOException {
-
- if (typesArray != null && typesArray.length > 0 && typesArray.length != row.getArity()) {
- LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
- }
try {
-
- if (typesArray == null) {
- // no types provided
- for (int index = 0; index < row.getArity(); index++) {
- LOG.warn("Unknown column type for column {}. Best effort approach to set its value: {}.", index + 1, row.getField(index));
- upload.setObject(index + 1, row.getField(index));
- }
- } else {
- // types provided
- for (int index = 0; index < row.getArity(); index++) {
-
- if (row.getField(index) == null) {
- upload.setNull(index + 1, typesArray[index]);
- } else {
- try {
- // casting values as suggested by http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html
- switch (typesArray[index]) {
- case java.sql.Types.NULL:
- upload.setNull(index + 1, typesArray[index]);
- break;
- case java.sql.Types.BOOLEAN:
- case java.sql.Types.BIT:
- upload.setBoolean(index + 1, (boolean) row.getField(index));
- break;
- case java.sql.Types.CHAR:
- case java.sql.Types.NCHAR:
- case java.sql.Types.VARCHAR:
- case java.sql.Types.LONGVARCHAR:
- case java.sql.Types.LONGNVARCHAR:
- upload.setString(index + 1, (String) row.getField(index));
- break;
- case java.sql.Types.TINYINT:
- upload.setByte(index + 1, (byte) row.getField(index));
- break;
- case java.sql.Types.SMALLINT:
- upload.setShort(index + 1, (short) row.getField(index));
- break;
- case java.sql.Types.INTEGER:
- upload.setInt(index + 1, (int) row.getField(index));
- break;
- case java.sql.Types.BIGINT:
- upload.setLong(index + 1, (long) row.getField(index));
- break;
- case java.sql.Types.REAL:
- upload.setFloat(index + 1, (float) row.getField(index));
- break;
- case java.sql.Types.FLOAT:
- case java.sql.Types.DOUBLE:
- upload.setDouble(index + 1, (double) row.getField(index));
- break;
- case java.sql.Types.DECIMAL:
- case java.sql.Types.NUMERIC:
- upload.setBigDecimal(index + 1, (java.math.BigDecimal) row.getField(index));
- break;
- case java.sql.Types.DATE:
- upload.setDate(index + 1, (java.sql.Date) row.getField(index));
- break;
- case java.sql.Types.TIME:
- upload.setTime(index + 1, (java.sql.Time) row.getField(index));
- break;
- case java.sql.Types.TIMESTAMP:
- upload.setTimestamp(index + 1, (java.sql.Timestamp) row.getField(index));
- break;
- case java.sql.Types.BINARY:
- case java.sql.Types.VARBINARY:
- case java.sql.Types.LONGVARBINARY:
- upload.setBytes(index + 1, (byte[]) row.getField(index));
- break;
- default:
- upload.setObject(index + 1, row.getField(index));
- LOG.warn("Unmanaged sql type ({}) for column {}. Best effort approach to set its value: {}.",
- typesArray[index], index + 1, row.getField(index));
- // case java.sql.Types.SQLXML
- // case java.sql.Types.ARRAY:
- // case java.sql.Types.JAVA_OBJECT:
- // case java.sql.Types.BLOB:
- // case java.sql.Types.CLOB:
- // case java.sql.Types.NCLOB:
- // case java.sql.Types.DATALINK:
- // case java.sql.Types.DISTINCT:
- // case java.sql.Types.OTHER:
- // case java.sql.Types.REF:
- // case java.sql.Types.ROWID:
- // case java.sql.Types.STRUC
- }
- } catch (ClassCastException e) {
- // enrich the exception with detailed information.
- String errorMessage = String.format(
- "%s, field index: %s, field value: %s.", e.getMessage(), index, row.getField(index));
- ClassCastException enrichedException = new ClassCastException(errorMessage);
- enrichedException.setStackTrace(e.getStackTrace());
- throw enrichedException;
- }
- }
- }
- }
+ setRecordToStatement(upload, typesArray, row);
upload.addBatch();
- batchCount++;
} catch (SQLException e) {
throw new RuntimeException("Preparation of JDBC statement failed.", e);
}
+ batchCount++;
+
if (batchCount >= batchInterval) {
// execute batch
flush();
@@ -242,7 +116,6 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
public void close() throws IOException {
if (upload != null) {
flush();
- // close the connection
try {
upload.close();
} catch (SQLException e) {
@@ -252,15 +125,7 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
}
}
- if (dbConn != null) {
- try {
- dbConn.close();
- } catch (SQLException se) {
- LOG.info("JDBC connection could not be closed: " + se.getMessage());
- } finally {
- dbConn = null;
- }
- }
+ closeDbConnection();
}
public static JDBCOutputFormatBuilder buildJDBCOutputFormat() {
@@ -271,44 +136,48 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
* Builder for a {@link JDBCOutputFormat}.
*/
public static class JDBCOutputFormatBuilder {
- private final JDBCOutputFormat format;
+ private String username;
+ private String password;
+ private String drivername;
+ private String dbURL;
+ private String query;
+ private int batchInterval = DEFAULT_FLUSH_MAX_SIZE;
+ private int[] typesArray;
- protected JDBCOutputFormatBuilder() {
- this.format = new JDBCOutputFormat();
- }
+ protected JDBCOutputFormatBuilder() {}
public JDBCOutputFormatBuilder setUsername(String username) {
- format.username = username;
+ this.username = username;
return this;
}
public JDBCOutputFormatBuilder setPassword(String password) {
- format.password = password;
+ this.password = password;
return this;
}
public JDBCOutputFormatBuilder setDrivername(String drivername) {
- format.drivername = drivername;
+ this.drivername = drivername;
return this;
}
public JDBCOutputFormatBuilder setDBUrl(String dbURL) {
- format.dbURL = dbURL;
+ this.dbURL = dbURL;
return this;
}
public JDBCOutputFormatBuilder setQuery(String query) {
- format.query = query;
+ this.query = query;
return this;
}
public JDBCOutputFormatBuilder setBatchInterval(int batchInterval) {
- format.batchInterval = batchInterval;
+ this.batchInterval = batchInterval;
return this;
}
public JDBCOutputFormatBuilder setSqlTypes(int[] typesArray) {
- format.typesArray = typesArray;
+ this.typesArray = typesArray;
return this;
}
@@ -318,23 +187,25 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
* @return Configured JDBCOutputFormat
*/
public JDBCOutputFormat finish() {
- if (format.username == null) {
+ if (this.username == null) {
LOG.info("Username was not supplied.");
}
- if (format.password == null) {
+ if (this.password == null) {
LOG.info("Password was not supplied.");
}
- if (format.dbURL == null) {
+ if (this.dbURL == null) {
throw new IllegalArgumentException("No database URL supplied.");
}
- if (format.query == null) {
+ if (this.query == null) {
throw new IllegalArgumentException("No query supplied.");
}
- if (format.drivername == null) {
+ if (this.drivername == null) {
throw new IllegalArgumentException("No driver supplied.");
}
- return format;
+ return new JDBCOutputFormat(
+ username, password, drivername, dbURL,
+ query, batchInterval, typesArray);
}
}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertOutputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertOutputFormat.java
new file mode 100644
index 0000000..97989ff
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertOutputFormat.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
+import org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter;
+import org.apache.flink.api.java.io.jdbc.writer.JDBCWriter;
+import org.apache.flink.api.java.io.jdbc.writer.UpsertWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An upsert OutputFormat for JDBC.
+ */
+public class JDBCUpsertOutputFormat extends AbstractJDBCOutputFormat<Tuple2<Boolean, Row>> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(JDBCUpsertOutputFormat.class);
+
+ static final int DEFAULT_MAX_RETRY_TIMES = 3;
+
+ private final String tableName;
+ private final JDBCDialect dialect;
+ private final String[] fieldNames;
+ private final String[] keyFields;
+ private final int[] fieldTypes;
+
+ private final int flushMaxSize;
+ private final long flushIntervalMills;
+ private final int maxRetryTimes;
+
+ private transient JDBCWriter jdbcWriter;
+ private transient int batchCount = 0;
+ private transient volatile boolean closed = false;
+
+ private transient ScheduledExecutorService scheduler;
+ private transient ScheduledFuture scheduledFuture;
+ private transient volatile Exception flushException;
+
+ public JDBCUpsertOutputFormat(
+ JDBCOptions options,
+ String[] fieldNames,
+ String[] keyFields,
+ int[] fieldTypes,
+ int flushMaxSize,
+ long flushIntervalMills,
+ int maxRetryTimes) {
+ super(options.getUsername(), options.getPassword(), options.getDriverName(), options.getDbURL());
+ this.tableName = options.getTableName();
+ this.dialect = options.getDialect();
+ this.fieldNames = fieldNames;
+ this.keyFields = keyFields;
+ this.fieldTypes = fieldTypes;
+ this.flushMaxSize = flushMaxSize;
+ this.flushIntervalMills = flushIntervalMills;
+ this.maxRetryTimes = maxRetryTimes;
+ }
+
+ /**
+ * Connects to the target database and initializes the JDBC writer.
+ *
+ * @param taskNumber The number of the parallel instance.
+ * @throws IOException Thrown, if the output could not be opened due to an
+ * I/O problem.
+ */
+ @Override
+ public void open(int taskNumber, int numTasks) throws IOException {
+ try {
+ establishConnection();
+ if (keyFields == null || keyFields.length == 0) {
+ String insertSQL = dialect.getInsertIntoStatement(tableName, fieldNames);
+ jdbcWriter = new AppendOnlyWriter(insertSQL, fieldTypes);
+ } else {
+ jdbcWriter = UpsertWriter.create(
+ dialect, tableName, fieldNames, fieldTypes, keyFields,
+ getRuntimeContext().getExecutionConfig().isObjectReuseEnabled());
+ }
+ jdbcWriter.open(connection);
+ } catch (SQLException sqe) {
+ throw new IllegalArgumentException("open() failed.", sqe);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
+ }
+
+ if (flushIntervalMills != 0) {
+ this.scheduler = Executors.newScheduledThreadPool(
+ 1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
+ this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
+ if (closed) {
+ return;
+ }
+ try {
+ flush();
+ } catch (Exception e) {
+ flushException = e;
+ }
+ }, flushIntervalMills, flushIntervalMills, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ private void checkFlushException() {
+ if (flushException != null) {
+ throw new RuntimeException("Writing records to JDBC failed.", flushException);
+ }
+ }
+
+ @Override
+ public synchronized void writeRecord(Tuple2<Boolean, Row> tuple2) throws IOException {
+ checkFlushException();
+
+ try {
+ jdbcWriter.addRecord(tuple2);
+ batchCount++;
+ if (batchCount >= flushMaxSize) {
+ flush();
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Writing records to JDBC failed.", e);
+ }
+ }
+
+ public synchronized void flush() throws Exception {
+ checkFlushException();
+
+ for (int i = 1; i <= maxRetryTimes; i++) {
+ try {
+ jdbcWriter.executeBatch();
+ batchCount = 0;
+ break;
+ } catch (SQLException e) {
+ LOG.error("JDBC executeBatch error, retry times = {}", i, e);
+ if (i >= maxRetryTimes) {
+ throw e;
+ }
+ Thread.sleep(1000 * i);
+ }
+ }
+ }
+
+ /**
+ * Flushes any buffered records and closes all resources of this instance.
+ *
+ * @throws IOException Thrown, if the output could not be closed properly.
+ */
+ @Override
+ public synchronized void close() throws IOException {
+ if (closed) {
+ return;
+ }
+ closed = true;
+
+ checkFlushException();
+
+ if (this.scheduledFuture != null) {
+ scheduledFuture.cancel(false);
+ this.scheduler.shutdown();
+
+ try {
+ if (!scheduler.awaitTermination(10, TimeUnit.MINUTES)) {
+ throw new RuntimeException(
+ "The scheduled executor service can not properly terminate.");
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ if (batchCount > 0) {
+ try {
+ flush();
+ } catch (Exception e) {
+ throw new RuntimeException("Writing records to JDBC failed.", e);
+ }
+ }
+
+ try {
+ jdbcWriter.close();
+ } catch (SQLException e) {
+ LOG.warn("Close JDBC writer failed.", e);
+ }
+
+ closeDbConnection();
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder for a {@link JDBCUpsertOutputFormat}.
+ */
+ public static class Builder {
+ private JDBCOptions options;
+ private String[] fieldNames;
+ private String[] keyFields;
+ private int[] fieldTypes;
+ private int flushMaxSize = DEFAULT_FLUSH_MAX_SIZE;
+ private long flushIntervalMills = DEFAULT_FLUSH_INTERVAL_MILLS;
+ private int maxRetryTimes = DEFAULT_MAX_RETRY_TIMES;
+
+ public Builder setOptions(JDBCOptions options) {
+ this.options = options;
+ return this;
+ }
+
+ public Builder setFieldNames(String[] fieldNames) {
+ this.fieldNames = fieldNames;
+ return this;
+ }
+
+ public Builder setKeyFields(String[] keyFields) {
+ this.keyFields = keyFields;
+ return this;
+ }
+
+ public Builder setFieldTypes(int[] fieldTypes) {
+ this.fieldTypes = fieldTypes;
+ return this;
+ }
+
+ public Builder setFlushMaxSize(int flushMaxSize) {
+ this.flushMaxSize = flushMaxSize;
+ return this;
+ }
+
+ public Builder setFlushIntervalMills(long flushIntervalMills) {
+ this.flushIntervalMills = flushIntervalMills;
+ return this;
+ }
+
+ public Builder setMaxRetryTimes(int maxRetryTimes) {
+ this.maxRetryTimes = maxRetryTimes;
+ return this;
+ }
+
+ /**
+ * Finalizes the configuration and checks validity.
+ *
+ * @return Configured JDBCUpsertOutputFormat
+ */
+ public JDBCUpsertOutputFormat build() {
+ if (options == null) {
+ throw new IllegalArgumentException("No options supplied.");
+ }
+ if (fieldNames == null) {
+ throw new IllegalArgumentException("No fieldNames supplied.");
+ }
+
+ return new JDBCUpsertOutputFormat(
+ options, fieldNames, keyFields, fieldTypes, flushMaxSize, flushIntervalMills, maxRetryTimes);
+ }
+ }
+}
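A sketch of constructing the format directly for DataStream programs (hypothetical field names; `options` built as in the JDBCOptions example above). open() picks the UpsertWriter when key fields are set and the AppendOnlyWriter otherwise:

    JDBCUpsertOutputFormat format = JDBCUpsertOutputFormat.builder()
        .setOptions(options)
        .setFieldNames(new String[]{"id", "num"})
        .setFieldTypes(new int[]{java.sql.Types.INTEGER, java.sql.Types.BIGINT})
        .setKeyFields(new String[]{"id"})  // null or empty -> append-only writer
        .setFlushMaxSize(100)
        .setFlushIntervalMills(1000)       // 0 disables the background flush thread
        .setMaxRetryTimes(3)               // executeBatch() is retried with linear back-off
        .build();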
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertSinkFunction.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertSinkFunction.java
new file mode 100644
index 0000000..3586008
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertSinkFunction.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.types.Row;
+
+class JDBCUpsertSinkFunction extends RichSinkFunction<Tuple2<Boolean, Row>> implements CheckpointedFunction {
+ private final JDBCUpsertOutputFormat outputFormat;
+
+ JDBCUpsertSinkFunction(JDBCUpsertOutputFormat outputFormat) {
+ this.outputFormat = outputFormat;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ RuntimeContext ctx = getRuntimeContext();
+ outputFormat.setRuntimeContext(ctx);
+ outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
+ }
+
+ @Override
+ public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
+ outputFormat.writeRecord(value);
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ outputFormat.flush();
+ }
+
+ @Override
+ public void close() throws Exception {
+ outputFormat.close();
+ }
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java
new file mode 100644
index 0000000..58ce1ec
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sinks.UpsertStreamTableSink;
+import org.apache.flink.table.utils.TableConnectorUtils;
+import org.apache.flink.types.Row;
+
+import java.util.Arrays;
+
+import static org.apache.flink.api.java.io.jdbc.AbstractJDBCOutputFormat.DEFAULT_FLUSH_INTERVAL_MILLS;
+import static org.apache.flink.api.java.io.jdbc.AbstractJDBCOutputFormat.DEFAULT_FLUSH_MAX_SIZE;
+import static org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.DEFAULT_MAX_RETRY_TIMES;
+
+/**
+ * An {@link UpsertStreamTableSink} for JDBC.
+ */
+public class JDBCUpsertTableSink implements UpsertStreamTableSink<Row> {
+
+ private final TableSchema schema;
+ private final JDBCOptions options;
+ private final int flushMaxSize;
+ private final long flushIntervalMills;
+ private final int maxRetryTime;
+
+ private String[] keyFields;
+ private boolean isAppendOnly;
+
+ private JDBCUpsertTableSink(
+ TableSchema schema,
+ JDBCOptions options,
+ int flushMaxSize,
+ long flushIntervalMills,
+ int maxRetryTime) {
+ this.schema = schema;
+ this.options = options;
+ this.flushMaxSize = flushMaxSize;
+ this.flushIntervalMills = flushIntervalMills;
+ this.maxRetryTime = maxRetryTime;
+ }
+
+ private JDBCUpsertOutputFormat newFormat() {
+ if (!isAppendOnly && (keyFields == null || keyFields.length == 0)) {
+ throw new UnsupportedOperationException(
+ "JDBCUpsertTableSink can not support a non-append-only (upsert) stream without key fields.");
+ }
+
+ // sql types
+ int[] jdbcSqlTypes = Arrays.stream(schema.getFieldTypes())
+ .mapToInt(JDBCTypeUtil::typeInformationToSqlType).toArray();
+
+ return JDBCUpsertOutputFormat.builder()
+ .setOptions(options)
+ .setFieldNames(schema.getFieldNames())
+ .setFlushMaxSize(flushMaxSize)
+ .setFlushIntervalMills(flushIntervalMills)
+ .setMaxRetryTimes(maxRetryTime)
+ .setFieldTypes(jdbcSqlTypes)
+ .setKeyFields(keyFields)
+ .build();
+ }
+
+ @Override
+ public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
+ return dataStream
+ .addSink(new JDBCUpsertSinkFunction(newFormat()))
+ .name(TableConnectorUtils.generateRuntimeName(this.getClass(), schema.getFieldNames()));
+ }
+
+ @Override
+ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
+ consumeDataStream(dataStream);
+ }
+
+ @Override
+ public void setKeyFields(String[] keys) {
+ this.keyFields = keys;
+ }
+
+ @Override
+ public void setIsAppendOnly(Boolean isAppendOnly) {
+ this.isAppendOnly = isAppendOnly;
+ }
+
+ @Override
+ public TypeInformation<Tuple2<Boolean, Row>> getOutputType() {
+ return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType());
+ }
+
+ @Override
+ public TypeInformation<Row> getRecordType() {
+ return new RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames());
+ }
+
+ @Override
+ public String[] getFieldNames() {
+ return schema.getFieldNames();
+ }
+
+ @Override
+ public TypeInformation<?>[] getFieldTypes() {
+ return schema.getFieldTypes();
+ }
+
+ @Override
+ public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+ if (!Arrays.equals(getFieldNames(), fieldNames) || !Arrays.equals(getFieldTypes(), fieldTypes)) {
+ throw new ValidationException("Reconfiguration with different fields is not allowed. " +
+ "Expected: " + Arrays.toString(getFieldNames()) + " / " + Arrays.toString(getFieldTypes()) + ". " +
+ "But was: " + Arrays.toString(fieldNames) + " / " + Arrays.toString(fieldTypes));
+ }
+
+ JDBCUpsertTableSink copy = new JDBCUpsertTableSink(schema, options, flushMaxSize, flushIntervalMills, maxRetryTime);
+ copy.keyFields = keyFields;
+ return copy;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder for a {@link JDBCUpsertTableSink}.
+ */
+ public static class Builder {
+ private TableSchema schema;
+ private JDBCOptions options;
+ private int flushMaxSize = DEFAULT_FLUSH_MAX_SIZE;
+ private long flushIntervalMills = DEFAULT_FLUSH_INTERVAL_MILLS;
+ private int maxRetryTimes = DEFAULT_MAX_RETRY_TIMES;
+
+ public Builder setTableSchema(TableSchema schema) {
+ this.schema = schema;
+ return this;
+ }
+
+ public Builder setOptions(JDBCOptions options) {
+ this.options = options;
+ return this;
+ }
+
+ public Builder setFlushMaxSize(int flushMaxSize) {
+ this.flushMaxSize = flushMaxSize;
+ return this;
+ }
+
+ public Builder setFlushIntervalMills(long flushIntervalMills) {
+ this.flushIntervalMills = flushIntervalMills;
+ return this;
+ }
+
+ public Builder setMaxRetryTimes(int maxRetryTimes) {
+ this.maxRetryTimes = maxRetryTimes;
+ return this;
+ }
+
+ public JDBCUpsertTableSink build() {
+ if (schema == null) {
+ throw new IllegalArgumentException("No schema supplied.");
+ }
+ if (options == null) {
+ throw new IllegalArgumentException("No options supplied.");
+ }
+
+ return new JDBCUpsertTableSink(schema, options, flushMaxSize, flushIntervalMills, maxRetryTimes);
+ }
+ }
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUtils.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUtils.java
new file mode 100644
index 0000000..95a0f8d
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUtils.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Utilities for JDBC connectors.
+ */
+public class JDBCUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JDBCUtils.class);
+
+ /**
+ * Adds a record to the prepared statement.
+ *
+ * <p>When this method is called, the output format is guaranteed to be opened.
+ *
+ * <p>WARNING: this may fail when no column types are specified, because a best-effort approach is then
+ * used to set values, and it is not guaranteed that the JDBC driver handles PreparedStatement.setObject(pos, null).
+ *
+ * @param upload The prepared statement.
+ * @param typesArray The jdbc types of the row.
+ * @param row The record to add to the output.
+ * @see PreparedStatement
+ */
+ public static void setRecordToStatement(PreparedStatement upload, int[] typesArray, Row row) throws SQLException {
+ if (typesArray != null && typesArray.length > 0 && typesArray.length != row.getArity()) {
+ LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
+ }
+ if (typesArray == null) {
+ // no types provided
+ for (int index = 0; index < row.getArity(); index++) {
+ LOG.warn("Unknown column type for column {}. Best effort approach to set its value: {}.", index + 1, row.getField(index));
+ upload.setObject(index + 1, row.getField(index));
+ }
+ } else {
+ // types provided
+ for (int i = 0; i < row.getArity(); i++) {
+ setField(upload, typesArray[i], row, i);
+ }
+ }
+ }
+
+ public static void setField(PreparedStatement upload, int type, Row row, int index) throws SQLException {
+ Object field = row.getField(index);
+ if (field == null) {
+ upload.setNull(index + 1, type);
+ } else {
+ try {
+ // casting values as suggested by http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html
+ switch (type) {
+ case java.sql.Types.NULL:
+ upload.setNull(index + 1, type);
+ break;
+ case java.sql.Types.BOOLEAN:
+ case java.sql.Types.BIT:
+ upload.setBoolean(index + 1, (boolean) field);
+ break;
+ case java.sql.Types.CHAR:
+ case java.sql.Types.NCHAR:
+ case java.sql.Types.VARCHAR:
+ case java.sql.Types.LONGVARCHAR:
+ case java.sql.Types.LONGNVARCHAR:
+ upload.setString(index + 1, (String) field);
+ break;
+ case java.sql.Types.TINYINT:
+ upload.setByte(index + 1, (byte) field);
+ break;
+ case java.sql.Types.SMALLINT:
+ upload.setShort(index + 1, (short) field);
+ break;
+ case java.sql.Types.INTEGER:
+ upload.setInt(index + 1, (int) field);
+ break;
+ case java.sql.Types.BIGINT:
+ upload.setLong(index + 1, (long) field);
+ break;
+ case java.sql.Types.REAL:
+ upload.setFloat(index + 1, (float) field);
+ break;
+ case java.sql.Types.FLOAT:
+ case java.sql.Types.DOUBLE:
+ upload.setDouble(index + 1, (double) field);
+ break;
+ case java.sql.Types.DECIMAL:
+ case java.sql.Types.NUMERIC:
+ upload.setBigDecimal(index + 1, (java.math.BigDecimal) field);
+ break;
+ case java.sql.Types.DATE:
+ upload.setDate(index + 1, (java.sql.Date) field);
+ break;
+ case java.sql.Types.TIME:
+ upload.setTime(index + 1, (java.sql.Time) field);
+ break;
+ case java.sql.Types.TIMESTAMP:
+ upload.setTimestamp(index + 1, (java.sql.Timestamp) field);
+ break;
+ case java.sql.Types.BINARY:
+ case java.sql.Types.VARBINARY:
+ case java.sql.Types.LONGVARBINARY:
+ upload.setBytes(index + 1, (byte[]) field);
+ break;
+ default:
+ upload.setObject(index + 1, field);
+ LOG.warn("Unmanaged sql type ({}) for column {}. Best effort approach to set its value: {}.",
+ type, index + 1, field);
+ // case java.sql.Types.SQLXML
+ // case java.sql.Types.ARRAY:
+ // case java.sql.Types.JAVA_OBJECT:
+ // case java.sql.Types.BLOB:
+ // case java.sql.Types.CLOB:
+ // case java.sql.Types.NCLOB:
+ // case java.sql.Types.DATALINK:
+ // case java.sql.Types.DISTINCT:
+ // case java.sql.Types.OTHER:
+ // case java.sql.Types.REF:
+ // case java.sql.Types.ROWID:
+ // case java.sql.Types.STRUCT
+ }
+ } catch (ClassCastException e) {
+ // enrich the exception with detailed information.
+ String errorMessage = String.format(
+ "%s, field index: %s, field value: %s.", e.getMessage(), index, field);
+ ClassCastException enrichedException = new ClassCastException(errorMessage);
+ enrichedException.setStackTrace(e.getStackTrace());
+ throw enrichedException;
+ }
+ }
+ }
+}
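For illustration (hypothetical table and connection), writing one row with explicit SQL types:

    PreparedStatement ps = connection.prepareStatement(
        "INSERT INTO books(id, title) VALUES (?, ?)");
    Row row = Row.of(42, "flink");
    JDBCUtils.setRecordToStatement(
        ps, new int[]{java.sql.Types.INTEGER, java.sql.Types.VARCHAR}, row);
    ps.addBatch();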
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialect.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialect.java
new file mode 100644
index 0000000..5a55a87
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialect.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.dialect;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Handles the SQL dialect of a JDBC driver.
+ */
+public interface JDBCDialect extends Serializable {
+
+ /**
+ * Check if this dialect instance can handle a certain jdbc url.
+ * @param url the jdbc url.
+ * @return True if the dialect can be applied on the given jdbc url.
+ */
+ boolean canHandle(String url);
+
+ /**
+ * @return the default driver class name; if the user does not configure a driver class name,
+ * this one is used.
+ */
+ default Optional<String> defaultDriverName() {
+ return Optional.empty();
+ }
+
+ /**
+ * Quotes the identifier. This is used to put quotes around the identifier in case the column
+ * name is a reserved keyword, or in case it contains characters that require quotes (e.g. space).
+ * By default, double quotes {@code "} are used.
+ */
+ default String quoteIdentifier(String identifier) {
+ return "\"" + identifier + "\"";
+ }
+
+ /**
+ * Get the dialect's upsert statement. Each database has its own upsert syntax, e.g. MySQL
+ * uses ON DUPLICATE KEY UPDATE and PostgreSQL uses ON CONFLICT ... DO UPDATE SET ...
+ *
+ * @return None if the dialect does not support an upsert statement; the writer then degrades to
+ * select + update/insert, which performs poorly.
+ */
+ default Optional<String> getUpsertStatement(
+ String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+ return Optional.empty();
+ }
+
+ /**
+ * Get the statement that checks whether a row exists by condition fields. The default uses SELECT.
+ */
+ default String getRowExistsStatement(String tableName, String[] conditionFields) {
+ String fieldExpressions = Arrays.stream(conditionFields)
+ .map(f -> quoteIdentifier(f) + "=?")
+ .collect(Collectors.joining(" AND "));
+ return "SELECT 1 FROM " + quoteIdentifier(tableName) + " WHERE " + fieldExpressions;
+ }
+
+ /**
+ * Get insert into statement.
+ */
+ default String getInsertIntoStatement(String tableName, String[] fieldNames) {
+ String columns = Arrays.stream(fieldNames)
+ .map(this::quoteIdentifier)
+ .collect(Collectors.joining(", "));
+ String placeholders = Arrays.stream(fieldNames)
+ .map(f -> "?")
+ .collect(Collectors.joining(", "));
+ return "INSERT INTO " + quoteIdentifier(tableName) +
+ "(" + columns + ")" + " VALUES (" + placeholders + ")";
+ }
+
+ /**
+ * Get the statement that updates one row by condition fields. LIMIT 1 is not used by
+ * default because it is dialect-specific SQL.
+ */
+ default String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
+ String setClause = Arrays.stream(fieldNames)
+ .map(f -> quoteIdentifier(f) + "=?")
+ .collect(Collectors.joining(", "));
+ String conditionClause = Arrays.stream(conditionFields)
+ .map(f -> quoteIdentifier(f) + "=?")
+ .collect(Collectors.joining(" AND "));
+ return "UPDATE " + quoteIdentifier(tableName) +
+ " SET " + setClause +
+ " WHERE " + conditionClause;
+ }
+
+ /**
+ * Get the statement that deletes one row by condition fields. LIMIT 1 is not used by
+ * default because it is dialect-specific SQL.
+ */
+ default String getDeleteStatement(String tableName, String[] conditionFields) {
+ String conditionClause = Arrays.stream(conditionFields)
+ .map(f -> quoteIdentifier(f) + "=?")
+ .collect(Collectors.joining(" AND "));
+ return "DELETE FROM " + quoteIdentifier(tableName) + " WHERE " + conditionClause;
+ }
+
+}
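To make the defaults concrete, a sketch of the SQL they generate for a hypothetical "books" table (canHandle is the only abstract method, so a lambda keeps the default double-quote quoting):

    JDBCDialect dialect = url -> true;

    dialect.getInsertIntoStatement("books", new String[]{"id", "title"});
    // INSERT INTO "books"("id", "title") VALUES (?, ?)

    dialect.getRowExistsStatement("books", new String[]{"id"});
    // SELECT 1 FROM "books" WHERE "id"=?

    dialect.getUpdateStatement("books", new String[]{"title"}, new String[]{"id"});
    // UPDATE "books" SET "title"=? WHERE "id"=?

    dialect.getDeleteStatement("books", new String[]{"id"});
    // DELETE FROM "books" WHERE "id"=?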
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java
new file mode 100644
index 0000000..12e70db
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.dialect;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Default JDBC dialects.
+ */
+public final class JDBCDialects {
+
+ private static final List<JDBCDialect> DIALECTS = Arrays.asList(
+ new DerbyDialect(),
+ new MySQLDialect(),
+ new PostgresDialect()
+ );
+
+ /**
+ * Fetch the JDBCDialect that matches a given database url.
+ */
+ public static Optional<JDBCDialect> get(String url) {
+ for (JDBCDialect dialect : DIALECTS) {
+ if (dialect.canHandle(url)) {
+ return Optional.of(dialect);
+ }
+ }
+ return Optional.empty();
+ }
+
+ private static class DerbyDialect implements JDBCDialect {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean canHandle(String url) {
+ return url.startsWith("jdbc:derby:");
+ }
+
+ @Override
+ public Optional<String> defaultDriverName() {
+ return Optional.of("org.apache.derby.jdbc.EmbeddedDriver");
+ }
+
+ @Override
+ public String quoteIdentifier(String identifier) {
+ return identifier;
+ }
+ }
+
+ private static class MySQLDialect implements JDBCDialect {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean canHandle(String url) {
+ return url.startsWith("jdbc:mysql:");
+ }
+
+ @Override
+ public Optional<String> defaultDriverName() {
+ return Optional.of("com.mysql.jdbc.Driver");
+ }
+
+ @Override
+ public String quoteIdentifier(String identifier) {
+ return "`" + identifier + "`";
+ }
+
+ /**
+ * MySQL upsert query uses ON DUPLICATE KEY UPDATE.
+ *
+ * <p>NOTE: It requires MySQL's primary key to be consistent with uniqueKeyFields.
+ *
+ * <p>We don't use REPLACE INTO; with ON DUPLICATE KEY UPDATE, table columns that are
+ * not written keep their previous values.
+ */
+ @Override
+ public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+ String updateClause = Arrays.stream(fieldNames)
+ .map(f -> quoteIdentifier(f) + "=VALUES(" + quoteIdentifier(f) + ")")
+ .collect(Collectors.joining(", "));
+ return Optional.of(getInsertIntoStatement(tableName, fieldNames) +
+ " ON DUPLICATE KEY UPDATE " + updateClause
+ );
+ }
+ }
+
+ private static class PostgresDialect implements JDBCDialect {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean canHandle(String url) {
+ return url.startsWith("jdbc:postgresql:");
+ }
+
+ @Override
+ public Optional<String> defaultDriverName() {
+ return Optional.of("org.postgresql.Driver");
+ }
+
+ /**
+ * Postgres upsert query. It uses ON CONFLICT ... DO UPDATE SET ... to perform the upsert.
+ */
+ @Override
+ public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+ String uniqueColumns = Arrays.stream(uniqueKeyFields)
+ .map(this::quoteIdentifier)
+ .collect(Collectors.joining(", "));
+ String updateClause = Arrays.stream(fieldNames)
+ .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
+ .collect(Collectors.joining(", "));
+ return Optional.of(getInsertIntoStatement(tableName, fieldNames) +
+ " ON CONFLICT (" + uniqueColumns +
+ " DO UPDATE SET " + updateClause
+ );
+ }
+ }
+}
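And the dialect-specific upsert statements for the same hypothetical table:

    JDBCDialects.get("jdbc:mysql://localhost:3306/db").get()
        .getUpsertStatement("books", new String[]{"id", "title"}, new String[]{"id"}).get();
    // INSERT INTO `books`(`id`, `title`) VALUES (?, ?)
    //   ON DUPLICATE KEY UPDATE `id`=VALUES(`id`), `title`=VALUES(`title`)

    JDBCDialects.get("jdbc:postgresql://localhost:5432/db").get()
        .getUpsertStatement("books", new String[]{"id", "title"}, new String[]{"id"}).get();
    // INSERT INTO "books"("id", "title") VALUES (?, ?)
    //   ON CONFLICT ("id") DO UPDATE SET "id"=EXCLUDED."id", "title"=EXCLUDED."title"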
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/writer/AppendOnlyWriter.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/writer/AppendOnlyWriter.java
new file mode 100644
index 0000000..899a22f
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/writer/AppendOnlyWriter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.writer;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.types.Row;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import static org.apache.flink.api.java.io.jdbc.JDBCUtils.setRecordToStatement;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Writer that only appends records to JDBC; it cannot receive retract/delete messages.
+ */
+public class AppendOnlyWriter implements JDBCWriter {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String insertSQL;
+ private final int[] fieldTypes;
+
+ private transient PreparedStatement statement;
+
+ public AppendOnlyWriter(String insertSQL, int[] fieldTypes) {
+ this.insertSQL = insertSQL;
+ this.fieldTypes = fieldTypes;
+ }
+
+ @Override
+ public void open(Connection connection) throws SQLException {
+ this.statement = connection.prepareStatement(insertSQL);
+ }
+
+ @Override
+ public void addRecord(Tuple2<Boolean, Row> record) throws SQLException {
+ checkArgument(record.f0, "Append mode can not receive retract/delete message.");
+ setRecordToStatement(statement, fieldTypes, record.f1);
+ statement.addBatch();
+ }
+
+ @Override
+ public void executeBatch() throws SQLException {
+ statement.executeBatch();
+ }
+
+ @Override
+ public void close() throws SQLException {
+ if (statement != null) {
+ statement.close();
+ statement = null;
+ }
+ }
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/writer/JDBCWriter.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/writer/JDBCWriter.java
new file mode 100644
index 0000000..8ce4759
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/writer/JDBCWriter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.writer;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.types.Row;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+/**
+ * A JDBCWriter executes statements (e.g. INSERT, UPSERT, DELETE) against the database.
+ */
+public interface JDBCWriter extends Serializable {
+
+ /**
+ * Opens the writer with a JDBC Connection. Implementations may create Statements from the Connection.
+ */
+ void open(Connection connection) throws SQLException;
+
+ /**
+ * Adds a record to the writer; the writer may buffer the data.
+ */
+ void addRecord(Tuple2<Boolean, Row> record) throws SQLException;
+
+ /**
+ * Submits a batch of commands to the database for execution.
+ */
+ void executeBatch() throws SQLException;
+
+ /**
+ * Closes JDBC statements and other related resources.
+ */
+ void close() throws SQLException;
+}
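
Taken together, the contract is: open once per task, addRecord per message,
executeBatch on flush, close on teardown; f0 == true in the Tuple2 carries an
upsert/insert message, f0 == false a delete. A skeletal implementation sketch
(not part of this patch) that only buffers records:

    import org.apache.flink.api.java.io.jdbc.writer.JDBCWriter;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.types.Row;

    import java.sql.Connection;
    import java.sql.SQLException;
    import java.util.ArrayList;
    import java.util.List;

    public class CollectingWriter implements JDBCWriter {

        private static final long serialVersionUID = 1L;

        private transient List<Tuple2<Boolean, Row>> buffer;

        @Override
        public void open(Connection connection) throws SQLException {
            // Real writers prepare their statements from the connection here.
            this.buffer = new ArrayList<>();
        }

        @Override
        public void addRecord(Tuple2<Boolean, Row> record) throws SQLException {
            buffer.add(record); // cache until the next flush
        }

        @Override
        public void executeBatch() throws SQLException {
            // Real writers submit their batched statements here.
            buffer.clear();
        }

        @Override
        public void close() throws SQLException {
            buffer = null;
        }
    }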
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/writer/UpsertWriter.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/writer/UpsertWriter.java
new file mode 100644
index 0000000..dcc807e
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/writer/UpsertWriter.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.writer;
+
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.types.Row;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+
+import static org.apache.flink.api.java.io.jdbc.JDBCUtils.setRecordToStatement;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Upsert writer that handles upsert and delete messages.
+ */
+public abstract class UpsertWriter implements JDBCWriter {
+
+ private static final long serialVersionUID = 1L;
+
+ public static UpsertWriter create(
+ JDBCDialect dialect,
+ String tableName,
+ String[] fieldNames,
+ int[] fieldTypes,
+ String[] keyFields,
+ boolean objectReuse) {
+
+ checkNotNull(keyFields);
+
+ List<String> nameList = Arrays.asList(fieldNames);
+ int[] pkFields = Arrays.stream(keyFields).mapToInt(nameList::indexOf).toArray();
+ int[] pkTypes = fieldTypes == null ? null :
+ Arrays.stream(pkFields).map(f -> fieldTypes[f]).toArray();
+ String deleteSQL = dialect.getDeleteStatement(tableName, keyFields);
+
+ Optional<String> upsertSQL = dialect.getUpsertStatement(tableName, fieldNames, keyFields);
+ return upsertSQL.map((Function<String, UpsertWriter>) sql ->
+ new UpsertWriterUsingUpsertStatement(
+ fieldTypes, pkFields, pkTypes, objectReuse, deleteSQL, sql))
+ .orElseGet(() ->
+ new UpsertWriterUsingInsertUpdateStatement(
+ fieldTypes, pkFields, pkTypes, objectReuse, deleteSQL,
+ dialect.getRowExistsStatement(tableName, keyFields),
+ dialect.getInsertIntoStatement(tableName, fieldNames),
+ dialect.getUpdateStatement(tableName, fieldNames, keyFields)));
+ }
+
+ final int[] fieldTypes;
+ final int[] pkTypes;
+ private final int[] pkFields;
+ private final String deleteSQL;
+ private final boolean objectReuse;
+
+ private transient Map<Row, Tuple2<Boolean, Row>> keyToRows;
+ private transient PreparedStatement deleteStatement;
+
+ private UpsertWriter(int[] fieldTypes, int[] pkFields, int[] pkTypes, String deleteSQL, boolean objectReuse) {
+ this.fieldTypes = fieldTypes;
+ this.pkFields = pkFields;
+ this.pkTypes = pkTypes;
+ this.deleteSQL = deleteSQL;
+ this.objectReuse = objectReuse;
+ }
+
+ @Override
+ public void open(Connection connection) throws SQLException {
+ this.keyToRows = new HashMap<>();
+ this.deleteStatement = connection.prepareStatement(deleteSQL);
+ }
+
+ @Override
+ public void addRecord(Tuple2<Boolean, Row> record) throws SQLException {
+ // We don't need to perform a deep copy, because JDBC fields are immutable objects.
+ Tuple2<Boolean, Row> tuple2 = objectReuse ? new Tuple2<>(record.f0, Row.copy(record.f1)) : record;
+ // Buffer the record, deduplicating by primary key: only the latest message per key survives.
+ keyToRows.put(getPrimaryKey(tuple2.f1), tuple2);
+ }
+
+ @Override
+ public void executeBatch() throws SQLException {
+ if (keyToRows.size() > 0) {
+ for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) {
+ Row pk = entry.getKey();
+ Tuple2<Boolean, Row> tuple = entry.getValue();
+ if (tuple.f0) {
+ processOneRowInBatch(pk, tuple.f1);
+ } else {
+ setRecordToStatement(deleteStatement, pkTypes, pk);
+ deleteStatement.addBatch();
+ }
+ }
+ internalExecuteBatch();
+ deleteStatement.executeBatch();
+ keyToRows.clear();
+ }
+ }
+
+ abstract void processOneRowInBatch(Row pk, Row row) throws SQLException;
+
+ abstract void internalExecuteBatch() throws SQLException;
+
+ @Override
+ public void close() throws SQLException {
+ if (deleteStatement != null) {
+ deleteStatement.close();
+ deleteStatement = null;
+ }
+ }
+
+ private Row getPrimaryKey(Row row) {
+ Row pks = new Row(pkFields.length);
+ for (int i = 0; i < pkFields.length; i++) {
+ pks.setField(i, row.getField(pkFields[i]));
+ }
+ return pks;
+ }
+
+ // ----------------------------------------------------------------------------------------
+
+ private static final class UpsertWriterUsingUpsertStatement extends UpsertWriter {
+
+ private static final long serialVersionUID = 1L;
+ private final String upsertSQL;
+
+ private transient PreparedStatement upsertStatement;
+
+ private UpsertWriterUsingUpsertStatement(
+ int[] fieldTypes,
+ int[] pkFields,
+ int[] pkTypes,
+ boolean objectReuse,
+ String deleteSQL,
+ String upsertSQL) {
+ super(fieldTypes, pkFields, pkTypes, deleteSQL, objectReuse);
+ this.upsertSQL = upsertSQL;
+ }
+
+ @Override
+ public void open(Connection connection) throws SQLException {
+ super.open(connection);
+ upsertStatement = connection.prepareStatement(upsertSQL);
+ }
+
+ @Override
+ void processOneRowInBatch(Row pk, Row row) throws SQLException {
+ setRecordToStatement(upsertStatement, fieldTypes, row);
+ upsertStatement.addBatch();
+ }
+
+ @Override
+ void internalExecuteBatch() throws SQLException {
+ upsertStatement.executeBatch();
+ }
+
+ @Override
+ public void close() throws SQLException {
+ super.close();
+ if (upsertStatement != null) {
+ upsertStatement.close();
+ upsertStatement = null;
+ }
+ }
+ }
+
+ private static final class UpsertWriterUsingInsertUpdateStatement extends UpsertWriter {
+
+ private static final long serialVersionUID = 1L;
+ private final String existSQL;
+ private final String insertSQL;
+ private final String updateSQL;
+
+ private transient PreparedStatement existStatement;
+ private transient PreparedStatement insertStatement;
+ private transient PreparedStatement updateStatement;
+
+ private UpsertWriterUsingInsertUpdateStatement(
+ int[] fieldTypes,
+ int[] pkFields,
+ int[] pkTypes,
+ boolean objectReuse,
+ String deleteSQL,
+ String existSQL,
+ String insertSQL,
+ String updateSQL) {
+ super(fieldTypes, pkFields, pkTypes, deleteSQL, objectReuse);
+ this.existSQL = existSQL;
+ this.insertSQL = insertSQL;
+ this.updateSQL = updateSQL;
+ }
+
+ @Override
+ public void open(Connection connection) throws SQLException {
+ super.open(connection);
+ existStatement = connection.prepareStatement(existSQL);
+ insertStatement = connection.prepareStatement(insertSQL);
+ updateStatement = connection.prepareStatement(updateSQL);
+ }
+
+ @Override
+ void processOneRowInBatch(Row pk, Row row) throws SQLException {
+ setRecordToStatement(existStatement, pkTypes, pk);
+ ResultSet resultSet = existStatement.executeQuery();
+ boolean exist = resultSet.next();
+ resultSet.close();
+ if (exist) {
+ // do update
+ setRecordToStatement(updateStatement, fieldTypes, row);
+ updateStatement.addBatch();
+ } else {
+ // do insert
+ setRecordToStatement(insertStatement, fieldTypes, row);
+ insertStatement.addBatch();
+ }
+ }
+
+ @Override
+ void internalExecuteBatch() throws SQLException {
+ updateStatement.executeBatch();
+ insertStatement.executeBatch();
+ }
+
+ @Override
+ public void close() throws SQLException {
+ super.close();
+ if (existStatement != null) {
+ existStatement.close();
+ existStatement = null;
+ }
+ if (insertStatement != null) {
+ insertStatement.close();
+ insertStatement = null;
+ }
+ if (updateStatement != null) {
+ updateStatement.close();
+ updateStatement = null;
+ }
+ }
+ }
+}
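
A sketch of the buffering semantics above (not part of this patch): keyToRows is
a map keyed by primary key, so within one batch only the latest message per key
reaches the database. Assumes Derby on the classpath and the JDBCDialects lookup
introduced in this commit; rowOf is a helper local to this sketch:

    import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects;
    import org.apache.flink.api.java.io.jdbc.writer.UpsertWriter;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.types.Row;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;
    import java.sql.Types;

    public class UpsertWriterDemo {

        private static Row rowOf(int id, String title) {
            Row row = new Row(2);
            row.setField(0, id);
            row.setField(1, title);
            return row;
        }

        public static void main(String[] args) throws Exception {
            String url = "jdbc:derby:memory:upsertdemo;create=true";
            try (Connection conn = DriverManager.getConnection(url)) {
                try (Statement ddl = conn.createStatement()) {
                    ddl.executeUpdate("CREATE TABLE books (id INT PRIMARY KEY, title VARCHAR(50))");
                }
                UpsertWriter writer = UpsertWriter.create(
                        JDBCDialects.get(url).orElseThrow(IllegalStateException::new),
                        "books",
                        new String[]{"id", "title"},
                        new int[]{Types.INTEGER, Types.VARCHAR},
                        new String[]{"id"},
                        false); // objectReuse disabled
                writer.open(conn);
                writer.addRecord(Tuple2.of(true, rowOf(1, "draft")));  // buffered upsert for key 1
                writer.addRecord(Tuple2.of(true, rowOf(1, "final")));  // replaces key 1 in the buffer
                writer.addRecord(Tuple2.of(false, rowOf(2, "gone")));  // buffered delete for key 2
                writer.executeBatch(); // one write for key 1 ("final"), one delete for key 2
                writer.close();
            }
        }
    }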
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
index 8582387..92d5d5a 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
@@ -246,7 +246,7 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
}
}
- private static Row toRow(TestEntry entry) {
+ static Row toRow(TestEntry entry) {
Row row = new Row(5);
row.setField(0, entry.id);
row.setField(1, entry.title);
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertOutputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertOutputFormatTest.java
new file mode 100644
index 0000000..225ab1a
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertOutputFormatTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.types.Row;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.api.java.io.jdbc.JDBCOutputFormatTest.toRow;
+import static org.junit.Assert.assertArrayEquals;
+import static org.mockito.Mockito.doReturn;
+
+/**
+ * Tests for the {@link JDBCUpsertOutputFormat}.
+ */
+public class JDBCUpsertOutputFormatTest extends JDBCTestBase {
+
+ private JDBCUpsertOutputFormat format;
+ private String[] fieldNames;
+ private String[] keyFields;
+
+ @Before
+ public void setup() {
+ fieldNames = new String[]{"id", "title", "author", "price", "qty"};
+ keyFields = new String[]{"id"};
+ }
+
+ @Test
+ public void testJDBCOutputFormat() throws Exception {
+ format = JDBCUpsertOutputFormat.builder()
+ .setOptions(JDBCOptions.builder()
+ .setDBUrl(DB_URL)
+ .setTableName(OUTPUT_TABLE)
+ .build())
+ .setFieldNames(fieldNames)
+ .setKeyFields(keyFields)
+ .build();
+ RuntimeContext context = Mockito.mock(RuntimeContext.class);
+ ExecutionConfig config = Mockito.mock(ExecutionConfig.class);
+ doReturn(config).when(context).getExecutionConfig();
+ doReturn(true).when(config).isObjectReuseEnabled();
+ format.setRuntimeContext(context);
+ format.open(0, 1);
+
+ for (TestEntry entry : TEST_DATA) {
+ format.writeRecord(Tuple2.of(true, toRow(entry)));
+ }
+ format.flush();
+ check(Arrays.stream(TEST_DATA).map(JDBCOutputFormatTest::toRow).toArray(Row[]::new));
+
+ // override
+ for (TestEntry entry : TEST_DATA) {
+ format.writeRecord(Tuple2.of(true, toRow(entry)));
+ }
+ format.flush();
+ check(Arrays.stream(TEST_DATA).map(JDBCOutputFormatTest::toRow).toArray(Row[]::new));
+
+ // delete
+ for (int i = 0; i < TEST_DATA.length / 2; i++) {
+ format.writeRecord(Tuple2.of(false, toRow(TEST_DATA[i])));
+ }
+ Row[] expected = new Row[TEST_DATA.length - TEST_DATA.length / 2];
+ for (int i = TEST_DATA.length / 2; i < TEST_DATA.length; i++) {
+ expected[i - TEST_DATA.length / 2] = toRow(TEST_DATA[i]);
+ }
+ format.flush();
+ check(expected);
+ }
+
+ private void check(Row[] rows) throws SQLException {
+ check(rows, DB_URL, OUTPUT_TABLE, fieldNames);
+ }
+
+ static void check(Row[] rows, String url, String table, String[] fields) throws SQLException {
+ try (
+ Connection dbConn = DriverManager.getConnection(url);
+ PreparedStatement statement = dbConn.prepareStatement("select * from " + table);
+ ResultSet resultSet = statement.executeQuery()
+ ) {
+ List<String> results = new ArrayList<>();
+ while (resultSet.next()) {
+ Row row = new Row(fields.length);
+ for (int i = 0; i < fields.length; i++) {
+ row.setField(i, resultSet.getObject(fields[i]));
+ }
+ results.add(row.toString());
+ }
+ String[] sortedExpect = Arrays.stream(rows).map(Row::toString).toArray(String[]::new);
+ String[] sortedResult = results.toArray(new String[0]);
+ Arrays.sort(sortedExpect);
+ Arrays.sort(sortedResult);
+ assertArrayEquals(sortedExpect, sortedResult);
+ }
+ }
+
+ @After
+ public void clearOutputTable() throws Exception {
+ if (format != null) {
+ format.close();
+ }
+ format = null;
+ Class.forName(DRIVER_CLASS);
+ try (
+ Connection conn = DriverManager.getConnection(DB_URL);
+ Statement stat = conn.createStatement()) {
+ stat.execute("DELETE FROM " + OUTPUT_TABLE);
+
+ stat.close();
+ conn.close();
+ }
+ }
+}
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSinkITCase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSinkITCase.java
new file mode 100644
index 0000000..f4f0e25
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSinkITCase.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.api.java.io.jdbc.JDBCTestBase.DRIVER_CLASS;
+import static org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormatTest.check;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.INT;
+
+/**
+ * IT case for {@link JDBCUpsertTableSink}.
+ */
+public class JDBCUpsertTableSinkITCase extends AbstractTestBase {
+
+ public static final String DB_URL = "jdbc:derby:memory:upsert";
+ public static final String OUTPUT_TABLE1 = "upsertSink";
+ public static final String OUTPUT_TABLE2 = "appendSink";
+
+ @Before
+ public void before() throws ClassNotFoundException, SQLException {
+ System.setProperty("derby.stream.error.field", JDBCTestBase.class.getCanonicalName() + ".DEV_NULL");
+
+ Class.forName(DRIVER_CLASS);
+ try (
+ Connection conn = DriverManager.getConnection(DB_URL + ";create=true");
+ Statement stat = conn.createStatement()) {
+ stat.executeUpdate("CREATE TABLE " + OUTPUT_TABLE1 + " (" +
+ "cnt BIGINT NOT NULL DEFAULT 0," +
+ "lencnt BIGINT NOT NULL DEFAULT 0," +
+ "cTag INT NOT NULL DEFAULT 0," +
+ "PRIMARY KEY (cnt, cTag))");
+
+ stat.executeUpdate("CREATE TABLE " + OUTPUT_TABLE2 + " (" +
+ "id INT NOT NULL DEFAULT 0," +
+ "num BIGINT NOT NULL DEFAULT 0)");
+ }
+ }
+
+ @After
+ public void clearOutputTable() throws Exception {
+ Class.forName(DRIVER_CLASS);
+ try (
+ Connection conn = DriverManager.getConnection(DB_URL);
+ Statement stat = conn.createStatement()) {
+ stat.execute("DROP TABLE " + OUTPUT_TABLE1);
+ stat.execute("DROP TABLE " + OUTPUT_TABLE2);
+ }
+ }
+
+ public static DataStream<Tuple3<Integer, Long, String>> get3TupleDataStream(StreamExecutionEnvironment env) {
+ List<Tuple3<Integer, Long, String>> data = new ArrayList<>();
+ data.add(new Tuple3<>(1, 1L, "Hi"));
+ data.add(new Tuple3<>(2, 2L, "Hello"));
+ data.add(new Tuple3<>(3, 2L, "Hello world"));
+ data.add(new Tuple3<>(4, 3L, "Hello world, how are you?"));
+ data.add(new Tuple3<>(5, 3L, "I am fine."));
+ data.add(new Tuple3<>(6, 3L, "Luke Skywalker"));
+ data.add(new Tuple3<>(7, 4L, "Comment#1"));
+ data.add(new Tuple3<>(8, 4L, "Comment#2"));
+ data.add(new Tuple3<>(9, 4L, "Comment#3"));
+ data.add(new Tuple3<>(10, 4L, "Comment#4"));
+ data.add(new Tuple3<>(11, 5L, "Comment#5"));
+ data.add(new Tuple3<>(12, 5L, "Comment#6"));
+ data.add(new Tuple3<>(13, 5L, "Comment#7"));
+ data.add(new Tuple3<>(14, 5L, "Comment#8"));
+ data.add(new Tuple3<>(15, 5L, "Comment#9"));
+ data.add(new Tuple3<>(16, 6L, "Comment#10"));
+ data.add(new Tuple3<>(17, 6L, "Comment#11"));
+ data.add(new Tuple3<>(18, 6L, "Comment#12"));
+ data.add(new Tuple3<>(19, 6L, "Comment#13"));
+ data.add(new Tuple3<>(20, 6L, "Comment#14"));
+ data.add(new Tuple3<>(21, 6L, "Comment#15"));
+
+ Collections.shuffle(data);
+ return env.fromCollection(data);
+ }
+
+ @Test
+ public void testUpsert() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ Table t = tEnv.fromDataStream(get3TupleDataStream(env).assignTimestampsAndWatermarks(
+ new AscendingTimestampExtractor<Tuple3<Integer, Long, String>>() {
+ @Override
+ public long extractAscendingTimestamp(Tuple3<Integer, Long, String> element) {
+ return element.f0;
+ }}), "id, num, text");
+
+ tEnv.registerTable("T", t);
+
+ String[] fields = {"cnt", "lencnt", "cTag"};
+ tEnv.registerTableSink("upsertSink", JDBCUpsertTableSink.builder()
+ .setOptions(JDBCOptions.builder()
+ .setDBUrl(DB_URL)
+ .setTableName(OUTPUT_TABLE1)
+ .build())
+ .setTableSchema(TableSchema.builder().fields(
+ fields, new DataType[] {BIGINT(), BIGINT(), INT()}).build())
+ .build());
+
+ tEnv.sqlUpdate("INSERT INTO upsertSink SELECT cnt, COUNT(len) AS lencnt, cTag FROM" +
+ " (SELECT len, COUNT(id) as cnt, cTag FROM" +
+ " (SELECT id, CHAR_LENGTH(text) AS len, (CASE WHEN id > 0 THEN 1 ELSE 0 END) cTag FROM T)" +
+ " GROUP BY len, cTag)" +
+ " GROUP BY cnt, cTag");
+ env.execute();
+ check(new Row[] {
+ Row.of(1, 5, 1),
+ Row.of(7, 1, 1),
+ Row.of(9, 1, 1)
+ }, DB_URL, OUTPUT_TABLE1, fields);
+ }
+
+ @Test
+ public void testAppend() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
+ env.getConfig().setParallelism(1);
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ Table t = tEnv.fromDataStream(get3TupleDataStream(env), "id, num, text");
+
+ tEnv.registerTable("T", t);
+
+ String[] fields = {"id", "num"};
+ tEnv.registerTableSink("upsertSink", JDBCUpsertTableSink.builder()
+ .setOptions(JDBCOptions.builder()
+ .setDBUrl(DB_URL)
+ .setTableName(OUTPUT_TABLE2)
+ .build())
+ .setTableSchema(TableSchema.builder().fields(
+ fields, new DataType[] {INT(), BIGINT()}).build())
+ .build());
+
+ tEnv.sqlUpdate("INSERT INTO upsertSink SELECT id, num FROM T WHERE id IN (2, 10, 20)");
+ env.execute();
+ check(new Row[] {
+ Row.of(2, 2),
+ Row.of(10, 4),
+ Row.of(20, 6)
+ }, DB_URL, OUTPUT_TABLE2, fields);
+ }
+}