Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/17 07:53:26 UTC

[GitHub] [flink] fapaul commented on a change in pull request #18288: [FLINK-20188][Connectors][Docs][FileSystem] Added documentation for File Source

fapaul commented on a change in pull request #18288:
URL: https://github.com/apache/flink/pull/18288#discussion_r785706129



##########
File path: docs/content/docs/connectors/datastream/filesystem.md
##########
@@ -25,12 +27,243 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# File Sink
+# FileSystem
 
-This connector provides a unified Sink for `BATCH` and `STREAMING` that writes partitioned files to filesystems
+This connector provides a unified Source and Sink for `BATCH` and `STREAMING` that reads or writes (partitioned) files to filesystems
 supported by the [Flink `FileSystem` abstraction]({{< ref "docs/deployment/filesystems/overview" >}}). This filesystem
-connector provides the same guarantees for both `BATCH` and `STREAMING` and it is an evolution of the 
-existing [Streaming File Sink]({{< ref "docs/connectors/datastream/streamfile_sink" >}}) which was designed for providing exactly-once semantics for `STREAMING` execution.
+connector provides the same guarantees for both `BATCH` and `STREAMING` and is designed for providing exactly-once semantics for `STREAMING` execution.
+
+The connector supports reading and writing a set of files from any (distributed) file system (e.g. POSIX, S3, HDFS)
+with a [format]({{< ref "docs/connectors/datastream/formats/overview" >}}) (e.g., Avro, CSV, Parquet),
+producing a stream or records.
+
+## File Source
+
+The `File Source` is based on the [Source API]({{< ref "docs/dev/datastream/sources" >}}#the-data-source-api), 
+a unified data source that reads files - both in batch and in streaming mode. 
+It is divided into the following two parts: File SplitEnumerator and File SourceReader. 
+
+* File `SplitEnumerator` is responsible for discovering and identifying the files to read and assigns them to the File SourceReader.
+* File `SourceReader` requests the files it needs to process and reads the file from the filesystem. 
+
+You'll need to combine the File Source with a [format]({{< ref "docs/connectors/datastream/formats/overview" >}}). This allows you to
+parse CSV, decode AVRO or read Parquet columnar files.
+
+#### Bounded and Unbounded Streams
+
+A bounded `File Source` lists all files (via SplitEnumerator, for example a recursive directory list with filtered-out hidden files) and reads them all.
+
+An unbounded `File Source` is created when configuring the enumerator for periodic file discovery.
+In that case, the SplitEnumerator will enumerate like the bounded case but after a certain interval repeats the enumeration.
+For any repeated enumeration, the `SplitEnumerator` filters out previously detect files and only sends new ones to the `SourceReader`.
+
+### Usage
+
+You start building a file source via one of the following calls:
+
+{{< tabs "FileSourceUsage" >}}
+{{< tab "Java" >}}
+```java
+// reads the contents of a file from a file stream. 
+FileSource.forRecordStreamFormat(StreamFormat,Path...)
+        
+// reads batches of records from a file at a time
+FileSource.forBulkFileFormat(BulkFormat,Path...)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+This creates a `FileSource.FileSourceBuilder` on which you can configure all the properties of the file source.
+
+For the bounded/batch case, the file source processes all files under the given path(s). 
+In the continuous/streaming case, the source periodically checks the paths for new files and will start reading those.
+
+When you start creating a file source (via the `FileSource.FileSourceBuilder` created through one of the above-mentioned methods) 
+the source is by default in bounded/batch mode. Call `AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)` 
+to put the source into continuous streaming mode.
+
+{{< tabs "FileSourceBuilder" >}}
+{{< tab "Java" >}}
+```java
+final FileSource<String> source =
+        FileSource.forRecordStreamFormat(...)
+        .monitorContinuously(Duration.ofMillis(5))  
+        .build();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Format Types
+
+The reading of each file happens through file readers defined by file formats. 
+These define the parsing logic for the contents of the file. There are multiple classes that the source supports. 
+Their interfaces trade of simplicity of implementation and flexibility/efficiency.
+
+* A `StreamFormat` reads the contents of a file from a file stream. It is the simplest format to implement, 
+and provides many features out-of-the-box (like checkpointing logic) but is limited in the optimizations it can apply 
+(such as object reuse, batching, etc.).
+
+* A `BulkFormat` reads batches of records from a file at a time. 
+It is the most "low level" format to implement, but offers the greatest flexibility to optimize the implementation.
+
+#### TextLine format
+
+A `StreamFormat` reader format that text lines from a file.
+The reader uses Java's built-in `InputStreamReader` to decode the byte stream using
+various supported charset encodings.
+This format does not support optimized recovery from checkpoints. On recovery, it will re-read
+and discard the number of lined that were processed before the last checkpoint. That is due to
+the fact that the offsets of lines in the file cannot be tracked through the charset decoders
+with their internal buffering of stream input and charset decoder state.
+
+#### SimpleStreamFormat Abstract Class
+
+A simple version of `StreamFormat` for formats that are not splittable.
+Custom reads of Array or File can be done by implementing `SimpleStreamFormat`:
+
+{{< tabs "SimpleStreamFormat" >}}
+{{< tab "Java" >}}
+```java
+private static final class ArrayReaderFormat extends SimpleStreamFormat<byte[]> {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public Reader<byte[]> createReader(Configuration config, FSDataInputStream stream)
+            throws IOException {
+        return new ArrayReader(stream);
+    }
+
+    @Override
+    public TypeInformation<byte[]> getProducedType() {
+        return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+    }
+}
+
+final FileSource<byte[]> source =
+                FileSource.forRecordStreamFormat(new ArrayReaderFormat(), path).build();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+An example of a `SimpleStreamFormat` is `CsvReaderFormat`. It can be initialized as follows:
+```java
+CsvReaderFormat<SomePojo> csvFormat = CsvReaderFormat.forPojo(SomePojo.class);
+FileSource<SomePojo> source = 
+        FileSource.forRecordStreamFormat(csvFormat, Path.fromLocalFile(...)).build();
+```
+
+The schema for CSV parsing, in this case, is automatically derived based on the fields of the `SomePojo` class using the `Jackson` library. (Note: you might need to add `@JsonPropertyOrder({field1, field2, ...})` annotation to your class definition with the fields order exactly matching those of the CSV file columns).
+
+If you need more fine-grained control over the CSV schema or the parsing options, use the more low-level `forSchema` static factory method of `CsvReaderFormat`:
+
+```java
+CsvReaderFormat<T> forSchema(CsvMapper mapper, 
+                             CsvSchema schema, 
+                             TypeInformation<T> typeInformation) 
+```
+
+#### Bulk Format
+
+The BulkFormat reads and decodes batches of records at a time. Examples of bulk formats
+are formats like ORC or Parquet.
+The outer `BulkFormat` class acts mainly as a configuration holder and factory for the
+reader. The actual reading is done by the `BulkFormat.Reader`, which is created in the
+`BulkFormat#createReader(Configuration, FileSourceSplit)` method. If a bulk reader is
+created based on a checkpoint during checkpointed streaming execution, then the reader is
+re-created in the `BulkFormat#restoreReader(Configuration, FileSourceSplit)` method.
+
+A `SimpleStreamFormat` can be turned into a `BulkFormat` by wrapping it in a `StreamFormatAdapter`:
+```java
+BulkFormat<SomePojo, FileSourceSplit> bulkFormat = 
+        new StreamFormatAdapter<>(CsvReaderFormat.forPojo(SomePojo.class));
+```
+
+### Customizing File Enumeration
+
+{{< tabs "CustomizingFileEnumeration" >}}
+{{< tab "Java" >}}
+```java
+/**
+ * A FileEnumerator implementation for hive source, which generates splits based on 
+ * HiveTablePartition.
+ */
+public class HiveSourceFileEnumerator implements FileEnumerator {
+    
+    // reference constructor
+    public HiveSourceFileEnumerator(...) {
+        ...
+    }
+
+    /***
+     * Generates all file splits for the relevant files under the given paths. The {@code
+     * minDesiredSplits} is an optional hint indicating how many splits would be necessary to
+     * exploit parallelism properly.
+     */
+    @Override
+    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
+            throws IOException {
+        // createInputSplits:splitting files into fragmented collections
+        return new ArrayList<>(createInputSplits(...));
+    }
+
+    ...
+
+    /***
+     * A factory to create HiveSourceFileEnumerator.
+     */
+    public static class Provider implements FileEnumerator.Provider {
+
+        ...
+        @Override
+        public FileEnumerator create() {
+            return new HiveSourceFileEnumerator(...);
+        }
+    }
+}
+// use the customizing file enumeration
+new HiveSource<>(
+        ...,
+        new HiveSourceFileEnumerator.Provider(
+        partitions != null ? partitions : Collections.emptyList(),
+        new JobConfWrapper(jobConf)),
+       ...);
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Current Limitations
+
+Watermarking doesn't work particularly well for large backlogs of files, because watermarks eagerly advance within a file, and the next file might contain data later than the watermark again.
+We are looking at ways to generate the watermarks more based on global information.
+
+For Unbounded File Sources, the enumerator currently remembers paths of all already processed files, which is a state that can in come cases grow rather large.

Review comment:
       ```suggestion
   For Unbounded File Sources, the enumerator currently remembers paths of all already processed files, which is a state that can in some cases grow rather large.
   ```

##########
File path: docs/content/docs/connectors/datastream/filesystem.md
##########
@@ -25,12 +27,243 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# File Sink
+# FileSystem
 
-This connector provides a unified Sink for `BATCH` and `STREAMING` that writes partitioned files to filesystems
+This connector provides a unified Source and Sink for `BATCH` and `STREAMING` that reads or writes (partitioned) files to filesystems
 supported by the [Flink `FileSystem` abstraction]({{< ref "docs/deployment/filesystems/overview" >}}). This filesystem
-connector provides the same guarantees for both `BATCH` and `STREAMING` and it is an evolution of the 
-existing [Streaming File Sink]({{< ref "docs/connectors/datastream/streamfile_sink" >}}) which was designed for providing exactly-once semantics for `STREAMING` execution.
+connector provides the same guarantees for both `BATCH` and `STREAMING` and is designed for providing exactly-once semantics for `STREAMING` execution.
+
+The connector supports reading and writing a set of files from any (distributed) file system (e.g. POSIX, S3, HDFS)
+with a [format]({{< ref "docs/connectors/datastream/formats/overview" >}}) (e.g., Avro, CSV, Parquet),
+producing a stream or records.
+
+## File Source
+
+The `File Source` is based on the [Source API]({{< ref "docs/dev/datastream/sources" >}}#the-data-source-api), 
+a unified data source that reads files - both in batch and in streaming mode. 
+It is divided into the following two parts: File SplitEnumerator and File SourceReader. 
+
+* File `SplitEnumerator` is responsible for discovering and identifying the files to read and assigns them to the File SourceReader.
+* File `SourceReader` requests the files it needs to process and reads the file from the filesystem. 
+
+You'll need to combine the File Source with a [format]({{< ref "docs/connectors/datastream/formats/overview" >}}). This allows you to
+parse CSV, decode AVRO or read Parquet columnar files.
+
+#### Bounded and Unbounded Streams
+
+A bounded `File Source` lists all files (via SplitEnumerator, for example a recursive directory list with filtered-out hidden files) and reads them all.
+
+An unbounded `File Source` is created when configuring the enumerator for periodic file discovery.
+In that case, the SplitEnumerator will enumerate like the bounded case but after a certain interval repeats the enumeration.
+For any repeated enumeration, the `SplitEnumerator` filters out previously detect files and only sends new ones to the `SourceReader`.
+
+### Usage
+
+You start building a file source via one of the following calls:
+
+{{< tabs "FileSourceUsage" >}}
+{{< tab "Java" >}}
+```java
+// reads the contents of a file from a file stream. 
+FileSource.forRecordStreamFormat(StreamFormat,Path...)
+        
+// reads batches of records from a file at a time
+FileSource.forBulkFileFormat(BulkFormat,Path...)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+This creates a `FileSource.FileSourceBuilder` on which you can configure all the properties of the file source.
+
+For the bounded/batch case, the file source processes all files under the given path(s). 
+In the continuous/streaming case, the source periodically checks the paths for new files and will start reading those.
+
+When you start creating a file source (via the `FileSource.FileSourceBuilder` created through one of the above-mentioned methods) 
+the source is by default in bounded/batch mode. Call `AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)` 
+to put the source into continuous streaming mode.
+
+{{< tabs "FileSourceBuilder" >}}
+{{< tab "Java" >}}
+```java
+final FileSource<String> source =
+        FileSource.forRecordStreamFormat(...)
+        .monitorContinuously(Duration.ofMillis(5))  
+        .build();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Format Types
+
+The reading of each file happens through file readers defined by file formats. 
+These define the parsing logic for the contents of the file. There are multiple classes that the source supports. 
+Their interfaces trade of simplicity of implementation and flexibility/efficiency.
+
+* A `StreamFormat` reads the contents of a file from a file stream. It is the simplest format to implement, 
+and provides many features out-of-the-box (like checkpointing logic) but is limited in the optimizations it can apply 
+(such as object reuse, batching, etc.).
+
+* A `BulkFormat` reads batches of records from a file at a time. 
+It is the most "low level" format to implement, but offers the greatest flexibility to optimize the implementation.
+
+#### TextLine format
+
+A `StreamFormat` reader format that text lines from a file.
+The reader uses Java's built-in `InputStreamReader` to decode the byte stream using
+various supported charset encodings.
+This format does not support optimized recovery from checkpoints. On recovery, it will re-read
+and discard the number of lined that were processed before the last checkpoint. That is due to
+the fact that the offsets of lines in the file cannot be tracked through the charset decoders
+with their internal buffering of stream input and charset decoder state.
+
+#### SimpleStreamFormat Abstract Class
+
+A simple version of `StreamFormat` for formats that are not splittable.
+Custom reads of Array or File can be done by implementing `SimpleStreamFormat`:
+
+{{< tabs "SimpleStreamFormat" >}}
+{{< tab "Java" >}}
+```java
+private static final class ArrayReaderFormat extends SimpleStreamFormat<byte[]> {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public Reader<byte[]> createReader(Configuration config, FSDataInputStream stream)
+            throws IOException {
+        return new ArrayReader(stream);
+    }
+
+    @Override
+    public TypeInformation<byte[]> getProducedType() {
+        return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+    }
+}
+
+final FileSource<byte[]> source =
+                FileSource.forRecordStreamFormat(new ArrayReaderFormat(), path).build();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+An example of a `SimpleStreamFormat` is `CsvReaderFormat`. It can be initialized as follows:
+```java
+CsvReaderFormat<SomePojo> csvFormat = CsvReaderFormat.forPojo(SomePojo.class);
+FileSource<SomePojo> source = 
+        FileSource.forRecordStreamFormat(csvFormat, Path.fromLocalFile(...)).build();
+```
+
+The schema for CSV parsing, in this case, is automatically derived based on the fields of the `SomePojo` class using the `Jackson` library. (Note: you might need to add `@JsonPropertyOrder({field1, field2, ...})` annotation to your class definition with the fields order exactly matching those of the CSV file columns).
+
+If you need more fine-grained control over the CSV schema or the parsing options, use the more low-level `forSchema` static factory method of `CsvReaderFormat`:
+
+```java
+CsvReaderFormat<T> forSchema(CsvMapper mapper, 
+                             CsvSchema schema, 
+                             TypeInformation<T> typeInformation) 
+```
+
+#### Bulk Format
+
+The BulkFormat reads and decodes batches of records at a time. Examples of bulk formats
+are formats like ORC or Parquet.
+The outer `BulkFormat` class acts mainly as a configuration holder and factory for the
+reader. The actual reading is done by the `BulkFormat.Reader`, which is created in the
+`BulkFormat#createReader(Configuration, FileSourceSplit)` method. If a bulk reader is
+created based on a checkpoint during checkpointed streaming execution, then the reader is
+re-created in the `BulkFormat#restoreReader(Configuration, FileSourceSplit)` method.
+
+A `SimpleStreamFormat` can be turned into a `BulkFormat` by wrapping it in a `StreamFormatAdapter`:
+```java
+BulkFormat<SomePojo, FileSourceSplit> bulkFormat = 
+        new StreamFormatAdapter<>(CsvReaderFormat.forPojo(SomePojo.class));
+```
+
+### Customizing File Enumeration
+
+{{< tabs "CustomizingFileEnumeration" >}}
+{{< tab "Java" >}}
+```java
+/**
+ * A FileEnumerator implementation for hive source, which generates splits based on 
+ * HiveTablePartition.
+ */
+public class HiveSourceFileEnumerator implements FileEnumerator {
+    
+    // reference constructor
+    public HiveSourceFileEnumerator(...) {
+        ...
+    }
+
+    /***
+     * Generates all file splits for the relevant files under the given paths. The {@code
+     * minDesiredSplits} is an optional hint indicating how many splits would be necessary to
+     * exploit parallelism properly.
+     */
+    @Override
+    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
+            throws IOException {
+        // createInputSplits:splitting files into fragmented collections
+        return new ArrayList<>(createInputSplits(...));
+    }
+
+    ...
+
+    /***
+     * A factory to create HiveSourceFileEnumerator.
+     */
+    public static class Provider implements FileEnumerator.Provider {
+
+        ...
+        @Override
+        public FileEnumerator create() {
+            return new HiveSourceFileEnumerator(...);
+        }
+    }
+}
+// use the customizing file enumeration
+new HiveSource<>(
+        ...,
+        new HiveSourceFileEnumerator.Provider(
+        partitions != null ? partitions : Collections.emptyList(),
+        new JobConfWrapper(jobConf)),
+       ...);
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Current Limitations
+
+Watermarking doesn't work particularly well for large backlogs of files, because watermarks eagerly advance within a file, and the next file might contain data later than the watermark again.
+We are looking at ways to generate the watermarks more based on global information.

Review comment:
       I cannot recall seeing any planned effort to fix this behavior.
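       To make the limitation concrete for readers of the docs, here is a minimal, self-contained sketch of the effect. It is not Flink code: `EagerWatermarkDemo` and `countLateRecords` are hypothetical names, and the watermark is simplified to "highest event timestamp seen so far" with no out-of-orderness allowance.

```java
import java.util.List;

/** Hypothetical illustration only; this class is not part of Flink. */
final class EagerWatermarkDemo {

    /**
     * Reads the files one after another, as a single reader working through a backlog would.
     * Each array holds the event timestamps of one file, in file order.
     * Returns how many records arrive behind the watermark.
     */
    static int countLateRecords(List<long[]> files) {
        long watermark = Long.MIN_VALUE; // simplified: highest timestamp seen so far
        int late = 0;
        for (long[] file : files) {
            for (long timestamp : file) {
                if (timestamp < watermark) {
                    // The watermark already advanced past this record inside an earlier file.
                    late++;
                } else {
                    watermark = timestamp;
                }
            }
        }
        return late;
    }
}
```

       With two files holding timestamps `{1, 2, 10}` and `{3, 4, 11}`, the watermark reaches 10 while the first file is read, so the records 3 and 4 of the second file count as late even though each file is ordered internally.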

##########
File path: docs/content/docs/connectors/datastream/filesystem.md
##########
@@ -25,12 +27,243 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# File Sink
+# FileSystem
 
-This connector provides a unified Sink for `BATCH` and `STREAMING` that writes partitioned files to filesystems
+This connector provides a unified Source and Sink for `BATCH` and `STREAMING` that reads or writes (partitioned) files to filesystems
 supported by the [Flink `FileSystem` abstraction]({{< ref "docs/deployment/filesystems/overview" >}}). This filesystem
-connector provides the same guarantees for both `BATCH` and `STREAMING` and it is an evolution of the 
-existing [Streaming File Sink]({{< ref "docs/connectors/datastream/streamfile_sink" >}}) which was designed for providing exactly-once semantics for `STREAMING` execution.
+connector provides the same guarantees for both `BATCH` and `STREAMING` and is designed for providing exactly-once semantics for `STREAMING` execution.
+
+The connector supports reading and writing a set of files from any (distributed) file system (e.g. POSIX, S3, HDFS)
+with a [format]({{< ref "docs/connectors/datastream/formats/overview" >}}) (e.g., Avro, CSV, Parquet),
+producing a stream or records.
+
+## File Source
+
+The `File Source` is based on the [Source API]({{< ref "docs/dev/datastream/sources" >}}#the-data-source-api), 
+a unified data source that reads files - both in batch and in streaming mode. 
+It is divided into the following two parts: File SplitEnumerator and File SourceReader. 
+
+* File `SplitEnumerator` is responsible for discovering and identifying the files to read and assigns them to the File SourceReader.
+* File `SourceReader` requests the files it needs to process and reads the file from the filesystem. 
+
+You'll need to combine the File Source with a [format]({{< ref "docs/connectors/datastream/formats/overview" >}}). This allows you to
+parse CSV, decode AVRO or read Parquet columnar files.
+
+#### Bounded and Unbounded Streams
+
+A bounded `File Source` lists all files (via SplitEnumerator, for example a recursive directory list with filtered-out hidden files) and reads them all.
+
+An unbounded `File Source` is created when configuring the enumerator for periodic file discovery.
+In that case, the SplitEnumerator will enumerate like the bounded case but after a certain interval repeats the enumeration.
+For any repeated enumeration, the `SplitEnumerator` filters out previously detect files and only sends new ones to the `SourceReader`.
+
+### Usage
+
+You start building a file source via one of the following calls:
+
+{{< tabs "FileSourceUsage" >}}
+{{< tab "Java" >}}
+```java
+// reads the contents of a file from a file stream. 
+FileSource.forRecordStreamFormat(StreamFormat,Path...)
+        
+// reads batches of records from a file at a time
+FileSource.forBulkFileFormat(BulkFormat,Path...)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+This creates a `FileSource.FileSourceBuilder` on which you can configure all the properties of the file source.
+
+For the bounded/batch case, the file source processes all files under the given path(s). 
+In the continuous/streaming case, the source periodically checks the paths for new files and will start reading those.
+
+When you start creating a file source (via the `FileSource.FileSourceBuilder` created through one of the above-mentioned methods) 
+the source is by default in bounded/batch mode. Call `AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)` 
+to put the source into continuous streaming mode.
+
+{{< tabs "FileSourceBuilder" >}}
+{{< tab "Java" >}}
+```java
+final FileSource<String> source =
+        FileSource.forRecordStreamFormat(...)
+        .monitorContinuously(Duration.ofMillis(5))  
+        .build();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Format Types
+
+The reading of each file happens through file readers defined by file formats. 
+These define the parsing logic for the contents of the file. There are multiple classes that the source supports. 
+Their interfaces trade of simplicity of implementation and flexibility/efficiency.
+
+* A `StreamFormat` reads the contents of a file from a file stream. It is the simplest format to implement, 
+and provides many features out-of-the-box (like checkpointing logic) but is limited in the optimizations it can apply 
+(such as object reuse, batching, etc.).
+
+* A `BulkFormat` reads batches of records from a file at a time. 
+It is the most "low level" format to implement, but offers the greatest flexibility to optimize the implementation.
+
+#### TextLine format
+
+A `StreamFormat` reader format that text lines from a file.
+The reader uses Java's built-in `InputStreamReader` to decode the byte stream using
+various supported charset encodings.
+This format does not support optimized recovery from checkpoints. On recovery, it will re-read
+and discard the number of lined that were processed before the last checkpoint. That is due to
+the fact that the offsets of lines in the file cannot be tracked through the charset decoders
+with their internal buffering of stream input and charset decoder state.
+
+#### SimpleStreamFormat Abstract Class
+
+A simple version of `StreamFormat` for formats that are not splittable.
+Custom reads of Array or File can be done by implementing `SimpleStreamFormat`:
+
+{{< tabs "SimpleStreamFormat" >}}
+{{< tab "Java" >}}
+```java
+private static final class ArrayReaderFormat extends SimpleStreamFormat<byte[]> {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public Reader<byte[]> createReader(Configuration config, FSDataInputStream stream)
+            throws IOException {
+        return new ArrayReader(stream);
+    }
+
+    @Override
+    public TypeInformation<byte[]> getProducedType() {
+        return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+    }
+}
+
+final FileSource<byte[]> source =
+                FileSource.forRecordStreamFormat(new ArrayReaderFormat(), path).build();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+An example of a `SimpleStreamFormat` is `CsvReaderFormat`. It can be initialized as follows:
+```java
+CsvReaderFormat<SomePojo> csvFormat = CsvReaderFormat.forPojo(SomePojo.class);
+FileSource<SomePojo> source = 
+        FileSource.forRecordStreamFormat(csvFormat, Path.fromLocalFile(...)).build();
+```
+
+The schema for CSV parsing, in this case, is automatically derived based on the fields of the `SomePojo` class using the `Jackson` library. (Note: you might need to add `@JsonPropertyOrder({field1, field2, ...})` annotation to your class definition with the fields order exactly matching those of the CSV file columns).
+
+If you need more fine-grained control over the CSV schema or the parsing options, use the more low-level `forSchema` static factory method of `CsvReaderFormat`:
+
+```java
+CsvReaderFormat<T> forSchema(CsvMapper mapper, 
+                             CsvSchema schema, 
+                             TypeInformation<T> typeInformation) 
+```
+
+#### Bulk Format
+
+The BulkFormat reads and decodes batches of records at a time. Examples of bulk formats
+are formats like ORC or Parquet.
+The outer `BulkFormat` class acts mainly as a configuration holder and factory for the
+reader. The actual reading is done by the `BulkFormat.Reader`, which is created in the
+`BulkFormat#createReader(Configuration, FileSourceSplit)` method. If a bulk reader is
+created based on a checkpoint during checkpointed streaming execution, then the reader is
+re-created in the `BulkFormat#restoreReader(Configuration, FileSourceSplit)` method.
+
+A `SimpleStreamFormat` can be turned into a `BulkFormat` by wrapping it in a `StreamFormatAdapter`:
+```java
+BulkFormat<SomePojo, FileSourceSplit> bulkFormat = 
+        new StreamFormatAdapter<>(CsvReaderFormat.forPojo(SomePojo.class));
+```
+
+### Customizing File Enumeration
+
+{{< tabs "CustomizingFileEnumeration" >}}
+{{< tab "Java" >}}
+```java
+/**
+ * A FileEnumerator implementation for hive source, which generates splits based on 
+ * HiveTablePartition.
+ */
+public class HiveSourceFileEnumerator implements FileEnumerator {
+    
+    // reference constructor
+    public HiveSourceFileEnumerator(...) {
+        ...
+    }
+
+    /***
+     * Generates all file splits for the relevant files under the given paths. The {@code
+     * minDesiredSplits} is an optional hint indicating how many splits would be necessary to
+     * exploit parallelism properly.
+     */
+    @Override
+    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
+            throws IOException {
+        // createInputSplits:splitting files into fragmented collections
+        return new ArrayList<>(createInputSplits(...));
+    }
+
+    ...
+
+    /***
+     * A factory to create HiveSourceFileEnumerator.
+     */
+    public static class Provider implements FileEnumerator.Provider {
+
+        ...
+        @Override
+        public FileEnumerator create() {
+            return new HiveSourceFileEnumerator(...);
+        }
+    }
+}
+// use the customizing file enumeration
+new HiveSource<>(
+        ...,
+        new HiveSourceFileEnumerator.Provider(
+        partitions != null ? partitions : Collections.emptyList(),
+        new JobConfWrapper(jobConf)),
+       ...);
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Current Limitations
+
+Watermarking doesn't work particularly well for large backlogs of files, because watermarks eagerly advance within a file, and the next file might contain data later than the watermark again.
+We are looking at ways to generate the watermarks more based on global information.
+
+For Unbounded File Sources, the enumerator currently remembers paths of all already processed files, which is a state that can in come cases grow rather large.
+The future will be planned to add a compressed form of tracking already processed files in the future (for example by keeping modification timestamps lower boundaries).

Review comment:
       The Delta Lake source also hit this problem, so we probably have to fix this rather soon.
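       For discussion, a rough sketch of what "keeping a modification-timestamp lower boundary" could look like. Everything here is an assumption for illustration: `ProcessedFileTracker`, `markIfNew`, `compact` and the retention parameter are hypothetical and do not exist in Flink, and the scheme assumes files are not added with a modification time older than the boundary.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of compressed "already processed" tracking; not a Flink API. */
final class ProcessedFileTracker {

    private final long retentionMillis;
    // Files modified before this boundary are assumed to be processed already.
    private long lowerBound = Long.MIN_VALUE;
    // Only paths at or above the boundary are remembered individually.
    private final Map<String, Long> recentPaths = new HashMap<>();

    ProcessedFileTracker(long retentionMillis) {
        this.retentionMillis = retentionMillis;
    }

    /** Returns true if the file has not been seen yet, and records it as processed. */
    boolean markIfNew(String path, long modificationTime) {
        if (modificationTime < lowerBound) {
            return false; // older than the boundary: treated as processed
        }
        return recentPaths.putIfAbsent(path, modificationTime) == null;
    }

    /** Moves the boundary forward after an enumeration and forgets paths below it. */
    void compact(long newestSeenModificationTime) {
        long candidate = newestSeenModificationTime - retentionMillis;
        if (candidate > lowerBound) {
            lowerBound = candidate;
            recentPaths.values().removeIf(ts -> ts < lowerBound);
        }
    }

    int trackedPathCount() {
        return recentPaths.size();
    }
}
```

       The state is then bounded by the number of files modified within the retention window instead of by all files ever read. The trade-off is that a file showing up late with an old modification time would be skipped silently, so the retention window would need to be chosen per deployment.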

##########
File path: docs/content/docs/connectors/datastream/filesystem.md
##########
@@ -25,12 +27,243 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# File Sink
+# FileSystem
 
-This connector provides a unified Sink for `BATCH` and `STREAMING` that writes partitioned files to filesystems
+This connector provides a unified Source and Sink for `BATCH` and `STREAMING` that reads or writes (partitioned) files to filesystems
 supported by the [Flink `FileSystem` abstraction]({{< ref "docs/deployment/filesystems/overview" >}}). This filesystem
-connector provides the same guarantees for both `BATCH` and `STREAMING` and it is an evolution of the 
-existing [Streaming File Sink]({{< ref "docs/connectors/datastream/streamfile_sink" >}}) which was designed for providing exactly-once semantics for `STREAMING` execution.
+connector provides the same guarantees for both `BATCH` and `STREAMING` and is designed for providing exactly-once semantics for `STREAMING` execution.
+
+The connector supports reading and writing a set of files from any (distributed) file system (e.g. POSIX, S3, HDFS)
+with a [format]({{< ref "docs/connectors/datastream/formats/overview" >}}) (e.g., Avro, CSV, Parquet),
+producing a stream or records.
+
+## File Source
+
+The `File Source` is based on the [Source API]({{< ref "docs/dev/datastream/sources" >}}#the-data-source-api), 
+a unified data source that reads files - both in batch and in streaming mode. 
+It is divided into the following two parts: File SplitEnumerator and File SourceReader. 
+
+* File `SplitEnumerator` is responsible for discovering and identifying the files to read and for assigning them to the File `SourceReader`.
+* File `SourceReader` requests the files it needs to process and reads them from the filesystem.
+
+You'll need to combine the File Source with a [format]({{< ref "docs/connectors/datastream/formats/overview" >}}). This allows you to
+parse CSV, decode Avro, or read Parquet columnar files.
+
+#### Bounded and Unbounded Streams
+
+A bounded `File Source` lists all files (via SplitEnumerator, for example a recursive directory list with filtered-out hidden files) and reads them all.
+
+An unbounded `File Source` is created when configuring the enumerator for periodic file discovery.
+In that case, the SplitEnumerator will enumerate like the bounded case but after a certain interval repeats the enumeration.
+For any repeated enumeration, the `SplitEnumerator` filters out previously detected files and only sends new ones to the `SourceReader`.
+
+### Usage
+
+You start building a file source via one of the following calls:
+
+{{< tabs "FileSourceUsage" >}}
+{{< tab "Java" >}}
+```java
+// reads the contents of a file from a file stream. 
+FileSource.forRecordStreamFormat(StreamFormat,Path...)
+        
+// reads batches of records from a file at a time
+FileSource.forBulkFileFormat(BulkFormat,Path...)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+This creates a `FileSource.FileSourceBuilder` on which you can configure all the properties of the file source.
+
+For the bounded/batch case, the file source processes all files under the given path(s). 
+In the continuous/streaming case, the source periodically checks the paths for new files and will start reading those.
+
+When you start creating a file source (via the `FileSource.FileSourceBuilder` created through one of the above-mentioned methods) 
+the source is by default in bounded/batch mode. Call `AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)` 
+to put the source into continuous streaming mode.
+
+{{< tabs "FileSourceBuilder" >}}
+{{< tab "Java" >}}
+```java
+final FileSource<String> source =
+        FileSource.forRecordStreamFormat(...)
+        .monitorContinuously(Duration.ofMillis(5))  
+        .build();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Format Types
+
+The reading of each file happens through file readers defined by file formats. 
+These define the parsing logic for the contents of the file. There are multiple classes that the source supports. 
+Their interfaces trade off simplicity of implementation against flexibility/efficiency.
+
+* A `StreamFormat` reads the contents of a file from a file stream. It is the simplest format to implement, 
+and provides many features out-of-the-box (like checkpointing logic) but is limited in the optimizations it can apply 
+(such as object reuse, batching, etc.).
+
+* A `BulkFormat` reads batches of records from a file at a time. 
+It is the most "low level" format to implement, but offers the greatest flexibility to optimize the implementation.
+
+#### TextLine format
+
+A `StreamFormat` reader format that reads text lines from a file.
+The reader uses Java's built-in `InputStreamReader` to decode the byte stream using
+various supported charset encodings.
+This format does not support optimized recovery from checkpoints. On recovery, it will re-read
+and discard the number of lines that were processed before the last checkpoint. That is due to
+the fact that the offsets of lines in the file cannot be tracked through the charset decoders
+with their internal buffering of stream input and charset decoder state.
+
+#### SimpleStreamFormat Abstract Class
+
+A simple version of `StreamFormat` for formats that are not splittable.
+Custom reads of Array or File can be done by implementing `SimpleStreamFormat`:
+
+{{< tabs "SimpleStreamFormat" >}}
+{{< tab "Java" >}}
+```java
+private static final class ArrayReaderFormat extends SimpleStreamFormat<byte[]> {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public Reader<byte[]> createReader(Configuration config, FSDataInputStream stream)
+            throws IOException {
+        return new ArrayReader(stream);
+    }
+
+    @Override
+    public TypeInformation<byte[]> getProducedType() {
+        return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+    }
+}
+
+final FileSource<byte[]> source =
+                FileSource.forRecordStreamFormat(new ArrayReaderFormat(), path).build();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+An example of a `SimpleStreamFormat` is `CsvReaderFormat`. It can be initialized as follows:
+```java
+CsvReaderFormat<SomePojo> csvFormat = CsvReaderFormat.forPojo(SomePojo.class);
+FileSource<SomePojo> source = 
+        FileSource.forRecordStreamFormat(csvFormat, Path.fromLocalFile(...)).build();
+```
+
+The schema for CSV parsing, in this case, is automatically derived from the fields of the `SomePojo` class using the `Jackson` library. (Note: you might need to add the `@JsonPropertyOrder({field1, field2, ...})` annotation to your class definition, with the field order exactly matching that of the CSV file columns.)
+
+If you need more fine-grained control over the CSV schema or the parsing options, use the more low-level `forSchema` static factory method of `CsvReaderFormat`:
+
+```java
+CsvReaderFormat<T> forSchema(CsvMapper mapper, 
+                             CsvSchema schema, 
+                             TypeInformation<T> typeInformation) 
+```
+
+#### Bulk Format
+
+The BulkFormat reads and decodes batches of records at a time. Examples of bulk formats
+are formats like ORC or Parquet.
+The outer `BulkFormat` class acts mainly as a configuration holder and factory for the
+reader. The actual reading is done by the `BulkFormat.Reader`, which is created in the
+`BulkFormat#createReader(Configuration, FileSourceSplit)` method. If a bulk reader is
+created based on a checkpoint during checkpointed streaming execution, then the reader is
+re-created in the `BulkFormat#restoreReader(Configuration, FileSourceSplit)` method.
+
+A `SimpleStreamFormat` can be turned into a `BulkFormat` by wrapping it in a `StreamFormatAdapter`:
+```java
+BulkFormat<SomePojo, FileSourceSplit> bulkFormat = 
+        new StreamFormatAdapter<>(CsvReaderFormat.forPojo(SomePojo.class));
+```
+
+### Customizing File Enumeration
+
+{{< tabs "CustomizingFileEnumeration" >}}
+{{< tab "Java" >}}
+```java
+/**
+ * A FileEnumerator implementation for hive source, which generates splits based on 
+ * HiveTablePartition.
+ */
+public class HiveSourceFileEnumerator implements FileEnumerator {
+    
+    // reference constructor
+    public HiveSourceFileEnumerator(...) {
+        ...
+    }
+
+    /***
+     * Generates all file splits for the relevant files under the given paths. The {@code
+     * minDesiredSplits} is an optional hint indicating how many splits would be necessary to
+     * exploit parallelism properly.
+     */
+    @Override
+    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
+            throws IOException {
+        // createInputSplits:splitting files into fragmented collections
+        return new ArrayList<>(createInputSplits(...));
+    }
+
+    ...
+
+    /***
+     * A factory to create HiveSourceFileEnumerator.
+     */
+    public static class Provider implements FileEnumerator.Provider {
+
+        ...
+        @Override
+        public FileEnumerator create() {
+            return new HiveSourceFileEnumerator(...);
+        }
+    }
+}
+// use the customizing file enumeration
+new HiveSource<>(
+        ...,
+        new HiveSourceFileEnumerator.Provider(
+        partitions != null ? partitions : Collections.emptyList(),
+        new JobConfWrapper(jobConf)),
+       ...);
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Current Limitations
+
+Watermarking doesn't work particularly well for large backlogs of files, because watermarks eagerly advance within a file, and the next file might contain data later than the watermark again.
+We are looking at ways to generate watermarks based more on global information.
+
+For unbounded File Sources, the enumerator currently remembers the paths of all already processed files, which is state that can in some cases grow rather large.
+We plan to add a compressed form of tracking already processed files in the future (for example, by keeping lower bounds on modification timestamps).
+
+### Behind the Scenes
+{{< hint info >}}
+If you are interested in how the File Source works under the design of the new data source API, you may
+want to read this part as a reference. For details about the new data source API, the
+[documentation of the data source]({{< ref "docs/dev/datastream/sources.md" >}}) and
+<a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface">FLIP-27</a>
+provide more detailed discussions.
+{{< /hint >}}
+
+The `File Source` is divided into the following two parts: File SplitEnumerator and File SourceReader.

Review comment:
       I'd remove everything beginning from here because the internals might be outdated or wrong and on the other not confusing for users.

##########
File path: docs/content/docs/connectors/datastream/filesystem.md
##########
@@ -25,12 +27,243 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# File Sink
+# FileSystem
 
-This connector provides a unified Sink for `BATCH` and `STREAMING` that writes partitioned files to filesystems
+This connector provides a unified Source and Sink for `BATCH` and `STREAMING` that reads or writes (partitioned) files to filesystems
 supported by the [Flink `FileSystem` abstraction]({{< ref "docs/deployment/filesystems/overview" >}}). This filesystem
-connector provides the same guarantees for both `BATCH` and `STREAMING` and it is an evolution of the 
-existing [Streaming File Sink]({{< ref "docs/connectors/datastream/streamfile_sink" >}}) which was designed for providing exactly-once semantics for `STREAMING` execution.
+connector provides the same guarantees for both `BATCH` and `STREAMING` and is designed for providing exactly-once semantics for `STREAMING` execution.
+
+The connector supports reading and writing a set of files from any (distributed) file system (e.g. POSIX, S3, HDFS)
+with a [format]({{< ref "docs/connectors/datastream/formats/overview" >}}) (e.g., Avro, CSV, Parquet),
+producing a stream of records.
+
+## File Source
+
+The `File Source` is based on the [Source API]({{< ref "docs/dev/datastream/sources" >}}#the-data-source-api), 
+a unified data source that reads files - both in batch and in streaming mode. 
+It is divided into the following two parts: File SplitEnumerator and File SourceReader. 
+
+* File `SplitEnumerator` is responsible for discovering and identifying the files to read and for assigning them to the File `SourceReader`.
+* File `SourceReader` requests the files it needs to process and reads them from the filesystem.
+
+You'll need to combine the File Source with a [format]({{< ref "docs/connectors/datastream/formats/overview" >}}). This allows you to
+parse CSV, decode Avro, or read Parquet columnar files.
+
+#### Bounded and Unbounded Streams
+
+A bounded `File Source` lists all files (via SplitEnumerator, for example a recursive directory list with filtered-out hidden files) and reads them all.
+
+An unbounded `File Source` is created when configuring the enumerator for periodic file discovery.
+In that case, the SplitEnumerator will enumerate like the bounded case but after a certain interval repeats the enumeration.
+For any repeated enumeration, the `SplitEnumerator` filters out previously detected files and only sends new ones to the `SourceReader`.
+
+### Usage
+
+You start building a file source via one of the following calls:
+
+{{< tabs "FileSourceUsage" >}}
+{{< tab "Java" >}}
+```java
+// reads the contents of a file from a file stream. 
+FileSource.forRecordStreamFormat(StreamFormat,Path...)
+        
+// reads batches of records from a file at a time
+FileSource.forBulkFileFormat(BulkFormat,Path...)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+This creates a `FileSource.FileSourceBuilder` on which you can configure all the properties of the file source.
+
+For the bounded/batch case, the file source processes all files under the given path(s). 
+In the continuous/streaming case, the source periodically checks the paths for new files and will start reading those.
+
+When you start creating a file source (via the `FileSource.FileSourceBuilder` created through one of the above-mentioned methods) 
+the source is by default in bounded/batch mode. Call `AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)` 
+to put the source into continuous streaming mode.
+
+{{< tabs "FileSourceBuilder" >}}
+{{< tab "Java" >}}
+```java
+final FileSource<String> source =
+        FileSource.forRecordStreamFormat(...)
+        .monitorContinuously(Duration.ofMillis(5))  
+        .build();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Format Types
+
+The reading of each file happens through file readers defined by file formats. 
+These define the parsing logic for the contents of the file. There are multiple classes that the source supports. 
+Their interfaces trade off simplicity of implementation against flexibility/efficiency.
+
+* A `StreamFormat` reads the contents of a file from a file stream. It is the simplest format to implement, 
+and provides many features out-of-the-box (like checkpointing logic) but is limited in the optimizations it can apply 
+(such as object reuse, batching, etc.).
+
+* A `BulkFormat` reads batches of records from a file at a time. 
+It is the most "low level" format to implement, but offers the greatest flexibility to optimize the implementation.
+
+#### TextLine format
+
+A `StreamFormat` reader format that reads text lines from a file.
+The reader uses Java's built-in `InputStreamReader` to decode the byte stream using
+various supported charset encodings.
+This format does not support optimized recovery from checkpoints. On recovery, it will re-read
+and discard the number of lines that were processed before the last checkpoint. That is due to
+the fact that the offsets of lines in the file cannot be tracked through the charset decoders
+with their internal buffering of stream input and charset decoder state.
+
+#### SimpleStreamFormat Abstract Class
+
+A simple version of `StreamFormat` for formats that are not splittable.
+Custom reads of Array or File can be done by implementing `SimpleStreamFormat`:
+
+{{< tabs "SimpleStreamFormat" >}}
+{{< tab "Java" >}}
+```java
+private static final class ArrayReaderFormat extends SimpleStreamFormat<byte[]> {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public Reader<byte[]> createReader(Configuration config, FSDataInputStream stream)
+            throws IOException {
+        return new ArrayReader(stream);
+    }
+
+    @Override
+    public TypeInformation<byte[]> getProducedType() {
+        return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+    }
+}
+
+final FileSource<byte[]> source =
+                FileSource.forRecordStreamFormat(new ArrayReaderFormat(), path).build();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+An example of a `SimpleStreamFormat` is `CsvReaderFormat`. It can be initialized as follows:
+```java
+CsvReaderFormat<SomePojo> csvFormat = CsvReaderFormat.forPojo(SomePojo.class);
+FileSource<SomePojo> source = 
+        FileSource.forRecordStreamFormat(csvFormat, Path.fromLocalFile(...)).build();
+```
+
+The schema for CSV parsing, in this case, is automatically derived from the fields of the `SomePojo` class using the `Jackson` library. (Note: you might need to add the `@JsonPropertyOrder({field1, field2, ...})` annotation to your class definition, with the field order exactly matching that of the CSV file columns.)
+
+If you need more fine-grained control over the CSV schema or the parsing options, use the more low-level `forSchema` static factory method of `CsvReaderFormat`:
+
+```java
+CsvReaderFormat<T> forSchema(CsvMapper mapper, 
+                             CsvSchema schema, 
+                             TypeInformation<T> typeInformation) 
+```
+
+#### Bulk Format
+
+The BulkFormat reads and decodes batches of records at a time. Examples of bulk formats
+are formats like ORC or Parquet.
+The outer `BulkFormat` class acts mainly as a configuration holder and factory for the
+reader. The actual reading is done by the `BulkFormat.Reader`, which is created in the
+`BulkFormat#createReader(Configuration, FileSourceSplit)` method. If a bulk reader is
+created based on a checkpoint during checkpointed streaming execution, then the reader is
+re-created in the `BulkFormat#restoreReader(Configuration, FileSourceSplit)` method.
+
+A `SimpleStreamFormat` can be turned into a `BulkFormat` by wrapping it in a `StreamFormatAdapter`:
+```java
+BulkFormat<SomePojo, FileSourceSplit> bulkFormat = 
+        new StreamFormatAdapter<>(CsvReaderFormat.forPojo(SomePojo.class));
+```
+
+### Customizing File Enumeration
+
+{{< tabs "CustomizingFileEnumeration" >}}
+{{< tab "Java" >}}
+```java
+/**
+ * A FileEnumerator implementation for hive source, which generates splits based on 
+ * HiveTablePartition.
+ */
+public class HiveSourceFileEnumerator implements FileEnumerator {
+    
+    // reference constructor
+    public HiveSourceFileEnumerator(...) {
+        ...
+    }
+
+    /***
+     * Generates all file splits for the relevant files under the given paths. The {@code
+     * minDesiredSplits} is an optional hint indicating how many splits would be necessary to
+     * exploit parallelism properly.
+     */
+    @Override
+    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
+            throws IOException {
+        // createInputSplits:splitting files into fragmented collections
+        return new ArrayList<>(createInputSplits(...));
+    }
+
+    ...
+
+    /***
+     * A factory to create HiveSourceFileEnumerator.
+     */
+    public static class Provider implements FileEnumerator.Provider {
+
+        ...
+        @Override
+        public FileEnumerator create() {
+            return new HiveSourceFileEnumerator(...);
+        }
+    }
+}
+// use the customizing file enumeration
+new HiveSource<>(
+        ...,
+        new HiveSourceFileEnumerator.Provider(
+        partitions != null ? partitions : Collections.emptyList(),
+        new JobConfWrapper(jobConf)),
+       ...);
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Current Limitations
+
+Watermarking doesn't work particularly well for large backlogs of files, because watermarks eagerly advance within a file, and the next file might contain data later than the watermark again.
+We are looking at ways to generate watermarks based more on global information.
+
+For unbounded File Sources, the enumerator currently remembers the paths of all already processed files, which is state that can in some cases grow rather large.
+We plan to add a compressed form of tracking already processed files in the future (for example, by keeping lower bounds on modification timestamps).
+
+### Behind the Scenes
+{{< hint info >}}
+If you are interested in how the File Source works under the design of the new data source API, you may
+want to read this part as a reference. For details about the new data source API, the
+[documentation of the data source]({{< ref "docs/dev/datastream/sources.md" >}}) and
+<a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface">FLIP-27</a>
+provide more detailed discussions.
+{{< /hint >}}
+
+The `File Source` is divided into the following two parts: File SplitEnumerator and File SourceReader.
+
+The source for a file system is divided into the following two parts: the File SplitEnumerator and the File SourceReader.
+The File SplitEnumerator (a split is an abstraction of a portion of the external file system's data)
+is responsible for discovering the splits in the external system and assigning them to the File SourceReader,
+and it also manages the global watermark to ensure that the consumption rate is approximately the same across the different File SourceReaders.

Review comment:
       I think this sentence is partially a lie because afaik watermark alignment is not supported by any source yet.
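
       For readers unfamiliar with the term, the behavior the sentence describes would look roughly like the
       sketch below: readers that run too far ahead of the slowest one are paused. This is plain illustrative
       Java under my own assumptions, not a Flink API; `AlignmentSketch`, `report`, `mayRead` and
       `maxDriftMillis` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical coordinator (not a Flink class) illustrating watermark alignment. */
final class AlignmentSketch {
    // Maximum distance a reader may be ahead of the slowest reader.
    private final long maxDriftMillis;
    // Last watermark reported by each reader, keyed by reader id.
    private final Map<Integer, Long> watermarkByReader = new HashMap<>();

    AlignmentSketch(long maxDriftMillis) {
        this.maxDriftMillis = maxDriftMillis;
    }

    /** Records the latest watermark of a reader. */
    void report(int readerId, long watermarkMillis) {
        watermarkByReader.put(readerId, watermarkMillis);
    }

    /** A reader may continue only while it is at most maxDriftMillis ahead of the slowest reader. */
    boolean mayRead(int readerId) {
        Long own = watermarkByReader.get(readerId);
        if (own == null) {
            return true; // nothing reported yet, so there is nothing to align against
        }
        long min = Long.MAX_VALUE;
        for (long watermark : watermarkByReader.values()) {
            min = Math.min(min, watermark);
        }
        return own - min <= maxDriftMillis;
    }
}
```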
