You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/02/16 11:46:15 UTC
[incubator-seatunnel] branch dev updated: [Feature] [connector] Support flink jdbc sink based on new flink version (#1254)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 43b7a82 [Feature] [connector] Support flink jdbc sink based on new flink version (#1254)
43b7a82 is described below
commit 43b7a829283472d3de39f463f399f8ce07e861f0
Author: leo65535 <le...@163.com>
AuthorDate: Wed Feb 16 19:45:26 2022 +0800
[Feature] [connector] Support flink jdbc sink based on new flink version (#1254)
* [Feature] [connector] Support flink jdbc sink based on new flink version
* update more options
---
.../org/apache/seatunnel/flink/sink/JdbcSink.java | 69 ++++++++++++----------
1 file changed, 37 insertions(+), 32 deletions(-)
diff --git a/seatunnel-connectors/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java b/seatunnel-connectors/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java
index e036358..c70b9cb 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java
@@ -20,25 +20,31 @@ package org.apache.seatunnel.flink.sink;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.flink.FlinkEnvironment;
-import org.apache.seatunnel.flink.batch.FlinkBatchSink;
import org.apache.seatunnel.flink.stream.FlinkStreamSink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DataSink;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.utils.JdbcTypeUtil;
+import org.apache.flink.connector.jdbc.utils.JdbcUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
import javax.annotation.Nullable;
-public class JdbcSink implements FlinkStreamSink<Row, Row>, FlinkBatchSink<Row, Row> {
+import java.util.Arrays;
+
+public class JdbcSink implements FlinkStreamSink<Row, Row> {
private static final long serialVersionUID = 3677571223952518115L;
private static final int DEFAULT_BATCH_SIZE = 5000;
+ private static final int DEFAULT_MAX_RETRY_TIMES = 3;
+ private static final int DEFAULT_INTERVAL_MILLIS = 0;
private Config config;
private String driverName;
@@ -47,6 +53,8 @@ public class JdbcSink implements FlinkStreamSink<Row, Row>, FlinkBatchSink<Row,
private String password;
private String query;
private int batchSize = DEFAULT_BATCH_SIZE;
+ private long batchIntervalMs = DEFAULT_INTERVAL_MILLIS;
+ private int maxRetries = DEFAULT_MAX_RETRY_TIMES;
@Override
public void setConfig(Config config) {
@@ -75,40 +83,37 @@ public class JdbcSink implements FlinkStreamSink<Row, Row>, FlinkBatchSink<Row,
if (config.hasPath("batch_size")) {
batchSize = config.getInt("batch_size");
}
+ if (config.hasPath("batch_interval")) {
+ batchIntervalMs = config.getLong("batch_interval");
+ }
+ if (config.hasPath("batch_max_retries")) {
+ maxRetries = config.getInt("batch_max_retries");
+ }
}
@Override
@Nullable
public DataStreamSink<Row> outputStream(FlinkEnvironment env, DataStream<Row> dataStream) {
Table table = env.getStreamTableEnvironment().fromDataStream(dataStream);
- createSink(env.getStreamTableEnvironment(), table);
- return null;
- }
+ TypeInformation<?>[] fieldTypes = table.getSchema().getFieldTypes();
- @Override
- @Nullable
- public DataSink<Row> outputBatch(FlinkEnvironment env, DataSet<Row> dataSet) {
- final Table table = env.getBatchTableEnvironment().fromDataSet(dataSet);
- createSink(env.getBatchTableEnvironment(), table);
- return null;
- }
+ int[] types = Arrays.stream(fieldTypes).mapToInt(JdbcTypeUtil::typeInformationToSqlType).toArray();
+ SinkFunction<Row> sink = org.apache.flink.connector.jdbc.JdbcSink.sink(
+ query,
+ (st, row) -> JdbcUtils.setRecordToStatement(st, types, row),
+ JdbcExecutionOptions.builder()
+ .withBatchSize(batchSize)
+ .withBatchIntervalMs(batchIntervalMs)
+ .withMaxRetries(maxRetries)
+ .build(),
+ new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
+ .withUrl(dbUrl)
+ .withDriverName(driverName)
+ .withUsername(username)
+ .withPassword(password)
+ .build());
- private void createSink(TableEnvironment tableEnvironment, Table table) {
- /* TypeInformation<?>[] fieldTypes = table.getSchema().getFieldTypes();
- String[] fieldNames = table.getSchema().getFieldNames();
- TableSink sink = JDBCAppendTableSink.builder()
- .setDrivername(driverName)
- .setDBUrl(dbUrl)
- .setUsername(username)
- .setPassword(password)
- .setBatchSize(batchSize)
- .setQuery(query)
- .setParameterTypes(fieldTypes)
- .build()
- .configure(fieldNames, fieldTypes);
- String uniqueTableName = SchemaUtil.getUniqueTableName();
- tableEnvironment.registerTableSink(uniqueTableName, sink);
- table.insertInto(uniqueTableName);
- */
+ return dataStream.addSink(sink);
}
+
}