You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by le...@apache.org on 2022/02/21 05:58:01 UTC
[incubator-seatunnel] branch dev updated: [Feature] [connector] For flink JdbcSource , JdbcSink and InfluxDbSource plugin supports parallel parameters (#1278)
This is an automated email from the ASF dual-hosted git repository.
leo65535 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a1765f9 [Feature] [connector] For flink JdbcSource , JdbcSink and InfluxDbSource plugin supports parallel parameters (#1278)
a1765f9 is described below
commit a1765f9b6c3efc111a0d3b1425f0476950821891
Author: bigdataf <33...@users.noreply.github.com>
AuthorDate: Mon Feb 21 13:57:53 2022 +0800
[Feature] [connector] For flink JdbcSource , JdbcSink and InfluxDbSource plugin supports parallel parameters (#1278)
* [SeaTunnel#1191] DorisSink, DruidSink, DruidSource, Elasticsearch, FileSource, FileSink, InfluxDbSink: support the parallelism parameter
Signed-off-by: jianmei.gao <ji...@mtime.com>
* [SeaTunnel#1191] DorisSink, DruidSink, DruidSource, Elasticsearch, FileSource, FileSink, InfluxDbSink: support the parallelism parameter, fix checkstyle violations so CI passes, and add docs
* [SeaTunnel#1191] update doc
* [add exit status code] Return an error exit status code on failure so callers can determine whether the program executed successfully
* [bug] Fix an execution problem that appeared after upgrading Flink to 1.13.5
* [Bug] [example] for rollback pom style 666 lines
* [Feature] [connector] For flink JdbcSource , JdbcSink and InfluxDbSource plugin supports parallel parameters
* [Feature] [connector] Fix table alignment in the Flink JdbcSource and InfluxDbSource docs
* [Feature] [connector] Fix the Flink JdbcSource doc
Co-authored-by: jianmei.gao <ji...@mtime.com>
---
docs/en/flink/configuration/sink-plugins/Jdbc.md | 5 +++++
docs/en/flink/configuration/source-plugins/InfluxDb.md | 5 +++++
docs/en/flink/configuration/source-plugins/Jdbc.md | 5 +++++
.../java/org/apache/seatunnel/flink/source/InfluxDbSource.java | 9 ++++++++-
.../src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java | 4 ++++
.../main/java/org/apache/seatunnel/flink/source/JdbcSource.java | 9 ++++++++-
6 files changed, 35 insertions(+), 2 deletions(-)
diff --git a/docs/en/flink/configuration/sink-plugins/Jdbc.md b/docs/en/flink/configuration/sink-plugins/Jdbc.md
index 1205bb7..d163915 100644
--- a/docs/en/flink/configuration/sink-plugins/Jdbc.md
+++ b/docs/en/flink/configuration/sink-plugins/Jdbc.md
@@ -18,6 +18,7 @@ Write data through jdbc
| batch_size | int | no | - |
| source_table_name | string | yes | - |
| common-options | string | no | - |
+| parallelism | int | no | - |
### driver [string]
@@ -43,6 +44,10 @@ Insert statement
Number of writes per batch
+### parallelism [int]
+
+The parallelism of the JdbcSink operator. If not set, the execution environment's default parallelism is used.
+
### common options [string]
Sink plugin common parameters, please refer to [Sink Plugin](./sink-plugin.md) for details
diff --git a/docs/en/flink/configuration/source-plugins/InfluxDb.md b/docs/en/flink/configuration/source-plugins/InfluxDb.md
index 44d1212..590b415 100644
--- a/docs/en/flink/configuration/source-plugins/InfluxDb.md
+++ b/docs/en/flink/configuration/source-plugins/InfluxDb.md
@@ -17,6 +17,7 @@ Read data from InfluxDB.
| measurement | `String` | yes | - |
| fields | `List<String>` | yes | - |
| field_types | `List<String>` | yes | - |
+| parallelism | `Int` | no | - |
### server_url [`String`]
@@ -46,6 +47,10 @@ The list of Field in InfluxDB.
The list of Field Types in InfluxDB.
+### parallelism [`Int`]
+
+The parallelism of the InfluxDbSource operator. If not set, the execution environment's default parallelism is used.
+
## Example
### Simple
diff --git a/docs/en/flink/configuration/source-plugins/Jdbc.md b/docs/en/flink/configuration/source-plugins/Jdbc.md
index c636476..61c0818 100644
--- a/docs/en/flink/configuration/source-plugins/Jdbc.md
+++ b/docs/en/flink/configuration/source-plugins/Jdbc.md
@@ -17,6 +17,7 @@ Read data through jdbc
| query | string | yes | - |
| fetch_size | int | no | - |
| common-options | string | no | - |
+| parallelism | int | no | - |
### driver [string]
@@ -42,6 +43,10 @@ Query statement
fetch size
+### parallelism [int]
+
+The parallelism of the JdbcSource operator. If not set, the execution environment's default parallelism is used.
+
### common options [string]
Source plugin common parameters, please refer to [Source Plugin](./source-plugin.md) for details
diff --git a/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbSource.java b/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbSource.java
index c045765..a8cf18e 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbSource.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbSource.java
@@ -35,6 +35,7 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
@@ -55,6 +56,7 @@ public class InfluxDbSource implements FlinkBatchSource<Row> {
private static final String FIELD_TYPES = "field_types";
private static final String START_TIMESTAMP = "start_date";
private static final String END_TIMESTAMP = "end_date";
+ private static final String PARALLELISM = "parallelism";
private HashMap<String, TypeInformation> informationMapping = new HashMap<>();
@@ -72,7 +74,12 @@ public class InfluxDbSource implements FlinkBatchSource<Row> {
@Override
public DataSet<Row> getData(FlinkEnvironment env) {
- return env.getBatchEnvironment().createInput(influxDbInputFormat);
+ DataSource<Row> dataSource = env.getBatchEnvironment().createInput(influxDbInputFormat);
+ if (config.hasPath(PARALLELISM)) {
+ int parallelism = config.getInt(PARALLELISM);
+ return dataSource.setParallelism(parallelism);
+ }
+ return dataSource;
}
@Override
diff --git a/seatunnel-connectors/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java b/seatunnel-connectors/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java
index c70b9cb..d47e3c4 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java
@@ -45,6 +45,7 @@ public class JdbcSink implements FlinkStreamSink<Row, Row> {
private static final int DEFAULT_BATCH_SIZE = 5000;
private static final int DEFAULT_MAX_RETRY_TIMES = 3;
private static final int DEFAULT_INTERVAL_MILLIS = 0;
+ private static final String PARALLELISM = "parallelism";
private Config config;
private String driverName;
@@ -113,6 +114,9 @@ public class JdbcSink implements FlinkStreamSink<Row, Row> {
.withPassword(password)
.build());
+ if (config.hasPath(PARALLELISM)) {
+ return dataStream.addSink(sink).setParallelism(config.getInt(PARALLELISM));
+ }
return dataStream.addSink(sink);
}
diff --git a/seatunnel-connectors/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/source/JdbcSource.java b/seatunnel-connectors/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/source/JdbcSource.java
index 6025bca..aa22712 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/source/JdbcSource.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/source/JdbcSource.java
@@ -38,6 +38,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.types.Row;
@@ -70,6 +71,7 @@ public class JdbcSource implements FlinkBatchSource<Row> {
private Set<String> fields;
private static final Pattern COMPILE = Pattern.compile("select (.+) from (.+).*");
+ private static final String PARALLELISM = "parallelism";
private HashMap<String, TypeInformation> informationMapping = new HashMap<>();
@@ -99,7 +101,12 @@ public class JdbcSource implements FlinkBatchSource<Row> {
@Override
public DataSet<Row> getData(FlinkEnvironment env) {
- return env.getBatchEnvironment().createInput(jdbcInputFormat);
+ DataSource<Row> dataSource = env.getBatchEnvironment().createInput(jdbcInputFormat);
+ if (config.hasPath(PARALLELISM)) {
+ int parallelism = config.getInt(PARALLELISM);
+ return dataSource.setParallelism(parallelism);
+ }
+ return dataSource;
}
@Override