Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/02/16 11:46:15 UTC

[incubator-seatunnel] branch dev updated: [Feature] [connector] Support flink jdbc sink based on new flink version (#1254)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 43b7a82  [Feature] [connector] Support flink jdbc sink based on new flink version (#1254)
43b7a82 is described below

commit 43b7a829283472d3de39f463f399f8ce07e861f0
Author: leo65535 <le...@163.com>
AuthorDate: Wed Feb 16 19:45:26 2022 +0800

    [Feature] [connector] Support flink jdbc sink based on new flink version (#1254)
    
    * [Feature] [connector] Support flink jdbc sink based on new flink version
    
    * update more options
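    
    Example sink config exercising the new options (a sketch only: the
    connection keys mirror the sink's existing fields but do not appear
    in this diff; batch_interval and batch_max_retries are the new
    options, defaulting to 0 ms and 3 retries):
    
        JdbcSink {
            driver = "com.mysql.cj.jdbc.Driver"
            url = "jdbc:mysql://localhost:3306/test"
            username = "user"
            password = "pass"
            query = "insert into test(id, name) values(?, ?)"
            batch_size = 5000
            batch_interval = 1000
            batch_max_retries = 3
        }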
---
 .../org/apache/seatunnel/flink/sink/JdbcSink.java  | 69 ++++++++++++----------
 1 file changed, 37 insertions(+), 32 deletions(-)

diff --git a/seatunnel-connectors/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java b/seatunnel-connectors/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java
index e036358..c70b9cb 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java
@@ -20,25 +20,31 @@ package org.apache.seatunnel.flink.sink;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.flink.FlinkEnvironment;
-import org.apache.seatunnel.flink.batch.FlinkBatchSink;
 import org.apache.seatunnel.flink.stream.FlinkStreamSink;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DataSink;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.utils.JdbcTypeUtil;
+import org.apache.flink.connector.jdbc.utils.JdbcUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.types.Row;
 
 import javax.annotation.Nullable;
 
-public class JdbcSink implements FlinkStreamSink<Row, Row>, FlinkBatchSink<Row, Row> {
+import java.util.Arrays;
+
+public class JdbcSink implements FlinkStreamSink<Row, Row> {
 
     private static final long serialVersionUID = 3677571223952518115L;
     private static final int DEFAULT_BATCH_SIZE = 5000;
+    private static final int DEFAULT_MAX_RETRY_TIMES = 3;
+    private static final int DEFAULT_INTERVAL_MILLIS = 0;
 
     private Config config;
     private String driverName;
@@ -47,6 +53,8 @@ public class JdbcSink implements FlinkStreamSink<Row, Row>, FlinkBatchSink<Row,
     private String password;
     private String query;
     private int batchSize = DEFAULT_BATCH_SIZE;
+    private long batchIntervalMs = DEFAULT_INTERVAL_MILLIS;
+    private int maxRetries = DEFAULT_MAX_RETRY_TIMES;
 
     @Override
     public void setConfig(Config config) {
@@ -75,40 +83,37 @@ public class JdbcSink implements FlinkStreamSink<Row, Row>, FlinkBatchSink<Row,
         if (config.hasPath("batch_size")) {
             batchSize = config.getInt("batch_size");
         }
+        if (config.hasPath("batch_interval")) {
+            batchIntervalMs = config.getLong("batch_interval");
+        }
+        if (config.hasPath("batch_max_retries")) {
+            maxRetries = config.getInt("batch_max_retries");
+        }
     }
 
     @Override
     @Nullable
     public DataStreamSink<Row> outputStream(FlinkEnvironment env, DataStream<Row> dataStream) {
         Table table = env.getStreamTableEnvironment().fromDataStream(dataStream);
-        createSink(env.getStreamTableEnvironment(), table);
-        return null;
-    }
+        TypeInformation<?>[] fieldTypes = table.getSchema().getFieldTypes();
 
-    @Override
-    @Nullable
-    public DataSink<Row> outputBatch(FlinkEnvironment env, DataSet<Row> dataSet) {
-        final Table table = env.getBatchTableEnvironment().fromDataSet(dataSet);
-        createSink(env.getBatchTableEnvironment(), table);
-        return null;
-    }
+        int[] types = Arrays.stream(fieldTypes).mapToInt(JdbcTypeUtil::typeInformationToSqlType).toArray();
+        SinkFunction<Row> sink = org.apache.flink.connector.jdbc.JdbcSink.sink(
+            query,
+            (st, row) -> JdbcUtils.setRecordToStatement(st, types, row),
+            JdbcExecutionOptions.builder()
+                .withBatchSize(batchSize)
+                .withBatchIntervalMs(batchIntervalMs)
+                .withMaxRetries(maxRetries)
+                .build(),
+            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
+                .withUrl(dbUrl)
+                .withDriverName(driverName)
+                .withUsername(username)
+                .withPassword(password)
+                .build());
 
-    private void createSink(TableEnvironment tableEnvironment, Table table) {
-        /*  TypeInformation<?>[] fieldTypes = table.getSchema().getFieldTypes();
-            String[] fieldNames = table.getSchema().getFieldNames();
-            TableSink sink = JDBCAppendTableSink.builder()
-                    .setDrivername(driverName)
-                    .setDBUrl(dbUrl)
-                    .setUsername(username)
-                    .setPassword(password)
-                    .setBatchSize(batchSize)
-                    .setQuery(query)
-                    .setParameterTypes(fieldTypes)
-                    .build()
-                    .configure(fieldNames, fieldTypes);
-            String uniqueTableName = SchemaUtil.getUniqueTableName();
-            tableEnvironment.registerTableSink(uniqueTableName, sink);
-            table.insertInto(uniqueTableName);
-        */
+        return dataStream.addSink(sink);
     }
+
 }
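
For reference, the new flink-connector-jdbc API that this sink now delegates
to can be exercised standalone as below (a minimal sketch: the table layout,
JDBC URL and credentials are illustrative and not taken from the commit; only
the option wiring mirrors the change above):

    // Standalone sketch of the flink-connector-jdbc API used by the patch.
    // Assumes flink-connector-jdbc and a JDBC driver on the classpath.
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
    import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.types.Row;

    public class JdbcSinkExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<Row> rows = env
                .fromElements(Row.of(1, "a"), Row.of(2, "b"))
                .returns(Types.ROW(Types.INT, Types.STRING));

            // Fully qualified, as in the commit, to avoid clashing with a
            // local class that is also named JdbcSink.
            rows.addSink(org.apache.flink.connector.jdbc.JdbcSink.sink(
                "insert into test(id, name) values(?, ?)",
                (ps, row) -> {
                    // JdbcStatementBuilder<Row>: bind one row per statement
                    ps.setInt(1, (Integer) row.getField(0));
                    ps.setString(2, (String) row.getField(1));
                },
                JdbcExecutionOptions.builder()
                    .withBatchSize(5000)       // batch_size
                    .withBatchIntervalMs(0)    // batch_interval (0 = size-only flush)
                    .withMaxRetries(3)         // batch_max_retries
                    .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withUrl("jdbc:mysql://localhost:3306/test")
                    .withDriverName("com.mysql.cj.jdbc.Driver")
                    .withUsername("user")
                    .withPassword("pass")
                    .build()));

            env.execute("jdbc sink example");
        }
    }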