You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by le...@apache.org on 2022/02/21 05:58:01 UTC

[incubator-seatunnel] branch dev updated: [Feature] [connector] For flink JdbcSource , JdbcSink and InfluxDbSource plugin supports parallel parameters (#1278)

This is an automated email from the ASF dual-hosted git repository.

leo65535 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a1765f9  [Feature] [connector] For flink JdbcSource , JdbcSink and InfluxDbSource plugin supports parallel parameters  (#1278)
a1765f9 is described below

commit a1765f9b6c3efc111a0d3b1425f0476950821891
Author: bigdataf <33...@users.noreply.github.com>
AuthorDate: Mon Feb 21 13:57:53 2022 +0800

    [Feature] [connector] For flink JdbcSource , JdbcSink and InfluxDbSource plugin supports parallel parameters  (#1278)
    
    * [SeaTunnel#1191] DorisSink,DruidSink,DruidSource,Elasticsearch,FileSource,FileSink,InfluxDbSink Support for parallel parameters
    
    Signed-off-by: jianmei.gao <ji...@mtime.com>
    
    * [SeaTunnel#1191] DorisSink,DruidSink,DruidSource,Elasticsearch,FileSource,FileSink,InfluxDbSink Support for parallel parameters; fix checkstyle violations so that CI passes, and add docs
    
    * [SeaTunnel#1191] update doc
    
    * [add exit status code] Add an error exit status code so that it can be determined whether the program executed successfully
    
    * [bug] Fix the execution problem that occurs after the Flink version is upgraded to 1.13.5
    
    * [Bug] [example] for rollback pom style 666 lines
    
    * [Feature] [connector] For flink JdbcSource , JdbcSink and InfluxDbSource plugin supports parallel parameters
    
    * [Feature] [connector] Fix table alignment issues in the Flink JdbcSource and InfluxDbSource docs
    
    * [Feature] [connector] Fix the Flink JdbcSource doc
    
    Co-authored-by: jianmei.gao <ji...@mtime.com>
---
 docs/en/flink/configuration/sink-plugins/Jdbc.md                 | 5 +++++
 docs/en/flink/configuration/source-plugins/InfluxDb.md           | 5 +++++
 docs/en/flink/configuration/source-plugins/Jdbc.md               | 5 +++++
 .../java/org/apache/seatunnel/flink/source/InfluxDbSource.java   | 9 ++++++++-
 .../src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java  | 4 ++++
 .../main/java/org/apache/seatunnel/flink/source/JdbcSource.java  | 9 ++++++++-
 6 files changed, 35 insertions(+), 2 deletions(-)

diff --git a/docs/en/flink/configuration/sink-plugins/Jdbc.md b/docs/en/flink/configuration/sink-plugins/Jdbc.md
index 1205bb7..d163915 100644
--- a/docs/en/flink/configuration/sink-plugins/Jdbc.md
+++ b/docs/en/flink/configuration/sink-plugins/Jdbc.md
@@ -18,6 +18,7 @@ Write data through jdbc
 | batch_size        | int    | no       | -             |
 | source_table_name | string | yes      | -             |
 | common-options    | string | no       | -             |
+| parallelism       | int    | no       | -             |
 
 ### driver [string]
 
@@ -43,6 +44,10 @@ Insert statement
 
 Number of writes per batch
 
+### parallelism [int]
+
+The parallelism of an individual operator, for JdbcSink.
+
 ### common options [string]
 
 Sink plugin common parameters, please refer to [Sink Plugin](./sink-plugin.md) for details
diff --git a/docs/en/flink/configuration/source-plugins/InfluxDb.md b/docs/en/flink/configuration/source-plugins/InfluxDb.md
index 44d1212..590b415 100644
--- a/docs/en/flink/configuration/source-plugins/InfluxDb.md
+++ b/docs/en/flink/configuration/source-plugins/InfluxDb.md
@@ -17,6 +17,7 @@ Read data from InfluxDB.
 | measurement | `String`       | yes      | -             |
 | fields      | `List<String>` | yes      | -             |
 | field_types | `List<String>` | yes      | -             |
+| parallelism | `Int`          | no       | -             |
 
 ### server_url [`String`]
 
@@ -46,6 +47,10 @@ The list of Field in InfluxDB.
 
 The list of Field Types in InfluxDB.
 
+### parallelism [`Int`]
+
+The parallelism of an individual operator, for InfluxDbSource.
+
 ## Example
 
 ### Simple
diff --git a/docs/en/flink/configuration/source-plugins/Jdbc.md b/docs/en/flink/configuration/source-plugins/Jdbc.md
index c636476..61c0818 100644
--- a/docs/en/flink/configuration/source-plugins/Jdbc.md
+++ b/docs/en/flink/configuration/source-plugins/Jdbc.md
@@ -17,6 +17,7 @@ Read data through jdbc
 | query          | string | yes      | -             |
 | fetch_size     | int    | no       | -             |
 | common-options | string | no       | -             |
+| parallelism    | int    | no       | -             |
 
 ### driver [string]
 
@@ -42,6 +43,10 @@ Query statement
 
 fetch size
 
+### parallelism [int]
+
+The parallelism of an individual operator, for JdbcSource.
+
 ### common options [string]
 
 Source plugin common parameters, please refer to [Source Plugin](./source-plugin.md) for details
diff --git a/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbSource.java b/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbSource.java
index c045765..a8cf18e 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbSource.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbSource.java
@@ -35,6 +35,7 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.types.Row;
 
@@ -55,6 +56,7 @@ public class InfluxDbSource implements FlinkBatchSource<Row> {
     private static final String FIELD_TYPES = "field_types";
     private static final String START_TIMESTAMP = "start_date";
     private static final String END_TIMESTAMP = "end_date";
+    private static final String PARALLELISM = "parallelism";
 
     private HashMap<String, TypeInformation> informationMapping = new HashMap<>();
 
@@ -72,7 +74,12 @@ public class InfluxDbSource implements FlinkBatchSource<Row> {
 
     @Override
     public DataSet<Row> getData(FlinkEnvironment env) {
-        return env.getBatchEnvironment().createInput(influxDbInputFormat);
+        DataSource<Row> dataSource = env.getBatchEnvironment().createInput(influxDbInputFormat);
+        if (config.hasPath(PARALLELISM)) {
+            int parallelism = config.getInt(PARALLELISM);
+            return dataSource.setParallelism(parallelism);
+        }
+        return dataSource;
     }
 
     @Override
diff --git a/seatunnel-connectors/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java b/seatunnel-connectors/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java
index c70b9cb..d47e3c4 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java
@@ -45,6 +45,7 @@ public class JdbcSink implements FlinkStreamSink<Row, Row> {
     private static final int DEFAULT_BATCH_SIZE = 5000;
     private static final int DEFAULT_MAX_RETRY_TIMES = 3;
     private static final int DEFAULT_INTERVAL_MILLIS = 0;
+    private static final String PARALLELISM = "parallelism";
 
     private Config config;
     private String driverName;
@@ -113,6 +114,9 @@ public class JdbcSink implements FlinkStreamSink<Row, Row> {
                 .withPassword(password)
                 .build());
 
+        if (config.hasPath(PARALLELISM)) {
+            return dataStream.addSink(sink).setParallelism(config.getInt(PARALLELISM));
+        }
         return dataStream.addSink(sink);
     }
 
diff --git a/seatunnel-connectors/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/source/JdbcSource.java b/seatunnel-connectors/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/source/JdbcSource.java
index 6025bca..aa22712 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/source/JdbcSource.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/source/JdbcSource.java
@@ -38,6 +38,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.connector.jdbc.JdbcInputFormat;
 import org.apache.flink.types.Row;
@@ -70,6 +71,7 @@ public class JdbcSource implements FlinkBatchSource<Row> {
     private Set<String> fields;
 
     private static final Pattern COMPILE = Pattern.compile("select (.+) from (.+).*");
+    private static final String PARALLELISM = "parallelism";
 
     private HashMap<String, TypeInformation> informationMapping = new HashMap<>();
 
@@ -99,7 +101,12 @@ public class JdbcSource implements FlinkBatchSource<Row> {
 
     @Override
     public DataSet<Row> getData(FlinkEnvironment env) {
-        return env.getBatchEnvironment().createInput(jdbcInputFormat);
+        DataSource<Row> dataSource = env.getBatchEnvironment().createInput(jdbcInputFormat);
+        if (config.hasPath(PARALLELISM)) {
+            int parallelism = config.getInt(PARALLELISM);
+            return dataSource.setParallelism(parallelism);
+        }
+        return dataSource;
     }
 
     @Override