You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/03/10 12:18:23 UTC
[incubator-seatunnel] branch dev updated: [Feature][flink-connector-jdbc] JdbcSink plugin supports batch mode by implementing FlinkBatchSink interface (#1432)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 8c14671 [Feature][flink-connector-jdbc] JdbcSink plugin supports batch mode by implementing FlinkBatchSink interface (#1432)
8c14671 is described below
commit 8c146713a7010ef684972973ce09c6c9de791f07
Author: kalencaya <19...@qq.com>
AuthorDate: Thu Mar 10 20:18:18 2022 +0800
[Feature][flink-connector-jdbc] JdbcSink plugin supports batch mode by implementing FlinkBatchSink interface (#1432)
* connector: flink JdbcSink plugin supports batch mode by implementing FlinkBatchSink interface.
* connector: checkstyle for import order.
Co-authored-by: wangqi <wa...@xinc818.group>
---
.../org/apache/seatunnel/flink/sink/JdbcSink.java | 24 +++++++++++++++++++++-
1 file changed, 23 insertions(+), 1 deletion(-)
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java
index d47e3c4..d40bc11 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java
@@ -20,13 +20,17 @@ package org.apache.seatunnel.flink.sink;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.batch.FlinkBatchSink;
import org.apache.seatunnel.flink.stream.FlinkStreamSink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcOutputFormat;
import org.apache.flink.connector.jdbc.utils.JdbcTypeUtil;
import org.apache.flink.connector.jdbc.utils.JdbcUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -39,7 +43,7 @@ import javax.annotation.Nullable;
import java.util.Arrays;
-public class JdbcSink implements FlinkStreamSink<Row, Row> {
+public class JdbcSink implements FlinkStreamSink<Row, Row>, FlinkBatchSink<Row, Row> {
private static final long serialVersionUID = 3677571223952518115L;
private static final int DEFAULT_BATCH_SIZE = 5000;
@@ -120,4 +124,22 @@ public class JdbcSink implements FlinkStreamSink<Row, Row> {
return dataStream.addSink(sink);
}
+ @Nullable
+ @Override
+ public DataSink<Row> outputBatch(FlinkEnvironment env, DataSet<Row> dataSet) {
+ Table table = env.getBatchTableEnvironment().fromDataSet(dataSet);
+ TypeInformation<?>[] fieldTypes = table.getSchema().getFieldTypes();
+ int[] types = Arrays.stream(fieldTypes).mapToInt(JdbcTypeUtil::typeInformationToSqlType).toArray();
+
+ JdbcOutputFormat format = JdbcOutputFormat.buildJdbcOutputFormat()
+ .setDrivername(driverName)
+ .setDBUrl(dbUrl)
+ .setUsername(username)
+ .setPassword(password)
+ .setQuery(query)
+ .setBatchSize(batchSize)
+ .setSqlTypes(types)
+ .finish();
+ return dataSet.output(format);
+ }
}