Posted to commits@flink.apache.org by ti...@apache.org on 2022/09/30 04:29:22 UTC

[flink] 05/06: [FLINK-29381][Connector/Pulsar] Add a document on how to use Key_Shared subscription.

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0dbcca0b87117b73491b4cf6a8a54da094621f55
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Wed Sep 28 15:27:53 2022 +0800

    [FLINK-29381][Connector/Pulsar] Add a document on how to use Key_Shared subscription.
---
 .../docs/connectors/datastream/pulsar.md           | 16 +++++++--
 docs/content/docs/connectors/datastream/pulsar.md  | 39 ++++++++++++++++++++--
 2 files changed, 50 insertions(+), 5 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/pulsar.md b/docs/content.zh/docs/connectors/datastream/pulsar.md
index 8dab844e14b..0778eb8b9eb 100644
--- a/docs/content.zh/docs/connectors/datastream/pulsar.md
+++ b/docs/content.zh/docs/connectors/datastream/pulsar.md
@@ -273,9 +273,21 @@ PulsarSource.builder().set_subscription_name("my-exclusive").set_subscription_ty
 {{< /tab >}}
 {{< /tabs >}}
 
-如果想在 Pulsar Source 里面使用 `key 共享` 订阅,需要提供 `RangeGenerator` 实例。`RangeGenerator` 会生成一组消息 key 的 hash 范围,Pulsar Source 会基于给定的范围来消费数据。
+#### Key_Shared 订阅
 
-Pulsar Source 也提供了一个名为 `UniformRangeGenerator` 的默认实现,它会基于 flink 数据源的并行度将 hash 范围均分。
+当使用 Key_Shared 订阅时,Pulsar 将会基于 Message 的 key 去计算对应的 Hash 值,Hash 取值范围为(0~65535)。我们首先会使用 `Message.getOrderingKey()` 计算 Hash,如果没有则会依次使用 `Message.getKey()` 和 `Message.getKeyBytes()`。对于上述 key 都找不到的消息,我们会使用字符串 `"NO_KEY"` 来计算消息的 Hash 值。
+
+在 Flink Connector 中针对 Key_Shared 订阅提供了两种消费模式,分别是 `KeySharedMode.SPLIT` 和 `KeySharedMode.JOIN`,它们的实际消费行为并不相同。`KeySharedMode.JOIN` 会把所有的给定的 Hash 范围放于一个 Reader 中进行消费,而 `KeySharedMode.SPLIT` 会打散给定的 Hash 范围于不同的 Reader 中消费。
+
+之所以这么设计的主要原因是因为,在 Key_Shared 的订阅模式中,如果一条消息找不到对应的消费者,所有的消息都不会继续往下发送。所以我们提供了 `KeySharedMode.JOIN` 模式,允许用户只消费部分 Hash 范围的消息。
+
+##### 定义 RangeGenerator
+
+如果想在 Pulsar Source 里面使用 `Key_Shared` 订阅,需要提供 `RangeGenerator` 实例。`RangeGenerator` 会生成一组消息 key 的 hash 范围,Pulsar Source 会基于给定的范围来消费数据。
+
+Pulsar Source 也提供了一个名为 `SplitRangeGenerator` 的默认实现,它会基于 flink 数据源的并行度将 hash 范围均分。
+
+由于 Pulsar 并未提供 Key 的 Hash 计算方法,所以我们在 Flink 中提供了名为 `FixedKeysRangeGenerator` 的实现,你可以在 builder 中依次提供需要消费的 Key 内容即可。但需要注意的是,Pulsar 的 Key Hash 值并不对应唯一的一个 Key,所以如果你只想消费某几个 Key 的消息,还需要在后面的代码中使用 `DataStream.filter()` 方法来过滤出对应的消息。
 
 ### 起始消费位置
 
diff --git a/docs/content/docs/connectors/datastream/pulsar.md b/docs/content/docs/connectors/datastream/pulsar.md
index b3a8ec6300b..34a3bbcd762 100644
--- a/docs/content/docs/connectors/datastream/pulsar.md
+++ b/docs/content/docs/connectors/datastream/pulsar.md
@@ -301,12 +301,45 @@ PulsarSource.builder().set_subscription_name("my-exclusive").set_subscription_ty
 {{< /tab >}}
 {{< /tabs >}}
 
-Ensure that you provide a `RangeGenerator` implementation if you want to use the `Key_Shared` subscription type on the Pulsar connector.
-The `RangeGenerator` generates a set of key hash ranges so that a respective reader subtask only dispatches messages where the hash of the message key is contained in the specified range.
+#### Key_Shared subscriptions
 
-The Pulsar connector uses `UniformRangeGenerator` that divides the range by the Flink source
+With a Key_Shared subscription, Pulsar computes a key hash for every message.
+The hash falls in the range 0 to 65535. The key hash is computed from the first available of `Message.getOrderingKey()`,
+`Message.getKey()`, and `Message.getKeyBytes()`. If a message has none of these keys, the string `"NO_KEY"` is used as the message key.
+
+The connector supports two modes for Pulsar's Key_Shared subscription: `KeySharedMode.SPLIT` and `KeySharedMode.JOIN`.
+Each `KeySharedMode` assigns splits differently. If you consume only a subset of Pulsar's key hash range,
+use `KeySharedMode.JOIN`, which subscribes to all the given ranges in a single reader.
+Otherwise, when the ranges together cover the full Pulsar key hash range (0~65535), use `KeySharedMode.SPLIT`,
+which shares the splits among all the backend readers.
+
+In `KeySharedMode.SPLIT` mode, the topic is subscribed to by multiple readers.
+Pulsar has one limitation in this situation: if a message's key hash does not fall in the range of any reader,
+no messages are delivered to the current readers until a reader that can subscribe to such messages appears.
+
+##### Define a RangeGenerator
+
+Ensure that you provide a `RangeGenerator` implementation if you want to use the `Key_Shared` subscription type on the Pulsar connector.
+The `RangeGenerator` generates a set of key hash ranges so that each reader subtask only dispatches
+messages where the hash of the message key is contained in the specified range.
+
+The Pulsar connector uses `SplitRangeGenerator` that divides the range by the Flink source
 parallelism if no `RangeGenerator` is provided in the `Key_Shared` subscription type.
 
+Because Pulsar does not expose its key hash method, the connector provides a `FixedKeysRangeGenerator` for end users.
+You add the keys you want to consume, without having to calculate any hash ranges yourself.
+A key hash does not map to a single key, so the consumed results may contain messages whose keys
+differ from the ones you defined in this range generator.
+Remember to apply Flink's `DataStream.filter()` method after the Pulsar source to keep only the messages you want.
+
+```java
+FixedKeysRangeGenerator.builder()
+    .supportNullKey()
+    .key("someKey")
+    .keys(Arrays.asList("key1", "key2"))
+    .build()
+```
+
 ### Starting Position
 
 The Pulsar source is able to consume messages starting from different positions by setting the `setStartCursor(StartCursor)` option.
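
As an illustration of the range splitting described in the Key_Shared section of this patch, the following minimal sketch divides the key hash range 0~65535 evenly by a given parallelism. It is not the connector's `SplitRangeGenerator` implementation; the class name `EvenHashRangeSplit` and the method `split` are hypothetical, and the sketch assumes that an uneven remainder is handed to the first ranges.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; not the connector's SplitRangeGenerator. */
public class EvenHashRangeSplit {

    /** The Key_Shared hash space 0..65535 contains 65536 slots. */
    public static final int RANGE_SIZE = 65536;

    /** Returns one inclusive [start, end] pair per reader, covering 0..65535 without gaps. */
    public static List<int[]> split(int parallelism) {
        if (parallelism < 1 || parallelism > RANGE_SIZE) {
            throw new IllegalArgumentException("parallelism must be in [1, 65536]");
        }
        List<int[]> ranges = new ArrayList<>();
        int base = RANGE_SIZE / parallelism;
        int remainder = RANGE_SIZE % parallelism;
        int start = 0;
        for (int i = 0; i < parallelism; i++) {
            // Assumption: the first `remainder` ranges each take one extra slot.
            int size = base + (i < remainder ? 1 : 0);
            ranges.add(new int[] {start, start + size - 1});
            start += size;
        }
        return ranges;
    }

    public static void main(String[] args) {
        for (int[] range : split(4)) {
            System.out.println(range[0] + "-" + range[1]);
        }
    }
}
```

Because the ranges produced this way cover the whole hash space, every message finds a reader, which is the situation in which the patch recommends `KeySharedMode.SPLIT`. A set of ranges that leaves gaps corresponds to the subset case, for which the patch recommends `KeySharedMode.JOIN`.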