You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wu...@apache.org on 2022/03/02 07:41:28 UTC
[incubator-seatunnel] branch dev updated: [Improve][Connectors] Add description for system variables ${now}${uuid} in file-connector and support for flink (#1370)
This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new dcf3797 [Improve][Connectors] Add description for system variables ${now}${uuid} in file-connector and support for flink (#1370)
dcf3797 is described below
commit dcf37972e39384c2f7803f0090e7ed987d081ad7
Author: Simon <zh...@cvte.com>
AuthorDate: Wed Mar 2 15:41:21 2022 +0800
[Improve][Connectors] Add description for system variables ${now}${uuid} in file-connector and support for flink (#1370)
* fileimprove
* doc
---
docs/en/flink/configuration/sink-plugins/File.md | 20 +++++++++++++++++++-
docs/en/spark/configuration/sink-plugins/File.md | 4 +++-
.../org/apache/seatunnel/flink/sink/FileSink.java | 7 +++++--
3 files changed, 27 insertions(+), 4 deletions(-)
diff --git a/docs/en/flink/configuration/sink-plugins/File.md b/docs/en/flink/configuration/sink-plugins/File.md
index 161107c..fd2da5d 100644
--- a/docs/en/flink/configuration/sink-plugins/File.md
+++ b/docs/en/flink/configuration/sink-plugins/File.md
@@ -12,6 +12,7 @@ Write data to the file system
| -------------- | ------ | -------- | ------------- |
| format | string | yes | - |
| path | string | yes | - |
+| path_time_format | string | no | yyyyMMddHHmmss |
| write_mode | string | no | - |
| common-options | string | no | - |
| parallelism | int | no | - |
@@ -23,7 +24,24 @@ Currently, `csv` , `json` , and `text` are supported. The streaming mode current
### path [string]
-The file path is required. The `hdfs file` starts with `hdfs://` , and the `local file` starts with `file://` .
+The file path is required. An `hdfs file` path starts with `hdfs://`, and a `local file` path starts with `file://`.
+The variables `${now}` and `${uuid}` can be used in the path, e.g. `hdfs:///test_${uuid}_${now}.txt`.
+`${now}` is replaced with the current time, formatted according to the `path_time_format` option.
+
+### path_time_format [string]
+
+When the `path` contains `${now}` (e.g. `xxxx-${now}`), `path_time_format` specifies the time format used for it. The default value is `yyyyMMddHHmmss`. Commonly used time format symbols are listed below:
+
+| Symbol | Description |
+| ------ | ------------------ |
+| y | Year |
+| M | Month |
+| d | Day of month |
+| H | Hour in day (0-23) |
+| m | Minute in hour |
+| s | Second in minute |
+
+See [Java SimpleDateFormat](https://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html) for detailed time format syntax.
### write_mode [string]
diff --git a/docs/en/spark/configuration/sink-plugins/File.md b/docs/en/spark/configuration/sink-plugins/File.md
index e79419b..077db05 100644
--- a/docs/en/spark/configuration/sink-plugins/File.md
+++ b/docs/en/spark/configuration/sink-plugins/File.md
@@ -28,7 +28,9 @@ Partition data based on selected fields
### path [string]
-Output file path, starting with `file://` or `hdfs://`
+The file path is required. An `hdfs file` path starts with `hdfs://`, and a `local file` path starts with `file://`.
+The variables `${now}` and `${uuid}` can be used in the path, e.g. `hdfs:///test_${uuid}_${now}.txt`.
+`${now}` is replaced with the current time, formatted according to the `path_time_format` option.
### path_time_format [string]
diff --git a/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java b/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
index 9bf10a8..bc39de9 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.flink.sink;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.utils.StringTemplate;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.batch.FlinkBatchSink;
import org.apache.seatunnel.flink.stream.FlinkStreamSink;
@@ -52,7 +53,8 @@ public class FileSink implements FlinkStreamSink<Row, Row>, FlinkBatchSink<Row,
private static final String FORMAT = "format";
private static final String WRITE_MODE = "write_mode";
private static final String PARALLELISM = "parallelism";
-
+ private static final String PATH_TIME_FORMAT = "path_time_format";
+ private static final String DEFAULT_TIME_FORMAT = "yyyyMMddHHmmss";
private Config config;
private FileOutputFormat outputFormat;
@@ -121,7 +123,8 @@ public class FileSink implements FlinkStreamSink<Row, Row>, FlinkBatchSink<Row,
@Override
public void prepare(FlinkEnvironment env) {
- String path = config.getString(PATH);
+ String format = config.hasPath(PATH_TIME_FORMAT) ? config.getString(PATH_TIME_FORMAT) : DEFAULT_TIME_FORMAT;
+ String path = StringTemplate.substitute(config.getString(PATH), format);
filePath = new Path(path);
}
}