You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wu...@apache.org on 2022/03/02 07:41:28 UTC

[incubator-seatunnel] branch dev updated: [Improve][Connectors] Add description for system variables ${now}${uuid} in file-connector and support for flink (#1370)

This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new dcf3797  [Improve][Connectors] Add description for system variables ${now}${uuid} in file-connector and support for flink (#1370)
dcf3797 is described below

commit dcf37972e39384c2f7803f0090e7ed987d081ad7
Author: Simon <zh...@cvte.com>
AuthorDate: Wed Mar 2 15:41:21 2022 +0800

    [Improve][Connectors] Add description for system variables ${now}${uuid} in file-connector and support for flink (#1370)
    
    * fileimprove
    
    * doc
---
 docs/en/flink/configuration/sink-plugins/File.md     | 20 +++++++++++++++++++-
 docs/en/spark/configuration/sink-plugins/File.md     |  4 +++-
 .../org/apache/seatunnel/flink/sink/FileSink.java    |  7 +++++--
 3 files changed, 27 insertions(+), 4 deletions(-)

diff --git a/docs/en/flink/configuration/sink-plugins/File.md b/docs/en/flink/configuration/sink-plugins/File.md
index 161107c..fd2da5d 100644
--- a/docs/en/flink/configuration/sink-plugins/File.md
+++ b/docs/en/flink/configuration/sink-plugins/File.md
@@ -12,6 +12,7 @@ Write data to the file system
 | -------------- | ------ | -------- | ------------- |
 | format         | string | yes      | -             |
 | path           | string | yes      | -             |
+| path_time_format | string | no       | yyyyMMddHHmmss |
 | write_mode     | string | no       | -             |
 | common-options | string | no       | -             |
 | parallelism    | int    | no       | -             |
@@ -23,7 +24,24 @@ Currently, `csv` , `json` , and `text` are supported. The streaming mode current
 
 ### path [string]
 
-The file path is required. The `hdfs file` starts with `hdfs://` , and the `local file` starts with `file://` .
+The file path is required. The `hdfs file` starts with `hdfs://` , and the `local file` starts with `file://` .
+The variables `${now}` and `${uuid}` can be used in the path, for example `hdfs:///test_${uuid}_${now}.txt` .
+`${now}` represents the current time, and its format can be defined by specifying the option `path_time_format` .
+
+### path_time_format [string]
+
+When the format in the `path` parameter is `xxxx-${now}` , `path_time_format` can specify the time format of the path, and the default value is `yyyyMMddHHmmss` . The commonly used time formats are listed as follows:
+
+| Symbol | Description        |
+| ------ | ------------------ |
+| y      | Year               |
+| M      | Month              |
+| d      | Day of month       |
+| H      | Hour in day (0-23) |
+| m      | Minute in hour     |
+| s      | Second in minute   |
+
+See [Java SimpleDateFormat](https://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html) for detailed time format syntax.
 
 ### write_mode [string]
 
diff --git a/docs/en/spark/configuration/sink-plugins/File.md b/docs/en/spark/configuration/sink-plugins/File.md
index e79419b..077db05 100644
--- a/docs/en/spark/configuration/sink-plugins/File.md
+++ b/docs/en/spark/configuration/sink-plugins/File.md
@@ -28,7 +28,9 @@ Partition data based on selected fields
 
 ### path [string]
 
-Output file path, starting with `file://` or  `hdfs://`
+The file path is required. The `hdfs file` starts with `hdfs://` , and the `local file` starts with `file://` .
+The variables `${now}` and `${uuid}` can be used in the path, for example `hdfs:///test_${uuid}_${now}.txt` .
+`${now}` represents the current time, and its format can be defined by specifying the option `path_time_format` .
 
 ### path_time_format [string]
 
diff --git a/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java b/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
index 9bf10a8..bc39de9 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.flink.sink;
 
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.utils.StringTemplate;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.batch.FlinkBatchSink;
 import org.apache.seatunnel.flink.stream.FlinkStreamSink;
@@ -52,7 +53,8 @@ public class FileSink implements FlinkStreamSink<Row, Row>, FlinkBatchSink<Row,
     private static final String FORMAT = "format";
     private static final String WRITE_MODE = "write_mode";
     private static final String PARALLELISM = "parallelism";
-
+    private static final String PATH_TIME_FORMAT = "path_time_format";
+    private static final String DEFAULT_TIME_FORMAT = "yyyyMMddHHmmss";
     private Config config;
 
     private FileOutputFormat outputFormat;
@@ -121,7 +123,8 @@ public class FileSink implements FlinkStreamSink<Row, Row>, FlinkBatchSink<Row,
 
     @Override
     public void prepare(FlinkEnvironment env) {
-        String path = config.getString(PATH);
+        String format = config.hasPath(PATH_TIME_FORMAT) ? config.getString(PATH_TIME_FORMAT) : DEFAULT_TIME_FORMAT;
+        String path = StringTemplate.substitute(config.getString(PATH), format);
         filePath = new Path(path);
     }
 }