Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/15 15:33:27 UTC

[GitHub] [flink] MartijnVisser commented on a change in pull request #19056: [FLINK-26028][Connector/pulsar] add sink documentation; change some source documentation

MartijnVisser commented on a change in pull request #19056:
URL: https://github.com/apache/flink/pull/19056#discussion_r827071130



##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -44,17 +43,19 @@ See how to link with them for cluster execution [here]({{< ref "docs/dev/configu
 This part describes the Pulsar source based on the new
 [data source]({{< ref "docs/dev/datastream/sources.md" >}}) API.
 
-If you want to use the legacy `SourceFunction` or on Flink 1.13 or lower releases, just use the StreamNative's [pulsar-flink](https://github.com/streamnative/pulsar-flink).
+If you want to use the legacy `SourceFunction` or on Flink 1.13 or lower releases,
+just use the StreamNative's [Pulsar Flink connector](https://github.com/streamnative/pulsar-flink).
 {{< /hint >}}
 
 ### Usage
 
-Pulsar source provides a builder class for constructing an instance of PulsarSource. The code snippet below shows
-how to build a PulsarSource to consume messages from the earliest cursor of topic "persistent://public/default/my-topic",
-with **Exclusive** subscription `my-subscription` and deserialize the raw payload of the messages as strings.
+Pulsar source provides a builder class for constructing a PulsarSource instance. The code snippet below shows
+how to build a PulsarSource instance to consume messages from the earliest cursor of the topic
+"persistent://public/default/my-topic" in **Exclusive** subscription type (`my-subscription`)
+and deserialize the raw payload of the messages as strings.

Review comment:
       ```suggestion
   The Pulsar source provides a builder class for constructing a PulsarSource instance. The code snippet below builds a PulsarSource instance. It consumes messages from the earliest cursor of the topic
   "persistent://public/default/my-topic" in **Exclusive** subscription type (`my-subscription`)
   and deserializes the raw payload of the messages as strings.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -24,14 +24,13 @@ under the License.
 
 # Apache Pulsar Connector
 
-Flink provides an [Apache Pulsar](https://pulsar.apache.org) source connector for reading data from Pulsar topics with exactly-once guarantees.
+Flink provides an [Apache Pulsar](https://pulsar.apache.org) connector for reading data from Pulsar topics with exactly-once guarantees.
 
 ## Dependency
 
-You can use the connector with Pulsar 2.7.0 or higher. However, the Pulsar source connector supports
-Pulsar [transactions](https://pulsar.apache.org/docs/en/txn-what/),
-it is recommended to use Pulsar 2.8.0 or higher releases.
-For details on Pulsar compatibility, please refer to the [PIP-72](https://github.com/apache/pulsar/wiki/PIP-72%3A-Introduce-Pulsar-Interface-Taxonomy%3A-Audience-and-Stability-Classification).
+You can use the connector with the Pulsar 2.8.1 or higher version. However, the Pulsar connector supports
+Pulsar [transactions](https://pulsar.apache.org/docs/en/txn-what/), it is recommended to use the Pulsar 2.9.2 or higher version.
+For details on Pulsar compatibility, refer to the [PIP-72](https://github.com/apache/pulsar/wiki/PIP-72%3A-Introduce-Pulsar-Interface-Taxonomy%3A-Audience-and-Stability-Classification).

Review comment:
       ```suggestion
   You can use the connector with the Pulsar 2.8.1 or higher. Because the Pulsar connector supports
   Pulsar [transactions](https://pulsar.apache.org/docs/en/txn-what/), it is recommended to use the Pulsar 2.9.2 or higher.
   Details on Pulsar compatibility can be found in [PIP-72](https://github.com/apache/pulsar/wiki/PIP-72%3A-Introduce-Pulsar-Interface-Taxonomy%3A-Audience-and-Stability-Classification).
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -96,64 +101,65 @@ Pulsar source provide two ways of topic-partition subscription:
 
 #### Flexible Topic Naming
 
-Since Pulsar 2.0, all topic names internally have the form `{persistent|non-persistent}://tenant/namespace/topic`.
+Since Pulsar 2.0, all topic names internally are in  a form of  `{persistent|non-persistent}://tenant/namespace/topic`.
 Now, for partitioned topics, you can use short names in many cases (for the sake of simplicity).
 The flexible naming system stems from the fact that there is now a default topic type, tenant, and namespace in a Pulsar cluster.
 
-Topic property | Default
-:------------|:-------
-topic type | `persistent`
-tenant | `public`
-namespace | `default`
+| Topic property | Default      |
+|:---------------|:-------------|
+| topic type     | `persistent` |
+| tenant         | `public`     |
+| namespace      | `default`    |
 
-This table lists a mapping relationship between your input topic name and translated topic name:
+This table lists a mapping relationship between your input topic name and the translated topic name:
 
-Input topic name | Translated topic name
-:----------------|:---------------------
-`my-topic` | `persistent://public/default/my-topic`
-`my-tenant/my-namespace/my-topic` | `persistent://my-tenant/my-namespace/my-topic`
+| Input topic name                  | Translated topic name                          |
+|:----------------------------------|:-----------------------------------------------|
+| `my-topic`                        | `persistent://public/default/my-topic`         |
+| `my-tenant/my-namespace/my-topic` | `persistent://my-tenant/my-namespace/my-topic` |
 
 {{< hint warning >}}
-For non-persistent topics, you need to continue to specify the entire topic name,
+For non-persistent topics, you need to specify the entire topic name,
 as the default-based rules do not apply for non-partitioned topics.
-Thus, you cannot use a short name like `non-persistent://my-topic` and would need to use `non-persistent://public/default/my-topic` instead.
+Thus, you cannot use a short name like `non-persistent://my-topic` and would use `non-persistent://public/default/my-topic` instead.

Review comment:
       ```suggestion
   Thus, you cannot use a short name like `non-persistent://my-topic` and need to use `non-persistent://public/default/my-topic` instead.
   ```
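
The naming defaults discussed in the hunk above (topic type `persistent`, tenant `public`, namespace `default`) can be sketched as a small standalone function. This is an illustrative Python re-implementation of the documented translation rules, not actual Pulsar or Flink code:

```python
def normalize_topic_name(name: str) -> str:
    """Expand a short Pulsar topic name into its fully qualified form,
    using the documented defaults: 'persistent' topic type, 'public'
    tenant, 'default' namespace. Hypothetical helper for illustration."""
    if "://" in name:
        # Already fully qualified, e.g. persistent://tenant/ns/topic
        return name
    parts = name.split("/")
    if len(parts) == 1:
        tenant, namespace, topic = "public", "default", parts[0]
    elif len(parts) == 3:
        tenant, namespace, topic = parts
    else:
        raise ValueError(f"unsupported short topic name: {name!r}")
    return f"persistent://{tenant}/{namespace}/{topic}"
```

Both rows of the mapping table follow from the same two branches: a bare topic name picks up all three defaults, while `tenant/namespace/topic` only picks up the `persistent` scheme.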

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -96,64 +101,65 @@ Pulsar source provide two ways of topic-partition subscription:
 
 #### Flexible Topic Naming
 
-Since Pulsar 2.0, all topic names internally have the form `{persistent|non-persistent}://tenant/namespace/topic`.
+Since Pulsar 2.0, all topic names internally are in  a form of  `{persistent|non-persistent}://tenant/namespace/topic`.
 Now, for partitioned topics, you can use short names in many cases (for the sake of simplicity).
 The flexible naming system stems from the fact that there is now a default topic type, tenant, and namespace in a Pulsar cluster.
 
-Topic property | Default
-:------------|:-------
-topic type | `persistent`
-tenant | `public`
-namespace | `default`
+| Topic property | Default      |
+|:---------------|:-------------|
+| topic type     | `persistent` |
+| tenant         | `public`     |
+| namespace      | `default`    |
 
-This table lists a mapping relationship between your input topic name and translated topic name:
+This table lists a mapping relationship between your input topic name and the translated topic name:
 
-Input topic name | Translated topic name
-:----------------|:---------------------
-`my-topic` | `persistent://public/default/my-topic`
-`my-tenant/my-namespace/my-topic` | `persistent://my-tenant/my-namespace/my-topic`
+| Input topic name                  | Translated topic name                          |
+|:----------------------------------|:-----------------------------------------------|
+| `my-topic`                        | `persistent://public/default/my-topic`         |
+| `my-tenant/my-namespace/my-topic` | `persistent://my-tenant/my-namespace/my-topic` |
 
 {{< hint warning >}}
-For non-persistent topics, you need to continue to specify the entire topic name,
+For non-persistent topics, you need to specify the entire topic name,
 as the default-based rules do not apply for non-partitioned topics.
-Thus, you cannot use a short name like `non-persistent://my-topic` and would need to use `non-persistent://public/default/my-topic` instead.
+Thus, you cannot use a short name like `non-persistent://my-topic` and would use `non-persistent://public/default/my-topic` instead.
 {{< /hint >}}
 
 #### Subscribing Pulsar Topic Partition
 
 Internally, Pulsar divides a partitioned topic as a set of non-partitioned topics according to the partition size.
 
-For example, if a `simple-string` topic with 3 partitions is created under the `sample` tenant with `flink` namespace.
+For example, if a `simple-string` topic with 3 partitions is created under the `sample` tenant with the `flink` namespace.
 The topics on Pulsar would be:
 
-Topic name | Partitioned
-:--------- | :----------
-`persistent://sample/flink/simple-string` | Y
-`persistent://sample/flink/simple-string-partition-0` | N
-`persistent://sample/flink/simple-string-partition-1` | N
-`persistent://sample/flink/simple-string-partition-2` | N
+| Topic name                                            | Partitioned |
+|:------------------------------------------------------|:------------|
+| `persistent://sample/flink/simple-string`             | Y           |
+| `persistent://sample/flink/simple-string-partition-0` | N           |
+| `persistent://sample/flink/simple-string-partition-1` | N           |
+| `persistent://sample/flink/simple-string-partition-2` | N           |
 
 You can directly consume messages from the topic partitions by using the non-partitioned topic names above.
-For example, use `PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1", "sample/flink/simple-string-partition-2")` would consume the partitions 1 and 2 of the `sample/flink/simple-string` topic.
+For example, use `PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1", "sample/flink/simple-string-partition-2")`
+would consume the partitions 1 and 2 of the `sample/flink/simple-string` topic.
 
-#### RegexSubscriptionMode for Topic Pattern
+#### Setting Topic Pattern

Review comment:
       ```suggestion
   #### Setting Topic Patterns
   ```
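
The partition table in the hunk above follows a simple naming scheme: a partitioned topic is backed by one non-partitioned topic per partition, suffixed `-partition-N`. A minimal Python sketch of that scheme (an illustration of the documented naming, not Pulsar code):

```python
def partition_topic_names(full_topic_name: str, num_partitions: int) -> list[str]:
    """Return the internal non-partitioned topic names Pulsar creates
    for a partitioned topic, per the '-partition-N' naming scheme."""
    return [f"{full_topic_name}-partition-{i}" for i in range(num_partitions)]
```

For the 3-partition `simple-string` example, this yields exactly the three `N`-flagged rows of the table, which are the names you would pass to `setTopics(...)` to consume individual partitions.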

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -44,17 +43,19 @@ See how to link with them for cluster execution [here]({{< ref "docs/dev/configu
 This part describes the Pulsar source based on the new
 [data source]({{< ref "docs/dev/datastream/sources.md" >}}) API.
 
-If you want to use the legacy `SourceFunction` or on Flink 1.13 or lower releases, just use the StreamNative's [pulsar-flink](https://github.com/streamnative/pulsar-flink).
+If you want to use the legacy `SourceFunction` or on Flink 1.13 or lower releases,
+just use the StreamNative's [Pulsar Flink connector](https://github.com/streamnative/pulsar-flink).
 {{< /hint >}}

Review comment:
       I would completely remove this part from the documentation, since it will end up in documentation for Flink 1.15 and future versions. 

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -69,13 +70,17 @@ env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");
 
 The following properties are **required** for building a PulsarSource:
 
-- Pulsar service url, configured by `setServiceUrl(String)`
-- Pulsar service http url (aka. admin url), configured by `setAdminUrl(String)`
+- Pulsar service URL, configured by `setServiceUrl(String)`
+- Pulsar service HTTP URL (also known as admin URL), configured by `setAdminUrl(String)`
 - Pulsar subscription name, configured by `setSubscriptionName(String)`
 - Topics / partitions to subscribe, see the following
-  [Topic-partition subscription](#topic-partition-subscription) for more details.
+  [topic-partition subscription](#topic-partition-subscription) for more details.
 - Deserializer to parse Pulsar messages, see the following
-  [Deserializer](#deserializer) for more details.
+  [deserializer](#deserializer) for more details.
+
+It is **recommended** to set the consumer name in Pulsar Source by `setConsumerName(String)`.
+This would give a unique name to the flink connector in the Pulsar statistic dashboard.
+Give you a mechanism to monitor the performance of your flink applications.

Review comment:
       ```suggestion
   It is recommended to set the consumer name in Pulsar Source by `setConsumerName(String)`.
   This sets a unique name for the Flink connector in the Pulsar statistic dashboard.
   You can use it to monitor the performance of your Flink connector and applications.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -96,64 +101,65 @@ Pulsar source provide two ways of topic-partition subscription:
 
 #### Flexible Topic Naming
 
-Since Pulsar 2.0, all topic names internally have the form `{persistent|non-persistent}://tenant/namespace/topic`.
+Since Pulsar 2.0, all topic names internally are in  a form of  `{persistent|non-persistent}://tenant/namespace/topic`.
 Now, for partitioned topics, you can use short names in many cases (for the sake of simplicity).
 The flexible naming system stems from the fact that there is now a default topic type, tenant, and namespace in a Pulsar cluster.
 
-Topic property | Default
-:------------|:-------
-topic type | `persistent`
-tenant | `public`
-namespace | `default`
+| Topic property | Default      |
+|:---------------|:-------------|
+| topic type     | `persistent` |
+| tenant         | `public`     |
+| namespace      | `default`    |
 
-This table lists a mapping relationship between your input topic name and translated topic name:
+This table lists a mapping relationship between your input topic name and the translated topic name:
 
-Input topic name | Translated topic name
-:----------------|:---------------------
-`my-topic` | `persistent://public/default/my-topic`
-`my-tenant/my-namespace/my-topic` | `persistent://my-tenant/my-namespace/my-topic`
+| Input topic name                  | Translated topic name                          |
+|:----------------------------------|:-----------------------------------------------|
+| `my-topic`                        | `persistent://public/default/my-topic`         |
+| `my-tenant/my-namespace/my-topic` | `persistent://my-tenant/my-namespace/my-topic` |
 
 {{< hint warning >}}
-For non-persistent topics, you need to continue to specify the entire topic name,
+For non-persistent topics, you need to specify the entire topic name,
 as the default-based rules do not apply for non-partitioned topics.
-Thus, you cannot use a short name like `non-persistent://my-topic` and would need to use `non-persistent://public/default/my-topic` instead.
+Thus, you cannot use a short name like `non-persistent://my-topic` and would use `non-persistent://public/default/my-topic` instead.
 {{< /hint >}}
 
 #### Subscribing Pulsar Topic Partition
 
 Internally, Pulsar divides a partitioned topic as a set of non-partitioned topics according to the partition size.
 
-For example, if a `simple-string` topic with 3 partitions is created under the `sample` tenant with `flink` namespace.
+For example, if a `simple-string` topic with 3 partitions is created under the `sample` tenant with the `flink` namespace.
 The topics on Pulsar would be:
 
-Topic name | Partitioned
-:--------- | :----------
-`persistent://sample/flink/simple-string` | Y
-`persistent://sample/flink/simple-string-partition-0` | N
-`persistent://sample/flink/simple-string-partition-1` | N
-`persistent://sample/flink/simple-string-partition-2` | N
+| Topic name                                            | Partitioned |
+|:------------------------------------------------------|:------------|
+| `persistent://sample/flink/simple-string`             | Y           |
+| `persistent://sample/flink/simple-string-partition-0` | N           |
+| `persistent://sample/flink/simple-string-partition-1` | N           |
+| `persistent://sample/flink/simple-string-partition-2` | N           |
 
 You can directly consume messages from the topic partitions by using the non-partitioned topic names above.
-For example, use `PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1", "sample/flink/simple-string-partition-2")` would consume the partitions 1 and 2 of the `sample/flink/simple-string` topic.
+For example, use `PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1", "sample/flink/simple-string-partition-2")`
+would consume the partitions 1 and 2 of the `sample/flink/simple-string` topic.
 
-#### RegexSubscriptionMode for Topic Pattern
+#### Setting Topic Pattern
 
-Pulsar connector extracts the topic type (`persistent` or `non-persistent`) from the given topic pattern.
-For example, `PulsarSource.builder().setTopicPattern("non-persistent://my-topic*")` would be `non-persistent`.
-The topic type would be `persistent` if you do not provide the topic type in the regular expression.
+Pulsar source extracts the topic type (`persistent` or `non-persistent`) from the given topic pattern.
+For example, you can use the `PulsarSource.builder().setTopicPattern("non-persistent://my-topic*")` to specify a `non-persistent` topic.
+By default, a `persistent` topic is created if you do not specify the topic type in the regular expression.

Review comment:
       ```suggestion
   The Pulsar source extracts the topic type (`persistent` or `non-persistent`) from the provided topic pattern.
   For example, you can use the `PulsarSource.builder().setTopicPattern("non-persistent://my-topic*")` to specify a `non-persistent` topic.
   By default, a `persistent` topic is created if you do not specify the topic type in the regular expression.
   ```
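
The topic-type extraction described in this hunk can be sketched as a one-branch check: the type is `non-persistent` only when the pattern carries that scheme explicitly, and defaults to `persistent` otherwise. A hedged Python illustration of that rule (not the connector's actual implementation):

```python
def topic_type_from_pattern(pattern: str) -> str:
    """Derive the topic type from a topic pattern: 'non-persistent' when
    the pattern starts with that scheme, 'persistent' by default."""
    if pattern.startswith("non-persistent://"):
        return "non-persistent"
    return "persistent"
```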

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -96,64 +101,65 @@ Pulsar source provide two ways of topic-partition subscription:
 
 #### Flexible Topic Naming
 
-Since Pulsar 2.0, all topic names internally have the form `{persistent|non-persistent}://tenant/namespace/topic`.
+Since Pulsar 2.0, all topic names internally are in  a form of  `{persistent|non-persistent}://tenant/namespace/topic`.
 Now, for partitioned topics, you can use short names in many cases (for the sake of simplicity).
 The flexible naming system stems from the fact that there is now a default topic type, tenant, and namespace in a Pulsar cluster.
 
-Topic property | Default
-:------------|:-------
-topic type | `persistent`
-tenant | `public`
-namespace | `default`
+| Topic property | Default      |
+|:---------------|:-------------|
+| topic type     | `persistent` |
+| tenant         | `public`     |
+| namespace      | `default`    |
 
-This table lists a mapping relationship between your input topic name and translated topic name:
+This table lists a mapping relationship between your input topic name and the translated topic name:
 
-Input topic name | Translated topic name
-:----------------|:---------------------
-`my-topic` | `persistent://public/default/my-topic`
-`my-tenant/my-namespace/my-topic` | `persistent://my-tenant/my-namespace/my-topic`
+| Input topic name                  | Translated topic name                          |
+|:----------------------------------|:-----------------------------------------------|
+| `my-topic`                        | `persistent://public/default/my-topic`         |
+| `my-tenant/my-namespace/my-topic` | `persistent://my-tenant/my-namespace/my-topic` |
 
 {{< hint warning >}}
-For non-persistent topics, you need to continue to specify the entire topic name,
+For non-persistent topics, you need to specify the entire topic name,
 as the default-based rules do not apply for non-partitioned topics.
-Thus, you cannot use a short name like `non-persistent://my-topic` and would need to use `non-persistent://public/default/my-topic` instead.
+Thus, you cannot use a short name like `non-persistent://my-topic` and would use `non-persistent://public/default/my-topic` instead.
 {{< /hint >}}
 
 #### Subscribing Pulsar Topic Partition
 
 Internally, Pulsar divides a partitioned topic as a set of non-partitioned topics according to the partition size.
 
-For example, if a `simple-string` topic with 3 partitions is created under the `sample` tenant with `flink` namespace.
+For example, if a `simple-string` topic with 3 partitions is created under the `sample` tenant with the `flink` namespace.
 The topics on Pulsar would be:
 
-Topic name | Partitioned
-:--------- | :----------
-`persistent://sample/flink/simple-string` | Y
-`persistent://sample/flink/simple-string-partition-0` | N
-`persistent://sample/flink/simple-string-partition-1` | N
-`persistent://sample/flink/simple-string-partition-2` | N
+| Topic name                                            | Partitioned |
+|:------------------------------------------------------|:------------|
+| `persistent://sample/flink/simple-string`             | Y           |
+| `persistent://sample/flink/simple-string-partition-0` | N           |
+| `persistent://sample/flink/simple-string-partition-1` | N           |
+| `persistent://sample/flink/simple-string-partition-2` | N           |
 
 You can directly consume messages from the topic partitions by using the non-partitioned topic names above.
-For example, use `PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1", "sample/flink/simple-string-partition-2")` would consume the partitions 1 and 2 of the `sample/flink/simple-string` topic.
+For example, use `PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1", "sample/flink/simple-string-partition-2")`
+would consume the partitions 1 and 2 of the `sample/flink/simple-string` topic.
 
-#### RegexSubscriptionMode for Topic Pattern
+#### Setting Topic Pattern
 
-Pulsar connector extracts the topic type (`persistent` or `non-persistent`) from the given topic pattern.
-For example, `PulsarSource.builder().setTopicPattern("non-persistent://my-topic*")` would be `non-persistent`.
-The topic type would be `persistent` if you do not provide the topic type in the regular expression.
+Pulsar source extracts the topic type (`persistent` or `non-persistent`) from the given topic pattern.
+For example, you can use the `PulsarSource.builder().setTopicPattern("non-persistent://my-topic*")` to specify a `non-persistent` topic.
+By default, a `persistent` topic is created if you do not specify the topic type in the regular expression.
 
-To consume both `persistent` and `non-persistent` topics based on the topic pattern,
-you can use `setTopicPattern("topic-*", RegexSubscriptionMode.AllTopics)`.
-Pulsar connector would filter the available topics by the `RegexSubscriptionMode`.
+You can use `setTopicPattern("topic-*", RegexSubscriptionMode.AllTopics)` to consume
+both `persistent` and `non-persistent` topics based on the topic pattern.
+Pulsar source would filter the available topics by the `RegexSubscriptionMode`.
 
 ### Deserializer
 
-A deserializer (Deserialization schema) is required for parsing Pulsar messages. The deserializer is
-configured by `setDeserializationSchema(PulsarDeserializationSchema)`.
+A deserializer (`PulsarDeserializationSchema`) is for parsing Pulsar messages from bytes.
+You can configure the deserializer using `setDeserializationSchema(PulsarDeserializationSchema)`.

Review comment:
       ```suggestion
   A deserializer (`PulsarDeserializationSchema`) is for decoding Pulsar messages from bytes.
   You can configure the deserializer using `setDeserializationSchema(PulsarDeserializationSchema)`.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -206,16 +212,17 @@ PulsarSource.builder().setSubscriptionName("my-shared")
 PulsarSource.builder().setSubscriptionName("my-exclusive").setSubscriptionType(SubscriptionType.Exclusive)
 ```
 
-If you want to use `Key_Shared` subscription type on the Pulsar connector. Ensure that you provide a `RangeGenerator` implementation.
-The `RangeGenerator` generates a set of key hash ranges so that
-a respective reader subtask will only dispatch messages where the hash of the message key is contained in the specified range.
+If you want to use the `Key_Shared` subscription type on the Pulsar connector.
+Ensure that you provide a `RangeGenerator` implementation. The `RangeGenerator` generates a set of
+key hash ranges so that a respective reader subtask only dispatches messages where the hash of the
+message key is contained in the specified range.
 
-Pulsar connector would use a `UniformRangeGenerator` which would divides the range by the Flink source parallelism
-if no `RangeGenerator` is provided in the `Key_Shared` subscription type.
+Pulsar connector uses a `UniformRangeGenerator` which divides the range by the Flink source
+parallelism if no `RangeGenerator` is provided in the `Key_Shared` subscription type.
 
 ### Starting Position
 
-Pulsar source is able to consume messages starting from different positions by `setStartCursor(StartCursor)`.
+Pulsar source is able to consume messages starting from different positions by setting the `setStartCursor(StartCursor)` option.

Review comment:
       ```suggestion
   The Pulsar source is able to consume messages starting from different positions by setting the `setStartCursor(StartCursor)` option.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -96,64 +101,65 @@ Pulsar source provide two ways of topic-partition subscription:
 
 #### Flexible Topic Naming
 
-Since Pulsar 2.0, all topic names internally have the form `{persistent|non-persistent}://tenant/namespace/topic`.
+Since Pulsar 2.0, all topic names internally are in  a form of  `{persistent|non-persistent}://tenant/namespace/topic`.
 Now, for partitioned topics, you can use short names in many cases (for the sake of simplicity).
 The flexible naming system stems from the fact that there is now a default topic type, tenant, and namespace in a Pulsar cluster.
 
-Topic property | Default
-:------------|:-------
-topic type | `persistent`
-tenant | `public`
-namespace | `default`
+| Topic property | Default      |
+|:---------------|:-------------|
+| topic type     | `persistent` |
+| tenant         | `public`     |
+| namespace      | `default`    |
 
-This table lists a mapping relationship between your input topic name and translated topic name:
+This table lists a mapping relationship between your input topic name and the translated topic name:
 
-Input topic name | Translated topic name
-:----------------|:---------------------
-`my-topic` | `persistent://public/default/my-topic`
-`my-tenant/my-namespace/my-topic` | `persistent://my-tenant/my-namespace/my-topic`
+| Input topic name                  | Translated topic name                          |
+|:----------------------------------|:-----------------------------------------------|
+| `my-topic`                        | `persistent://public/default/my-topic`         |
+| `my-tenant/my-namespace/my-topic` | `persistent://my-tenant/my-namespace/my-topic` |
 
 {{< hint warning >}}
-For non-persistent topics, you need to continue to specify the entire topic name,
+For non-persistent topics, you need to specify the entire topic name,
 as the default-based rules do not apply for non-partitioned topics.
-Thus, you cannot use a short name like `non-persistent://my-topic` and would need to use `non-persistent://public/default/my-topic` instead.
+Thus, you cannot use a short name like `non-persistent://my-topic` and would use `non-persistent://public/default/my-topic` instead.
 {{< /hint >}}
 
 #### Subscribing Pulsar Topic Partition
 
 Internally, Pulsar divides a partitioned topic as a set of non-partitioned topics according to the partition size.
 
-For example, if a `simple-string` topic with 3 partitions is created under the `sample` tenant with `flink` namespace.
+For example, if a `simple-string` topic with 3 partitions is created under the `sample` tenant with the `flink` namespace.
 The topics on Pulsar would be:
 
-Topic name | Partitioned
-:--------- | :----------
-`persistent://sample/flink/simple-string` | Y
-`persistent://sample/flink/simple-string-partition-0` | N
-`persistent://sample/flink/simple-string-partition-1` | N
-`persistent://sample/flink/simple-string-partition-2` | N
+| Topic name                                            | Partitioned |
+|:------------------------------------------------------|:------------|
+| `persistent://sample/flink/simple-string`             | Y           |
+| `persistent://sample/flink/simple-string-partition-0` | N           |
+| `persistent://sample/flink/simple-string-partition-1` | N           |
+| `persistent://sample/flink/simple-string-partition-2` | N           |
 
 You can directly consume messages from the topic partitions by using the non-partitioned topic names above.
-For example, use `PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1", "sample/flink/simple-string-partition-2")` would consume the partitions 1 and 2 of the `sample/flink/simple-string` topic.
+For example, use `PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1", "sample/flink/simple-string-partition-2")`
+would consume the partitions 1 and 2 of the `sample/flink/simple-string` topic.
 
-#### RegexSubscriptionMode for Topic Pattern
+#### Setting Topic Pattern
 
-Pulsar connector extracts the topic type (`persistent` or `non-persistent`) from the given topic pattern.
-For example, `PulsarSource.builder().setTopicPattern("non-persistent://my-topic*")` would be `non-persistent`.
-The topic type would be `persistent` if you do not provide the topic type in the regular expression.
+Pulsar source extracts the topic type (`persistent` or `non-persistent`) from the given topic pattern.
+For example, you can use the `PulsarSource.builder().setTopicPattern("non-persistent://my-topic*")` to specify a `non-persistent` topic.
+By default, a `persistent` topic is created if you do not specify the topic type in the regular expression.
 
-To consume both `persistent` and `non-persistent` topics based on the topic pattern,
-you can use `setTopicPattern("topic-*", RegexSubscriptionMode.AllTopics)`.
-Pulsar connector would filter the available topics by the `RegexSubscriptionMode`.
+You can use `setTopicPattern("topic-*", RegexSubscriptionMode.AllTopics)` to consume
+both `persistent` and `non-persistent` topics based on the topic pattern.
+Pulsar source would filter the available topics by the `RegexSubscriptionMode`.

Review comment:
       ```suggestion
   You can use `setTopicPattern("topic-*", RegexSubscriptionMode.AllTopics)` to consume
   both `persistent` and `non-persistent` topics based on the topic pattern.
   The Pulsar source would filter the available topics by the `RegexSubscriptionMode`.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -176,12 +182,12 @@ you can use the predefined `PulsarDeserializationSchema`. Pulsar connector provi
   ```
 
 Pulsar `Message<byte[]>` contains some [extra properties](https://pulsar.apache.org/docs/en/concepts-messaging/#messages),
-such as message key, message publish time, message time, application defined key/value pairs that will be attached to the message, etc.
+such as message key, message publish time, message time, and application-defined key/value pairs etc.
 These properties could be acquired by the `Message<byte[]>` interface.
 
 If you want to deserialize the Pulsar message by these properties, you need to implement `PulsarDeserializationSchema`.
-And ensure that the `TypeInformation` from the `PulsarDeserializationSchema.getProducedType()` must be correct.
-Flink would use this `TypeInformation` for passing the messages to downstream operators.
+And ensure that the `TypeInformation` from the `PulsarDeserializationSchema.getProducedType()` is correct.
+Flink uses this `TypeInformation` to pass the messages to downstream operators.

Review comment:
       ```suggestion
   Ensure that the `TypeInformation` from the `PulsarDeserializationSchema.getProducedType()` is correct.
   Flink uses this `TypeInformation` to pass the messages to downstream operators.
   ```
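
To illustrate the point about `getProducedType()`, a minimal custom schema might look like this (a sketch only; verify the interface methods against the connector version in use):

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.util.Collector;
import org.apache.pulsar.client.api.Message;

// Prefixes each payload with its message key, using the extra
// properties available on the Message<byte[]> interface.
public class KeyedStringDeserializationSchema implements PulsarDeserializationSchema<String> {

    @Override
    public void deserialize(Message<byte[]> message, Collector<String> out) {
        out.collect(message.getKey() + ":" + new String(message.getData(), StandardCharsets.UTF_8));
    }

    @Override
    public TypeInformation<String> getProducedType() {
        // Flink uses this TypeInformation to pass records downstream,
        // so it must match the actual output type.
        return Types.STRING;
    }
}
```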

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -227,14 +234,14 @@ Built-in start cursors include:
   StartCursor.latest()
   ```
 - Start from a specified message between the earliest and the latest.
-  Pulsar connector would consume from the latest available message if the message id doesn't exist.
+  Pulsar connector consumes from the latest available message if the message ID does not exist.

Review comment:
       ```suggestion
   The Pulsar connector consumes from the latest available message if the message ID does not exist.
   ```
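
For completeness, the start position described above can be constructed from a known message id and set through the builder's `setStartCursor` method (the ledger/entry values below are purely hypothetical):

```java
// Build a MessageId from its stored coordinates (ledger, entry, partition)
// and resume consumption from it; if the id no longer exists, the
// connector falls back to the latest available message.
MessageId id = DefaultImplementation.newMessageId(42L, 7L, -1);

PulsarSource.builder()
        .setStartCursor(StartCursor.fromMessageId(id));
```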

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -206,16 +212,17 @@ PulsarSource.builder().setSubscriptionName("my-shared")
 PulsarSource.builder().setSubscriptionName("my-exclusive").setSubscriptionType(SubscriptionType.Exclusive)
 ```
 
-If you want to use `Key_Shared` subscription type on the Pulsar connector. Ensure that you provide a `RangeGenerator` implementation.
-The `RangeGenerator` generates a set of key hash ranges so that
-a respective reader subtask will only dispatch messages where the hash of the message key is contained in the specified range.
+If you want to use the `Key_Shared` subscription type on the Pulsar connector.
+Ensure that you provide a `RangeGenerator` implementation. The `RangeGenerator` generates a set of
+key hash ranges so that a respective reader subtask only dispatches messages where the hash of the
+message key is contained in the specified range.
 
-Pulsar connector would use a `UniformRangeGenerator` which would divides the range by the Flink source parallelism
-if no `RangeGenerator` is provided in the `Key_Shared` subscription type.
+Pulsar connector uses a `UniformRangeGenerator` which divides the range by the Flink source
+parallelism if no `RangeGenerator` is provided in the `Key_Shared` subscription type.

Review comment:
       ```suggestion
   The Pulsar connector uses a `UniformRangeGenerator` that divides the range by the Flink source
   parallelism if no `RangeGenerator` is provided in the `Key_Shared` subscription type.
   ```
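
Putting the suggested wording into code, a `Key_Shared` setup could look like this (a sketch; the `setRangeGenerator` method and the custom generator class are assumptions for illustration, not taken from this page):

```java
// With no RangeGenerator, the connector falls back to a
// UniformRangeGenerator that splits the key hash range evenly
// across the Flink source parallelism.
PulsarSource.builder()
        .setSubscriptionName("my-key-shared")
        .setSubscriptionType(SubscriptionType.Key_Shared);

// Alternatively, supply a custom RangeGenerator implementation
// (hypothetical class and builder method shown for illustration).
PulsarSource.builder()
        .setSubscriptionName("my-key-shared")
        .setSubscriptionType(SubscriptionType.Key_Shared)
        .setRangeGenerator(new FixedKeysRangeGenerator());
```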

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -227,14 +234,14 @@ Built-in start cursors include:
   StartCursor.latest()
   ```
 - Start from a specified message between the earliest and the latest.
-  Pulsar connector would consume from the latest available message if the message id doesn't exist.
+  Pulsar connector consumes from the latest available message if the message ID does not exist.
 
   The start message is included in consuming result.
   ```java
   StartCursor.fromMessageId(MessageId)
   ```
 - Start from a specified message between the earliest and the latest.
-  Pulsar connector would consume from the latest available message if the message id doesn't exist.
+  Pulsar connector consumes from the latest available message if the message id doesn't exist.

Review comment:
       ```suggestion
   The Pulsar connector consumes from the latest available message if the message ID doesn't exist.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -176,12 +182,12 @@ you can use the predefined `PulsarDeserializationSchema`. Pulsar connector provi
   ```
 
 Pulsar `Message<byte[]>` contains some [extra properties](https://pulsar.apache.org/docs/en/concepts-messaging/#messages),
-such as message key, message publish time, message time, application defined key/value pairs that will be attached to the message, etc.
+such as message key, message publish time, message time, and application-defined key/value pairs etc.
 These properties could be acquired by the `Message<byte[]>` interface.

Review comment:
       ```suggestion
   These properties could be defined in the `Message<byte[]>` interface.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar `borker.conf` file when using these
 transactionCoordinatorEnabled=true
 ```
 
-Pulsar transaction would be created with 3 hours as the timeout by default. Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+Pulsar transaction is  created within 3 hours as the timeout by default.
+Ensure that that timeout is greater than checkpoint interval + maximum recovery time.
+A shorter checkpoint interval indicates a better  consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option to change the transaction timeout.

Review comment:
       ```suggestion
   The default timeout for Pulsar transactions is 3 hours.
   Make sure that the timeout is greater than the checkpoint interval + maximum recovery time.
   A shorter checkpoint interval indicates a better consuming performance.
   You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option to change the transaction timeout.
   ```
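
The timeout rule in the suggestion translates to configuration such as this (the value is illustrative only):

```java
// Keep the transaction timeout well above
// checkpoint interval + maximum recovery time.
PulsarSource.builder()
        .setConfig(PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS, 3 * 60 * 60 * 1000L);
```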

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -248,37 +255,35 @@ Built-in start cursors include:
 {{< hint info >}}
 Each Pulsar message belongs to an ordered sequence on its topic.
 The sequence ID (`MessageId`) of the message is ordered in that sequence.
-`MessageId` contains some extra information (the ledger, entry, partition) on how the message is stored,
+The `MessageId` contains some extra information (the ledger, entry, partition) about how the message is stored,
 you can create a `MessageId` by using `DefaultImplementation.newMessageId(long ledgerId, long entryId, int partitionIndex)`.
 {{< /hint >}}
 
 ### Boundedness
 
 Pulsar source supports streaming and batch running modes.
-By default, the `PulsarSource` is set to run in the streaming mode.
+By default, the `PulsarSource` runs in the streaming mode.
 
-In streaming mode, Pulsar source never stops until a Flink job fails or is cancelled. However,
-you can set Pulsar source stopping at a stop position by using ```setUnboundedStopCursor(StopCursor)```.
-The Pulsar source will finish when all partitions reach their specified stop positions.
+In the streaming mode, Pulsar source never stops until a Flink job fails or is cancelled.
+However, you can use the `setUnboundedStopCursor(StopCursor)` to set the Pulsar source to stop at a specific stop position.
 
-You can use ```setBoundedStopCursor(StopCursor)``` to specify a stop position so that the Pulsar source can run in the batch mode.
-When all partitions have reached their stop positions, the source will finish.
+You can use `setBoundedStopCursor(StopCursor)` to specify a stop position to run in batch mode.

Review comment:
       ```suggestion
   You can use `setBoundedStopCursor(StopCursor)` to specify a stop position for bounded data. 
   ```
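
For bounded runs, the stop position mentioned above is set on the builder, for example (the timestamp is a placeholder):

```java
// The source finishes once every partition reaches this event time,
// which makes the job run as a batch (bounded) pipeline.
PulsarSource.builder()
        .setBoundedStopCursor(StopCursor.atEventTime(1647270000000L));
```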

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar `borker.conf` file when using these
 transactionCoordinatorEnabled=true
 ```
 
-Pulsar transaction would be created with 3 hours as the timeout by default. Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+Pulsar transaction is  created within 3 hours as the timeout by default.
+Ensure that that timeout is greater than checkpoint interval + maximum recovery time.
+A shorter checkpoint interval indicates a better  consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option to change the transaction timeout.
 
 If checkpointing is disabled or you can not enable the transaction on Pulsar broker, you should set
 `PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we can not promise the consistency in this scenario.
 
 {{< hint info >}}
 All acknowledgements in a transaction are recorded in the Pulsar broker side.
 {{< /hint >}}
 
+## Pulsar Sink
+
+Pulsar Sink supports writing records into one or more Pulsar topics or a specified list of Pulsar partitions.
+
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction) API.
+
+If you still want to use the legacy `SinkFunction` or on Flink 1.14 or previous releases, just use the StreamNative's
+[pulsar-flink](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}
+
+### Usage
+
+Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
+This example writes a String record to a Pulsar topic with at-least-once delivery guarantee.
+
+```java
+DataStream<String> stream = ...
+
+PulsarSink<String> sink = PulsarSink.builder()
+    .setServiceUrl(serviceUrl)
+    .setAdminUrl(adminUrl)
+    .setTopics("topic1")
+    .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
+    .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+    .build();
+        
+stream.sinkTo(sink);
+```
+
+The following properties are **required** for building PulsarSink:
+
+- Pulsar service url, configured by `setServiceUrl(String)`
+- Pulsar service http url (aka. admin url), configured by `setAdminUrl(String)`
+- Topics / partitions to write, see [writing targets](#writing-targets) for more details.
+- Serializer to generate Pulsar messages, see [serializer](#serializer) for more details.
+
+It is **recommended** to set the producer name in Pulsar Sink by `setProducerName(String)`.
+This gives a unique name to the flink connector in the Pulsar statistic dashboard.
+Give you a mechanism to monitor the performance of your flink applications.
+
+### Writing Topics

Review comment:
       ```suggestion
   ### Producing to topics
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -287,65 +292,67 @@ Built-in stop cursors include:
   StopCursor.atEventTime(long)
   ```
 
-### Configurable Options
+### Source Configurable Options
 
 In addition to configuration options described above, you can set arbitrary options for `PulsarClient`,
-`PulsarAdmin`, Pulsar `Consumer` and `PulsarSource` by using `setConfig(ConfigOption<T>, T)` and `setConfig(Configuration)`.
+`PulsarAdmin`, Pulsar `Consumer` and `PulsarSource` by using `setConfig(ConfigOption<T>, T)`,
+`setConfig(Configuration)` and `setConfig(Properties)`.
 
 #### PulsarClient Options
 
-Pulsar connector use the [client API](https://pulsar.apache.org/docs/en/client-libraries-java/)
-to create the `Consumer` instance. Pulsar connector extracts most parts of Pulsar's `ClientConfigurationData`,
+The Pulsar connector uses the [client API](https://pulsar.apache.org/docs/en/client-libraries-java/)
+to create the `Consumer` instance. The Pulsar connector extracts most parts of Pulsar's `ClientConfigurationData`,
 which is required for creating a `PulsarClient`, as Flink configuration options in `PulsarOptions`.
 
 {{< generated/pulsar_client_configuration >}}
 
 #### PulsarAdmin Options
 
 The [admin API](https://pulsar.apache.org/docs/en/admin-api-overview/) is used for querying topic metadata
-and for discovering the desired topics when Pulsar connector uses topic pattern subscription. It would share most part of the
-configuration options with the client API. The configuration options listed here are only used in the admin API.
+and for discovering the desired topics when the Pulsar connector uses topic-pattern subscription.
+It shares most part of the configuration options with the client API.
+The configuration options listed here are only used in the admin API.
 They are also defined in `PulsarOptions`.
 
 {{< generated/pulsar_admin_configuration >}}
 
 #### Pulsar Consumer Options
 
 In general, Pulsar provides the Reader API and Consumer API for consuming messages in different scenarios.
-Flink's Pulsar connector uses the Consumer API. It extracts most parts of Pulsar's `ConsumerConfigurationData` as Flink configuration options in `PulsarSourceOptions`.
+The Pulsar connector uses the Consumer API. It extracts most parts of Pulsar's `ConsumerConfigurationData` as Flink configuration options in `PulsarSourceOptions`.
 
 {{< generated/pulsar_consumer_configuration >}}
 
 #### PulsarSource Options
 
 The configuration options below are mainly used for customizing the performance and message acknowledgement behavior.
-You can just leave them alone if you do not meet any performance issues.
+You can just leave them alone if you do not have any performance issues.

Review comment:
       ```suggestion
   You can ignore them if you do not have any performance issues.
   ```
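
As one concrete example of the `setConfig` variants listed above, using an option that appears later on this page:

```java
// Acknowledge each message immediately after it is consumed.
// Consistency cannot be guaranteed in this mode.
PulsarSource.builder()
        .setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true);
```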

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -248,37 +255,35 @@ Built-in start cursors include:
 {{< hint info >}}
 Each Pulsar message belongs to an ordered sequence on its topic.
 The sequence ID (`MessageId`) of the message is ordered in that sequence.
-`MessageId` contains some extra information (the ledger, entry, partition) on how the message is stored,
+The `MessageId` contains some extra information (the ledger, entry, partition) about how the message is stored,
 you can create a `MessageId` by using `DefaultImplementation.newMessageId(long ledgerId, long entryId, int partitionIndex)`.
 {{< /hint >}}
 
 ### Boundedness
 
 Pulsar source supports streaming and batch running modes.
-By default, the `PulsarSource` is set to run in the streaming mode.
+By default, the `PulsarSource` runs in the streaming mode.
 
-In streaming mode, Pulsar source never stops until a Flink job fails or is cancelled. However,
-you can set Pulsar source stopping at a stop position by using ```setUnboundedStopCursor(StopCursor)```.
-The Pulsar source will finish when all partitions reach their specified stop positions.
+In the streaming mode, Pulsar source never stops until a Flink job fails or is cancelled.
+However, you can use the `setUnboundedStopCursor(StopCursor)` to set the Pulsar source to stop at a specific stop position.

Review comment:
       ```suggestion
   For unbounded data, the Pulsar source never stops until a Flink job is stopped or fails.
   You can use the `setUnboundedStopCursor(StopCursor)` to set the Pulsar source to stop at a specific stop position.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -287,65 +292,67 @@ Built-in stop cursors include:
   StopCursor.atEventTime(long)
   ```
 
-### Configurable Options
+### Source Configurable Options
 
 In addition to configuration options described above, you can set arbitrary options for `PulsarClient`,
-`PulsarAdmin`, Pulsar `Consumer` and `PulsarSource` by using `setConfig(ConfigOption<T>, T)` and `setConfig(Configuration)`.
+`PulsarAdmin`, Pulsar `Consumer` and `PulsarSource` by using `setConfig(ConfigOption<T>, T)`,
+`setConfig(Configuration)` and `setConfig(Properties)`.
 
 #### PulsarClient Options
 
-Pulsar connector use the [client API](https://pulsar.apache.org/docs/en/client-libraries-java/)
-to create the `Consumer` instance. Pulsar connector extracts most parts of Pulsar's `ClientConfigurationData`,
+The Pulsar connector uses the [client API](https://pulsar.apache.org/docs/en/client-libraries-java/)
+to create the `Consumer` instance. The Pulsar connector extracts most parts of Pulsar's `ClientConfigurationData`,
 which is required for creating a `PulsarClient`, as Flink configuration options in `PulsarOptions`.
 
 {{< generated/pulsar_client_configuration >}}
 
 #### PulsarAdmin Options
 
 The [admin API](https://pulsar.apache.org/docs/en/admin-api-overview/) is used for querying topic metadata
-and for discovering the desired topics when Pulsar connector uses topic pattern subscription. It would share most part of the
-configuration options with the client API. The configuration options listed here are only used in the admin API.
+and for discovering the desired topics when the Pulsar connector uses topic-pattern subscription.
+It shares most part of the configuration options with the client API.
+The configuration options listed here are only used in the admin API.
 They are also defined in `PulsarOptions`.
 
 {{< generated/pulsar_admin_configuration >}}
 
 #### Pulsar Consumer Options
 
 In general, Pulsar provides the Reader API and Consumer API for consuming messages in different scenarios.
-Flink's Pulsar connector uses the Consumer API. It extracts most parts of Pulsar's `ConsumerConfigurationData` as Flink configuration options in `PulsarSourceOptions`.
+The Pulsar connector uses the Consumer API. It extracts most parts of Pulsar's `ConsumerConfigurationData` as Flink configuration options in `PulsarSourceOptions`.
 
 {{< generated/pulsar_consumer_configuration >}}
 
 #### PulsarSource Options
 
 The configuration options below are mainly used for customizing the performance and message acknowledgement behavior.
-You can just leave them alone if you do not meet any performance issues.
+You can just leave them alone if you do not have any performance issues.
 
 {{< generated/pulsar_source_configuration >}}
 
 ### Dynamic Partition Discovery
 
 To handle scenarios like topic scaling-out or topic creation without restarting the Flink
-job, Pulsar source can be configured to periodically discover new partitions under provided
-topic-partition subscribing pattern. To enable partition discovery, set a non-negative value for
-the option `PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS`:
+job, Pulsar source periodically discover new partitions under a provided
+topic-partition subscription pattern. To enable partition discovery, you can set a non-negative value for
+the `PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS` option:
 
 ```java
 // discover new partitions per 10 seconds
 PulsarSource.builder()
-        .setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 10000);
+    .setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 10000);
 ```
 
 {{< hint warning >}}
-- Partition discovery is **enabled** by default. Pulsar connector would query the topic metadata every 30 seconds.
-- You need to set the partition discovery interval to a negative value to disable this feature.
-- The partition discovery would be disabled in batch mode even if you set this option with a non-negative value.
+- Partition discovery is **enabled** by default. The Pulsar connector queries the topic metadata every 30 seconds.
+- To disable partition discovery, you need to set a negative partition discovery interval.
+- The partition discovery is disabled in the batch mode even if you set this option with a non-negative value.
 {{< /hint >}}
 
 ### Event Time and Watermarks
 
 By default, the message uses the timestamp embedded in Pulsar `Message<byte[]>` as the event time.
-You can define your own `WatermarkStrategy` to extract the event time from the message,
+You can define a `WatermarkStrategy` to extract the event time from the message,

Review comment:
       The previous sentence is actually better. 
   ```suggestion
   You can define your own `WatermarkStrategy` to extract the event time from the message,
   ```
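
Whichever strategy is chosen, it is passed to Flink when the source is attached to the environment, e.g. (a sketch with an illustrative out-of-orderness bound):

```java
// Attach the source with a custom watermark strategy that tolerates
// up to 5 seconds of out-of-order events.
env.fromSource(
        pulsarSource,
        WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5)),
        "Pulsar Source");
```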

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -206,16 +212,17 @@ PulsarSource.builder().setSubscriptionName("my-shared")
 PulsarSource.builder().setSubscriptionName("my-exclusive").setSubscriptionType(SubscriptionType.Exclusive)
 ```
 
-If you want to use `Key_Shared` subscription type on the Pulsar connector. Ensure that you provide a `RangeGenerator` implementation.
-The `RangeGenerator` generates a set of key hash ranges so that
-a respective reader subtask will only dispatch messages where the hash of the message key is contained in the specified range.
+If you want to use the `Key_Shared` subscription type on the Pulsar connector.
+Ensure that you provide a `RangeGenerator` implementation. The `RangeGenerator` generates a set of
+key hash ranges so that a respective reader subtask only dispatches messages where the hash of the
+message key is contained in the specified range.

Review comment:
       ```suggestion
   Ensure that you provide a `RangeGenerator` implementation if you want to use the `Key_Shared` subscription type on the Pulsar connector.
   The `RangeGenerator` generates a set of key hash ranges so that a respective reader subtask only dispatches messages where the hash of the message key is contained in the specified range.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -287,65 +292,67 @@ Built-in stop cursors include:
   StopCursor.atEventTime(long)
   ```
 
-### Configurable Options
+### Source Configurable Options
 
 In addition to configuration options described above, you can set arbitrary options for `PulsarClient`,
-`PulsarAdmin`, Pulsar `Consumer` and `PulsarSource` by using `setConfig(ConfigOption<T>, T)` and `setConfig(Configuration)`.
+`PulsarAdmin`, Pulsar `Consumer` and `PulsarSource` by using `setConfig(ConfigOption<T>, T)`,
+`setConfig(Configuration)` and `setConfig(Properties)`.
 
 #### PulsarClient Options
 
-Pulsar connector use the [client API](https://pulsar.apache.org/docs/en/client-libraries-java/)
-to create the `Consumer` instance. Pulsar connector extracts most parts of Pulsar's `ClientConfigurationData`,
+The Pulsar connector uses the [client API](https://pulsar.apache.org/docs/en/client-libraries-java/)
+to create the `Consumer` instance. The Pulsar connector extracts most parts of Pulsar's `ClientConfigurationData`,
 which is required for creating a `PulsarClient`, as Flink configuration options in `PulsarOptions`.
 
 {{< generated/pulsar_client_configuration >}}
 
 #### PulsarAdmin Options
 
 The [admin API](https://pulsar.apache.org/docs/en/admin-api-overview/) is used for querying topic metadata
-and for discovering the desired topics when Pulsar connector uses topic pattern subscription. It would share most part of the
-configuration options with the client API. The configuration options listed here are only used in the admin API.
+and for discovering the desired topics when the Pulsar connector uses topic-pattern subscription.
+It shares most part of the configuration options with the client API.
+The configuration options listed here are only used in the admin API.
 They are also defined in `PulsarOptions`.
 
 {{< generated/pulsar_admin_configuration >}}
 
 #### Pulsar Consumer Options
 
 In general, Pulsar provides the Reader API and Consumer API for consuming messages in different scenarios.
-Flink's Pulsar connector uses the Consumer API. It extracts most parts of Pulsar's `ConsumerConfigurationData` as Flink configuration options in `PulsarSourceOptions`.
+The Pulsar connector uses the Consumer API. It extracts most parts of Pulsar's `ConsumerConfigurationData` as Flink configuration options in `PulsarSourceOptions`.
 
 {{< generated/pulsar_consumer_configuration >}}
 
 #### PulsarSource Options
 
 The configuration options below are mainly used for customizing the performance and message acknowledgement behavior.
-You can just leave them alone if you do not meet any performance issues.
+You can just leave them alone if you do not have any performance issues.
 
 {{< generated/pulsar_source_configuration >}}
 
 ### Dynamic Partition Discovery
 
 To handle scenarios like topic scaling-out or topic creation without restarting the Flink
-job, Pulsar source can be configured to periodically discover new partitions under provided
-topic-partition subscribing pattern. To enable partition discovery, set a non-negative value for
-the option `PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS`:
+job, Pulsar source periodically discover new partitions under a provided
+topic-partition subscription pattern. To enable partition discovery, you can set a non-negative value for
+the `PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS` option:

Review comment:
       ```suggestion
   job, the Pulsar source periodically discovers new partitions under a provided
   topic-partition subscription pattern. To enable partition discovery, you can set a non-negative value for
   the `PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS` option:
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -248,37 +255,35 @@ Built-in start cursors include:
 {{< hint info >}}
 Each Pulsar message belongs to an ordered sequence on its topic.
 The sequence ID (`MessageId`) of the message is ordered in that sequence.
-`MessageId` contains some extra information (the ledger, entry, partition) on how the message is stored,
+The `MessageId` contains some extra information (the ledger, entry, partition) about how the message is stored,
 you can create a `MessageId` by using `DefaultImplementation.newMessageId(long ledgerId, long entryId, int partitionIndex)`.
 {{< /hint >}}
 
 ### Boundedness
 
 Pulsar source supports streaming and batch running modes.
-By default, the `PulsarSource` is set to run in the streaming mode.
+By default, the `PulsarSource` runs in the streaming mode.

Review comment:
       ```suggestion
   By default, the `PulsarSource` is configured for unbounded data.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -287,65 +292,67 @@ Built-in stop cursors include:
   StopCursor.atEventTime(long)
   ```
 
-### Configurable Options
+### Source Configurable Options
 
 In addition to configuration options described above, you can set arbitrary options for `PulsarClient`,
-`PulsarAdmin`, Pulsar `Consumer` and `PulsarSource` by using `setConfig(ConfigOption<T>, T)` and `setConfig(Configuration)`.
+`PulsarAdmin`, Pulsar `Consumer` and `PulsarSource` by using `setConfig(ConfigOption<T>, T)`,
+`setConfig(Configuration)` and `setConfig(Properties)`.
 
 #### PulsarClient Options
 
-Pulsar connector use the [client API](https://pulsar.apache.org/docs/en/client-libraries-java/)
-to create the `Consumer` instance. Pulsar connector extracts most parts of Pulsar's `ClientConfigurationData`,
+The Pulsar connector uses the [client API](https://pulsar.apache.org/docs/en/client-libraries-java/)
+to create the `Consumer` instance. The Pulsar connector extracts most parts of Pulsar's `ClientConfigurationData`,
 which is required for creating a `PulsarClient`, as Flink configuration options in `PulsarOptions`.
 
 {{< generated/pulsar_client_configuration >}}
 
 #### PulsarAdmin Options
 
 The [admin API](https://pulsar.apache.org/docs/en/admin-api-overview/) is used for querying topic metadata
-and for discovering the desired topics when Pulsar connector uses topic pattern subscription. It would share most part of the
-configuration options with the client API. The configuration options listed here are only used in the admin API.
+and for discovering the desired topics when the Pulsar connector uses topic-pattern subscription.
+It shares most part of the configuration options with the client API.
+The configuration options listed here are only used in the admin API.
 They are also defined in `PulsarOptions`.
 
 {{< generated/pulsar_admin_configuration >}}
 
 #### Pulsar Consumer Options
 
 In general, Pulsar provides the Reader API and Consumer API for consuming messages in different scenarios.
-Flink's Pulsar connector uses the Consumer API. It extracts most parts of Pulsar's `ConsumerConfigurationData` as Flink configuration options in `PulsarSourceOptions`.
+The Pulsar connector uses the Consumer API. It extracts most parts of Pulsar's `ConsumerConfigurationData` as Flink configuration options in `PulsarSourceOptions`.
 
 {{< generated/pulsar_consumer_configuration >}}
 
 #### PulsarSource Options
 
 The configuration options below are mainly used for customizing the performance and message acknowledgement behavior.
-You can just leave them alone if you do not meet any performance issues.
+You can just leave them alone if you do not have any performance issues.
 
 {{< generated/pulsar_source_configuration >}}
 
 ### Dynamic Partition Discovery
 
 To handle scenarios like topic scaling-out or topic creation without restarting the Flink
-job, Pulsar source can be configured to periodically discover new partitions under provided
-topic-partition subscribing pattern. To enable partition discovery, set a non-negative value for
-the option `PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS`:
+job, Pulsar source periodically discover new partitions under a provided
+topic-partition subscription pattern. To enable partition discovery, you can set a non-negative value for
+the `PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS` option:
 
 ```java
 // discover new partitions per 10 seconds
 PulsarSource.builder()
-        .setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 10000);
+    .setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 10000);
 ```
 
 {{< hint warning >}}
-- Partition discovery is **enabled** by default. Pulsar connector would query the topic metadata every 30 seconds.
-- You need to set the partition discovery interval to a negative value to disable this feature.
-- The partition discovery would be disabled in batch mode even if you set this option with a non-negative value.
+- Partition discovery is **enabled** by default. The Pulsar connector queries the topic metadata every 30 seconds.
+- To disable partition discovery, you need to set a negative partition discovery interval.
+- The partition discovery is disabled in the batch mode even if you set this option with a non-negative value.

Review comment:
       ```suggestion
   - Partition discovery is disabled for bounded data even if you set this option to a non-negative value.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar `borker.conf` file when using these
 transactionCoordinatorEnabled=true
 ```
 
-Pulsar transaction would be created with 3 hours as the timeout by default. Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+Pulsar transaction is  created within 3 hours as the timeout by default.
+Ensure that that timeout is greater than checkpoint interval + maximum recovery time.
+A shorter checkpoint interval indicates a better  consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option to change the transaction timeout.
 
 If checkpointing is disabled or you can not enable the transaction on Pulsar broker, you should set
 `PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we can not promise the consistency in this scenario.
 
 {{< hint info >}}
 All acknowledgements in a transaction are recorded in the Pulsar broker side.
 {{< /hint >}}
 
+## Pulsar Sink
+
+Pulsar Sink supports writing records into one or more Pulsar topics or a specified list of Pulsar partitions.
+
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction) API.
+
+If you want to use the legacy `SinkFunction`, or if you are on Flink 1.14 or earlier releases, use StreamNative's
+[pulsar-flink](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}
+
+### Usage
+
+The Pulsar sink uses a builder class to construct the `PulsarSink` instance.
+The example below writes a String record to a Pulsar topic with the at-least-once delivery guarantee.
+
+```java
+DataStream<String> stream = ...
+
+PulsarSink<String> sink = PulsarSink.builder()
+    .setServiceUrl(serviceUrl)
+    .setAdminUrl(adminUrl)
+    .setTopics("topic1")
+    .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
+    .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+    .build();
+        
+stream.sinkTo(sink);
+```
+
+The following properties are **required** for building a PulsarSink:
+
+- Pulsar service URL, configured by `setServiceUrl(String)`
+- Pulsar service HTTP URL (also known as the admin URL), configured by `setAdminUrl(String)`
+- Topics / partitions to write to; see [writing targets](#writing-targets) for more details.
+- Serializer to generate Pulsar messages; see [serializer](#serializer) for more details.
+
+It is **recommended** to set the producer name in the Pulsar sink by `setProducerName(String)`.
+This gives the Flink connector a unique name in the Pulsar statistics dashboard and
+provides a way to monitor the performance of your Flink applications.
+
+### Writing Topics
+
+Setting the topics to write is a bit like the [topic-partition subscription](#topic-partition-subscription)
+in Pulsar source. We support a mixin style of topic setting. Therefore, you can provide a list of topics,
+partitions, or both of them.

Review comment:
       ```suggestion
   Defining the topics for producing is similar to the [topic-partition subscription](#topic-partition-subscription)
   in the Pulsar source. We support a mix-in style of topic setting. You can provide a list of topics,
   partitions, or both of them.
   ```
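
The producer-name recommendation in the hunk above could look like the following sketch. The builder calls repeat the usage example from the hunk; the name `my-flink-app-producer` is a hypothetical placeholder:

```java
// Sketch: set an explicit producer name so the connector shows up with a
// recognizable name in the Pulsar statistics dashboard.
PulsarSink<String> sink = PulsarSink.builder()
    .setServiceUrl(serviceUrl)
    .setAdminUrl(adminUrl)
    .setTopics("topic1")
    .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
    .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .setProducerName("my-flink-app-producer") // hypothetical name
    .build();
```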

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar `borker.conf` file when using these
+### Writing Topics
+
+Setting the topics to write is a bit like the [topic-partition subscription](#topic-partition-subscription)
+in Pulsar source. We support a mixin style of topic setting. Therefore, you can provide a list of topics,
+partitions, or both of them.
+
+```java
+// Topic "some-topic1" and "some-topic2"
+PulsarSink.builder().setTopics("some-topic1", "some-topic2")
+
+// Partition 0 and 2 of topic "topic-a"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
+
+// Partition 0 and 2 of topic "topic-a" and topic "some-topic2"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2", "some-topic2")
+```
+
+The topics you provide support auto partition discovery. The topic metadata is queried from Pulsar at a fixed interval.
+You can use the `PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL` option to change the discovery interval.
+
+Configuring writing targets can be replaced by using a custom [`TopicRouter`] in
+[message routing](#message-routing). See [flexible topic naming](#flexible-topic-naming)
+to understand how to configure partitions on the Pulsar connector.
+
+{{< hint warning >}}
+If you build  the Pulsar sink based on both the topic and its corresponding partitions, Pulsar sink merges them and only use the topic.

Review comment:
       ```suggestion
   If you build the Pulsar sink based on both the topic and its corresponding partitions, Pulsar sink merges them and only uses the topic.
   ```
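
The metadata refresh option mentioned in the hunk above can be sketched as follows. The option name is quoted from the text; the value (10 minutes, in milliseconds) and the assumed `long` type are illustrative:

```java
// Sketch: change how often the sink refreshes topic metadata for
// partition discovery.
PulsarSink.builder()
    .setConfig(PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL, 600000L); // 10 minutes
```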

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar `borker.conf` file when using these
 If checkpointing is disabled or you can not enable the transaction on Pulsar broker, you should set
 `PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we can not promise the consistency in this scenario.

Review comment:
       ```suggestion
   No consistency guarantees can be made in this scenario.
   ```
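
The transaction-related options discussed above can be sketched as follows (option names are quoted from the text; the timeout value and bare builder usage are illustrative):

```java
// Sketch: keep the transaction timeout greater than
// checkpoint interval + maximum recovery time.
PulsarSource.builder()
    .setConfig(PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS, 3 * 60 * 60 * 1000L); // default: 3 hours

// If checkpointing is disabled, or transactions cannot be enabled on the
// broker, acknowledge messages immediately (no consistency guarantee):
PulsarSource.builder()
    .setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true);
```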

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar `borker.conf` file when using these
+{{< hint warning >}}
+If you build  the Pulsar sink based on both the topic and its corresponding partitions, Pulsar sink merges them and only use the topic.
+
+For example, if you use `PulsarSink.builder().setTopics("some-topic1", "some-topic1-partition-0")` to build the Pulsar sink,
+it is simplified to `PulsarSink.builder().setTopics("some-topic1")`.
+{{< /hint >}}
+
+### Serializer
+
+A serializer (`PulsarSerializationSchema`) is required to serializing the record instance into bytes.
+Similar to `PulsarSource`, Pulsar sink supports both Flink's `SerializationSchema` and
+Pulsar's `Schema`. But Pulsar's `Schema.AUTO_PRODUCE_BYTES()` is not supported in Pulsar Sink.

Review comment:
       ```suggestion
   A serializer (`PulsarSerializationSchema`) is required for serializing the record instance into bytes.
   Similar to `PulsarSource`, Pulsar sink supports both Flink's `SerializationSchema` and
   Pulsar's `Schema`. Pulsar's `Schema.AUTO_PRODUCE_BYTES()` is not supported in the Pulsar sink.
   ```
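
The two construction paths named in the hunk above can be sketched as follows. `Schema.STRING` and `SimpleStringSchema` stand in for whatever schema your records need:

```java
// Sketch: build a PulsarSerializationSchema either from a Pulsar Schema
// (primitive type shown here) or from a Flink SerializationSchema.
PulsarSerializationSchema<String> fromPulsarSchema =
    PulsarSerializationSchema.pulsarSchema(Schema.STRING);

PulsarSerializationSchema<String> fromFlinkSchema =
    PulsarSerializationSchema.flinkSchema(new SimpleStringSchema());
```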

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar `borker.conf` file when using these
+
+If you do not need the message key and other message properties in Pulsar's
+[Message](https://pulsar.apache.org/api/client/2.9.0-SNAPSHOT/org/apache/pulsar/client/api/Message.html) interface,
+you can use the predefined `PulsarSerializationSchema`. Pulsar sink provides two implementation methods.
+
+- Encode the message by using Pulsar's [Schema](https://pulsar.apache.org/docs/en/schema-understand/).
+  ```java
+  // Primitive types
+  PulsarSerializationSchema.pulsarSchema(Schema)
+
+  // Struct types (JSON, Protobuf, Avro, etc.)
+  PulsarSerializationSchema.pulsarSchema(Schema, Class)
+
+  // KeyValue type
+  PulsarSerializationSchema.pulsarSchema(Schema, Class, Class)
+  ```
+- Encode the message by using Flink's `SerializationSchema`
+  ```java
+  PulsarSerializationSchema.flinkSchema(SerializationSchema)
+  ```
+
+You can enable [schema evolution](https://pulsar.apache.org/docs/en/schema-evolution-compatibility/#schema-evolution)
+by using `PulsarSerializationSchema.pulsarSchema()` together with
+`PulsarSinkBuilder.enableSchemaEvolution()`. This means that any broker-side schema validation is in place.
+For example:
+
+```java
+Schema<SomePojo> schema = Schema.AVRO(SomePojo.class);
+PulsarSerializationSchema<SomePojo> pulsarSchema = PulsarSerializationSchema.pulsarSchema(schema, SomePojo.class);
+
+PulsarSink<String> sink = PulsarSink.builder()
+    ...
+    .setSerializationSchema(pulsarSchema)
+    .enableSchemaEvolution()
+    .build();
+```
+
+{{< hint warning >}}
+If you use Pulsar schema without enabling schema evolution, the target topic will have a `Schema.BYTES` schema.
+And consumers need to handle the deserialization (if needed) themselves.

Review comment:
       ```suggestion
   Consumers will need to handle the deserialization (if needed) themselves.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar `borker.conf` file when using these
+If you do not need the message key and other message properties in Pulsar's
+[Message](https://pulsar.apache.org/api/client/2.9.0-SNAPSHOT/org/apache/pulsar/client/api/Message.html) interface,

Review comment:
       Should this really point to a snapshot version? 

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar `borker.conf` file when using these
+If you do not need the message key and other message properties in Pulsar's
+[Message](https://pulsar.apache.org/api/client/2.9.0-SNAPSHOT/org/apache/pulsar/client/api/Message.html) interface,
+you can use the predefined `PulsarSerializationSchema`. Pulsar sink provides two implementation methods.

Review comment:
       ```suggestion
   you can use the predefined `PulsarSerializationSchema`. The Pulsar sink provides two implementation methods.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar `borker.conf` file when using these
 transactionCoordinatorEnabled=true
 ```
 
-Pulsar transaction would be created with 3 hours as the timeout by default. Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+Pulsar transaction is  created within 3 hours as the timeout by default.
+Ensure that that timeout is greater than checkpoint interval + maximum recovery time.
+A shorter checkpoint interval indicates a better  consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option to change the transaction timeout.
 
 If checkpointing is disabled or you can not enable the transaction on Pulsar broker, you should set
 `PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we can not promise the consistency in this scenario.
 
 {{< hint info >}}
 All acknowledgements in a transaction are recorded in the Pulsar broker side.
 {{< /hint >}}
 
+## Pulsar Sink
+
+Pulsar Sink supports writing records into one or more Pulsar topics or a specified list of Pulsar partitions.
+
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction) API.
+
+If you still want to use the legacy `SinkFunction` or on Flink 1.14 or previous releases, just use the StreamNative's
+[pulsar-flink](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}
+
+### Usage
+
+Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
+This example writes a String record to a Pulsar topic with at-least-once delivery guarantee.
+
+```java
+DataStream<String> stream = ...
+
+PulsarSink<String> sink = PulsarSink.builder()
+    .setServiceUrl(serviceUrl)
+    .setAdminUrl(adminUrl)
+    .setTopics("topic1")
+    .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
+    .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+    .build();
+        
+stream.sinkTo(sink);
+```
+
+The following properties are **required** for building PulsarSink:
+
+- Pulsar service url, configured by `setServiceUrl(String)`
+- Pulsar service http url (aka. admin url), configured by `setAdminUrl(String)`
+- Topics / partitions to write, see [writing targets](#writing-targets) for more details.
+- Serializer to generate Pulsar messages, see [serializer](#serializer) for more details.
+
+It is **recommended** to set the producer name in Pulsar Sink by `setProducerName(String)`.
+This gives a unique name to the flink connector in the Pulsar statistic dashboard.
+Give you a mechanism to monitor the performance of your flink applications.
+
+### Writing Topics
+
+Setting the topics to write is a bit like the [topic-partition subscription](#topic-partition-subscription)
+in Pulsar source. We support a mixin style of topic setting. Therefore, you can provide a list of topics,
+partitions, or both of them.
+
+```java
+// Topic "some-topic1" and "some-topic2"
+PulsarSink.builder().setTopics("some-topic1", "some-topic2")
+
+// Partition 0 and 2 of topic "topic-a"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
+
+// Partition 0 and 2 of topic "topic-a" and topic "some-topic2"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2", "some-topic2")
+```
+
+The topics you provide support auto partition discovery. We query the topic metadata from the Pulsar in a fixed interval.
+You can use the `PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL` option to change the discovery interval option.
+
+Configure writing targets can be replaced by using a custom [`TopicRouter`]
+[message routing](#message-routing). And read [flexible topic naming](#flexible-topic-naming)
+for understanding how to configure partitions on the Pulsar connector.
+
+{{< hint warning >}}
+If you build  the Pulsar sink based on both the topic and its corresponding partitions, Pulsar sink merges them and only use the topic.
+
+For example, it uses the `PulsarSink.builder().setTopics("some-topic1", "some-topic1-partition-0")` option to build the Pulsar sink,
+it is simplified to `PulsarSink.builder().setTopics("some-topic1")`.

Review comment:
       ```suggestion
   For example, when using the `PulsarSink.builder().setTopics("some-topic1", "some-topic1-partition-0")` option to build the Pulsar sink,
   this is simplified to `PulsarSink.builder().setTopics("some-topic1")`.
   ```
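For intuition, the merge rule this hint describes can be sketched in plain Java: when the configured list contains both a topic and one of its partitions, only the topic survives. The `-partition-` suffix handling and the logic below are illustrative assumptions, not the connector's actual implementation.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class TopicMergeSketch {
    // Pulsar partition topics follow the "<topic>-partition-<n>" naming shown
    // in the examples above; this marker is an assumption for the sketch.
    private static final String PARTITION_MARKER = "-partition-";

    public static List<String> mergeTopics(List<String> configured) {
        // Collect the plain (non-partition) topic names first.
        Set<String> plainTopics = new HashSet<>();
        for (String name : configured) {
            if (!name.contains(PARTITION_MARKER)) {
                plainTopics.add(name);
            }
        }
        // Drop any partition whose parent topic is already configured.
        List<String> merged = new ArrayList<>();
        for (String name : configured) {
            int marker = name.indexOf(PARTITION_MARKER);
            boolean coveredByTopic =
                    marker >= 0 && plainTopics.contains(name.substring(0, marker));
            if (!coveredByTopic && !merged.contains(name)) {
                merged.add(name);
            }
        }
        return merged;
    }
}
```

With this sketch, `("some-topic1", "some-topic1-partition-0")` collapses to just `("some-topic1")`, matching the simplification described in the hint.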

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar `borker.conf` file when using these
 transactionCoordinatorEnabled=true
 ```
 

+### Serializer
+
+A serializer (`PulsarSerializationSchema`) is required to serializing the record instance into bytes.
+Similar to `PulsarSource`, Pulsar sink supports both Flink's `SerializationSchema` and
+Pulsar's `Schema`. But Pulsar's `Schema.AUTO_PRODUCE_BYTES()` is not supported in Pulsar Sink.
+
+If you do not need the message key and other message properties in Pulsar's
+[Message](https://pulsar.apache.org/api/client/2.9.0-SNAPSHOT/org/apache/pulsar/client/api/Message.html) interface,
+you can use the predefined `PulsarSerializationSchema`. Pulsar sink provides two implementation methods.
+
+- Encode the message by using Pulsar's [Schema](https://pulsar.apache.org/docs/en/schema-understand/).
+  ```java
+  // Primitive types
+  PulsarSerializationSchema.pulsarSchema(Schema)
+
+  // Struct types (JSON, Protobuf, Avro, etc.)
+  PulsarSerializationSchema.pulsarSchema(Schema, Class)
+
+  // KeyValue type
+  PulsarSerializationSchema.pulsarSchema(Schema, Class, Class)
+  ```
+- Encode the message by using Flink's `SerializationSchema`
+  ```java
+  PulsarSerializationSchema.flinkSchema(SerializationSchema)
+  ```
+
+[Schema evolution](https://pulsar.apache.org/docs/en/schema-evolution-compatibility/#schema-evolution)
+can be enabled for users by using `PulsarSerializationSchema.pulsarSchema()` and
+`PulsarSinkBuilder.enableSchemaEvolution()`, meaning that any broker schema validation will be in place.
+Here is a code sample below.

Review comment:
       ```suggestion
   [Schema evolution](https://pulsar.apache.org/docs/en/schema-evolution-compatibility/#schema-evolution)
   can be enabled by using `PulsarSerializationSchema.pulsarSchema()` and
   `PulsarSinkBuilder.enableSchemaEvolution()`. This means that any broker schema validation is in place.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar `borker.conf` file when using these
 transactionCoordinatorEnabled=true
 ```
 
+The topics you provide support auto partition discovery. We query the topic metadata from the Pulsar in a fixed interval.
+You can use the `PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL` option to change the discovery interval option.
+
+Configure writing targets can be replaced by using a custom [`TopicRouter`]
+[message routing](#message-routing). And read [flexible topic naming](#flexible-topic-naming)
+for understanding how to configure partitions on the Pulsar connector.

Review comment:
       ```suggestion
   Configuring writing targets can be replaced by using a custom [`TopicRouter`] in
   [message routing](#message-routing). Configuring partitions on the Pulsar connector is explained in the [flexible topic naming](#flexible-topic-naming) section.
   ```
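To make "message routing" concrete: a custom router ultimately reduces to a function from a message (or its key) to a target topic. The key-hash sketch below only illustrates that idea in plain Java; it is not the connector's actual `TopicRouter` interface, and the method names are assumptions.

```java
import java.util.List;

public class RoutingSketch {
    // Illustrative key-hash routing: deterministically pick a target topic
    // from the message key, so records with the same key land on the same topic.
    public static String route(String messageKey, List<String> topics) {
        // floorMod keeps the index non-negative even for negative hash codes.
        int index = Math.floorMod(messageKey.hashCode(), topics.size());
        return topics.get(index);
    }
}
```

A real router implementation would be plugged into the sink builder instead of a fixed topic list.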

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -24,14 +24,13 @@ under the License.
 
 # Apache Pulsar Connector
 
-Flink provides an [Apache Pulsar](https://pulsar.apache.org) source connector for reading data from Pulsar topics with exactly-once guarantees.
+Flink provides an [Apache Pulsar](https://pulsar.apache.org) connector for reading data from Pulsar topics with exactly-once guarantees.

Review comment:
       ```suggestion
   Flink provides an [Apache Pulsar](https://pulsar.apache.org) connector for reading and writing data from and to Pulsar topics with exactly-once guarantees.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar `borker.conf` file when using these
 transactionCoordinatorEnabled=true
 ```
 
+### Usage
+
+Pulsar Sink uses a builder class to construct the `PulsarSink` instance.

Review comment:
       ```suggestion
   The Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar `borker.conf` file when using these
 transactionCoordinatorEnabled=true
 ```
 
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction) API.
+
+If you still want to use the legacy `SinkFunction` or on Flink 1.14 or previous releases, just use the StreamNative's
+[pulsar-flink](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}

Review comment:
       Like for the Source, just remove the parts that refer to older versions

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar `borker.conf` file when using these
 transactionCoordinatorEnabled=true
 ```
 
+It is **recommended** to set the producer name in Pulsar Sink by `setProducerName(String)`.
+This gives a unique name to the flink connector in the Pulsar statistic dashboard.
+Give you a mechanism to monitor the performance of your flink applications.

Review comment:
       ```suggestion
   It is recommended to set the producer name in the Pulsar Sink by `setProducerName(String)`.
   This sets a unique name for the Flink connector in the Pulsar statistic dashboard.
   You can use it to monitor the performance of your Flink connector and applications.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -248,37 +255,35 @@ Built-in start cursors include:
 {{< hint info >}}
 Each Pulsar message belongs to an ordered sequence on its topic.
 The sequence ID (`MessageId`) of the message is ordered in that sequence.
-`MessageId` contains some extra information (the ledger, entry, partition) on how the message is stored,
+The `MessageId` contains some extra information (the ledger, entry, partition) about how the message is stored,
 you can create a `MessageId` by using `DefaultImplementation.newMessageId(long ledgerId, long entryId, int partitionIndex)`.
 {{< /hint >}}
 
 ### Boundedness
 
 Pulsar source supports streaming and batch running modes.

Review comment:
       ```suggestion
   The Pulsar source supports streaming and batch execution mode.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar `borker.conf` file when using these
 transactionCoordinatorEnabled=true
 ```
 
+## Pulsar Sink
+
+Pulsar Sink supports writing records into one or more Pulsar topics or a specified list of Pulsar partitions.

Review comment:
       ```suggestion
   The Pulsar Sink supports writing records into one or more Pulsar topics or a specified list of Pulsar partitions.
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org