Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/02/07 05:33:03 UTC
[incubator-seatunnel] branch dev updated: [SeaTunnel#1191] DorisSink,DruidSink,DruidSource,Elasticsearch,FileSo… (#1192)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 805a439 [SeaTunnel#1191] DorisSink,DruidSink,DruidSource,Elasticsearch,FileSo… (#1192)
805a439 is described below
commit 805a4397d3ce71689ab0747316decdc2b432d623
Author: bigdataf <33...@users.noreply.github.com>
AuthorDate: Mon Feb 7 13:32:59 2022 +0800
[SeaTunnel#1191] DorisSink,DruidSink,DruidSource,Elasticsearch,FileSo… (#1192)
* [SeaTunnel#1191] DorisSink,DruidSink,DruidSource,Elasticsearch,FileSource,FileSink,InfluxDbSink Support for parallel parameters
---
docs/en/flink/configuration/sink-plugins/Doris.md | 5 +++++
docs/en/flink/configuration/sink-plugins/Druid.md | 5 +++++
docs/en/flink/configuration/sink-plugins/Elasticsearch.md | 5 +++++
docs/en/flink/configuration/sink-plugins/File.md | 7 +++++++
docs/en/flink/configuration/sink-plugins/InfluxDb.md | 6 ++++++
docs/en/flink/configuration/source-plugins/Druid.md | 5 +++++
docs/en/flink/configuration/source-plugins/File.md | 5 +++++
.../java/org/apache/seatunnel/flink/sink/DorisSink.java | 14 ++++++++++++--
.../java/org/apache/seatunnel/flink/sink/DruidSink.java | 8 +++++++-
.../org/apache/seatunnel/flink/source/DruidSource.java | 9 ++++++++-
.../org/apache/seatunnel/flink/sink/Elasticsearch.java | 14 +++++++++++++-
.../java/org/apache/seatunnel/flink/sink/FileSink.java | 9 ++++++++-
.../java/org/apache/seatunnel/flink/source/FileSource.java | 9 ++++++++-
.../java/org/apache/seatunnel/flink/sink/InfluxDbSink.java | 8 +++++++-
14 files changed, 101 insertions(+), 8 deletions(-)
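Per the documentation added below, `parallelism` is a new optional integer key on each of these plugins. A sketch of how it might appear in a SeaTunnel Flink job config — the `parallelism`, `fenodes`, `path`, `schema`, and `format.type` keys come from the docs in this diff, while the surrounding `source`/`sink` structure and the sample values are illustrative assumptions:

```hocon
source {
  FileSource {
    format.type = "json"
    path = "hdfs://localhost:9000/json/test.json"
    schema = "{\"name\":\"string\",\"age\":\"int\"}"
    # Optional: overrides this source operator's parallelism only
    parallelism = 2
  }
}

sink {
  DorisSink {
    fenodes = "127.0.0.1:8030"
    # Optional: overrides this sink operator's parallelism only
    parallelism = 2
  }
}
```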
diff --git a/docs/en/flink/configuration/sink-plugins/Doris.md b/docs/en/flink/configuration/sink-plugins/Doris.md
index 2278f97..88fe76e 100644
--- a/docs/en/flink/configuration/sink-plugins/Doris.md
+++ b/docs/en/flink/configuration/sink-plugins/Doris.md
@@ -17,6 +17,7 @@ Write Data to a Doris Table.
| interval | int | no |1000 | Flink |
| max_retries | int | no | 1 | Flink|
| doris.* | - | no | - | Flink |
+| parallelism | int | no | - |Flink|
##### fenodes [string]
@@ -55,6 +56,10 @@ Number of retries after writing Doris failed
The Doris stream load parameters. You can use the 'doris.' prefix + stream_load properties, e.g. 'doris.column_separator' = ','
[More Doris stream_load Configurations](https://doris.apache.org/administrator-guide/load-data/stream-load-manual.html)
+### parallelism [Int]
+
+The parallelism of the DorisSink operator
+
### Examples
```
diff --git a/docs/en/flink/configuration/sink-plugins/Druid.md b/docs/en/flink/configuration/sink-plugins/Druid.md
index 7d95058..b235997 100644
--- a/docs/en/flink/configuration/sink-plugins/Druid.md
+++ b/docs/en/flink/configuration/sink-plugins/Druid.md
@@ -12,6 +12,7 @@ Write data to Apache Druid.
| datasource | `String` | yes | - |
| timestamp_column | `String` | no | timestamp |
| timestamp_format | `String` | no | auto |
+| parallelism | `Int` | no | - |
### coordinator_url [`String`]
@@ -49,6 +50,10 @@ The timestamp format in Apache Druid, the default value is `auto`, it could be:
- any [Joda DateTimeFormat](http://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html) string
+### parallelism [`Int`]
+
+The parallelism of the DruidSink operator
+
## Example
### Simple
diff --git a/docs/en/flink/configuration/sink-plugins/Elasticsearch.md b/docs/en/flink/configuration/sink-plugins/Elasticsearch.md
index 5c9217e..99a1c04 100644
--- a/docs/en/flink/configuration/sink-plugins/Elasticsearch.md
+++ b/docs/en/flink/configuration/sink-plugins/Elasticsearch.md
@@ -13,6 +13,7 @@ Output data to ElasticSearch
| index_time_format | string | no | yyyy.MM.dd |
| index | string | no | seatunnel |
| common-options | string | no | - |
+| parallelism | int | no | - |
### hosts [array]
@@ -41,6 +42,10 @@ See [Java SimpleDateFormat](https://docs.oracle.com/javase/tutorial/i18n/format/
Elasticsearch `index` name. If you need to generate an `index` based on time, you can specify a time variable, such as `seatunnel-${now}` . `now` represents the current data processing time.
+### parallelism [`Int`]
+
+The parallelism of the Elasticsearch sink operator
+
### common options [string]
Sink plugin common parameters, please refer to [Sink Plugin](./sink-plugin.md) for details
diff --git a/docs/en/flink/configuration/sink-plugins/File.md b/docs/en/flink/configuration/sink-plugins/File.md
index 879aed3..42d13b4 100644
--- a/docs/en/flink/configuration/sink-plugins/File.md
+++ b/docs/en/flink/configuration/sink-plugins/File.md
@@ -12,6 +12,8 @@ Write data to the file system
| path | string | yes | - |
| write_mode | string | no | - |
| common-options | string | no | - |
+| parallelism | int | no | - |
+
### format [string]
@@ -35,6 +37,11 @@ The file path is required. The `hdfs file` starts with `hdfs://` , and the `loca
Sink plugin common parameters, please refer to [Sink Plugin](./sink-plugin.md) for details
+### parallelism [`Int`]
+
+The parallelism of the FileSink operator
+
+
## Examples
```bash
diff --git a/docs/en/flink/configuration/sink-plugins/InfluxDb.md b/docs/en/flink/configuration/sink-plugins/InfluxDb.md
index 86f8ef0..d31b16f 100644
--- a/docs/en/flink/configuration/sink-plugins/InfluxDb.md
+++ b/docs/en/flink/configuration/sink-plugins/InfluxDb.md
@@ -15,6 +15,7 @@ Write data to InfluxDB.
| measurement | `String` | yes | - |
| tags | `List<String>` | yes | - |
| fields | `List<String>` | yes | - |
+| parallelism | `Int` | no | - |
### server_url [`String`]
@@ -44,6 +45,11 @@ The list of Tag in InfluxDB.
The list of Field in InfluxDB.
+### parallelism [`Int`]
+
+The parallelism of the InfluxDbSink operator
+
+
## Example
### Simple
diff --git a/docs/en/flink/configuration/source-plugins/Druid.md b/docs/en/flink/configuration/source-plugins/Druid.md
index 9512b30..e45f920 100644
--- a/docs/en/flink/configuration/source-plugins/Druid.md
+++ b/docs/en/flink/configuration/source-plugins/Druid.md
@@ -13,6 +13,7 @@ Read data from Apache Druid.
| start_date | `String` | no | - |
| end_date | `String` | no | - |
| columns | `List<String>` | no | `*` |
+| parallelism | `Int` | no | - |
### jdbc_url [`String`]
@@ -38,6 +39,10 @@ These columns that you want to query of DataSource.
Source Plugin common parameters, refer to [Source Plugin](./source-plugin.md) for details
+### parallelism [`Int`]
+
+The parallelism of the DruidSource operator
+
## Example
```hocon
diff --git a/docs/en/flink/configuration/source-plugins/File.md b/docs/en/flink/configuration/source-plugins/File.md
index 7bc2b54..cc3d87b 100644
--- a/docs/en/flink/configuration/source-plugins/File.md
+++ b/docs/en/flink/configuration/source-plugins/File.md
@@ -12,6 +12,7 @@ Read data from the file system
| path | string | yes | - |
| schema | string | yes | - |
| common-options | string | no | - |
+| parallelism | int | no | - |
### format.type [string]
@@ -47,6 +48,10 @@ The file path is required. The `hdfs file` starts with `hdfs://` , and the `loca
Source plugin common parameters, please refer to [Source Plugin](./source-plugin.md) for details
+### parallelism [`Int`]
+
+The parallelism of the FileSource operator
+
## Examples
```bash
diff --git a/seatunnel-connectors/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/sink/DorisSink.java b/seatunnel-connectors/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/sink/DorisSink.java
index c92829b..5709942 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/sink/DorisSink.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/sink/DorisSink.java
@@ -44,6 +44,7 @@ public class DorisSink implements FlinkStreamSink<Row, Row>, FlinkBatchSink<Row,
private static final long serialVersionUID = 4747849769146047770L;
private static final int DEFAULT_BATCH_SIZE = 100;
private static final long DEFAULT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(1);
+ private static final String PARALLELISM = "parallelism";
private Config config;
private String fenodes;
@@ -103,7 +104,12 @@ public class DorisSink implements FlinkStreamSink<Row, Row>, FlinkBatchSink<Row,
String[] fieldNames = table.getSchema().getFieldNames();
DorisStreamLoad dorisStreamLoad = new DorisStreamLoad(fenodes, dbName, tableName, username, password, streamLoadProp);
- return dataSet.output(new DorisOutputFormat<>(dorisStreamLoad, fieldNames, batchSize, batchIntervalMs, maxRetries));
+ DataSink<Row> rowDataSink = dataSet.output(new DorisOutputFormat<>(dorisStreamLoad, fieldNames, batchSize, batchIntervalMs, maxRetries));
+ if (config.hasPath(PARALLELISM)) {
+ int parallelism = config.getInt(PARALLELISM);
+ return rowDataSink.setParallelism(parallelism);
+ }
+ return rowDataSink;
}
@Override
@@ -113,7 +119,11 @@ public class DorisSink implements FlinkStreamSink<Row, Row>, FlinkBatchSink<Row,
String[] fieldNames = table.getSchema().getFieldNames();
DorisStreamLoad dorisStreamLoad = new DorisStreamLoad(fenodes, dbName, tableName, username, password, streamLoadProp);
- dataStream.addSink(new DorisSinkFunction<>(new DorisOutputFormat<>(dorisStreamLoad, fieldNames, batchSize, batchIntervalMs, maxRetries)));
+ DataStreamSink<Row> rowDataStreamSink = dataStream.addSink(new DorisSinkFunction<>(new DorisOutputFormat<>(dorisStreamLoad, fieldNames, batchSize, batchIntervalMs, maxRetries)));
+ if (config.hasPath(PARALLELISM)) {
+ int parallelism = config.getInt(PARALLELISM);
+ rowDataStreamSink.setParallelism(parallelism);
+ }
return null;
}
}
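Every connector in this commit applies the same optional-parallelism pattern: check `config.hasPath("parallelism")`, and call `setParallelism` on the sink or source only when the key is present. A self-contained toy sketch of that pattern, with a plain `Map` standing in for the Typesafe `Config` and `FakeSink` for Flink's `DataSink` (all names here are illustrative, not from the commit):

```java
import java.util.Map;

// Toy stand-in for Flink's DataSink: it just records the parallelism applied.
class FakeSink {
    int parallelism = -1; // -1 = inherit the environment default

    FakeSink setParallelism(int parallelism) {
        this.parallelism = parallelism;
        return this;
    }
}

public class ParallelismPattern {
    private static final String PARALLELISM = "parallelism";

    // Mirrors the connectors' logic: override parallelism only when the
    // optional config key exists, otherwise leave the sink untouched.
    static FakeSink applyParallelism(FakeSink sink, Map<String, Integer> config) {
        if (config.containsKey(PARALLELISM)) {
            return sink.setParallelism(config.get(PARALLELISM));
        }
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(applyParallelism(new FakeSink(), Map.of(PARALLELISM, 4)).parallelism); // 4
        System.out.println(applyParallelism(new FakeSink(), Map.of()).parallelism);               // -1
    }
}
```

Because the key is optional, a job that omits it keeps the default parallelism of the Flink environment, which is why every connector falls through to returning the unmodified sink.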
diff --git a/seatunnel-connectors/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/sink/DruidSink.java b/seatunnel-connectors/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/sink/DruidSink.java
index 83f6f87..52ea868 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/sink/DruidSink.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/sink/DruidSink.java
@@ -35,6 +35,7 @@ public class DruidSink implements FlinkBatchSink<Row, Row> {
private static final String DATASOURCE = "datasource";
private static final String TIMESTAMP_COLUMN = "timestamp_column";
private static final String TIMESTAMP_FORMAT = "timestamp_format";
+ private static final String PARALLELISM = "parallelism";
private Config config;
private String coordinatorURL;
@@ -44,7 +45,12 @@ public class DruidSink implements FlinkBatchSink<Row, Row> {
@Override
public DataSink<Row> outputBatch(FlinkEnvironment env, DataSet<Row> dataSet) {
- return dataSet.output(new DruidOutputFormat(coordinatorURL, datasource, timestampColumn, timestampFormat));
+ DataSink<Row> dataSink = dataSet.output(new DruidOutputFormat(coordinatorURL, datasource, timestampColumn, timestampFormat));
+ if (config.hasPath(PARALLELISM)) {
+ int parallelism = config.getInt(PARALLELISM);
+ return dataSink.setParallelism(parallelism);
+ }
+ return dataSink;
}
@Override
diff --git a/seatunnel-connectors/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/source/DruidSource.java b/seatunnel-connectors/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/source/DruidSource.java
index 1b57162..b271ff0 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/source/DruidSource.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/source/DruidSource.java
@@ -37,6 +37,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
@@ -64,6 +65,7 @@ public class DruidSource implements FlinkBatchSource<Row> {
private static final String START_TIMESTAMP = "start_date";
private static final String END_TIMESTAMP = "end_date";
private static final String COLUMNS = "columns";
+ private static final String PARALLELISM = "parallelism";
private HashMap<String, TypeInformation> informationMapping = new HashMap<>();
@@ -86,7 +88,12 @@ public class DruidSource implements FlinkBatchSource<Row> {
@Override
public DataSet<Row> getData(FlinkEnvironment env) {
- return env.getBatchEnvironment().createInput(druidInputFormat);
+ DataSource<Row> dataSource = env.getBatchEnvironment().createInput(druidInputFormat);
+ if (config.hasPath(PARALLELISM)) {
+ int parallelism = config.getInt(PARALLELISM);
+ return dataSource.setParallelism(parallelism);
+ }
+ return dataSource;
}
@Override
diff --git a/seatunnel-connectors/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/sink/Elasticsearch.java b/seatunnel-connectors/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/sink/Elasticsearch.java
index ed9697e..bf1744a 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/sink/Elasticsearch.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/sink/Elasticsearch.java
@@ -50,6 +50,7 @@ public class Elasticsearch implements FlinkStreamSink<Row, Row>, FlinkBatchSink<
private static final long serialVersionUID = 8445868321245456793L;
private static final int DEFAULT_CONFIG_SIZE = 3;
+ private static final String PARALLELISM = "parallelism";
private Config config;
private String indexName;
@@ -120,6 +121,10 @@ public class Elasticsearch implements FlinkStreamSink<Row, Row>, FlinkBatchSink<
esSinkBuilder.setBulkFlushMaxActions(1);
// finally, build and add the sink to the job's pipeline
+ if (config.hasPath(PARALLELISM)) {
+ int parallelism = config.getInt(PARALLELISM);
+ return dataStream.addSink(esSinkBuilder.build()).setParallelism(parallelism);
+ }
return dataStream.addSink(esSinkBuilder.build());
}
@@ -129,7 +134,7 @@ public class Elasticsearch implements FlinkStreamSink<Row, Row>, FlinkBatchSink<
RowTypeInfo rowTypeInfo = (RowTypeInfo) dataSet.getType();
String[] fieldNames = rowTypeInfo.getFieldNames();
indexName = StringTemplate.substitute(config.getString("index"), config.getString("index_time_format"));
- return dataSet.output(new ElasticsearchOutputFormat<>(config, new ElasticsearchSinkFunction<Row>() {
+ DataSink<Row> dataSink = dataSet.output(new ElasticsearchOutputFormat<>(config, new ElasticsearchSinkFunction<Row>() {
@Override
public void process(Row element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
@@ -148,5 +153,12 @@ public class Elasticsearch implements FlinkStreamSink<Row, Row>, FlinkBatchSink<
.source(json);
}
}));
+
+ if (config.hasPath(PARALLELISM)) {
+ int parallelism = config.getInt(PARALLELISM);
+ return dataSink.setParallelism(parallelism);
+ }
+ return dataSink;
+
}
}
diff --git a/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java b/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
index bc02392..91be72a 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
@@ -51,6 +51,7 @@ public class FileSink implements FlinkStreamSink<Row, Row>, FlinkBatchSink<Row,
private static final String PATH = "path";
private static final String FORMAT = "format";
private static final String WRITE_MODE = "write_mode";
+ private static final String PARALLELISM = "parallelism";
private Config config;
@@ -95,7 +96,13 @@ public class FileSink implements FlinkStreamSink<Row, Row>, FlinkBatchSink<Row,
String mode = config.getString(WRITE_MODE);
outputFormat.setWriteMode(FileSystem.WriteMode.valueOf(mode));
}
- return dataSet.output(outputFormat);
+
+        DataSink<Row> dataSink = dataSet.output(outputFormat);
+ if (config.hasPath(PARALLELISM)) {
+ int parallelism = config.getInt(PARALLELISM);
+ return dataSink.setParallelism(parallelism);
+ }
+ return dataSink;
}
@Override
diff --git a/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/source/FileSource.java b/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/source/FileSource.java
index f3439f7..f1cb792 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/source/FileSource.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/source/FileSource.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.io.RowCsvInputFormat;
+import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetRowInputFormat;
@@ -54,10 +55,16 @@ public class FileSource implements FlinkBatchSource<Row> {
private static final String PATH = "path";
private static final String SOURCE_FORMAT = "format.type";
private static final String SCHEMA = "schema";
+ private static final String PARALLELISM = "parallelism";
@Override
public DataSet<Row> getData(FlinkEnvironment env) {
- return env.getBatchEnvironment().createInput(inputFormat);
+        DataSource<Row> dataSource = env.getBatchEnvironment().createInput(inputFormat);
+ if (config.hasPath(PARALLELISM)) {
+ int parallelism = config.getInt(PARALLELISM);
+ return dataSource.setParallelism(parallelism);
+ }
+ return dataSource;
}
@Override
diff --git a/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/sink/InfluxDbSink.java b/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/sink/InfluxDbSink.java
index 72be132..82209b4 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/sink/InfluxDbSink.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/sink/InfluxDbSink.java
@@ -40,6 +40,7 @@ public class InfluxDbSink implements FlinkBatchSink<Row, Row> {
private static final String MEASUREMENT = "measurement";
private static final String TAGS = "tags";
private static final String FIELDS = "fields";
+ private static final String PARALLELISM = "parallelism";
private Config config;
private String serverURL;
@@ -52,7 +53,12 @@ public class InfluxDbSink implements FlinkBatchSink<Row, Row> {
@Override
public DataSink<Row> outputBatch(FlinkEnvironment env, DataSet<Row> dataSet) {
- return dataSet.output(new InfluxDbOutputFormat(serverURL, username, password, database, measurement, tags, fields));
+ DataSink<Row> dataSink = dataSet.output(new InfluxDbOutputFormat(serverURL, username, password, database, measurement, tags, fields));
+ if (config.hasPath(PARALLELISM)) {
+ int parallelism = config.getInt(PARALLELISM);
+ return dataSink.setParallelism(parallelism);
+ }
+ return dataSink;
}
@Override