Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/02/07 05:33:03 UTC

[incubator-seatunnel] branch dev updated: [SeaTunnel#1191] DorisSink,DruidSink,DruidSource,Elasticsearch,FileSo… (#1192)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 805a439  [SeaTunnel#1191] DorisSink,DruidSink,DruidSource,Elasticsearch,FileSo… (#1192)
805a439 is described below

commit 805a4397d3ce71689ab0747316decdc2b432d623
Author: bigdataf <33...@users.noreply.github.com>
AuthorDate: Mon Feb 7 13:32:59 2022 +0800

    [SeaTunnel#1191] DorisSink,DruidSink,DruidSource,Elasticsearch,FileSo… (#1192)
    
    * [SeaTunnel#1191] DorisSink,DruidSink,DruidSource,Elasticsearch,FileSource,FileSink,InfluxDbSink Support for parallel parameters
---
 docs/en/flink/configuration/sink-plugins/Doris.md          |  5 +++++
 docs/en/flink/configuration/sink-plugins/Druid.md          |  5 +++++
 docs/en/flink/configuration/sink-plugins/Elasticsearch.md  |  5 +++++
 docs/en/flink/configuration/sink-plugins/File.md           |  7 +++++++
 docs/en/flink/configuration/sink-plugins/InfluxDb.md       |  6 ++++++
 docs/en/flink/configuration/source-plugins/Druid.md        |  5 +++++
 docs/en/flink/configuration/source-plugins/File.md         |  5 +++++
 .../java/org/apache/seatunnel/flink/sink/DorisSink.java    | 14 ++++++++++++--
 .../java/org/apache/seatunnel/flink/sink/DruidSink.java    |  8 +++++++-
 .../org/apache/seatunnel/flink/source/DruidSource.java     |  9 ++++++++-
 .../org/apache/seatunnel/flink/sink/Elasticsearch.java     | 14 +++++++++++++-
 .../java/org/apache/seatunnel/flink/sink/FileSink.java     |  9 ++++++++-
 .../java/org/apache/seatunnel/flink/source/FileSource.java |  9 ++++++++-
 .../java/org/apache/seatunnel/flink/sink/InfluxDbSink.java |  8 +++++++-
 14 files changed, 101 insertions(+), 8 deletions(-)

diff --git a/docs/en/flink/configuration/sink-plugins/Doris.md b/docs/en/flink/configuration/sink-plugins/Doris.md
index 2278f97..88fe76e 100644
--- a/docs/en/flink/configuration/sink-plugins/Doris.md
+++ b/docs/en/flink/configuration/sink-plugins/Doris.md
@@ -17,6 +17,7 @@ Write Data to a Doris Table.
 | interval	 | int | no |1000 | Flink |
 | max_retries	 | int | no | 1 | Flink|
 | doris.*	 | - | no | - | Flink  |
+| parallelism | int | no | - | Flink |
 
 ##### fenodes [string]
 
@@ -55,6 +56,10 @@ Number of retries after writing Doris failed
 The Doris stream load parameters. You can use the 'doris.' prefix + stream_load properties, e.g. 'doris.column_separator' = ','
 [More Doris stream_load Configurations](https://doris.apache.org/administrator-guide/load-data/stream-load-manual.html)
 
+##### parallelism [int]
+
+The parallelism of an individual operator for the DorisSink
+
 ### Examples
 
 ```
diff --git a/docs/en/flink/configuration/sink-plugins/Druid.md b/docs/en/flink/configuration/sink-plugins/Druid.md
index 7d95058..b235997 100644
--- a/docs/en/flink/configuration/sink-plugins/Druid.md
+++ b/docs/en/flink/configuration/sink-plugins/Druid.md
@@ -12,6 +12,7 @@ Write data to Apache Druid.
 | datasource       | `String` | yes      | -             |
 | timestamp_column | `String` | no       | timestamp     |
 | timestamp_format | `String` | no       | auto          |
+| parallelism      | `Int`    | no       | -             |
 
 ### coordinator_url [`String`]
 
@@ -49,6 +50,10 @@ The timestamp format in Apache Druid, the default value is `auto`, it could be:
 
 - any [Joda DateTimeFormat](http://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html) string
 
+### parallelism [`Int`]
+
+The parallelism of an individual operator for the DruidSink
+
 ## Example
 
 ### Simple
diff --git a/docs/en/flink/configuration/sink-plugins/Elasticsearch.md b/docs/en/flink/configuration/sink-plugins/Elasticsearch.md
index 5c9217e..99a1c04 100644
--- a/docs/en/flink/configuration/sink-plugins/Elasticsearch.md
+++ b/docs/en/flink/configuration/sink-plugins/Elasticsearch.md
@@ -13,6 +13,7 @@ Output data to ElasticSearch
 | index_time_format | string | no       | yyyy.MM.dd    |
 | index             | string | no       | seatunnel     |
 | common-options    | string | no       | -             |
+| parallelism       | int    | no       | -             |
 
 ### hosts [array]
 
@@ -41,6 +42,10 @@ See [Java SimpleDateFormat](https://docs.oracle.com/javase/tutorial/i18n/format/
 
 Elasticsearch `index` name. If you need to generate an `index` based on time, you can specify a time variable, such as `seatunnel-${now}` . `now` represents the current data processing time.
 
+### parallelism [int]
+
+The parallelism of an individual operator for the Elasticsearch sink
+
 ### common options [string]
 
 Sink plugin common parameters, please refer to [Sink Plugin](./sink-plugin.md) for details
diff --git a/docs/en/flink/configuration/sink-plugins/File.md b/docs/en/flink/configuration/sink-plugins/File.md
index 879aed3..42d13b4 100644
--- a/docs/en/flink/configuration/sink-plugins/File.md
+++ b/docs/en/flink/configuration/sink-plugins/File.md
@@ -12,6 +12,8 @@ Write data to the file system
 | path           | string | yes      | -             |
 | write_mode     | string | no       | -             |
 | common-options | string | no       | -             |
+| parallelism    | int    | no       | -             |
+
 
 ### format [string]
 
@@ -35,6 +37,11 @@ The file path is required. The `hdfs file` starts with `hdfs://` , and the `loca
 
 Sink plugin common parameters, please refer to [Sink Plugin](./sink-plugin.md) for details
 
+### parallelism [int]
+
+The parallelism of an individual operator for the FileSink
+
+
 ## Examples
 
 ```bash
diff --git a/docs/en/flink/configuration/sink-plugins/InfluxDb.md b/docs/en/flink/configuration/sink-plugins/InfluxDb.md
index 86f8ef0..d31b16f 100644
--- a/docs/en/flink/configuration/sink-plugins/InfluxDb.md
+++ b/docs/en/flink/configuration/sink-plugins/InfluxDb.md
@@ -15,6 +15,7 @@ Write data to InfluxDB.
 | measurement | `String`       | yes      | -             |
 | tags        | `List<String>` | yes      | -             |
 | fields      | `List<String>` | yes      | -             |
+| parallelism | `Int`          | no       | -             |
 
 ### server_url [`String`]
 
@@ -44,6 +45,11 @@ The list of Tag in InfluxDB.
 
 The list of Field in InfluxDB.
 
+### parallelism [`Int`]
+
+The parallelism of an individual operator for the InfluxDbSink
+
+
 ## Example
 
 ### Simple
diff --git a/docs/en/flink/configuration/source-plugins/Druid.md b/docs/en/flink/configuration/source-plugins/Druid.md
index 9512b30..e45f920 100644
--- a/docs/en/flink/configuration/source-plugins/Druid.md
+++ b/docs/en/flink/configuration/source-plugins/Druid.md
@@ -13,6 +13,7 @@ Read data from Apache Druid.
 | start_date | `String`       | no       | -             |
 | end_date   | `String`       | no       | -             |
 | columns    | `List<String>` | no       | `*`           |
+| parallelism | `Int`          | no       | -             |
 
 ### jdbc_url [`String`]
 
@@ -38,6 +39,10 @@ These columns that you want to query of DataSource.
 
 Source Plugin common parameters, refer to [Source Plugin](./source-plugin.md) for details
 
+### parallelism [`Int`]
+
+The parallelism of an individual operator for the DruidSource
+
 ## Example
 
 ```hocon
diff --git a/docs/en/flink/configuration/source-plugins/File.md b/docs/en/flink/configuration/source-plugins/File.md
index 7bc2b54..cc3d87b 100644
--- a/docs/en/flink/configuration/source-plugins/File.md
+++ b/docs/en/flink/configuration/source-plugins/File.md
@@ -12,6 +12,7 @@ Read data from the file system
 | path           | string | yes      | -             |
 | schema         | string | yes      | -             |
 | common-options | string | no       | -             |
+| parallelism    | int    | no       | -             |
 
 ### format.type [string]
 
@@ -47,6 +48,10 @@ The file path is required. The `hdfs file` starts with `hdfs://` , and the `loca
 
 Source plugin common parameters, please refer to [Source Plugin](./source-plugin.md) for details
 
+### parallelism [int]
+
+The parallelism of an individual operator for the FileSource
+
 ## Examples
 
 ```bash
diff --git a/seatunnel-connectors/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/sink/DorisSink.java b/seatunnel-connectors/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/sink/DorisSink.java
index c92829b..5709942 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/sink/DorisSink.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/sink/DorisSink.java
@@ -44,6 +44,7 @@ public class DorisSink implements FlinkStreamSink<Row, Row>, FlinkBatchSink<Row,
     private static final long serialVersionUID = 4747849769146047770L;
     private static final int DEFAULT_BATCH_SIZE = 100;
     private static final long DEFAULT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(1);
+    private static final String PARALLELISM = "parallelism";
 
     private Config config;
     private String fenodes;
@@ -103,7 +104,12 @@ public class DorisSink implements FlinkStreamSink<Row, Row>, FlinkBatchSink<Row,
         String[] fieldNames = table.getSchema().getFieldNames();
 
         DorisStreamLoad dorisStreamLoad = new DorisStreamLoad(fenodes, dbName, tableName, username, password, streamLoadProp);
-        return dataSet.output(new DorisOutputFormat<>(dorisStreamLoad, fieldNames, batchSize, batchIntervalMs, maxRetries));
+        DataSink<Row> rowDataSink = dataSet.output(new DorisOutputFormat<>(dorisStreamLoad, fieldNames, batchSize, batchIntervalMs, maxRetries));
+        if (config.hasPath(PARALLELISM)) {
+            int parallelism = config.getInt(PARALLELISM);
+            return rowDataSink.setParallelism(parallelism);
+        }
+        return rowDataSink;
     }
 
     @Override
@@ -113,7 +119,11 @@ public class DorisSink implements FlinkStreamSink<Row, Row>, FlinkBatchSink<Row,
         String[] fieldNames = table.getSchema().getFieldNames();
 
         DorisStreamLoad dorisStreamLoad = new DorisStreamLoad(fenodes, dbName, tableName, username, password, streamLoadProp);
-        dataStream.addSink(new DorisSinkFunction<>(new DorisOutputFormat<>(dorisStreamLoad, fieldNames, batchSize, batchIntervalMs, maxRetries)));
+        DataStreamSink<Row> rowDataStreamSink = dataStream.addSink(new DorisSinkFunction<>(new DorisOutputFormat<>(dorisStreamLoad, fieldNames, batchSize, batchIntervalMs, maxRetries)));
+        if (config.hasPath(PARALLELISM)) {
+            int parallelism = config.getInt(PARALLELISM);
+            rowDataStreamSink.setParallelism(parallelism);
+        }
         return null;
     }
 }
diff --git a/seatunnel-connectors/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/sink/DruidSink.java b/seatunnel-connectors/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/sink/DruidSink.java
index 83f6f87..52ea868 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/sink/DruidSink.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/sink/DruidSink.java
@@ -35,6 +35,7 @@ public class DruidSink implements FlinkBatchSink<Row, Row> {
     private static final String DATASOURCE = "datasource";
     private static final String TIMESTAMP_COLUMN = "timestamp_column";
     private static final String TIMESTAMP_FORMAT = "timestamp_format";
+    private static final String PARALLELISM = "parallelism";
 
     private Config config;
     private String coordinatorURL;
@@ -44,7 +45,12 @@ public class DruidSink implements FlinkBatchSink<Row, Row> {
 
     @Override
     public DataSink<Row> outputBatch(FlinkEnvironment env, DataSet<Row> dataSet) {
-        return dataSet.output(new DruidOutputFormat(coordinatorURL, datasource, timestampColumn, timestampFormat));
+        DataSink<Row> dataSink = dataSet.output(new DruidOutputFormat(coordinatorURL, datasource, timestampColumn, timestampFormat));
+        if (config.hasPath(PARALLELISM)) {
+            int parallelism = config.getInt(PARALLELISM);
+            return dataSink.setParallelism(parallelism);
+        }
+        return dataSink;
     }
 
     @Override
diff --git a/seatunnel-connectors/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/source/DruidSource.java b/seatunnel-connectors/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/source/DruidSource.java
index 1b57162..b271ff0 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/source/DruidSource.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/source/DruidSource.java
@@ -37,6 +37,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.types.Row;
 import org.slf4j.Logger;
@@ -64,6 +65,7 @@ public class DruidSource implements FlinkBatchSource<Row> {
     private static final String START_TIMESTAMP = "start_date";
     private static final String END_TIMESTAMP = "end_date";
     private static final String COLUMNS = "columns";
+    private static final String PARALLELISM = "parallelism";
 
     private HashMap<String, TypeInformation> informationMapping = new HashMap<>();
 
@@ -86,7 +88,12 @@ public class DruidSource implements FlinkBatchSource<Row> {
 
     @Override
     public DataSet<Row> getData(FlinkEnvironment env) {
-        return env.getBatchEnvironment().createInput(druidInputFormat);
+        DataSource<Row> dataSource = env.getBatchEnvironment().createInput(druidInputFormat);
+        if (config.hasPath(PARALLELISM)) {
+            int parallelism = config.getInt(PARALLELISM);
+            return dataSource.setParallelism(parallelism);
+        }
+        return dataSource;
     }
 
     @Override
diff --git a/seatunnel-connectors/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/sink/Elasticsearch.java b/seatunnel-connectors/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/sink/Elasticsearch.java
index ed9697e..bf1744a 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/sink/Elasticsearch.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/sink/Elasticsearch.java
@@ -50,6 +50,7 @@ public class Elasticsearch implements FlinkStreamSink<Row, Row>, FlinkBatchSink<
 
     private static final long serialVersionUID = 8445868321245456793L;
     private static final int DEFAULT_CONFIG_SIZE = 3;
+    private static final String PARALLELISM = "parallelism";
 
     private Config config;
     private String indexName;
@@ -120,6 +121,10 @@ public class Elasticsearch implements FlinkStreamSink<Row, Row>, FlinkBatchSink<
         esSinkBuilder.setBulkFlushMaxActions(1);
 
         // finally, build and add the sink to the job's pipeline
+        if (config.hasPath(PARALLELISM)) {
+            int parallelism = config.getInt(PARALLELISM);
+            return dataStream.addSink(esSinkBuilder.build()).setParallelism(parallelism);
+        }
         return dataStream.addSink(esSinkBuilder.build());
     }
 
@@ -129,7 +134,7 @@ public class Elasticsearch implements FlinkStreamSink<Row, Row>, FlinkBatchSink<
         RowTypeInfo rowTypeInfo = (RowTypeInfo) dataSet.getType();
         String[] fieldNames = rowTypeInfo.getFieldNames();
         indexName = StringTemplate.substitute(config.getString("index"), config.getString("index_time_format"));
-        return dataSet.output(new ElasticsearchOutputFormat<>(config, new ElasticsearchSinkFunction<Row>() {
+        DataSink<Row> dataSink = dataSet.output(new ElasticsearchOutputFormat<>(config, new ElasticsearchSinkFunction<Row>() {
             @Override
             public void process(Row element, RuntimeContext ctx, RequestIndexer indexer) {
                 indexer.add(createIndexRequest(element));
@@ -148,5 +153,12 @@ public class Elasticsearch implements FlinkStreamSink<Row, Row>, FlinkBatchSink<
                         .source(json);
             }
         }));
+
+        if (config.hasPath(PARALLELISM)) {
+            int parallelism = config.getInt(PARALLELISM);
+            return dataSink.setParallelism(parallelism);
+        }
+        return dataSink;
+
     }
 }
diff --git a/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java b/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
index bc02392..91be72a 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
@@ -51,6 +51,7 @@ public class FileSink implements FlinkStreamSink<Row, Row>, FlinkBatchSink<Row,
     private static final String PATH = "path";
     private static final String FORMAT = "format";
     private static final String WRITE_MODE = "write_mode";
+    private static final String PARALLELISM = "parallelism";
 
     private Config config;
 
@@ -95,7 +96,13 @@ public class FileSink implements FlinkStreamSink<Row, Row>, FlinkBatchSink<Row,
             String mode = config.getString(WRITE_MODE);
             outputFormat.setWriteMode(FileSystem.WriteMode.valueOf(mode));
         }
-        return dataSet.output(outputFormat);
+
+        DataSink dataSink = dataSet.output(outputFormat);
+        if (config.hasPath(PARALLELISM)) {
+            int parallelism = config.getInt(PARALLELISM);
+            return dataSink.setParallelism(parallelism);
+        }
+        return dataSink;
     }
 
     @Override
diff --git a/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/source/FileSource.java b/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/source/FileSource.java
index f3439f7..f1cb792 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/source/FileSource.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/source/FileSource.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.io.RowCsvInputFormat;
+import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.parquet.ParquetRowInputFormat;
@@ -54,10 +55,16 @@ public class FileSource implements FlinkBatchSource<Row> {
     private static final String PATH = "path";
     private static final String SOURCE_FORMAT = "format.type";
     private static final String SCHEMA = "schema";
+    private static final String PARALLELISM = "parallelism";
 
     @Override
     public DataSet<Row> getData(FlinkEnvironment env) {
-        return env.getBatchEnvironment().createInput(inputFormat);
+        DataSource dataSource = env.getBatchEnvironment().createInput(inputFormat);
+        if (config.hasPath(PARALLELISM)) {
+            int parallelism = config.getInt(PARALLELISM);
+            return dataSource.setParallelism(parallelism);
+        }
+        return dataSource;
     }
 
     @Override
diff --git a/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/sink/InfluxDbSink.java b/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/sink/InfluxDbSink.java
index 72be132..82209b4 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/sink/InfluxDbSink.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/sink/InfluxDbSink.java
@@ -40,6 +40,7 @@ public class InfluxDbSink implements FlinkBatchSink<Row, Row> {
     private static final String MEASUREMENT = "measurement";
     private static final String TAGS = "tags";
     private static final String FIELDS = "fields";
+    private static final String PARALLELISM = "parallelism";
 
     private Config config;
     private String serverURL;
@@ -52,7 +53,12 @@ public class InfluxDbSink implements FlinkBatchSink<Row, Row> {
 
     @Override
     public DataSink<Row> outputBatch(FlinkEnvironment env, DataSet<Row> dataSet) {
-        return dataSet.output(new InfluxDbOutputFormat(serverURL, username, password, database, measurement, tags, fields));
+        DataSink<Row> dataSink = dataSet.output(new InfluxDbOutputFormat(serverURL, username, password, database, measurement, tags, fields));
+        if (config.hasPath(PARALLELISM)) {
+            int parallelism = config.getInt(PARALLELISM);
+            return dataSink.setParallelism(parallelism);
+        }
+        return dataSink;
     }
 
     @Override