Posted to dev@pulsar.apache.org by houxiaoyu <an...@gmail.com> on 2023/03/06 02:01:03 UTC

Re: [DISCUSS] PIP-247: Notifications for partitions update

Bump. Are there any other concerns or suggestions about this PIP? :)  Ping
@Michael @Joe @Enrico

Thanks
Xiaoyu Hou

On Mon, Feb 27, 2023 at 14:10, houxiaoyu <an...@gmail.com> wrote:

> Hi Joe and Michael,
>
> I think I misunderstood your earlier reply. Now I understand, and I will
> explain it again.
>
> Besides the reasons Asaf mentioned above, there are also some limits to
> using the topic list watcher.  For example, the `topicsPattern.pattern`
> must be shorter than `maxSubscriptionPatternLength` [0]. If a consumer
> subscribes to multiple partitioned topics, the `topicsPattern.pattern` may
> become very long.
>
> So I think that it's better to have a separate notification implementation
> for partition update.
>
> [0]
> https://github.com/apache/pulsar/blob/5d6932137d76d544f939bef27df25f61b4a4d00d/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java#L115-L126
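To make the length concern concrete, here is a hedged sketch in plain Java (the topic names and the `buildPattern` helper are hypothetical, not Pulsar client code) showing how a `topicsPattern` that enumerates many partitioned topics grows linearly with the topic count:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class PatternLengthDemo {
    // Build a regex alternation that matches the partitions of every
    // listed topic, e.g. "(.../a-partition-[0-9]+|.../b-partition-[0-9]+)".
    static String buildPattern(List<String> topics) {
        return topics.stream()
                .map(t -> t + "-partition-[0-9]+")
                .collect(Collectors.joining("|", "(", ")"));
    }

    public static void main(String[] args) {
        List<String> topics = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            topics.add("persistent://tenant/ns/topic-" + i);
        }
        // Each topic contributes its full name plus the partition suffix,
        // so the pattern can easily exceed a broker-side length limit.
        System.out.println(buildPattern(topics).length());
    }
}
```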
>
> Thanks,
> Xiaoyu Hou
>
> On Mon, Feb 27, 2023 at 10:56, houxiaoyu <an...@gmail.com> wrote:
>
>> Hi Michael,
>>
>> >  I think we just need the client to "subscribe" to a topic notification
>> for
>> >  "<topic-name>-partition-[0-9]+" to eliminate the polling
>>
>> If Pulsar users want to pub/sub a partitioned topic, I think most of them
>> would like to create a simple producer or consumer like the following:
>> ```
>> Producer<byte[]> producer = client.newProducer().topic(topic).create();
>> producer.sendAsync(msg);
>> ```
>> ```
>> client.newConsumer()
>>         .topic(topic)
>>         .subscriptionName(subscription)
>>         .subscribe();
>> ```
>> I think there is no reason for users to use `topicsPattern` if a Pulsar
>> user just wants to subscribe to a partitioned topic. In addition,
>> `topicsPattern` cannot be used for producers.
>>
>> So I think PIP-145 [0] will benefit regex subscriptions, and this PIP [1]
>> will benefit the common partitioned-topic pub/sub scenario.
>>
>> [0] https://github.com/apache/pulsar/issues/14505
>> [1] https://github.com/apache/pulsar/issues/19596
>>
>> Thanks
>> Xiaoyu Hou
>>
>> On Sat, Feb 25, 2023 at 01:29, Michael Marshall <mm...@apache.org> wrote:
>>
>>> > The point is just that the way to implement the partitioned-topic
>>> > metadata notification mechanism is much like the notifications on
>>> > regex sub changes
>>>
>>> Why do we need a separate notification implementation? The regex
>>> subscription feature is about discovering topics (not subscriptions)
>>> that match a regular expression. As Joe mentioned, I think we just
>>> need the client to "subscribe" to a topic notification for
>>> "<topic-name>-partition-[0-9]+" to eliminate the polling.
>>>
>>> Building on PIP 145, the work for this PIP would be in implementing a
>>> different `TopicsChangedListener` [1] so that the result of an added
>>> topic is to add a producer/consumer to the new partition.
>>>
>>> I support removing polling in our streaming platform, but I'd prefer
>>> to limit the number of notification systems we implement.
>>>
>>> Thanks,
>>> Michael
>>>
>>> [0] https://github.com/apache/pulsar/pull/16062
>>> [1]
>>> https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java#L169-L175
>>>
>>>
>>>
>>> On Fri, Feb 24, 2023 at 1:57 AM houxiaoyu <an...@gmail.com> wrote:
>>> >
>>> > Hi Joe,
>>> >
>>> > When we use PartitionedProducerImpl or MultiTopicsConsumerImpl, there
>>> > is a polling task that regularly fetches the partitioned-topic metadata
>>> > to check whether the number of partitions has been updated.  This PIP
>>> > wants to use a notification mechanism to replace the metadata polling
>>> > task.
>>> >
>>> > The point is just that the way to implement the partitioned-topic
>>> > metadata notification mechanism is much like the notifications on
>>> > regex sub changes.
>>> >
>>> > On Fri, Feb 24, 2023 at 13:37, Joe F <jo...@gmail.com> wrote:
>>> >
>>> > > Why is this needed when we have notifications on regex sub changes?
>>> Aren't
>>> > > the partition names a well-defined regex?
>>> > >
>>> > > Joe
>>> > >
>>> > > On Thu, Feb 23, 2023 at 8:52 PM houxiaoyu <an...@gmail.com>
>>> wrote:
>>> > >
>>> > > > Hi Asaf,
>>> > > > thanks for your reminder.
>>> > > >
>>> > > > ## Changes
>>> > > > I have made the following changes to make sure the notification
>>> > > > arrives successfully:
>>> > > > 1. The watch success response `CommandWatchPartitionUpdateSuccess`
>>> > > > will contain all the concerned topics of this watcher.
>>> > > > 2. The notification `CommandPartitionUpdate` will always contain
>>> > > > all the concerned topics of this watcher.
>>> > > > 3. The notification `CommandPartitionUpdate` contains a
>>> > > > monotonically increasing version.
>>> > > > 4. A map
>>> > > > `PartitionUpdateWatcherService#inFlightUpdate<long/*watchID*/,
>>> > > > Pair<version, long/*timestamp*/>>` will keep track of in-flight
>>> > > > updates.
>>> > > > 5. A timer will check for update timeouts through `inFlightUpdate`.
>>> > > > 6. The client acks `CommandPartitionUpdateResult` to the broker
>>> > > > when it finishes updating.
>>> > > >
>>> > > > ## Details
>>> > > >
>>> > > > The following mechanism makes sure the newest notification arrives
>>> > > > successfully (copying the description from GH):
>>> > > >
>>> > > > A new class, `org.apache.pulsar.PartitionUpdateWatcherService`,
>>> > > > will keep track of watchers and will listen for changes in the
>>> > > > metadata. Whenever a topic's partitions update, it checks whether
>>> > > > any watchers should be notified and sends an update for all topics
>>> > > > the watcher concerns through the ServerCnx. Then we record this
>>> > > > request in a map,
>>> > > > `PartitionUpdateWatcherService#inFlightUpdate<long/*watchID*/,
>>> > > > Pair<version, long/*timestamp*/>>`.  A timer checks for update
>>> > > > timeouts through `inFlightUpdate`.  If a watcher's update has timed
>>> > > > out, we query the partitions of all its concerned topics and resend
>>> > > > the update.
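The in-flight bookkeeping described above can be sketched in plain Java. This is a hedged illustration of the idea only: the class and method names (`InFlightUpdateTracker`, `recordSent`, `findTimedOut`) are my own, not the PIP's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class InFlightUpdateTracker {
    static final class Entry {
        final long version;       // version of the update that was sent
        final long sentAtMillis;  // when the update was sent
        Entry(long version, long sentAtMillis) {
            this.version = version;
            this.sentAtMillis = sentAtMillis;
        }
    }

    // watcherId -> (version, timestamp), mirroring the proposed
    // inFlightUpdate<long, Pair<version, long>> map.
    private final Map<Long, Entry> inFlightUpdate = new ConcurrentHashMap<>();

    // Record that an update with the given version was sent to a watcher.
    void recordSent(long watcherId, long version, long nowMillis) {
        inFlightUpdate.put(watcherId, new Entry(version, nowMillis));
    }

    // A periodic timer would call this to find watchers whose update has
    // been in flight longer than the timeout; those updates get resent.
    List<Long> findTimedOut(long nowMillis, long timeoutMillis) {
        List<Long> timedOut = new ArrayList<>();
        inFlightUpdate.forEach((watcherId, e) -> {
            if (nowMillis - e.sentAtMillis > timeoutMillis) {
                timedOut.add(watcherId);
            }
        });
        return timedOut;
    }
}
```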
>>> > > >
>>> > > > The client acks `CommandPartitionUpdateResult` to the broker when
>>> > > > it finishes updating.  The broker handles a
>>> > > > `CommandPartitionUpdateResult` request as follows:
>>> > > >  - If CommandPartitionUpdateResult#version <
>>> > > > PartitionUpdateWatcherService#inFlightUpdate.get(watcherID).version,
>>> > > > the broker ignores this ack.
>>> > > >  - If CommandPartitionUpdateResult#version ==
>>> > > > PartitionUpdateWatcherService#inFlightUpdate.get(watcherID).version:
>>> > > >     - If CommandPartitionUpdateResult#success is true, the broker
>>> > > > just removes the watcherID from inFlightUpdate.
>>> > > >     - If CommandPartitionUpdateResult#success is false, the broker
>>> > > > removes the watcherID from inFlightUpdate, queries the partitions
>>> > > > of all the concerned topics, and resends.
>>> > > >  - If CommandPartitionUpdateResult#version >
>>> > > > PartitionUpdateWatcherService#inFlightUpdate.get(watcherID).version,
>>> > > > this should not happen.
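The three version cases above can be sketched as follows. This is an illustrative sketch only: the `AckHandler` class, the `Action` enum, and the method shapes are assumptions for clarity, not part of the proposed protocol.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class AckHandler {
    enum Action { IGNORE, REMOVE, RESEND }

    // watcherId -> version of the update currently in flight
    private final Map<Long, Long> inFlightVersion = new ConcurrentHashMap<>();

    void trackUpdate(long watcherId, long version) {
        inFlightVersion.put(watcherId, version);
    }

    // Compare the acked version with the tracked in-flight version,
    // following the three cases described in the thread.
    Action handleAck(long watcherId, long ackedVersion, boolean success) {
        Long current = inFlightVersion.get(watcherId);
        if (current == null || ackedVersion < current) {
            return Action.IGNORE;  // stale ack for an older update
        }
        if (ackedVersion == current) {
            inFlightVersion.remove(watcherId);
            // On failure, the broker re-queries partitions and resends.
            return success ? Action.REMOVE : Action.RESEND;
        }
        // ackedVersion > current: should not happen with monotonic versions.
        throw new IllegalStateException("ack version ahead of in-flight version");
    }
}
```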
>>> > > >
>>> > > >  ## Edge cases
>>> > > > - Broker restarts or crashes
>>> > > > The client will reconnect to another broker, and the broker
>>> > > > responds with `CommandWatchPartitionUpdateSuccess` containing the
>>> > > > partitions of the watcher's concerned topics.  We will call
>>> > > > `PartitionsUpdateListener` when the connection opens.
>>> > > > - Client acks fail or time out
>>> > > > The broker will resend the watcher's concerned topics' partitions
>>> > > > if the client's ack fails or times out.
>>> > > > - Partition updates before the client acks
>>> > > > `CommandPartitionUpdate#version` monotonically increases every
>>> > > > time it is updated. If a partition updates before the client acks,
>>> > > > a greater version will be put into
>>> > > > `PartitionUpdateWatcherService#inFlightUpdate`.  The previous acks
>>> > > > will be ignored because their version is less than the current
>>> > > > version.
>>> > > >
>>> > > >
>>> > > > On Wed, Feb 22, 2023 at 21:33, Asaf Mesika <as...@gmail.com> wrote:
>>> > > >
>>> > > > > How about edge cases?
>>> > > > > In Andra's PIP he took into account cases where updates were
>>> > > > > lost, so he created a secondary poll. Not saying it's the best
>>> > > > > solution for your case, of course.
>>> > > > > I'm saying that when a broker sends a CommandPartitionUpdate,
>>> > > > > how do you know it arrived successfully? From my memory, there
>>> > > > > is no ACK in the protocol saying "I'm the client, I got the
>>> > > > > update successfully", after which the "dirty" flag for that
>>> > > > > topic would be removed for this watcher ID.
>>> > > > >
>>> > > > > Are there any other edge cases we can have? Let's be exhaustive.
>>> > > > >
>>> > > > >
>>> > > > >
>>> > > > > On Wed, Feb 22, 2023 at 1:14 PM houxiaoyu <an...@gmail.com>
>>> wrote:
>>> > > > >
>>> > > > > > Thanks for your great suggestion, Enrico.
>>> > > > > >
>>> > > > > > I agree with you. It's more reasonable to add a
>>> > > > > > `supports_partition_update_watchers` flag in `FeatureFlags` to
>>> > > > > > detect that the connected broker supports this feature, and to
>>> > > > > > add a new broker configuration property
>>> > > > > > `enableNotificationForPartitionUpdate` with a default value of
>>> > > > > > true, much like PIP-145.
>>> > > > > >
>>> > > > > > I have updated the description.
>>> > > > > >
>>> > > > > > On Wed, Feb 22, 2023 at 17:26, Enrico Olivelli <eo...@gmail.com> wrote:
>>> > > > > >
>>> > > > > > > I support this proposal.
>>> > > > > > > Copying my comments here from GH:
>>> > > > > > >
>>> > > > > > > Can't we enable this by default when we detect that the
>>> > > > > > > connected broker supports it?
>>> > > > > > > I can't find any reason for not using this mechanism if it is
>>> > > > > > > available.
>>> > > > > > >
>>> > > > > > > Maybe we can set the default to "true" and allow users to
>>> > > > > > > disable it in case it impacts their systems in an unwanted way.
>>> > > > > > >
>>> > > > > > > Maybe it would be useful to have a way to disable the
>>> > > > > > > mechanism on the broker side as well.
>>> > > > > > >
>>> > > > > > > Enrico
>>> > > > > > >
>>> > > > > > > On Wed, Feb 22, 2023 at 10:22, houxiaoyu
>>> > > > > > > <an...@gmail.com> wrote:
>>> > > > > > > >
>>> > > > > > > > Hi Pulsar community:
>>> > > > > > > >
>>> > > > > > > > I opened a PIP to discuss "Notifications for partitions
>>> update"
>>> > > > > > > >
>>> > > > > > > > ### Motivation
>>> > > > > > > >
>>> > > > > > > > The Pulsar client polls brokers at fixed intervals to
>>> > > > > > > > check for partition updates when we publish/subscribe to
>>> > > > > > > > partitioned topics with `autoUpdatePartitions` set to true.
>>> > > > > > > > This causes unnecessary load for both clients and brokers,
>>> > > > > > > > since most of the time the number of partitions will not
>>> > > > > > > > change. In addition, polling introduces latency in
>>> > > > > > > > partition updates, bounded by
>>> > > > > > > > `autoUpdatePartitionsInterval`.
>>> > > > > > > > This PIP would like to introduce a notification mechanism
>>> > > > > > > > for partition updates, much like PIP-145 for regex
>>> > > > > > > > subscriptions:
>>> > > > > > > > https://github.com/apache/pulsar/issues/14505.
>>> > > > > > > >
>>> > > > > > > > For more details, please read the PIP at:
>>> > > > > > > > https://github.com/apache/pulsar/issues/19596
>>> > > > > > > > Looking forward to hearing your thoughts.
>>> > > > > > > >
>>> > > > > > > > Thanks,
>>> > > > > > > > Xiaoyu Hou
>>> > > > > > > > ----
>>> > > > > > >
>>> > > > > >
>>> > > > >
>>> > > >
>>> > >
>>>
>>

Re: [DISCUSS] PIP-247: Notifications for partitions update

Posted by houxiaoyu <an...@gmail.com>.
Hi all

Are there any other comments about this design? :)

Thanks,
Xiaoyu Hou

On Wed, Mar 8, 2023 at 16:22, houxiaoyu <an...@gmail.com> wrote:

> Hi Michael,
>
> > is there a reason that we couldn't also use this to improve PIP 145?
>
> The protocol described in this PIP could also be used to improve PIP-145.
> However, I don't think it's a good idea to use the regex sub watcher to
> implement the partitioned update watcher, because of the other reasons we
> mentioned above.
>
> > Since we know we're using a TCP connection, is it possible to rely on
> > pulsar's keep alive timeout (the broker and the client each have their
> > own) to close a connection that isn't responsive?
>
> I think it could still fail at the application layer; for example, the
> partition update listener may fail unexpectedly. Currently, another task
> is scheduled if the polling task encounters an error in the partition
> auto-update timer task. [0]
>
> > Regarding the connection, which connection should the client use to send
> > the watch requests?
>
> The `PartitionUpdateWatcher` will call `connectionHandler.grabCnx()` to
> open a connection, which is analogous to `TopicListWatcher`. [1]
>
> > do we plan on using metadata store notifications to trigger the
> > callbacks that trigger notifications sent to the clients
>
> Yes, we will just look up the metadata store to fetch the partition count
> and register a watcher on the metadata store to be notified when the count
> updates.
>
> > One nit on the protobuf for CommandWatchPartitionUpdateSuccess:
> >
> >     repeated string topics         = 3;
> >     repeated uint32 partitions     = 4;
> >
> > What do you think about using a repeated message that represents a
> > pair of a topic and its partition count instead of using two lists?
>
> Great. It looks better to use a repeated message; I will update the
> protobuf.
>
> > How will we handle the case where a watched topic does not exist?
>
> 1. When `PulsarClient` calls `create()` to create a producer or
> `subscribe()` to create a consumer, the client will first get the
> partitioned-topic metadata from the broker [2]. If the topic doesn't exist
> and `isAllowAutoTopicCreation=true` on the broker, the partitioned-topic
> ZK node will be auto-created with the default partition count.
> 2. After the client gets the partitioned-topic metadata successfully, a
> `PartitionedProducerImpl` will be created if `meta.partition > 0`.
> `PartitionUpdateWatcher` will be initialized in the
> `PartitionedProducerImpl` constructor. The `PartitionUpdateWatcher` sends
> a command to the broker to register a watcher. If any topic in the topic
> list doesn't exist, the broker will send an error to the client and the
> `PartitionedProducerImpl` will fail to start.  `MultiTopicsConsumerImpl`
> will work in the same way.
>
> > I want to touch on authorization. A role should have "lookup"
> > permission to watch for updates on each partitioned topic that it
> > watches. As a result, if we allow for a request to watch multiple
> > topics, some might succeed while others fail. How do we handle partial
> > success?
>
> If any topic in the topic list fails authorization, the broker will send
> an error to the client. The following reasons support this behavior:
> 1. Before sending the command to register a partition update watcher, the
> client has already sent `CommandPartitionedTopicMetadata` and must
> therefore have the `lookup` permission [3] [4].
> 2. Currently, if any topic subscription fails, the consumer fails to
> start. [5]
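The all-or-nothing authorization described above could be sketched like this (plain Java; `WatchAuthorization` and `canLookup` are illustrative stand-ins for the broker's real lookup-permission check, not actual Pulsar code):

```java
import java.util.List;
import java.util.function.Predicate;

public class WatchAuthorization {
    // Return true only if the role may look up every topic in the watch
    // request; a single failing topic rejects the whole registration,
    // matching how multi-topic subscribe currently fails as a unit.
    static boolean authorizeAll(List<String> topics, Predicate<String> canLookup) {
        return topics.stream().allMatch(canLookup);
    }
}
```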
>
>
> [0]
> https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L1453-L1461
>
> [1]
> https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java#L67-L81
>
> [2]
> https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java#L365-L371
>
> [3]
> https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L903-L923
>
> [4]
> https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L558-L560
>
> [5]
> https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L171-L193
>
> Thanks,
> Xiaoyu Hou
>
> On Tue, Mar 7, 2023 at 15:43, Michael Marshall <mm...@apache.org> wrote:
>
>> Thanks for the context Xiaoyu Hou and Asaf. I appreciate the
>> efficiencies that we can gain by creating a specific implementation
>> for the partitioned topic use case. I agree that this new notification
>> system makes sense based on Pulsar's current features, and I have some
>> implementation questions.
>>
>> >- If the broker sends notification and it's lost due network issues,
>> > you'll only know about it due to the client doing constant polling,
>> using
>> > its hash to minimize response.
>>
>> I see that we implemented an ack mechanism to get around this. I
>> haven't looked closely, but is there a reason that we couldn't also
>> use this to improve PIP 145?
>>
>> Since we know we're using a TCP connection, is it possible to rely on
>> pulsar's keep alive timeout (the broker and the client each have their
>> own) to close a connection that isn't responsive? Then, when the
>> connection is re-established, the client would get the latest topic
>> partition count.
>>
>> Regarding the connection, which connection should the client use to
>> send the watch requests? At the moment, the "parent" partitioned topic
>> does not have an owner, but perhaps it would help this design to make
>> a single owner for a given partitioned topic. This could trivially be
>> done using the existing bundle mapping. Then, all watchers for a given
>> partitioned topic would be hosted on the same broker, which should be
>> more efficient. I don't think we currently redirect clients to any
>> specific bundle when creating the metadata for a partitioned topic,
>> but if we did, then we might be able to remove some edge cases for
>> notification delivery because a single broker would update the
>> metadata store and then trigger the notifications to the clients. If
>> we don't use this implementation, do we plan on using metadata store
>> notifications to trigger the callbacks that trigger notifications sent
>> to the clients?
>>
>> > - Each time meta-update you'll need to run it through regular
>> > expression, on all topics hosted on the broker, for any given client.
>> > That's a lot of CPU.
>> > - Suggested mechanism mainly cares about the count of partitions, so
>> > it's a lot more efficient.
>>
>> I forgot the partition count was its own piece of metadata that the
>> broker can watch for. That part definitely makes sense to me.
>>
>> One nit on the protobuf for CommandWatchPartitionUpdateSuccess:
>>
>>     repeated string topics         = 3;
>>     repeated uint32 partitions     = 4;
>>
>> What do you think about using a repeated message that represents a
>> pair of a topic and its partition count instead of using two lists?
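One possible shape for such a repeated message, sketched purely as an assumption (the message name, field names, and surrounding command fields here are illustrative, not the final protocol):

```protobuf
// Hypothetical pairing of a topic with its partition count, replacing
// the two parallel lists in CommandWatchPartitionUpdateSuccess.
message TopicPartitions {
    required string topic      = 1;
    required uint32 partitions = 2;
}

message CommandWatchPartitionUpdateSuccess {
    required uint64 request_id = 1;
    required uint64 watcher_id = 2;
    repeated TopicPartitions topic_partitions = 3;
}
```

With this shape a client never has to zip two lists by index, and a missing or reordered entry cannot silently mis-associate a count with the wrong topic.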
>>
>> How will we handle the case where a watched topic does not exist?
>>
>> I want to touch on authorization. A role should have "lookup"
>> permission to watch for updates on each partitioned topic that it
>> watches. As a result, if we allow for a request to watch multiple
>> topics, some might succeed while others fail. How do we handle partial
>> success?
>>
>> One interesting detail is that this PIP is essentially aligned with
>> notifying clients when topic metadata changes while PIP 145 was
>> related to topic creation itself. An analogous proposal could request
>> a notification for any topic that gets a new metadata label. I do not
>> think it is worth considering that case in this design.
>>
>> Thanks,
>> Michael
>>
>> [0] https://lists.apache.org/thread/t4cwht08d4mhp3qzoxmqh6tht8l0728r
>>

Re: [DISCUSS] PIP-247: Notifications for partitions update

Posted by houxiaoyu <an...@gmail.com>.
Hi Michael,

> is there a reason that we couldn't also use this to improve PIP 145?

The protocol described in this PIP could also be used to improve PIP-145.
However, I don't think it's a good idea to use the regex sub watcher to
implement the partition update watcher, because of the other reasons we
mentioned above.

> Since we know we're using a TCP connection, is it possible to rely on
> pulsar's keep alive timeout (the broker and the client each have their
> own) to close a connection that isn't responsive?

It could still fail at the application layer, I think; for example, the
partition update listener may fail unexpectedly. Currently, another task
will be scheduled if the poll task encounters an error in the partition auto
update timer task. [0]

> Regarding the connection, which connection should the client use to send
the watch requests?

The `PartitionUpdateWatcher` will call `connectionHandler.grabCnx()` to
open a connection, which is analogous to `TopicListWatcher`. [1]

> do we plan on using metadata storenotifications to trigger the callbacks
that trigger notifications sent
> to the clients

Yes, we will just look up the metadata store to fetch the partition count,
and register a watcher with the metadata store to trigger updates of the
count.
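To illustrate the idea, here is a simplified, self-contained sketch of a metadata store that notifies registered listeners on partition-count changes (this is not the actual Pulsar `MetadataStore` API; all class and method names below are illustrative):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// A toy metadata store tracking partition counts per topic.
// Registered listeners are notified whenever a count changes.
class ToyMetadataStore {
    private final Map<String, Integer> partitionCounts = new HashMap<>();
    private final List<BiConsumer<String, Integer>> listeners = new ArrayList<>();

    // Register a watcher triggered on every partition-count update.
    void registerListener(BiConsumer<String, Integer> listener) {
        listeners.add(listener);
    }

    // Look up the current partition count for a topic (0 if unknown).
    int getPartitionCount(String topic) {
        return partitionCounts.getOrDefault(topic, 0);
    }

    // Update the count and fan out notifications, e.g. on partition expansion.
    void updatePartitionCount(String topic, int newCount) {
        partitionCounts.put(topic, newCount);
        for (BiConsumer<String, Integer> l : listeners) {
            l.accept(topic, newCount);
        }
    }
}

public class PartitionWatchSketch {
    public static void main(String[] args) {
        ToyMetadataStore store = new ToyMetadataStore();
        store.updatePartitionCount("persistent://public/default/my-topic", 4);

        // The broker-side service would register a listener like this and
        // push a CommandPartitionUpdate to the watching clients.
        store.registerListener((topic, count) ->
                System.out.println("notify watchers: " + topic + " -> " + count));

        store.updatePartitionCount("persistent://public/default/my-topic", 8);
    }
}
```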

> One nit on the protobuf for CommandWatchPartitionUpdateSuccess:
>
>    repeated string topics         = 3;
>   repeated uint32 partitions     = 4;
>
> What do you think about using a repeated message that represents a
> pair of a topic and its partition count instead of using two lists?

Great. It looks better to use a repeated message; I will update the protobuf.
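For example, the success response could carry a repeated pair message along these lines (an illustrative sketch only; the message and field names here are not final):

```protobuf
message TopicPartitions {
    required string topic      = 1;
    required uint32 partitions = 2;
}

message CommandWatchPartitionUpdateSuccess {
    required uint64 request_id                = 1;
    required uint64 watcher_id                = 2;
    repeated TopicPartitions topic_partitions = 3;
}
```

Keeping the topic and its partition count in one message avoids the two parallel lists getting out of sync.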

> How will we handle the case where a watched topic does not exist?

1. When `PulsarClient` calls `create()` to create a producer or `subscribe()`
to create a consumer, the client first fetches the partitioned-topic metadata
from the broker [2]. If the topic doesn't exist and
`isAllowAutoTopicCreation=true` on the broker, the partitioned-topic
ZooKeeper node will be auto-created with the default number of partitions.
2. After the client fetches the partitioned-topic metadata successfully, a
`PartitionedProducerImpl` will be created if `meta.partition > 0`. A
`PartitionUpdateWatcher` will be initialized in the `PartitionedProducerImpl`
constructor and will send a command to the broker to register a watcher. If
any topic in the topic list doesn't exist, the broker will send an error to
the client and the `PartitionedProducerImpl` will fail to start.
`MultiTopicsConsumerImpl` will work in the same way.

> I want to touch on authorization. A role should have "lookup"
> permission to watch for updates on each partitioned topic that it
> watches. As a result, if we allow for a request to watch multiple
> topics, some might succeed while others fail. How do we handle partial
> success?

If authorization fails for any topic in the topic list, the broker will send
an error to the client. The following reasons support this behavior:
1. Before sending the command to register a partition update watcher, the
client should already have sent `CommandPartitionedTopicMetadata` and should
therefore have the `lookup` permission [3] [4].
2. Currently, if any topic subscription fails, the consumer fails to start. [5]
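A minimal sketch of this all-or-nothing check (hypothetical names; in the real broker this would go through the authorization service rather than a plain set lookup):

```java
import java.util.List;
import java.util.Set;

// Toy all-or-nothing authorization: the watch request is rejected as a
// whole if any topic in the list lacks lookup permission for the role.
public class WatchAuthSketch {
    static void authorizeWatch(Set<String> lookupAllowed, List<String> topics) {
        for (String topic : topics) {
            if (!lookupAllowed.contains(topic)) {
                // Broker sends an error for the whole request; no partial success.
                throw new SecurityException("no lookup permission on " + topic);
            }
        }
    }

    public static void main(String[] args) {
        Set<String> allowed = Set.of("tenant/ns/topic-a", "tenant/ns/topic-b");
        authorizeWatch(allowed, List.of("tenant/ns/topic-a")); // all topics permitted
        try {
            authorizeWatch(allowed,
                    List.of("tenant/ns/topic-a", "tenant/ns/topic-c"));
        } catch (SecurityException e) {
            System.out.println("watch rejected: " + e.getMessage());
        }
    }
}
```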


[0]
https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L1453-L1461

[1]
https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java#L67-L81

[2]
https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java#L365-L371

[3]
https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L903-L923

[4]
https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L558-L560

[5]
https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L171-L193

Thanks,
Xiaoyu Hou

Michael Marshall <mm...@apache.org> 于2023年3月7日周二 15:43写道:

> Thanks for the context Xiaoyu Hou and Asaf. I appreciate the
> efficiencies that we can gain by creating a specific implementation
> for the partitioned topic use case. I agree that this new notification
> system makes sense based on Pulsar's current features, and I have some
> implementation questions.
>
> >- If the broker sends notification and it's lost due network issues,
> > you'll only know about it due to the client doing constant polling, using
> > its hash to minimize response.
>
> I see that we implemented an ack mechanism to get around this. I
> haven't looked closely, but is there a reason that we couldn't also
> use this to improve PIP 145?
>
> Since we know we're using a TCP connection, is it possible to rely on
> pulsar's keep alive timeout (the broker and the client each have their
> own) to close a connection that isn't responsive? Then, when the
> connection is re-established, the client would get the latest topic
> partition count.
>
> Regarding the connection, which connection should the client use to
> send the watch requests? At the moment, the "parent" partitioned topic
> does not have an owner, but perhaps it would help this design to make
> a single owner for a given partitioned topic. This could trivially be
> done using the existing bundle mapping. Then, all watchers for a given
> partitioned topic would be hosted on the same broker, which should be
> more efficient. I don't think we currently redirect clients to any
> specific bundle when creating the metadata for a partitioned topic,
> but if we did, then we might be able to remove some edge cases for
> notification delivery because a single broker would update the
> metadata store and then trigger the notifications to the clients. If
> we don't use this implementation, do we plan on using metadata store
> notifications to trigger the callbacks that trigger notifications sent
> to the clients?
>
> > - Each time meta-update you'll need to run it through regular
> > expression, on all topics hosted on the broker, for any given client.
> > That's a lot of CPU.
> > - Suggested mechanism mainly cares about the count of partitions, so
> > it's a lot more efficient.
>
> I forgot the partition count was its own piece of metadata that the
> broker can watch for. That part definitely makes sense to me.
>
> One nit on the protobuf for CommandWatchPartitionUpdateSuccess:
>
>     repeated string topics         = 3;
>     repeated uint32 partitions     = 4;
>
> What do you think about using a repeated message that represents a
> pair of a topic and its partition count instead of using two lists?
>
> How will we handle the case where a watched topic does not exist?
>
> I want to touch on authorization. A role should have "lookup"
> permission to watch for updates on each partitioned topic that it
> watches. As a result, if we allow for a request to watch multiple
> topics, some might succeed while others fail. How do we handle partial
> success?
>
> One interesting detail is that this PIP is essentially aligned with
> notifying clients when topic metadata changes while PIP 145 was
> related to topic creation itself. An analogous proposal could request
> a notification for any topic that gets a new metadata label. I do not
> think it is worth considering that case in this design.
>
> Thanks,
> Michael
>
> [0] https://lists.apache.org/thread/t4cwht08d4mhp3qzoxmqh6tht8l0728r
>
> On Sun, Mar 5, 2023 at 8:01 PM houxiaoyu <an...@gmail.com> wrote:
> >
> > Bump. Are there other concerns or suggestions about this PIP :)  Ping @
> > Michael @Joe @Enrico
> >
> > Thanks
> > Xiaoyu Hou
> >
> > houxiaoyu <an...@gmail.com> 于2023年2月27日周一 14:10写道:
> >
> > > Hi Joe and Michael,
> > >
> > > I think I misunderstood what you replied before. Now I understand and
> > > explain it again.
> > >
> > > Besides the reasons what Asaf mentioned above, there are also some
> limits
> > > for using topic list watcher.  For example the `topicsPattern.pattern`
> must
> > > less that `maxSubscriptionPatternLeng` [0]. If the consumer subscribes
> > > multi partitioned-topics, the `topicsPattern.pattern` maybe very long.
> > >
> > > So I think that it's better to have a separate notification
> implementation
> > > for partition update.
> > >
> > > [0]
> > >
> https://github.com/apache/pulsar/blob/5d6932137d76d544f939bef27df25f61b4a4d00d/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java#L115-L126
> > >
> > > Thanks,
> > > Xiaoyu Hou
> > >
> > > houxiaoyu <an...@gmail.com> 于2023年2月27日周一 10:56写道:
> > >
> > >> Hi Michael,
> > >>
> > >> >  I think we just need the client to "subscribe" to a topic
> notification
> > >> for
> > >> >  "<topic-name>-partition-[0-9]+" to eliminate the polling
> > >>
> > >> If pulsar users want to pub/sub a partitioned-topic, I think most of
> the
> > >> users would like to create a simple producer or consumer like
> following:
> > >> ```
> > >> Producer<byte[]> producer =
> client.newProducer().topic(topic).create();
> > >> producer.sendAsync(msg);
> > >> ```
> > >> ```
> > >> client.newConsumer()
> > >>         .topic(topic)
> > >>         .subscriptionName(subscription)
> > >>         .subscribe();
> > >> ```
> > >> I think there is no reason for users to use `topicsPattern` if a
> pulsar
> > >> just wants to subscribe a partitioned-topic. In addition,
> `topicsPattern`
> > >> couldn't be used for producers.
> > >>
> > >> So I think PIP-145 [0] will benefit for regex subscriptions.  And this
> > >> PIP [1] will benefit for the common partitioned-topic pub/sub
> scenario.
> > >>
> > >> [0] https://github.com/apache/pulsar/issues/14505
> > >> [1] https://github.com/apache/pulsar/issues/19596
> > >>
> > >> Thanks
> > >> Xiaoyu Hou
> > >>
> > >> Michael Marshall <mm...@apache.org> 于2023年2月25日周六 01:29写道:
> > >>
> > >>> > Just the way to implements partitioned-topic metadata
> > >>> > notification mechanism is much like notifications on regex sub
> changes
> > >>>
> > >>> Why do we need a separate notification implementation? The regex
> > >>> subscription feature is about discovering topics (not subscriptions)
> > >>> that match a regular expression. As Joe mentioned, I think we just
> > >>> need the client to "subscribe" to a topic notification for
> > >>> "<topic-name>-partition-[0-9]+" to eliminate the polling.
> > >>>
> > >>> Building on PIP 145, the work for this PIP would be in implementing a
> > >>> different `TopicsChangedListener` [1] so that the result of an added
> > >>> topic is to add a producer/consumer to the new partition.
> > >>>
> > >>> I support removing polling in our streaming platform, but I'd prefer
> > >>> to limit the number of notification systems we implement.
> > >>>
> > >>> Thanks,
> > >>> Michael
> > >>>
> > >>> [0] https://github.com/apache/pulsar/pull/16062
> > >>> [1]
> > >>>
> https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java#L169-L175
> > >>>
> > >>>
> > >>>
> > >>> On Fri, Feb 24, 2023 at 1:57 AM houxiaoyu <an...@gmail.com>
> wrote:
> > >>> >
> > >>> > Hi Joe,
> > >>> >
> > >>> > When we use PartitionedProducerImpl or MultiTopicsConsumerImpl,
> there
> > >>> is a
> > >>> > poll task to fetch the metadata of the partitioned-topic regularly
> for
> > >>> the
> > >>> > number of partitions updated.  This PIP wants to use a
> > >>> > notification mechanism to replace the metadata poll task.
> > >>> >
> > >>> > Just the way to implements partitioned-topic metadata
> > >>> > notification mechanism is much like notifications on regex sub
> changes
> > >>> >
> > >>> > Joe F <jo...@gmail.com> 于2023年2月24日周五 13:37写道:
> > >>> >
> > >>> > > Why is this needed when we have notifications on regex sub
> changes?
> > >>> Aren't
> > >>> > > the partition names a well-defined regex?
> > >>> > >
> > >>> > > Joe
> > >>> > >
> > >>> > > On Thu, Feb 23, 2023 at 8:52 PM houxiaoyu <an...@gmail.com>
> > >>> wrote:
> > >>> > >
> > >>> > > > Hi Asaf,
> > >>> > > > thanks for your reminder.
> > >>> > > >
> > >>> > > > ## Changing
> > >>> > > > I have updated the following changes to make sure the
> notification
> > >>> > > arrived
> > >>> > > > successfully:
> > >>> > > > 1. The watch success response
> `CommandWatchPartitionUpdateSuccess`
> > >>> will
> > >>> > > > contain all the concerned topics of this watcher
> > >>> > > > 2. The notification `CommandPartitionUpdate` will always
> contain
> > >>> all the
> > >>> > > > concerned topics of this watcher.
> > >>> > > > 3. The notification `CommandPartitionUpdate`contains a
> > >>> monotonically
> > >>> > > > increased version.
> > >>> > > > 4. A map
> > >>> `PartitonUpdateWatcherService#inFlightUpdate<long/*watchID*/,
> > >>> > > > Pair<version, long/*timestamp*/>>` will keep track of the
> updating
> > >>> > > > 5. A timer will check the updating timeout through
> `inFlightUpdate`
> > >>> > > > 6. The client acks `CommandPartitionUpdateResult` to broker
> when it
> > >>> > > > finishes updating.
> > >>> > > >
> > >>> > > > ## Details
> > >>> > > >
> > >>> > > > The following mechanism could make sure the newest notification
> > >>> arrived
> > >>> > > > successfully, copying the description from GH:
> > >>> > > >
> > >>> > > > A new class, `org.apache.pulsar.PartitonUpdateWatcherService`
> will
> > >>> keep
> > >>> > > > track of watchers and will listen to the changes in the
> metadata.
> > >>> > > Whenever
> > >>> > > > a topic partition updates it checks if any watchers should be
> > >>> notified
> > >>> > > and
> > >>> > > > sends an update for all topics the watcher concerns through the
> > >>> > > ServerCnx.
> > >>> > > > Then we will record this request into a map,
> > >>> > > > `PartitonUpdateWatcherService#inFlightUpdate<long/*watchID*/,
> > >>> > > Pair<version,
> > >>> > > > long/*timestamp*/>>`.  A timer will check this update timeout
> > >>> through
> > >>> > > > inFlightUpdate .  We will query all the concerned topics's
> > >>> partition if
> > >>> > > > this watcher has sent an update timeout and will resend it.
> > >>> > > >
> > >>> > > > The client acks `CommandPartitionUpdateResult` to broker when
> it
> > >>> finishes
> > >>> > > > updating.  The broker handle `CommandPartitionUpdateResult`
> > >>> request:
> > >>> > > >  - If CommandPartitionUpdateResult#version <
> > >>> > > >
> PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version,
> > >>> > > broker
> > >>> > > > ignores this ack.
> > >>> > > >  -  If CommandPartitionUpdateResult#version ==
> > >>> > > >
> PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version
> > >>> > > >     - If CommandPartitionUpdateResult#success is true,  broker
> just
> > >>> > > removes
> > >>> > > > the watcherID from inFlightUpdate.
> > >>> > > >     - If CommandPartitionUpdateResult#success is false,  broker
> > >>> removes
> > >>> > > the
> > >>> > > > watcherId from inFlightUpdate, and queries all the concerned
> > >>> topics's
> > >>> > > > partition and resend.
> > >>> > > >  - If CommandPartitionUpdateResult#version >
> > >>> > > >
> > >>> PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version,
> this
> > >>> > > > should not happen.
> > >>> > > >
> > >>> > > >  ## Edge cases
> > >>> > > > - Broker restarts or crashes
> > >>> > > > Client will reconnect to another broker, broker responses
> > >>> > > > `CommandWatchPartitionUpdateSuccess` with watcher concerned
> > >>> topics's
> > >>> > > > partitions.  We will call `PartitionsUpdateListener` if the
> > >>> connection
> > >>> > > > opens.
> > >>> > > > - Client acks fail or timeout
> > >>> > > > Broker will resend the watcher concerned topics's partitions
> either
> > >>> > > client
> > >>> > > > acks fail or acks timeout.
> > >>> > > > - Partition updates before client acks.
> > >>> > > > `CommandPartitionUpdate#version` monotonically increases every
> > >>> time it is
> > >>> > > > updated. If Partition updates before client acks, a greater
> > >>> version will
> > >>> > > be
> > >>> > > > put into `PartitonUpdateWatcherService#inFlightUpdate`.  The
> > >>> previous
> > >>> > > acks
> > >>> > > > will be ignored because the version is less than the current
> > >>> version.
> > >>> > > >
> > >>> > > >
> > >>> > > > Asaf Mesika <as...@gmail.com> 于2023年2月22日周三 21:33写道:
> > >>> > > >
> > >>> > > > > How about edge cases?
> > >>> > > > > In Andra's PIP he took into account cases where updates were
> > >>> lost, so
> > >>> > > he
> > >>> > > > > created a secondary poll. Not saying it's the best situation
> for
> > >>> your
> > >>> > > > case
> > >>> > > > > of course.
> > >>> > > > > I'm saying that when a broker sends an update
> > >>> CommandPartitionUpdate,
> > >>> > > how
> > >>> > > > > do you know it arrived successfully? From my memory, there
> is no
> > >>> ACK in
> > >>> > > > the
> > >>> > > > > protocol, saying "I'm the client, I got the update
> successfully"
> > >>> and
> > >>> > > only
> > >>> > > > > then it removed the "dirty" flag for that topic, for this
> > >>> watcher ID.
> > >>> > > > >
> > >>> > > > > Are there any other edge cases we can have? Let's be
> exhaustive.
> > >>> > > > >
> > >>> > > > >
> > >>> > > > >
> > >>> > > > > On Wed, Feb 22, 2023 at 1:14 PM houxiaoyu <
> anonhxygo@gmail.com>
> > >>> wrote:
> > >>> > > > >
> > >>> > > > > > Thanks for your great suggestion Enrico.
> > >>> > > > > >
> > >>> > > > > > I agreed with you. It's more reasonable to add a
> > >>> > > > > > `supports_partition_update_watchers`  in `FeatureFlags`  to
> > >>> detect
> > >>> > > that
> > >>> > > > > the
> > >>> > > > > > connected broker supporting this feature , and add a new
> broker
> > >>> > > > > > configuration property
> `enableNotificationForPartitionUpdate`
> > >>> with
> > >>> > > > > default
> > >>> > > > > > value true, which is much like PIP-145.
> > >>> > > > > >
> > >>> > > > > > I have updated the descriptions.
> > >>> > > > > >
> > >>> > > > > > Enrico Olivelli <eo...@gmail.com> 于2023年2月22日周三
> 17:26写道:
> > >>> > > > > >
> > >>> > > > > > > I support this proposal.
> > >>> > > > > > > Coping here my comments from GH:
> > >>> > > > > > >
> > >>> > > > > > > can't we enable this by default in case we detect that
> the
> > >>> > > connected
> > >>> > > > > > > Broker supports it ?
> > >>> > > > > > > I can't find any reason for not using this mechanism if
> it is
> > >>> > > > > available.
> > >>> > > > > > >
> > >>> > > > > > > Maybe we can set the default to "true" and allow users to
> > >>> disable
> > >>> > > it
> > >>> > > > > > > in case it impacts their systems in an unwanted way.
> > >>> > > > > > >
> > >>> > > > > > > Maybe It would be useful to have a way to disable the
> > >>> mechanism on
> > >>> > > > the
> > >>> > > > > > > broker side as well
> > >>> > > > > > >
> > >>> > > > > > > Enrico
> > >>> > > > > > >
> > >>> > > > > > > Il giorno mer 22 feb 2023 alle ore 10:22 houxiaoyu
> > >>> > > > > > > <an...@gmail.com> ha scritto:
> > >>> > > > > > > >
> > >>> > > > > > > > Hi Pulsar community:
> > >>> > > > > > > >
> > >>> > > > > > > > I opened a PIP to discuss "Notifications for partitions
> > >>> update"
> > >>> > > > > > > >
> > >>> > > > > > > > ### Motivation
> > >>> > > > > > > >
> > >>> > > > > > > > Pulsar client will poll brokers at fix time for
> checking
> > >>> the
> > >>> > > > > partitions
> > >>> > > > > > > > update if we publish/subscribe the partitioned topics
> with
> > >>> > > > > > > > `autoUpdatePartitions` as true. This causes unnecessary
> > >>> load for
> > >>> > > > > both
> > >>> > > > > > > > clients and brokers since most of the time the number
> of
> > >>> > > partitions
> > >>> > > > > > will
> > >>> > > > > > > > not change. In addition polling introduces latency in
> > >>> partitions
> > >>> > > > > update
> > >>> > > > > > > >  which is specified by `autoUpdatePartitionsInterval`.
> > >>> > > > > > > > This PIP would like to introduce a notification
> mechanism
> > >>> for
> > >>> > > > > partition
> > >>> > > > > > > > update, which is much like PIP-145 for regex
> subscriptions
> > >>> > > > > > > > https://github.com/apache/pulsar/issues/14505.
> > >>> > > > > > > >
> > >>> > > > > > > > For more details, please read the PIP at:
> > >>> > > > > > > > https://github.com/apache/pulsar/issues/19596
> > >>> > > > > > > > Looking forward to hearing your thoughts.
> > >>> > > > > > > >
> > >>> > > > > > > > Thanks,
> > >>> > > > > > > > Xiaoyu Hou
> > >>> > > > > > > > ----
> > >>> > > > > > >
> > >>> > > > > >
> > >>> > > > >
> > >>> > > >
> > >>> > >
> > >>>
> > >>
>

Re: [DISCUSS] PIP-247: Notifications for partitions update

Posted by Michael Marshall <mm...@apache.org>.
Thanks for the context Xiaoyu Hou and Asaf. I appreciate the
efficiencies that we can gain by creating a specific implementation
for the partitioned topic use case. I agree that this new notification
system makes sense based on Pulsar's current features, and I have some
implementation questions.

>- If the broker sends notification and it's lost due network issues,
> you'll only know about it due to the client doing constant polling, using
> its hash to minimize response.

I see that we implemented an ack mechanism to get around this. I
haven't looked closely, but is there a reason that we couldn't also
use this to improve PIP 145?

Since we know we're using a TCP connection, is it possible to rely on
pulsar's keep alive timeout (the broker and the client each have their
own) to close a connection that isn't responsive? Then, when the
connection is re-established, the client would get the latest topic
partition count.

Regarding the connection, which connection should the client use to
send the watch requests? At the moment, the "parent" partitioned topic
does not have an owner, but perhaps it would help this design to make
a single owner for a given partitioned topic. This could trivially be
done using the existing bundle mapping. Then, all watchers for a given
partitioned topic would be hosted on the same broker, which should be
more efficient. I don't think we currently redirect clients to any
specific bundle when creating the metadata for a partitioned topic,
but if we did, then we might be able to remove some edge cases for
notification delivery because a single broker would update the
metadata store and then trigger the notifications to the clients. If
we don't use this implementation, do we plan on using metadata store
notifications to trigger the callbacks that trigger notifications sent
to the clients?

> - Each time meta-update you'll need to run it through regular
> expression, on all topics hosted on the broker, for any given client.
> That's a lot of CPU.
> - Suggested mechanism mainly cares about the count of partitions, so
> it's a lot more efficient.

I forgot the partition count was its own piece of metadata that the
broker can watch for. That part definitely makes sense to me.

One nit on the protobuf for CommandWatchPartitionUpdateSuccess:

    repeated string topics         = 3;
    repeated uint32 partitions     = 4;

What do you think about using a repeated message that represents a
pair of a topic and its partition count instead of using two lists?

How will we handle the case where a watched topic does not exist?

I want to touch on authorization. A role should have "lookup"
permission to watch for updates on each partitioned topic that it
watches. As a result, if we allow for a request to watch multiple
topics, some might succeed while others fail. How do we handle partial
success?

One interesting detail is that this PIP is essentially aligned with
notifying clients when topic metadata changes while PIP 145 was
related to topic creation itself. An analogous proposal could request
a notification for any topic that gets a new metadata label. I do not
think it is worth considering that case in this design.

Thanks,
Michael

[0] https://lists.apache.org/thread/t4cwht08d4mhp3qzoxmqh6tht8l0728r

On Sun, Mar 5, 2023 at 8:01 PM houxiaoyu <an...@gmail.com> wrote:
>
> Bump. Are there other concerns or suggestions about this PIP :)  Ping @
> Michael @Joe @Enrico
>
> Thanks
> Xiaoyu Hou
>
> houxiaoyu <an...@gmail.com> 于2023年2月27日周一 14:10写道:
>
> > Hi Joe and Michael,
> >
> > I think I misunderstood what you replied before. Now I understand and
> > explain it again.
> >
> > Besides the reasons what Asaf mentioned above, there are also some limits
> > for using topic list watcher.  For example the `topicsPattern.pattern` must
> > less that `maxSubscriptionPatternLeng` [0]. If the consumer subscribes
> > multi partitioned-topics, the `topicsPattern.pattern` maybe very long.
> >
> > So I think that it's better to have a separate notification implementation
> > for partition update.
> >
> > [0]
> > https://github.com/apache/pulsar/blob/5d6932137d76d544f939bef27df25f61b4a4d00d/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java#L115-L126
> >
> > Thanks,
> > Xiaoyu Hou
> >
> > houxiaoyu <an...@gmail.com> 于2023年2月27日周一 10:56写道:
> >
> >> Hi Michael,
> >>
> >> >  I think we just need the client to "subscribe" to a topic notification
> >> for
> >> >  "<topic-name>-partition-[0-9]+" to eliminate the polling
> >>
> >> If pulsar users want to pub/sub a partitioned-topic, I think most of the
> >> users would like to create a simple producer or consumer like following:
> >> ```
> >> Producer<byte[]> producer = client.newProducer().topic(topic).create();
> >> producer.sendAsync(msg);
> >> ```
> >> ```
> >> client.newConsumer()
> >>         .topic(topic)
> >>         .subscriptionName(subscription)
> >>         .subscribe();
> >> ```
> >> I think there is no reason for users to use `topicsPattern` if a pulsar
> >> just wants to subscribe a partitioned-topic. In addition, `topicsPattern`
> >> couldn't be used for producers.
> >>
> >> So I think PIP-145 [0] will benefit for regex subscriptions.  And this
> >> PIP [1] will benefit for the common partitioned-topic pub/sub scenario.
> >>
> >> [0] https://github.com/apache/pulsar/issues/14505
> >> [1] https://github.com/apache/pulsar/issues/19596
> >>
> >> Thanks
> >> Xiaoyu Hou
> >>
> >> Michael Marshall <mm...@apache.org> 于2023年2月25日周六 01:29写道:
> >>
> >>> > Just the way to implements partitioned-topic metadata
> >>> > notification mechanism is much like notifications on regex sub changes
> >>>
> >>> Why do we need a separate notification implementation? The regex
> >>> subscription feature is about discovering topics (not subscriptions)
> >>> that match a regular expression. As Joe mentioned, I think we just
> >>> need the client to "subscribe" to a topic notification for
> >>> "<topic-name>-partition-[0-9]+" to eliminate the polling.
> >>>
> >>> Building on PIP 145, the work for this PIP would be in implementing a
> >>> different `TopicsChangedListener` [1] so that the result of an added
> >>> topic is to add a producer/consumer to the new partition.
> >>>
> >>> I support removing polling in our streaming platform, but I'd prefer
> >>> to limit the number of notification systems we implement.
> >>>
> >>> Thanks,
> >>> Michael
> >>>
> >>> [0] https://github.com/apache/pulsar/pull/16062
> >>> [1]
> >>> https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java#L169-L175
> >>>
> >>>
> >>>
> >>> On Fri, Feb 24, 2023 at 1:57 AM houxiaoyu <an...@gmail.com> wrote:
> >>> >
> >>> > Hi Joe,
> >>> >
> >>> > When we use PartitionedProducerImpl or MultiTopicsConsumerImpl,  there
> >>> is a
> >>> > poll task to fetch the metadata of the partitioned-topic regularly for
> >>> the
> >>> > number of partitions updated.  This PIP wants to use a
> >>> > notification mechanism to replace the metadata poll task.
> >>> >
> >>> > Just the way to implements partitioned-topic metadata
> >>> > notification mechanism is much like notifications on regex sub changes
> >>> >
> >>> > Joe F <jo...@gmail.com> 于2023年2月24日周五 13:37写道:
> >>> >
> >>> > > Why is this needed when we have notifications on regex sub changes?
> >>> Aren't
> >>> > > the partition names a well-defined regex?
> >>> > >
> >>> > > Joe
> >>> > >
> >>> > > On Thu, Feb 23, 2023 at 8:52 PM houxiaoyu <an...@gmail.com>
> >>> wrote:
> >>> > >
> >>> > > > Hi Asaf,
> >>> > > > thanks for your reminder.
> >>> > > >
> >>> > > > ## Changing
> >>> > > > I have updated the following changes to make sure the notification
> >>> > > arrived
> >>> > > > successfully:
> >>> > > > 1. The watch success response `CommandWatchPartitionUpdateSuccess`
> >>> will
> >>> > > > contain all the concerned topics of this watcher
> >>> > > > 2. The notification `CommandPartitionUpdate` will always contain
> >>> all the
> >>> > > > concerned topics of this watcher.
> >>> > > > 3. The notification `CommandPartitionUpdate`contains a
> >>> monotonically
> >>> > > > increased version.
> >>> > > > 4. A map
> >>> `PartitonUpdateWatcherService#inFlightUpdate<long/*watchID*/,
> >>> > > > Pair<version, long/*timestamp*/>>` will keep track of the updating
> >>> > > > 5. A timer will check the updating timeout through `inFlightUpdate`
> >>> > > > 6. The client acks `CommandPartitionUpdateResult` to broker when it
> >>> > > > finishes updating.
> >>> > > >
> >>> > > > ## Details
> >>> > > >
> >>> > > > The following mechanism could make sure the newest notification
> >>> arrived
> >>> > > > successfully, copying the description from GH:
> >>> > > >
> >>> > > > A new class, `org.apache.pulsar.PartitonUpdateWatcherService`,
> >>> > > > will keep track of watchers and listen for changes in the
> >>> > > > metadata. Whenever a topic's partitions update, it checks whether
> >>> > > > any watchers should be notified and sends an update for all topics
> >>> > > > the watcher concerns through the ServerCnx. We then record this
> >>> > > > request in a map,
> >>> > > > `PartitonUpdateWatcherService#inFlightUpdate<long/*watchID*/,
> >>> > > > Pair<version, long/*timestamp*/>>`. A timer will check for update
> >>> > > > timeouts through inFlightUpdate. If a watcher's update has timed
> >>> > > > out, we will query the partitions of all its concerned topics and
> >>> > > > resend the notification.
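
[Editor's note: a minimal Java sketch of the in-flight tracking and timer sweep described above. The class and method names (`InFlightUpdates`, `sweepTimedOut`) are illustrative assumptions, not the actual Pulsar implementation.]

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the inFlightUpdate bookkeeping: watchID -> (version, timestamp).
class InFlightUpdates {
    static final class Entry {
        final long version;
        final long sentAtMillis;
        Entry(long version, long sentAtMillis) {
            this.version = version;
            this.sentAtMillis = sentAtMillis;
        }
    }

    private final Map<Long, Entry> inFlight = new HashMap<>();

    // Record that an update with this version was sent to the watcher now.
    void markSent(long watchId, long version, long nowMillis) {
        inFlight.put(watchId, new Entry(version, nowMillis));
    }

    // Called periodically by the timer: returns watchers whose update timed
    // out, so the broker can re-query partitions and resend.
    List<Long> sweepTimedOut(long nowMillis, long timeoutMillis) {
        List<Long> timedOut = new ArrayList<>();
        for (Map.Entry<Long, Entry> e : inFlight.entrySet()) {
            if (nowMillis - e.getValue().sentAtMillis >= timeoutMillis) {
                timedOut.add(e.getKey());
            }
        }
        return timedOut;
    }
}
```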
> >>> > > >
> >>> > > > The client acks `CommandPartitionUpdateResult` to the broker when
> >>> > > > it finishes updating. The broker handles the
> >>> > > > `CommandPartitionUpdateResult` request as follows:
> >>> > > >  - If CommandPartitionUpdateResult#version <
> >>> > > > PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version,
> >>> > > > the broker ignores this ack.
> >>> > > >  - If CommandPartitionUpdateResult#version ==
> >>> > > > PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version:
> >>> > > >    - If CommandPartitionUpdateResult#success is true, the broker
> >>> > > > just removes the watcherID from inFlightUpdate.
> >>> > > >    - If CommandPartitionUpdateResult#success is false, the broker
> >>> > > > removes the watcherID from inFlightUpdate, queries the partitions
> >>> > > > of all the concerned topics, and resends.
> >>> > > >  - If CommandPartitionUpdateResult#version >
> >>> > > > PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version,
> >>> > > > this should not happen.
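
[Editor's note: the version-gated ack handling above can be sketched as follows. `AckHandler` and its method names are hypothetical, chosen for illustration; the timestamp half of the in-flight pair is omitted for brevity.]

```java
import java.util.HashMap;
import java.util.Map;

// Broker-side sketch: ignore stale acks, clear current acks, and signal a
// resend when the current-version update failed on the client.
class AckHandler {
    // watcherID -> version of the in-flight update
    private final Map<Long, Long> inFlightVersion = new HashMap<>();

    void markSent(long watcherId, long version) {
        inFlightVersion.put(watcherId, version);
    }

    // Returns true when the broker must re-query partitions and resend.
    boolean onAck(long watcherId, long ackVersion, boolean success) {
        Long current = inFlightVersion.get(watcherId);
        if (current == null || ackVersion < current) {
            return false;            // stale ack: ignore
        }
        // ackVersion == current (a greater version should not happen)
        inFlightVersion.remove(watcherId);
        return !success;             // failed at current version: resend
    }
}
```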
> >>> > > >
> >>> > > > ## Edge cases
> >>> > > > - Broker restarts or crashes
> >>> > > > The client will reconnect to another broker, and the broker
> >>> > > > responds with `CommandWatchPartitionUpdateSuccess` containing the
> >>> > > > partitions of the watcher's concerned topics. We will call
> >>> > > > `PartitionsUpdateListener` when the connection opens.
> >>> > > > - Client ack fails or times out
> >>> > > > The broker will resend the watcher's concerned topics' partitions
> >>> > > > whenever the client's ack fails or times out.
> >>> > > > - Partition updates before the client acks
> >>> > > > `CommandPartitionUpdate#version` increases monotonically on every
> >>> > > > update. If a partition updates before the client acks, a greater
> >>> > > > version will be put into
> >>> > > > `PartitonUpdateWatcherService#inFlightUpdate`. The earlier ack
> >>> > > > will be ignored because its version is less than the current
> >>> > > > version.
> >>> > > >
> >>> > > >
> >>> > > > Asaf Mesika <as...@gmail.com> wrote on Wed, Feb 22, 2023 at 21:33:
> >>> > > >
> >>> > > > > How about edge cases?
> >>> > > > > In Andra's PIP he took into account cases where updates were
> >>> > > > > lost, so he created a secondary poll. Not saying it's the best
> >>> > > > > solution for your case, of course.
> >>> > > > > I'm saying that when a broker sends a CommandPartitionUpdate,
> >>> > > > > how do you know it arrived successfully? From my memory, there
> >>> > > > > is no ACK in the protocol saying "I'm the client, I got the
> >>> > > > > update successfully", after which the "dirty" flag for that
> >>> > > > > topic would be removed for this watcher ID.
> >>> > > > >
> >>> > > > > Are there any other edge cases we can have? Let's be exhaustive.
> >>> > > > >
> >>> > > > >
> >>> > > > >
> >>> > > > > On Wed, Feb 22, 2023 at 1:14 PM houxiaoyu <an...@gmail.com>
> >>> wrote:
> >>> > > > >
> >>> > > > > > Thanks for your great suggestion, Enrico.
> >>> > > > > >
> >>> > > > > > I agree with you. It's more reasonable to add a
> >>> > > > > > `supports_partition_update_watchers` flag in `FeatureFlags` to
> >>> > > > > > detect that the connected broker supports this feature, and to
> >>> > > > > > add a new broker configuration property
> >>> > > > > > `enableNotificationForPartitionUpdate` with a default value of
> >>> > > > > > true, much like PIP-145.
> >>> > > > > >
> >>> > > > > > I have updated the description.
> >>> > > > > >
> >>> > > > > > Enrico Olivelli <eo...@gmail.com> wrote on Wed, Feb 22, 2023 at 17:26:
> >>> > > > > >
> >>> > > > > > > I support this proposal.
> >>> > > > > > > Copying here my comments from GH:
> >>> > > > > > >
> >>> > > > > > > Can't we enable this by default in case we detect that the
> >>> > > > > > > connected broker supports it? I can't find any reason for
> >>> > > > > > > not using this mechanism if it is available.
> >>> > > > > > >
> >>> > > > > > > Maybe we can set the default to "true" and allow users to
> >>> > > > > > > disable it in case it impacts their systems in an unwanted
> >>> > > > > > > way.
> >>> > > > > > >
> >>> > > > > > > Maybe it would be useful to have a way to disable the
> >>> > > > > > > mechanism on the broker side as well.
> >>> > > > > > >
> >>> > > > > > > Enrico
> >>> > > > > > >
> >>> > > > > > > On Wed, Feb 22, 2023 at 10:22, houxiaoyu
> >>> > > > > > > <an...@gmail.com> wrote:
> >>> > > > > > > >
> >>> > > > > > > > Hi Pulsar community:
> >>> > > > > > > >
> >>> > > > > > > > I opened a PIP to discuss "Notifications for partitions
> >>> update"
> >>> > > > > > > >
> >>> > > > > > > > ### Motivation
> >>> > > > > > > >
> >>> > > > > > > > The Pulsar client polls brokers at a fixed interval to
> >>> > > > > > > > check for partition updates when we publish/subscribe to
> >>> > > > > > > > partitioned topics with `autoUpdatePartitions` set to
> >>> > > > > > > > true. This causes unnecessary load on both clients and
> >>> > > > > > > > brokers, since most of the time the number of partitions
> >>> > > > > > > > will not change. In addition, polling introduces latency
> >>> > > > > > > > in partition updates, bounded by
> >>> > > > > > > > `autoUpdatePartitionsInterval`.
> >>> > > > > > > > This PIP introduces a notification mechanism for partition
> >>> > > > > > > > updates, much like PIP-145 did for regex subscriptions:
> >>> > > > > > > > https://github.com/apache/pulsar/issues/14505.
> >>> > > > > > > >
> >>> > > > > > > > For more details, please read the PIP at:
> >>> > > > > > > > https://github.com/apache/pulsar/issues/19596
> >>> > > > > > > > Looking forward to hearing your thoughts.
> >>> > > > > > > >
> >>> > > > > > > > Thanks,
> >>> > > > > > > > Xiaoyu Hou
> >>> > > > > > > > ----
> >>> > > > > > >
> >>> > > > > >
> >>> > > > >
> >>> > > >
> >>> > >
> >>>
> >>