You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2020/03/10 16:17:02 UTC
[flink] 04/10: [FLINK-15782][connectors/jdbc] generalize jdbc
statement executors (from Row to <T>) Motivation: use existing code
oriented to Table API in DataStream API
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 427eb55d5048dc40a6dfc7713e6d5d42881da873
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Thu Jan 23 18:22:08 2020 +0100
[FLINK-15782][connectors/jdbc] generalize jdbc statement executors (from Row to <T>)
Motivation: use existing code oriented to Table API in DataStream API
---
.../apache/flink/api/java/io/jdbc/JDBCUtils.java | 8 +++
.../api/java/io/jdbc/TableJdbcOutputFormat.java | 5 +-
.../jdbc/executor/InsertOrUpdateJdbcExecutor.java | 65 +++++++++++-----------
.../jdbc/executor/JdbcBatchStatementExecutor.java | 38 ++++++++++---
.../jdbc/executor/KeyedBatchStatementExecutor.java | 60 +++++++-------------
.../api/java/io/jdbc/executor/ParameterSetter.java | 45 +++++++++++++++
.../executor/SimpleBatchStatementExecutor.java | 27 ++++-----
7 files changed, 150 insertions(+), 98 deletions(-)
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUtils.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUtils.java
index d50b82e..14902da 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUtils.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUtils.java
@@ -231,4 +231,12 @@ public class JDBCUtils {
return ret;
}
}
+
+ public static Row getPrimaryKey(Row row, int[] pkFields) {
+ Row pkRow = new Row(pkFields.length);
+ for (int i = 0; i < pkFields.length; i++) {
+ pkRow.setField(i, row.getField(pkFields[i]));
+ }
+ return pkRow;
+ }
}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/TableJdbcOutputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/TableJdbcOutputFormat.java
index 305cf20..a9a5625 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/TableJdbcOutputFormat.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/TableJdbcOutputFormat.java
@@ -54,13 +54,12 @@ class TableJdbcOutputFormat extends JdbcBatchingOutputFormat<Tuple2<Boolean, Row
int[] pkTypes = dmlOptions.getFieldTypes() == null ? null :
Arrays.stream(pkFields).map(f -> dmlOptions.getFieldTypes()[f]).toArray();
String deleteSql = dmlOptions.getDialect().getDeleteStatement(dmlOptions.getTableName(), dmlOptions.getFieldNames());
- boolean objectReuseEnabled = getRuntimeContext().getExecutionConfig().isObjectReuseEnabled();
- return JdbcBatchStatementExecutor.keyed(pkFields, pkTypes, deleteSql, objectReuseEnabled);
+ return JdbcBatchStatementExecutor.keyedRow(pkFields, pkTypes, deleteSql);
}
@Override
JdbcBatchStatementExecutor<Row> createStatementRunner(JDBCDialect dialect) {
- return JdbcBatchStatementExecutor.upsert(
+ return JdbcBatchStatementExecutor.upsertRow(
dialect, dmlOptions.getTableName(), dmlOptions.getFieldNames(), dmlOptions.getFieldTypes(), dmlOptions.getKeyFields(),
getRuntimeContext().getExecutionConfig().isObjectReuseEnabled());
}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/InsertOrUpdateJdbcExecutor.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/InsertOrUpdateJdbcExecutor.java
index ba37707..0a5d873 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/InsertOrUpdateJdbcExecutor.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/InsertOrUpdateJdbcExecutor.java
@@ -17,8 +17,6 @@
package org.apache.flink.api.java.io.jdbc.executor;
-import org.apache.flink.types.Row;
-
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -26,76 +24,81 @@ import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
+import java.util.function.Function;
-import static org.apache.flink.api.java.io.jdbc.JDBCUtils.setRecordToStatement;
-
-final class InsertOrUpdateJdbcExecutor implements JdbcBatchStatementExecutor<Row> {
+final class InsertOrUpdateJdbcExecutor<R, K, V> implements JdbcBatchStatementExecutor<R> {
private final String existSQL;
private final String insertSQL;
private final String updateSQL;
- private final int[] pkTypes;
- private final int[] pkFields;
- private final boolean objectReuse;
+
+ private final ParameterSetter<K> existSetter;
+ private final ParameterSetter<V> insertSetter;
+ private final ParameterSetter<V> updateSetter;
+
+ private final Function<R, K> keyExtractor;
+ private final Function<R, V> valueMapper;
private transient PreparedStatement existStatement;
private transient PreparedStatement insertStatement;
private transient PreparedStatement updateStatement;
- private transient Map<Row, Row> keyToRows = new HashMap<>();
- private final int[] fieldTypes;
+ private transient Map<K, V> batch = new HashMap<>();
- InsertOrUpdateJdbcExecutor(
- int[] fieldTypes,
- int[] pkFields, int[] pkTypes,
- String existSQL,
- String insertSQL,
- String updateSQL, boolean objectReuse) {
- this.pkFields = pkFields;
+ InsertOrUpdateJdbcExecutor(String existSQL,
+ String insertSQL,
+ String updateSQL,
+ ParameterSetter<K> existSetter,
+ ParameterSetter<V> insertSetter,
+ ParameterSetter<V> updateSetter,
+ Function<R, K> keyExtractor,
+ Function<R, V> valueExtractor) {
this.existSQL = existSQL;
this.insertSQL = insertSQL;
this.updateSQL = updateSQL;
- this.fieldTypes = fieldTypes;
- this.pkTypes = pkTypes;
- this.objectReuse = objectReuse;
+ this.existSetter = existSetter;
+ this.insertSetter = insertSetter;
+ this.updateSetter = updateSetter;
+ this.keyExtractor = keyExtractor;
+ this.valueMapper = valueExtractor;
}
@Override
public void open(Connection connection) throws SQLException {
- keyToRows = new HashMap<>();
+ batch = new HashMap<>();
existStatement = connection.prepareStatement(existSQL);
insertStatement = connection.prepareStatement(insertSQL);
updateStatement = connection.prepareStatement(updateSQL);
}
@Override
- public void process(Row record) {
- keyToRows.put(KeyedBatchStatementExecutor.getPrimaryKey(record, pkFields), objectReuse ? Row.copy(record) : record);
+ public void process(R record) {
+ batch.put(keyExtractor.apply(record), valueMapper.apply(record));
}
@Override
public void executeBatch() throws SQLException {
- if (keyToRows.size() > 0) {
- for (Map.Entry<Row, Row> entry : keyToRows.entrySet()) {
+ if (!batch.isEmpty()) {
+ for (Map.Entry<K, V> entry : batch.entrySet()) {
processOneRowInBatch(entry.getKey(), entry.getValue());
}
updateStatement.executeBatch();
insertStatement.executeBatch();
- keyToRows.clear();
+ batch.clear();
}
}
- private void processOneRowInBatch(Row pk, Row row) throws SQLException {
+ private void processOneRowInBatch(K pk, V row) throws SQLException {
if (exist(pk)) {
- setRecordToStatement(updateStatement, fieldTypes, row);
+ updateSetter.accept(updateStatement, row);
updateStatement.addBatch();
} else {
- setRecordToStatement(insertStatement, fieldTypes, row);
+ insertSetter.accept(insertStatement, row);
insertStatement.addBatch();
}
}
- private boolean exist(Row pk) throws SQLException {
- setRecordToStatement(existStatement, pkTypes, pk);
+ private boolean exist(K pk) throws SQLException {
+ existSetter.accept(existStatement, pk);
try (ResultSet resultSet = existStatement.executeQuery()) {
return resultSet.next();
}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/JdbcBatchStatementExecutor.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/JdbcBatchStatementExecutor.java
index f231a739..58632f6 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/JdbcBatchStatementExecutor.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/JdbcBatchStatementExecutor.java
@@ -25,7 +25,10 @@ import org.apache.flink.types.Row;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Arrays;
+import java.util.function.Function;
+import static org.apache.flink.api.java.io.jdbc.JDBCUtils.getPrimaryKey;
+import static org.apache.flink.api.java.io.jdbc.JDBCUtils.setRecordToStatement;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -51,7 +54,7 @@ public interface JdbcBatchStatementExecutor<T> {
*/
void close() throws SQLException;
- static JdbcBatchStatementExecutor<Row> upsert(
+ static JdbcBatchStatementExecutor<Row> upsertRow(
JDBCDialect dialect,
String tableName,
String[] fieldNames,
@@ -66,22 +69,39 @@ public interface JdbcBatchStatementExecutor<T> {
return dialect
.getUpsertStatement(tableName, fieldNames, keyFields)
- .map(sql -> keyed(pkFields, pkTypes, sql, objectReuse))
+ .map(sql -> keyedRow(pkFields, fieldTypes, sql))
.orElseGet(() ->
- new InsertOrUpdateJdbcExecutor(
- fieldTypes, pkFields, pkTypes,
+ new InsertOrUpdateJdbcExecutor<>(
dialect.getRowExistsStatement(tableName, keyFields),
dialect.getInsertIntoStatement(tableName, fieldNames),
dialect.getUpdateStatement(tableName, fieldNames, keyFields),
- objectReuse));
+ ParameterSetter.forRow(pkTypes),
+ ParameterSetter.forRow(fieldTypes),
+ ParameterSetter.forRow(fieldTypes),
+ rowKeyExtractor(pkFields),
+ objectReuse ? Row::copy : Function.identity()));
}
- static JdbcBatchStatementExecutor<Row> keyed(int[] pkFields, int[] pkTypes, String sql, boolean objectReuse) {
- return new KeyedBatchStatementExecutor(pkFields, pkTypes, sql, objectReuse);
+ static Function<Row, Row> rowKeyExtractor(int[] pkFields) {
+ return row -> getPrimaryKey(row, pkFields);
}
- static JdbcBatchStatementExecutor<Row> simple(String sql, int[] fieldTypes, boolean objectReuse) {
- return new SimpleBatchStatementExecutor(sql, fieldTypes, objectReuse);
+ static JdbcBatchStatementExecutor<Row> keyedRow(int[] pkFields, int[] pkTypes, String sql) {
+ return keyed(sql,
+ rowKeyExtractor(pkFields),
+ (st, record) -> setRecordToStatement(st, pkTypes, rowKeyExtractor(pkFields).apply(record)));
+ }
+
+ static <T, K> JdbcBatchStatementExecutor<T> keyed(String sql, Function<T, K> keyExtractor, ParameterSetter<K> parameterSetter) {
+ return new KeyedBatchStatementExecutor<>(sql, keyExtractor, parameterSetter);
+ }
+
+ static JdbcBatchStatementExecutor<Row> simpleRow(String sql, int[] fieldTypes, boolean objectReuse) {
+ return simple(sql, ParameterSetter.forRow(fieldTypes), objectReuse ? Row::copy : Function.identity());
+ }
+
+ static <T, V> JdbcBatchStatementExecutor<T> simple(String sql, ParameterSetter<V> paramSetter, Function<T, V> valueTransformer) {
+ return new SimpleBatchStatementExecutor<>(sql, paramSetter, valueTransformer);
}
}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/KeyedBatchStatementExecutor.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/KeyedBatchStatementExecutor.java
index 70ab679..1da9f45 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/KeyedBatchStatementExecutor.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/KeyedBatchStatementExecutor.java
@@ -18,59 +18,51 @@
package org.apache.flink.api.java.io.jdbc.executor;
-import org.apache.flink.types.Row;
-
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.flink.api.java.io.jdbc.JDBCUtils.setRecordToStatement;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Function;
-/**
- * Upsert writer to deal with upsert, delete message.
- */
-class KeyedBatchStatementExecutor implements JdbcBatchStatementExecutor<Row> {
+class KeyedBatchStatementExecutor<T, K> implements JdbcBatchStatementExecutor<T> {
- private final int[] pkTypes;
- private final int[] pkFields;
private final String sql;
- private final boolean objectReuse;
+ private final ParameterSetter<K> parameterSetter;
+ private final Function<T, K> keyExtractor;
- private transient Map<Row, Row> keyToRows = new HashMap<>();
+ private transient Set<K> batch = new HashSet<>();
private transient PreparedStatement st;
- KeyedBatchStatementExecutor(int[] pkFields, int[] pkTypes, String sql, boolean objectReuse) {
- this.pkFields = pkFields;
- this.pkTypes = pkTypes;
+ /**
+ * Keep in mind object reuse: if it's on then key extractor may be required to return new object.
+ */
+ KeyedBatchStatementExecutor(String sql, Function<T, K> keyExtractor, ParameterSetter<K> parameterSetter) {
+ this.parameterSetter = parameterSetter;
+ this.keyExtractor = keyExtractor;
this.sql = sql;
- this.objectReuse = objectReuse;
}
@Override
public void open(Connection connection) throws SQLException {
- keyToRows = new HashMap<>();
+ batch = new HashSet<>();
st = connection.prepareStatement(sql);
}
@Override
- public void process(Row record) {
- // we don't need perform a deep copy, because jdbc field are immutable object.
- Row row = objectReuse ? Row.copy(record) : record;
- // add records to buffer
- keyToRows.put(getPrimaryKey(row), row);
+ public void process(T record) {
+ batch.add(keyExtractor.apply(record));
}
@Override
public void executeBatch() throws SQLException {
- if (keyToRows.size() > 0) {
- for (Map.Entry<Row, Row> entry : keyToRows.entrySet()) {
- setRecordToStatement(st, pkTypes, entry.getKey());
+ if (!batch.isEmpty()) {
+ for (K entry : batch) {
+ parameterSetter.accept(st, entry);
st.addBatch();
}
st.executeBatch();
- keyToRows.clear();
+ batch.clear();
}
}
@@ -82,16 +74,4 @@ class KeyedBatchStatementExecutor implements JdbcBatchStatementExecutor<Row> {
}
}
- private Row getPrimaryKey(Row row) {
- int[] pkFields = this.pkFields;
- return getPrimaryKey(row, pkFields);
- }
-
- static Row getPrimaryKey(Row row, int[] pkFields) {
- Row pkRow = new Row(pkFields.length);
- for (int i = 0; i < pkFields.length; i++) {
- pkRow.setField(i, row.getField(pkFields[i]));
- }
- return pkRow;
- }
}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/ParameterSetter.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/ParameterSetter.java
new file mode 100644
index 0000000..bb6b6f5
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/ParameterSetter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.executor;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+import java.io.Serializable;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import static org.apache.flink.api.java.io.jdbc.JDBCUtils.setRecordToStatement;
+
+/**
+ * Sets {@link PreparedStatement} parameters to use in JDBC Sink based on a specific type of StreamRecord.
+ * @param <T> type of payload in {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord StreamRecord}
+ * @see JdbcBatchStatementExecutor
+ */
+@Internal
+public interface ParameterSetter<T> extends BiConsumerWithException<PreparedStatement, T, SQLException>, Serializable {
+
+ /**
+ * Creates a {@link ParameterSetter} for {@link Row} using the provided SQL types array.
+ * Uses {@link org.apache.flink.api.java.io.jdbc.JDBCUtils#setRecordToStatement}
+ */
+ static ParameterSetter<Row> forRow(int[] types) {
+ return (st, record) -> setRecordToStatement(st, types, record);
+ }
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/SimpleBatchStatementExecutor.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/SimpleBatchStatementExecutor.java
index 16325ec..ff92b9f 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/SimpleBatchStatementExecutor.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/executor/SimpleBatchStatementExecutor.java
@@ -18,29 +18,26 @@
package org.apache.flink.api.java.io.jdbc.executor;
-import org.apache.flink.types.Row;
-
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
+import java.util.function.Function;
-import static org.apache.flink.api.java.io.jdbc.JDBCUtils.setRecordToStatement;
-
-class SimpleBatchStatementExecutor implements JdbcBatchStatementExecutor<Row> {
+class SimpleBatchStatementExecutor<T, V> implements JdbcBatchStatementExecutor<T> {
private final String sql;
- private final int[] paramTypes;
+ private final ParameterSetter<V> parameterSetter;
+ private final Function<T, V> valueTransformer;
private transient PreparedStatement st;
- private List<Row> batch;
- private final boolean objectReuse;
+ private transient List<V> batch;
- SimpleBatchStatementExecutor(String sql, int[] paramTypes, boolean objectReuse) {
+ SimpleBatchStatementExecutor(String sql, ParameterSetter<V> parameterSetter, Function<T, V> valueTransformer) {
this.sql = sql;
- this.paramTypes = paramTypes;
- this.objectReuse = objectReuse;
+ this.parameterSetter = parameterSetter;
+ this.valueTransformer = valueTransformer;
}
@Override
@@ -50,15 +47,15 @@ class SimpleBatchStatementExecutor implements JdbcBatchStatementExecutor<Row> {
}
@Override
- public void process(Row record) {
- batch.add(objectReuse ? Row.copy(record) : record);
+ public void process(T record) {
+ batch.add(valueTransformer.apply(record));
}
@Override
public void executeBatch() throws SQLException {
if (!batch.isEmpty()) {
- for (Row r : batch) {
- setRecordToStatement(st, paramTypes, r);
+ for (V r : batch) {
+ parameterSetter.accept(st, r);
st.addBatch();
}
st.executeBatch();