You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2022/08/10 11:41:33 UTC

[flink] branch master updated: [FLINK-28904][python][docs] Add missing connector/format documentation

This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 465db25502e [FLINK-28904][python][docs] Add missing connector/format documentation
465db25502e is described below

commit 465db25502e6e2c59e466b05dfd3bdb28919a765
Author: Juntao Hu <ma...@gmail.com>
AuthorDate: Wed Aug 10 16:43:01 2022 +0800

    [FLINK-28904][python][docs] Add missing connector/format documentation
    
    This closes #20535.
---
 .../docs/connectors/datastream/filesystem.md       | 70 ++++++++++++++++++++++
 .../docs/connectors/datastream/firehose.md         |  2 +
 .../docs/connectors/datastream/formats/json.md     | 32 +++++++++-
 .../connectors/datastream/formats/text_files.md    | 24 ++++++++
 .../docs/connectors/datastream/kinesis.md          |  2 +
 .../docs/connectors/datastream/pulsar.md           |  2 +
 .../docs/connectors/datastream/rabbitmq.md         |  2 +
 .../docs/connectors/datastream/filesystem.md       | 70 ++++++++++++++++++++++
 .../content/docs/connectors/datastream/firehose.md |  2 +
 .../docs/connectors/datastream/formats/json.md     | 32 +++++++++-
 .../connectors/datastream/formats/text_files.md    | 24 ++++++++
 docs/content/docs/connectors/datastream/kinesis.md |  2 +
 docs/content/docs/connectors/datastream/pulsar.md  |  2 +
 .../content/docs/connectors/datastream/rabbitmq.md |  2 +
 docs/data/sql_connectors.yml                       | 17 ++++++
 .../pyflink/datastream/connectors/file_system.py   |  8 +--
 16 files changed, 287 insertions(+), 6 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/filesystem.md b/docs/content.zh/docs/connectors/datastream/filesystem.md
index 80f49f4cb02..8db49e928b0 100644
--- a/docs/content.zh/docs/connectors/datastream/filesystem.md
+++ b/docs/content.zh/docs/connectors/datastream/filesystem.md
@@ -74,6 +74,15 @@ FileSource.forRecordStreamFormat(StreamFormat,Path...);
 FileSource.forBulkFileFormat(BulkFormat,Path...);
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+# 从文件流中读取文件内容
+FileSource.for_record_stream_format(stream_format, *path)
+
+# 从文件中一次读取一批记录
+FileSource.for_bulk_file_format(bulk_format, *path)
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 可以通过创建 `FileSource.FileSourceBuilder` 设置 File Source 的所有参数。
@@ -93,6 +102,13 @@ final FileSource<String> source =
         .build();
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+source = FileSource.for_record_stream_format(...) \
+    .monitor_continously(Duration.of_millis(5)) \
+    .build()
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 <a name="source-format-types"></a>
@@ -351,6 +367,19 @@ val sink: FileSink[String] = FileSink
 
 input.sinkTo(sink)
 
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+data_stream = ...
+
+sink = FileSink \
+    .for_row_format(OUTPUT_PATH, Encoder.simple_string_encoder("UTF-8")) \
+    .with_rolling_policy(RollingPolicy.default_rolling_policy(
+        part_size=1024 ** 3, rollover_interval=15 * 60 * 1000, inactivity_interval=5 * 60 * 1000)) \
+    .build()
+
+data_stream.sink_to(sink)
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -902,6 +931,10 @@ Flink 内置了两种 BucketAssigners:
 - `DateTimeBucketAssigner` :默认的基于时间的分配器
 - `BasePathBucketAssigner` :分配所有文件存储在基础路径上(单个全局桶)
 
+{{< hint info >}}
+PyFlink 只支持 `DateTimeBucketAssigner` 和 `BasePathBucketAssigner` 。
+{{< /hint >}}
+
 <a name="rolling-policy"></a>
 
 ### 滚动策略
@@ -915,6 +948,10 @@ Flink 内置了两种 RollingPolicies:
 - `DefaultRollingPolicy`
 - `OnCheckpointRollingPolicy`
 
+{{< hint info >}}
+PyFlink 只支持 `DefaultRollingPolicy` 和 `OnCheckpointRollingPolicy` 。
+{{< /hint >}}
+
 <a name="part-file-lifecycle"></a>
 
 ### Part 文件生命周期
@@ -1033,6 +1070,22 @@ val sink = FileSink
 			
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+config = OutputFileConfig \
+    .builder() \
+    .with_part_prefix("prefix") \
+    .with_part_suffix(".ext") \
+    .build()
+
+sink = FileSink \
+    .for_row_format(OUTPUT_PATH, Encoder.simple_string_encoder("UTF-8")) \
+    .with_bucket_assigner(BucketAssigner.base_path_bucket_assigner()) \
+    .with_rolling_policy(RollingPolicy.on_checkpoint_rolling_policy()) \
+    .with_output_file_config(config) \
+    .build()
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 <a name="compaction"></a>
@@ -1078,6 +1131,19 @@ val fileSink: FileSink[Integer] =
 
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+file_sink = FileSink \
+    .for_row_format(PATH, Encoder.simple_string_encoder()) \
+    .enable_compact(
+        FileCompactStrategy.builder()
+            .set_size_threshold(1024)
+            .enable_compaction_on_checkpoint(5)
+            .build(),
+        FileCompactor.concat_file_compactor()) \
+    .build()
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 这一功能开启后,在文件转为 `pending` 状态与文件最终提交之间会进行文件合并。这些 `pending` 状态的文件将首先被提交为一个以 `.` 开头的
@@ -1105,6 +1171,10 @@ val fileSink: FileSink[Integer] =
 **注意事项2** 如果启用了文件合并功能,文件可见的时间会被延长。
 {{< /hint >}}
 
+{{< hint info >}}
+PyFlink 只支持 `ConcatFileCompactor` 和 `IdenticalFileCompactor` 。
+{{< /hint >}}
+
 <a name="important-considerations"></a>
 
 ### 重要注意事项
diff --git a/docs/content.zh/docs/connectors/datastream/firehose.md b/docs/content.zh/docs/connectors/datastream/firehose.md
index 58b9083780e..afcbf56988d 100644
--- a/docs/content.zh/docs/connectors/datastream/firehose.md
+++ b/docs/content.zh/docs/connectors/datastream/firehose.md
@@ -33,6 +33,8 @@ To use the connector, add the following Maven dependency to your project:
 
 {{< artifact flink-connector-aws-kinesis-firehose >}}
 
+{{< py_download_link "aws-kinesis-firehose" >}}
+
 The `KinesisFirehoseSink` uses [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html) to write data from a Flink stream into a Firehose delivery stream.
 
 {{< tabs "42vs28vdth5-nm76-6dz1-5m7s-5y345bu56se5u66je" >}}
diff --git a/docs/content.zh/docs/connectors/datastream/formats/json.md b/docs/content.zh/docs/connectors/datastream/formats/json.md
index 232c45441f3..8e40e52f4da 100644
--- a/docs/content.zh/docs/connectors/datastream/formats/json.md
+++ b/docs/content.zh/docs/connectors/datastream/formats/json.md
@@ -35,6 +35,8 @@ To use the JSON format you need to add the Flink JSON dependency to your project
 </dependency>
 ```
 
+For PyFlink users, you could use it directly in your jobs.
+
 Flink supports reading/writing JSON records via the `JsonSerializationSchema/JsonDeserializationSchema`.
 These utilize the [Jackson](https://github.com/FasterXML/jackson) library, and support any type that is supported by Jackson, including, but not limited to, `POJO`s and `ObjectNode`.
 
@@ -74,4 +76,32 @@ JsonSerializationSchema<SomeClass> jsonFormat = new JsonSerializationSchema<>(
     () -> new ObjectMapper()
         .enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS))
         .registerModule(new ParameterNamesModule());
-```
\ No newline at end of file
+```
+
+## Python
+
+In PyFlink, `JsonRowSerializationSchema` and `JsonRowDeserializationSchema` are built-in support for `Row` type.
+Here are examples on how to use it in `KafkaSource` and `KafkaSink`:
+
+```python
+row_type_info = Types.ROW_NAMED(['name', 'age'], [Types.STRING(), Types.INT()])
+json_format = JsonRowDeserializationSchema.builder().type_info(row_type_info).build()
+
+source = KafkaSource.builder() \
+    .set_value_only_deserializer(json_format) \
+    .build()
+```
+
+```python
+row_type_info = Types.ROW_NAMED(['name', 'age'], [Types.STRING(), Types.INT()])
+json_format = JsonRowSerializationSchema.builder().with_type_info(row_type_info).build()
+
+sink = KafkaSink.builder() \
+    .set_record_serializer(
+        KafkaRecordSerializationSchema.builder()
+            .set_topic('test')
+            .set_value_serialization_schema(json_format)
+            .build()
+    ) \
+    .build()
+```
diff --git a/docs/content.zh/docs/connectors/datastream/formats/text_files.md b/docs/content.zh/docs/connectors/datastream/formats/text_files.md
index 2f0cbbfa84b..2e66da76452 100644
--- a/docs/content.zh/docs/connectors/datastream/formats/text_files.md
+++ b/docs/content.zh/docs/connectors/datastream/formats/text_files.md
@@ -39,6 +39,8 @@ Flink 支持使用 `TextLineInputFormat` 从文件中读取文本行。此 forma
 </dependency>
 ```
 
+PyFlink 用户可直接使用相关接口,无需添加依赖。
+
 此 format 与新 Source 兼容,可以在批处理和流模式下使用。
 因此,你可以通过两种方式使用此 format:
 - 批处理模式的有界读取
@@ -49,6 +51,8 @@ Flink 支持使用 `TextLineInputFormat` 从文件中读取文本行。此 forma
 在此示例中,我们创建了一个 DataStream,其中包含作为字符串的文本文件的行。
 此处不需要水印策略,因为记录不包含事件时间戳。
 
+{{< tabs "bounded" >}}
+{{< tab "Java" >}}
 ```java
 final FileSource<String> source =
   FileSource.forRecordStreamFormat(new TextLineInputFormat(), /* Flink Path */)
@@ -56,11 +60,21 @@ final FileSource<String> source =
 final DataStream<String> stream =
   env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+source = FileSource.for_record_stream_format(StreamFormat.text_line_format(), *path).build()
+stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "file-source")
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 **连续读取示例**:
 在此示例中,我们创建了一个 DataStream,随着新文件被添加到目录中,其中包含的文本文件行的字符串流将无限增长。我们每秒会进行新文件监控。
 此处不需要水印策略,因为记录不包含事件时间戳。
 
+{{< tabs "continous" >}}
+{{< tab "Java" >}}
 ```java
 final FileSource<String> source =
     FileSource.forRecordStreamFormat(new TextLineInputFormat(), /* Flink Path */)
@@ -69,3 +83,13 @@ final FileSource<String> source =
 final DataStream<String> stream =
   env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+source = FileSource \
+    .for_record_stream_format(StreamFormat.text_line_format(), *path) \
+    .monitor_continously(Duration.of_seconds(1)) \
+    .build()
+stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "file-source")
+```
+{{< /tab >}}
diff --git a/docs/content.zh/docs/connectors/datastream/kinesis.md b/docs/content.zh/docs/connectors/datastream/kinesis.md
index 208cadb73a2..cdb7649f24d 100644
--- a/docs/content.zh/docs/connectors/datastream/kinesis.md
+++ b/docs/content.zh/docs/connectors/datastream/kinesis.md
@@ -52,6 +52,8 @@ Kinesis 连接器提供访问 [Amazon Kinesis Data Streams](http://aws.amazon.co
 
 由于许可证问题,以前的版本中 `flink-connector-kinesis` 工件没有部署到Maven中心库。有关更多信息,请参阅特定版本的文档。
 
+{{< py_download_link "kinesis" >}}
+
 ## 使用亚马逊 Kinesis 流服务
 遵循 [Amazon Kinesis Streams Developer Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html) 的指令建立 Kinesis 流。
 
diff --git a/docs/content.zh/docs/connectors/datastream/pulsar.md b/docs/content.zh/docs/connectors/datastream/pulsar.md
index b7ca75e3c52..eace8bb8fc5 100644
--- a/docs/content.zh/docs/connectors/datastream/pulsar.md
+++ b/docs/content.zh/docs/connectors/datastream/pulsar.md
@@ -34,6 +34,8 @@ Pulsar Source 当前支持 Pulsar 2.8.1 之后的版本,但是 Pulsar Source 
 
 {{< artifact flink-connector-pulsar >}}
 
+{{< py_download_link "pulsar" >}}
+
 Flink 的流连接器并不会放到发行文件里面一同发布,阅读[此文档]({{< ref "docs/dev/configuration/overview" >}}),了解如何将连接器添加到集群实例内。
 
 ## Pulsar Source
diff --git a/docs/content.zh/docs/connectors/datastream/rabbitmq.md b/docs/content.zh/docs/connectors/datastream/rabbitmq.md
index 783cca83121..aee35b45bac 100644
--- a/docs/content.zh/docs/connectors/datastream/rabbitmq.md
+++ b/docs/content.zh/docs/connectors/datastream/rabbitmq.md
@@ -40,6 +40,8 @@ Flink 自身既没有复用 "RabbitMQ AMQP Java Client" 的代码,也没有将
 
 {{< artifact flink-connector-rabbitmq >}}
 
+{{< py_download_link "rabbitmq" >}}
+
 注意连接器现在没有包含在二进制发行版中。集群执行的相关信息请参考 [这里]({{< ref "docs/dev/configuration/overview" >}}).
 
 ### 安装 RabbitMQ
diff --git a/docs/content/docs/connectors/datastream/filesystem.md b/docs/content/docs/connectors/datastream/filesystem.md
index c1f047c6e86..ca2bfc77306 100644
--- a/docs/content/docs/connectors/datastream/filesystem.md
+++ b/docs/content/docs/connectors/datastream/filesystem.md
@@ -71,6 +71,15 @@ FileSource.forRecordStreamFormat(StreamFormat,Path...);
 FileSource.forBulkFileFormat(BulkFormat,Path...);
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+# reads the contents of a file from a file stream.
+FileSource.for_record_stream_format(stream_format, *path)
+
+# reads batches of records from a file at a time
+FileSource.for_bulk_file_format(bulk_format, *path)
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 This creates a `FileSource.FileSourceBuilder` on which you can configure all the properties of the File Source.
@@ -91,6 +100,13 @@ final FileSource<String> source =
         .build();
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+source = FileSource.for_record_stream_format(...) \
+    .monitor_continously(Duration.of_millis(5)) \
+    .build()
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 ### Format Types
@@ -349,6 +365,19 @@ val sink: FileSink[String] = FileSink
 
 input.sinkTo(sink)
 
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+data_stream = ...
+
+sink = FileSink \
+    .for_row_format(OUTPUT_PATH, Encoder.simple_string_encoder("UTF-8")) \
+    .with_rolling_policy(RollingPolicy.default_rolling_policy(
+        part_size=1024 ** 3, rollover_interval=15 * 60 * 1000, inactivity_interval=5 * 60 * 1000)) \
+    .build()
+
+data_stream.sink_to(sink)
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -905,6 +934,10 @@ Flink comes with two built-in BucketAssigners:
  - `DateTimeBucketAssigner` : Default time based assigner
  - `BasePathBucketAssigner` : Assigner that stores all part files in the base path (single global bucket)
 
+{{< hint info >}}
+Note: PyFlink only supports `DateTimeBucketAssigner` and `BasePathBucketAssigner`.
+{{< /hint >}}
+
 ### Rolling Policy
 
 The `RollingPolicy` defines when a given in-progress part file will be closed and moved to the pending and later to finished state.
@@ -918,6 +951,10 @@ Flink comes with two built-in RollingPolicies:
  - `DefaultRollingPolicy`
  - `OnCheckpointRollingPolicy`
 
+{{< hint info >}}
+Note: PyFlink only supports `DefaultRollingPolicy` and `OnCheckpointRollingPolicy`.
+{{< /hint >}}
+
 ### Part file lifecycle
 
 In order to use the output of the `FileSink` in downstream systems, we need to understand the naming and lifecycle of the output files produced.
@@ -1033,6 +1070,22 @@ val sink = FileSink
 			
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+config = OutputFileConfig \
+    .builder() \
+    .with_part_prefix("prefix") \
+    .with_part_suffix(".ext") \
+    .build()
+
+sink = FileSink \
+    .for_row_format(OUTPUT_PATH, Encoder.simple_string_encoder("UTF-8")) \
+    .with_bucket_assigner(BucketAssigner.base_path_bucket_assigner()) \
+    .with_rolling_policy(RollingPolicy.on_checkpoint_rolling_policy()) \
+    .with_output_file_config(config) \
+    .build()
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 ### Compaction
@@ -1077,6 +1130,19 @@ val fileSink: FileSink[Integer] =
 
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+file_sink = FileSink \
+    .for_row_format(PATH, Encoder.simple_string_encoder()) \
+    .enable_compact(
+        FileCompactStrategy.builder()
+            .set_size_threshold(1024)
+            .enable_compaction_on_checkpoint(5)
+            .build(),
+        FileCompactor.concat_file_compactor()) \
+    .build()
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 Once enabled, the compaction happens between the files become `pending` and get committed. The pending files will
@@ -1108,6 +1174,10 @@ the give list of `Path` and write the result file. It could be classified into t
 **Important Note 2** When the compaction is enabled, the written files need to wait for longer time before they get visible.
 {{< /hint >}}
 
+{{< hint info >}}
+Note: PyFlink only supports `ConcatFileCompactor` and `IdenticalFileCompactor`.
+{{< /hint >}}
+
 ### Important Considerations
 
 #### General
diff --git a/docs/content/docs/connectors/datastream/firehose.md b/docs/content/docs/connectors/datastream/firehose.md
index b99d9f0e2c5..2f2a5cefdb8 100644
--- a/docs/content/docs/connectors/datastream/firehose.md
+++ b/docs/content/docs/connectors/datastream/firehose.md
@@ -33,6 +33,8 @@ To use the connector, add the following Maven dependency to your project:
 
 {{< artifact flink-connector-aws-kinesis-firehose >}}
 
+{{< py_download_link "aws-kinesis-firehose" >}}
+
 The `KinesisFirehoseSink` uses [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html) to write data from a Flink stream into a Firehose delivery stream.
 
 {{< tabs "42vs28vdth5-nm76-6dz1-5m7s-5y345bu56se5u66je" >}}
diff --git a/docs/content/docs/connectors/datastream/formats/json.md b/docs/content/docs/connectors/datastream/formats/json.md
index 57c97cb6b20..eb538a4432c 100644
--- a/docs/content/docs/connectors/datastream/formats/json.md
+++ b/docs/content/docs/connectors/datastream/formats/json.md
@@ -35,6 +35,8 @@ To use the JSON format you need to add the Flink JSON dependency to your project
 </dependency>
 ```
 
+For PyFlink users, you could use it directly in your jobs.
+
 Flink supports reading/writing JSON records via the `JsonSerializationSchema/JsonDeserializationSchema`.
 These utilize the [Jackson](https://github.com/FasterXML/jackson) library, and support any type that is supported by Jackson, including, but not limited to, `POJO`s and `ObjectNode`.
 
@@ -74,4 +76,32 @@ JsonSerializationSchema<SomeClass> jsonFormat=new JsonSerializationSchema<>(
     () -> new ObjectMapper()
         .enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS))
         .registerModule(new ParameterNamesModule());
-```
\ No newline at end of file
+```
+
+## Python
+
+In PyFlink, `JsonRowSerializationSchema` and `JsonRowDeserializationSchema` are built-in support for `Row` type.
+For example to use it in `KafkaSource` and `KafkaSink`:
+
+```python
+row_type_info = Types.ROW_NAMED(['name', 'age'], [Types.STRING(), Types.INT()])
+json_format = JsonRowDeserializationSchema.builder().type_info(row_type_info).build()
+
+source = KafkaSource.builder() \
+    .set_value_only_deserializer(json_format) \
+    .build()
+```
+
+```python
+row_type_info = Types.ROW_NAMED(['name', 'age'], [Types.STRING(), Types.INT()])
+json_format = JsonRowSerializationSchema.builder().with_type_info(row_type_info).build()
+
+sink = KafkaSink.builder() \
+    .set_record_serializer(
+        KafkaRecordSerializationSchema.builder()
+            .set_topic('test')
+            .set_value_serialization_schema(json_format)
+            .build()
+    ) \
+    .build()
+```
diff --git a/docs/content/docs/connectors/datastream/formats/text_files.md b/docs/content/docs/connectors/datastream/formats/text_files.md
index 33a69566b5a..aa8073fd131 100644
--- a/docs/content/docs/connectors/datastream/formats/text_files.md
+++ b/docs/content/docs/connectors/datastream/formats/text_files.md
@@ -39,6 +39,8 @@ To use the format you need to add the Flink Connector Files dependency to your p
 </dependency>
 ```
 
+For PyFlink users, you could use it directly in your jobs.
+
 This format is compatible with the new Source that can be used in both batch and streaming modes.
 Thus, you can use this format in two ways:
 - Bounded read for batch mode
@@ -49,6 +51,8 @@ Thus, you can use this format in two ways:
 In this example we create a DataStream containing the lines of a text file as Strings. 
 There is no need for a watermark strategy as records do not contain event timestamps.
 
+{{< tabs "bounded" >}}
+{{< tab "Java" >}}
 ```java
 final FileSource<String> source =
   FileSource.forRecordStreamFormat(new TextLineInputFormat(), /* Flink Path */)
@@ -56,12 +60,22 @@ final FileSource<String> source =
 final DataStream<String> stream =
   env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+source = FileSource.for_record_stream_format(StreamFormat.text_line_format(), *path).build()
+stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "file-source")
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 **Continuous read example**:
 In this example, we create a DataStream containing the lines of text files as Strings that will infinitely grow 
 as new files are added to the directory. We monitor for new files each second.
 There is no need for a watermark strategy as records do not contain event timestamps.
 
+{{< tabs "continous" >}}
+{{< tab "Java" >}}
 ```java
 final FileSource<String> source =
     FileSource.forRecordStreamFormat(new TextLineInputFormat(), /* Flink Path */)
@@ -70,3 +84,13 @@ final FileSource<String> source =
 final DataStream<String> stream =
   env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+source = FileSource \
+    .for_record_stream_format(StreamFormat.text_line_format(), *path) \
+    .monitor_continously(Duration.of_seconds(1)) \
+    .build()
+stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "file-source")
+```
+{{< /tab >}}
diff --git a/docs/content/docs/connectors/datastream/kinesis.md b/docs/content/docs/connectors/datastream/kinesis.md
index 9513a115c92..af7ab42f90c 100644
--- a/docs/content/docs/connectors/datastream/kinesis.md
+++ b/docs/content/docs/connectors/datastream/kinesis.md
@@ -52,6 +52,8 @@ To use this connector, add one or more of the following dependencies to your pro
 
 Due to the licensing issue, the `flink-connector-kinesis` artifact is not deployed to Maven central for the prior versions. Please see the version specific documentation for further information.
 
+{{< py_download_link "kinesis" >}}
+
 ## Using the Amazon Kinesis Streams Service
 Follow the instructions from the [Amazon Kinesis Streams Developer Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)
 to setup Kinesis streams.
diff --git a/docs/content/docs/connectors/datastream/pulsar.md b/docs/content/docs/connectors/datastream/pulsar.md
index 568110193e8..391fb97a1bf 100644
--- a/docs/content/docs/connectors/datastream/pulsar.md
+++ b/docs/content/docs/connectors/datastream/pulsar.md
@@ -34,6 +34,8 @@ Details on Pulsar compatibility can be found in [PIP-72](https://github.com/apac
 
 {{< artifact flink-connector-pulsar >}}
 
+{{< py_download_link "pulsar" >}}
+
 Flink's streaming connectors are not part of the binary distribution.
 See how to link with them for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).
 
diff --git a/docs/content/docs/connectors/datastream/rabbitmq.md b/docs/content/docs/connectors/datastream/rabbitmq.md
index 3d8678aee2f..78fbccf5f53 100644
--- a/docs/content/docs/connectors/datastream/rabbitmq.md
+++ b/docs/content/docs/connectors/datastream/rabbitmq.md
@@ -44,6 +44,8 @@ This connector provides access to data streams from [RabbitMQ](http://www.rabbit
 
 {{< artifact flink-connector-rabbitmq >}}
 
+{{< py_download_link "rabbitmq" >}}
+
 Note that the streaming connectors are currently not part of the binary distribution. See linking with them for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).
 
 ### Installing RabbitMQ
diff --git a/docs/data/sql_connectors.yml b/docs/data/sql_connectors.yml
index eb8ac5f41b8..7c292469647 100644
--- a/docs/data/sql_connectors.yml
+++ b/docs/data/sql_connectors.yml
@@ -172,3 +172,20 @@ kinesis:
     maven: flink-connector-kinesis
     sql_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kinesis/$version/flink-sql-connector-kinesis-$version.jar
 
+aws-kinesis-firehose:
+  name: AWS Kinesis Firehose
+  category: connector
+  maven: flink-connector-kinesis-firehose
+  sql_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-aws-kinesis-firehose/$version/flink-sql-connector-aws-kinesis-firehose-$version.jar
+
+pulsar:
+    name: Pulsar
+    category: connector
+    maven: flink-connector-pulsar
+    sql_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-pulsar/$version/flink-sql-connector-pulsar-$version.jar
+
+rabbitmq:
+    name: RabbitMQ
+    category: connector
+    maven: flink-connector-rabbitmq
+    sql_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-rabbitmq/$version/flink-sql-connector-rabbitmq-$version.jar
diff --git a/flink-python/pyflink/datastream/connectors/file_system.py b/flink-python/pyflink/datastream/connectors/file_system.py
index 2494693c9c2..11ad3918ef3 100644
--- a/flink-python/pyflink/datastream/connectors/file_system.py
+++ b/flink-python/pyflink/datastream/connectors/file_system.py
@@ -569,14 +569,14 @@ class FileCompactor(JavaObjectWrapper):
             return FileCompactor(JConcatFileCompactor())
 
     @staticmethod
-    def identity_file_compactor():
+    def identical_file_compactor():
         """
         Returns a file compactor that directly copy the content of the only input file to the
         output.
         """
-        JIdentityFileCompactor = get_gateway().jvm.org.apache.flink.connector.file.sink.compactor.\
-            IdentityFileCompactor
-        return FileCompactor(JIdentityFileCompactor())
+        JIdenticalFileCompactor = get_gateway().jvm.org.apache.flink.connector.file.sink.compactor.\
+            IdenticalFileCompactor
+        return FileCompactor(JIdenticalFileCompactor())
 
 
 class FileSink(Sink, SupportsPreprocessing):