You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/03/10 12:18:23 UTC

[incubator-seatunnel] branch dev updated: [Feature][flink-connector-jdbc] JdbcSink plugin supports batch mode by implementing FlinkBatchSink interface (#1432)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8c14671  [Feature][flink-connector-jdbc] JdbcSink plugin supports batch mode by implementing FlinkBatchSink interface (#1432)
8c14671 is described below

commit 8c146713a7010ef684972973ce09c6c9de791f07
Author: kalencaya <19...@qq.com>
AuthorDate: Thu Mar 10 20:18:18 2022 +0800

    [Feature][flink-connector-jdbc] JdbcSink plugin supports batch mode by implementing FlinkBatchSink interface (#1432)
    
    * connector: flink JdbcSink plugin supports batch by implementing FlinkBatchSink interface.
    
    * connector: checkstyle for import order.
    
    Co-authored-by: wangqi <wa...@xinc818.group>
---
 .../org/apache/seatunnel/flink/sink/JdbcSink.java  | 24 +++++++++++++++++++++-
 1 file changed, 23 insertions(+), 1 deletion(-)

diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java
index d47e3c4..d40bc11 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java
@@ -20,13 +20,17 @@ package org.apache.seatunnel.flink.sink;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.batch.FlinkBatchSink;
 import org.apache.seatunnel.flink.stream.FlinkStreamSink;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DataSink;
 import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
 import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcOutputFormat;
 import org.apache.flink.connector.jdbc.utils.JdbcTypeUtil;
 import org.apache.flink.connector.jdbc.utils.JdbcUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -39,7 +43,7 @@ import javax.annotation.Nullable;
 
 import java.util.Arrays;
 
-public class JdbcSink implements FlinkStreamSink<Row, Row> {
+public class JdbcSink implements FlinkStreamSink<Row, Row>, FlinkBatchSink<Row, Row> {
 
     private static final long serialVersionUID = 3677571223952518115L;
     private static final int DEFAULT_BATCH_SIZE = 5000;
@@ -120,4 +124,22 @@ public class JdbcSink implements FlinkStreamSink<Row, Row> {
         return dataStream.addSink(sink);
     }
 
+    @Nullable
+    @Override
+    public DataSink<Row> outputBatch(FlinkEnvironment env, DataSet<Row> dataSet) {
+        Table table = env.getBatchTableEnvironment().fromDataSet(dataSet);
+        TypeInformation<?>[] fieldTypes = table.getSchema().getFieldTypes();
+        int[] types = Arrays.stream(fieldTypes).mapToInt(JdbcTypeUtil::typeInformationToSqlType).toArray();
+
+        JdbcOutputFormat format = JdbcOutputFormat.buildJdbcOutputFormat()
+                .setDrivername(driverName)
+                .setDBUrl(dbUrl)
+                .setUsername(username)
+                .setPassword(password)
+                .setQuery(query)
+                .setBatchSize(batchSize)
+                .setSqlTypes(types)
+                .finish();
+        return dataSet.output(format);
+    }
 }