Posted to commits@flink.apache.org by kk...@apache.org on 2020/03/10 16:17:02 UTC

[flink] 04/10: [FLINK-15782][connectors/jdbc] generalize jdbc statement executors (from Row to <T>) Motivation: use existing code oriented to Table API in DataStream API

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 427eb55d5048dc40a6dfc7713e6d5d42881da873
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Thu Jan 23 18:22:08 2020 +0100

    [FLINK-15782][connectors/jdbc] generalize jdbc statement executors (from Row to <T>)
    Motivation: use existing code oriented to Table API in DataStream API
---
 .../apache/flink/api/java/io/jdbc/JDBCUtils.java   |  8 +++
 .../api/java/io/jdbc/TableJdbcOutputFormat.java    |  5 +-
 .../jdbc/executor/InsertOrUpdateJdbcExecutor.java  | 65 +++++++++++-----------
 .../jdbc/executor/JdbcBatchStatementExecutor.java  | 38 ++++++++++---
 .../jdbc/executor/KeyedBatchStatementExecutor.java | 60 +++++++-------------
 .../api/java/io/jdbc/executor/ParameterSetter.java | 45 +++++++++++++++
 .../executor/SimpleBatchStatementExecutor.java     | 27 ++++-----
 7 files changed, 150 insertions(+), 98 deletions(-)
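
For context, a rough sketch of what this generalization enables on the DataStream side. The WordCount POJO, table and SQL below are illustrative assumptions, not part of this commit; the point is that the new simple(...) factory accepts an arbitrary record type plus a ParameterSetter, where previously only Row was supported:

    // Illustrative record type for a DataStream job (hypothetical).
    public class WordCount {
        public String word;
        public long cnt;
    }

    // Hypothetical wiring; assumes imports of java.util.function.Function and
    // org.apache.flink.api.java.io.jdbc.executor.JdbcBatchStatementExecutor.
    JdbcBatchStatementExecutor<WordCount> executor =
        JdbcBatchStatementExecutor.simple(
            "INSERT INTO word_count (word, cnt) VALUES (?, ?)",              // assumed target table
            (st, wc) -> { st.setString(1, wc.word); st.setLong(2, wc.cnt); },
            Function.identity());                                            // no copy needed without object reuse

    // The enclosing output format drives the life cycle:
    //   executor.open(connection); executor.process(record); ... executor.executeBatch(); executor.close();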

diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUtils.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUtils.java
index d50b82e..14902da 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUtils.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUtils.java
@@ -231,4 +231,12 @@ public class JDBCUtils {
 			return ret;
 		}
 	}
+
+	public static Row getPrimaryKey(Row row, int[] pkFields) {
+		Row pkRow = new Row(pkFields.length);
+		for (int i = 0; i < pkFields.length; i++) {
+			pkRow.setField(i, row.getField(pkFields[i]));
+		}
+		return pkRow;
+	}
 }
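
The helper moved here from KeyedBatchStatementExecutor simply projects the key columns into a fresh Row, e.g. (values illustrative):

    // Hypothetical usage of JDBCUtils.getPrimaryKey:
    Row record = Row.of(42L, "alice", "2020-01-23");    // fields: id, name, day
    int[] pkFields = {0, 2};                             // primary key = (id, day)
    Row pk = JDBCUtils.getPrimaryKey(record, pkFields);  // pk holds (42, "2020-01-23"); record is untouched
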
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/TableJdbcOutputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/TableJdbcOutputFormat.java
index 305cf20..a9a5625 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/TableJdbcOutputFormat.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/TableJdbcOutputFormat.java
@@ -54,13 +54,12 @@ class TableJdbcOutputFormat extends JdbcBatchingOutputFormat<Tuple2<Boolean, Row
 		int[] pkTypes = dmlOptions.getFieldTypes() == null ? null :
 				Arrays.stream(pkFields).map(f -> dmlOptions.getFieldTypes()[f]).toArray();
 		String deleteSql = dmlOptions.getDialect().getDeleteStatement(dmlOptions.getTableName(), dmlOptions.getFieldNames());
-		boolean objectReuseEnabled = getRuntimeContext().getExecutionConfig().isObjectReuseEnabled();
-		return JdbcBatchStatementExecutor.keyed(pkFields, pkTypes, deleteSql, objectReuseEnabled);
+		return JdbcBatchStatementExecutor.keyedRow(pkFields, pkTypes, deleteSql);
 	}
 
 	@Override
 	JdbcBatchStatementExecutor<Row> createStatementRunner(JDBCDialect dialect) {
-		return JdbcBatchStatementExecutor.upsert(
+		return JdbcBatchStatementExecutor.upsertRow(
 				dialect, dmlOptions.getTableName(), dmlOptions.getFieldNames(), dmlOptions.getFieldTypes(), dmlOptions.getKeyFields(),
 				getRuntimeContext().getExecutionConfig().isObjectReuseEnabled());
 	}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/InsertOrUpdateJdbcExecutor.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/InsertOrUpdateJdbcExecutor.java
index ba37707..0a5d873 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/InsertOrUpdateJdbcExecutor.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/InsertOrUpdateJdbcExecutor.java
@@ -17,8 +17,6 @@
 
 package org.apache.flink.api.java.io.jdbc.executor;
 
-import org.apache.flink.types.Row;
-
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -26,76 +24,81 @@ import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.function.Function;
 
-import static org.apache.flink.api.java.io.jdbc.JDBCUtils.setRecordToStatement;
-
-final class InsertOrUpdateJdbcExecutor implements JdbcBatchStatementExecutor<Row> {
+final class InsertOrUpdateJdbcExecutor<R, K, V> implements JdbcBatchStatementExecutor<R> {
 
 	private final String existSQL;
 	private final String insertSQL;
 	private final String updateSQL;
-	private final int[] pkTypes;
-	private final int[] pkFields;
-	private final boolean objectReuse;
+
+	private final ParameterSetter<K> existSetter;
+	private final ParameterSetter<V> insertSetter;
+	private final ParameterSetter<V> updateSetter;
+
+	private final Function<R, K> keyExtractor;
+	private final Function<R, V> valueMapper;
 
 	private transient PreparedStatement existStatement;
 	private transient PreparedStatement insertStatement;
 	private transient PreparedStatement updateStatement;
-	private transient Map<Row, Row> keyToRows = new HashMap<>();
-	private final int[] fieldTypes;
+	private transient Map<K, V> batch = new HashMap<>();
 
-	InsertOrUpdateJdbcExecutor(
-			int[] fieldTypes,
-			int[] pkFields, int[] pkTypes,
-			String existSQL,
-			String insertSQL,
-			String updateSQL, boolean objectReuse) {
-		this.pkFields = pkFields;
+	InsertOrUpdateJdbcExecutor(String existSQL,
+								String insertSQL,
+								String updateSQL,
+								ParameterSetter<K> existSetter,
+								ParameterSetter<V> insertSetter,
+								ParameterSetter<V> updateSetter,
+								Function<R, K> keyExtractor,
+								Function<R, V> valueExtractor) {
 		this.existSQL = existSQL;
 		this.insertSQL = insertSQL;
 		this.updateSQL = updateSQL;
-		this.fieldTypes = fieldTypes;
-		this.pkTypes = pkTypes;
-		this.objectReuse = objectReuse;
+		this.existSetter = existSetter;
+		this.insertSetter = insertSetter;
+		this.updateSetter = updateSetter;
+		this.keyExtractor = keyExtractor;
+		this.valueMapper = valueExtractor;
 	}
 
 	@Override
 	public void open(Connection connection) throws SQLException {
-		keyToRows = new HashMap<>();
+		batch = new HashMap<>();
 		existStatement = connection.prepareStatement(existSQL);
 		insertStatement = connection.prepareStatement(insertSQL);
 		updateStatement = connection.prepareStatement(updateSQL);
 	}
 
 	@Override
-	public void process(Row record) {
-		keyToRows.put(KeyedBatchStatementExecutor.getPrimaryKey(record, pkFields), objectReuse ? Row.copy(record) : record);
+	public void process(R record) {
+		batch.put(keyExtractor.apply(record), valueMapper.apply(record));
 	}
 
 	@Override
 	public void executeBatch() throws SQLException {
-		if (keyToRows.size() > 0) {
-			for (Map.Entry<Row, Row> entry : keyToRows.entrySet()) {
+		if (!batch.isEmpty()) {
+			for (Map.Entry<K, V> entry : batch.entrySet()) {
 				processOneRowInBatch(entry.getKey(), entry.getValue());
 			}
 			updateStatement.executeBatch();
 			insertStatement.executeBatch();
-			keyToRows.clear();
+			batch.clear();
 		}
 	}
 
-	private void processOneRowInBatch(Row pk, Row row) throws SQLException {
+	private void processOneRowInBatch(K pk, V row) throws SQLException {
 		if (exist(pk)) {
-			setRecordToStatement(updateStatement, fieldTypes, row);
+			updateSetter.accept(updateStatement, row);
 			updateStatement.addBatch();
 		} else {
-			setRecordToStatement(insertStatement, fieldTypes, row);
+			insertSetter.accept(insertStatement, row);
 			insertStatement.addBatch();
 		}
 	}
 
-	private boolean exist(Row pk) throws SQLException {
-		setRecordToStatement(existStatement, pkTypes, pk);
+	private boolean exist(K pk) throws SQLException {
+		existSetter.accept(existStatement, pk);
 		try (ResultSet resultSet = existStatement.executeQuery()) {
 			return resultSet.next();
 		}
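
The flow is unchanged (check existence per key, then add the record to either the UPDATE or the INSERT batch), but the buffer is now a Map<K, V> filled via the key extractor and value mapper, so within one batch only the latest value per key is flushed. Roughly how the collaborators line up for the Row case, mirroring what the upsertRow factory below passes in (existSql, insertSql, updateSql come from the JDBCDialect; pkFields, pkTypes, fieldTypes, objectReuse from the DML options and runtime context):

    JdbcBatchStatementExecutor<Row> executor = new InsertOrUpdateJdbcExecutor<>(
        existSql, insertSql, updateSql,
        ParameterSetter.forRow(pkTypes),     // binds the extracted key for the exist check
        ParameterSetter.forRow(fieldTypes),  // binds the whole row for INSERT
        ParameterSetter.forRow(fieldTypes),  // binds the whole row for UPDATE
        row -> JDBCUtils.getPrimaryKey(row, pkFields),
        objectReuse ? Row::copy : Function.identity());
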
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/JdbcBatchStatementExecutor.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/JdbcBatchStatementExecutor.java
index f231a739..58632f6 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/JdbcBatchStatementExecutor.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/JdbcBatchStatementExecutor.java
@@ -25,7 +25,10 @@ import org.apache.flink.types.Row;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.Arrays;
+import java.util.function.Function;
 
+import static org.apache.flink.api.java.io.jdbc.JDBCUtils.getPrimaryKey;
+import static org.apache.flink.api.java.io.jdbc.JDBCUtils.setRecordToStatement;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -51,7 +54,7 @@ public interface JdbcBatchStatementExecutor<T> {
 	 */
 	void close() throws SQLException;
 
-	static JdbcBatchStatementExecutor<Row> upsert(
+	static JdbcBatchStatementExecutor<Row> upsertRow(
 			JDBCDialect dialect,
 			String tableName,
 			String[] fieldNames,
@@ -66,22 +69,39 @@ public interface JdbcBatchStatementExecutor<T> {
 
 		return dialect
 				.getUpsertStatement(tableName, fieldNames, keyFields)
-				.map(sql -> keyed(pkFields, pkTypes, sql, objectReuse))
+				.map(sql -> keyedRow(pkFields, fieldTypes, sql))
 				.orElseGet(() ->
-						new InsertOrUpdateJdbcExecutor(
-								fieldTypes, pkFields, pkTypes,
+						new InsertOrUpdateJdbcExecutor<>(
 								dialect.getRowExistsStatement(tableName, keyFields),
 								dialect.getInsertIntoStatement(tableName, fieldNames),
 								dialect.getUpdateStatement(tableName, fieldNames, keyFields),
-								objectReuse));
+								ParameterSetter.forRow(pkTypes),
+								ParameterSetter.forRow(fieldTypes),
+								ParameterSetter.forRow(fieldTypes),
+								rowKeyExtractor(pkFields),
+								objectReuse ? Row::copy : Function.identity()));
 	}
 
-	static JdbcBatchStatementExecutor<Row> keyed(int[] pkFields, int[] pkTypes, String sql, boolean objectReuse) {
-		return new KeyedBatchStatementExecutor(pkFields, pkTypes, sql, objectReuse);
+	static Function<Row, Row> rowKeyExtractor(int[] pkFields) {
+		return row -> getPrimaryKey(row, pkFields);
 	}
 
-	static JdbcBatchStatementExecutor<Row> simple(String sql, int[] fieldTypes, boolean objectReuse) {
-		return new SimpleBatchStatementExecutor(sql, fieldTypes, objectReuse);
+	static JdbcBatchStatementExecutor<Row> keyedRow(int[] pkFields, int[] pkTypes, String sql) {
+		return keyed(sql,
+				rowKeyExtractor(pkFields),
+				(st, record) -> setRecordToStatement(st, pkTypes, rowKeyExtractor(pkFields).apply(record)));
+	}
+
+	static <T, K> JdbcBatchStatementExecutor<T> keyed(String sql, Function<T, K> keyExtractor, ParameterSetter<K> parameterSetter) {
+		return new KeyedBatchStatementExecutor<>(sql, keyExtractor, parameterSetter);
+	}
+
+	static JdbcBatchStatementExecutor<Row> simpleRow(String sql, int[] fieldTypes, boolean objectReuse) {
+		return simple(sql, ParameterSetter.forRow(fieldTypes), objectReuse ? Row::copy : Function.identity());
+	}
+
+	static <T, V> JdbcBatchStatementExecutor<T> simple(String sql, ParameterSetter<V> paramSetter, Function<T, V> valueTransformer) {
+		return new SimpleBatchStatementExecutor<>(sql, paramSetter, valueTransformer);
 	}
 
 }
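
With keyed(...) and simple(...) now generic, a caller outside the Table path can build executors for arbitrary types; e.g. a hypothetical delete-by-key executor over a stream of user ids (statement and table are assumptions):

    // Hypothetical: each record is itself the key; nothing Row-specific remains.
    JdbcBatchStatementExecutor<Long> deleteByUserId =
        JdbcBatchStatementExecutor.keyed(
            "DELETE FROM users WHERE id = ?",     // assumed SQL
            Function.identity(),                  // record == key
            (st, id) -> st.setLong(1, id));
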
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/KeyedBatchStatementExecutor.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/KeyedBatchStatementExecutor.java
index 70ab679..1da9f45 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/KeyedBatchStatementExecutor.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/KeyedBatchStatementExecutor.java
@@ -18,59 +18,51 @@
 
 package org.apache.flink.api.java.io.jdbc.executor;
 
-import org.apache.flink.types.Row;
-
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.flink.api.java.io.jdbc.JDBCUtils.setRecordToStatement;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Function;
 
-/**
- * Upsert writer to deal with upsert, delete message.
- */
-class KeyedBatchStatementExecutor implements JdbcBatchStatementExecutor<Row> {
+class KeyedBatchStatementExecutor<T, K> implements JdbcBatchStatementExecutor<T> {
 
-	private final int[] pkTypes;
-	private final int[] pkFields;
 	private final String sql;
-	private final boolean objectReuse;
+	private final ParameterSetter<K> parameterSetter;
+	private final Function<T, K> keyExtractor;
 
-	private transient Map<Row, Row> keyToRows = new HashMap<>();
+	private transient Set<K> batch = new HashSet<>();
 	private transient PreparedStatement st;
 
-	KeyedBatchStatementExecutor(int[] pkFields, int[] pkTypes, String sql, boolean objectReuse) {
-		this.pkFields = pkFields;
-		this.pkTypes = pkTypes;
+	/**
+	 * Keep in mind object reuse: if it's on then key extractor may be required to return new object.
+	 */
+	KeyedBatchStatementExecutor(String sql, Function<T, K> keyExtractor, ParameterSetter<K> parameterSetter) {
+		this.parameterSetter = parameterSetter;
+		this.keyExtractor = keyExtractor;
 		this.sql = sql;
-		this.objectReuse = objectReuse;
 	}
 
 	@Override
 	public void open(Connection connection) throws SQLException {
-		keyToRows = new HashMap<>();
+		batch = new HashSet<>();
 		st = connection.prepareStatement(sql);
 	}
 
 	@Override
-	public void process(Row record) {
-		// we don't need perform a deep copy, because jdbc field are immutable object.
-		Row row = objectReuse ? Row.copy(record) : record;
-		// add records to buffer
-		keyToRows.put(getPrimaryKey(row), row);
+	public void process(T record) {
+		batch.add(keyExtractor.apply(record));
 	}
 
 	@Override
 	public void executeBatch() throws SQLException {
-		if (keyToRows.size() > 0) {
-			for (Map.Entry<Row, Row> entry : keyToRows.entrySet()) {
-				setRecordToStatement(st, pkTypes, entry.getKey());
+		if (!batch.isEmpty()) {
+			for (K entry : batch) {
+				parameterSetter.accept(st, entry);
 				st.addBatch();
 			}
 			st.executeBatch();
-			keyToRows.clear();
+			batch.clear();
 		}
 	}
 
@@ -82,16 +74,4 @@ class KeyedBatchStatementExecutor implements JdbcBatchStatementExecutor<Row> {
 		}
 	}
 
-	private Row getPrimaryKey(Row row) {
-		int[] pkFields = this.pkFields;
-		return getPrimaryKey(row, pkFields);
-	}
-
-	static Row getPrimaryKey(Row row, int[] pkFields) {
-		Row pkRow = new Row(pkFields.length);
-		for (int i = 0; i < pkFields.length; i++) {
-			pkRow.setField(i, row.getField(pkFields[i]));
-		}
-		return pkRow;
-	}
 }
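
The constructor note above is the one caveat that survives the removal of the objectReuse flag: when object reuse is enabled, the key extractor has to produce a key that does not alias the (mutable, reused) input record, which is exactly what the Row path gets from getPrimaryKey. Schematically (pkFields assumed in scope):

    Function<Row, Row> unsafeUnderReuse = row -> row;                                    // buffered keys all alias one reused Row
    Function<Row, Row> safeUnderReuse   = row -> JDBCUtils.getPrimaryKey(row, pkFields); // copies key fields into a new Row
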
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/ParameterSetter.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/ParameterSetter.java
new file mode 100644
index 0000000..bb6b6f5
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/ParameterSetter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.executor;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+import java.io.Serializable;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import static org.apache.flink.api.java.io.jdbc.JDBCUtils.setRecordToStatement;
+
+/**
+ * Sets {@link PreparedStatement} parameters to use in JDBC Sink based on a specific type of StreamRecord.
+ * @param <T> type of payload in {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord StreamRecord}
+ * @see JdbcBatchStatementExecutor
+ */
+@Internal
+public interface ParameterSetter<T> extends BiConsumerWithException<PreparedStatement, T, SQLException>, Serializable {
+
+	/**
+	 * Creates a {@link ParameterSetter} for {@link Row} using the provided SQL types array.
+	 * Uses {@link org.apache.flink.api.java.io.jdbc.JDBCUtils#setRecordToStatement}
+	 */
+	static ParameterSetter<Row> forRow(int[] types) {
+		return (st, record) -> setRecordToStatement(st, types, record);
+	}
+}
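
Since ParameterSetter is a functional interface, a setter for a custom type is just a serializable lambda, with forRow(int[]) covering the legacy Row behaviour; for the hypothetical WordCount type sketched earlier:

    ParameterSetter<WordCount> setter = (st, wc) -> {
        st.setString(1, wc.word);   // 1-based JDBC parameter indices
        st.setLong(2, wc.cnt);
    };
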
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/SimpleBatchStatementExecutor.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/SimpleBatchStatementExecutor.java
index 16325ec..ff92b9f 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/SimpleBatchStatementExecutor.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/SimpleBatchStatementExecutor.java
@@ -18,29 +18,26 @@
 
 package org.apache.flink.api.java.io.jdbc.executor;
 
-import org.apache.flink.types.Row;
-
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Function;
 
-import static org.apache.flink.api.java.io.jdbc.JDBCUtils.setRecordToStatement;
-
-class SimpleBatchStatementExecutor implements JdbcBatchStatementExecutor<Row> {
+class SimpleBatchStatementExecutor<T, V> implements JdbcBatchStatementExecutor<T> {
 
 	private final String sql;
-	private final int[] paramTypes;
+	private final ParameterSetter<V> parameterSetter;
+	private final Function<T, V> valueTransformer;
 
 	private transient PreparedStatement st;
-	private List<Row> batch;
-	private final boolean objectReuse;
+	private transient List<V> batch;
 
-	SimpleBatchStatementExecutor(String sql, int[] paramTypes, boolean objectReuse) {
+	SimpleBatchStatementExecutor(String sql, ParameterSetter<V> parameterSetter, Function<T, V> valueTransformer) {
 		this.sql = sql;
-		this.paramTypes = paramTypes;
-		this.objectReuse = objectReuse;
+		this.parameterSetter = parameterSetter;
+		this.valueTransformer = valueTransformer;
 	}
 
 	@Override
@@ -50,15 +47,15 @@ class SimpleBatchStatementExecutor implements JdbcBatchStatementExecutor<Row> {
 	}
 
 	@Override
-	public void process(Row record) {
-		batch.add(objectReuse ? Row.copy(record) : record);
+	public void process(T record) {
+		batch.add(valueTransformer.apply(record));
 	}
 
 	@Override
 	public void executeBatch() throws SQLException {
 		if (!batch.isEmpty()) {
-			for (Row r : batch) {
-				setRecordToStatement(st, paramTypes, r);
+			for (V r : batch) {
+				parameterSetter.accept(st, r);
 				st.addBatch();
 			}
 			st.executeBatch();