You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pulsar.apache.org by Heesung Sohn <he...@streamnative.io.INVALID> on 2022/10/18 23:54:27 UTC

[DISCUSS] PIP-215: Configurable Topic Compaction Strategy

Hi pulsar-dev community,

I raised a pip to discuss : PIP-215: Configurable Topic Compaction Strategy

PIP link: https://github.com/apache/pulsar/issues/18099

Regards,
Heesung

Re: [DISCUSS] PIP-215: Configurable Topic Compaction Strategy

Posted by Heesung Sohn <he...@streamnative.io.INVALID>.
Hi,

The vote is open in this email thread.

https://lists.apache.org/thread/6y8407pw4fv21n2n0cbrvsspg5tok0h7

Regards,
Heesung

On Fri, Nov 4, 2022 at 3:07 PM Heesung Sohn <he...@streamnative.io>
wrote:

> Hi,
>
> > I would like to understand what the performance tradeoffs are for the
> changes that you (as an individual) and others are proposing.
> The problem that this PIP is trying to solve is how to support custom
> topic compaction logic. (currently, Pulsar only takes the latest message in
> the topic compaction and table-view(consumers))
>
> As Michael and I discussed in the above emails, in the worst case, when
> there are many conflicting messages, this PIP can incur more repeated
> custom compaction than the alternative as individual consumers need to
> compact messages (topic compaction and table views). However, one of the
> advantages of this proposal is that pub/sub is faster since it uses a
> single topic. For example, in PIP-192, if the "bundle assignment" broadcast
> is fast enough, conflicting bundle assignment requests can be
> reduced(broadcast filter effect).
>
>
> Post-Compaction(this PIP proposes)
>
>    - - Producers publish messages to a single topic.
>    - - All consumers individually run the custom compaction logic when
>    consuming the topic (by table-view).
>    - - Compactor needs to run the custom compaction logic during
>    compaction.
>
>
>
> The alternative that Michael proposed is instead compacting messages at
> the earlier stage by a single writer, using two topics.
> Pre-Compaction(the alternative that Michael proposes)
>
>    - - Producers publish messages to a non-compacted topic first.
>    - - Only the leader consumes this non-compacted topic and runs the
>    custom compaction logic.
>    - - Then, the leader publishes compacted messages to the compacted
>    topic(resolve conflicts by the single writer).
>    - - All consumers consume the compacted topic. (no need to compact the
>    messages separately on the consumer side)
>    - - Compactor does not need to run the custom compaction logic during
>    compaction.
>
>
>
> > It really seems that you are proposing a change to the default behavior
> whether or not a user chooses to use the interface in PIP-192.
> The pip does not change the default behavior of compaction and table-view.
> I updated the goals in the PIP to clarify this.
>
> Thanks,
> Heesung
>
>
> On Fri, Nov 4, 2022 at 11:11 AM Dave Fisher <wa...@apache.org> wrote:
>
>>
>>
>> > On Nov 4, 2022, at 10:28 AM, Heesung Sohn <he...@streamnative.io.INVALID>
>> wrote:
>> >
>> > Hi,
>> >
>> > I think `bool shouldKeepLeft(T prev, T cur)` is clearer. I updated the
>> PIP.
>> >
>> > Hopefully, we provided enough context about this PIP and the design
>> > trade-off as well.
>>
>> I would like to understand what the performance tradeoffs are for the
>> changes that you (as an individual) and others are proposing.
>>
>> > Goal
>> >
>> >       • Create another Topic compactor, StrategicTwoPhaseCompactor,
>> where we can configure a compaction strategy,
>> > TopicCompactionStrategy
>> >
>> >       • Update the TableViewConfigurationData to load and consider the
>> TopicCompactionStrategy when updating the internal key-value map in
>> TableView.
>> >
>> >       • Add TopicCompactionStrategy in Topic-level Policy to run
>> StrategicTwoPhaseCompactor instead of TwoPhaseCompactor when executing
>> compaction.
>>
>> It really seems that you are proposing a change to the default behavior
>> whether or not a user chooses to use the interface in PIP-192.
>>
>> >
>> > I will send out a vote email soon.
>> >
>> > Thank you,
>> > Heesung
>> >
>> >
>> >
>> >
>> >
>> >
>> > On Thu, Nov 3, 2022 at 9:59 PM Michael Marshall <mm...@apache.org>
>> > wrote:
>> >
>> >> Thank you for your detailed responses, Heesung.
>> >>
>> >>> We are not planning to expose this feature to users
>> >>> soon unless demanded and proven to be stable.
>> >>
>> >> In that case, I think we should move forward with this PIP. I have a
>> >> different opinion about the trade offs for the two designs, but none
>> >> of my concerns are problems that could not be solved later if we
>> >> encounter problems.
>> >>
>> >> Just to say it succinctly, my concern is that broadcasting all
>> >> attempts to acquire ownership of every unclaimed bundle to all brokers
>> >> will generate a lot of unnecessary traffic.
>> >>
>> >>>
>> >>
>> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a
>> >>
>> >> Thank you for this reference. I missed it. That is great documentation!
>> >>
>> >>> In fact, with the `compact(T prev, T cur)` api only, it is not clear
>> if
>> >>> prev => cur is a valid transition or not(if invalid, we should filter
>> out
>> >>> the cur message instead of further compacting/merging). I think we
>> still
>> >>> need to keep the `isValid()` and `merge()` separated.
>> >>
>> >> I was thinking that the result of `compact` would be the result put in
>> >> the table view or written to the compacted topic. The one issue might
>> >> be about keeping the memory utilization down for use cases that are
>> >> not updating the message's value but are only selecting "left" or
>> >> "right". I thought we could infer when to keep the message id vs keep
>> >> the message value, but that might be easy to implement.
>> >>
>> >> My final critique is that I think `isValid` could have a better name.
>> >> In the event this does become a public API, I don't think all use
>> >> cases will think about which event should be persisted in terms of
>> >> validity.
>> >>
>> >> The main name that comes to my mind is `bool shouldKeepLeft(T prev, T
>> >> cur)`. When true, prev wins. When false, cur wins. That nomenclature
>> >> comes from Akka Streams. It's not perfect, but it is easy to infer
>> >> what the result will do.
>> >>
>> >>> Regarding redundant deserialization, the input type `T` is the type of
>> >>> message value, so the input values are already deserialized.
>> >>
>> >> Great, I should have realized that. That takes care of that concern.
>> >>
>> >> Thanks,
>> >> Michael
>> >>
>> >> On Thu, Nov 3, 2022 at 7:37 PM Heesung Sohn
>> >> <he...@streamnative.io.invalid> wrote:
>> >>>
>> >>> Hi,
>> >>>
>> >>> I have a different thought about my previous comment.
>> >>>
>> >>> - Agreed with your point that we should merge CompactionStrategy
>> APIs. I
>> >>> updated the interface proposal in the PIP. I replaced `"isValid",
>> >>> "isMergeEnabled", and "merge"` apis with "compact" api.
>> >>>
>> >>> boolean isValid(T prev, T cur)
>> >>> boolean isMergeEnabled()
>> >>> T merge(T prev, T cur)
>> >>>
>> >>> =>
>> >>>
>> >>> T compact(T prev, T cur)
>> >>>
>> >>> In fact, with the `compact(T prev, T cur)` api only, it is not clear
>> if
>> >>> prev => cur is a valid transition or not(if invalid, we should filter
>> out
>> >>> the cur message instead of further compacting/merging). I think we
>> still
>> >>> need to keep the `isValid()` and `merge()` separated.
>> >>>
>> >>> Regarding redundant deserialization, the input type `T` is the type of
>> >>> message value, so the input values are already deserialized. We don't
>> >> want
>> >>> to expose the Message<T> interface in this CompactionStrategy to avoid
>> >>> message serialization/deserialization dependencies in the
>> >>> CompactionStrategy.
>> >>>
>> >>> The `merge()` functionality is suggested for more complex use cases
>> >> (merge
>> >>> values instead of just filtering), and to support this `merge()`, we
>> need
>> >>> to internally create a new msg with the compacted value, metadata, and
>> >>> messageId copies. We could initially define `isValid()` only in
>> >>> CompactionStrategy, and add `isMergeEnabled() and merge()` later in
>> the
>> >>> CompactionStrategy interface if requested.
>> >>>
>> >>> Regards,
>> >>> Heesung
>> >>>
>> >>>
>> >>> On Thu, Nov 3, 2022 at 9:50 AM Heesung Sohn <
>> >> heesung.sohn@streamnative.io>
>> >>> wrote:
>> >>>
>> >>>> Oops! Michael, I apologize for the typo in your name.
>> >>>>
>> >>>> On Thu, Nov 3, 2022 at 9:47 AM Heesung Sohn <
>> >> heesung.sohn@streamnative.io>
>> >>>> wrote:
>> >>>>
>> >>>>> Hi Machel,
>> >>>>>
>> >>>>> Here are my additional comments regarding your earlier email.
>> >>>>>
>> >>>>> - I updated the PIP title to show that this will impact table view
>> as
>> >>>>> well.
>> >>>>>
>> >>>>> - PIP-192 : https://github.com/apache/pulsar/issues/16691 shows the
>> >>>>> general idea of the states and their actions, and I defined the
>> actual
>> >>>>> states here in the PR,
>> >>>>>
>> >>
>> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a
>> .
>> >> I
>> >>>>> will further clarify the bundle state data validation logic when
>> >>>>> introducing `BundleStateCompactionStrategy` class. This PIP is to
>> >> support
>> >>>>> CompactionStrategy in general.
>> >>>>>
>> >>>>> - Agreed with your point that we should merge CompactionStrategy
>> >> APIs. I
>> >>>>> updated the interface proposal in the PIP. I replaced `"isValid",
>> >>>>> "isMergeEnabled", and "merge"` apis with "compact" api.
>> >>>>>
>> >>>>>
>> >>>>> Thanks,
>> >>>>> Heesung
>> >>>>>
>> >>>>>
>> >>>>> On Tue, Nov 1, 2022 at 11:26 AM Heesung Sohn <
>> >>>>> heesung.sohn@streamnative.io> wrote:
>> >>>>>
>> >>>>>> Hi,
>> >>>>>> Thank you for the great comments.
>> >>>>>> Please find my comments inline too.
>> >>>>>>
>> >>>>>> Regards,
>> >>>>>> Heesung
>> >>>>>>
>> >>>>>> On Mon, Oct 31, 2022 at 10:59 PM Michael Marshall <
>> >> mmarshall@apache.org>
>> >>>>>> wrote:
>> >>>>>>
>> >>>>>>>> I think we lose a single linearized view.
>> >>>>>>>
>> >>>>>>> Which linearized view are we losing, and what is the role of that
>> >>>>>>> linearized view? I think I might be missing why it is important. I
>> >>>>>>> agree that consumers won't know about each unsuccessful attempted
>> >>>>>>> acquisition of a bundle, but that seems like unnecessary
>> information
>> >>>>>>> to broadcast to every broker in the cluster.
>> >>>>>>>
>> >>>>>>>
>> >>>>>>
>> >>>>>> PIP-192 proposes an assignment, transfer, and split
>> >> protocol(multi-phase
>> >>>>>> state changes), relying on early broadcast across brokers, and all
>> >> brokers
>> >>>>>> react to their clients according to the state change notifications
>> --
>> >>>>>> brokers could defer any client lookups for bundle x if an
>> >>>>>> assignment/transfer/split is ongoing for x(broadcasted early in the
>> >> topic).
>> >>>>>> One early broadcast example is the one that I discussed above,
>> `When
>> >> the
>> >>>>>> topic broadcast is faster than the concurrent assignment requests.`
>> >> I think
>> >>>>>> the prefilter could delay this early broadcast, as it needs to go
>> >> through
>> >>>>>> the additional single-leader compaction path.
>> >>>>>>
>> >>>>>> The bundle state recovery process is simpler by a single linearized
>> >> view.
>> >>>>>>
>> >>>>>> The single linearized view can be easier to debug bundle states. We
>> >> can
>> >>>>>> more easily track where the assignment requests come from and how
>> it
>> >> is
>> >>>>>> compacted in a single linearized view.
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>>>> I think the leader requires a write-through cache to compact
>> >> messages
>> >>>>>>> based
>> >>>>>>>> on the latest states.
>> >>>>>>>
>> >>>>>> This brings up an important point that I would like to clarify. If
>> we
>> >>>>>>> trust the write ahead log as the source of truth, what happens
>> when
>> >> a
>> >>>>>>> bundle has been validly owned by multiple brokers? As a broker
>> >> starts
>> >>>>>>> and consumes from the compacted topic, how do we prevent it from
>> >>>>>>> incorrectly thinking that it owns a bundle for some short time
>> >> period
>> >>>>>>> in the case that the ownership topic hasn't yet been compacted to
>> >>>>>>> remove old ownership state?
>> >>>>>>>
>> >>>>>>
>> >>>>>> Since the multi-phase transfer protocol involves the source and
>> >>>>>> destination broker's actions, the successful transfer should get
>> the
>> >> source
>> >>>>>> and destination broker to have the (near) latest state. For
>> example,
>> >> if
>> >>>>>> some brokers have old ownership states(network partitioned or
>> >> delayed),
>> >>>>>> they will redirect clients to the source(old) broker. However, by
>> the
>> >>>>>> transfer protocol, the source broker should have the latest state,
>> >> so it
>> >>>>>> can redirect the client again to the destination broker.
>> >>>>>>
>> >>>>>> When a broker restarts, it won't start until its BSC state to the
>> >> (near)
>> >>>>>> latest (til the last known messageId at that time).
>> >>>>>>
>> >>>>>>
>> >>>>>>>> Pulsar guarantees "a single writer".
>> >>>>>>>
>> >>>>>>> I didn't think we were using a single writer in the PIP 192
>> design.
>> >> I
>> >>>>>>> thought we had many producers sending events to a compacted topic.
>> >> My
>> >>>>>>> proposal would still have many producers, but the writer to
>> >> bookkeeper
>> >>>>>>> would act as the single writer. It would technically be distinct
>> >> from
>> >>>>>>> a normal Pulsar topic producer.
>> >>>>>>>
>> >>>>>>> I should highlight that I am only proposing "broker filtering
>> before
>> >>>>>>> write" in the context of PIP 192 and as an alternative to adding
>> >>>>>>> pluggable compaction strategies. It would not be a generic
>> feature.
>> >>>>>>>
>> >>>>>>>
>> >>>>>> I was worried about the worst case where two producers(leaders)
>> >> happen
>> >>>>>> to write the compacted topic (although Pulsar can guarantee "a
>> single
>> >>>>>> writer" or "a single producer" for a topic in normal situations).
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>>>> Could we clarify how to handle
>> >>>>>>>> the following(edge cases and failure recovery)?
>> >>>>>>>> 0. Is the un-compacted topic a persistent topic or a
>> >> non-persistent
>> >>>>>>> topic?
>> >>>>>>>
>> >>>>>>> It is a persistent topic.
>> >>>>>>>
>> >>>>>>>> 1. How does the leader recover state from the two topics?
>> >>>>>>>
>> >>>>>>> A leader would recover state by first consuming the whole
>> compacted
>> >>>>>>> topic and then by consuming from the current location of a cursor
>> on
>> >>>>>>> the first input topic. As stated elsewhere, this introduces
>> latency
>> >>>>>>> and could be an issue.
>> >>>>>>>
>> >>>>>>>> 2. How do we handle the case when the leader fails before writing
>> >>>>>>> messages
>> >>>>>>>> to the compacted topic
>> >>>>>>>
>> >>>>>>> The leader would not acknowledge the message on the input topic
>> >> until
>> >>>>>>> it has successfully persisted the event on the compacted topic.
>> >>>>>>> Publishing the same event to a compacted topic multiple times is
>> >>>>>>> idempotent, so there is no risk of lost state. The real risk is
>> >>>>>>> latency. However, I think we might have similar (though not the
>> >> same)
>> >>>>>>> latency risks in the current solution.
>> >>>>>>>
>> >>>>>>>> Analysis: the "pre-filter + two-topics" option can reduce the
>> >> number
>> >>>>>>> of
>> >>>>>>>> messages to broadcast at the expense of the leader broker
>> >> compaction.
>> >>>>>>>
>> >>>>>>> My primary point is that with this PIP's design, the filter logic
>> is
>> >>>>>>> run on every broker and again during topic compaction. With the
>> >>>>>>> alternative design, the filter is run once.
>> >>>>>>>
>> >>>>>>> Thank you for the clarification.
>> >>>>>>
>> >>>>>> I think the difference is that the post-filter is an optimistic
>> >> approach
>> >>>>>> as it optimistically relies on the "broadcast-filter"
>> effect(brokers
>> >> will
>> >>>>>> defer client lookups if notified ahead that any assignment is
>> >> ongoing for
>> >>>>>> bundle x). Yes, in the worst case, if the broadcast is slower, each
>> >> broker
>> >>>>>> needs to individually compact the conflicting assignment requests.
>> >>>>>>
>> >>>>>> Conversely, one downside of the pessimistic approach (single leader
>> >>>>>> pre-filter) is that when there are not many conflict concurrent
>> >> assignment
>> >>>>>> requests(assign for bundle a, assign for bundle b, assign for
>> bundle
>> >> c...),
>> >>>>>> the requests need to redundantly go through the leader compaction.
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>>>> 3. initially less complex to implement (leaderless conflict
>> >>>>>>> resolution and
>> >>>>>>>> requires a single topic)
>> >>>>>>>
>> >>>>>>> PIP 215 has its own complexity too. Coordinating filters
>> >>>>>>> on both the client (table view) and the server (compaction) is non
>> >>>>>>> trivial. The proposed API includes hard coded client configuration
>> >> for
>> >>>>>>> each component, which will make upgrading the version of the
>> >>>>>>> compaction strategy complicated, and could lead to incorrect
>> >>>>>>> interpretation of events in the stream. When a single broker is
>> >> doing
>> >>>>>>> the filtering, versioning is no longer a distributed problem. That
>> >>>>>>> being said, I do not mean to suggest my solution is without
>> >>>>>>> complexity.
>> >>>>>>>
>> >>>>>>>> 4. it is not a "one-way door" decision (we could add the
>> >> pre-filter
>> >>>>>>> logic
>> >>>>>>>> as well later)
>> >>>>>>>
>> >>>>>>> It's fair to say that we could add it later, but at that point, we
>> >>>>>>> will have added this new API for compaction strategy. Are we
>> >> confident
>> >>>>>>> that pluggable compaction is independently an important addition
>> to
>> >>>>>>> Pulsar's
>> >>>>>>> features, or would it make sense to make this API only exposed in
>> >> the
>> >>>>>>> broker?
>> >>>>>>>
>> >>>>>>>
>> >>>>>> The intention is that this compaction feature could be useful for
>> >>>>>> complex user applications (if they are trying to do a similar
>> >> thing). As I
>> >>>>>> mentioned, this feature is closely tied to the PIP-192 now. We are
>> >> not
>> >>>>>> planning to expose this feature to users soon unless demanded and
>> >> proven to
>> >>>>>> be stable.
>> >>>>>>
>> >>>>>>
>> >>>>>>> Thanks,
>> >>>>>>> Michael
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On Sat, Oct 29, 2022 at 10:13 PM Heesung Sohn
>> >>>>>>> <he...@streamnative.io.invalid> wrote:
>> >>>>>>>>
>> >>>>>>>> Hi,
>> >>>>>>>>
>> >>>>>>>> Also, I thought about some concurrent assignment scenarios
>> between
>> >>>>>>>> pre-filter vs post-filter.
>> >>>>>>>>
>> >>>>>>>> Example 1: When the topic broadcast is slower than the concurrent
>> >>>>>>>> assignment requests
>> >>>>>>>>
>> >>>>>>>> With pre-filter + two-topics (non-compacted and compacted topics)
>> >>>>>>>> t1: A -> non-compacted topic // broker A published a message to
>> >> the
>> >>>>>>>> non-compacted topic, m1: {broker A assigned bundle x to broker A}
>> >>>>>>>> t2: B -> non-compacted topic // broker B published a message, m2:
>> >>>>>>> {broker B
>> >>>>>>>> assigned bundle x to broker C}
>> >>>>>>>> t3: C -> non-compacted topic // broker C published a message, m3:
>> >>>>>>> {broker C
>> >>>>>>>> assigned bundle x to broker B}
>> >>>>>>>> t4: non-compacted topic -> L // leader broker consumed the
>> >> messages:
>> >>>>>>> m1,m2,
>> >>>>>>>> and m3
>> >>>>>>>> t5: L -> compacted topic // leader compacted the messages and
>> >>>>>>> broadcasted
>> >>>>>>>> m1 to all consumers
>> >>>>>>>> t6: compacted topic-> [A,B,C] // broker A,B,C consumed m1
>> >>>>>>>>
>> >>>>>>>> With post-filter + a single topic
>> >>>>>>>> t1: A -> topic // broker A published a message to the
>> >> non-compacted
>> >>>>>>> topic,
>> >>>>>>>> m1: {broker A assigned bundle x to broker A}
>> >>>>>>>> t2: B -> topic // broker B published a message, m2: {broker B
>> >> assigned
>> >>>>>>>> bundle x to broker C}
>> >>>>>>>> t3: C -> topic // broker C published a message, m3: {broker C
>> >> assigned
>> >>>>>>>> bundle x to broker B}
>> >>>>>>>> t4: topic -> [A,B,C] // broker A,B,C consumed the messages:
>> m1,m2,
>> >>>>>>> and m3
>> >>>>>>>> t5: [A,B,C] -> m1 // broker A,B,C individually compacted the
>> >> messages
>> >>>>>>> to m1.
>> >>>>>>>>
>> >>>>>>>> Analysis: the "pre-filter + two-topics" option can reduce the
>> >> number
>> >>>>>>> of
>> >>>>>>>> messages to broadcast at the expense of the leader broker
>> >> compaction.
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> Example 2: When the topic broadcast is faster than the concurrent
>> >>>>>>>> assignment requests
>> >>>>>>>>
>> >>>>>>>> With pre-filter + two-topics (non-compacted and compacted topics)
>> >>>>>>>> t1: A -> non-compacted topic // broker A published a message to
>> >> the
>> >>>>>>>> non-compacted topic, m1: {broker A assigned bundle x to broker A}
>> >>>>>>>> t2: non-compacted topic -> L // leader broker consumed the
>> >> messages:
>> >>>>>>> m1
>> >>>>>>>> t3: L -> compacted topic // leader compacted the message and
>> >>>>>>> broadcasted m1
>> >>>>>>>> to all consumers
>> >>>>>>>> t4: compacted topic-> [A,B,C] // broker A,B,C consumed m1
>> >>>>>>>> t5: A-> own bundle // broker A knows that its assignment has been
>> >>>>>>> accepted,
>> >>>>>>>> so proceeding to own the bundle (meanwhile deferring lookup
>> >> requests)
>> >>>>>>>> t6: B -> defer client lookups // broker B knows that bundle
>> >>>>>>> assignment is
>> >>>>>>>> running(meanwhile deferring lookup requests)
>> >>>>>>>> t7: C -> defer client lookups // broker C knows that bundle
>> >>>>>>> assignment is
>> >>>>>>>> running(meanwhile deferring lookup requests)
>> >>>>>>>>
>> >>>>>>>> With post-filter + a single topic
>> >>>>>>>> t1: A -> topic // broker A published a message to the
>> >> non-compacted
>> >>>>>>> topic,
>> >>>>>>>> m1: {broker A assigned bundle x to broker A}
>> >>>>>>>> t2: topic -> [A,B,C] // broker A,B,C consumed the message: m1
>> >>>>>>>> t3:  A-> own bundle // broker A knows that its assignment has
>> been
>> >>>>>>>> accepted, so proceeding to own the bundle (meanwhile deferring
>> >> lookup
>> >>>>>>>> requests)
>> >>>>>>>> t4: B -> defer client lookups // broker B knows that bundle
>> >>>>>>> assignment is
>> >>>>>>>> running(meanwhile deferring lookup requests)
>> >>>>>>>> t5: C -> defer client lookups // broker C knows that bundle
>> >>>>>>> assignment is
>> >>>>>>>> running(meanwhile deferring lookup requests)
>> >>>>>>>>
>> >>>>>>>> Analysis: The "post-filter + a single topic" can perform ok in
>> >> this
>> >>>>>>> case
>> >>>>>>>> without the additional leader coordination and the secondary
>> topic
>> >>>>>>> because
>> >>>>>>>> the early broadcast can inform all brokers and prevent them from
>> >>>>>>> requesting
>> >>>>>>>> other assignments for the same bundle.
>> >>>>>>>>
>> >>>>>>>> I think the post-filter option is initially not bad because:
>> >>>>>>>>
>> >>>>>>>> 1. it is safe in the worst case (in case the messages are not
>> >>>>>>> correctly
>> >>>>>>>> pre-filtered at the leader)
>> >>>>>>>> 2. it performs ok because the early broadcast can prevent
>> >>>>>>>> concurrent assignment requests.
>> >>>>>>>> 3. initially less complex to implement (leaderless conflict
>> >>>>>>> resolution and
>> >>>>>>>> requires a single topic)
>> >>>>>>>> 4. it is not a "one-way door" decision (we could add the
>> >> pre-filter
>> >>>>>>> logic
>> >>>>>>>> as well later)
>> >>>>>>>>
>> >>>>>>>> Regards,
>> >>>>>>>> Heesung
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> On Sat, Oct 29, 2022 at 1:03 PM Heesung Sohn <
>> >>>>>>> heesung.sohn@streamnative.io>
>> >>>>>>>> wrote:
>> >>>>>>>>
>> >>>>>>>>> Hi Michael,
>> >>>>>>>>>
>> >>>>>>>>> For the pre-prefilter(pre-compaction) option,
>> >>>>>>>>> I think the leader requires a write-through cache to compact
>> >>>>>>> messages
>> >>>>>>>>> based on the latest states. Otherwise, the leader needs to wait
>> >> for
>> >>>>>>> the
>> >>>>>>>>> last msg from the (compacted) topic before compacting the next
>> >> msg
>> >>>>>>> for the
>> >>>>>>>>> same bundle.
>> >>>>>>>>>
>> >>>>>>>>> Pulsar guarantees "a single writer". However, for the worst-case
>> >>>>>>>>> scenario(due to network partitions, bugs in zk or etcd leader
>> >>>>>>> election,
>> >>>>>>>>> bugs in bk, data corruption ), I think it is safe to place the
>> >>>>>>> post-filter
>> >>>>>>>>> on the consumer side(compaction and table views) as well in
>> >> order to
>> >>>>>>>>> validate the state changes.
>> >>>>>>>>>
>> >>>>>>>>> For the two-topic approach,
>> >>>>>>>>> I think we lose a single linearized view. Could we clarify how
>> >> to
>> >>>>>>> handle
>> >>>>>>>>> the following(edge cases and failure recovery)?
>> >>>>>>>>> 0. Is the un-compacted topic a persistent topic or a
>> >> non-persistent
>> >>>>>>> topic?
>> >>>>>>>>> 1. How does the leader recover state from the two topics?
>> >>>>>>>>> 2. How do we handle the case when the leader fails before
>> >> writing
>> >>>>>>> messages
>> >>>>>>>>> to the compacted topic
>> >>>>>>>>>
>> >>>>>>>>> Regards,
>> >>>>>>>>> Heesung
>> >>>>>>>>>
>> >>>>>>>>> On Fri, Oct 28, 2022 at 6:56 PM Michael Marshall <
>> >>>>>>> mmarshall@apache.org>
>> >>>>>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>>> Sharing some more thoughts. We could alternatively use two
>> >> topics
>> >>>>>>>>>> instead of one. In this design, the first topic is the
>> >> unfiltered
>> >>>>>>>>>> write ahead log that represents many writers (brokers) trying
>> >> to
>> >>>>>>>>>> acquire ownership of bundles. The second topic is the
>> >> distilled log
>> >>>>>>>>>> that represents the "winners" or the "owners" of the bundles.
>> >>>>>>> There is
>> >>>>>>>>>> a single writer, the leader broker, that reads from the input
>> >> topic
>> >>>>>>>>>> and writes to the output topic. The first topic is normal and
>> >> the
>> >>>>>>>>>> second is compacted.
>> >>>>>>>>>>
>> >>>>>>>>>> The primary benefit in a two topic solution is that it is easy
>> >> for
>> >>>>>>> the
>> >>>>>>>>>> leader broker to trade off ownership without needing to slow
>> >> down
>> >>>>>>>>>> writes to the input topic. The leader broker will start
>> >> consuming
>> >>>>>>> from
>> >>>>>>>>>> the input topic when it has fully consumed the table view on
>> >> the
>> >>>>>>>>>> output topic. In general, I don't think consumers know when
>> >> they
>> >>>>>>> have
>> >>>>>>>>>> "reached the end of a table view", but we should be able to
>> >>>>>>> trivially
>> >>>>>>>>>> figure this out if we are the topic's only writer and the
>> >> topic and
>> >>>>>>>>>> writer are collocated on the same broker.
>> >>>>>>>>>>
>> >>>>>>>>>> In that design, it might make sense to use something like the
>> >>>>>>>>>> replication cursor to keep track of this consumer's state.
>> >>>>>>>>>>
>> >>>>>>>>>> - Michael
>> >>>>>>>>>>
>> >>>>>>>>>> On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall <
>> >>>>>>> mmarshall@apache.org>
>> >>>>>>>>>> wrote:
>> >>>>>>>>>>>
>> >>>>>>>>>>> Thanks for your proposal, Heesung.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Fundamentally, we have the problems listed in this PIP
>> >> because
>> >>>>>>> we have
>> >>>>>>>>>>> multiple writers instead of just one writer. Can we solve
>> >> this
>> >>>>>>> problem
>> >>>>>>>>>>> by changing our write pattern? What if we use the leader
>> >> broker
>> >>>>>>> as the
>> >>>>>>>>>>> single writer? That broker would intercept attempts to
>> >> acquire
>> >>>>>>>>>>> ownership on bundles and would grant ownership to the first
>> >>>>>>> broker to
>> >>>>>>>>>>> claim an unassigned bundle. It could "grant ownership" by
>> >>>>>>> letting the
>> >>>>>>>>>>> first write to claim an unassigned bundle get written to the
>> >>>>>>> ownership
>> >>>>>>>>>>> topic. When a bundle is already owned, the leader won't
>> >> persist
>> >>>>>>> that
>> >>>>>>>>>>> event to the bookkeeper. In this design, the log becomes a
>> >> true
>> >>>>>>>>>>> ownership log, which will correctly work with the existing
>> >> topic
>> >>>>>>>>>>> compaction and table view solutions. My proposal essentially
>> >>>>>>> moves the
>> >>>>>>>>>>> conflict resolution to just before the write, and as a
>> >>>>>>> consequence, it
>> >>>>>>>>>>> greatly reduces the need for post processing of the event
>> >> log.
>> >>>>>>> One
>> >>>>>>>>>>> trade off might be that the leader broker could slow down the
>> >>>>>>> write
>> >>>>>>>>>>> path, but given that the leader would just need to verify the
>> >>>>>>> current
>> >>>>>>>>>>> state of the bundle, I think it'd be performant enough.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Additionally, we'd need the leader broker to be "caught up"
>> >> on
>> >>>>>>> bundle
>> >>>>>>>>>>> ownership in order to grant ownership of topics, but unless
>> >> I am
>> >>>>>>>>>>> mistaken, that is already a requirement of the current PIP
>> >> 192
>> >>>>>>>>>>> paradigm.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Below are some additional thoughts that will be relevant if
>> >> we
>> >>>>>>> move
>> >>>>>>>>>>> forward with the design as it is currently proposed.
>> >>>>>>>>>>>
>> >>>>>>>>>>> I think it might be helpful to update the title to show that
>> >> this
>> >>>>>>>>>>> proposal will also affect table view as well. I didn't catch
>> >>>>>>> that at
>> >>>>>>>>>>> first.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Do you have any documentation describing how the
>> >>>>>>>>>>> TopicCompactionStrategy will determine which states are
>> >> valid in
>> >>>>>>> the
>> >>>>>>>>>>> context of load balancing? I looked at
>> >>>>>>>>>>> https://github.com/apache/pulsar/pull/18195, but I couldn't
>> >>>>>>> seem to
>> >>>>>>>>>>> find anything for it. That would help make this proposal less
>> >>>>>>>>>>> abstract.
>> >>>>>>>>>>>
>> >>>>>>>>>>> The proposed API seems very tied to the needs of PIP 192. For
>> >>>>>>> example,
>> >>>>>>>>>>> `isValid` is not a term I associate with topic compaction.
>> >> The
>> >>>>>>>>>>> fundamental question for compaction is which value to keep
>> >> (or
>> >>>>>>> build a
>> >>>>>>>>>>> new value). I think we might be able to simplify the API by
>> >>>>>>> replacing
>> >>>>>>>>>>> the "isValid", "isMergeEnabled", and "merge" methods with a
>> >>>>>>> single
>> >>>>>>>>>>> method that lets the implementation handle one or all tasks.
>> >> That
>> >>>>>>>>>>> would also remove the need to deserialize payloads multiple
>> >>>>>>> times too.
>> >>>>>>>>>>>
>> >>>>>>>>>>> I also feel like mentioning that after working with the PIP
>> >> 105
>> >>>>>>> broker
>> >>>>>>>>>>> side filtering, I think we should avoid running UDFs in the
>> >>>>>>> broker as
>> >>>>>>>>>>> much as possible. (I do not consider the load balancing
>> >> logic to
>> >>>>>>> be a
>> >>>>>>>>>>> UDF here.) I think it would be worth not making this a user
>> >>>>>>> facing
>> >>>>>>>>>>> feature unless there is demand for real use cases.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Thanks!
>> >>>>>>>>>>> Michael
>> >>>>>>>>>>>
>> >>>>>>>>>>> On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bo...@apache.org>
>> >> wrote:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> +1(non-binding)
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> thanks,
>> >>>>>>>>>>>> bo
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Heesung Sohn <he...@streamnative.io.invalid>
>> >>>>>>> 于2022年10月19日周三
>> >>>>>>>>>> 07:54写道:
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> Hi pulsar-dev community,
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> I raised a pip to discuss : PIP-215: Configurable Topic
>> >>>>>>> Compaction
>> >>>>>>>>>> Strategy
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> PIP link: https://github.com/apache/pulsar/issues/18099
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> Regards,
>> >>>>>>>>>>>>> Heesung
>> >>>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>
>> >>>>>>
>> >>
>>
>>

Re: [DISCUSS] PIP-215: Configurable Topic Compaction Strategy

Posted by Heesung Sohn <he...@streamnative.io.INVALID>.
Hi,

> I would like to understand what the performance tradeoffs are for the
changes that you (as an individual) and others are proposing.
The problem that this PIP is trying to solve is how to support custom topic
compaction logic. (currently, Pulsar only takes the latest message in the
topic compaction and table-view(consumers))

As Michael and I discussed in the above emails, in the worst case, when
there are many conflicting messages, this PIP can incur more repeated
custom compaction than the alternative as individual consumers need to
compact messages (topic compaction and table views). However, one of the
advantages of this proposal is that pub/sub is faster since it uses a
single topic. For example, in PIP-192, if the "bundle assignment" broadcast
is fast enough, conflicting bundle assignment requests can be
reduced(broadcast filter effect).


Post-Compaction(this PIP proposes)

   - - Producers publish messages to a single topic.
   - - All consumers individually run the custom compaction logic when
   consuming the topic (by table-view).
   - - Compactor needs to run the custom compaction logic during compaction.



The alternative that Michael proposed is instead compacting messages at the
earlier stage by a single writer, using two topics.
Pre-Compaction(the alternative that Michael proposes)

   - - Producers publish messages to a non-compacted topic first.
   - - Only the leader consumes this non-compacted topic and runs the
   custom compaction logic.
   - - Then, the leader publishes compacted messages to the compacted
   topic(resolve conflicts by the single writer).
   - - All consumers consume the compacted topic. (no need to compact the
   messages separately on the consumer side)
   - - Compactor does not need to run the custom compaction logic during
   compaction.



> It really seems that you are proposing a change to the default behavior
whether or not a user chooses to use the interface in PIP-192.
The pip does not change the default behavior of compaction and table-view.
I updated the goals in the PIP to clarify this.

Thanks,
Heesung


On Fri, Nov 4, 2022 at 11:11 AM Dave Fisher <wa...@apache.org> wrote:

>
>
> > On Nov 4, 2022, at 10:28 AM, Heesung Sohn <he...@streamnative.io.INVALID>
> wrote:
> >
> > Hi,
> >
> > I think `bool shouldKeepLeft(T prev, T cur)` is clearer. I updated the
> PIP.
> >
> > Hopefully, we provided enough context about this PIP and the design
> > trade-off as well.
>
> I would like to understand what the performance tradeoffs are for the
> changes that you (as an individual) and others are proposing.
>
> > Goal
> >
> >       • Create another Topic compactor, StrategicTwoPhaseCompactor,
> where we can configure a compaction strategy,
> > TopicCompactionStrategy
> >
> >       • Update the TableViewConfigurationData to load and consider the
> TopicCompactionStrategy when updating the internal key-value map in
> TableView.
> >
> >       • Add TopicCompactionStrategy in Topic-level Policy to run
> StrategicTwoPhaseCompactor instead of TwoPhaseCompactor when executing
> compaction.
>
> It really seems that you are proposing a change to the default behavior
> whether or not a user chooses to use the interface in PIP-192.
>
> >
> > I will send out a vote email soon.
> >
> > Thank you,
> > Heesung
> >
> >
> >
> >
> >
> >
> > On Thu, Nov 3, 2022 at 9:59 PM Michael Marshall <mm...@apache.org>
> > wrote:
> >
> >> Thank you for your detailed responses, Heesung.
> >>
> >>> We are not planning to expose this feature to users
> >>> soon unless demanded and proven to be stable.
> >>
> >> In that case, I think we should move forward with this PIP. I have a
> >> different opinion about the trade offs for the two designs, but none
> >> of my concerns are problems that could not be solved later if we
> >> encounter problems.
> >>
> >> Just to say it succinctly, my concern is that broadcasting all
> >> attempts to acquire ownership of every unclaimed bundle to all brokers
> >> will generate a lot of unnecessary traffic.
> >>
> >>>
> >>
> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a
> >>
> >> Thank you for this reference. I missed it. That is great documentation!
> >>
> >>> In fact, with the `compact(T prev, T cur)` api only, it is not clear if
> >>> prev => cur is a valid transition or not(if invalid, we should filter
> out
> >>> the cur message instead of further compacting/merging). I think we
> still
> >>> need to keep the `isValid()` and `merge()` separated.
> >>
> >> I was thinking that the result of `compact` would be the result put in
> >> the table view or written to the compacted topic. The one issue might
> >> be about keeping the memory utilization down for use cases that are
> >> not updating the message's value but are only selecting "left" or
> >> "right". I thought we could infer when to keep the message id vs keep
> >> the message value, but that might be easy to implement.
> >>
> >> My final critique is that I think `isValid` could have a better name.
> >> In the event this does become a public API, I don't think all use
> >> cases will think about which event should be persisted in terms of
> >> validity.
> >>
> >> The main name that comes to my mind is `bool shouldKeepLeft(T prev, T
> >> cur)`. When true, prev wins. When false, cur wins. That nomenclature
> >> comes from Akka Streams. It's not perfect, but it is easy to infer
> >> what the result will do.
> >>
> >>> Regarding redundant deserialization, the input type `T` is the type of
> >>> message value, so the input values are already deserialized.
> >>
> >> Great, I should have realized that. That takes care of that concern.
> >>
> >> Thanks,
> >> Michael
> >>
> >> On Thu, Nov 3, 2022 at 7:37 PM Heesung Sohn
> >> <he...@streamnative.io.invalid> wrote:
> >>>
> >>> Hi,
> >>>
> >>> I have a different thought about my previous comment.
> >>>
> >>> - Agreed with your point that we should merge CompactionStrategy APIs.
> I
> >>> updated the interface proposal in the PIP. I replaced `"isValid",
> >>> "isMergeEnabled", and "merge"` apis with "compact" api.
> >>>
> >>> boolean isValid(T prev, T cur)
> >>> boolean isMergeEnabled()
> >>> T merge(T prev, T cur)
> >>>
> >>> =>
> >>>
> >>> T compact(T prev, T cur)
> >>>
> >>> In fact, with the `compact(T prev, T cur)` api only, it is not clear if
> >>> prev => cur is a valid transition or not(if invalid, we should filter
> out
> >>> the cur message instead of further compacting/merging). I think we
> still
> >>> need to keep the `isValid()` and `merge()` separated.
> >>>
> >>> Regarding redundant deserialization, the input type `T` is the type of
> >>> message value, so the input values are already deserialized. We don't
> >> want
> >>> to expose the Message<T> interface in this CompactionStrategy to avoid
> >>> message serialization/deserialization dependencies in the
> >>> CompactionStrategy.
> >>>
> >>> The `merge()` functionality is suggested for more complex use cases
> >> (merge
> >>> values instead of just filtering), and to support this `merge()`, we
> need
> >>> to internally create a new msg with the compacted value, metadata, and
> >>> messageId copies. We could initially define `isValid()` only in
> >>> CompactionStrategy, and add `isMergeEnabled() and merge()` later in the
> >>> CompactionStrategy interface if requested.
> >>>
> >>> Regards,
> >>> Heesung
> >>>
> >>>
> >>> On Thu, Nov 3, 2022 at 9:50 AM Heesung Sohn <
> >> heesung.sohn@streamnative.io>
> >>> wrote:
> >>>
> >>>> Oops! Michael, I apologize for the typo in your name.
> >>>>
> >>>> On Thu, Nov 3, 2022 at 9:47 AM Heesung Sohn <
> >> heesung.sohn@streamnative.io>
> >>>> wrote:
> >>>>
> >>>>> Hi Machel,
> >>>>>
> >>>>> Here are my additional comments regarding your earlier email.
> >>>>>
> >>>>> - I updated the PIP title to show that this will impact table view as
> >>>>> well.
> >>>>>
> >>>>> - PIP-192 : https://github.com/apache/pulsar/issues/16691 shows the
> >>>>> general idea of the states and their actions, and I defined the
> actual
> >>>>> states here in the PR,
> >>>>>
> >>
> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a
> .
> >> I
> >>>>> will further clarify the bundle state data validation logic when
> >>>>> introducing `BundleStateCompactionStrategy` class. This PIP is to
> >> support
> >>>>> CompactionStrategy in general.
> >>>>>
> >>>>> - Agreed with your point that we should merge CompactionStrategy
> >> APIs. I
> >>>>> updated the interface proposal in the PIP. I replaced `"isValid",
> >>>>> "isMergeEnabled", and "merge"` apis with "compact" api.
> >>>>>
> >>>>>
> >>>>> Thanks,
> >>>>> Heesung
> >>>>>
> >>>>>
> >>>>> On Tue, Nov 1, 2022 at 11:26 AM Heesung Sohn <
> >>>>> heesung.sohn@streamnative.io> wrote:
> >>>>>
> >>>>>> Hi,
> >>>>>> Thank you for the great comments.
> >>>>>> Please find my comments inline too.
> >>>>>>
> >>>>>> Regards,
> >>>>>> Heesung
> >>>>>>
> >>>>>> On Mon, Oct 31, 2022 at 10:59 PM Michael Marshall <
> >> mmarshall@apache.org>
> >>>>>> wrote:
> >>>>>>
> >>>>>>>> I think we lose a single linearized view.
> >>>>>>>
> >>>>>>> Which linearized view are we losing, and what is the role of that
> >>>>>>> linearized view? I think I might be missing why it is important. I
> >>>>>>> agree that consumers won't know about each unsuccessful attempted
> >>>>>>> acquisition of a bundle, but that seems like unnecessary
> information
> >>>>>>> to broadcast to every broker in the cluster.
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>> PIP-192 proposes an assignment, transfer, and split
> >> protocol(multi-phase
> >>>>>> state changes), relying on early broadcast across brokers, and all
> >> brokers
> >>>>>> react to their clients according to the state change notifications
> --
> >>>>>> brokers could defer any client lookups for bundle x if an
> >>>>>> assignment/transfer/split is ongoing for x(broadcasted early in the
> >> topic).
> >>>>>> One early broadcast example is the one that I discussed above, `When
> >> the
> >>>>>> topic broadcast is faster than the concurrent assignment requests.`
> >> I think
> >>>>>> the prefilter could delay this early broadcast, as it needs to go
> >> through
> >>>>>> the additional single-leader compaction path.
> >>>>>>
> >>>>>> The bundle state recovery process is simpler by a single linearized
> >> view.
> >>>>>>
> >>>>>> The single linearized view can be easier to debug bundle states. We
> >> can
> >>>>>> more easily track where the assignment requests come from and how it
> >> is
> >>>>>> compacted in a single linearized view.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>>> I think the leader requires a write-through cache to compact
> >> messages
> >>>>>>> based
> >>>>>>>> on the latest states.
> >>>>>>>
> >>>>>> This brings up an important point that I would like to clarify. If
> we
> >>>>>>> trust the write ahead log as the source of truth, what happens when
> >> a
> >>>>>>> bundle has been validly owned by multiple brokers? As a broker
> >> starts
> >>>>>>> and consumes from the compacted topic, how do we prevent it from
> >>>>>>> incorrectly thinking that it owns a bundle for some short time
> >> period
> >>>>>>> in the case that the ownership topic hasn't yet been compacted to
> >>>>>>> remove old ownership state?
> >>>>>>>
> >>>>>>
> >>>>>> Since the multi-phase transfer protocol involves the source and
> >>>>>> destination broker's actions, the successful transfer should get the
> >> source
> >>>>>> and destination broker to have the (near) latest state. For example,
> >> if
> >>>>>> some brokers have old ownership states(network partitioned or
> >> delayed),
> >>>>>> they will redirect clients to the source(old) broker. However, by
> the
> >>>>>> transfer protocol, the source broker should have the latest state,
> >> so it
> >>>>>> can redirect the client again to the destination broker.
> >>>>>>
> >>>>>> When a broker restarts, it won't start until its BSC state to the
> >> (near)
> >>>>>> latest (til the last known messageId at that time).
> >>>>>>
> >>>>>>
> >>>>>>>> Pulsar guarantees "a single writer".
> >>>>>>>
> >>>>>>> I didn't think we were using a single writer in the PIP 192 design.
> >> I
> >>>>>>> thought we had many producers sending events to a compacted topic.
> >> My
> >>>>>>> proposal would still have many producers, but the writer to
> >> bookkeeper
> >>>>>>> would act as the single writer. It would technically be distinct
> >> from
> >>>>>>> a normal Pulsar topic producer.
> >>>>>>>
> >>>>>>> I should highlight that I am only proposing "broker filtering
> before
> >>>>>>> write" in the context of PIP 192 and as an alternative to adding
> >>>>>>> pluggable compaction strategies. It would not be a generic feature.
> >>>>>>>
> >>>>>>>
> >>>>>> I was worried about the worst case where two producers(leaders)
> >> happen
> >>>>>> to write the compacted topic (although Pulsar can guarantee "a
> single
> >>>>>> writer" or "a single producer" for a topic in normal situations).
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>>> Could we clarify how to handle
> >>>>>>>> the following(edge cases and failure recovery)?
> >>>>>>>> 0. Is the un-compacted topic a persistent topic or a
> >> non-persistent
> >>>>>>> topic?
> >>>>>>>
> >>>>>>> It is a persistent topic.
> >>>>>>>
> >>>>>>>> 1. How does the leader recover state from the two topics?
> >>>>>>>
> >>>>>>> A leader would recover state by first consuming the whole compacted
> >>>>>>> topic and then by consuming from the current location of a cursor
> on
> >>>>>>> the first input topic. As stated elsewhere, this introduces latency
> >>>>>>> and could be an issue.
> >>>>>>>
> >>>>>>>> 2. How do we handle the case when the leader fails before writing
> >>>>>>> messages
> >>>>>>>> to the compacted topic
> >>>>>>>
> >>>>>>> The leader would not acknowledge the message on the input topic
> >> until
> >>>>>>> it has successfully persisted the event on the compacted topic.
> >>>>>>> Publishing the same event to a compacted topic multiple times is
> >>>>>>> idempotent, so there is no risk of lost state. The real risk is
> >>>>>>> latency. However, I think we might have similar (though not the
> >> same)
> >>>>>>> latency risks in the current solution.
> >>>>>>>
> >>>>>>>> Analysis: the "pre-filter + two-topics" option can reduce the
> >> number
> >>>>>>> of
> >>>>>>>> messages to broadcast at the expense of the leader broker
> >> compaction.
> >>>>>>>
> >>>>>>> My primary point is that with this PIP's design, the filter logic
> is
> >>>>>>> run on every broker and again during topic compaction. With the
> >>>>>>> alternative design, the filter is run once.
> >>>>>>>
> >>>>>>> Thank you for the clarification.
> >>>>>>
> >>>>>> I think the difference is that the post-filter is an optimistic
> >> approach
> >>>>>> as it optimistically relies on the "broadcast-filter" effect(brokers
> >> will
> >>>>>> defer client lookups if notified ahead that any assignment is
> >> ongoing for
> >>>>>> bundle x). Yes, in the worst case, if the broadcast is slower, each
> >> broker
> >>>>>> needs to individually compact the conflicting assignment requests.
> >>>>>>
> >>>>>> Conversely, one downside of the pessimistic approach (single leader
> >>>>>> pre-filter) is that when there are not many conflict concurrent
> >> assignment
> >>>>>> requests(assign for bundle a, assign for bundle b, assign for bundle
> >> c...),
> >>>>>> the requests need to redundantly go through the leader compaction.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>>> 3. initially less complex to implement (leaderless conflict
> >>>>>>> resolution and
> >>>>>>>> requires a single topic)
> >>>>>>>
> >>>>>>> PIP 215 has its own complexity too. Coordinating filters
> >>>>>>> on both the client (table view) and the server (compaction) is non
> >>>>>>> trivial. The proposed API includes hard coded client configuration
> >> for
> >>>>>>> each component, which will make upgrading the version of the
> >>>>>>> compaction strategy complicated, and could lead to incorrect
> >>>>>>> interpretation of events in the stream. When a single broker is
> >> doing
> >>>>>>> the filtering, versioning is no longer a distributed problem. That
> >>>>>>> being said, I do not mean to suggest my solution is without
> >>>>>>> complexity.
> >>>>>>>
> >>>>>>>> 4. it is not a "one-way door" decision (we could add the
> >> pre-filter
> >>>>>>> logic
> >>>>>>>> as well later)
> >>>>>>>
> >>>>>>> It's fair to say that we could add it later, but at that point, we
> >>>>>>> will have added this new API for compaction strategy. Are we
> >> confident
> >>>>>>> that pluggable compaction is independently an important addition to
> >>>>>>> Pulsar's
> >>>>>>> features, or would it make sense to make this API only exposed in
> >> the
> >>>>>>> broker?
> >>>>>>>
> >>>>>>>
> >>>>>> The intention is that this compaction feature could be useful for
> >>>>>> complex user applications (if they are trying to do a similar
> >> thing). As I
> >>>>>> mentioned, this feature is closely tied to the PIP-192 now. We are
> >> not
> >>>>>> planning to expose this feature to users soon unless demanded and
> >> proven to
> >>>>>> be stable.
> >>>>>>
> >>>>>>
> >>>>>>> Thanks,
> >>>>>>> Michael
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Sat, Oct 29, 2022 at 10:13 PM Heesung Sohn
> >>>>>>> <he...@streamnative.io.invalid> wrote:
> >>>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> Also, I thought about some concurrent assignment scenarios between
> >>>>>>>> pre-filter vs post-filter.
> >>>>>>>>
> >>>>>>>> Example 1: When the topic broadcast is slower than the concurrent
> >>>>>>>> assignment requests
> >>>>>>>>
> >>>>>>>> With pre-filter + two-topics (non-compacted and compacted topics)
> >>>>>>>> t1: A -> non-compacted topic // broker A published a message to
> >> the
> >>>>>>>> non-compacted topic, m1: {broker A assigned bundle x to broker A}
> >>>>>>>> t2: B -> non-compacted topic // broker B published a message, m2:
> >>>>>>> {broker B
> >>>>>>>> assigned bundle x to broker C}
> >>>>>>>> t3: C -> non-compacted topic // broker C published a message, m3:
> >>>>>>> {broker C
> >>>>>>>> assigned bundle x to broker B}
> >>>>>>>> t4: non-compacted topic -> L // leader broker consumed the
> >> messages:
> >>>>>>> m1,m2,
> >>>>>>>> and m3
> >>>>>>>> t5: L -> compacted topic // leader compacted the messages and
> >>>>>>> broadcasted
> >>>>>>>> m1 to all consumers
> >>>>>>>> t6: compacted topic-> [A,B,C] // broker A,B,C consumed m1
> >>>>>>>>
> >>>>>>>> With post-filter + a single topic
> >>>>>>>> t1: A -> topic // broker A published a message to the
> >> non-compacted
> >>>>>>> topic,
> >>>>>>>> m1: {broker A assigned bundle x to broker A}
> >>>>>>>> t2: B -> topic // broker B published a message, m2: {broker B
> >> assigned
> >>>>>>>> bundle x to broker C}
> >>>>>>>> t3: C -> topic // broker C published a message, m3: {broker C
> >> assigned
> >>>>>>>> bundle x to broker B}
> >>>>>>>> t4: topic -> [A,B,C] // broker A,B,C consumed the messages: m1,m2,
> >>>>>>> and m3
> >>>>>>>> t5: [A,B,C] -> m1 // broker A,B,C individually compacted the
> >> messages
> >>>>>>> to m1.
> >>>>>>>>
> >>>>>>>> Analysis: the "pre-filter + two-topics" option can reduce the
> >> number
> >>>>>>> of
> >>>>>>>> messages to broadcast at the expense of the leader broker
> >> compaction.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Example 2: When the topic broadcast is faster than the concurrent
> >>>>>>>> assignment requests
> >>>>>>>>
> >>>>>>>> With pre-filter + two-topics (non-compacted and compacted topics)
> >>>>>>>> t1: A -> non-compacted topic // broker A published a message to
> >> the
> >>>>>>>> non-compacted topic, m1: {broker A assigned bundle x to broker A}
> >>>>>>>> t2: non-compacted topic -> L // leader broker consumed the
> >> messages:
> >>>>>>> m1
> >>>>>>>> t3: L -> compacted topic // leader compacted the message and
> >>>>>>> broadcasted m1
> >>>>>>>> to all consumers
> >>>>>>>> t4: compacted topic-> [A,B,C] // broker A,B,C consumed m1
> >>>>>>>> t5: A-> own bundle // broker A knows that its assignment has been
> >>>>>>> accepted,
> >>>>>>>> so proceeding to own the bundle (meanwhile deferring lookup
> >> requests)
> >>>>>>>> t6: B -> defer client lookups // broker B knows that bundle
> >>>>>>> assignment is
> >>>>>>>> running(meanwhile deferring lookup requests)
> >>>>>>>> t7: C -> defer client lookups // broker C knows that bundle
> >>>>>>> assignment is
> >>>>>>>> running(meanwhile deferring lookup requests)
> >>>>>>>>
> >>>>>>>> With post-filter + a single topic
> >>>>>>>> t1: A -> topic // broker A published a message to the
> >> non-compacted
> >>>>>>> topic,
> >>>>>>>> m1: {broker A assigned bundle x to broker A}
> >>>>>>>> t2: topic -> [A,B,C] // broker A,B,C consumed the message: m1
> >>>>>>>> t3:  A-> own bundle // broker A knows that its assignment has been
> >>>>>>>> accepted, so proceeding to own the bundle (meanwhile deferring
> >> lookup
> >>>>>>>> requests)
> >>>>>>>> t4: B -> defer client lookups // broker B knows that bundle
> >>>>>>> assignment is
> >>>>>>>> running(meanwhile deferring lookup requests)
> >>>>>>>> t5: C -> defer client lookups // broker C knows that bundle
> >>>>>>> assignment is
> >>>>>>>> running(meanwhile deferring lookup requests)
> >>>>>>>>
> >>>>>>>> Analysis: The "post-filter + a single topic" can perform ok in
> >> this
> >>>>>>> case
> >>>>>>>> without the additional leader coordination and the secondary topic
> >>>>>>> because
> >>>>>>>> the early broadcast can inform all brokers and prevent them from
> >>>>>>> requesting
> >>>>>>>> other assignments for the same bundle.
> >>>>>>>>
> >>>>>>>> I think the post-filter option is initially not bad because:
> >>>>>>>>
> >>>>>>>> 1. it is safe in the worst case (in case the messages are not
> >>>>>>> correctly
> >>>>>>>> pre-filtered at the leader)
> >>>>>>>> 2. it performs ok because the early broadcast can prevent
> >>>>>>>> concurrent assignment requests.
> >>>>>>>> 3. initially less complex to implement (leaderless conflict
> >>>>>>> resolution and
> >>>>>>>> requires a single topic)
> >>>>>>>> 4. it is not a "one-way door" decision (we could add the
> >> pre-filter
> >>>>>>> logic
> >>>>>>>> as well later)
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Heesung
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Sat, Oct 29, 2022 at 1:03 PM Heesung Sohn <
> >>>>>>> heesung.sohn@streamnative.io>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Michael,
> >>>>>>>>>
> >>>>>>>>> For the pre-prefilter(pre-compaction) option,
> >>>>>>>>> I think the leader requires a write-through cache to compact
> >>>>>>> messages
> >>>>>>>>> based on the latest states. Otherwise, the leader needs to wait
> >> for
> >>>>>>> the
> >>>>>>>>> last msg from the (compacted) topic before compacting the next
> >> msg
> >>>>>>> for the
> >>>>>>>>> same bundle.
> >>>>>>>>>
> >>>>>>>>> Pulsar guarantees "a single writer". However, for the worst-case
> >>>>>>>>> scenario(due to network partitions, bugs in zk or etcd leader
> >>>>>>> election,
> >>>>>>>>> bugs in bk, data corruption ), I think it is safe to place the
> >>>>>>> post-filter
> >>>>>>>>> on the consumer side(compaction and table views) as well in
> >> order to
> >>>>>>>>> validate the state changes.
> >>>>>>>>>
> >>>>>>>>> For the two-topic approach,
> >>>>>>>>> I think we lose a single linearized view. Could we clarify how
> >> to
> >>>>>>> handle
> >>>>>>>>> the following(edge cases and failure recovery)?
> >>>>>>>>> 0. Is the un-compacted topic a persistent topic or a
> >> non-persistent
> >>>>>>> topic?
> >>>>>>>>> 1. How does the leader recover state from the two topics?
> >>>>>>>>> 2. How do we handle the case when the leader fails before
> >> writing
> >>>>>>> messages
> >>>>>>>>> to the compacted topic
> >>>>>>>>>
> >>>>>>>>> Regards,
> >>>>>>>>> Heesung
> >>>>>>>>>
> >>>>>>>>> On Fri, Oct 28, 2022 at 6:56 PM Michael Marshall <
> >>>>>>> mmarshall@apache.org>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Sharing some more thoughts. We could alternatively use two
> >> topics
> >>>>>>>>>> instead of one. In this design, the first topic is the
> >> unfiltered
> >>>>>>>>>> write ahead log that represents many writers (brokers) trying
> >> to
> >>>>>>>>>> acquire ownership of bundles. The second topic is the
> >> distilled log
> >>>>>>>>>> that represents the "winners" or the "owners" of the bundles.
> >>>>>>> There is
> >>>>>>>>>> a single writer, the leader broker, that reads from the input
> >> topic
> >>>>>>>>>> and writes to the output topic. The first topic is normal and
> >> the
> >>>>>>>>>> second is compacted.
> >>>>>>>>>>
> >>>>>>>>>> The primary benefit in a two topic solution is that it is easy
> >> for
> >>>>>>> the
> >>>>>>>>>> leader broker to trade off ownership without needing to slow
> >> down
> >>>>>>>>>> writes to the input topic. The leader broker will start
> >> consuming
> >>>>>>> from
> >>>>>>>>>> the input topic when it has fully consumed the table view on
> >> the
> >>>>>>>>>> output topic. In general, I don't think consumers know when
> >> they
> >>>>>>> have
> >>>>>>>>>> "reached the end of a table view", but we should be able to
> >>>>>>> trivially
> >>>>>>>>>> figure this out if we are the topic's only writer and the
> >> topic and
> >>>>>>>>>> writer are collocated on the same broker.
> >>>>>>>>>>
> >>>>>>>>>> In that design, it might make sense to use something like the
> >>>>>>>>>> replication cursor to keep track of this consumer's state.
> >>>>>>>>>>
> >>>>>>>>>> - Michael
> >>>>>>>>>>
> >>>>>>>>>> On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall <
> >>>>>>> mmarshall@apache.org>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for your proposal, Heesung.
> >>>>>>>>>>>
> >>>>>>>>>>> Fundamentally, we have the problems listed in this PIP
> >> because
> >>>>>>> we have
> >>>>>>>>>>> multiple writers instead of just one writer. Can we solve
> >> this
> >>>>>>> problem
> >>>>>>>>>>> by changing our write pattern? What if we use the leader
> >> broker
> >>>>>>> as the
> >>>>>>>>>>> single writer? That broker would intercept attempts to
> >> acquire
> >>>>>>>>>>> ownership on bundles and would grant ownership to the first
> >>>>>>> broker to
> >>>>>>>>>>> claim an unassigned bundle. It could "grant ownership" by
> >>>>>>> letting the
> >>>>>>>>>>> first write to claim an unassigned bundle get written to the
> >>>>>>> ownership
> >>>>>>>>>>> topic. When a bundle is already owned, the leader won't
> >> persist
> >>>>>>> that
> >>>>>>>>>>> event to the bookkeeper. In this design, the log becomes a
> >> true
> >>>>>>>>>>> ownership log, which will correctly work with the existing
> >> topic
> >>>>>>>>>>> compaction and table view solutions. My proposal essentially
> >>>>>>> moves the
> >>>>>>>>>>> conflict resolution to just before the write, and as a
> >>>>>>> consequence, it
> >>>>>>>>>>> greatly reduces the need for post processing of the event
> >> log.
> >>>>>>> One
> >>>>>>>>>>> trade off might be that the leader broker could slow down the
> >>>>>>> write
> >>>>>>>>>>> path, but given that the leader would just need to verify the
> >>>>>>> current
> >>>>>>>>>>> state of the bundle, I think it'd be performant enough.
> >>>>>>>>>>>
> >>>>>>>>>>> Additionally, we'd need the leader broker to be "caught up"
> >> on
> >>>>>>> bundle
> >>>>>>>>>>> ownership in order to grant ownership of topics, but unless
> >> I am
> >>>>>>>>>>> mistaken, that is already a requirement of the current PIP
> >> 192
> >>>>>>>>>>> paradigm.
> >>>>>>>>>>>
> >>>>>>>>>>> Below are some additional thoughts that will be relevant if
> >> we
> >>>>>>> move
> >>>>>>>>>>> forward with the design as it is currently proposed.
> >>>>>>>>>>>
> >>>>>>>>>>> I think it might be helpful to update the title to show that
> >> this
> >>>>>>>>>>> proposal will also affect table view as well. I didn't catch
> >>>>>>> that at
> >>>>>>>>>>> first.
> >>>>>>>>>>>
> >>>>>>>>>>> Do you have any documentation describing how the
> >>>>>>>>>>> TopicCompactionStrategy will determine which states are
> >> valid in
> >>>>>>> the
> >>>>>>>>>>> context of load balancing? I looked at
> >>>>>>>>>>> https://github.com/apache/pulsar/pull/18195, but I couldn't
> >>>>>>> seem to
> >>>>>>>>>>> find anything for it. That would help make this proposal less
> >>>>>>>>>>> abstract.
> >>>>>>>>>>>
> >>>>>>>>>>> The proposed API seems very tied to the needs of PIP 192. For
> >>>>>>> example,
> >>>>>>>>>>> `isValid` is not a term I associate with topic compaction.
> >> The
> >>>>>>>>>>> fundamental question for compaction is which value to keep
> >> (or
> >>>>>>> build a
> >>>>>>>>>>> new value). I think we might be able to simplify the API by
> >>>>>>> replacing
> >>>>>>>>>>> the "isValid", "isMergeEnabled", and "merge" methods with a
> >>>>>>> single
> >>>>>>>>>>> method that lets the implementation handle one or all tasks.
> >> That
> >>>>>>>>>>> would also remove the need to deserialize payloads multiple
> >>>>>>> times too.
> >>>>>>>>>>>
> >>>>>>>>>>> I also feel like mentioning that after working with the PIP
> >> 105
> >>>>>>> broker
> >>>>>>>>>>> side filtering, I think we should avoid running UDFs in the
> >>>>>>> broker as
> >>>>>>>>>>> much as possible. (I do not consider the load balancing
> >> logic to
> >>>>>>> be a
> >>>>>>>>>>> UDF here.) I think it would be worth not making this a user
> >>>>>>> facing
> >>>>>>>>>>> feature unless there is demand for real use cases.
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks!
> >>>>>>>>>>> Michael
> >>>>>>>>>>>
> >>>>>>>>>>> On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bo...@apache.org>
> >> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> +1(non-binding)
> >>>>>>>>>>>>
> >>>>>>>>>>>> thanks,
> >>>>>>>>>>>> bo
> >>>>>>>>>>>>
> >>>>>>>>>>>> Heesung Sohn <he...@streamnative.io.invalid>
> >>>>>>> 于2022年10月19日周三
> >>>>>>>>>> 07:54写道:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Hi pulsar-dev community,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I raised a pip to discuss : PIP-215: Configurable Topic
> >>>>>>> Compaction
> >>>>>>>>>> Strategy
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> PIP link: https://github.com/apache/pulsar/issues/18099
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>> Heesung
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>>
> >>
>
>

Re: [DISCUSS] PIP-215: Configurable Topic Compaction Strategy

Posted by Dave Fisher <wa...@apache.org>.

> On Nov 4, 2022, at 10:28 AM, Heesung Sohn <he...@streamnative.io.INVALID> wrote:
> 
> Hi,
> 
> I think `bool shouldKeepLeft(T prev, T cur)` is clearer. I updated the PIP.
> 
> Hopefully, we provided enough context about this PIP and the design
> trade-off as well.

I would like to understand what the performance tradeoffs are for the changes that you (as an individual) and others are proposing.

> Goal
> 
> 	• Create another Topic compactor, StrategicTwoPhaseCompactor, where we can configure a compaction strategy,
> TopicCompactionStrategy
> 
> 	• Update the TableViewConfigurationData to load and consider the TopicCompactionStrategy when updating the internal key-value map in TableView.
> 
> 	• Add TopicCompactionStrategy in Topic-level Policy to run StrategicTwoPhaseCompactor instead of TwoPhaseCompactor when executing compaction.

It really seems that you are proposing a change to the default behavior whether or not a user chooses to use the interface in PIP-192.

> 
> I will send out a vote email soon.
> 
> Thank you,
> Heesung
> 
> 
> 
> 
> 
> 
> On Thu, Nov 3, 2022 at 9:59 PM Michael Marshall <mm...@apache.org>
> wrote:
> 
>> Thank you for your detailed responses, Heesung.
>> 
>>> We are not planning to expose this feature to users
>>> soon unless demanded and proven to be stable.
>> 
>> In that case, I think we should move forward with this PIP. I have a
>> different opinion about the trade offs for the two designs, but none
>> of my concerns are problems that could not be solved later if we
>> encounter problems.
>> 
>> Just to say it succinctly, my concern is that broadcasting all
>> attempts to acquire ownership of every unclaimed bundle to all brokers
>> will generate a lot of unnecessary traffic.
>> 
>>> 
>> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a
>> 
>> Thank you for this reference. I missed it. That is great documentation!
>> 
>>> In fact, with the `compact(T prev, T cur)` api only, it is not clear if
>>> prev => cur is a valid transition or not(if invalid, we should filter out
>>> the cur message instead of further compacting/merging). I think we still
>>> need to keep the `isValid()` and `merge()` separated.
>> 
>> I was thinking that the result of `compact` would be the result put in
>> the table view or written to the compacted topic. The one issue might
>> be about keeping the memory utilization down for use cases that are
>> not updating the message's value but are only selecting "left" or
>> "right". I thought we could infer when to keep the message id vs keep
>> the message value, but that might be easy to implement.
>> 
>> My final critique is that I think `isValid` could have a better name.
>> In the event this does become a public API, I don't think all use
>> cases will think about which event should be persisted in terms of
>> validity.
>> 
>> The main name that comes to my mind is `bool shouldKeepLeft(T prev, T
>> cur)`. When true, prev wins. When false, cur wins. That nomenclature
>> comes from Akka Streams. It's not perfect, but it is easy to infer
>> what the result will do.
>> 
>>> Regarding redundant deserialization, the input type `T` is the type of
>>> message value, so the input values are already deserialized.
>> 
>> Great, I should have realized that. That takes care of that concern.
>> 
>> Thanks,
>> Michael
>> 
>> On Thu, Nov 3, 2022 at 7:37 PM Heesung Sohn
>> <he...@streamnative.io.invalid> wrote:
>>> 
>>> Hi,
>>> 
>>> I have a different thought about my previous comment.
>>> 
>>> - Agreed with your point that we should merge CompactionStrategy APIs. I
>>> updated the interface proposal in the PIP. I replaced `"isValid",
>>> "isMergeEnabled", and "merge"` apis with "compact" api.
>>> 
>>> boolean isValid(T prev, T cur)
>>> boolean isMergeEnabled()
>>> T merge(T prev, T cur)
>>> 
>>> =>
>>> 
>>> T compact(T prev, T cur)
>>> 
>>> In fact, with the `compact(T prev, T cur)` api only, it is not clear if
>>> prev => cur is a valid transition or not(if invalid, we should filter out
>>> the cur message instead of further compacting/merging). I think we still
>>> need to keep the `isValid()` and `merge()` separated.
>>> 
>>> Regarding redundant deserialization, the input type `T` is the type of
>>> message value, so the input values are already deserialized. We don't
>> want
>>> to expose the Message<T> interface in this CompactionStrategy to avoid
>>> message serialization/deserialization dependencies in the
>>> CompactionStrategy.
>>> 
>>> The `merge()` functionality is suggested for more complex use cases
>> (merge
>>> values instead of just filtering), and to support this `merge()`, we need
>>> to internally create a new msg with the compacted value, metadata, and
>>> messageId copies. We could initially define `isValid()` only in
>>> CompactionStrategy, and add `isMergeEnabled() and merge()` later in the
>>> CompactionStrategy interface if requested.
>>> 
>>> Regards,
>>> Heesung
>>> 
>>> 
>>> On Thu, Nov 3, 2022 at 9:50 AM Heesung Sohn <
>> heesung.sohn@streamnative.io>
>>> wrote:
>>> 
>>>> Oops! Michael, I apologize for the typo in your name.
>>>> 
>>>> On Thu, Nov 3, 2022 at 9:47 AM Heesung Sohn <
>> heesung.sohn@streamnative.io>
>>>> wrote:
>>>> 
>>>>> Hi Machel,
>>>>> 
>>>>> Here are my additional comments regarding your earlier email.
>>>>> 
>>>>> - I updated the PIP title to show that this will impact table view as
>>>>> well.
>>>>> 
>>>>> - PIP-192 : https://github.com/apache/pulsar/issues/16691 shows the
>>>>> general idea of the states and their actions, and I defined the actual
>>>>> states here in the PR,
>>>>> 
>> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a.
>> I
>>>>> will further clarify the bundle state data validation logic when
>>>>> introducing `BundleStateCompactionStrategy` class. This PIP is to
>> support
>>>>> CompactionStrategy in general.
>>>>> 
>>>>> - Agreed with your point that we should merge CompactionStrategy
>> APIs. I
>>>>> updated the interface proposal in the PIP. I replaced `"isValid",
>>>>> "isMergeEnabled", and "merge"` apis with "compact" api.
>>>>> 
>>>>> 
>>>>> Thanks,
>>>>> Heesung
>>>>> 
>>>>> 
>>>>> On Tue, Nov 1, 2022 at 11:26 AM Heesung Sohn <
>>>>> heesung.sohn@streamnative.io> wrote:
>>>>> 
>>>>>> Hi,
>>>>>> Thank you for the great comments.
>>>>>> Please find my comments inline too.
>>>>>> 
>>>>>> Regards,
>>>>>> Heesung
>>>>>> 
>>>>>> On Mon, Oct 31, 2022 at 10:59 PM Michael Marshall <
>> mmarshall@apache.org>
>>>>>> wrote:
>>>>>> 
>>>>>>>> I think we lose a single linearized view.
>>>>>>> 
>>>>>>> Which linearized view are we losing, and what is the role of that
>>>>>>> linearized view? I think I might be missing why it is important. I
>>>>>>> agree that consumers won't know about each unsuccessful attempted
>>>>>>> acquisition of a bundle, but that seems like unnecessary information
>>>>>>> to broadcast to every broker in the cluster.
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>>> PIP-192 proposes an assignment, transfer, and split
>> protocol(multi-phase
>>>>>> state changes), relying on early broadcast across brokers, and all
>> brokers
>>>>>> react to their clients according to the state change notifications --
>>>>>> brokers could defer any client lookups for bundle x if an
>>>>>> assignment/transfer/split is ongoing for x(broadcasted early in the
>> topic).
>>>>>> One early broadcast example is the one that I discussed above, `When
>> the
>>>>>> topic broadcast is faster than the concurrent assignment requests.`
>> I think
>>>>>> the prefilter could delay this early broadcast, as it needs to go
>> through
>>>>>> the additional single-leader compaction path.
>>>>>> 
>>>>>> The bundle state recovery process is simpler by a single linearized
>> view.
>>>>>> 
>>>>>> The single linearized view can be easier to debug bundle states. We
>> can
>>>>>> more easily track where the assignment requests come from and how it
>> is
>>>>>> compacted in a single linearized view.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>>>> I think the leader requires a write-through cache to compact
>> messages
>>>>>>> based
>>>>>>>> on the latest states.
>>>>>>> 
>>>>>> This brings up an important point that I would like to clarify. If we
>>>>>>> trust the write ahead log as the source of truth, what happens when
>> a
>>>>>>> bundle has been validly owned by multiple brokers? As a broker
>> starts
>>>>>>> and consumes from the compacted topic, how do we prevent it from
>>>>>>> incorrectly thinking that it owns a bundle for some short time
>> period
>>>>>>> in the case that the ownership topic hasn't yet been compacted to
>>>>>>> remove old ownership state?
>>>>>>> 
>>>>>> 
>>>>>> Since the multi-phase transfer protocol involves the source and
>>>>>> destination broker's actions, the successful transfer should get the
>> source
>>>>>> and destination broker to have the (near) latest state. For example,
>> if
>>>>>> some brokers have old ownership states(network partitioned or
>> delayed),
>>>>>> they will redirect clients to the source(old) broker. However, by the
>>>>>> transfer protocol, the source broker should have the latest state,
>> so it
>>>>>> can redirect the client again to the destination broker.
>>>>>> 
>>>>>> When a broker restarts, it won't start until its BSC state to the
>> (near)
>>>>>> latest (til the last known messageId at that time).
>>>>>> 
>>>>>> 
>>>>>>>> Pulsar guarantees "a single writer".
>>>>>>> 
>>>>>>> I didn't think we were using a single writer in the PIP 192 design.
>> I
>>>>>>> thought we had many producers sending events to a compacted topic.
>> My
>>>>>>> proposal would still have many producers, but the writer to
>> bookkeeper
>>>>>>> would act as the single writer. It would technically be distinct
>> from
>>>>>>> a normal Pulsar topic producer.
>>>>>>> 
>>>>>>> I should highlight that I am only proposing "broker filtering before
>>>>>>> write" in the context of PIP 192 and as an alternative to adding
>>>>>>> pluggable compaction strategies. It would not be a generic feature.
>>>>>>> 
>>>>>>> 
>>>>>> I was worried about the worst case where two producers(leaders)
>> happen
>>>>>> to write the compacted topic (although Pulsar can guarantee "a single
>>>>>> writer" or "a single producer" for a topic in normal situations).
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>>>> Could we clarify how to handle
>>>>>>>> the following(edge cases and failure recovery)?
>>>>>>>> 0. Is the un-compacted topic a persistent topic or a
>> non-persistent
>>>>>>> topic?
>>>>>>> 
>>>>>>> It is a persistent topic.
>>>>>>> 
>>>>>>>> 1. How does the leader recover state from the two topics?
>>>>>>> 
>>>>>>> A leader would recover state by first consuming the whole compacted
>>>>>>> topic and then by consuming from the current location of a cursor on
>>>>>>> the first input topic. As stated elsewhere, this introduces latency
>>>>>>> and could be an issue.
>>>>>>> 
>>>>>>>> 2. How do we handle the case when the leader fails before writing
>>>>>>> messages
>>>>>>>> to the compacted topic
>>>>>>> 
>>>>>>> The leader would not acknowledge the message on the input topic
>> until
>>>>>>> it has successfully persisted the event on the compacted topic.
>>>>>>> Publishing the same event to a compacted topic multiple times is
>>>>>>> idempotent, so there is no risk of lost state. The real risk is
>>>>>>> latency. However, I think we might have similar (though not the
>> same)
>>>>>>> latency risks in the current solution.
>>>>>>> 
>>>>>>>> Analysis: the "pre-filter + two-topics" option can reduce the
>> number
>>>>>>> of
>>>>>>>> messages to broadcast at the expense of the leader broker
>> compaction.
>>>>>>> 
>>>>>>> My primary point is that with this PIP's design, the filter logic is
>>>>>>> run on every broker and again during topic compaction. With the
>>>>>>> alternative design, the filter is run once.
>>>>>>> 
>>>>>>> Thank you for the clarification.
>>>>>> 
>>>>>> I think the difference is that the post-filter is an optimistic
>> approach
>>>>>> as it optimistically relies on the "broadcast-filter" effect(brokers
>> will
>>>>>> defer client lookups if notified ahead that any assignment is
>> ongoing for
>>>>>> bundle x). Yes, in the worst case, if the broadcast is slower, each
>> broker
>>>>>> needs to individually compact the conflicting assignment requests.
>>>>>> 
>>>>>> Conversely, one downside of the pessimistic approach (single leader
>>>>>> pre-filter) is that when there are not many conflict concurrent
>> assignment
>>>>>> requests(assign for bundle a, assign for bundle b, assign for bundle
>> c...),
>>>>>> the requests need to redundantly go through the leader compaction.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>>>> 3. initially less complex to implement (leaderless conflict
>>>>>>> resolution and
>>>>>>>> requires a single topic)
>>>>>>> 
>>>>>>> PIP 215 has its own complexity too. Coordinating filters
>>>>>>> on both the client (table view) and the server (compaction) is non
>>>>>>> trivial. The proposed API includes hard coded client configuration
>> for
>>>>>>> each component, which will make upgrading the version of the
>>>>>>> compaction strategy complicated, and could lead to incorrect
>>>>>>> interpretation of events in the stream. When a single broker is
>> doing
>>>>>>> the filtering, versioning is no longer a distributed problem. That
>>>>>>> being said, I do not mean to suggest my solution is without
>>>>>>> complexity.
>>>>>>> 
>>>>>>>> 4. it is not a "one-way door" decision (we could add the
>> pre-filter
>>>>>>> logic
>>>>>>>> as well later)
>>>>>>> 
>>>>>>> It's fair to say that we could add it later, but at that point, we
>>>>>>> will have added this new API for compaction strategy. Are we
>> confident
>>>>>>> that pluggable compaction is independently an important addition to
>>>>>>> Pulsar's
>>>>>>> features, or would it make sense to make this API only exposed in
>> the
>>>>>>> broker?
>>>>>>> 
>>>>>>> 
>>>>>> The intention is that this compaction feature could be useful for
>>>>>> complex user applications (if they are trying to do a similar
>> thing). As I
>>>>>> mentioned, this feature is closely tied to the PIP-192 now. We are
>> not
>>>>>> planning to expose this feature to users soon unless demanded and
>> proven to
>>>>>> be stable.
>>>>>> 
>>>>>> 
>>>>>>> Thanks,
>>>>>>> Michael
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> On Sat, Oct 29, 2022 at 10:13 PM Heesung Sohn
>>>>>>> <he...@streamnative.io.invalid> wrote:
>>>>>>>> 
>>>>>>>> Hi,
>>>>>>>> 
>>>>>>>> Also, I thought about some concurrent assignment scenarios between
>>>>>>>> pre-filter vs post-filter.
>>>>>>>> 
>>>>>>>> Example 1: When the topic broadcast is slower than the concurrent
>>>>>>>> assignment requests
>>>>>>>> 
>>>>>>>> With pre-filter + two-topics (non-compacted and compacted topics)
>>>>>>>> t1: A -> non-compacted topic // broker A published a message to
>> the
>>>>>>>> non-compacted topic, m1: {broker A assigned bundle x to broker A}
>>>>>>>> t2: B -> non-compacted topic // broker B published a message, m2:
>>>>>>> {broker B
>>>>>>>> assigned bundle x to broker C}
>>>>>>>> t3: C -> non-compacted topic // broker C published a message, m3:
>>>>>>> {broker C
>>>>>>>> assigned bundle x to broker B}
>>>>>>>> t4: non-compacted topic -> L // leader broker consumed the
>> messages:
>>>>>>> m1,m2,
>>>>>>>> and m3
>>>>>>>> t5: L -> compacted topic // leader compacted the messages and
>>>>>>> broadcasted
>>>>>>>> m1 to all consumers
>>>>>>>> t6: compacted topic-> [A,B,C] // broker A,B,C consumed m1
>>>>>>>> 
>>>>>>>> With post-filter + a single topic
>>>>>>>> t1: A -> topic // broker A published a message to the
>> non-compacted
>>>>>>> topic,
>>>>>>>> m1: {broker A assigned bundle x to broker A}
>>>>>>>> t2: B -> topic // broker B published a message, m2: {broker B
>> assigned
>>>>>>>> bundle x to broker C}
>>>>>>>> t3: C -> topic // broker C published a message, m3: {broker C
>> assigned
>>>>>>>> bundle x to broker B}
>>>>>>>> t4: topic -> [A,B,C] // broker A,B,C consumed the messages: m1,m2,
>>>>>>> and m3
>>>>>>>> t5: [A,B,C] -> m1 // broker A,B,C individually compacted the
>> messages
>>>>>>> to m1.
>>>>>>>> 
>>>>>>>> Analysis: the "pre-filter + two-topics" option can reduce the
>> number
>>>>>>> of
>>>>>>>> messages to broadcast at the expense of the leader broker
>> compaction.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Example 2: When the topic broadcast is faster than the concurrent
>>>>>>>> assignment requests
>>>>>>>> 
>>>>>>>> With pre-filter + two-topics (non-compacted and compacted topics)
>>>>>>>> t1: A -> non-compacted topic // broker A published a message to
>> the
>>>>>>>> non-compacted topic, m1: {broker A assigned bundle x to broker A}
>>>>>>>> t2: non-compacted topic -> L // leader broker consumed the
>> messages:
>>>>>>> m1
>>>>>>>> t3: L -> compacted topic // leader compacted the message and
>>>>>>> broadcasted m1
>>>>>>>> to all consumers
>>>>>>>> t4: compacted topic-> [A,B,C] // broker A,B,C consumed m1
>>>>>>>> t5: A-> own bundle // broker A knows that its assignment has been
>>>>>>> accepted,
>>>>>>>> so proceeding to own the bundle (meanwhile deferring lookup
>> requests)
>>>>>>>> t6: B -> defer client lookups // broker B knows that bundle
>>>>>>> assignment is
>>>>>>>> running(meanwhile deferring lookup requests)
>>>>>>>> t7: C -> defer client lookups // broker C knows that bundle
>>>>>>> assignment is
>>>>>>>> running(meanwhile deferring lookup requests)
>>>>>>>> 
>>>>>>>> With post-filter + a single topic
>>>>>>>> t1: A -> topic // broker A published a message to the
>> non-compacted
>>>>>>> topic,
>>>>>>>> m1: {broker A assigned bundle x to broker A}
>>>>>>>> t2: topic -> [A,B,C] // broker A,B,C consumed the message: m1
>>>>>>>> t3:  A-> own bundle // broker A knows that its assignment has been
>>>>>>>> accepted, so proceeding to own the bundle (meanwhile deferring
>> lookup
>>>>>>>> requests)
>>>>>>>> t4: B -> defer client lookups // broker B knows that bundle
>>>>>>> assignment is
>>>>>>>> running(meanwhile deferring lookup requests)
>>>>>>>> t5: C -> defer client lookups // broker C knows that bundle
>>>>>>> assignment is
>>>>>>>> running(meanwhile deferring lookup requests)
>>>>>>>> 
>>>>>>>> Analysis: The "post-filter + a single topic" can perform ok in
>> this
>>>>>>> case
>>>>>>>> without the additional leader coordination and the secondary topic
>>>>>>> because
>>>>>>>> the early broadcast can inform all brokers and prevent them from
>>>>>>> requesting
>>>>>>>> other assignments for the same bundle.
>>>>>>>> 
>>>>>>>> I think the post-filter option is initially not bad because:
>>>>>>>> 
>>>>>>>> 1. it is safe in the worst case (in case the messages are not
>>>>>>> correctly
>>>>>>>> pre-filtered at the leader)
>>>>>>>> 2. it performs ok because the early broadcast can prevent
>>>>>>>> concurrent assignment requests.
>>>>>>>> 3. initially less complex to implement (leaderless conflict
>>>>>>> resolution and
>>>>>>>> requires a single topic)
>>>>>>>> 4. it is not a "one-way door" decision (we could add the
>> pre-filter
>>>>>>> logic
>>>>>>>> as well later)
>>>>>>>> 
>>>>>>>> Regards,
>>>>>>>> Heesung
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Sat, Oct 29, 2022 at 1:03 PM Heesung Sohn <
>>>>>>> heesung.sohn@streamnative.io>
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> Hi Michael,
>>>>>>>>> 
>>>>>>>>> For the pre-prefilter(pre-compaction) option,
>>>>>>>>> I think the leader requires a write-through cache to compact
>>>>>>> messages
>>>>>>>>> based on the latest states. Otherwise, the leader needs to wait
>> for
>>>>>>> the
>>>>>>>>> last msg from the (compacted) topic before compacting the next
>> msg
>>>>>>> for the
>>>>>>>>> same bundle.
>>>>>>>>> 
>>>>>>>>> Pulsar guarantees "a single writer". However, for the worst-case
>>>>>>>>> scenario(due to network partitions, bugs in zk or etcd leader
>>>>>>> election,
>>>>>>>>> bugs in bk, data corruption ), I think it is safe to place the
>>>>>>> post-filter
>>>>>>>>> on the consumer side(compaction and table views) as well in
>> order to
>>>>>>>>> validate the state changes.
>>>>>>>>> 
>>>>>>>>> For the two-topic approach,
>>>>>>>>> I think we lose a single linearized view. Could we clarify how
>> to
>>>>>>> handle
>>>>>>>>> the following(edge cases and failure recovery)?
>>>>>>>>> 0. Is the un-compacted topic a persistent topic or a
>> non-persistent
>>>>>>> topic?
>>>>>>>>> 1. How does the leader recover state from the two topics?
>>>>>>>>> 2. How do we handle the case when the leader fails before
>> writing
>>>>>>> messages
>>>>>>>>> to the compacted topic
>>>>>>>>> 
>>>>>>>>> Regards,
>>>>>>>>> Heesung
>>>>>>>>> 
>>>>>>>>> On Fri, Oct 28, 2022 at 6:56 PM Michael Marshall <
>>>>>>> mmarshall@apache.org>
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Sharing some more thoughts. We could alternatively use two
>> topics
>>>>>>>>>> instead of one. In this design, the first topic is the
>> unfiltered
>>>>>>>>>> write ahead log that represents many writers (brokers) trying
>> to
>>>>>>>>>> acquire ownership of bundles. The second topic is the
>> distilled log
>>>>>>>>>> that represents the "winners" or the "owners" of the bundles.
>>>>>>> There is
>>>>>>>>>> a single writer, the leader broker, that reads from the input
>> topic
>>>>>>>>>> and writes to the output topic. The first topic is normal and
>> the
>>>>>>>>>> second is compacted.
>>>>>>>>>> 
>>>>>>>>>> The primary benefit in a two topic solution is that it is easy
>> for
>>>>>>> the
>>>>>>>>>> leader broker to trade off ownership without needing to slow
>> down
>>>>>>>>>> writes to the input topic. The leader broker will start
>> consuming
>>>>>>> from
>>>>>>>>>> the input topic when it has fully consumed the table view on
>> the
>>>>>>>>>> output topic. In general, I don't think consumers know when
>> they
>>>>>>> have
>>>>>>>>>> "reached the end of a table view", but we should be able to
>>>>>>> trivially
>>>>>>>>>> figure this out if we are the topic's only writer and the
>> topic and
>>>>>>>>>> writer are collocated on the same broker.
>>>>>>>>>> 
>>>>>>>>>> In that design, it might make sense to use something like the
>>>>>>>>>> replication cursor to keep track of this consumer's state.
>>>>>>>>>> 
>>>>>>>>>> - Michael
>>>>>>>>>> 
>>>>>>>>>> On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall <
>>>>>>> mmarshall@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Thanks for your proposal, Heesung.
>>>>>>>>>>> 
>>>>>>>>>>> Fundamentally, we have the problems listed in this PIP
>> because
>>>>>>> we have
>>>>>>>>>>> multiple writers instead of just one writer. Can we solve
>> this
>>>>>>> problem
>>>>>>>>>>> by changing our write pattern? What if we use the leader
>> broker
>>>>>>> as the
>>>>>>>>>>> single writer? That broker would intercept attempts to
>> acquire
>>>>>>>>>>> ownership on bundles and would grant ownership to the first
>>>>>>> broker to
>>>>>>>>>>> claim an unassigned bundle. It could "grant ownership" by
>>>>>>> letting the
>>>>>>>>>>> first write to claim an unassigned bundle get written to the
>>>>>>> ownership
>>>>>>>>>>> topic. When a bundle is already owned, the leader won't
>> persist
>>>>>>> that
>>>>>>>>>>> event to the bookkeeper. In this design, the log becomes a
>> true
>>>>>>>>>>> ownership log, which will correctly work with the existing
>> topic
>>>>>>>>>>> compaction and table view solutions. My proposal essentially
>>>>>>> moves the
>>>>>>>>>>> conflict resolution to just before the write, and as a
>>>>>>> consequence, it
>>>>>>>>>>> greatly reduces the need for post processing of the event
>> log.
>>>>>>> One
>>>>>>>>>>> trade off might be that the leader broker could slow down the
>>>>>>> write
>>>>>>>>>>> path, but given that the leader would just need to verify the
>>>>>>> current
>>>>>>>>>>> state of the bundle, I think it'd be performant enough.
>>>>>>>>>>> 
>>>>>>>>>>> Additionally, we'd need the leader broker to be "caught up"
>> on
>>>>>>> bundle
>>>>>>>>>>> ownership in order to grant ownership of topics, but unless
>> I am
>>>>>>>>>>> mistaken, that is already a requirement of the current PIP
>> 192
>>>>>>>>>>> paradigm.
>>>>>>>>>>> 
>>>>>>>>>>> Below are some additional thoughts that will be relevant if
>> we
>>>>>>> move
>>>>>>>>>>> forward with the design as it is currently proposed.
>>>>>>>>>>> 
>>>>>>>>>>> I think it might be helpful to update the title to show that
>> this
>>>>>>>>>>> proposal will also affect table view as well. I didn't catch
>>>>>>> that at
>>>>>>>>>>> first.
>>>>>>>>>>> 
>>>>>>>>>>> Do you have any documentation describing how the
>>>>>>>>>>> TopicCompactionStrategy will determine which states are
>> valid in
>>>>>>> the
>>>>>>>>>>> context of load balancing? I looked at
>>>>>>>>>>> https://github.com/apache/pulsar/pull/18195, but I couldn't
>>>>>>> seem to
>>>>>>>>>>> find anything for it. That would help make this proposal less
>>>>>>>>>>> abstract.
>>>>>>>>>>> 
>>>>>>>>>>> The proposed API seems very tied to the needs of PIP 192. For
>>>>>>> example,
>>>>>>>>>>> `isValid` is not a term I associate with topic compaction.
>> The
>>>>>>>>>>> fundamental question for compaction is which value to keep
>> (or
>>>>>>> build a
>>>>>>>>>>> new value). I think we might be able to simplify the API by
>>>>>>> replacing
>>>>>>>>>>> the "isValid", "isMergeEnabled", and "merge" methods with a
>>>>>>> single
>>>>>>>>>>> method that lets the implementation handle one or all tasks.
>> That
>>>>>>>>>>> would also remove the need to deserialize payloads multiple
>>>>>>> times too.
>>>>>>>>>>> 
>>>>>>>>>>> I also feel like mentioning that after working with the PIP
>> 105
>>>>>>> broker
>>>>>>>>>>> side filtering, I think we should avoid running UDFs in the
>>>>>>> broker as
>>>>>>>>>>> much as possible. (I do not consider the load balancing
>> logic to
>>>>>>> be a
>>>>>>>>>>> UDF here.) I think it would be worth not making this a user
>>>>>>> facing
>>>>>>>>>>> feature unless there is demand for real use cases.
>>>>>>>>>>> 
>>>>>>>>>>> Thanks!
>>>>>>>>>>> Michael
>>>>>>>>>>> 
>>>>>>>>>>> On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bo...@apache.org>
>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>> +1(non-binding)
>>>>>>>>>>>> 
>>>>>>>>>>>> thanks,
>>>>>>>>>>>> bo
>>>>>>>>>>>> 
>>>>>>>>>>>> Heesung Sohn <he...@streamnative.io.invalid>
>>>>>>> 于2022年10月19日周三
>>>>>>>>>> 07:54写道:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi pulsar-dev community,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I raised a pip to discuss : PIP-215: Configurable Topic
>>>>>>> Compaction
>>>>>>>>>> Strategy
>>>>>>>>>>>>> 
>>>>>>>>>>>>> PIP link: https://github.com/apache/pulsar/issues/18099
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> Heesung
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>> 
>> 


Re: [DISCUSS] PIP-215: Configurable Topic Compaction Strategy

Posted by Heesung Sohn <he...@streamnative.io.INVALID>.
Hi,

I think `bool shouldKeepLeft(T prev, T cur)` is clearer. I updated the PIP.

Hopefully, we provided enough context about this PIP and the design
trade-off as well.

I will send out a vote email soon.

Thank you,
Heesung






On Thu, Nov 3, 2022 at 9:59 PM Michael Marshall <mm...@apache.org>
wrote:

> Thank you for your detailed responses, Heesung.
>
> > We are not planning to expose this feature to users
> > soon unless demanded and proven to be stable.
>
> In that case, I think we should move forward with this PIP. I have a
> different opinion about the trade offs for the two designs, but none
> of my concerns are problems that could not be solved later if we
> encounter problems.
>
> Just to say it succinctly, my concern is that broadcasting all
> attempts to acquire ownership of every unclaimed bundle to all brokers
> will generate a lot of unnecessary traffic.
>
> >
> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a
>
> Thank you for this reference. I missed it. That is great documentation!
>
> > In fact, with the `compact(T prev, T cur)` api only, it is not clear if
> > prev => cur is a valid transition or not(if invalid, we should filter out
> > the cur message instead of further compacting/merging). I think we still
> > need to keep the `isValid()` and `merge()` separated.
>
> I was thinking that the result of `compact` would be the result put in
> the table view or written to the compacted topic. The one issue might
> be about keeping the memory utilization down for use cases that are
> not updating the message's value but are only selecting "left" or
> "right". I thought we could infer when to keep the message id vs keep
> the message value, but that might be easy to implement.
>
> My final critique is that I think `isValid` could have a better name.
> In the event this does become a public API, I don't think all use
> cases will think about which event should be persisted in terms of
> validity.
>
> The main name that comes to my mind is `bool shouldKeepLeft(T prev, T
> cur)`. When true, prev wins. When false, cur wins. That nomenclature
> comes from Akka Streams. It's not perfect, but it is easy to infer
> what the result will do.
>
> > Regarding redundant deserialization, the input type `T` is the type of
> > message value, so the input values are already deserialized.
>
> Great, I should have realized that. That takes care of that concern.
>
> Thanks,
> Michael
>
> On Thu, Nov 3, 2022 at 7:37 PM Heesung Sohn
> <he...@streamnative.io.invalid> wrote:
> >
> > Hi,
> >
> > I have a different thought about my previous comment.
> >
> > - Agreed with your point that we should merge CompactionStrategy APIs. I
> > updated the interface proposal in the PIP. I replaced `"isValid",
> > "isMergeEnabled", and "merge"` apis with "compact" api.
> >
> > boolean isValid(T prev, T cur)
> > boolean isMergeEnabled()
> > T merge(T prev, T cur)
> >
> > =>
> >
> > T compact(T prev, T cur)
> >
> > In fact, with the `compact(T prev, T cur)` api only, it is not clear if
> > prev => cur is a valid transition or not(if invalid, we should filter out
> > the cur message instead of further compacting/merging). I think we still
> > need to keep the `isValid()` and `merge()` separated.
> >
> > Regarding redundant deserialization, the input type `T` is the type of
> > message value, so the input values are already deserialized. We don't
> want
> > to expose the Message<T> interface in this CompactionStrategy to avoid
> > message serialization/deserialization dependencies in the
> > CompactionStrategy.
> >
> > The `merge()` functionality is suggested for more complex use cases
> (merge
> > values instead of just filtering), and to support this `merge()`, we need
> > to internally create a new msg with the compacted value, metadata, and
> > messageId copies. We could initially define `isValid()` only in
> > CompactionStrategy, and add `isMergeEnabled() and merge()` later in the
> > CompactionStrategy interface if requested.
> >
> > Regards,
> > Heesung
> >
> >
> > On Thu, Nov 3, 2022 at 9:50 AM Heesung Sohn <
> heesung.sohn@streamnative.io>
> > wrote:
> >
> > > Oops! Michael, I apologize for the typo in your name.
> > >
> > > On Thu, Nov 3, 2022 at 9:47 AM Heesung Sohn <
> heesung.sohn@streamnative.io>
> > > wrote:
> > >
> > >> Hi Machel,
> > >>
> > >> Here are my additional comments regarding your earlier email.
> > >>
> > >> - I updated the PIP title to show that this will impact table view as
> > >> well.
> > >>
> > >> - PIP-192 : https://github.com/apache/pulsar/issues/16691 shows the
> > >> general idea of the states and their actions, and I defined the actual
> > >> states here in the PR,
> > >>
> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a.
> I
> > >> will further clarify the bundle state data validation logic when
> > >> introducing `BundleStateCompactionStrategy` class. This PIP is to
> support
> > >> CompactionStrategy in general.
> > >>
> > >> - Agreed with your point that we should merge CompactionStrategy
> APIs. I
> > >> updated the interface proposal in the PIP. I replaced `"isValid",
> > >> "isMergeEnabled", and "merge"` apis with "compact" api.
> > >>
> > >>
> > >> Thanks,
> > >> Heesung
> > >>
> > >>
> > >> On Tue, Nov 1, 2022 at 11:26 AM Heesung Sohn <
> > >> heesung.sohn@streamnative.io> wrote:
> > >>
> > >>> Hi,
> > >>> Thank you for the great comments.
> > >>> Please find my comments inline too.
> > >>>
> > >>> Regards,
> > >>> Heesung
> > >>>
> > >>> On Mon, Oct 31, 2022 at 10:59 PM Michael Marshall <
> mmarshall@apache.org>
> > >>> wrote:
> > >>>
> > >>>> > I think we lose a single linearized view.
> > >>>>
> > >>>> Which linearized view are we losing, and what is the role of that
> > >>>> linearized view? I think I might be missing why it is important. I
> > >>>> agree that consumers won't know about each unsuccessful attempted
> > >>>> acquisition of a bundle, but that seems like unnecessary information
> > >>>> to broadcast to every broker in the cluster.
> > >>>>
> > >>>>
> > >>>
> > >>> PIP-192 proposes an assignment, transfer, and split
> protocol(multi-phase
> > >>> state changes), relying on early broadcast across brokers, and all
> brokers
> > >>> react to their clients according to the state change notifications --
> > >>> brokers could defer any client lookups for bundle x if an
> > >>> assignment/transfer/split is ongoing for x(broadcasted early in the
> topic).
> > >>> One early broadcast example is the one that I discussed above, `When
> the
> > >>> topic broadcast is faster than the concurrent assignment requests.`
> I think
> > >>> the prefilter could delay this early broadcast, as it needs to go
> through
> > >>> the additional single-leader compaction path.
> > >>>
> > >>> The bundle state recovery process is simpler by a single linearized
> view.
> > >>>
> > >>> The single linearized view can be easier to debug bundle states. We
> can
> > >>> more easily track where the assignment requests come from and how it
> is
> > >>> compacted in a single linearized view.
> > >>>
> > >>>
> > >>>
> > >>>> > I think the leader requires a write-through cache to compact
> messages
> > >>>> based
> > >>>> > on the latest states.
> > >>>>
> > >>> This brings up an important point that I would like to clarify. If we
> > >>>> trust the write ahead log as the source of truth, what happens when
> a
> > >>>> bundle has been validly owned by multiple brokers? As a broker
> starts
> > >>>> and consumes from the compacted topic, how do we prevent it from
> > >>>> incorrectly thinking that it owns a bundle for some short time
> period
> > >>>> in the case that the ownership topic hasn't yet been compacted to
> > >>>> remove old ownership state?
> > >>>>
> > >>>
> > >>> Since the multi-phase transfer protocol involves the source and
> > >>> destination broker's actions, the successful transfer should get the
> source
> > >>> and destination broker to have the (near) latest state. For example,
> if
> > >>> some brokers have old ownership states(network partitioned or
> delayed),
> > >>> they will redirect clients to the source(old) broker. However, by the
> > >>> transfer protocol, the source broker should have the latest state,
> so it
> > >>> can redirect the client again to the destination broker.
> > >>>
> > >>> When a broker restarts, it won't start until its BSC state to the
> (near)
> > >>> latest (til the last known messageId at that time).
> > >>>
> > >>>
> > >>>> > Pulsar guarantees "a single writer".
> > >>>>
> > >>>> I didn't think we were using a single writer in the PIP 192 design.
> I
> > >>>> thought we had many producers sending events to a compacted topic.
> My
> > >>>> proposal would still have many producers, but the writer to
> bookkeeper
> > >>>> would act as the single writer. It would technically be distinct
> from
> > >>>> a normal Pulsar topic producer.
> > >>>>
> > >>>> I should highlight that I am only proposing "broker filtering before
> > >>>> write" in the context of PIP 192 and as an alternative to adding
> > >>>> pluggable compaction strategies. It would not be a generic feature.
> > >>>>
> > >>>>
> > >>> I was worried about the worst case where two producers(leaders)
> happen
> > >>> to write the compacted topic (although Pulsar can guarantee "a single
> > >>> writer" or "a single producer" for a topic in normal situations).
> > >>>
> > >>>
> > >>>
> > >>>> > Could we clarify how to handle
> > >>>> > the following(edge cases and failure recovery)?
> > >>>> > 0. Is the un-compacted topic a persistent topic or a
> non-persistent
> > >>>> topic?
> > >>>>
> > >>>> It is a persistent topic.
> > >>>>
> > >>>> > 1. How does the leader recover state from the two topics?
> > >>>>
> > >>>> A leader would recover state by first consuming the whole compacted
> > >>>> topic and then by consuming from the current location of a cursor on
> > >>>> the first input topic. As stated elsewhere, this introduces latency
> > >>>> and could be an issue.
> > >>>>
> > >>>> > 2. How do we handle the case when the leader fails before writing
> > >>>> messages
> > >>>> > to the compacted topic
> > >>>>
> > >>>> The leader would not acknowledge the message on the input topic
> until
> > >>>> it has successfully persisted the event on the compacted topic.
> > >>>> Publishing the same event to a compacted topic multiple times is
> > >>>> idempotent, so there is no risk of lost state. The real risk is
> > >>>> latency. However, I think we might have similar (though not the
> same)
> > >>>> latency risks in the current solution.
> > >>>>
> > >>>> > Analysis: the "pre-filter + two-topics" option can reduce the
> number
> > >>>> of
> > >>>> > messages to broadcast at the expense of the leader broker
> compaction.
> > >>>>
> > >>>> My primary point is that with this PIP's design, the filter logic is
> > >>>> run on every broker and again during topic compaction. With the
> > >>>> alternative design, the filter is run once.
> > >>>>
> > >>>> Thank you for the clarification.
> > >>>
> > >>> I think the difference is that the post-filter is an optimistic
> approach
> > >>> as it optimistically relies on the "broadcast-filter" effect(brokers
> will
> > >>> defer client lookups if notified ahead that any assignment is
> ongoing for
> > >>> bundle x). Yes, in the worst case, if the broadcast is slower, each
> broker
> > >>> needs to individually compact the conflicting assignment requests.
> > >>>
> > >>> Conversely, one downside of the pessimistic approach (single leader
> > >>> pre-filter) is that when there are not many conflict concurrent
> assignment
> > >>> requests(assign for bundle a, assign for bundle b, assign for bundle
> c...),
> > >>> the requests need to redundantly go through the leader compaction.
> > >>>
> > >>>
> > >>>
> > >>>> > 3. initially less complex to implement (leaderless conflict
> > >>>> resolution and
> > >>>> > requires a single topic)
> > >>>>
> > >>>> PIP 215 has its own complexity too. Coordinating filters
> > >>>> on both the client (table view) and the server (compaction) is non
> > >>>> trivial. The proposed API includes hard coded client configuration
> for
> > >>>> each component, which will make upgrading the version of the
> > >>>> compaction strategy complicated, and could lead to incorrect
> > >>>> interpretation of events in the stream. When a single broker is
> doing
> > >>>> the filtering, versioning is no longer a distributed problem. That
> > >>>> being said, I do not mean to suggest my solution is without
> > >>>> complexity.
> > >>>>
> > >>>> > 4. it is not a "one-way door" decision (we could add the
> pre-filter
> > >>>> logic
> > >>>> > as well later)
> > >>>>
> > >>>> It's fair to say that we could add it later, but at that point, we
> > >>>> will have added this new API for compaction strategy. Are we
> confident
> > >>>> that pluggable compaction is independently an important addition to
> > >>>> Pulsar's
> > >>>> features, or would it make sense to make this API only exposed in
> the
> > >>>> broker?
> > >>>>
> > >>>>
> > >>> The intention is that this compaction feature could be useful for
> > >>> complex user applications (if they are trying to do a similar
> thing). As I
> > >>> mentioned, this feature is closely tied to the PIP-192 now. We are
> not
> > >>> planning to expose this feature to users soon unless demanded and
> proven to
> > >>> be stable.
> > >>>
> > >>>
> > >>>> Thanks,
> > >>>> Michael
> > >>>>
> > >>>>
> > >>>>
> > >>>>
> > >>>>
> > >>>> On Sat, Oct 29, 2022 at 10:13 PM Heesung Sohn
> > >>>> <he...@streamnative.io.invalid> wrote:
> > >>>> >
> > >>>> > Hi,
> > >>>> >
> > >>>> > Also, I thought about some concurrent assignment scenarios between
> > >>>> > pre-filter vs post-filter.
> > >>>> >
> > >>>> > Example 1: When the topic broadcast is slower than the concurrent
> > >>>> > assignment requests
> > >>>> >
> > >>>> > With pre-filter + two-topics (non-compacted and compacted topics)
> > >>>> > t1: A -> non-compacted topic // broker A published a message to
> the
> > >>>> > non-compacted topic, m1: {broker A assigned bundle x to broker A}
> > >>>> > t2: B -> non-compacted topic // broker B published a message, m2:
> > >>>> {broker B
> > >>>> > assigned bundle x to broker C}
> > >>>> > t3: C -> non-compacted topic // broker C published a message, m3:
> > >>>> {broker C
> > >>>> > assigned bundle x to broker B}
> > >>>> > t4: non-compacted topic -> L // leader broker consumed the
> messages:
> > >>>> m1,m2,
> > >>>> > and m3
> > >>>> > t5: L -> compacted topic // leader compacted the messages and
> > >>>> broadcasted
> > >>>> > m1 to all consumers
> > >>>> > t6: compacted topic-> [A,B,C] // broker A,B,C consumed m1
> > >>>> >
> > >>>> > With post-filter + a single topic
> > >>>> > t1: A -> topic // broker A published a message to the
> non-compacted
> > >>>> topic,
> > >>>> > m1: {broker A assigned bundle x to broker A}
> > >>>> > t2: B -> topic // broker B published a message, m2: {broker B
> assigned
> > >>>> > bundle x to broker C}
> > >>>> > t3: C -> topic // broker C published a message, m3: {broker C
> assigned
> > >>>> > bundle x to broker B}
> > >>>> > t4: topic -> [A,B,C] // broker A,B,C consumed the messages: m1,m2,
> > >>>> and m3
> > >>>> > t5: [A,B,C] -> m1 // broker A,B,C individually compacted the
> messages
> > >>>> to m1.
> > >>>> >
> > >>>> > Analysis: the "pre-filter + two-topics" option can reduce the
> number
> > >>>> of
> > >>>> > messages to broadcast at the expense of the leader broker
> compaction.
> > >>>> >
> > >>>> >
> > >>>> > Example 2: When the topic broadcast is faster than the concurrent
> > >>>> > assignment requests
> > >>>> >
> > >>>> > With pre-filter + two-topics (non-compacted and compacted topics)
> > >>>> > t1: A -> non-compacted topic // broker A published a message to
> the
> > >>>> > non-compacted topic, m1: {broker A assigned bundle x to broker A}
> > >>>> > t2: non-compacted topic -> L // leader broker consumed the
> messages:
> > >>>> m1
> > >>>> > t3: L -> compacted topic // leader compacted the message and
> > >>>> broadcasted m1
> > >>>> > to all consumers
> > >>>> > t4: compacted topic-> [A,B,C] // broker A,B,C consumed m1
> > >>>> > t5: A-> own bundle // broker A knows that its assignment has been
> > >>>> accepted,
> > >>>> > so proceeding to own the bundle (meanwhile deferring lookup
> requests)
> > >>>> > t6: B -> defer client lookups // broker B knows that bundle
> > >>>> assignment is
> > >>>> > running(meanwhile deferring lookup requests)
> > >>>> > t7: C -> defer client lookups // broker C knows that bundle
> > >>>> assignment is
> > >>>> > running(meanwhile deferring lookup requests)
> > >>>> >
> > >>>> > With post-filter + a single topic
> > >>>> > t1: A -> topic // broker A published a message to the
> non-compacted
> > >>>> topic,
> > >>>> > m1: {broker A assigned bundle x to broker A}
> > >>>> > t2: topic -> [A,B,C] // broker A,B,C consumed the message: m1
> > >>>> > t3:  A-> own bundle // broker A knows that its assignment has been
> > >>>> > accepted, so proceeding to own the bundle (meanwhile deferring
> lookup
> > >>>> > requests)
> > >>>> > t4: B -> defer client lookups // broker B knows that bundle
> > >>>> assignment is
> > >>>> > running(meanwhile deferring lookup requests)
> > >>>> > t5: C -> defer client lookups // broker C knows that bundle
> > >>>> assignment is
> > >>>> > running(meanwhile deferring lookup requests)
> > >>>> >
> > >>>> > Analysis: The "post-filter + a single topic" can perform ok in
> this
> > >>>> case
> > >>>> > without the additional leader coordination and the secondary topic
> > >>>> because
> > >>>> > the early broadcast can inform all brokers and prevent them from
> > >>>> requesting
> > >>>> > other assignments for the same bundle.
> > >>>> >
> > >>>> > I think the post-filter option is initially not bad because:
> > >>>> >
> > >>>> > 1. it is safe in the worst case (in case the messages are not
> > >>>> correctly
> > >>>> > pre-filtered at the leader)
> > >>>> > 2. it performs ok because the early broadcast can prevent
> > >>>> > concurrent assignment requests.
> > >>>> > 3. initially less complex to implement (leaderless conflict
> > >>>> resolution and
> > >>>> > requires a single topic)
> > >>>> > 4. it is not a "one-way door" decision (we could add the
> pre-filter
> > >>>> logic
> > >>>> > as well later)
> > >>>> >
> > >>>> > Regards,
> > >>>> > Heesung
> > >>>> >
> > >>>> >
> > >>>> >
> > >>>> > On Sat, Oct 29, 2022 at 1:03 PM Heesung Sohn <
> > >>>> heesung.sohn@streamnative.io>
> > >>>> > wrote:
> > >>>> >
> > >>>> > > Hi Michael,
> > >>>> > >
> > >>>> > > For the pre-prefilter(pre-compaction) option,
> > >>>> > > I think the leader requires a write-through cache to compact
> > >>>> messages
> > >>>> > > based on the latest states. Otherwise, the leader needs to wait
> for
> > >>>> the
> > >>>> > > last msg from the (compacted) topic before compacting the next
> msg
> > >>>> for the
> > >>>> > > same bundle.
> > >>>> > >
> > >>>> > > Pulsar guarantees "a single writer". However, for the worst-case
> > >>>> > > scenario(due to network partitions, bugs in zk or etcd leader
> > >>>> election,
> > >>>> > > bugs in bk, data corruption ), I think it is safe to place the
> > >>>> post-filter
> > >>>> > > on the consumer side(compaction and table views) as well in
> order to
> > >>>> > > validate the state changes.
> > >>>> > >
> > >>>> > > For the two-topic approach,
> > >>>> > > I think we lose a single linearized view. Could we clarify how
> to
> > >>>> handle
> > >>>> > > the following(edge cases and failure recovery)?
> > >>>> > > 0. Is the un-compacted topic a persistent topic or a
> non-persistent
> > >>>> topic?
> > >>>> > > 1. How does the leader recover state from the two topics?
> > >>>> > > 2. How do we handle the case when the leader fails before
> writing
> > >>>> messages
> > >>>> > > to the compacted topic
> > >>>> > >
> > >>>> > > Regards,
> > >>>> > > Heesung
> > >>>> > >
> > >>>> > > On Fri, Oct 28, 2022 at 6:56 PM Michael Marshall <
> > >>>> mmarshall@apache.org>
> > >>>> > > wrote:
> > >>>> > >
> > >>>> > >> Sharing some more thoughts. We could alternatively use two
> topics
> > >>>> > >> instead of one. In this design, the first topic is the
> unfiltered
> > >>>> > >> write ahead log that represents many writers (brokers) trying
> to
> > >>>> > >> acquire ownership of bundles. The second topic is the
> distilled log
> > >>>> > >> that represents the "winners" or the "owners" of the bundles.
> > >>>> There is
> > >>>> > >> a single writer, the leader broker, that reads from the input
> topic
> > >>>> > >> and writes to the output topic. The first topic is normal and
> the
> > >>>> > >> second is compacted.
> > >>>> > >>
> > >>>> > >> The primary benefit in a two topic solution is that it is easy
> for
> > >>>> the
> > >>>> > >> leader broker to trade off ownership without needing to slow
> down
> > >>>> > >> writes to the input topic. The leader broker will start
> consuming
> > >>>> from
> > >>>> > >> the input topic when it has fully consumed the table view on
> the
> > >>>> > >> output topic. In general, I don't think consumers know when
> they
> > >>>> have
> > >>>> > >> "reached the end of a table view", but we should be able to
> > >>>> trivially
> > >>>> > >> figure this out if we are the topic's only writer and the
> topic and
> > >>>> > >> writer are collocated on the same broker.
> > >>>> > >>
> > >>>> > >> In that design, it might make sense to use something like the
> > >>>> > >> replication cursor to keep track of this consumer's state.
> > >>>> > >>
> > >>>> > >> - Michael
> > >>>> > >>
> > >>>> > >> On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall <
> > >>>> mmarshall@apache.org>
> > >>>> > >> wrote:
> > >>>> > >> >
> > >>>> > >> > Thanks for your proposal, Heesung.
> > >>>> > >> >
> > >>>> > >> > Fundamentally, we have the problems listed in this PIP
> because
> > >>>> we have
> > >>>> > >> > multiple writers instead of just one writer. Can we solve
> this
> > >>>> problem
> > >>>> > >> > by changing our write pattern? What if we use the leader
> broker
> > >>>> as the
> > >>>> > >> > single writer? That broker would intercept attempts to
> acquire
> > >>>> > >> > ownership on bundles and would grant ownership to the first
> > >>>> broker to
> > >>>> > >> > claim an unassigned bundle. It could "grant ownership" by
> > >>>> letting the
> > >>>> > >> > first write to claim an unassigned bundle get written to the
> > >>>> ownership
> > >>>> > >> > topic. When a bundle is already owned, the leader won't
> persist
> > >>>> that
> > >>>> > >> > event to the bookkeeper. In this design, the log becomes a
> true
> > >>>> > >> > ownership log, which will correctly work with the existing
> topic
> > >>>> > >> > compaction and table view solutions. My proposal essentially
> > >>>> moves the
> > >>>> > >> > conflict resolution to just before the write, and as a
> > >>>> consequence, it
> > >>>> > >> > greatly reduces the need for post processing of the event
> log.
> > >>>> One
> > >>>> > >> > trade off might be that the leader broker could slow down the
> > >>>> write
> > >>>> > >> > path, but given that the leader would just need to verify the
> > >>>> current
> > >>>> > >> > state of the bundle, I think it'd be performant enough.
> > >>>> > >> >
> > >>>> > >> > Additionally, we'd need the leader broker to be "caught up"
> on
> > >>>> bundle
> > >>>> > >> > ownership in order to grant ownership of topics, but unless
> I am
> > >>>> > >> > mistaken, that is already a requirement of the current PIP
> 192
> > >>>> > >> > paradigm.
> > >>>> > >> >
> > >>>> > >> > Below are some additional thoughts that will be relevant if
> we
> > >>>> move
> > >>>> > >> > forward with the design as it is currently proposed.
> > >>>> > >> >
> > >>>> > >> > I think it might be helpful to update the title to show that
> this
> > >>>> > >> > proposal will also affect table view as well. I didn't catch
> > >>>> that at
> > >>>> > >> > first.
> > >>>> > >> >
> > >>>> > >> > Do you have any documentation describing how the
> > >>>> > >> > TopicCompactionStrategy will determine which states are
> valid in
> > >>>> the
> > >>>> > >> > context of load balancing? I looked at
> > >>>> > >> > https://github.com/apache/pulsar/pull/18195, but I couldn't
> > >>>> seem to
> > >>>> > >> > find anything for it. That would help make this proposal less
> > >>>> > >> > abstract.
> > >>>> > >> >
> > >>>> > >> > The proposed API seems very tied to the needs of PIP 192. For
> > >>>> example,
> > >>>> > >> > `isValid` is not a term I associate with topic compaction.
> The
> > >>>> > >> > fundamental question for compaction is which value to keep
> (or
> > >>>> build a
> > >>>> > >> > new value). I think we might be able to simplify the API by
> > >>>> replacing
> > >>>> > >> > the "isValid", "isMergeEnabled", and "merge" methods with a
> > >>>> single
> > >>>> > >> > method that lets the implementation handle one or all tasks.
> That
> > >>>> > >> > would also remove the need to deserialize payloads multiple
> > >>>> times too.
> > >>>> > >> >
> > >>>> > >> > I also feel like mentioning that after working with the PIP
> 105
> > >>>> broker
> > >>>> > >> > side filtering, I think we should avoid running UDFs in the
> > >>>> broker as
> > >>>> > >> > much as possible. (I do not consider the load balancing
> logic to
> > >>>> be a
> > >>>> > >> > UDF here.) I think it would be worth not making this a user
> > >>>> facing
> > >>>> > >> > feature unless there is demand for real use cases.
> > >>>> > >> >
> > >>>> > >> > Thanks!
> > >>>> > >> > Michael
> > >>>> > >> >
> > >>>> > >> > On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bo...@apache.org>
> wrote:
> > >>>> > >> > >
> > >>>> > >> > > +1(non-binding)
> > >>>> > >> > >
> > >>>> > >> > > thanks,
> > >>>> > >> > > bo
> > >>>> > >> > >
> > >>>> > >> > > Heesung Sohn <he...@streamnative.io.invalid>
> > >>>> 于2022年10月19日周三
> > >>>> > >> 07:54写道:
> > >>>> > >> > > >
> > >>>> > >> > > > Hi pulsar-dev community,
> > >>>> > >> > > >
> > >>>> > >> > > > I raised a pip to discuss : PIP-215: Configurable Topic
> > >>>> Compaction
> > >>>> > >> Strategy
> > >>>> > >> > > >
> > >>>> > >> > > > PIP link: https://github.com/apache/pulsar/issues/18099
> > >>>> > >> > > >
> > >>>> > >> > > > Regards,
> > >>>> > >> > > > Heesung
> > >>>> > >>
> > >>>> > >
> > >>>>
> > >>>
>

Re: [DISCUSS] PIP-215: Configurable Topic Compaction Strategy

Posted by Michael Marshall <mm...@apache.org>.
Thank you for your detailed responses, Heesung.

> We are not planning to expose this feature to users
> soon unless demanded and proven to be stable.

In that case, I think we should move forward with this PIP. I have a
different opinion about the trade offs for the two designs, but none
of my concerns are problems that could not be solved later if we
encounter problems.

Just to say it succinctly, my concern is that broadcasting all
attempts to acquire ownership of every unclaimed bundle to all brokers
will generate a lot of unnecessary traffic.

> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a

Thank you for this reference. I missed it. That is great documentation!

> In fact, with the `compact(T prev, T cur)` api only, it is not clear if
> prev => cur is a valid transition or not(if invalid, we should filter out
> the cur message instead of further compacting/merging). I think we still
> need to keep the `isValid()` and `merge()` separated.

I was thinking that the result of `compact` would be the result put in
the table view or written to the compacted topic. The one issue might
be about keeping the memory utilization down for use cases that are
not updating the message's value but are only selecting "left" or
"right". I thought we could infer when to keep the message id vs keep
the message value, but that might be easy to implement.

My final critique is that I think `isValid` could have a better name.
In the event this does become a public API, I don't think all use
cases will think about which event should be persisted in terms of
validity.

The main name that comes to my mind is `bool shouldKeepLeft(T prev, T
cur)`. When true, prev wins. When false, cur wins. That nomenclature
comes from Akka Streams. It's not perfect, but it is easy to infer
what the result will do.

> Regarding redundant deserialization, the input type `T` is the type of
> message value, so the input values are already deserialized.

Great, I should have realized that. That takes care of that concern.

Thanks,
Michael

On Thu, Nov 3, 2022 at 7:37 PM Heesung Sohn
<he...@streamnative.io.invalid> wrote:
>
> Hi,
>
> I have a different thought about my previous comment.
>
> - Agreed with your point that we should merge CompactionStrategy APIs. I
> updated the interface proposal in the PIP. I replaced `"isValid",
> "isMergeEnabled", and "merge"` apis with "compact" api.
>
> boolean isValid(T prev, T cur)
> boolean isMergeEnabled()
> T merge(T prev, T cur)
>
> =>
>
> T compact(T prev, T cur)
>
> In fact, with the `compact(T prev, T cur)` api only, it is not clear if
> prev => cur is a valid transition or not(if invalid, we should filter out
> the cur message instead of further compacting/merging). I think we still
> need to keep the `isValid()` and `merge()` separated.
>
> Regarding redundant deserialization, the input type `T` is the type of
> message value, so the input values are already deserialized. We don't want
> to expose the Message<T> interface in this CompactionStrategy to avoid
> message serialization/deserialization dependencies in the
> CompactionStrategy.
>
> The `merge()` functionality is suggested for more complex use cases (merge
> values instead of just filtering), and to support this `merge()`, we need
> to internally create a new msg with the compacted value, metadata, and
> messageId copies. We could initially define `isValid()` only in
> CompactionStrategy, and add `isMergeEnabled() and merge()` later in the
> CompactionStrategy interface if requested.
>
> Regards,
> Heesung
>
>
> On Thu, Nov 3, 2022 at 9:50 AM Heesung Sohn <he...@streamnative.io>
> wrote:
>
> > Oops! Michael, I apologize for the typo in your name.
> >
> > On Thu, Nov 3, 2022 at 9:47 AM Heesung Sohn <he...@streamnative.io>
> > wrote:
> >
> >> Hi Machel,
> >>
> >> Here are my additional comments regarding your earlier email.
> >>
> >> - I updated the PIP title to show that this will impact table view as
> >> well.
> >>
> >> - PIP-192 : https://github.com/apache/pulsar/issues/16691 shows the
> >> general idea of the states and their actions, and I defined the actual
> >> states here in the PR,
> >> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a. I
> >> will further clarify the bundle state data validation logic when
> >> introducing `BundleStateCompactionStrategy` class. This PIP is to support
> >> CompactionStrategy in general.
> >>
> >> - Agreed with your point that we should merge CompactionStrategy APIs. I
> >> updated the interface proposal in the PIP. I replaced `"isValid",
> >> "isMergeEnabled", and "merge"` apis with "compact" api.
> >>
> >>
> >> Thanks,
> >> Heesung
> >>
> >>
> >> On Tue, Nov 1, 2022 at 11:26 AM Heesung Sohn <
> >> heesung.sohn@streamnative.io> wrote:
> >>
> >>> Hi,
> >>> Thank you for the great comments.
> >>> Please find my comments inline too.
> >>>
> >>> Regards,
> >>> Heesung
> >>>
> >>> On Mon, Oct 31, 2022 at 10:59 PM Michael Marshall <mm...@apache.org>
> >>> wrote:
> >>>
> >>>> > I think we lose a single linearized view.
> >>>>
> >>>> Which linearized view are we losing, and what is the role of that
> >>>> linearized view? I think I might be missing why it is important. I
> >>>> agree that consumers won't know about each unsuccessful attempted
> >>>> acquisition of a bundle, but that seems like unnecessary information
> >>>> to broadcast to every broker in the cluster.
> >>>>
> >>>>
> >>>
> >>> PIP-192 proposes an assignment, transfer, and split protocol(multi-phase
> >>> state changes), relying on early broadcast across brokers, and all brokers
> >>> react to their clients according to the state change notifications --
> >>> brokers could defer any client lookups for bundle x if an
> >>> assignment/transfer/split is ongoing for x(broadcasted early in the topic).
> >>> One early broadcast example is the one that I discussed above, `When the
> >>> topic broadcast is faster than the concurrent assignment requests.` I think
> >>> the prefilter could delay this early broadcast, as it needs to go through
> >>> the additional single-leader compaction path.
> >>>
> >>> The bundle state recovery process is simpler by a single linearized view.
> >>>
> >>> The single linearized view can be easier to debug bundle states. We can
> >>> more easily track where the assignment requests come from and how it is
> >>> compacted in a single linearized view.
> >>>
> >>>
> >>>
> >>>> > I think the leader requires a write-through cache to compact messages
> >>>> based
> >>>> > on the latest states.
> >>>>
> >>> This brings up an important point that I would like to clarify. If we
> >>>> trust the write ahead log as the source of truth, what happens when a
> >>>> bundle has been validly owned by multiple brokers? As a broker starts
> >>>> and consumes from the compacted topic, how do we prevent it from
> >>>> incorrectly thinking that it owns a bundle for some short time period
> >>>> in the case that the ownership topic hasn't yet been compacted to
> >>>> remove old ownership state?
> >>>>
> >>>
> >>> Since the multi-phase transfer protocol involves the source and
> >>> destination broker's actions, the successful transfer should get the source
> >>> and destination broker to have the (near) latest state. For example, if
> >>> some brokers have old ownership states(network partitioned or delayed),
> >>> they will redirect clients to the source(old) broker. However, by the
> >>> transfer protocol, the source broker should have the latest state, so it
> >>> can redirect the client again to the destination broker.
> >>>
> >>> When a broker restarts, it won't start until its BSC state to the (near)
> >>> latest (til the last known messageId at that time).
> >>>
> >>>
> >>>> > Pulsar guarantees "a single writer".
> >>>>
> >>>> I didn't think we were using a single writer in the PIP 192 design. I
> >>>> thought we had many producers sending events to a compacted topic. My
> >>>> proposal would still have many producers, but the writer to bookkeeper
> >>>> would act as the single writer. It would technically be distinct from
> >>>> a normal Pulsar topic producer.
> >>>>
> >>>> I should highlight that I am only proposing "broker filtering before
> >>>> write" in the context of PIP 192 and as an alternative to adding
> >>>> pluggable compaction strategies. It would not be a generic feature.
> >>>>
> >>>>
> >>> I was worried about the worst case where two producers(leaders) happen
> >>> to write the compacted topic (although Pulsar can guarantee "a single
> >>> writer" or "a single producer" for a topic in normal situations).
> >>>
> >>>
> >>>
> >>>> > Could we clarify how to handle
> >>>> > the following(edge cases and failure recovery)?
> >>>> > 0. Is the un-compacted topic a persistent topic or a non-persistent
> >>>> topic?
> >>>>
> >>>> It is a persistent topic.
> >>>>
> >>>> > 1. How does the leader recover state from the two topics?
> >>>>
> >>>> A leader would recover state by first consuming the whole compacted
> >>>> topic and then by consuming from the current location of a cursor on
> >>>> the first input topic. As stated elsewhere, this introduces latency
> >>>> and could be an issue.
> >>>>
> >>>> > 2. How do we handle the case when the leader fails before writing
> >>>> messages
> >>>> > to the compacted topic
> >>>>
> >>>> The leader would not acknowledge the message on the input topic until
> >>>> it has successfully persisted the event on the compacted topic.
> >>>> Publishing the same event to a compacted topic multiple times is
> >>>> idempotent, so there is no risk of lost state. The real risk is
> >>>> latency. However, I think we might have similar (though not the same)
> >>>> latency risks in the current solution.
> >>>>
> >>>> > Analysis: the "pre-filter + two-topics" option can reduce the number
> >>>> of
> >>>> > messages to broadcast at the expense of the leader broker compaction.
> >>>>
> >>>> My primary point is that with this PIP's design, the filter logic is
> >>>> run on every broker and again during topic compaction. With the
> >>>> alternative design, the filter is run once.
> >>>>
> >>>> Thank you for the clarification.
> >>>
> >>> I think the difference is that the post-filter is an optimistic approach
> >>> as it optimistically relies on the "broadcast-filter" effect(brokers will
> >>> defer client lookups if notified ahead that any assignment is ongoing for
> >>> bundle x). Yes, in the worst case, if the broadcast is slower, each broker
> >>> needs to individually compact the conflicting assignment requests.
> >>>
> >>> Conversely, one downside of the pessimistic approach (single leader
> >>> pre-filter) is that when there are not many conflict concurrent assignment
> >>> requests(assign for bundle a, assign for bundle b, assign for bundle c...),
> >>> the requests need to redundantly go through the leader compaction.
> >>>
> >>>
> >>>
> >>>> > 3. initially less complex to implement (leaderless conflict
> >>>> resolution and
> >>>> > requires a single topic)
> >>>>
> >>>> PIP 215 has its own complexity too. Coordinating filters
> >>>> on both the client (table view) and the server (compaction) is non
> >>>> trivial. The proposed API includes hard coded client configuration for
> >>>> each component, which will make upgrading the version of the
> >>>> compaction strategy complicated, and could lead to incorrect
> >>>> interpretation of events in the stream. When a single broker is doing
> >>>> the filtering, versioning is no longer a distributed problem. That
> >>>> being said, I do not mean to suggest my solution is without
> >>>> complexity.
> >>>>
> >>>> > 4. it is not a "one-way door" decision (we could add the pre-filter
> >>>> logic
> >>>> > as well later)
> >>>>
> >>>> It's fair to say that we could add it later, but at that point, we
> >>>> will have added this new API for compaction strategy. Are we confident
> >>>> that pluggable compaction is independently an important addition to
> >>>> Pulsar's
> >>>> features, or would it make sense to make this API only exposed in the
> >>>> broker?
> >>>>
> >>>>
> >>> The intention is that this compaction feature could be useful for
> >>> complex user applications (if they are trying to do a similar thing). As I
> >>> mentioned, this feature is closely tied to the PIP-192 now. We are not
> >>> planning to expose this feature to users soon unless demanded and proven to
> >>> be stable.
> >>>
> >>>
> >>>> Thanks,
> >>>> Michael
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> On Sat, Oct 29, 2022 at 10:13 PM Heesung Sohn
> >>>> <he...@streamnative.io.invalid> wrote:
> >>>> >
> >>>> > Hi,
> >>>> >
> >>>> > Also, I thought about some concurrent assignment scenarios between
> >>>> > pre-filter vs post-filter.
> >>>> >
> >>>> > Example 1: When the topic broadcast is slower than the concurrent
> >>>> > assignment requests
> >>>> >
> >>>> > With pre-filter + two-topics (non-compacted and compacted topics)
> >>>> > t1: A -> non-compacted topic // broker A published a message to the
> >>>> > non-compacted topic, m1: {broker A assigned bundle x to broker A}
> >>>> > t2: B -> non-compacted topic // broker B published a message, m2:
> >>>> {broker B
> >>>> > assigned bundle x to broker C}
> >>>> > t3: C -> non-compacted topic // broker C published a message, m3:
> >>>> {broker C
> >>>> > assigned bundle x to broker B}
> >>>> > t4: non-compacted topic -> L // leader broker consumed the messages:
> >>>> m1,m2,
> >>>> > and m3
> >>>> > t5: L -> compacted topic // leader compacted the messages and
> >>>> broadcasted
> >>>> > m1 to all consumers
> >>>> > t6: compacted topic-> [A,B,C] // broker A,B,C consumed m1
> >>>> >
> >>>> > With post-filter + a single topic
> >>>> > t1: A -> topic // broker A published a message to the non-compacted
> >>>> topic,
> >>>> > m1: {broker A assigned bundle x to broker A}
> >>>> > t2: B -> topic // broker B published a message, m2: {broker B assigned
> >>>> > bundle x to broker C}
> >>>> > t3: C -> topic // broker C published a message, m3: {broker C assigned
> >>>> > bundle x to broker B}
> >>>> > t4: topic -> [A,B,C] // broker A,B,C consumed the messages: m1,m2,
> >>>> and m3
> >>>> > t5: [A,B,C] -> m1 // broker A,B,C individually compacted the messages
> >>>> to m1.
> >>>> >
> >>>> > Analysis: the "pre-filter + two-topics" option can reduce the number
> >>>> of
> >>>> > messages to broadcast at the expense of the leader broker compaction.
> >>>> >
> >>>> >
> >>>> > Example 2: When the topic broadcast is faster than the concurrent
> >>>> > assignment requests
> >>>> >
> >>>> > With pre-filter + two-topics (non-compacted and compacted topics)
> >>>> > t1: A -> non-compacted topic // broker A published a message to the
> >>>> > non-compacted topic, m1: {broker A assigned bundle x to broker A}
> >>>> > t2: non-compacted topic -> L // leader broker consumed the messages:
> >>>> m1
> >>>> > t3: L -> compacted topic // leader compacted the message and
> >>>> broadcasted m1
> >>>> > to all consumers
> >>>> > t4: compacted topic-> [A,B,C] // broker A,B,C consumed m1
> >>>> > t5: A-> own bundle // broker A knows that its assignment has been
> >>>> accepted,
> >>>> > so proceeding to own the bundle (meanwhile deferring lookup requests)
> >>>> > t6: B -> defer client lookups // broker B knows that bundle
> >>>> assignment is
> >>>> > running(meanwhile deferring lookup requests)
> >>>> > t7: C -> defer client lookups // broker C knows that bundle
> >>>> assignment is
> >>>> > running(meanwhile deferring lookup requests)
> >>>> >
> >>>> > With post-filter + a single topic
> >>>> > t1: A -> topic // broker A published a message to the non-compacted
> >>>> topic,
> >>>> > m1: {broker A assigned bundle x to broker A}
> >>>> > t2: topic -> [A,B,C] // broker A,B,C consumed the message: m1
> >>>> > t3:  A-> own bundle // broker A knows that its assignment has been
> >>>> > accepted, so proceeding to own the bundle (meanwhile deferring lookup
> >>>> > requests)
> >>>> > t4: B -> defer client lookups // broker B knows that bundle
> >>>> assignment is
> >>>> > running(meanwhile deferring lookup requests)
> >>>> > t5: C -> defer client lookups // broker C knows that bundle
> >>>> assignment is
> >>>> > running(meanwhile deferring lookup requests)
> >>>> >
> >>>> > Analysis: The "post-filter + a single topic" can perform ok in this
> >>>> case
> >>>> > without the additional leader coordination and the secondary topic
> >>>> because
> >>>> > the early broadcast can inform all brokers and prevent them from
> >>>> requesting
> >>>> > other assignments for the same bundle.
> >>>> >
> >>>> > I think the post-filter option is initially not bad because:
> >>>> >
> >>>> > 1. it is safe in the worst case (in case the messages are not
> >>>> correctly
> >>>> > pre-filtered at the leader)
> >>>> > 2. it performs ok because the early broadcast can prevent
> >>>> > concurrent assignment requests.
> >>>> > 3. initially less complex to implement (leaderless conflict
> >>>> resolution and
> >>>> > requires a single topic)
> >>>> > 4. it is not a "one-way door" decision (we could add the pre-filter
> >>>> logic
> >>>> > as well later)
> >>>> >
> >>>> > Regards,
> >>>> > Heesung
> >>>> >
> >>>> >
> >>>> >
> >>>> > On Sat, Oct 29, 2022 at 1:03 PM Heesung Sohn <
> >>>> heesung.sohn@streamnative.io>
> >>>> > wrote:
> >>>> >
> >>>> > > Hi Michael,
> >>>> > >
> >>>> > > For the pre-prefilter(pre-compaction) option,
> >>>> > > I think the leader requires a write-through cache to compact
> >>>> messages
> >>>> > > based on the latest states. Otherwise, the leader needs to wait for
> >>>> the
> >>>> > > last msg from the (compacted) topic before compacting the next msg
> >>>> for the
> >>>> > > same bundle.
> >>>> > >
> >>>> > > Pulsar guarantees "a single writer". However, for the worst-case
> >>>> > > scenario(due to network partitions, bugs in zk or etcd leader
> >>>> election,
> >>>> > > bugs in bk, data corruption ), I think it is safe to place the
> >>>> post-filter
> >>>> > > on the consumer side(compaction and table views) as well in order to
> >>>> > > validate the state changes.
> >>>> > >
> >>>> > > For the two-topic approach,
> >>>> > > I think we lose a single linearized view. Could we clarify how to
> >>>> handle
> >>>> > > the following(edge cases and failure recovery)?
> >>>> > > 0. Is the un-compacted topic a persistent topic or a non-persistent
> >>>> topic?
> >>>> > > 1. How does the leader recover state from the two topics?
> >>>> > > 2. How do we handle the case when the leader fails before writing
> >>>> messages
> >>>> > > to the compacted topic
> >>>> > >
> >>>> > > Regards,
> >>>> > > Heesung
> >>>> > >
> >>>> > > On Fri, Oct 28, 2022 at 6:56 PM Michael Marshall <
> >>>> mmarshall@apache.org>
> >>>> > > wrote:
> >>>> > >
> >>>> > >> Sharing some more thoughts. We could alternatively use two topics
> >>>> > >> instead of one. In this design, the first topic is the unfiltered
> >>>> > >> write ahead log that represents many writers (brokers) trying to
> >>>> > >> acquire ownership of bundles. The second topic is the distilled log
> >>>> > >> that represents the "winners" or the "owners" of the bundles.
> >>>> There is
> >>>> > >> a single writer, the leader broker, that reads from the input topic
> >>>> > >> and writes to the output topic. The first topic is normal and the
> >>>> > >> second is compacted.
> >>>> > >>
> >>>> > >> The primary benefit in a two topic solution is that it is easy for
> >>>> the
> >>>> > >> leader broker to trade off ownership without needing to slow down
> >>>> > >> writes to the input topic. The leader broker will start consuming
> >>>> from
> >>>> > >> the input topic when it has fully consumed the table view on the
> >>>> > >> output topic. In general, I don't think consumers know when they
> >>>> have
> >>>> > >> "reached the end of a table view", but we should be able to
> >>>> trivially
> >>>> > >> figure this out if we are the topic's only writer and the topic and
> >>>> > >> writer are collocated on the same broker.
> >>>> > >>
> >>>> > >> In that design, it might make sense to use something like the
> >>>> > >> replication cursor to keep track of this consumer's state.
> >>>> > >>
> >>>> > >> - Michael
> >>>> > >>
> >>>> > >> On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall <
> >>>> mmarshall@apache.org>
> >>>> > >> wrote:
> >>>> > >> >
> >>>> > >> > Thanks for your proposal, Heesung.
> >>>> > >> >
> >>>> > >> > Fundamentally, we have the problems listed in this PIP because
> >>>> we have
> >>>> > >> > multiple writers instead of just one writer. Can we solve this
> >>>> problem
> >>>> > >> > by changing our write pattern? What if we use the leader broker
> >>>> as the
> >>>> > >> > single writer? That broker would intercept attempts to acquire
> >>>> > >> > ownership on bundles and would grant ownership to the first
> >>>> broker to
> >>>> > >> > claim an unassigned bundle. It could "grant ownership" by
> >>>> letting the
> >>>> > >> > first write to claim an unassigned bundle get written to the
> >>>> ownership
> >>>> > >> > topic. When a bundle is already owned, the leader won't persist
> >>>> that
> >>>> > >> > event to the bookkeeper. In this design, the log becomes a true
> >>>> > >> > ownership log, which will correctly work with the existing topic
> >>>> > >> > compaction and table view solutions. My proposal essentially
> >>>> moves the
> >>>> > >> > conflict resolution to just before the write, and as a
> >>>> consequence, it
> >>>> > >> > greatly reduces the need for post processing of the event log.
> >>>> One
> >>>> > >> > trade off might be that the leader broker could slow down the
> >>>> write
> >>>> > >> > path, but given that the leader would just need to verify the
> >>>> current
> >>>> > >> > state of the bundle, I think it'd be performant enough.
> >>>> > >> >
> >>>> > >> > Additionally, we'd need the leader broker to be "caught up" on
> >>>> bundle
> >>>> > >> > ownership in order to grant ownership of topics, but unless I am
> >>>> > >> > mistaken, that is already a requirement of the current PIP 192
> >>>> > >> > paradigm.
> >>>> > >> >
> >>>> > >> > Below are some additional thoughts that will be relevant if we
> >>>> move
> >>>> > >> > forward with the design as it is currently proposed.
> >>>> > >> >
> >>>> > >> > I think it might be helpful to update the title to show that this
> >>>> > >> > proposal will also affect table view as well. I didn't catch
> >>>> that at
> >>>> > >> > first.
> >>>> > >> >
> >>>> > >> > Do you have any documentation describing how the
> >>>> > >> > TopicCompactionStrategy will determine which states are valid in
> >>>> the
> >>>> > >> > context of load balancing? I looked at
> >>>> > >> > https://github.com/apache/pulsar/pull/18195, but I couldn't
> >>>> seem to
> >>>> > >> > find anything for it. That would help make this proposal less
> >>>> > >> > abstract.
> >>>> > >> >
> >>>> > >> > The proposed API seems very tied to the needs of PIP 192. For
> >>>> example,
> >>>> > >> > `isValid` is not a term I associate with topic compaction. The
> >>>> > >> > fundamental question for compaction is which value to keep (or
> >>>> build a
> >>>> > >> > new value). I think we might be able to simplify the API by
> >>>> replacing
> >>>> > >> > the "isValid", "isMergeEnabled", and "merge" methods with a
> >>>> single
> >>>> > >> > method that lets the implementation handle one or all tasks. That
> >>>> > >> > would also remove the need to deserialize payloads multiple
> >>>> times too.
> >>>> > >> >
> >>>> > >> > I also feel like mentioning that after working with the PIP 105
> >>>> broker
> >>>> > >> > side filtering, I think we should avoid running UDFs in the
> >>>> broker as
> >>>> > >> > much as possible. (I do not consider the load balancing logic to
> >>>> be a
> >>>> > >> > UDF here.) I think it would be worth not making this a user
> >>>> facing
> >>>> > >> > feature unless there is demand for real use cases.
> >>>> > >> >
> >>>> > >> > Thanks!
> >>>> > >> > Michael
> >>>> > >> >
> >>>> > >> > On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bo...@apache.org> wrote:
> >>>> > >> > >
> >>>> > >> > > +1(non-binding)
> >>>> > >> > >
> >>>> > >> > > thanks,
> >>>> > >> > > bo
> >>>> > >> > >
> >>>> > >> > > Heesung Sohn <he...@streamnative.io.invalid>
> >>>> 于2022年10月19日周三
> >>>> > >> 07:54写道:
> >>>> > >> > > >
> >>>> > >> > > > Hi pulsar-dev community,
> >>>> > >> > > >
> >>>> > >> > > > I raised a pip to discuss : PIP-215: Configurable Topic
> >>>> Compaction
> >>>> > >> Strategy
> >>>> > >> > > >
> >>>> > >> > > > PIP link: https://github.com/apache/pulsar/issues/18099
> >>>> > >> > > >
> >>>> > >> > > > Regards,
> >>>> > >> > > > Heesung
> >>>> > >>
> >>>> > >
> >>>>
> >>>

Re: [DISCUSS] PIP-215: Configurable Topic Compaction Strategy

Posted by Heesung Sohn <he...@streamnative.io.INVALID>.
Hi,

I have a different thought about my previous comment.

- Agreed with your point that we should merge CompactionStrategy APIs. I
updated the interface proposal in the PIP. I replaced `"isValid",
"isMergeEnabled", and "merge"` apis with "compact" api.

boolean isValid(T prev, T cur)
boolean isMergeEnabled()
T merge(T prev, T cur)

=>

T compact(T prev, T cur)

In fact, with the `compact(T prev, T cur)` api only, it is not clear if
prev => cur is a valid transition or not(if invalid, we should filter out
the cur message instead of further compacting/merging). I think we still
need to keep the `isValid()` and `merge()` separated.

Regarding redundant deserialization, the input type `T` is the type of
message value, so the input values are already deserialized. We don't want
to expose the Message<T> interface in this CompactionStrategy to avoid
message serialization/deserialization dependencies in the
CompactionStrategy.

The `merge()` functionality is suggested for more complex use cases (merge
values instead of just filtering), and to support this `merge()`, we need
to internally create a new msg with the compacted value, metadata, and
messageId copies. We could initially define `isValid()` only in
CompactionStrategy, and add `isMergeEnabled() and merge()` later in the
CompactionStrategy interface if requested.

Regards,
Heesung


On Thu, Nov 3, 2022 at 9:50 AM Heesung Sohn <he...@streamnative.io>
wrote:

> Oops! Michael, I apologize for the typo in your name.
>
> On Thu, Nov 3, 2022 at 9:47 AM Heesung Sohn <he...@streamnative.io>
> wrote:
>
>> Hi Machel,
>>
>> Here are my additional comments regarding your earlier email.
>>
>> - I updated the PIP title to show that this will impact table view as
>> well.
>>
>> - PIP-192 : https://github.com/apache/pulsar/issues/16691 shows the
>> general idea of the states and their actions, and I defined the actual
>> states here in the PR,
>> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a. I
>> will further clarify the bundle state data validation logic when
>> introducing `BundleStateCompactionStrategy` class. This PIP is to support
>> CompactionStrategy in general.
>>
>> - Agreed with your point that we should merge CompactionStrategy APIs. I
>> updated the interface proposal in the PIP. I replaced `"isValid",
>> "isMergeEnabled", and "merge"` apis with "compact" api.
>>
>>
>> Thanks,
>> Heesung
>>
>>
>> On Tue, Nov 1, 2022 at 11:26 AM Heesung Sohn <
>> heesung.sohn@streamnative.io> wrote:
>>
>>> Hi,
>>> Thank you for the great comments.
>>> Please find my comments inline too.
>>>
>>> Regards,
>>> Heesung
>>>
>>> On Mon, Oct 31, 2022 at 10:59 PM Michael Marshall <mm...@apache.org>
>>> wrote:
>>>
>>>> > I think we lose a single linearized view.
>>>>
>>>> Which linearized view are we losing, and what is the role of that
>>>> linearized view? I think I might be missing why it is important. I
>>>> agree that consumers won't know about each unsuccessful attempted
>>>> acquisition of a bundle, but that seems like unnecessary information
>>>> to broadcast to every broker in the cluster.
>>>>
>>>>
>>>
>>> PIP-192 proposes an assignment, transfer, and split protocol(multi-phase
>>> state changes), relying on early broadcast across brokers, and all brokers
>>> react to their clients according to the state change notifications --
>>> brokers could defer any client lookups for bundle x if an
>>> assignment/transfer/split is ongoing for x(broadcasted early in the topic).
>>> One early broadcast example is the one that I discussed above, `When the
>>> topic broadcast is faster than the concurrent assignment requests.` I think
>>> the prefilter could delay this early broadcast, as it needs to go through
>>> the additional single-leader compaction path.
>>>
>>> The bundle state recovery process is simpler by a single linearized view.
>>>
>>> The single linearized view can be easier to debug bundle states. We can
>>> more easily track where the assignment requests come from and how it is
>>> compacted in a single linearized view.
>>>
>>>
>>>
>>>> > I think the leader requires a write-through cache to compact messages
>>>> based
>>>> > on the latest states.
>>>>
>>> This brings up an important point that I would like to clarify. If we
>>>> trust the write ahead log as the source of truth, what happens when a
>>>> bundle has been validly owned by multiple brokers? As a broker starts
>>>> and consumes from the compacted topic, how do we prevent it from
>>>> incorrectly thinking that it owns a bundle for some short time period
>>>> in the case that the ownership topic hasn't yet been compacted to
>>>> remove old ownership state?
>>>>
>>>
>>> Since the multi-phase transfer protocol involves the source and
>>> destination broker's actions, the successful transfer should get the source
>>> and destination broker to have the (near) latest state. For example, if
>>> some brokers have old ownership states(network partitioned or delayed),
>>> they will redirect clients to the source(old) broker. However, by the
>>> transfer protocol, the source broker should have the latest state, so it
>>> can redirect the client again to the destination broker.
>>>
>>> When a broker restarts, it won't start until its BSC state to the (near)
>>> latest (til the last known messageId at that time).
>>>
>>>
>>>> > Pulsar guarantees "a single writer".
>>>>
>>>> I didn't think we were using a single writer in the PIP 192 design. I
>>>> thought we had many producers sending events to a compacted topic. My
>>>> proposal would still have many producers, but the writer to bookkeeper
>>>> would act as the single writer. It would technically be distinct from
>>>> a normal Pulsar topic producer.
>>>>
>>>> I should highlight that I am only proposing "broker filtering before
>>>> write" in the context of PIP 192 and as an alternative to adding
>>>> pluggable compaction strategies. It would not be a generic feature.
>>>>
>>>>
>>> I was worried about the worst case where two producers(leaders) happen
>>> to write the compacted topic (although Pulsar can guarantee "a single
>>> writer" or "a single producer" for a topic in normal situations).
>>>
>>>
>>>
>>>> > Could we clarify how to handle
>>>> > the following(edge cases and failure recovery)?
>>>> > 0. Is the un-compacted topic a persistent topic or a non-persistent
>>>> topic?
>>>>
>>>> It is a persistent topic.
>>>>
>>>> > 1. How does the leader recover state from the two topics?
>>>>
>>>> A leader would recover state by first consuming the whole compacted
>>>> topic and then by consuming from the current location of a cursor on
>>>> the first input topic. As stated elsewhere, this introduces latency
>>>> and could be an issue.
>>>>
>>>> > 2. How do we handle the case when the leader fails before writing
>>>> messages
>>>> > to the compacted topic
>>>>
>>>> The leader would not acknowledge the message on the input topic until
>>>> it has successfully persisted the event on the compacted topic.
>>>> Publishing the same event to a compacted topic multiple times is
>>>> idempotent, so there is no risk of lost state. The real risk is
>>>> latency. However, I think we might have similar (though not the same)
>>>> latency risks in the current solution.
>>>>
>>>> > Analysis: the "pre-filter + two-topics" option can reduce the number
>>>> of
>>>> > messages to broadcast at the expense of the leader broker compaction.
>>>>
>>>> My primary point is that with this PIP's design, the filter logic is
>>>> run on every broker and again during topic compaction. With the
>>>> alternative design, the filter is run once.
>>>>
>>>> Thank you for the clarification.
>>>
>>> I think the difference is that the post-filter is an optimistic approach
>>> as it optimistically relies on the "broadcast-filter" effect(brokers will
>>> defer client lookups if notified ahead that any assignment is ongoing for
>>> bundle x). Yes, in the worst case, if the broadcast is slower, each broker
>>> needs to individually compact the conflicting assignment requests.
>>>
>>> Conversely, one downside of the pessimistic approach (single leader
>>> pre-filter) is that when there are not many conflict concurrent assignment
>>> requests(assign for bundle a, assign for bundle b, assign for bundle c...),
>>> the requests need to redundantly go through the leader compaction.
>>>
>>>
>>>
>>>> > 3. initially less complex to implement (leaderless conflict
>>>> resolution and
>>>> > requires a single topic)
>>>>
>>>> PIP 215 has its own complexity too. Coordinating filters
>>>> on both the client (table view) and the server (compaction) is non
>>>> trivial. The proposed API includes hard coded client configuration for
>>>> each component, which will make upgrading the version of the
>>>> compaction strategy complicated, and could lead to incorrect
>>>> interpretation of events in the stream. When a single broker is doing
>>>> the filtering, versioning is no longer a distributed problem. That
>>>> being said, I do not mean to suggest my solution is without
>>>> complexity.
>>>>
>>>> > 4. it is not a "one-way door" decision (we could add the pre-filter
>>>> logic
>>>> > as well later)
>>>>
>>>> It's fair to say that we could add it later, but at that point, we
>>>> will have added this new API for compaction strategy. Are we confident
>>>> that pluggable compaction is independently an important addition to
>>>> Pulsar's
>>>> features, or would it make sense to make this API only exposed in the
>>>> broker?
>>>>
>>>>
>>> The intention is that this compaction feature could be useful for
>>> complex user applications (if they are trying to do a similar thing). As I
>>> mentioned, this feature is closely tied to the PIP-192 now. We are not
>>> planning to expose this feature to users soon unless demanded and proven to
>>> be stable.
>>>
>>>
>>>> Thanks,
>>>> Michael
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Sat, Oct 29, 2022 at 10:13 PM Heesung Sohn
>>>> <he...@streamnative.io.invalid> wrote:
>>>> >
>>>> > Hi,
>>>> >
>>>> > Also, I thought about some concurrent assignment scenarios between
>>>> > pre-filter vs post-filter.
>>>> >
>>>> > Example 1: When the topic broadcast is slower than the concurrent
>>>> > assignment requests
>>>> >
>>>> > With pre-filter + two-topics (non-compacted and compacted topics)
>>>> > t1: A -> non-compacted topic // broker A published a message to the
>>>> > non-compacted topic, m1: {broker A assigned bundle x to broker A}
>>>> > t2: B -> non-compacted topic // broker B published a message, m2:
>>>> {broker B
>>>> > assigned bundle x to broker C}
>>>> > t3: C -> non-compacted topic // broker C published a message, m3:
>>>> {broker C
>>>> > assigned bundle x to broker B}
>>>> > t4: non-compacted topic -> L // leader broker consumed the messages:
>>>> m1,m2,
>>>> > and m3
>>>> > t5: L -> compacted topic // leader compacted the messages and
>>>> broadcasted
>>>> > m1 to all consumers
>>>> > t6: compacted topic-> [A,B,C] // broker A,B,C consumed m1
>>>> >
>>>> > With post-filter + a single topic
>>>> > t1: A -> topic // broker A published a message to the non-compacted
>>>> topic,
>>>> > m1: {broker A assigned bundle x to broker A}
>>>> > t2: B -> topic // broker B published a message, m2: {broker B assigned
>>>> > bundle x to broker C}
>>>> > t3: C -> topic // broker C published a message, m3: {broker C assigned
>>>> > bundle x to broker B}
>>>> > t4: topic -> [A,B,C] // broker A,B,C consumed the messages: m1,m2,
>>>> and m3
>>>> > t5: [A,B,C] -> m1 // broker A,B,C individually compacted the messages
>>>> to m1.
>>>> >
>>>> > Analysis: the "pre-filter + two-topics" option can reduce the number
>>>> of
>>>> > messages to broadcast at the expense of the leader broker compaction.
>>>> >
>>>> >
>>>> > Example 2: When the topic broadcast is faster than the concurrent
>>>> > assignment requests
>>>> >
>>>> > With pre-filter + two-topics (non-compacted and compacted topics)
>>>> > t1: A -> non-compacted topic // broker A published a message to the
>>>> > non-compacted topic, m1: {broker A assigned bundle x to broker A}
>>>> > t2: non-compacted topic -> L // leader broker consumed the messages:
>>>> m1
>>>> > t3: L -> compacted topic // leader compacted the message and
>>>> broadcasted m1
>>>> > to all consumers
>>>> > t4: compacted topic-> [A,B,C] // broker A,B,C consumed m1
>>>> > t5: A-> own bundle // broker A knows that its assignment has been
>>>> accepted,
>>>> > so proceeding to own the bundle (meanwhile deferring lookup requests)
>>>> > t6: B -> defer client lookups // broker B knows that bundle
>>>> assignment is
>>>> > running(meanwhile deferring lookup requests)
>>>> > t7: C -> defer client lookups // broker C knows that bundle
>>>> assignment is
>>>> > running(meanwhile deferring lookup requests)
>>>> >
>>>> > With post-filter + a single topic
>>>> > t1: A -> topic // broker A published a message to the non-compacted
>>>> topic,
>>>> > m1: {broker A assigned bundle x to broker A}
>>>> > t2: topic -> [A,B,C] // broker A,B,C consumed the message: m1
>>>> > t3:  A-> own bundle // broker A knows that its assignment has been
>>>> > accepted, so proceeding to own the bundle (meanwhile deferring lookup
>>>> > requests)
>>>> > t4: B -> defer client lookups // broker B knows that bundle
>>>> assignment is
>>>> > running(meanwhile deferring lookup requests)
>>>> > t5: C -> defer client lookups // broker C knows that bundle
>>>> assignment is
>>>> > running(meanwhile deferring lookup requests)
>>>> >
>>>> > Analysis: The "post-filter + a single topic" can perform ok in this
>>>> case
>>>> > without the additional leader coordination and the secondary topic
>>>> because
>>>> > the early broadcast can inform all brokers and prevent them from
>>>> requesting
>>>> > other assignments for the same bundle.
>>>> >
>>>> > I think the post-filter option is initially not bad because:
>>>> >
>>>> > 1. it is safe in the worst case (in case the messages are not
>>>> correctly
>>>> > pre-filtered at the leader)
>>>> > 2. it performs ok because the early broadcast can prevent
>>>> > concurrent assignment requests.
>>>> > 3. initially less complex to implement (leaderless conflict
>>>> resolution and
>>>> > requires a single topic)
>>>> > 4. it is not a "one-way door" decision (we could add the pre-filter
>>>> logic
>>>> > as well later)
>>>> >
>>>> > Regards,
>>>> > Heesung
>>>> >
>>>> >
>>>> >
>>>> > On Sat, Oct 29, 2022 at 1:03 PM Heesung Sohn <
>>>> heesung.sohn@streamnative.io>
>>>> > wrote:
>>>> >
>>>> > > Hi Michael,
>>>> > >
>>>> > > For the pre-prefilter(pre-compaction) option,
>>>> > > I think the leader requires a write-through cache to compact
>>>> messages
>>>> > > based on the latest states. Otherwise, the leader needs to wait for
>>>> the
>>>> > > last msg from the (compacted) topic before compacting the next msg
>>>> for the
>>>> > > same bundle.
>>>> > >
>>>> > > Pulsar guarantees "a single writer". However, for the worst-case
>>>> > > scenario(due to network partitions, bugs in zk or etcd leader
>>>> election,
>>>> > > bugs in bk, data corruption ), I think it is safe to place the
>>>> post-filter
>>>> > > on the consumer side(compaction and table views) as well in order to
>>>> > > validate the state changes.
>>>> > >
>>>> > > For the two-topic approach,
>>>> > > I think we lose a single linearized view. Could we clarify how to
>>>> handle
>>>> > > the following(edge cases and failure recovery)?
>>>> > > 0. Is the un-compacted topic a persistent topic or a non-persistent
>>>> topic?
>>>> > > 1. How does the leader recover state from the two topics?
>>>> > > 2. How do we handle the case when the leader fails before writing
>>>> messages
>>>> > > to the compacted topic
>>>> > >
>>>> > > Regards,
>>>> > > Heesung
>>>> > >
>>>> > > On Fri, Oct 28, 2022 at 6:56 PM Michael Marshall <
>>>> mmarshall@apache.org>
>>>> > > wrote:
>>>> > >
>>>> > >> Sharing some more thoughts. We could alternatively use two topics
>>>> > >> instead of one. In this design, the first topic is the unfiltered
>>>> > >> write ahead log that represents many writers (brokers) trying to
>>>> > >> acquire ownership of bundles. The second topic is the distilled log
>>>> > >> that represents the "winners" or the "owners" of the bundles.
>>>> There is
>>>> > >> a single writer, the leader broker, that reads from the input topic
>>>> > >> and writes to the output topic. The first topic is normal and the
>>>> > >> second is compacted.
>>>> > >>
>>>> > >> The primary benefit in a two topic solution is that it is easy for
>>>> the
>>>> > >> leader broker to trade off ownership without needing to slow down
>>>> > >> writes to the input topic. The leader broker will start consuming
>>>> from
>>>> > >> the input topic when it has fully consumed the table view on the
>>>> > >> output topic. In general, I don't think consumers know when they
>>>> have
>>>> > >> "reached the end of a table view", but we should be able to
>>>> trivially
>>>> > >> figure this out if we are the topic's only writer and the topic and
>>>> > >> writer are collocated on the same broker.
>>>> > >>
>>>> > >> In that design, it might make sense to use something like the
>>>> > >> replication cursor to keep track of this consumer's state.
>>>> > >>
>>>> > >> - Michael
>>>> > >>
>>>> > >> On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall <
>>>> mmarshall@apache.org>
>>>> > >> wrote:
>>>> > >> >
>>>> > >> > Thanks for your proposal, Heesung.
>>>> > >> >
>>>> > >> > Fundamentally, we have the problems listed in this PIP because
>>>> we have
>>>> > >> > multiple writers instead of just one writer. Can we solve this
>>>> problem
>>>> > >> > by changing our write pattern? What if we use the leader broker
>>>> as the
>>>> > >> > single writer? That broker would intercept attempts to acquire
>>>> > >> > ownership on bundles and would grant ownership to the first
>>>> broker to
>>>> > >> > claim an unassigned bundle. It could "grant ownership" by
>>>> letting the
>>>> > >> > first write to claim an unassigned bundle get written to the
>>>> ownership
>>>> > >> > topic. When a bundle is already owned, the leader won't persist
>>>> that
>>>> > >> > event to the bookkeeper. In this design, the log becomes a true
>>>> > >> > ownership log, which will correctly work with the existing topic
>>>> > >> > compaction and table view solutions. My proposal essentially
>>>> moves the
>>>> > >> > conflict resolution to just before the write, and as a
>>>> consequence, it
>>>> > >> > greatly reduces the need for post processing of the event log.
>>>> One
>>>> > >> > trade off might be that the leader broker could slow down the
>>>> write
>>>> > >> > path, but given that the leader would just need to verify the
>>>> current
>>>> > >> > state of the bundle, I think it'd be performant enough.
>>>> > >> >
>>>> > >> > Additionally, we'd need the leader broker to be "caught up" on
>>>> bundle
>>>> > >> > ownership in order to grant ownership of topics, but unless I am
>>>> > >> > mistaken, that is already a requirement of the current PIP 192
>>>> > >> > paradigm.
>>>> > >> >
>>>> > >> > Below are some additional thoughts that will be relevant if we
>>>> move
>>>> > >> > forward with the design as it is currently proposed.
>>>> > >> >
>>>> > >> > I think it might be helpful to update the title to show that this
>>>> > >> > proposal will also affect table view as well. I didn't catch
>>>> that at
>>>> > >> > first.
>>>> > >> >
>>>> > >> > Do you have any documentation describing how the
>>>> > >> > TopicCompactionStrategy will determine which states are valid in
>>>> the
>>>> > >> > context of load balancing? I looked at
>>>> > >> > https://github.com/apache/pulsar/pull/18195, but I couldn't
>>>> seem to
>>>> > >> > find anything for it. That would help make this proposal less
>>>> > >> > abstract.
>>>> > >> >
>>>> > >> > The proposed API seems very tied to the needs of PIP 192. For
>>>> example,
>>>> > >> > `isValid` is not a term I associate with topic compaction. The
>>>> > >> > fundamental question for compaction is which value to keep (or
>>>> build a
>>>> > >> > new value). I think we might be able to simplify the API by
>>>> replacing
>>>> > >> > the "isValid", "isMergeEnabled", and "merge" methods with a
>>>> single
>>>> > >> > method that lets the implementation handle one or all tasks. That
>>>> > >> > would also remove the need to deserialize payloads multiple
>>>> times too.
>>>> > >> >
>>>> > >> > I also feel like mentioning that after working with the PIP 105
>>>> broker
>>>> > >> > side filtering, I think we should avoid running UDFs in the
>>>> broker as
>>>> > >> > much as possible. (I do not consider the load balancing logic to
>>>> be a
>>>> > >> > UDF here.) I think it would be worth not making this a user
>>>> facing
>>>> > >> > feature unless there is demand for real use cases.
>>>> > >> >
>>>> > >> > Thanks!
>>>> > >> > Michael
>>>> > >> >
>>>> > >> > On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bo...@apache.org> wrote:
>>>> > >> > >
>>>> > >> > > +1(non-binding)
>>>> > >> > >
>>>> > >> > > thanks,
>>>> > >> > > bo
>>>> > >> > >
>>>> > >> > > Heesung Sohn <he...@streamnative.io.invalid>
>>>> 于2022年10月19日周三
>>>> > >> 07:54写道:
>>>> > >> > > >
>>>> > >> > > > Hi pulsar-dev community,
>>>> > >> > > >
>>>> > >> > > > I raised a pip to discuss : PIP-215: Configurable Topic
>>>> Compaction
>>>> > >> Strategy
>>>> > >> > > >
>>>> > >> > > > PIP link: https://github.com/apache/pulsar/issues/18099
>>>> > >> > > >
>>>> > >> > > > Regards,
>>>> > >> > > > Heesung
>>>> > >>
>>>> > >
>>>>
>>>

Re: [DISCUSS] PIP-215: Configurable Topic Compaction Strategy

Posted by Heesung Sohn <he...@streamnative.io.INVALID>.
Oops! Michael, I apologize for the typo in your name.

On Thu, Nov 3, 2022 at 9:47 AM Heesung Sohn <he...@streamnative.io>
wrote:

> Hi Machel,
>
> Here are my additional comments regarding your earlier email.
>
> - I updated the PIP title to show that this will impact table view as well.
>
> - PIP-192 : https://github.com/apache/pulsar/issues/16691 shows the
> general idea of the states and their actions, and I defined the actual
> states here in the PR,
> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a. I
> will further clarify the bundle state data validation logic when
> introducing `BundleStateCompactionStrategy` class. This PIP is to support
> CompactionStrategy in general.
>
> - Agreed with your point that we should merge CompactionStrategy APIs. I
> updated the interface proposal in the PIP. I replaced `"isValid",
> "isMergeEnabled", and "merge"` apis with "compact" api.
>
>
> Thanks,
> Heesung
>
>
> On Tue, Nov 1, 2022 at 11:26 AM Heesung Sohn <he...@streamnative.io>
> wrote:
>
>> Hi,
>> Thank you for the great comments.
>> Please find my comments inline too.
>>
>> Regards,
>> Heesung
>>
>> On Mon, Oct 31, 2022 at 10:59 PM Michael Marshall <mm...@apache.org>
>> wrote:
>>
>>> > I think we lose a single linearized view.
>>>
>>> Which linearized view are we losing, and what is the role of that
>>> linearized view? I think I might be missing why it is important. I
>>> agree that consumers won't know about each unsuccessful attempted
>>> acquisition of a bundle, but that seems like unnecessary information
>>> to broadcast to every broker in the cluster.
>>>
>>>
>>
>> PIP-192 proposes an assignment, transfer, and split protocol(multi-phase
>> state changes), relying on early broadcast across brokers, and all brokers
>> react to their clients according to the state change notifications --
>> brokers could defer any client lookups for bundle x if an
>> assignment/transfer/split is ongoing for x(broadcasted early in the topic).
>> One early broadcast example is the one that I discussed above, `When the
>> topic broadcast is faster than the concurrent assignment requests.` I think
>> the prefilter could delay this early broadcast, as it needs to go through
>> the additional single-leader compaction path.
>>
>> The bundle state recovery process is simpler by a single linearized view.
>>
>> The single linearized view can be easier to debug bundle states. We can
>> more easily track where the assignment requests come from and how it is
>> compacted in a single linearized view.
>>
>>
>>
>>> > I think the leader requires a write-through cache to compact messages
>>> based
>>> > on the latest states.
>>>
>> This brings up an important point that I would like to clarify. If we
>>> trust the write ahead log as the source of truth, what happens when a
>>> bundle has been validly owned by multiple brokers? As a broker starts
>>> and consumes from the compacted topic, how do we prevent it from
>>> incorrectly thinking that it owns a bundle for some short time period
>>> in the case that the ownership topic hasn't yet been compacted to
>>> remove old ownership state?
>>>
>>
>> Since the multi-phase transfer protocol involves the source and
>> destination broker's actions, the successful transfer should get the source
>> and destination broker to have the (near) latest state. For example, if
>> some brokers have old ownership states(network partitioned or delayed),
>> they will redirect clients to the source(old) broker. However, by the
>> transfer protocol, the source broker should have the latest state, so it
>> can redirect the client again to the destination broker.
>>
>> When a broker restarts, it won't start until its BSC state to the (near)
>> latest (til the last known messageId at that time).
>>
>>
>>> > Pulsar guarantees "a single writer".
>>>
>>> I didn't think we were using a single writer in the PIP 192 design. I
>>> thought we had many producers sending events to a compacted topic. My
>>> proposal would still have many producers, but the writer to bookkeeper
>>> would act as the single writer. It would technically be distinct from
>>> a normal Pulsar topic producer.
>>>
>>> I should highlight that I am only proposing "broker filtering before
>>> write" in the context of PIP 192 and as an alternative to adding
>>> pluggable compaction strategies. It would not be a generic feature.
>>>
>>>
>> I was worried about the worst case where two producers(leaders) happen to
>> write the compacted topic (although Pulsar can guarantee "a single writer"
>> or "a single producer" for a topic in normal situations).
>>
>>
>>
>>> > Could we clarify how to handle
>>> > the following(edge cases and failure recovery)?
>>> > 0. Is the un-compacted topic a persistent topic or a non-persistent
>>> topic?
>>>
>>> It is a persistent topic.
>>>
>>> > 1. How does the leader recover state from the two topics?
>>>
>>> A leader would recover state by first consuming the whole compacted
>>> topic and then by consuming from the current location of a cursor on
>>> the first input topic. As stated elsewhere, this introduces latency
>>> and could be an issue.
>>>
>>> > 2. How do we handle the case when the leader fails before writing
>>> messages
>>> > to the compacted topic
>>>
>>> The leader would not acknowledge the message on the input topic until
>>> it has successfully persisted the event on the compacted topic.
>>> Publishing the same event to a compacted topic multiple times is
>>> idempotent, so there is no risk of lost state. The real risk is
>>> latency. However, I think we might have similar (though not the same)
>>> latency risks in the current solution.
>>>
>>> > Analysis: the "pre-filter + two-topics" option can reduce the number of
>>> > messages to broadcast at the expense of the leader broker compaction.
>>>
>>> My primary point is that with this PIP's design, the filter logic is
>>> run on every broker and again during topic compaction. With the
>>> alternative design, the filter is run once.
>>>
>>> Thank you for the clarification.
>>
>> I think the difference is that the post-filter is an optimistic approach
>> as it optimistically relies on the "broadcast-filter" effect(brokers will
>> defer client lookups if notified ahead that any assignment is ongoing for
>> bundle x). Yes, in the worst case, if the broadcast is slower, each broker
>> needs to individually compact the conflicting assignment requests.
>>
>> Conversely, one downside of the pessimistic approach (single leader
>> pre-filter) is that when there are not many conflict concurrent assignment
>> requests(assign for bundle a, assign for bundle b, assign for bundle c...),
>> the requests need to redundantly go through the leader compaction.
>>
>>
>>
>>> > 3. initially less complex to implement (leaderless conflict resolution
>>> and
>>> > requires a single topic)
>>>
>>> PIP 215 has its own complexity too. Coordinating filters
>>> on both the client (table view) and the server (compaction) is non
>>> trivial. The proposed API includes hard coded client configuration for
>>> each component, which will make upgrading the version of the
>>> compaction strategy complicated, and could lead to incorrect
>>> interpretation of events in the stream. When a single broker is doing
>>> the filtering, versioning is no longer a distributed problem. That
>>> being said, I do not mean to suggest my solution is without
>>> complexity.
>>>
>>> > 4. it is not a "one-way door" decision (we could add the pre-filter
>>> logic
>>> > as well later)
>>>
>>> It's fair to say that we could add it later, but at that point, we
>>> will have added this new API for compaction strategy. Are we confident
>>> that pluggable compaction is independently an important addition to
>>> Pulsar's
>>> features, or would it make sense to make this API only exposed in the
>>> broker?
>>>
>>>
>> The intention is that this compaction feature could be useful for complex
>> user applications (if they are trying to do a similar thing). As I
>> mentioned, this feature is closely tied to the PIP-192 now. We are not
>> planning to expose this feature to users soon unless demanded and proven to
>> be stable.
>>
>>
>>> Thanks,
>>> Michael
>>>
>>>
>>>
>>>
>>>
>>> On Sat, Oct 29, 2022 at 10:13 PM Heesung Sohn
>>> <he...@streamnative.io.invalid> wrote:
>>> >
>>> > Hi,
>>> >
>>> > Also, I thought about some concurrent assignment scenarios between
>>> > pre-filter vs post-filter.
>>> >
>>> > Example 1: When the topic broadcast is slower than the concurrent
>>> > assignment requests
>>> >
>>> > With pre-filter + two-topics (non-compacted and compacted topics)
>>> > t1: A -> non-compacted topic // broker A published a message to the
>>> > non-compacted topic, m1: {broker A assigned bundle x to broker A}
>>> > t2: B -> non-compacted topic // broker B published a message, m2:
>>> {broker B
>>> > assigned bundle x to broker C}
>>> > t3: C -> non-compacted topic // broker C published a message, m3:
>>> {broker C
>>> > assigned bundle x to broker B}
>>> > t4: non-compacted topic -> L // leader broker consumed the messages:
>>> m1,m2,
>>> > and m3
>>> > t5: L -> compacted topic // leader compacted the messages and
>>> broadcasted
>>> > m1 to all consumers
>>> > t6: compacted topic-> [A,B,C] // broker A,B,C consumed m1
>>> >
>>> > With post-filter + a single topic
>>> > t1: A -> topic // broker A published a message to the non-compacted
>>> topic,
>>> > m1: {broker A assigned bundle x to broker A}
>>> > t2: B -> topic // broker B published a message, m2: {broker B assigned
>>> > bundle x to broker C}
>>> > t3: C -> topic // broker C published a message, m3: {broker C assigned
>>> > bundle x to broker B}
>>> > t4: topic -> [A,B,C] // broker A,B,C consumed the messages: m1,m2, and
>>> m3
>>> > t5: [A,B,C] -> m1 // broker A,B,C individually compacted the messages
>>> to m1.
>>> >
>>> > Analysis: the "pre-filter + two-topics" option can reduce the number of
>>> > messages to broadcast at the expense of the leader broker compaction.
>>> >
>>> >
>>> > Example 2: When the topic broadcast is faster than the concurrent
>>> > assignment requests
>>> >
>>> > With pre-filter + two-topics (non-compacted and compacted topics)
>>> > t1: A -> non-compacted topic // broker A published a message to the
>>> > non-compacted topic, m1: {broker A assigned bundle x to broker A}
>>> > t2: non-compacted topic -> L // leader broker consumed the messages: m1
>>> > t3: L -> compacted topic // leader compacted the message and
>>> broadcasted m1
>>> > to all consumers
>>> > t4: compacted topic-> [A,B,C] // broker A,B,C consumed m1
>>> > t5: A-> own bundle // broker A knows that its assignment has been
>>> accepted,
>>> > so proceeding to own the bundle (meanwhile deferring lookup requests)
>>> > t6: B -> defer client lookups // broker B knows that bundle assignment
>>> is
>>> > running(meanwhile deferring lookup requests)
>>> > t7: C -> defer client lookups // broker C knows that bundle assignment
>>> is
>>> > running(meanwhile deferring lookup requests)
>>> >
>>> > With post-filter + a single topic
>>> > t1: A -> topic // broker A published a message to the non-compacted
>>> topic,
>>> > m1: {broker A assigned bundle x to broker A}
>>> > t2: topic -> [A,B,C] // broker A,B,C consumed the message: m1
>>> > t3:  A-> own bundle // broker A knows that its assignment has been
>>> > accepted, so proceeding to own the bundle (meanwhile deferring lookup
>>> > requests)
>>> > t4: B -> defer client lookups // broker B knows that bundle assignment
>>> is
>>> > running(meanwhile deferring lookup requests)
>>> > t5: C -> defer client lookups // broker C knows that bundle assignment
>>> is
>>> > running(meanwhile deferring lookup requests)
>>> >
>>> > Analysis: The "post-filter + a single topic" can perform ok in this
>>> case
>>> > without the additional leader coordination and the secondary topic
>>> because
>>> > the early broadcast can inform all brokers and prevent them from
>>> requesting
>>> > other assignments for the same bundle.
>>> >
>>> > I think the post-filter option is initially not bad because:
>>> >
>>> > 1. it is safe in the worst case (in case the messages are not correctly
>>> > pre-filtered at the leader)
>>> > 2. it performs ok because the early broadcast can prevent
>>> > concurrent assignment requests.
>>> > 3. initially less complex to implement (leaderless conflict resolution
>>> and
>>> > requires a single topic)
>>> > 4. it is not a "one-way door" decision (we could add the pre-filter
>>> logic
>>> > as well later)
>>> >
>>> > Regards,
>>> > Heesung
>>> >
>>> >
>>> >
>>> > On Sat, Oct 29, 2022 at 1:03 PM Heesung Sohn <
>>> heesung.sohn@streamnative.io>
>>> > wrote:
>>> >
>>> > > Hi Michael,
>>> > >
>>> > > For the pre-prefilter(pre-compaction) option,
>>> > > I think the leader requires a write-through cache to compact messages
>>> > > based on the latest states. Otherwise, the leader needs to wait for
>>> the
>>> > > last msg from the (compacted) topic before compacting the next msg
>>> for the
>>> > > same bundle.
>>> > >
>>> > > Pulsar guarantees "a single writer". However, for the worst-case
>>> > > scenario(due to network partitions, bugs in zk or etcd leader
>>> election,
>>> > > bugs in bk, data corruption ), I think it is safe to place the
>>> post-filter
>>> > > on the consumer side(compaction and table views) as well in order to
>>> > > validate the state changes.
>>> > >
>>> > > For the two-topic approach,
>>> > > I think we lose a single linearized view. Could we clarify how to
>>> handle
>>> > > the following(edge cases and failure recovery)?
>>> > > 0. Is the un-compacted topic a persistent topic or a non-persistent
>>> topic?
>>> > > 1. How does the leader recover state from the two topics?
>>> > > 2. How do we handle the case when the leader fails before writing
>>> messages
>>> > > to the compacted topic
>>> > >
>>> > > Regards,
>>> > > Heesung
>>> > >
>>> > > On Fri, Oct 28, 2022 at 6:56 PM Michael Marshall <
>>> mmarshall@apache.org>
>>> > > wrote:
>>> > >
>>> > >> Sharing some more thoughts. We could alternatively use two topics
>>> > >> instead of one. In this design, the first topic is the unfiltered
>>> > >> write ahead log that represents many writers (brokers) trying to
>>> > >> acquire ownership of bundles. The second topic is the distilled log
>>> > >> that represents the "winners" or the "owners" of the bundles. There
>>> is
>>> > >> a single writer, the leader broker, that reads from the input topic
>>> > >> and writes to the output topic. The first topic is normal and the
>>> > >> second is compacted.
>>> > >>
>>> > >> The primary benefit in a two topic solution is that it is easy for
>>> the
>>> > >> leader broker to trade off ownership without needing to slow down
>>> > >> writes to the input topic. The leader broker will start consuming
>>> from
>>> > >> the input topic when it has fully consumed the table view on the
>>> > >> output topic. In general, I don't think consumers know when they
>>> have
>>> > >> "reached the end of a table view", but we should be able to
>>> trivially
>>> > >> figure this out if we are the topic's only writer and the topic and
>>> > >> writer are collocated on the same broker.
>>> > >>
>>> > >> In that design, it might make sense to use something like the
>>> > >> replication cursor to keep track of this consumer's state.
>>> > >>
>>> > >> - Michael
>>> > >>
>>> > >> On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall <
>>> mmarshall@apache.org>
>>> > >> wrote:
>>> > >> >
>>> > >> > Thanks for your proposal, Heesung.
>>> > >> >
>>> > >> > Fundamentally, we have the problems listed in this PIP because we
>>> have
>>> > >> > multiple writers instead of just one writer. Can we solve this
>>> problem
>>> > >> > by changing our write pattern? What if we use the leader broker
>>> as the
>>> > >> > single writer? That broker would intercept attempts to acquire
>>> > >> > ownership on bundles and would grant ownership to the first
>>> broker to
>>> > >> > claim an unassigned bundle. It could "grant ownership" by letting
>>> the
>>> > >> > first write to claim an unassigned bundle get written to the
>>> ownership
>>> > >> > topic. When a bundle is already owned, the leader won't persist
>>> that
>>> > >> > event to the bookkeeper. In this design, the log becomes a true
>>> > >> > ownership log, which will correctly work with the existing topic
>>> > >> > compaction and table view solutions. My proposal essentially
>>> moves the
>>> > >> > conflict resolution to just before the write, and as a
>>> consequence, it
>>> > >> > greatly reduces the need for post processing of the event log. One
>>> > >> > trade off might be that the leader broker could slow down the
>>> write
>>> > >> > path, but given that the leader would just need to verify the
>>> current
>>> > >> > state of the bundle, I think it'd be performant enough.
>>> > >> >
>>> > >> > Additionally, we'd need the leader broker to be "caught up" on
>>> bundle
>>> > >> > ownership in order to grant ownership of topics, but unless I am
>>> > >> > mistaken, that is already a requirement of the current PIP 192
>>> > >> > paradigm.
>>> > >> >
>>> > >> > Below are some additional thoughts that will be relevant if we
>>> move
>>> > >> > forward with the design as it is currently proposed.
>>> > >> >
>>> > >> > I think it might be helpful to update the title to show that this
>>> > >> > proposal will also affect table view as well. I didn't catch that
>>> at
>>> > >> > first.
>>> > >> >
>>> > >> > Do you have any documentation describing how the
>>> > >> > TopicCompactionStrategy will determine which states are valid in
>>> the
>>> > >> > context of load balancing? I looked at
>>> > >> > https://github.com/apache/pulsar/pull/18195, but I couldn't seem
>>> to
>>> > >> > find anything for it. That would help make this proposal less
>>> > >> > abstract.
>>> > >> >
>>> > >> > The proposed API seems very tied to the needs of PIP 192. For
>>> example,
>>> > >> > `isValid` is not a term I associate with topic compaction. The
>>> > >> > fundamental question for compaction is which value to keep (or
>>> build a
>>> > >> > new value). I think we might be able to simplify the API by
>>> replacing
>>> > >> > the "isValid", "isMergeEnabled", and "merge" methods with a single
>>> > >> > method that lets the implementation handle one or all tasks. That
>>> > >> > would also remove the need to deserialize payloads multiple times
>>> too.
>>> > >> >
>>> > >> > I also feel like mentioning that after working with the PIP 105
>>> broker
>>> > >> > side filtering, I think we should avoid running UDFs in the
>>> broker as
>>> > >> > much as possible. (I do not consider the load balancing logic to
>>> be a
>>> > >> > UDF here.) I think it would be worth not making this a user facing
>>> > >> > feature unless there is demand for real use cases.
>>> > >> >
>>> > >> > Thanks!
>>> > >> > Michael
>>> > >> >
>>> > >> > On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bo...@apache.org> wrote:
>>> > >> > >
>>> > >> > > +1(non-binding)
>>> > >> > >
>>> > >> > > thanks,
>>> > >> > > bo
>>> > >> > >
>>> > >> > > Heesung Sohn <he...@streamnative.io.invalid>
>>> 于2022年10月19日周三
>>> > >> 07:54写道:
>>> > >> > > >
>>> > >> > > > Hi pulsar-dev community,
>>> > >> > > >
>>> > >> > > > I raised a pip to discuss : PIP-215: Configurable Topic
>>> Compaction
>>> > >> Strategy
>>> > >> > > >
>>> > >> > > > PIP link: https://github.com/apache/pulsar/issues/18099
>>> > >> > > >
>>> > >> > > > Regards,
>>> > >> > > > Heesung
>>> > >>
>>> > >
>>>
>>

Re: [DISCUSS] PIP-215: Configurable Topic Compaction Strategy

Posted by Heesung Sohn <he...@streamnative.io.INVALID>.
Hi Machel,

Here are my additional comments regarding your earlier email.

- I updated the PIP title to show that this will impact table view as well.

- PIP-192 : https://github.com/apache/pulsar/issues/16691 shows the general
idea of the states and their actions, and I defined the actual states here
in the PR,
https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a.
I
will further clarify the bundle state data validation logic when
introducing `BundleStateCompactionStrategy` class. This PIP is to support
CompactionStrategy in general.

- Agreed with your point that we should merge CompactionStrategy APIs. I
updated the interface proposal in the PIP. I replaced `"isValid",
"isMergeEnabled", and "merge"` apis with "compact" api.


Thanks,
Heesung


On Tue, Nov 1, 2022 at 11:26 AM Heesung Sohn <he...@streamnative.io>
wrote:

> Hi,
> Thank you for the great comments.
> Please find my comments inline too.
>
> Regards,
> Heesung
>
> On Mon, Oct 31, 2022 at 10:59 PM Michael Marshall <mm...@apache.org>
> wrote:
>
>> > I think we lose a single linearized view.
>>
>> Which linearized view are we losing, and what is the role of that
>> linearized view? I think I might be missing why it is important. I
>> agree that consumers won't know about each unsuccessful attempted
>> acquisition of a bundle, but that seems like unnecessary information
>> to broadcast to every broker in the cluster.
>>
>>
>
> PIP-192 proposes an assignment, transfer, and split protocol(multi-phase
> state changes), relying on early broadcast across brokers, and all brokers
> react to their clients according to the state change notifications --
> brokers could defer any client lookups for bundle x if an
> assignment/transfer/split is ongoing for x(broadcasted early in the topic).
> One early broadcast example is the one that I discussed above, `When the
> topic broadcast is faster than the concurrent assignment requests.` I think
> the prefilter could delay this early broadcast, as it needs to go through
> the additional single-leader compaction path.
>
> The bundle state recovery process is simpler by a single linearized view.
>
> The single linearized view can be easier to debug bundle states. We can
> more easily track where the assignment requests come from and how it is
> compacted in a single linearized view.
>
>
>
>> > I think the leader requires a write-through cache to compact messages
>> based
>> > on the latest states.
>>
> This brings up an important point that I would like to clarify. If we
>> trust the write ahead log as the source of truth, what happens when a
>> bundle has been validly owned by multiple brokers? As a broker starts
>> and consumes from the compacted topic, how do we prevent it from
>> incorrectly thinking that it owns a bundle for some short time period
>> in the case that the ownership topic hasn't yet been compacted to
>> remove old ownership state?
>>
>
> Since the multi-phase transfer protocol involves the source and
> destination broker's actions, the successful transfer should get the source
> and destination broker to have the (near) latest state. For example, if
> some brokers have old ownership states(network partitioned or delayed),
> they will redirect clients to the source(old) broker. However, by the
> transfer protocol, the source broker should have the latest state, so it
> can redirect the client again to the destination broker.
>
> When a broker restarts, it won't start until its BSC state to the (near)
> latest (til the last known messageId at that time).
>
>
>> > Pulsar guarantees "a single writer".
>>
>> I didn't think we were using a single writer in the PIP 192 design. I
>> thought we had many producers sending events to a compacted topic. My
>> proposal would still have many producers, but the writer to bookkeeper
>> would act as the single writer. It would technically be distinct from
>> a normal Pulsar topic producer.
>>
>> I should highlight that I am only proposing "broker filtering before
>> write" in the context of PIP 192 and as an alternative to adding
>> pluggable compaction strategies. It would not be a generic feature.
>>
>>
> I was worried about the worst case where two producers(leaders) happen to
> write the compacted topic (although Pulsar can guarantee "a single writer"
> or "a single producer" for a topic in normal situations).
>
>
>
>> > Could we clarify how to handle
>> > the following(edge cases and failure recovery)?
>> > 0. Is the un-compacted topic a persistent topic or a non-persistent
>> topic?
>>
>> It is a persistent topic.
>>
>> > 1. How does the leader recover state from the two topics?
>>
>> A leader would recover state by first consuming the whole compacted
>> topic and then by consuming from the current location of a cursor on
>> the first input topic. As stated elsewhere, this introduces latency
>> and could be an issue.
>>
>> > 2. How do we handle the case when the leader fails before writing
>> messages
>> > to the compacted topic
>>
>> The leader would not acknowledge the message on the input topic until
>> it has successfully persisted the event on the compacted topic.
>> Publishing the same event to a compacted topic multiple times is
>> idempotent, so there is no risk of lost state. The real risk is
>> latency. However, I think we might have similar (though not the same)
>> latency risks in the current solution.
>>
>> > Analysis: the "pre-filter + two-topics" option can reduce the number of
>> > messages to broadcast at the expense of the leader broker compaction.
>>
>> My primary point is that with this PIP's design, the filter logic is
>> run on every broker and again during topic compaction. With the
>> alternative design, the filter is run once.
>>
>> Thank you for the clarification.
>
> I think the difference is that the post-filter is an optimistic approach
> as it optimistically relies on the "broadcast-filter" effect(brokers will
> defer client lookups if notified ahead that any assignment is ongoing for
> bundle x). Yes, in the worst case, if the broadcast is slower, each broker
> needs to individually compact the conflicting assignment requests.
>
> Conversely, one downside of the pessimistic approach (single leader
> pre-filter) is that when there are not many conflict concurrent assignment
> requests(assign for bundle a, assign for bundle b, assign for bundle c...),
> the requests need to redundantly go through the leader compaction.
>
>
>
>> > 3. initially less complex to implement (leaderless conflict resolution
>> and
>> > requires a single topic)
>>
>> PIP 215 has its own complexity too. Coordinating filters
>> on both the client (table view) and the server (compaction) is non
>> trivial. The proposed API includes hard coded client configuration for
>> each component, which will make upgrading the version of the
>> compaction strategy complicated, and could lead to incorrect
>> interpretation of events in the stream. When a single broker is doing
>> the filtering, versioning is no longer a distributed problem. That
>> being said, I do not mean to suggest my solution is without
>> complexity.
>>
>> > 4. it is not a "one-way door" decision (we could add the pre-filter
>> logic
>> > as well later)
>>
>> It's fair to say that we could add it later, but at that point, we
>> will have added this new API for compaction strategy. Are we confident
>> that pluggable compaction is independently an important addition to
>> Pulsar's
>> features, or would it make sense to make this API only exposed in the
>> broker?
>>
>>
> The intention is that this compaction feature could be useful for complex
> user applications (if they are trying to do a similar thing). As I
> mentioned, this feature is closely tied to the PIP-192 now. We are not
> planning to expose this feature to users soon unless demanded and proven to
> be stable.
>
>
>> Thanks,
>> Michael
>>
>>
>>
>>
>>
>> On Sat, Oct 29, 2022 at 10:13 PM Heesung Sohn
>> <he...@streamnative.io.invalid> wrote:
>> >
>> > Hi,
>> >
>> > Also, I thought about some concurrent assignment scenarios between
>> > pre-filter vs post-filter.
>> >
>> > Example 1: When the topic broadcast is slower than the concurrent
>> > assignment requests
>> >
>> > With pre-filter + two-topics (non-compacted and compacted topics)
>> > t1: A -> non-compacted topic // broker A published a message to the
>> > non-compacted topic, m1: {broker A assigned bundle x to broker A}
>> > t2: B -> non-compacted topic // broker B published a message, m2:
>> {broker B
>> > assigned bundle x to broker C}
>> > t3: C -> non-compacted topic // broker C published a message, m3:
>> {broker C
>> > assigned bundle x to broker B}
>> > t4: non-compacted topic -> L // leader broker consumed the messages:
>> m1,m2,
>> > and m3
>> > t5: L -> compacted topic // leader compacted the messages and
>> broadcasted
>> > m1 to all consumers
>> > t6: compacted topic-> [A,B,C] // broker A,B,C consumed m1
>> >
>> > With post-filter + a single topic
>> > t1: A -> topic // broker A published a message to the non-compacted
>> topic,
>> > m1: {broker A assigned bundle x to broker A}
>> > t2: B -> topic // broker B published a message, m2: {broker B assigned
>> > bundle x to broker C}
>> > t3: C -> topic // broker C published a message, m3: {broker C assigned
>> > bundle x to broker B}
>> > t4: topic -> [A,B,C] // broker A,B,C consumed the messages: m1,m2, and
>> m3
>> > t5: [A,B,C] -> m1 // broker A,B,C individually compacted the messages
>> to m1.
>> >
>> > Analysis: the "pre-filter + two-topics" option can reduce the number of
>> > messages to broadcast at the expense of the leader broker compaction.
>> >
>> >
>> > Example 2: When the topic broadcast is faster than the concurrent
>> > assignment requests
>> >
>> > With pre-filter + two-topics (non-compacted and compacted topics)
>> > t1: A -> non-compacted topic // broker A published a message to the
>> > non-compacted topic, m1: {broker A assigned bundle x to broker A}
>> > t2: non-compacted topic -> L // leader broker consumed the messages: m1
>> > t3: L -> compacted topic // leader compacted the message and
>> broadcasted m1
>> > to all consumers
>> > t4: compacted topic-> [A,B,C] // broker A,B,C consumed m1
>> > t5: A-> own bundle // broker A knows that its assignment has been
>> accepted,
>> > so proceeding to own the bundle (meanwhile deferring lookup requests)
>> > t6: B -> defer client lookups // broker B knows that bundle assignment
>> is
>> > running(meanwhile deferring lookup requests)
>> > t7: C -> defer client lookups // broker C knows that bundle assignment
>> is
>> > running(meanwhile deferring lookup requests)
>> >
>> > With post-filter + a single topic
>> > t1: A -> topic // broker A published a message to the non-compacted
>> topic,
>> > m1: {broker A assigned bundle x to broker A}
>> > t2: topic -> [A,B,C] // broker A,B,C consumed the message: m1
>> > t3:  A-> own bundle // broker A knows that its assignment has been
>> > accepted, so proceeding to own the bundle (meanwhile deferring lookup
>> > requests)
>> > t4: B -> defer client lookups // broker B knows that bundle assignment
>> is
>> > running(meanwhile deferring lookup requests)
>> > t5: C -> defer client lookups // broker C knows that bundle assignment
>> is
>> > running(meanwhile deferring lookup requests)
>> >
>> > Analysis: The "post-filter + a single topic" can perform ok in this case
>> > without the additional leader coordination and the secondary topic
>> because
>> > the early broadcast can inform all brokers and prevent them from
>> requesting
>> > other assignments for the same bundle.
>> >
>> > I think the post-filter option is initially not bad because:
>> >
>> > 1. it is safe in the worst case (in case the messages are not correctly
>> > pre-filtered at the leader)
>> > 2. it performs ok because the early broadcast can prevent
>> > concurrent assignment requests.
>> > 3. initially less complex to implement (leaderless conflict resolution
>> and
>> > requires a single topic)
>> > 4. it is not a "one-way door" decision (we could add the pre-filter
>> logic
>> > as well later)
>> >
>> > Regards,
>> > Heesung
>> >
>> >
>> >
>> > On Sat, Oct 29, 2022 at 1:03 PM Heesung Sohn <
>> heesung.sohn@streamnative.io>
>> > wrote:
>> >
>> > > Hi Michael,
>> > >
>> > > For the pre-prefilter(pre-compaction) option,
>> > > I think the leader requires a write-through cache to compact messages
>> > > based on the latest states. Otherwise, the leader needs to wait for
>> the
>> > > last msg from the (compacted) topic before compacting the next msg
>> for the
>> > > same bundle.
>> > >
>> > > Pulsar guarantees "a single writer". However, for the worst-case
>> > > scenario(due to network partitions, bugs in zk or etcd leader
>> election,
>> > > bugs in bk, data corruption ), I think it is safe to place the
>> post-filter
>> > > on the consumer side(compaction and table views) as well in order to
>> > > validate the state changes.
>> > >
>> > > For the two-topic approach,
>> > > I think we lose a single linearized view. Could we clarify how to
>> handle
>> > > the following(edge cases and failure recovery)?
>> > > 0. Is the un-compacted topic a persistent topic or a non-persistent
>> topic?
>> > > 1. How does the leader recover state from the two topics?
>> > > 2. How do we handle the case when the leader fails before writing
>> messages
>> > > to the compacted topic
>> > >
>> > > Regards,
>> > > Heesung
>> > >
>> > > On Fri, Oct 28, 2022 at 6:56 PM Michael Marshall <
>> mmarshall@apache.org>
>> > > wrote:
>> > >
>> > >> Sharing some more thoughts. We could alternatively use two topics
>> > >> instead of one. In this design, the first topic is the unfiltered
>> > >> write ahead log that represents many writers (brokers) trying to
>> > >> acquire ownership of bundles. The second topic is the distilled log
>> > >> that represents the "winners" or the "owners" of the bundles. There
>> is
>> > >> a single writer, the leader broker, that reads from the input topic
>> > >> and writes to the output topic. The first topic is normal and the
>> > >> second is compacted.
>> > >>
>> > >> The primary benefit in a two topic solution is that it is easy for
>> the
>> > >> leader broker to trade off ownership without needing to slow down
>> > >> writes to the input topic. The leader broker will start consuming
>> from
>> > >> the input topic when it has fully consumed the table view on the
>> > >> output topic. In general, I don't think consumers know when they have
>> > >> "reached the end of a table view", but we should be able to trivially
>> > >> figure this out if we are the topic's only writer and the topic and
>> > >> writer are collocated on the same broker.
>> > >>
>> > >> In that design, it might make sense to use something like the
>> > >> replication cursor to keep track of this consumer's state.
>> > >>
>> > >> - Michael
>> > >>
>> > >> On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall <
>> mmarshall@apache.org>
>> > >> wrote:
>> > >> >
>> > >> > Thanks for your proposal, Heesung.
>> > >> >
>> > >> > Fundamentally, we have the problems listed in this PIP because we
>> have
>> > >> > multiple writers instead of just one writer. Can we solve this
>> problem
>> > >> > by changing our write pattern? What if we use the leader broker as
>> the
>> > >> > single writer? That broker would intercept attempts to acquire
>> > >> > ownership on bundles and would grant ownership to the first broker
>> to
>> > >> > claim an unassigned bundle. It could "grant ownership" by letting
>> the
>> > >> > first write to claim an unassigned bundle get written to the
>> ownership
>> > >> > topic. When a bundle is already owned, the leader won't persist
>> that
>> > >> > event to the bookkeeper. In this design, the log becomes a true
>> > >> > ownership log, which will correctly work with the existing topic
>> > >> > compaction and table view solutions. My proposal essentially moves
>> the
>> > >> > conflict resolution to just before the write, and as a
>> consequence, it
>> > >> > greatly reduces the need for post processing of the event log. One
>> > >> > trade off might be that the leader broker could slow down the write
>> > >> > path, but given that the leader would just need to verify the
>> current
>> > >> > state of the bundle, I think it'd be performant enough.
>> > >> >
>> > >> > Additionally, we'd need the leader broker to be "caught up" on
>> bundle
>> > >> > ownership in order to grant ownership of topics, but unless I am
>> > >> > mistaken, that is already a requirement of the current PIP 192
>> > >> > paradigm.
>> > >> >
>> > >> > Below are some additional thoughts that will be relevant if we move
>> > >> > forward with the design as it is currently proposed.
>> > >> >
>> > >> > I think it might be helpful to update the title to show that this
>> > >> > proposal will also affect table view as well. I didn't catch that
>> at
>> > >> > first.
>> > >> >
>> > >> > Do you have any documentation describing how the
>> > >> > TopicCompactionStrategy will determine which states are valid in
>> the
>> > >> > context of load balancing? I looked at
>> > >> > https://github.com/apache/pulsar/pull/18195, but I couldn't seem
>> to
>> > >> > find anything for it. That would help make this proposal less
>> > >> > abstract.
>> > >> >
>> > >> > The proposed API seems very tied to the needs of PIP 192. For
>> example,
>> > >> > `isValid` is not a term I associate with topic compaction. The
>> > >> > fundamental question for compaction is which value to keep (or
>> build a
>> > >> > new value). I think we might be able to simplify the API by
>> replacing
>> > >> > the "isValid", "isMergeEnabled", and "merge" methods with a single
>> > >> > method that lets the implementation handle one or all tasks. That
>> > >> > would also remove the need to deserialize payloads multiple times
>> too.
>> > >> >
>> > >> > I also feel like mentioning that after working with the PIP 105
>> broker
>> > >> > side filtering, I think we should avoid running UDFs in the broker
>> as
>> > >> > much as possible. (I do not consider the load balancing logic to
>> be a
>> > >> > UDF here.) I think it would be worth not making this a user facing
>> > >> > feature unless there is demand for real use cases.
>> > >> >
>> > >> > Thanks!
>> > >> > Michael
>> > >> >
>> > >> > On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bo...@apache.org> wrote:
>> > >> > >
>> > >> > > +1(non-binding)
>> > >> > >
>> > >> > > thanks,
>> > >> > > bo
>> > >> > >
>> > >> > > Heesung Sohn <he...@streamnative.io.invalid>
>> 于2022年10月19日周三
>> > >> 07:54写道:
>> > >> > > >
>> > >> > > > Hi pulsar-dev community,
>> > >> > > >
>> > >> > > > I raised a pip to discuss : PIP-215: Configurable Topic
>> Compaction
>> > >> Strategy
>> > >> > > >
>> > >> > > > PIP link: https://github.com/apache/pulsar/issues/18099
>> > >> > > >
>> > >> > > > Regards,
>> > >> > > > Heesung
>> > >>
>> > >
>>
>

Re: [DISCUSS] PIP-215: Configurable Topic Compaction Strategy

Posted by Heesung Sohn <he...@streamnative.io.INVALID>.
Hi,
Thank you for the great comments.
Please find my comments inline too.

Regards,
Heesung

On Mon, Oct 31, 2022 at 10:59 PM Michael Marshall <mm...@apache.org>
wrote:

> > I think we lose a single linearized view.
>
> Which linearized view are we losing, and what is the role of that
> linearized view? I think I might be missing why it is important. I
> agree that consumers won't know about each unsuccessful attempted
> acquisition of a bundle, but that seems like unnecessary information
> to broadcast to every broker in the cluster.
>
>

PIP-192 proposes an assignment, transfer, and split protocol(multi-phase
state changes), relying on early broadcast across brokers, and all brokers
react to their clients according to the state change notifications --
brokers could defer any client lookups for bundle x if an
assignment/transfer/split is ongoing for x(broadcasted early in the topic).
One early broadcast example is the one that I discussed above, `When the
topic broadcast is faster than the concurrent assignment requests.` I think
the prefilter could delay this early broadcast, as it needs to go through
the additional single-leader compaction path.

The bundle state recovery process is simpler by a single linearized view.

The single linearized view can be easier to debug bundle states. We can
more easily track where the assignment requests come from and how it is
compacted in a single linearized view.



> > I think the leader requires a write-through cache to compact messages
> based
> > on the latest states.
>
This brings up an important point that I would like to clarify. If we
> trust the write ahead log as the source of truth, what happens when a
> bundle has been validly owned by multiple brokers? As a broker starts
> and consumes from the compacted topic, how do we prevent it from
> incorrectly thinking that it owns a bundle for some short time period
> in the case that the ownership topic hasn't yet been compacted to
> remove old ownership state?
>

Since the multi-phase transfer protocol involves the source and destination
broker's actions, the successful transfer should get the source and
destination broker to have the (near) latest state. For example, if some
brokers have old ownership states(network partitioned or delayed), they
will redirect clients to the source(old) broker. However, by the transfer
protocol, the source broker should have the latest state, so it can
redirect the client again to the destination broker.

When a broker restarts, it won't start until its BSC state to the (near)
latest (til the last known messageId at that time).


> > Pulsar guarantees "a single writer".
>
> I didn't think we were using a single writer in the PIP 192 design. I
> thought we had many producers sending events to a compacted topic. My
> proposal would still have many producers, but the writer to bookkeeper
> would act as the single writer. It would technically be distinct from
> a normal Pulsar topic producer.
>
> I should highlight that I am only proposing "broker filtering before
> write" in the context of PIP 192 and as an alternative to adding
> pluggable compaction strategies. It would not be a generic feature.
>
>
I was worried about the worst case where two producers(leaders) happen to
write the compacted topic (although Pulsar can guarantee "a single writer"
or "a single producer" for a topic in normal situations).



> > Could we clarify how to handle
> > the following(edge cases and failure recovery)?
> > 0. Is the un-compacted topic a persistent topic or a non-persistent
> topic?
>
> It is a persistent topic.
>
> > 1. How does the leader recover state from the two topics?
>
> A leader would recover state by first consuming the whole compacted
> topic and then by consuming from the current location of a cursor on
> the first input topic. As stated elsewhere, this introduces latency
> and could be an issue.
>
> > 2. How do we handle the case when the leader fails before writing
> messages
> > to the compacted topic
>
> The leader would not acknowledge the message on the input topic until
> it has successfully persisted the event on the compacted topic.
> Publishing the same event to a compacted topic multiple times is
> idempotent, so there is no risk of lost state. The real risk is
> latency. However, I think we might have similar (though not the same)
> latency risks in the current solution.
>
> > Analysis: the "pre-filter + two-topics" option can reduce the number of
> > messages to broadcast at the expense of the leader broker compaction.
>
> My primary point is that with this PIP's design, the filter logic is
> run on every broker and again during topic compaction. With the
> alternative design, the filter is run once.
>
> Thank you for the clarification.

I think the difference is that the post-filter is an optimistic approach as
it optimistically relies on the "broadcast-filter" effect(brokers will
defer client lookups if notified ahead that any assignment is ongoing for
bundle x). Yes, in the worst case, if the broadcast is slower, each broker
needs to individually compact the conflicting assignment requests.

Conversely, one downside of the pessimistic approach (single leader
pre-filter) is that when there are not many conflict concurrent assignment
requests(assign for bundle a, assign for bundle b, assign for bundle c...),
the requests need to redundantly go through the leader compaction.



> > 3. initially less complex to implement (leaderless conflict resolution
> and
> > requires a single topic)
>
> PIP 215 has its own complexity too. Coordinating filters
> on both the client (table view) and the server (compaction) is non
> trivial. The proposed API includes hard coded client configuration for
> each component, which will make upgrading the version of the
> compaction strategy complicated, and could lead to incorrect
> interpretation of events in the stream. When a single broker is doing
> the filtering, versioning is no longer a distributed problem. That
> being said, I do not mean to suggest my solution is without
> complexity.
>
> > 4. it is not a "one-way door" decision (we could add the pre-filter logic
> > as well later)
>
> It's fair to say that we could add it later, but at that point, we
> will have added this new API for compaction strategy. Are we confident
> that pluggable compaction is independently an important addition to
> Pulsar's
> features, or would it make sense to make this API only exposed in the
> broker?
>
>
The intention is that this compaction feature could be useful for complex
user applications (if they are trying to do a similar thing). As I
mentioned, this feature is closely tied to the PIP-192 now. We are not
planning to expose this feature to users soon unless demanded and proven to
be stable.


> Thanks,
> Michael
>
>
>
>
>
> On Sat, Oct 29, 2022 at 10:13 PM Heesung Sohn
> <he...@streamnative.io.invalid> wrote:
> >
> > Hi,
> >
> > Also, I thought about some concurrent assignment scenarios between
> > pre-filter vs post-filter.
> >
> > Example 1: When the topic broadcast is slower than the concurrent
> > assignment requests
> >
> > With pre-filter + two-topics (non-compacted and compacted topics)
> > t1: A -> non-compacted topic // broker A published a message to the
> > non-compacted topic, m1: {broker A assigned bundle x to broker A}
> > t2: B -> non-compacted topic // broker B published a message, m2:
> {broker B
> > assigned bundle x to broker C}
> > t3: C -> non-compacted topic // broker C published a message, m3:
> {broker C
> > assigned bundle x to broker B}
> > t4: non-compacted topic -> L // leader broker consumed the messages:
> m1,m2,
> > and m3
> > t5: L -> compacted topic // leader compacted the messages and broadcasted
> > m1 to all consumers
> > t6: compacted topic-> [A,B,C] // broker A,B,C consumed m1
> >
> > With post-filter + a single topic
> > t1: A -> topic // broker A published a message to the non-compacted
> topic,
> > m1: {broker A assigned bundle x to broker A}
> > t2: B -> topic // broker B published a message, m2: {broker B assigned
> > bundle x to broker C}
> > t3: C -> topic // broker C published a message, m3: {broker C assigned
> > bundle x to broker B}
> > t4: topic -> [A,B,C] // broker A,B,C consumed the messages: m1,m2, and m3
> > t5: [A,B,C] -> m1 // broker A,B,C individually compacted the messages to
> m1.
> >
> > Analysis: the "pre-filter + two-topics" option can reduce the number of
> > messages to broadcast at the expense of the leader broker compaction.
> >
> >
> > Example 2: When the topic broadcast is faster than the concurrent
> > assignment requests
> >
> > With pre-filter + two-topics (non-compacted and compacted topics)
> > t1: A -> non-compacted topic // broker A published a message to the
> > non-compacted topic, m1: {broker A assigned bundle x to broker A}
> > t2: non-compacted topic -> L // leader broker consumed the messages: m1
> > t3: L -> compacted topic // leader compacted the message and broadcasted
> m1
> > to all consumers
> > t4: compacted topic-> [A,B,C] // broker A,B,C consumed m1
> > t5: A-> own bundle // broker A knows that its assignment has been
> accepted,
> > so proceeding to own the bundle (meanwhile deferring lookup requests)
> > t6: B -> defer client lookups // broker B knows that bundle assignment is
> > running(meanwhile deferring lookup requests)
> > t7: C -> defer client lookups // broker C knows that bundle assignment is
> > running(meanwhile deferring lookup requests)
> >
> > With post-filter + a single topic
> > t1: A -> topic // broker A published a message to the non-compacted
> topic,
> > m1: {broker A assigned bundle x to broker A}
> > t2: topic -> [A,B,C] // broker A,B,C consumed the message: m1
> > t3:  A-> own bundle // broker A knows that its assignment has been
> > accepted, so proceeding to own the bundle (meanwhile deferring lookup
> > requests)
> > t4: B -> defer client lookups // broker B knows that bundle assignment is
> > running(meanwhile deferring lookup requests)
> > t5: C -> defer client lookups // broker C knows that bundle assignment is
> > running(meanwhile deferring lookup requests)
> >
> > Analysis: The "post-filter + a single topic" can perform ok in this case
> > without the additional leader coordination and the secondary topic
> because
> > the early broadcast can inform all brokers and prevent them from
> requesting
> > other assignments for the same bundle.
> >
> > I think the post-filter option is initially not bad because:
> >
> > 1. it is safe in the worst case (in case the messages are not correctly
> > pre-filtered at the leader)
> > 2. it performs ok because the early broadcast can prevent
> > concurrent assignment requests.
> > 3. initially less complex to implement (leaderless conflict resolution
> and
> > requires a single topic)
> > 4. it is not a "one-way door" decision (we could add the pre-filter logic
> > as well later)
> >
> > Regards,
> > Heesung
> >
> >
> >
> > On Sat, Oct 29, 2022 at 1:03 PM Heesung Sohn <
> heesung.sohn@streamnative.io>
> > wrote:
> >
> > > Hi Michael,
> > >
> > > For the pre-prefilter(pre-compaction) option,
> > > I think the leader requires a write-through cache to compact messages
> > > based on the latest states. Otherwise, the leader needs to wait for the
> > > last msg from the (compacted) topic before compacting the next msg for
> the
> > > same bundle.
> > >
> > > Pulsar guarantees "a single writer". However, for the worst-case
> > > scenario(due to network partitions, bugs in zk or etcd leader election,
> > > bugs in bk, data corruption ), I think it is safe to place the
> post-filter
> > > on the consumer side(compaction and table views) as well in order to
> > > validate the state changes.
> > >
> > > For the two-topic approach,
> > > I think we lose a single linearized view. Could we clarify how to
> handle
> > > the following(edge cases and failure recovery)?
> > > 0. Is the un-compacted topic a persistent topic or a non-persistent
> topic?
> > > 1. How does the leader recover state from the two topics?
> > > 2. How do we handle the case when the leader fails before writing
> messages
> > > to the compacted topic
> > >
> > > Regards,
> > > Heesung
> > >
> > > On Fri, Oct 28, 2022 at 6:56 PM Michael Marshall <mmarshall@apache.org
> >
> > > wrote:
> > >
> > >> Sharing some more thoughts. We could alternatively use two topics
> > >> instead of one. In this design, the first topic is the unfiltered
> > >> write ahead log that represents many writers (brokers) trying to
> > >> acquire ownership of bundles. The second topic is the distilled log
> > >> that represents the "winners" or the "owners" of the bundles. There is
> > >> a single writer, the leader broker, that reads from the input topic
> > >> and writes to the output topic. The first topic is normal and the
> > >> second is compacted.
> > >>
> > >> The primary benefit in a two topic solution is that it is easy for the
> > >> leader broker to trade off ownership without needing to slow down
> > >> writes to the input topic. The leader broker will start consuming from
> > >> the input topic when it has fully consumed the table view on the
> > >> output topic. In general, I don't think consumers know when they have
> > >> "reached the end of a table view", but we should be able to trivially
> > >> figure this out if we are the topic's only writer and the topic and
> > >> writer are collocated on the same broker.
> > >>
> > >> In that design, it might make sense to use something like the
> > >> replication cursor to keep track of this consumer's state.
> > >>
> > >> - Michael
> > >>
> > >> On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall <
> mmarshall@apache.org>
> > >> wrote:
> > >> >
> > >> > Thanks for your proposal, Heesung.
> > >> >
> > >> > Fundamentally, we have the problems listed in this PIP because we
> have
> > >> > multiple writers instead of just one writer. Can we solve this
> problem
> > >> > by changing our write pattern? What if we use the leader broker as
> the
> > >> > single writer? That broker would intercept attempts to acquire
> > >> > ownership on bundles and would grant ownership to the first broker
> to
> > >> > claim an unassigned bundle. It could "grant ownership" by letting
> the
> > >> > first write to claim an unassigned bundle get written to the
> ownership
> > >> > topic. When a bundle is already owned, the leader won't persist that
> > >> > event to the bookkeeper. In this design, the log becomes a true
> > >> > ownership log, which will correctly work with the existing topic
> > >> > compaction and table view solutions. My proposal essentially moves
> the
> > >> > conflict resolution to just before the write, and as a consequence,
> it
> > >> > greatly reduces the need for post processing of the event log. One
> > >> > trade off might be that the leader broker could slow down the write
> > >> > path, but given that the leader would just need to verify the
> current
> > >> > state of the bundle, I think it'd be performant enough.
> > >> >
> > >> > Additionally, we'd need the leader broker to be "caught up" on
> bundle
> > >> > ownership in order to grant ownership of topics, but unless I am
> > >> > mistaken, that is already a requirement of the current PIP 192
> > >> > paradigm.
> > >> >
> > >> > Below are some additional thoughts that will be relevant if we move
> > >> > forward with the design as it is currently proposed.
> > >> >
> > >> > I think it might be helpful to update the title to show that this
> > >> > proposal will also affect table view as well. I didn't catch that at
> > >> > first.
> > >> >
> > >> > Do you have any documentation describing how the
> > >> > TopicCompactionStrategy will determine which states are valid in the
> > >> > context of load balancing? I looked at
> > >> > https://github.com/apache/pulsar/pull/18195, but I couldn't seem to
> > >> > find anything for it. That would help make this proposal less
> > >> > abstract.
> > >> >
> > >> > The proposed API seems very tied to the needs of PIP 192. For
> example,
> > >> > `isValid` is not a term I associate with topic compaction. The
> > >> > fundamental question for compaction is which value to keep (or
> build a
> > >> > new value). I think we might be able to simplify the API by
> replacing
> > >> > the "isValid", "isMergeEnabled", and "merge" methods with a single
> > >> > method that lets the implementation handle one or all tasks. That
> > >> > would also remove the need to deserialize payloads multiple times
> too.
> > >> >
> > >> > I also feel like mentioning that after working with the PIP 105
> broker
> > >> > side filtering, I think we should avoid running UDFs in the broker
> as
> > >> > much as possible. (I do not consider the load balancing logic to be
> a
> > >> > UDF here.) I think it would be worth not making this a user facing
> > >> > feature unless there is demand for real use cases.
> > >> >
> > >> > Thanks!
> > >> > Michael
> > >> >
> > >> > On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bo...@apache.org> wrote:
> > >> > >
> > >> > > +1(non-binding)
> > >> > >
> > >> > > thanks,
> > >> > > bo
> > >> > >
> > >> > > Heesung Sohn <he...@streamnative.io.invalid>
> 于2022年10月19日周三
> > >> 07:54写道:
> > >> > > >
> > >> > > > Hi pulsar-dev community,
> > >> > > >
> > >> > > > I raised a pip to discuss : PIP-215: Configurable Topic
> Compaction
> > >> Strategy
> > >> > > >
> > >> > > > PIP link: https://github.com/apache/pulsar/issues/18099
> > >> > > >
> > >> > > > Regards,
> > >> > > > Heesung
> > >>
> > >
>

Re: [DISCUSS] PIP-215: Configurable Topic Compaction Strategy

Posted by Michael Marshall <mm...@apache.org>.
> I think we lose a single linearized view.

Which linearized view are we losing, and what is the role of that
linearized view? I think I might be missing why it is important. I
agree that consumers won't know about each unsuccessful attempted
acquisition of a bundle, but that seems like unnecessary information
to broadcast to every broker in the cluster.

> I think the leader requires a write-through cache to compact messages based
> on the latest states.

This brings up an important point that I would like to clarify. If we
trust the write ahead log as the source of truth, what happens when a
bundle has been validly owned by multiple brokers? As a broker starts
and consumes from the compacted topic, how do we prevent it from
incorrectly thinking that it owns a bundle for some short time period
in the case that the ownership topic hasn't yet been compacted to
remove old ownership state?

> Pulsar guarantees "a single writer".

I didn't think we were using a single writer in the PIP 192 design. I
thought we had many producers sending events to a compacted topic. My
proposal would still have many producers, but the writer to bookkeeper
would act as the single writer. It would technically be distinct from
a normal Pulsar topic producer.

I should highlight that I am only proposing "broker filtering before
write" in the context of PIP 192 and as an alternative to adding
pluggable compaction strategies. It would not be a generic feature.

> Could we clarify how to handle
> the following(edge cases and failure recovery)?
> 0. Is the un-compacted topic a persistent topic or a non-persistent topic?

It is a persistent topic.

> 1. How does the leader recover state from the two topics?

A leader would recover state by first consuming the whole compacted
topic and then by consuming from the current location of a cursor on
the first input topic. As stated elsewhere, this introduces latency
and could be an issue.

> 2. How do we handle the case when the leader fails before writing messages
> to the compacted topic

The leader would not acknowledge the message on the input topic until
it has successfully persisted the event on the compacted topic.
Publishing the same event to a compacted topic multiple times is
idempotent, so there is no risk of lost state. The real risk is
latency. However, I think we might have similar (though not the same)
latency risks in the current solution.

> Analysis: the "pre-filter + two-topics" option can reduce the number of
> messages to broadcast at the expense of the leader broker compaction.

My primary point is that with this PIP's design, the filter logic is
run on every broker and again during topic compaction. With the
alternative design, the filter is run once.

> 3. initially less complex to implement (leaderless conflict resolution and
> requires a single topic)

PIP 215 has its own complexity too. Coordinating filters
on both the client (table view) and the server (compaction) is non
trivial. The proposed API includes hard coded client configuration for
each component, which will make upgrading the version of the
compaction strategy complicated, and could lead to incorrect
interpretation of events in the stream. When a single broker is doing
the filtering, versioning is no longer a distributed problem. That
being said, I do not mean to suggest my solution is without
complexity.

> 4. it is not a "one-way door" decision (we could add the pre-filter logic
> as well later)

It's fair to say that we could add it later, but at that point, we
will have added this new API for compaction strategy. Are we confident
that pluggable compaction is independently an important addition to Pulsar's
features, or would it make sense to make this API only exposed in the broker?

Thanks,
Michael





On Sat, Oct 29, 2022 at 10:13 PM Heesung Sohn
<he...@streamnative.io.invalid> wrote:
>
> Hi,
>
> Also, I thought about some concurrent assignment scenarios between
> pre-filter vs post-filter.
>
> Example 1: When the topic broadcast is slower than the concurrent
> assignment requests
>
> With pre-filter + two-topics (non-compacted and compacted topics)
> t1: A -> non-compacted topic // broker A published a message to the
> non-compacted topic, m1: {broker A assigned bundle x to broker A}
> t2: B -> non-compacted topic // broker B published a message, m2: {broker B
> assigned bundle x to broker C}
> t3: C -> non-compacted topic // broker C published a message, m3: {broker C
> assigned bundle x to broker B}
> t4: non-compacted topic -> L // leader broker consumed the messages: m1,m2,
> and m3
> t5: L -> compacted topic // leader compacted the messages and broadcasted
> m1 to all consumers
> t6: compacted topic-> [A,B,C] // broker A,B,C consumed m1
>
> With post-filter + a single topic
> t1: A -> topic // broker A published a message to the non-compacted topic,
> m1: {broker A assigned bundle x to broker A}
> t2: B -> topic // broker B published a message, m2: {broker B assigned
> bundle x to broker C}
> t3: C -> topic // broker C published a message, m3: {broker C assigned
> bundle x to broker B}
> t4: topic -> [A,B,C] // broker A,B,C consumed the messages: m1,m2, and m3
> t5: [A,B,C] -> m1 // broker A,B,C individually compacted the messages to m1.
>
> Analysis: the "pre-filter + two-topics" option can reduce the number of
> messages to broadcast at the expense of the leader broker compaction.
>
>
> Example 2: When the topic broadcast is faster than the concurrent
> assignment requests
>
> With pre-filter + two-topics (non-compacted and compacted topics)
> t1: A -> non-compacted topic // broker A published a message to the
> non-compacted topic, m1: {broker A assigned bundle x to broker A}
> t2: non-compacted topic -> L // leader broker consumed the messages: m1
> t3: L -> compacted topic // leader compacted the message and broadcasted m1
> to all consumers
> t4: compacted topic-> [A,B,C] // broker A,B,C consumed m1
> t5: A-> own bundle // broker A knows that its assignment has been accepted,
> so proceeding to own the bundle (meanwhile deferring lookup requests)
> t6: B -> defer client lookups // broker B knows that bundle assignment is
> running(meanwhile deferring lookup requests)
> t7: C -> defer client lookups // broker C knows that bundle assignment is
> running(meanwhile deferring lookup requests)
>
> With post-filter + a single topic
> t1: A -> topic // broker A published a message to the non-compacted topic,
> m1: {broker A assigned bundle x to broker A}
> t2: topic -> [A,B,C] // broker A,B,C consumed the message: m1
> t3:  A-> own bundle // broker A knows that its assignment has been
> accepted, so proceeding to own the bundle (meanwhile deferring lookup
> requests)
> t4: B -> defer client lookups // broker B knows that bundle assignment is
> running(meanwhile deferring lookup requests)
> t5: C -> defer client lookups // broker C knows that bundle assignment is
> running(meanwhile deferring lookup requests)
>
> Analysis: The "post-filter + a single topic" can perform ok in this case
> without the additional leader coordination and the secondary topic because
> the early broadcast can inform all brokers and prevent them from requesting
> other assignments for the same bundle.
>
> I think the post-filter option is initially not bad because:
>
> 1. it is safe in the worst case (in case the messages are not correctly
> pre-filtered at the leader)
> 2. it performs ok because the early broadcast can prevent
> concurrent assignment requests.
> 3. initially less complex to implement (leaderless conflict resolution and
> requires a single topic)
> 4. it is not a "one-way door" decision (we could add the pre-filter logic
> as well later)
>
> Regards,
> Heesung
>
>
>
> On Sat, Oct 29, 2022 at 1:03 PM Heesung Sohn <he...@streamnative.io>
> wrote:
>
> > Hi Michael,
> >
> > For the pre-prefilter(pre-compaction) option,
> > I think the leader requires a write-through cache to compact messages
> > based on the latest states. Otherwise, the leader needs to wait for the
> > last msg from the (compacted) topic before compacting the next msg for the
> > same bundle.
> >
> > Pulsar guarantees "a single writer". However, for the worst-case
> > scenario(due to network partitions, bugs in zk or etcd leader election,
> > bugs in bk, data corruption ), I think it is safe to place the post-filter
> > on the consumer side(compaction and table views) as well in order to
> > validate the state changes.
> >
> > For the two-topic approach,
> > I think we lose a single linearized view. Could we clarify how to handle
> > the following(edge cases and failure recovery)?
> > 0. Is the un-compacted topic a persistent topic or a non-persistent topic?
> > 1. How does the leader recover state from the two topics?
> > 2. How do we handle the case when the leader fails before writing messages
> > to the compacted topic
> >
> > Regards,
> > Heesung
> >
> > On Fri, Oct 28, 2022 at 6:56 PM Michael Marshall <mm...@apache.org>
> > wrote:
> >
> >> Sharing some more thoughts. We could alternatively use two topics
> >> instead of one. In this design, the first topic is the unfiltered
> >> write ahead log that represents many writers (brokers) trying to
> >> acquire ownership of bundles. The second topic is the distilled log
> >> that represents the "winners" or the "owners" of the bundles. There is
> >> a single writer, the leader broker, that reads from the input topic
> >> and writes to the output topic. The first topic is normal and the
> >> second is compacted.
> >>
> >> The primary benefit in a two topic solution is that it is easy for the
> >> leader broker to trade off ownership without needing to slow down
> >> writes to the input topic. The leader broker will start consuming from
> >> the input topic when it has fully consumed the table view on the
> >> output topic. In general, I don't think consumers know when they have
> >> "reached the end of a table view", but we should be able to trivially
> >> figure this out if we are the topic's only writer and the topic and
> >> writer are collocated on the same broker.
> >>
> >> In that design, it might make sense to use something like the
> >> replication cursor to keep track of this consumer's state.
> >>
> >> - Michael
> >>
> >> On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall <mm...@apache.org>
> >> wrote:
> >> >
> >> > Thanks for your proposal, Heesung.
> >> >
> >> > Fundamentally, we have the problems listed in this PIP because we have
> >> > multiple writers instead of just one writer. Can we solve this problem
> >> > by changing our write pattern? What if we use the leader broker as the
> >> > single writer? That broker would intercept attempts to acquire
> >> > ownership on bundles and would grant ownership to the first broker to
> >> > claim an unassigned bundle. It could "grant ownership" by letting the
> >> > first write to claim an unassigned bundle get written to the ownership
> >> > topic. When a bundle is already owned, the leader won't persist that
> >> > event to the bookkeeper. In this design, the log becomes a true
> >> > ownership log, which will correctly work with the existing topic
> >> > compaction and table view solutions. My proposal essentially moves the
> >> > conflict resolution to just before the write, and as a consequence, it
> >> > greatly reduces the need for post processing of the event log. One
> >> > trade off might be that the leader broker could slow down the write
> >> > path, but given that the leader would just need to verify the current
> >> > state of the bundle, I think it'd be performant enough.
> >> >
> >> > Additionally, we'd need the leader broker to be "caught up" on bundle
> >> > ownership in order to grant ownership of topics, but unless I am
> >> > mistaken, that is already a requirement of the current PIP 192
> >> > paradigm.
> >> >
> >> > Below are some additional thoughts that will be relevant if we move
> >> > forward with the design as it is currently proposed.
> >> >
> >> > I think it might be helpful to update the title to show that this
> >> > proposal will also affect table view as well. I didn't catch that at
> >> > first.
> >> >
> >> > Do you have any documentation describing how the
> >> > TopicCompactionStrategy will determine which states are valid in the
> >> > context of load balancing? I looked at
> >> > https://github.com/apache/pulsar/pull/18195, but I couldn't seem to
> >> > find anything for it. That would help make this proposal less
> >> > abstract.
> >> >
> >> > The proposed API seems very tied to the needs of PIP 192. For example,
> >> > `isValid` is not a term I associate with topic compaction. The
> >> > fundamental question for compaction is which value to keep (or build a
> >> > new value). I think we might be able to simplify the API by replacing
> >> > the "isValid", "isMergeEnabled", and "merge" methods with a single
> >> > method that lets the implementation handle one or all tasks. That
> >> > would also remove the need to deserialize payloads multiple times too.
> >> >
> >> > I also feel like mentioning that after working with the PIP 105 broker
> >> > side filtering, I think we should avoid running UDFs in the broker as
> >> > much as possible. (I do not consider the load balancing logic to be a
> >> > UDF here.) I think it would be worth not making this a user facing
> >> > feature unless there is demand for real use cases.
> >> >
> >> > Thanks!
> >> > Michael
> >> >
> >> > On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bo...@apache.org> wrote:
> >> > >
> >> > > +1(non-binding)
> >> > >
> >> > > thanks,
> >> > > bo
> >> > >
> >> > > Heesung Sohn <he...@streamnative.io.invalid> 于2022年10月19日周三
> >> 07:54写道:
> >> > > >
> >> > > > Hi pulsar-dev community,
> >> > > >
> >> > > > I raised a pip to discuss : PIP-215: Configurable Topic Compaction
> >> Strategy
> >> > > >
> >> > > > PIP link: https://github.com/apache/pulsar/issues/18099
> >> > > >
> >> > > > Regards,
> >> > > > Heesung
> >>
> >

Re: [DISCUSS] PIP-215: Configurable Topic Compaction Strategy

Posted by Heesung Sohn <he...@streamnative.io.INVALID>.
Hi,

Also, I thought about some concurrent assignment scenarios between
pre-filter vs post-filter.

Example 1: When the topic broadcast is slower than the concurrent
assignment requests

With pre-filter + two-topics (non-compacted and compacted topics)
t1: A -> non-compacted topic // broker A published a message to the
non-compacted topic, m1: {broker A assigned bundle x to broker A}
t2: B -> non-compacted topic // broker B published a message, m2: {broker B
assigned bundle x to broker C}
t3: C -> non-compacted topic // broker C published a message, m3: {broker C
assigned bundle x to broker B}
t4: non-compacted topic -> L // leader broker consumed the messages: m1,m2,
and m3
t5: L -> compacted topic // leader compacted the messages and broadcasted
m1 to all consumers
t6: compacted topic-> [A,B,C] // broker A,B,C consumed m1

With post-filter + a single topic
t1: A -> topic // broker A published a message to the non-compacted topic,
m1: {broker A assigned bundle x to broker A}
t2: B -> topic // broker B published a message, m2: {broker B assigned
bundle x to broker C}
t3: C -> topic // broker C published a message, m3: {broker C assigned
bundle x to broker B}
t4: topic -> [A,B,C] // broker A,B,C consumed the messages: m1,m2, and m3
t5: [A,B,C] -> m1 // broker A,B,C individually compacted the messages to m1.

Analysis: the "pre-filter + two-topics" option can reduce the number of
messages to broadcast at the expense of the leader broker compaction.


Example 2: When the topic broadcast is faster than the concurrent
assignment requests

With pre-filter + two-topics (non-compacted and compacted topics)
t1: A -> non-compacted topic // broker A published a message to the
non-compacted topic, m1: {broker A assigned bundle x to broker A}
t2: non-compacted topic -> L // leader broker consumed the messages: m1
t3: L -> compacted topic // leader compacted the message and broadcasted m1
to all consumers
t4: compacted topic-> [A,B,C] // broker A,B,C consumed m1
t5: A-> own bundle // broker A knows that its assignment has been accepted,
so proceeding to own the bundle (meanwhile deferring lookup requests)
t6: B -> defer client lookups // broker B knows that bundle assignment is
running(meanwhile deferring lookup requests)
t7: C -> defer client lookups // broker C knows that bundle assignment is
running(meanwhile deferring lookup requests)

With post-filter + a single topic
t1: A -> topic // broker A published a message to the non-compacted topic,
m1: {broker A assigned bundle x to broker A}
t2: topic -> [A,B,C] // broker A,B,C consumed the message: m1
t3:  A-> own bundle // broker A knows that its assignment has been
accepted, so proceeding to own the bundle (meanwhile deferring lookup
requests)
t4: B -> defer client lookups // broker B knows that bundle assignment is
running(meanwhile deferring lookup requests)
t5: C -> defer client lookups // broker C knows that bundle assignment is
running(meanwhile deferring lookup requests)

Analysis: The "post-filter + a single topic" can perform ok in this case
without the additional leader coordination and the secondary topic because
the early broadcast can inform all brokers and prevent them from requesting
other assignments for the same bundle.

I think the post-filter option is initially not bad because:

1. it is safe in the worst case (in case the messages are not correctly
pre-filtered at the leader)
2. it performs ok because the early broadcast can prevent
concurrent assignment requests.
3. initially less complex to implement (leaderless conflict resolution and
requires a single topic)
4. it is not a "one-way door" decision (we could add the pre-filter logic
as well later)

Regards,
Heesung



On Sat, Oct 29, 2022 at 1:03 PM Heesung Sohn <he...@streamnative.io>
wrote:

> Hi Michael,
>
> For the pre-prefilter(pre-compaction) option,
> I think the leader requires a write-through cache to compact messages
> based on the latest states. Otherwise, the leader needs to wait for the
> last msg from the (compacted) topic before compacting the next msg for the
> same bundle.
>
> Pulsar guarantees "a single writer". However, for the worst-case
> scenario(due to network partitions, bugs in zk or etcd leader election,
> bugs in bk, data corruption ), I think it is safe to place the post-filter
> on the consumer side(compaction and table views) as well in order to
> validate the state changes.
>
> For the two-topic approach,
> I think we lose a single linearized view. Could we clarify how to handle
> the following(edge cases and failure recovery)?
> 0. Is the un-compacted topic a persistent topic or a non-persistent topic?
> 1. How does the leader recover state from the two topics?
> 2. How do we handle the case when the leader fails before writing messages
> to the compacted topic
>
> Regards,
> Heesung
>
> On Fri, Oct 28, 2022 at 6:56 PM Michael Marshall <mm...@apache.org>
> wrote:
>
>> Sharing some more thoughts. We could alternatively use two topics
>> instead of one. In this design, the first topic is the unfiltered
>> write ahead log that represents many writers (brokers) trying to
>> acquire ownership of bundles. The second topic is the distilled log
>> that represents the "winners" or the "owners" of the bundles. There is
>> a single writer, the leader broker, that reads from the input topic
>> and writes to the output topic. The first topic is normal and the
>> second is compacted.
>>
>> The primary benefit in a two topic solution is that it is easy for the
>> leader broker to trade off ownership without needing to slow down
>> writes to the input topic. The leader broker will start consuming from
>> the input topic when it has fully consumed the table view on the
>> output topic. In general, I don't think consumers know when they have
>> "reached the end of a table view", but we should be able to trivially
>> figure this out if we are the topic's only writer and the topic and
>> writer are collocated on the same broker.
>>
>> In that design, it might make sense to use something like the
>> replication cursor to keep track of this consumer's state.
>>
>> - Michael
>>
>> On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall <mm...@apache.org>
>> wrote:
>> >
>> > Thanks for your proposal, Heesung.
>> >
>> > Fundamentally, we have the problems listed in this PIP because we have
>> > multiple writers instead of just one writer. Can we solve this problem
>> > by changing our write pattern? What if we use the leader broker as the
>> > single writer? That broker would intercept attempts to acquire
>> > ownership on bundles and would grant ownership to the first broker to
>> > claim an unassigned bundle. It could "grant ownership" by letting the
>> > first write to claim an unassigned bundle get written to the ownership
>> > topic. When a bundle is already owned, the leader won't persist that
>> > event to the bookkeeper. In this design, the log becomes a true
>> > ownership log, which will correctly work with the existing topic
>> > compaction and table view solutions. My proposal essentially moves the
>> > conflict resolution to just before the write, and as a consequence, it
>> > greatly reduces the need for post processing of the event log. One
>> > trade off might be that the leader broker could slow down the write
>> > path, but given that the leader would just need to verify the current
>> > state of the bundle, I think it'd be performant enough.
>> >
>> > Additionally, we'd need the leader broker to be "caught up" on bundle
>> > ownership in order to grant ownership of topics, but unless I am
>> > mistaken, that is already a requirement of the current PIP 192
>> > paradigm.
>> >
>> > Below are some additional thoughts that will be relevant if we move
>> > forward with the design as it is currently proposed.
>> >
>> > I think it might be helpful to update the title to show that this
>> > proposal will also affect table view as well. I didn't catch that at
>> > first.
>> >
>> > Do you have any documentation describing how the
>> > TopicCompactionStrategy will determine which states are valid in the
>> > context of load balancing? I looked at
>> > https://github.com/apache/pulsar/pull/18195, but I couldn't seem to
>> > find anything for it. That would help make this proposal less
>> > abstract.
>> >
>> > The proposed API seems very tied to the needs of PIP 192. For example,
>> > `isValid` is not a term I associate with topic compaction. The
>> > fundamental question for compaction is which value to keep (or build a
>> > new value). I think we might be able to simplify the API by replacing
>> > the "isValid", "isMergeEnabled", and "merge" methods with a single
>> > method that lets the implementation handle one or all tasks. That
>> > would also remove the need to deserialize payloads multiple times too.
>> >
>> > I also feel like mentioning that after working with the PIP 105 broker
>> > side filtering, I think we should avoid running UDFs in the broker as
>> > much as possible. (I do not consider the load balancing logic to be a
>> > UDF here.) I think it would be worth not making this a user facing
>> > feature unless there is demand for real use cases.
>> >
>> > Thanks!
>> > Michael
>> >
>> > On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bo...@apache.org> wrote:
>> > >
>> > > +1(non-binding)
>> > >
>> > > thanks,
>> > > bo
>> > >
>> > > Heesung Sohn <he...@streamnative.io.invalid> 于2022年10月19日周三
>> 07:54写道:
>> > > >
>> > > > Hi pulsar-dev community,
>> > > >
>> > > > I raised a pip to discuss : PIP-215: Configurable Topic Compaction
>> Strategy
>> > > >
>> > > > PIP link: https://github.com/apache/pulsar/issues/18099
>> > > >
>> > > > Regards,
>> > > > Heesung
>>
>

Re: [DISCUSS] PIP-215: Configurable Topic Compaction Strategy

Posted by Heesung Sohn <he...@streamnative.io.INVALID>.
Hi Michael,

For the pre-prefilter(pre-compaction) option,
I think the leader requires a write-through cache to compact messages based
on the latest states. Otherwise, the leader needs to wait for the last msg
from the (compacted) topic before compacting the next msg for the same
bundle.

Pulsar guarantees "a single writer". However, for the worst-case
scenario(due to network partitions, bugs in zk or etcd leader election,
bugs in bk, data corruption ), I think it is safe to place the post-filter
on the consumer side(compaction and table views) as well in order to
validate the state changes.

For the two-topic approach,
I think we lose a single linearized view. Could we clarify how to handle
the following(edge cases and failure recovery)?
0. Is the un-compacted topic a persistent topic or a non-persistent topic?
1. How does the leader recover state from the two topics?
2. How do we handle the case when the leader fails before writing messages
to the compacted topic

Regards,
Heesung

On Fri, Oct 28, 2022 at 6:56 PM Michael Marshall <mm...@apache.org>
wrote:

> Sharing some more thoughts. We could alternatively use two topics
> instead of one. In this design, the first topic is the unfiltered
> write ahead log that represents many writers (brokers) trying to
> acquire ownership of bundles. The second topic is the distilled log
> that represents the "winners" or the "owners" of the bundles. There is
> a single writer, the leader broker, that reads from the input topic
> and writes to the output topic. The first topic is normal and the
> second is compacted.
>
> The primary benefit in a two topic solution is that it is easy for the
> leader broker to trade off ownership without needing to slow down
> writes to the input topic. The leader broker will start consuming from
> the input topic when it has fully consumed the table view on the
> output topic. In general, I don't think consumers know when they have
> "reached the end of a table view", but we should be able to trivially
> figure this out if we are the topic's only writer and the topic and
> writer are collocated on the same broker.
>
> In that design, it might make sense to use something like the
> replication cursor to keep track of this consumer's state.
>
> - Michael
>
> On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall <mm...@apache.org>
> wrote:
> >
> > Thanks for your proposal, Heesung.
> >
> > Fundamentally, we have the problems listed in this PIP because we have
> > multiple writers instead of just one writer. Can we solve this problem
> > by changing our write pattern? What if we use the leader broker as the
> > single writer? That broker would intercept attempts to acquire
> > ownership on bundles and would grant ownership to the first broker to
> > claim an unassigned bundle. It could "grant ownership" by letting the
> > first write to claim an unassigned bundle get written to the ownership
> > topic. When a bundle is already owned, the leader won't persist that
> > event to the bookkeeper. In this design, the log becomes a true
> > ownership log, which will correctly work with the existing topic
> > compaction and table view solutions. My proposal essentially moves the
> > conflict resolution to just before the write, and as a consequence, it
> > greatly reduces the need for post processing of the event log. One
> > trade off might be that the leader broker could slow down the write
> > path, but given that the leader would just need to verify the current
> > state of the bundle, I think it'd be performant enough.
> >
> > Additionally, we'd need the leader broker to be "caught up" on bundle
> > ownership in order to grant ownership of topics, but unless I am
> > mistaken, that is already a requirement of the current PIP 192
> > paradigm.
> >
> > Below are some additional thoughts that will be relevant if we move
> > forward with the design as it is currently proposed.
> >
> > I think it might be helpful to update the title to show that this
> > proposal will also affect table view as well. I didn't catch that at
> > first.
> >
> > Do you have any documentation describing how the
> > TopicCompactionStrategy will determine which states are valid in the
> > context of load balancing? I looked at
> > https://github.com/apache/pulsar/pull/18195, but I couldn't seem to
> > find anything for it. That would help make this proposal less
> > abstract.
> >
> > The proposed API seems very tied to the needs of PIP 192. For example,
> > `isValid` is not a term I associate with topic compaction. The
> > fundamental question for compaction is which value to keep (or build a
> > new value). I think we might be able to simplify the API by replacing
> > the "isValid", "isMergeEnabled", and "merge" methods with a single
> > method that lets the implementation handle one or all tasks. That
> > would also remove the need to deserialize payloads multiple times too.
> >
> > I also feel like mentioning that after working with the PIP 105 broker
> > side filtering, I think we should avoid running UDFs in the broker as
> > much as possible. (I do not consider the load balancing logic to be a
> > UDF here.) I think it would be worth not making this a user facing
> > feature unless there is demand for real use cases.
> >
> > Thanks!
> > Michael
> >
> > On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bo...@apache.org> wrote:
> > >
> > > +1(non-binding)
> > >
> > > thanks,
> > > bo
> > >
> > > Heesung Sohn <he...@streamnative.io.invalid> 于2022年10月19日周三
> 07:54写道:
> > > >
> > > > Hi pulsar-dev community,
> > > >
> > > > I raised a pip to discuss : PIP-215: Configurable Topic Compaction
> Strategy
> > > >
> > > > PIP link: https://github.com/apache/pulsar/issues/18099
> > > >
> > > > Regards,
> > > > Heesung
>

Re: [DISCUSS] PIP-215: Configurable Topic Compaction Strategy

Posted by Michael Marshall <mm...@apache.org>.
Sharing some more thoughts. We could alternatively use two topics
instead of one. In this design, the first topic is the unfiltered
write ahead log that represents many writers (brokers) trying to
acquire ownership of bundles. The second topic is the distilled log
that represents the "winners" or the "owners" of the bundles. There is
a single writer, the leader broker, that reads from the input topic
and writes to the output topic. The first topic is normal and the
second is compacted.

The primary benefit in a two topic solution is that it is easy for the
leader broker to trade off ownership without needing to slow down
writes to the input topic. The leader broker will start consuming from
the input topic when it has fully consumed the table view on the
output topic. In general, I don't think consumers know when they have
"reached the end of a table view", but we should be able to trivially
figure this out if we are the topic's only writer and the topic and
writer are collocated on the same broker.

In that design, it might make sense to use something like the
replication cursor to keep track of this consumer's state.

- Michael

On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall <mm...@apache.org> wrote:
>
> Thanks for your proposal, Heesung.
>
> Fundamentally, we have the problems listed in this PIP because we have
> multiple writers instead of just one writer. Can we solve this problem
> by changing our write pattern? What if we use the leader broker as the
> single writer? That broker would intercept attempts to acquire
> ownership on bundles and would grant ownership to the first broker to
> claim an unassigned bundle. It could "grant ownership" by letting the
> first write to claim an unassigned bundle get written to the ownership
> topic. When a bundle is already owned, the leader won't persist that
> event to the bookkeeper. In this design, the log becomes a true
> ownership log, which will correctly work with the existing topic
> compaction and table view solutions. My proposal essentially moves the
> conflict resolution to just before the write, and as a consequence, it
> greatly reduces the need for post processing of the event log. One
> trade off might be that the leader broker could slow down the write
> path, but given that the leader would just need to verify the current
> state of the bundle, I think it'd be performant enough.
>
> Additionally, we'd need the leader broker to be "caught up" on bundle
> ownership in order to grant ownership of topics, but unless I am
> mistaken, that is already a requirement of the current PIP 192
> paradigm.
>
> Below are some additional thoughts that will be relevant if we move
> forward with the design as it is currently proposed.
>
> I think it might be helpful to update the title to show that this
> proposal will also affect table view as well. I didn't catch that at
> first.
>
> Do you have any documentation describing how the
> TopicCompactionStrategy will determine which states are valid in the
> context of load balancing? I looked at
> https://github.com/apache/pulsar/pull/18195, but I couldn't seem to
> find anything for it. That would help make this proposal less
> abstract.
>
> The proposed API seems very tied to the needs of PIP 192. For example,
> `isValid` is not a term I associate with topic compaction. The
> fundamental question for compaction is which value to keep (or build a
> new value). I think we might be able to simplify the API by replacing
> the "isValid", "isMergeEnabled", and "merge" methods with a single
> method that lets the implementation handle one or all tasks. That
> would also remove the need to deserialize payloads multiple times too.
>
> I also feel like mentioning that after working with the PIP 105 broker
> side filtering, I think we should avoid running UDFs in the broker as
> much as possible. (I do not consider the load balancing logic to be a
> UDF here.) I think it would be worth not making this a user facing
> feature unless there is demand for real use cases.
>
> Thanks!
> Michael
>
> On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bo...@apache.org> wrote:
> >
> > +1(non-binding)
> >
> > thanks,
> > bo
> >
> > Heesung Sohn <he...@streamnative.io.invalid> 于2022年10月19日周三 07:54写道:
> > >
> > > Hi pulsar-dev community,
> > >
> > > I raised a pip to discuss : PIP-215: Configurable Topic Compaction Strategy
> > >
> > > PIP link: https://github.com/apache/pulsar/issues/18099
> > >
> > > Regards,
> > > Heesung

Re: [DISCUSS] PIP-215: Configurable Topic Compaction Strategy

Posted by Michael Marshall <mm...@apache.org>.
Thanks for your proposal, Heesung.

Fundamentally, we have the problems listed in this PIP because we have
multiple writers instead of just one writer. Can we solve this problem
by changing our write pattern? What if we use the leader broker as the
single writer? That broker would intercept attempts to acquire
ownership on bundles and would grant ownership to the first broker to
claim an unassigned bundle. It could "grant ownership" by letting the
first write to claim an unassigned bundle get written to the ownership
topic. When a bundle is already owned, the leader won't persist that
event to the bookkeeper. In this design, the log becomes a true
ownership log, which will correctly work with the existing topic
compaction and table view solutions. My proposal essentially moves the
conflict resolution to just before the write, and as a consequence, it
greatly reduces the need for post processing of the event log. One
trade off might be that the leader broker could slow down the write
path, but given that the leader would just need to verify the current
state of the bundle, I think it'd be performant enough.

Additionally, we'd need the leader broker to be "caught up" on bundle
ownership in order to grant ownership of topics, but unless I am
mistaken, that is already a requirement of the current PIP 192
paradigm.

Below are some additional thoughts that will be relevant if we move
forward with the design as it is currently proposed.

I think it might be helpful to update the title to show that this
proposal will also affect table view as well. I didn't catch that at
first.

Do you have any documentation describing how the
TopicCompactionStrategy will determine which states are valid in the
context of load balancing? I looked at
https://github.com/apache/pulsar/pull/18195, but I couldn't seem to
find anything for it. That would help make this proposal less
abstract.

The proposed API seems very tied to the needs of PIP 192. For example,
`isValid` is not a term I associate with topic compaction. The
fundamental question for compaction is which value to keep (or build a
new value). I think we might be able to simplify the API by replacing
the "isValid", "isMergeEnabled", and "merge" methods with a single
method that lets the implementation handle one or all tasks. That
would also remove the need to deserialize payloads multiple times too.

I also feel like mentioning that after working with the PIP 105 broker
side filtering, I think we should avoid running UDFs in the broker as
much as possible. (I do not consider the load balancing logic to be a
UDF here.) I think it would be worth not making this a user facing
feature unless there is demand for real use cases.

Thanks!
Michael

On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bo...@apache.org> wrote:
>
> +1(non-binding)
>
> thanks,
> bo
>
> Heesung Sohn <he...@streamnative.io.invalid> 于2022年10月19日周三 07:54写道:
> >
> > Hi pulsar-dev community,
> >
> > I raised a pip to discuss : PIP-215: Configurable Topic Compaction Strategy
> >
> > PIP link: https://github.com/apache/pulsar/issues/18099
> >
> > Regards,
> > Heesung

Re: [DISCUSS] PIP-215: Configurable Topic Compaction Strategy

Posted by 丛搏 <bo...@apache.org>.
+1(non-binding)

thanks,
bo

Heesung Sohn <he...@streamnative.io.invalid> 于2022年10月19日周三 07:54写道:
>
> Hi pulsar-dev community,
>
> I raised a pip to discuss : PIP-215: Configurable Topic Compaction Strategy
>
> PIP link: https://github.com/apache/pulsar/issues/18099
>
> Regards,
> Heesung

Re: [DISCUSS] PIP-215: Configurable Topic Compaction Strategy

Posted by PengHui Li <pe...@apache.org>.
Hi Heesung,

Thanks for the explanation
I got your point now.
We don't know when the topic compaction task will be triggered.
If we are not able to write the final decision message to the topic.
We will lose the first message of the key.
It makes sense to me.

+1

Penghui

On Tue, Oct 25, 2022 at 12:16 PM Heesung Sohn
<he...@streamnative.io.invalid> wrote:

> Hi,
>
> Regarding the release plan of this strategic compaction, I think we can
> take a conservative approach.
>
> First, since this is required for the system topic introduced in PIP-192,
> we can make this strategic compaction internal-only(the PIP-192, new broker
> load balancer will be the only use-case initially).
>
> Once this strategic compaction is proven to be stable enough, and there is
> demand from the customer topics, then we can expose the following admin
> APIs to enable strategic compaction to customer topics.
>
> pulsar-admin topicPolicies set-compaction-strategy options
> pulsar-admin topicPolicies get-compaction-strategy options
>
> Regards,
> Heesung
>
>
>
> On Mon, Oct 24, 2022 at 1:41 PM Heesung Sohn <heesung.sohn@streamnative.io
> >
> wrote:
>
> > Hi, please find my answers inline.
> >
> > On Sun, Oct 23, 2022 at 7:11 PM PengHui Li <pe...@apache.org> wrote:
> >
> >> Sorry, heesung,
> >>
> >> I think I used a confusing name "leader election".
> >> Actually, I meant to say "topic owner".
> >>
> >> From my understanding, the issue is if we are using the table view for a
> >> compacted topic.
> >> We will always get the last value of a key. But it will not work for the
> >> "broker ownership conflicts handling".
> >> First, we need to change the table view that is able to keep only the
> >> first
> >> value of a key.
> >> Even without compaction, the "broker ownership conflicts handling" will
> >> still work correctly, right?
> >>
> >>
> > Yes, the BSC conflict resolution needs to take the first valid
> value(state
> > change) per key, instead of just the latest value. For non-compacted
> topic,
> > only this table-view update(taking a strategic cache update) will serve
> its
> > purpose.
> >
> >
> >> But if the table view works on a compaction topic. The table view will
> >> show
> >> the last value of a key after the
> >> compaction. So you want also to change the topic compaction to make sure
> >> the table view will always show the
> >> first value of a key.
> >>
> >> Yes.
> >
> >
> >> Maybe I missed something here.
> >>
> >> My point is if we can just write the owner(final, the first value of the
> >> key) broker back to the topic.
> >> So that the table view will always show the first value of the key
> before
> >> the topic compaction or after the topic compaction.
> >>
> >>
> > But how do we conflict-resolve if the tail messages of the topic are
> > non-terminal states?
> >
> > 1. bundle 1 assigned by broker 1 // in the process of assignment
> > 2. bundle 1 assigned by broker 2
> > 3. bundle 2 released by broker 1 // in the process of transfer
> > 3. bundle 2 assigned by broker 1
> > 5. bundle 3 splitting by broker 1 // in the process of split
> > 6. bundle 3 assigned by broker 2
> >
> >
> > Regards,
> > Heesung
> >
> >
> >> Thanks,
> >> Penghui
> >>
> >> On Sat, Oct 22, 2022 at 12:23 AM Heesung Sohn
> >> <he...@streamnative.io.invalid> wrote:
> >>
> >> > Hi Penghui,
> >> >
> >> > I put my answers inline.
> >> >
> >> > On Thu, Oct 20, 2022 at 5:11 PM PengHui Li <pe...@apache.org>
> wrote:
> >> >
> >> > > Hi Heesung.
> >> > >
> >> > > Is it possible to send the promoted value to the topic again to
> >> achieve
> >> > > eventual consistency?
> >> > >
> >> >
> >> > Yes, as long as the state change is valid, BSC will accept it and
> >> broadcast
> >> > it to all brokers.
> >> >
> >> >
> >> > > For example:
> >> > >
> >> > > We have 3 brokers, broker-a, broker-b, and broker-c
> >> > > The message for leader election could be "own: broker-b", "own:
> >> > broker-c",
> >> > > "own: broker-a"
> >> > > The broker-b will win in the end.
> >> > >
> >> > The broker-b can write a new message "own: broker-b" to the topic.
> After
> >> > > the topic compaction.
> >> > > Only the broker-b will be present in the topic. Does it work?
> >> >
> >> >
> >> > The proposal does not use a topic for leader election because of the
> >> > circular dependency. The proposal uses the metadata store, zookeeper,
> to
> >> > elect the leader broker(s) of BSC.
> >> > This part is explained in the "Bundle State Channel Owner Selection
> and
> >> > Discovery" section in pip-192.
> >> >
> >> > *Bundle State Channel Owner Selection and Discovery*
> >> >
> >> > *Bundle State Channel(BSC) is another topic, and because of its
> circular
> >> > dependency, we can't use the BundleStateChannel to find the owner
> >> broker of
> >> > the BSC topic. For example, when a cluster starts, each broker needs
> to
> >> > initiate BSC TopicLookUp(to find the owner broker) in order to consume
> >> the
> >> > messages in BSC. However, initially, each broker does not know which
> >> broker
> >> > owns the BSC.*
> >> >
> >> > *The ZK leader election can be a good option to break this circular
> >> > dependency, like the followings.*
> >> > *Channel Owner Selection*
> >> >
> >> > *The cluster can use the ZK leader election to select the owner
> broker.
> >> If
> >> > the owner becomes unavailable, one of the followers will become the
> new
> >> > owner. We can elect the owner for each bundle state channel
> partition.*
> >> > *Channel Owner Discovery*
> >> >
> >> > *Then, in brokers’ TopicLookUp logic, we will add a special case to
> >> return
> >> > the current leader(the elected BSC owner) for the BSC topics.*
> >> >
> >> >
> >> >
> >> > >
> >> > > Maybe I missed something.
> >> > >
> >> > > Thanks,
> >> > > Penghui
> >> > >
> >> > > On Thu, Oct 20, 2022 at 1:30 AM Heesung Sohn
> >> > > <he...@streamnative.io.invalid> wrote:
> >> > >
> >> > > > Oops.
> >> > > > I forgot to mention another important item. I added it below(in
> >> bold).
> >> > > >
> >> > > > Pros:
> >> > > > - It supports more distributed load balance operations(bundle
> >> > assignment)
> >> > > > in a sequentially consistent manner
> >> > > > - For really large clusters, by a partitioned system topic, BSC
> can
> >> be
> >> > > more
> >> > > > scalable than the current single-leader coordination solution.
> >> > > > - The load balance commands(across brokers) are sent via event
> >> > > > sourcing(more reliable/transparent/easy-to-track) instead of RPC
> >> with
> >> > > > retries.
> >> > > > *- Bundle ownerships can be cached in the topic table-view from
> BSC.
> >> > (no
> >> > > > longer needs to store bundle ownership in metadata store(ZK))*
> >> > > >
> >> > > > Cons:
> >> > > > - It is a new implementation and will require significant effort
> to
> >> > > > stabilize the new implementation.
> >> > > > (Based on our PoC code, I think the event sourcing handlers are
> >> easier
> >> > to
> >> > > > understand and follow the logic.
> >> > > > Also, this new load balancer will be pluggable(will be implemented
> >> in
> >> > new
> >> > > > classes), so it should not break the existing load balance logic.
> >> > > > Users will be able to configure old/new broker load balancer.)
> >> > > >
> >> > > > On Wed, Oct 19, 2022 at 10:17 AM Heesung Sohn <
> >> > > > heesung.sohn@streamnative.io>
> >> > > > wrote:
> >> > > >
> >> > > > > Hi,
> >> > > > >
> >> > > > > On Wed, Oct 19, 2022 at 2:06 AM 丛搏 <co...@gmail.com>
> wrote:
> >> > > > >
> >> > > > >> Hi, Heesung:
> >> > > > >> I have some doubts.
> >> > > > >> I review your PIP-192: New Pulsar Broker Load Balancer. I found
> >> that
> >> > > > >> unload topic uses the leader broker to do, (Assigning, Return)
> >> uses
> >> > > > >> the lookup request broker. why (Assigning, Return) not use a
> >> leader
> >> > > > >> broker?
> >> > > > >> I can think of a few reasons:
> >> > > > >> 1. reduce leader broker pressure
> >> > > > >> 2. does not strongly depend on the leader broker
> >> > > > >>
> >> > > > >> Yes, one of the goals of the PIP-192 is to distribute the load
> >> > balance
> >> > > > > logic to individual brokers (bundle assignment and bundle
> split).
> >> > > > >
> >> > > > > If (Assigning, Return) does not depend on the leader, it will
> >> bring
> >> > > > > the following problems:
> >> > > > >
> >> > > > >> If (Assigning, Return) does not depend on the leader, it will
> >> bring
> >> > > > >> the following problems:
> >> > > > >
> >> > > > > I assume what you meant by `(Assigning, Return) does not depend
> on
> >> > the
> >> > > > > leader` is the distributed topic assignment here(concurrent
> bundle
> >> > > > > assignment across brokers).
> >> > > > >
> >> > > > > 1. leader clear bundle op and (Assigning, Return) will do at the
> >> same
> >> > > > >> time, It will cause many requests to be retried, and the broker
> >> will
> >> > > > >> be in chaos for a long time.
> >> > > > >
> >> > > > > I assume `leader clear bundle op` means bundle unloading, and
> >> `many
> >> > > > > requests` means topic lookup requests(bundle assignment
> requests).
> >> > > > > The leader unloads only high-loaded bundles in the "Owned"
> state.
> >> So,
> >> > > the
> >> > > > > leader does not unload bundles that are in the process of
> >> assignment
> >> > > > states.
> >> > > > > Even if there are conflict state changes, only the first valid
> >> state
> >> > > > > change will be accepted(as explained in Conflict State
> >> > Resolution(Race
> >> > > > > Conditions section in the PIP)) in BSC.
> >> > > > >
> >> > > > > Also, another goal of this PIP-192 is to reduce client lookup
> >> > retries.
> >> > > In
> >> > > > > BSC, the client lookup response will be deferred(max x secs)
> until
> >> > the
> >> > > > > bundle state becomes finally "Owned".
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >> 2. bundle State Channel(BSC) owner depends on the leader
> broker,
> >> > this
> >> > > > >> also makes topic transfer strongly dependent on the leader.
> >> > > > >>
> >> > > > > BSC will use separate leader znodes to decide the owner brokers
> of
> >> > the
> >> > > > > internal BSC system topic.As described in this section in the
> >> > PIP-192,
> >> > > > > "Bundle State and Load Data TableView Scalability",
> >> > > > > We could use a partitioned topic(configurable) for this BSC
> system
> >> > > topic.
> >> > > > > Then, there could be a separate owner broker for each partition
> >> > > > > (e.g. zk leader znodes, /loadbalance/leader/part-1-owner,
> >> > part-2-owner,
> >> > > > > ..etc).
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >> 3. the code becomes more complex and harder to maintain
> >> > > > >>
> >> > > > >> What tradeoffs are the current implementations based on?
> >> > > > >>
> >> > > > >> Here are some Pros and Cons of BSC I can think of.
> >> > > > >
> >> > > > > Pros:
> >> > > > > - It supports more distributed load balance operations(bundle
> >> > > assignment)
> >> > > > > in a sequentially consistent manner
> >> > > > > - For really large clusters, by a partitioned system topic, BSC
> >> can
> >> > be
> >> > > > > more scalable than the current single-leader coordination
> >> solution.
> >> > > > > - The load balance commands(across brokers) are sent via event
> >> > > > > sourcing(more reliable/transparent/easy-to-track) instead of RPC
> >> with
> >> > > > > retries.
> >> > > > >
> >> > > > > Cons:
> >> > > > > - It is a new implementation and will require significant effort
> >> to
> >> > > > > stabilize the new implementation.
> >> > > > > (Based on our PoC code, I think the event sourcing handlers are
> >> > easier
> >> > > to
> >> > > > > understand and follow the logic.
> >> > > > > Also, this new load balancer will be pluggable(will be
> >> implemented in
> >> > > new
> >> > > > > classes), so it should not break the existing load balance
> logic.
> >> > > > > Users will be able to configure old/new broker load balancer.)
> >> > > > >
> >> > > > >
> >> > > > > Thank you for sharing your questions about PIP-192 here. But I
> >> think
> >> > > this
> >> > > > > PIP-215 is independent of PIP-192(though PIP-192 needs some of
> the
> >> > > > features
> >> > > > > in PIP-215).
> >> > > > >
> >> > > > > Thanks,
> >> > > > > Heesung
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >> Thanks,
> >> > > > >> bo
> >> > > > >>
> >> > > > >> Heesung Sohn <he...@streamnative.io.invalid>
> >> 于2022年10月19日周三
> >> > > > >> 07:54写道:
> >> > > > >> >
> >> > > > >> > Hi pulsar-dev community,
> >> > > > >> >
> >> > > > >> > I raised a pip to discuss : PIP-215: Configurable Topic
> >> Compaction
> >> > > > >> Strategy
> >> > > > >> >
> >> > > > >> > PIP link: https://github.com/apache/pulsar/issues/18099
> >> > > > >> >
> >> > > > >> > Regards,
> >> > > > >> > Heesung
> >> > > > >>
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >
>

Re: [DISCUSS] PIP-215: Configurable Topic Compaction Strategy

Posted by Heesung Sohn <he...@streamnative.io.INVALID>.
Hi,

Regarding the release plan of this strategic compaction, I think we can
take a conservative approach.

First, since this is required for the system topic introduced in PIP-192,
we can make this strategic compaction internal-only(the PIP-192, new broker
load balancer will be the only use-case initially).

Once this strategic compaction is proven to be stable enough, and there is
demand from the customer topics, then we can expose the following admin
APIs to enable strategic compaction to customer topics.

pulsar-admin topicPolicies set-compaction-strategy options
pulsar-admin topicPolicies get-compaction-strategy options

Regards,
Heesung



On Mon, Oct 24, 2022 at 1:41 PM Heesung Sohn <he...@streamnative.io>
wrote:

> Hi, please find my answers inline.
>
> On Sun, Oct 23, 2022 at 7:11 PM PengHui Li <pe...@apache.org> wrote:
>
>> Sorry, heesung,
>>
>> I think I used a confusing name "leader election".
>> Actually, I meant to say "topic owner".
>>
>> From my understanding, the issue is if we are using the table view for a
>> compacted topic.
>> We will always get the last value of a key. But it will not work for the
>> "broker ownership conflicts handling".
>> First, we need to change the table view that is able to keep only the
>> first
>> value of a key.
>> Even without compaction, the "broker ownership conflicts handling" will
>> still work correctly, right?
>>
>>
> Yes, the BSC conflict resolution needs to take the first valid value(state
> change) per key, instead of just the latest value. For non-compacted topic,
> only this table-view update(taking a strategic cache update) will serve its
> purpose.
>
>
>> But if the table view works on a compaction topic. The table view will
>> show
>> the last value of a key after the
>> compaction. So you want also to change the topic compaction to make sure
>> the table view will always show the
>> first value of a key.
>>
>> Yes.
>
>
>> Maybe I missed something here.
>>
>> My point is if we can just write the owner(final, the first value of the
>> key) broker back to the topic.
>> So that the table view will always show the first value of the key before
>> the topic compaction or after the topic compaction.
>>
>>
> But how do we conflict-resolve if the tail messages of the topic are
> non-terminal states?
>
> 1. bundle 1 assigned by broker 1 // in the process of assignment
> 2. bundle 1 assigned by broker 2
> 3. bundle 2 released by broker 1 // in the process of transfer
> 3. bundle 2 assigned by broker 1
> 5. bundle 3 splitting by broker 1 // in the process of split
> 6. bundle 3 assigned by broker 2
>
>
> Regards,
> Heesung
>
>
>> Thanks,
>> Penghui
>>
>> On Sat, Oct 22, 2022 at 12:23 AM Heesung Sohn
>> <he...@streamnative.io.invalid> wrote:
>>
>> > Hi Penghui,
>> >
>> > I put my answers inline.
>> >
>> > On Thu, Oct 20, 2022 at 5:11 PM PengHui Li <pe...@apache.org> wrote:
>> >
>> > > Hi Heesung.
>> > >
>> > > Is it possible to send the promoted value to the topic again to
>> achieve
>> > > eventual consistency?
>> > >
>> >
>> > Yes, as long as the state change is valid, BSC will accept it and
>> broadcast
>> > it to all brokers.
>> >
>> >
>> > > For example:
>> > >
>> > > We have 3 brokers, broker-a, broker-b, and broker-c
>> > > The message for leader election could be "own: broker-b", "own:
>> > broker-c",
>> > > "own: broker-a"
>> > > The broker-b will win in the end.
>> > >
>> > The broker-b can write a new message "own: broker-b" to the topic. After
>> > > the topic compaction.
>> > > Only the broker-b will be present in the topic. Does it work?
>> >
>> >
>> > The proposal does not use a topic for leader election because of the
>> > circular dependency. The proposal uses the metadata store, zookeeper, to
>> > elect the leader broker(s) of BSC.
>> > This part is explained in the "Bundle State Channel Owner Selection and
>> > Discovery" section in pip-192.
>> >
>> > *Bundle State Channel Owner Selection and Discovery*
>> >
>> > *Bundle State Channel(BSC) is another topic, and because of its circular
>> > dependency, we can't use the BundleStateChannel to find the owner
>> broker of
>> > the BSC topic. For example, when a cluster starts, each broker needs to
>> > initiate BSC TopicLookUp(to find the owner broker) in order to consume
>> the
>> > messages in BSC. However, initially, each broker does not know which
>> broker
>> > owns the BSC.*
>> >
>> > *The ZK leader election can be a good option to break this circular
>> > dependency, like the followings.*
>> > *Channel Owner Selection*
>> >
>> > *The cluster can use the ZK leader election to select the owner broker.
>> If
>> > the owner becomes unavailable, one of the followers will become the new
>> > owner. We can elect the owner for each bundle state channel partition.*
>> > *Channel Owner Discovery*
>> >
>> > *Then, in brokers’ TopicLookUp logic, we will add a special case to
>> return
>> > the current leader(the elected BSC owner) for the BSC topics.*
>> >
>> >
>> >
>> > >
>> > > Maybe I missed something.
>> > >
>> > > Thanks,
>> > > Penghui
>> > >
>> > > On Thu, Oct 20, 2022 at 1:30 AM Heesung Sohn
>> > > <he...@streamnative.io.invalid> wrote:
>> > >
>> > > > Oops.
>> > > > I forgot to mention another important item. I added it below(in
>> bold).
>> > > >
>> > > > Pros:
>> > > > - It supports more distributed load balance operations(bundle
>> > assignment)
>> > > > in a sequentially consistent manner
>> > > > - For really large clusters, by a partitioned system topic, BSC can
>> be
>> > > more
>> > > > scalable than the current single-leader coordination solution.
>> > > > - The load balance commands(across brokers) are sent via event
>> > > > sourcing(more reliable/transparent/easy-to-track) instead of RPC
>> with
>> > > > retries.
>> > > > *- Bundle ownerships can be cached in the topic table-view from BSC.
>> > (no
>> > > > longer needs to store bundle ownership in metadata store(ZK))*
>> > > >
>> > > > Cons:
>> > > > - It is a new implementation and will require significant effort to
>> > > > stabilize the new implementation.
>> > > > (Based on our PoC code, I think the event sourcing handlers are
>> easier
>> > to
>> > > > understand and follow the logic.
>> > > > Also, this new load balancer will be pluggable(will be implemented
>> in
>> > new
>> > > > classes), so it should not break the existing load balance logic.
>> > > > Users will be able to configure old/new broker load balancer.)
>> > > >
>> > > > On Wed, Oct 19, 2022 at 10:17 AM Heesung Sohn <
>> > > > heesung.sohn@streamnative.io>
>> > > > wrote:
>> > > >
>> > > > > Hi,
>> > > > >
>> > > > > On Wed, Oct 19, 2022 at 2:06 AM 丛搏 <co...@gmail.com> wrote:
>> > > > >
>> > > > >> Hi, Heesung:
>> > > > >> I have some doubts.
>> > > > >> I review your PIP-192: New Pulsar Broker Load Balancer. I found
>> that
>> > > > >> unload topic uses the leader broker to do, (Assigning, Return)
>> uses
>> > > > >> the lookup request broker. why (Assigning, Return) not use a
>> leader
>> > > > >> broker?
>> > > > >> I can think of a few reasons:
>> > > > >> 1. reduce leader broker pressure
>> > > > >> 2. does not strongly depend on the leader broker
>> > > > >>
>> > > > >> Yes, one of the goals of the PIP-192 is to distribute the load
>> > balance
>> > > > > logic to individual brokers (bundle assignment and bundle split).
>> > > > >
>> > > > > If (Assigning, Return) does not depend on the leader, it will
>> bring
>> > > > > the following problems:
>> > > > >
>> > > > >> If (Assigning, Return) does not depend on the leader, it will
>> bring
>> > > > >> the following problems:
>> > > > >
>> > > > > I assume what you meant by `(Assigning, Return) does not depend on
>> > the
>> > > > > leader` is the distributed topic assignment here(concurrent bundle
>> > > > > assignment across brokers).
>> > > > >
>> > > > > 1. leader clear bundle op and (Assigning, Return) will do at the
>> same
>> > > > >> time, It will cause many requests to be retried, and the broker
>> will
>> > > > >> be in chaos for a long time.
>> > > > >
>> > > > > I assume `leader clear bundle op` means bundle unloading, and
>> `many
>> > > > > requests` means topic lookup requests(bundle assignment requests).
>> > > > > The leader unloads only high-loaded bundles in the "Owned" state.
>> So,
>> > > the
>> > > > > leader does not unload bundles that are in the process of
>> assignment
>> > > > states.
>> > > > > Even if there are conflict state changes, only the first valid
>> state
>> > > > > change will be accepted(as explained in Conflict State
>> > Resolution(Race
>> > > > > Conditions section in the PIP)) in BSC.
>> > > > >
>> > > > > Also, another goal of this PIP-192 is to reduce client lookup
>> > retries.
>> > > In
>> > > > > BSC, the client lookup response will be deferred(max x secs) until
>> > the
>> > > > > bundle state becomes finally "Owned".
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > >> 2. bundle State Channel(BSC) owner depends on the leader broker,
>> > this
>> > > > >> also makes topic transfer strongly dependent on the leader.
>> > > > >>
>> > > > > BSC will use separate leader znodes to decide the owner brokers of
>> > the
>> > > > > internal BSC system topic.As described in this section in the
>> > PIP-192,
>> > > > > "Bundle State and Load Data TableView Scalability",
>> > > > > We could use a partitioned topic(configurable) for this BSC system
>> > > topic.
>> > > > > Then, there could be a separate owner broker for each partition
>> > > > > (e.g. zk leader znodes, /loadbalance/leader/part-1-owner,
>> > part-2-owner,
>> > > > > ..etc).
>> > > > >
>> > > > >
>> > > > >
>> > > > >> 3. the code becomes more complex and harder to maintain
>> > > > >>
>> > > > >> What tradeoffs are the current implementations based on?
>> > > > >>
>> > > > >> Here are some Pros and Cons of BSC I can think of.
>> > > > >
>> > > > > Pros:
>> > > > > - It supports more distributed load balance operations(bundle
>> > > assignment)
>> > > > > in a sequentially consistent manner
>> > > > > - For really large clusters, by a partitioned system topic, BSC
>> can
>> > be
>> > > > > more scalable than the current single-leader coordination
>> solution.
>> > > > > - The load balance commands(across brokers) are sent via event
>> > > > > sourcing(more reliable/transparent/easy-to-track) instead of RPC
>> with
>> > > > > retries.
>> > > > >
>> > > > > Cons:
>> > > > > - It is a new implementation and will require significant effort
>> to
>> > > > > stabilize the new implementation.
>> > > > > (Based on our PoC code, I think the event sourcing handlers are
>> > easier
>> > > to
>> > > > > understand and follow the logic.
>> > > > > Also, this new load balancer will be pluggable(will be
>> implemented in
>> > > new
>> > > > > classes), so it should not break the existing load balance logic.
>> > > > > Users will be able to configure old/new broker load balancer.)
>> > > > >
>> > > > >
>> > > > > Thank you for sharing your questions about PIP-192 here. But I
>> think
>> > > this
>> > > > > PIP-215 is independent of PIP-192(though PIP-192 needs some of the
>> > > > features
>> > > > > in PIP-215).
>> > > > >
>> > > > > Thanks,
>> > > > > Heesung
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > >> Thanks,
>> > > > >> bo
>> > > > >>
>> > > > >> Heesung Sohn <he...@streamnative.io.invalid>
>> 于2022年10月19日周三
>> > > > >> 07:54写道:
>> > > > >> >
>> > > > >> > Hi pulsar-dev community,
>> > > > >> >
>> > > > >> > I raised a pip to discuss : PIP-215: Configurable Topic
>> Compaction
>> > > > >> Strategy
>> > > > >> >
>> > > > >> > PIP link: https://github.com/apache/pulsar/issues/18099
>> > > > >> >
>> > > > >> > Regards,
>> > > > >> > Heesung
>> > > > >>
>> > > > >
>> > > >
>> > >
>> >
>>
>

Re: [DISCUSS] PIP-215: Configurable Topic Compaction Strategy

Posted by Heesung Sohn <he...@streamnative.io.INVALID>.
Hi, please find my answers inline.

On Sun, Oct 23, 2022 at 7:11 PM PengHui Li <pe...@apache.org> wrote:

> Sorry, heesung,
>
> I think I used a confusing name "leader election".
> Actually, I meant to say "topic owner".
>
> From my understanding, the issue is if we are using the table view for a
> compacted topic.
> We will always get the last value of a key. But it will not work for the
> "broker ownership conflicts handling".
> First, we need to change the table view that is able to keep only the first
> value of a key.
> Even without compaction, the "broker ownership conflicts handling" will
> still work correctly, right?
>
>
Yes, the BSC conflict resolution needs to take the first valid value(state
change) per key, instead of just the latest value. For non-compacted topic,
only this table-view update(taking a strategic cache update) will serve its
purpose.


> But if the table view works on a compaction topic. The table view will show
> the last value of a key after the
> compaction. So you want also to change the topic compaction to make sure
> the table view will always show the
> first value of a key.
>
> Yes.


> Maybe I missed something here.
>
> My point is if we can just write the owner(final, the first value of the
> key) broker back to the topic.
> So that the table view will always show the first value of the key before
> the topic compaction or after the topic compaction.
>
>
But how do we conflict-resolve if the tail messages of the topic are
non-terminal states?

1. bundle 1 assigned by broker 1 // in the process of assignment
2. bundle 1 assigned by broker 2
3. bundle 2 released by broker 1 // in the process of transfer
3. bundle 2 assigned by broker 1
5. bundle 3 splitting by broker 1 // in the process of split
6. bundle 3 assigned by broker 2


Regards,
Heesung


> Thanks,
> Penghui
>
> On Sat, Oct 22, 2022 at 12:23 AM Heesung Sohn
> <he...@streamnative.io.invalid> wrote:
>
> > Hi Penghui,
> >
> > I put my answers inline.
> >
> > On Thu, Oct 20, 2022 at 5:11 PM PengHui Li <pe...@apache.org> wrote:
> >
> > > Hi Heesung.
> > >
> > > Is it possible to send the promoted value to the topic again to achieve
> > > eventual consistency?
> > >
> >
> > Yes, as long as the state change is valid, BSC will accept it and
> broadcast
> > it to all brokers.
> >
> >
> > > For example:
> > >
> > > We have 3 brokers, broker-a, broker-b, and broker-c
> > > The message for leader election could be "own: broker-b", "own:
> > broker-c",
> > > "own: broker-a"
> > > The broker-b will win in the end.
> > >
> > The broker-b can write a new message "own: broker-b" to the topic. After
> > > the topic compaction.
> > > Only the broker-b will be present in the topic. Does it work?
> >
> >
> > The proposal does not use a topic for leader election because of the
> > circular dependency. The proposal uses the metadata store, zookeeper, to
> > elect the leader broker(s) of BSC.
> > This part is explained in the "Bundle State Channel Owner Selection and
> > Discovery" section in pip-192.
> >
> > *Bundle State Channel Owner Selection and Discovery*
> >
> > *Bundle State Channel(BSC) is another topic, and because of its circular
> > dependency, we can't use the BundleStateChannel to find the owner broker
> of
> > the BSC topic. For example, when a cluster starts, each broker needs to
> > initiate BSC TopicLookUp(to find the owner broker) in order to consume
> the
> > messages in BSC. However, initially, each broker does not know which
> broker
> > owns the BSC.*
> >
> > *The ZK leader election can be a good option to break this circular
> > dependency, like the followings.*
> > *Channel Owner Selection*
> >
> > *The cluster can use the ZK leader election to select the owner broker.
> If
> > the owner becomes unavailable, one of the followers will become the new
> > owner. We can elect the owner for each bundle state channel partition.*
> > *Channel Owner Discovery*
> >
> > *Then, in brokers’ TopicLookUp logic, we will add a special case to
> return
> > the current leader(the elected BSC owner) for the BSC topics.*
> >
> >
> >
> > >
> > > Maybe I missed something.
> > >
> > > Thanks,
> > > Penghui
> > >
> > > On Thu, Oct 20, 2022 at 1:30 AM Heesung Sohn
> > > <he...@streamnative.io.invalid> wrote:
> > >
> > > > Oops.
> > > > I forgot to mention another important item. I added it below(in
> bold).
> > > >
> > > > Pros:
> > > > - It supports more distributed load balance operations(bundle
> > assignment)
> > > > in a sequentially consistent manner
> > > > - For really large clusters, by a partitioned system topic, BSC can
> be
> > > more
> > > > scalable than the current single-leader coordination solution.
> > > > - The load balance commands(across brokers) are sent via event
> > > > sourcing(more reliable/transparent/easy-to-track) instead of RPC with
> > > > retries.
> > > > *- Bundle ownerships can be cached in the topic table-view from BSC.
> > (no
> > > > longer needs to store bundle ownership in metadata store(ZK))*
> > > >
> > > > Cons:
> > > > - It is a new implementation and will require significant effort to
> > > > stabilize the new implementation.
> > > > (Based on our PoC code, I think the event sourcing handlers are
> easier
> > to
> > > > understand and follow the logic.
> > > > Also, this new load balancer will be pluggable(will be implemented in
> > new
> > > > classes), so it should not break the existing load balance logic.
> > > > Users will be able to configure old/new broker load balancer.)
> > > >
> > > > On Wed, Oct 19, 2022 at 10:17 AM Heesung Sohn <
> > > > heesung.sohn@streamnative.io>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > On Wed, Oct 19, 2022 at 2:06 AM 丛搏 <co...@gmail.com> wrote:
> > > > >
> > > > >> Hi, Heesung:
> > > > >> I have some doubts.
> > > > >> I review your PIP-192: New Pulsar Broker Load Balancer. I found
> that
> > > > >> unload topic uses the leader broker to do, (Assigning, Return)
> uses
> > > > >> the lookup request broker. why (Assigning, Return) not use a
> leader
> > > > >> broker?
> > > > >> I can think of a few reasons:
> > > > >> 1. reduce leader broker pressure
> > > > >> 2. does not strongly depend on the leader broker
> > > > >>
> > > > >> Yes, one of the goals of the PIP-192 is to distribute the load
> > balance
> > > > > logic to individual brokers (bundle assignment and bundle split).
> > > > >
> > > > > If (Assigning, Return) does not depend on the leader, it will bring
> > > > > the following problems:
> > > > >
> > > > >> If (Assigning, Return) does not depend on the leader, it will
> bring
> > > > >> the following problems:
> > > > >
> > > > > I assume what you meant by `(Assigning, Return) does not depend on
> > the
> > > > > leader` is the distributed topic assignment here(concurrent bundle
> > > > > assignment across brokers).
> > > > >
> > > > > 1. leader clear bundle op and (Assigning, Return) will do at the
> same
> > > > >> time, It will cause many requests to be retried, and the broker
> will
> > > > >> be in chaos for a long time.
> > > > >
> > > > > I assume `leader clear bundle op` means bundle unloading, and `many
> > > > > requests` means topic lookup requests(bundle assignment requests).
> > > > > The leader unloads only high-loaded bundles in the "Owned" state.
> So,
> > > the
> > > > > leader does not unload bundles that are in the process of
> assignment
> > > > states.
> > > > > Even if there are conflict state changes, only the first valid
> state
> > > > > change will be accepted(as explained in Conflict State
> > Resolution(Race
> > > > > Conditions section in the PIP)) in BSC.
> > > > >
> > > > > Also, another goal of this PIP-192 is to reduce client lookup
> > retries.
> > > In
> > > > > BSC, the client lookup response will be deferred(max x secs) until
> > the
> > > > > bundle state becomes finally "Owned".
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >> 2. bundle State Channel(BSC) owner depends on the leader broker,
> > this
> > > > >> also makes topic transfer strongly dependent on the leader.
> > > > >>
> > > > > BSC will use separate leader znodes to decide the owner brokers of
> > the
> > > > > internal BSC system topic.As described in this section in the
> > PIP-192,
> > > > > "Bundle State and Load Data TableView Scalability",
> > > > > We could use a partitioned topic(configurable) for this BSC system
> > > topic.
> > > > > Then, there could be a separate owner broker for each partition
> > > > > (e.g. zk leader znodes, /loadbalance/leader/part-1-owner,
> > part-2-owner,
> > > > > ..etc).
> > > > >
> > > > >
> > > > >
> > > > >> 3. the code becomes more complex and harder to maintain
> > > > >>
> > > > >> What tradeoffs are the current implementations based on?
> > > > >>
> > > > >> Here are some Pros and Cons of BSC I can think of.
> > > > >
> > > > > Pros:
> > > > > - It supports more distributed load balance operations(bundle
> > > assignment)
> > > > > in a sequentially consistent manner
> > > > > - For really large clusters, by a partitioned system topic, BSC can
> > be
> > > > > more scalable than the current single-leader coordination solution.
> > > > > - The load balance commands(across brokers) are sent via event
> > > > > sourcing(more reliable/transparent/easy-to-track) instead of RPC
> with
> > > > > retries.
> > > > >
> > > > > Cons:
> > > > > - It is a new implementation and will require significant effort to
> > > > > stabilize the new implementation.
> > > > > (Based on our PoC code, I think the event sourcing handlers are
> > easier
> > > to
> > > > > understand and follow the logic.
> > > > > Also, this new load balancer will be pluggable(will be implemented
> in
> > > new
> > > > > classes), so it should not break the existing load balance logic.
> > > > > Users will be able to configure old/new broker load balancer.)
> > > > >
> > > > >
> > > > > Thank you for sharing your questions about PIP-192 here. But I
> think
> > > this
> > > > > PIP-215 is independent of PIP-192(though PIP-192 needs some of the
> > > > features
> > > > > in PIP-215).
> > > > >
> > > > > Thanks,
> > > > > Heesung
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >> Thanks,
> > > > >> bo
> > > > >>
> > > > >> Heesung Sohn <he...@streamnative.io.invalid>
> 于2022年10月19日周三
> > > > >> 07:54写道:
> > > > >> >
> > > > >> > Hi pulsar-dev community,
> > > > >> >
> > > > >> > I raised a pip to discuss : PIP-215: Configurable Topic
> Compaction
> > > > >> Strategy
> > > > >> >
> > > > >> > PIP link: https://github.com/apache/pulsar/issues/18099
> > > > >> >
> > > > >> > Regards,
> > > > >> > Heesung
> > > > >>
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] PIP-215: Configurable Topic Compaction Strategy

Posted by PengHui Li <pe...@apache.org>.
Sorry, heesung,

I think I used a confusing name "leader election".
Actually, I meant to say "topic owner".

From my understanding, the issue is if we are using the table view for a
compacted topic.
We will always get the last value of a key. But it will not work for the
"broker ownership conflicts handling".
First, we need to change the table view that is able to keep only the first
value of a key.
Even without compaction, the "broker ownership conflicts handling" will
still work correctly, right?

But if the table view works on a compaction topic. The table view will show
the last value of a key after the
compaction. So you want also to change the topic compaction to make sure
the table view will always show the
first value of a key.

Maybe I missed something here.

My point is if we can just write the owner(final, the first value of the
key) broker back to the topic.
So that the table view will always show the first value of the key before
the topic compaction or after the topic compaction.

Thanks,
Penghui

On Sat, Oct 22, 2022 at 12:23 AM Heesung Sohn
<he...@streamnative.io.invalid> wrote:

> Hi Penghui,
>
> I put my answers inline.
>
> On Thu, Oct 20, 2022 at 5:11 PM PengHui Li <pe...@apache.org> wrote:
>
> > Hi Heesung.
> >
> > Is it possible to send the promoted value to the topic again to achieve
> > eventual consistency?
> >
>
> Yes, as long as the state change is valid, BSC will accept it and broadcast
> it to all brokers.
>
>
> > For example:
> >
> > We have 3 brokers, broker-a, broker-b, and broker-c
> > The message for leader election could be "own: broker-b", "own:
> broker-c",
> > "own: broker-a"
> > The broker-b will win in the end.
> >
> The broker-b can write a new message "own: broker-b" to the topic. After
> > the topic compaction.
> > Only the broker-b will be present in the topic. Does it work?
>
>
> The proposal does not use a topic for leader election because of the
> circular dependency. The proposal uses the metadata store, zookeeper, to
> elect the leader broker(s) of BSC.
> This part is explained in the "Bundle State Channel Owner Selection and
> Discovery" section in pip-192.
>
> *Bundle State Channel Owner Selection and Discovery*
>
> *Bundle State Channel(BSC) is another topic, and because of its circular
> dependency, we can't use the BundleStateChannel to find the owner broker of
> the BSC topic. For example, when a cluster starts, each broker needs to
> initiate BSC TopicLookUp(to find the owner broker) in order to consume the
> messages in BSC. However, initially, each broker does not know which broker
> owns the BSC.*
>
> *The ZK leader election can be a good option to break this circular
> dependency, like the followings.*
> *Channel Owner Selection*
>
> *The cluster can use the ZK leader election to select the owner broker. If
> the owner becomes unavailable, one of the followers will become the new
> owner. We can elect the owner for each bundle state channel partition.*
> *Channel Owner Discovery*
>
> *Then, in brokers’ TopicLookUp logic, we will add a special case to return
> the current leader(the elected BSC owner) for the BSC topics.*
>
>
>
> >
> > Maybe I missed something.
> >
> > Thanks,
> > Penghui
> >
> > On Thu, Oct 20, 2022 at 1:30 AM Heesung Sohn
> > <he...@streamnative.io.invalid> wrote:
> >
> > > Oops.
> > > I forgot to mention another important item. I added it below(in bold).
> > >
> > > Pros:
> > > - It supports more distributed load balance operations(bundle
> assignment)
> > > in a sequentially consistent manner
> > > - For really large clusters, by a partitioned system topic, BSC can be
> > more
> > > scalable than the current single-leader coordination solution.
> > > - The load balance commands(across brokers) are sent via event
> > > sourcing(more reliable/transparent/easy-to-track) instead of RPC with
> > > retries.
> > > *- Bundle ownerships can be cached in the topic table-view from BSC.
> (no
> > > longer needs to store bundle ownership in metadata store(ZK))*
> > >
> > > Cons:
> > > - It is a new implementation and will require significant effort to
> > > stabilize the new implementation.
> > > (Based on our PoC code, I think the event sourcing handlers are easier
> to
> > > understand and follow the logic.
> > > Also, this new load balancer will be pluggable(will be implemented in
> new
> > > classes), so it should not break the existing load balance logic.
> > > Users will be able to configure old/new broker load balancer.)
> > >
> > > On Wed, Oct 19, 2022 at 10:17 AM Heesung Sohn <
> > > heesung.sohn@streamnative.io>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > On Wed, Oct 19, 2022 at 2:06 AM 丛搏 <co...@gmail.com> wrote:
> > > >
> > > >> Hi, Heesung:
> > > >> I have some doubts.
> > > >> I review your PIP-192: New Pulsar Broker Load Balancer. I found that
> > > >> unload topic uses the leader broker to do, (Assigning, Return) uses
> > > >> the lookup request broker. why (Assigning, Return) not use a leader
> > > >> broker?
> > > >> I can think of a few reasons:
> > > >> 1. reduce leader broker pressure
> > > >> 2. does not strongly depend on the leader broker
> > > >>
> > > >> Yes, one of the goals of the PIP-192 is to distribute the load
> balance
> > > > logic to individual brokers (bundle assignment and bundle split).
> > > >
> > > > If (Assigning, Return) does not depend on the leader, it will bring
> > > > the following problems:
> > > >
> > > >> If (Assigning, Return) does not depend on the leader, it will bring
> > > >> the following problems:
> > > >
> > > > I assume what you meant by `(Assigning, Return) does not depend on
> the
> > > > leader` is the distributed topic assignment here(concurrent bundle
> > > > assignment across brokers).
> > > >
> > > > 1. leader clear bundle op and (Assigning, Return) will do at the same
> > > >> time, It will cause many requests to be retried, and the broker will
> > > >> be in chaos for a long time.
> > > >
> > > > I assume `leader clear bundle op` means bundle unloading, and `many
> > > > requests` means topic lookup requests(bundle assignment requests).
> > > > The leader unloads only high-loaded bundles in the "Owned" state. So,
> > the
> > > > leader does not unload bundles that are in the process of assignment
> > > states.
> > > > Even if there are conflict state changes, only the first valid state
> > > > change will be accepted(as explained in Conflict State
> Resolution(Race
> > > > Conditions section in the PIP)) in BSC.
> > > >
> > > > Also, another goal of this PIP-192 is to reduce client lookup
> retries.
> > In
> > > > BSC, the client lookup response will be deferred(max x secs) until
> the
> > > > bundle state becomes finally "Owned".
> > > >
> > > >
> > > >
> > > >
> > > >> 2. bundle State Channel(BSC) owner depends on the leader broker,
> this
> > > >> also makes topic transfer strongly dependent on the leader.
> > > >>
> > > > BSC will use separate leader znodes to decide the owner brokers of
> the
> > > > internal BSC system topic.As described in this section in the
> PIP-192,
> > > > "Bundle State and Load Data TableView Scalability",
> > > > We could use a partitioned topic(configurable) for this BSC system
> > topic.
> > > > Then, there could be a separate owner broker for each partition
> > > > (e.g. zk leader znodes, /loadbalance/leader/part-1-owner,
> part-2-owner,
> > > > ..etc).
> > > >
> > > >
> > > >
> > > >> 3. the code becomes more complex and harder to maintain
> > > >>
> > > >> What tradeoffs are the current implementations based on?
> > > >>
> > > >> Here are some Pros and Cons of BSC I can think of.
> > > >
> > > > Pros:
> > > > - It supports more distributed load balance operations(bundle
> > assignment)
> > > > in a sequentially consistent manner
> > > > - For really large clusters, by a partitioned system topic, BSC can
> be
> > > > more scalable than the current single-leader coordination solution.
> > > > - The load balance commands(across brokers) are sent via event
> > > > sourcing(more reliable/transparent/easy-to-track) instead of RPC with
> > > > retries.
> > > >
> > > > Cons:
> > > > - It is a new implementation and will require significant effort to
> > > > stabilize the new implementation.
> > > > (Based on our PoC code, I think the event sourcing handlers are
> easier
> > to
> > > > understand and follow the logic.
> > > > Also, this new load balancer will be pluggable(will be implemented in
> > new
> > > > classes), so it should not break the existing load balance logic.
> > > > Users will be able to configure old/new broker load balancer.)
> > > >
> > > >
> > > > Thank you for sharing your questions about PIP-192 here. But I think
> > this
> > > > PIP-215 is independent of PIP-192(though PIP-192 needs some of the
> > > features
> > > > in PIP-215).
> > > >
> > > > Thanks,
> > > > Heesung
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >> Thanks,
> > > >> bo
> > > >>
> > > >> Heesung Sohn <he...@streamnative.io.invalid> 于2022年10月19日周三
> > > >> 07:54写道:
> > > >> >
> > > >> > Hi pulsar-dev community,
> > > >> >
> > > >> > I raised a pip to discuss : PIP-215: Configurable Topic Compaction
> > > >> Strategy
> > > >> >
> > > >> > PIP link: https://github.com/apache/pulsar/issues/18099
> > > >> >
> > > >> > Regards,
> > > >> > Heesung
> > > >>
> > > >
> > >
> >
>

Re: [DISCUSS] PIP-215: Configurable Topic Compaction Strategy

Posted by Heesung Sohn <he...@streamnative.io.INVALID>.
Hi Penghui,

I put my answers inline.

On Thu, Oct 20, 2022 at 5:11 PM PengHui Li <pe...@apache.org> wrote:

> Hi Heesung.
>
> Is it possible to send the promoted value to the topic again to achieve
> eventual consistency?
>

Yes, as long as the state change is valid, BSC will accept it and broadcast
it to all brokers.


> For example:
>
> We have 3 brokers, broker-a, broker-b, and broker-c
> The message for leader election could be "own: broker-b", "own: broker-c",
> "own: broker-a"
> The broker-b will win in the end.
>
The broker-b can write a new message "own: broker-b" to the topic. After
> the topic compaction.
> Only the broker-b will be present in the topic. Does it work?


The proposal does not use a topic for leader election because of the
circular dependency. The proposal uses the metadata store, zookeeper, to
elect the leader broker(s) of BSC.
This part is explained in the "Bundle State Channel Owner Selection and
Discovery" section in pip-192.

*Bundle State Channel Owner Selection and Discovery*

*Bundle State Channel(BSC) is another topic, and because of its circular
dependency, we can't use the BundleStateChannel to find the owner broker of
the BSC topic. For example, when a cluster starts, each broker needs to
initiate BSC TopicLookUp(to find the owner broker) in order to consume the
messages in BSC. However, initially, each broker does not know which broker
owns the BSC.*

*The ZK leader election can be a good option to break this circular
dependency, like the followings.*
*Channel Owner Selection*

*The cluster can use the ZK leader election to select the owner broker. If
the owner becomes unavailable, one of the followers will become the new
owner. We can elect the owner for each bundle state channel partition.*
*Channel Owner Discovery*

*Then, in brokers’ TopicLookUp logic, we will add a special case to return
the current leader(the elected BSC owner) for the BSC topics.*



>
> Maybe I missed something.
>
> Thanks,
> Penghui
>
> On Thu, Oct 20, 2022 at 1:30 AM Heesung Sohn
> <he...@streamnative.io.invalid> wrote:
>
> > Oops.
> > I forgot to mention another important item. I added it below(in bold).
> >
> > Pros:
> > - It supports more distributed load balance operations(bundle assignment)
> > in a sequentially consistent manner
> > - For really large clusters, by a partitioned system topic, BSC can be
> more
> > scalable than the current single-leader coordination solution.
> > - The load balance commands(across brokers) are sent via event
> > sourcing(more reliable/transparent/easy-to-track) instead of RPC with
> > retries.
> > *- Bundle ownerships can be cached in the topic table-view from BSC. (no
> > longer needs to store bundle ownership in metadata store(ZK))*
> >
> > Cons:
> > - It is a new implementation and will require significant effort to
> > stabilize the new implementation.
> > (Based on our PoC code, I think the event sourcing handlers are easier to
> > understand and follow the logic.
> > Also, this new load balancer will be pluggable(will be implemented in new
> > classes), so it should not break the existing load balance logic.
> > Users will be able to configure old/new broker load balancer.)
> >
> > On Wed, Oct 19, 2022 at 10:17 AM Heesung Sohn <
> > heesung.sohn@streamnative.io>
> > wrote:
> >
> > > Hi,
> > >
> > > On Wed, Oct 19, 2022 at 2:06 AM 丛搏 <co...@gmail.com> wrote:
> > >
> > >> Hi, Heesung:
> > >> I have some doubts.
> > >> I review your PIP-192: New Pulsar Broker Load Balancer. I found that
> > >> unload topic uses the leader broker to do, (Assigning, Return) uses
> > >> the lookup request broker. why (Assigning, Return) not use a leader
> > >> broker?
> > >> I can think of a few reasons:
> > >> 1. reduce leader broker pressure
> > >> 2. does not strongly depend on the leader broker
> > >>
> > >> Yes, one of the goals of the PIP-192 is to distribute the load balance
> > > logic to individual brokers (bundle assignment and bundle split).
> > >
> > > If (Assigning, Return) does not depend on the leader, it will bring
> > > the following problems:
> > >
> > >> If (Assigning, Return) does not depend on the leader, it will bring
> > >> the following problems:
> > >
> > > I assume what you meant by `(Assigning, Return) does not depend on the
> > > leader` is the distributed topic assignment here(concurrent bundle
> > > assignment across brokers).
> > >
> > > 1. leader clear bundle op and (Assigning, Return) will do at the same
> > >> time, It will cause many requests to be retried, and the broker will
> > >> be in chaos for a long time.
> > >
> > > I assume `leader clear bundle op` means bundle unloading, and `many
> > > requests` means topic lookup requests(bundle assignment requests).
> > > The leader unloads only high-loaded bundles in the "Owned" state. So,
> the
> > > leader does not unload bundles that are in the process of assignment
> > states.
> > > Even if there are conflict state changes, only the first valid state
> > > change will be accepted(as explained in Conflict State Resolution(Race
> > > Conditions section in the PIP)) in BSC.
> > >
> > > Also, another goal of this PIP-192 is to reduce client lookup retries.
> In
> > > BSC, the client lookup response will be deferred(max x secs) until the
> > > bundle state becomes finally "Owned".
> > >
> > >
> > >
> > >
> > >> 2. bundle State Channel(BSC) owner depends on the leader broker, this
> > >> also makes topic transfer strongly dependent on the leader.
> > >>
> > > BSC will use separate leader znodes to decide the owner brokers of the
> > > internal BSC system topic.As described in this section in the PIP-192,
> > > "Bundle State and Load Data TableView Scalability",
> > > We could use a partitioned topic(configurable) for this BSC system
> topic.
> > > Then, there could be a separate owner broker for each partition
> > > (e.g. zk leader znodes, /loadbalance/leader/part-1-owner, part-2-owner,
> > > ..etc).
> > >
> > >
> > >
> > >> 3. the code becomes more complex and harder to maintain
> > >>
> > >> What tradeoffs are the current implementations based on?
> > >>
> > >> Here are some Pros and Cons of BSC I can think of.
> > >
> > > Pros:
> > > - It supports more distributed load balance operations(bundle
> assignment)
> > > in a sequentially consistent manner
> > > - For really large clusters, by a partitioned system topic, BSC can be
> > > more scalable than the current single-leader coordination solution.
> > > - The load balance commands(across brokers) are sent via event
> > > sourcing(more reliable/transparent/easy-to-track) instead of RPC with
> > > retries.
> > >
> > > Cons:
> > > - It is a new implementation and will require significant effort to
> > > stabilize the new implementation.
> > > (Based on our PoC code, I think the event sourcing handlers are easier
> to
> > > understand and follow the logic.
> > > Also, this new load balancer will be pluggable(will be implemented in
> new
> > > classes), so it should not break the existing load balance logic.
> > > Users will be able to configure old/new broker load balancer.)
> > >
> > >
> > > Thank you for sharing your questions about PIP-192 here. But I think
> this
> > > PIP-215 is independent of PIP-192(though PIP-192 needs some of the
> > features
> > > in PIP-215).
> > >
> > > Thanks,
> > > Heesung
> > >
> > >
> > >
> > >
> > >
> > >> Thanks,
> > >> bo
> > >>
> > >> Heesung Sohn <he...@streamnative.io.invalid> 于2022年10月19日周三
> > >> 07:54写道:
> > >> >
> > >> > Hi pulsar-dev community,
> > >> >
> > >> > I raised a pip to discuss : PIP-215: Configurable Topic Compaction
> > >> Strategy
> > >> >
> > >> > PIP link: https://github.com/apache/pulsar/issues/18099
> > >> >
> > >> > Regards,
> > >> > Heesung
> > >>
> > >
> >
>

Re: [DISCUSS] PIP-215: Configurable Topic Compaction Strategy

Posted by PengHui Li <pe...@apache.org>.
Hi Heesung.

Is it possible to send the promoted value to the topic again to achieve
eventual consistency?
For example:

We have 3 brokers, broker-a, broker-b, and broker-c
The message for leader election could be "own: broker-b", "own: broker-c",
"own: broker-a"
The broker-b will win in the end.

The broker-b can write a new message "own: broker-b" to the topic. After
the topic compaction.
Only the broker-b will be present in the topic. Does it work?

Maybe I missed something.

Thanks,
Penghui

On Thu, Oct 20, 2022 at 1:30 AM Heesung Sohn
<he...@streamnative.io.invalid> wrote:

> Oops.
> I forgot to mention another important item. I added it below(in bold).
>
> Pros:
> - It supports more distributed load balance operations(bundle assignment)
> in a sequentially consistent manner
> - For really large clusters, by a partitioned system topic, BSC can be more
> scalable than the current single-leader coordination solution.
> - The load balance commands(across brokers) are sent via event
> sourcing(more reliable/transparent/easy-to-track) instead of RPC with
> retries.
> *- Bundle ownerships can be cached in the topic table-view from BSC. (no
> longer needs to store bundle ownership in metadata store(ZK))*
>
> Cons:
> - It is a new implementation and will require significant effort to
> stabilize the new implementation.
> (Based on our PoC code, I think the event sourcing handlers are easier to
> understand and follow the logic.
> Also, this new load balancer will be pluggable(will be implemented in new
> classes), so it should not break the existing load balance logic.
> Users will be able to configure old/new broker load balancer.)
>
> On Wed, Oct 19, 2022 at 10:17 AM Heesung Sohn <
> heesung.sohn@streamnative.io>
> wrote:
>
> > Hi,
> >
> > On Wed, Oct 19, 2022 at 2:06 AM 丛搏 <co...@gmail.com> wrote:
> >
> >> Hi, Heesung:
> >> I have some doubts.
> >> I review your PIP-192: New Pulsar Broker Load Balancer. I found that
> >> unload topic uses the leader broker to do, (Assigning, Return) uses
> >> the lookup request broker. why (Assigning, Return) not use a leader
> >> broker?
> >> I can think of a few reasons:
> >> 1. reduce leader broker pressure
> >> 2. does not strongly depend on the leader broker
> >>
> >> Yes, one of the goals of the PIP-192 is to distribute the load balance
> > logic to individual brokers (bundle assignment and bundle split).
> >
> > If (Assigning, Return) does not depend on the leader, it will bring
> > the following problems:
> >
> >> If (Assigning, Return) does not depend on the leader, it will bring
> >> the following problems:
> >
> > I assume what you meant by `(Assigning, Return) does not depend on the
> > leader` is the distributed topic assignment here(concurrent bundle
> > assignment across brokers).
> >
> > 1. leader clear bundle op and (Assigning, Return) will do at the same
> >> time, It will cause many requests to be retried, and the broker will
> >> be in chaos for a long time.
> >
> > I assume `leader clear bundle op` means bundle unloading, and `many
> > requests` means topic lookup requests(bundle assignment requests).
> > The leader unloads only high-loaded bundles in the "Owned" state. So, the
> > leader does not unload bundles that are in the process of assignment
> states.
> > Even if there are conflict state changes, only the first valid state
> > change will be accepted(as explained in Conflict State Resolution(Race
> > Conditions section in the PIP)) in BSC.
> >
> > Also, another goal of this PIP-192 is to reduce client lookup retries. In
> > BSC, the client lookup response will be deferred(max x secs) until the
> > bundle state becomes finally "Owned".
> >
> >
> >
> >
> >> 2. bundle State Channel(BSC) owner depends on the leader broker, this
> >> also makes topic transfer strongly dependent on the leader.
> >>
> > BSC will use separate leader znodes to decide the owner brokers of the
> > internal BSC system topic.As described in this section in the PIP-192,
> > "Bundle State and Load Data TableView Scalability",
> > We could use a partitioned topic(configurable) for this BSC system topic.
> > Then, there could be a separate owner broker for each partition
> > (e.g. zk leader znodes, /loadbalance/leader/part-1-owner, part-2-owner,
> > ..etc).
> >
> >
> >
> >> 3. the code becomes more complex and harder to maintain
> >>
> >> What tradeoffs are the current implementations based on?
> >>
> >> Here are some Pros and Cons of BSC I can think of.
> >
> > Pros:
> > - It supports more distributed load balance operations(bundle assignment)
> > in a sequentially consistent manner
> > - For really large clusters, by a partitioned system topic, BSC can be
> > more scalable than the current single-leader coordination solution.
> > - The load balance commands(across brokers) are sent via event
> > sourcing(more reliable/transparent/easy-to-track) instead of RPC with
> > retries.
> >
> > Cons:
> > - It is a new implementation and will require significant effort to
> > stabilize the new implementation.
> > (Based on our PoC code, I think the event sourcing handlers are easier to
> > understand and follow the logic.
> > Also, this new load balancer will be pluggable(will be implemented in new
> > classes), so it should not break the existing load balance logic.
> > Users will be able to configure old/new broker load balancer.)
> >
> >
> > Thank you for sharing your questions about PIP-192 here. But I think this
> > PIP-215 is independent of PIP-192(though PIP-192 needs some of the
> features
> > in PIP-215).
> >
> > Thanks,
> > Heesung
> >
> >
> >
> >
> >
> >> Thanks,
> >> bo
> >>
> >> Heesung Sohn <he...@streamnative.io.invalid> 于2022年10月19日周三
> >> 07:54写道:
> >> >
> >> > Hi pulsar-dev community,
> >> >
> >> > I raised a pip to discuss : PIP-215: Configurable Topic Compaction
> >> Strategy
> >> >
> >> > PIP link: https://github.com/apache/pulsar/issues/18099
> >> >
> >> > Regards,
> >> > Heesung
> >>
> >
>

Re: [DISCUSS] PIP-215: Configurable Topic Compaction Strategy

Posted by Heesung Sohn <he...@streamnative.io.INVALID>.
Oops.
I forgot to mention another important item. I added it below(in bold).

Pros:
- It supports more distributed load balance operations(bundle assignment)
in a sequentially consistent manner
- For really large clusters, by a partitioned system topic, BSC can be more
scalable than the current single-leader coordination solution.
- The load balance commands(across brokers) are sent via event
sourcing(more reliable/transparent/easy-to-track) instead of RPC with
retries.
*- Bundle ownerships can be cached in the topic table-view from BSC. (no
longer needs to store bundle ownership in metadata store(ZK))*

Cons:
- It is a new implementation and will require significant effort to
stabilize the new implementation.
(Based on our PoC code, I think the event sourcing handlers are easier to
understand and follow the logic.
Also, this new load balancer will be pluggable(will be implemented in new
classes), so it should not break the existing load balance logic.
Users will be able to configure old/new broker load balancer.)

On Wed, Oct 19, 2022 at 10:17 AM Heesung Sohn <he...@streamnative.io>
wrote:

> Hi,
>
> On Wed, Oct 19, 2022 at 2:06 AM 丛搏 <co...@gmail.com> wrote:
>
>> Hi, Heesung:
>> I have some doubts.
>> I review your PIP-192: New Pulsar Broker Load Balancer. I found that
>> unload topic uses the leader broker to do, (Assigning, Return) uses
>> the lookup request broker. why (Assigning, Return) not use a leader
>> broker?
>> I can think of a few reasons:
>> 1. reduce leader broker pressure
>> 2. does not strongly depend on the leader broker
>>
>> Yes, one of the goals of the PIP-192 is to distribute the load balance
> logic to individual brokers (bundle assignment and bundle split).
>
> If (Assigning, Return) does not depend on the leader, it will bring
> the following problems:
>
>> If (Assigning, Return) does not depend on the leader, it will bring
>> the following problems:
>
> I assume what you meant by `(Assigning, Return) does not depend on the
> leader` is the distributed topic assignment here(concurrent bundle
> assignment across brokers).
>
> 1. leader clear bundle op and (Assigning, Return) will do at the same
>> time, It will cause many requests to be retried, and the broker will
>> be in chaos for a long time.
>
> I assume `leader clear bundle op` means bundle unloading, and `many
> requests` means topic lookup requests(bundle assignment requests).
> The leader unloads only high-loaded bundles in the "Owned" state. So, the
> leader does not unload bundles that are in the process of assignment states.
> Even if there are conflict state changes, only the first valid state
> change will be accepted(as explained in Conflict State Resolution(Race
> Conditions section in the PIP)) in BSC.
>
> Also, another goal of this PIP-192 is to reduce client lookup retries. In
> BSC, the client lookup response will be deferred(max x secs) until the
> bundle state becomes finally "Owned".
>
>
>
>
>> 2. bundle State Channel(BSC) owner depends on the leader broker, this
>> also makes topic transfer strongly dependent on the leader.
>>
> BSC will use separate leader znodes to decide the owner brokers of the
> internal BSC system topic.As described in this section in the PIP-192,
> "Bundle State and Load Data TableView Scalability",
> We could use a partitioned topic(configurable) for this BSC system topic.
> Then, there could be a separate owner broker for each partition
> (e.g. zk leader znodes, /loadbalance/leader/part-1-owner, part-2-owner,
> ..etc).
>
>
>
>> 3. the code becomes more complex and harder to maintain
>>
>> What tradeoffs are the current implementations based on?
>>
>> Here are some Pros and Cons of BSC I can think of.
>
> Pros:
> - It supports more distributed load balance operations(bundle assignment)
> in a sequentially consistent manner
> - For really large clusters, by a partitioned system topic, BSC can be
> more scalable than the current single-leader coordination solution.
> - The load balance commands(across brokers) are sent via event
> sourcing(more reliable/transparent/easy-to-track) instead of RPC with
> retries.
>
> Cons:
> - It is a new implementation and will require significant effort to
> stabilize the new implementation.
> (Based on our PoC code, I think the event sourcing handlers are easier to
> understand and follow the logic.
> Also, this new load balancer will be pluggable(will be implemented in new
> classes), so it should not break the existing load balance logic.
> Users will be able to configure old/new broker load balancer.)
>
>
> Thank you for sharing your questions about PIP-192 here. But I think this
> PIP-215 is independent of PIP-192(though PIP-192 needs some of the features
> in PIP-215).
>
> Thanks,
> Heesung
>
>
>
>
>
>> Thanks,
>> bo
>>
>> Heesung Sohn <he...@streamnative.io.invalid> 于2022年10月19日周三
>> 07:54写道:
>> >
>> > Hi pulsar-dev community,
>> >
>> > I raised a pip to discuss : PIP-215: Configurable Topic Compaction
>> Strategy
>> >
>> > PIP link: https://github.com/apache/pulsar/issues/18099
>> >
>> > Regards,
>> > Heesung
>>
>

Re: [DISCUSS] PIP-215: Configurable Topic Compaction Strategy

Posted by Heesung Sohn <he...@streamnative.io.INVALID>.
Hi,

On Wed, Oct 19, 2022 at 2:06 AM 丛搏 <co...@gmail.com> wrote:

> Hi, Heesung:
> I have some doubts.
> I review your PIP-192: New Pulsar Broker Load Balancer. I found that
> unload topic uses the leader broker to do, (Assigning, Return) uses
> the lookup request broker. why (Assigning, Return) not use a leader
> broker?
> I can think of a few reasons:
> 1. reduce leader broker pressure
> 2. does not strongly depend on the leader broker
>
> Yes, one of the goals of the PIP-192 is to distribute the load balance
logic to individual brokers (bundle assignment and bundle split).

If (Assigning, Return) does not depend on the leader, it will bring
the following problems:

> If (Assigning, Return) does not depend on the leader, it will bring
> the following problems:

I assume what you meant by `(Assigning, Return) does not depend on the
leader` is the distributed topic assignment here(concurrent bundle
assignment across brokers).

1. leader clear bundle op and (Assigning, Return) will do at the same
> time, It will cause many requests to be retried, and the broker will
> be in chaos for a long time.

I assume `leader clear bundle op` means bundle unloading, and `many
requests` means topic lookup requests(bundle assignment requests).
The leader unloads only high-loaded bundles in the "Owned" state. So, the
leader does not unload bundles that are in the process of assignment states.
Even if there are conflict state changes, only the first valid state change
will be accepted(as explained in Conflict State Resolution(Race Conditions
section in the PIP)) in BSC.

Also, another goal of this PIP-192 is to reduce client lookup retries. In
BSC, the client lookup response will be deferred(max x secs) until the
bundle state becomes finally "Owned".




> 2. bundle State Channel(BSC) owner depends on the leader broker, this
> also makes topic transfer strongly dependent on the leader.
>
BSC will use separate leader znodes to decide the owner brokers of the
internal BSC system topic.As described in this section in the PIP-192,
"Bundle State and Load Data TableView Scalability",
We could use a partitioned topic(configurable) for this BSC system topic.
Then, there could be a separate owner broker for each partition
(e.g. zk leader znodes, /loadbalance/leader/part-1-owner, part-2-owner,
..etc).



> 3. the code becomes more complex and harder to maintain
>
> What tradeoffs are the current implementations based on?
>
> Here are some Pros and Cons of BSC I can think of.

Pros:
- It supports more distributed load balance operations(bundle assignment)
in a sequentially consistent manner
- For really large clusters, by a partitioned system topic, BSC can be more
scalable than the current single-leader coordination solution.
- The load balance commands(across brokers) are sent via event
sourcing(more reliable/transparent/easy-to-track) instead of RPC with
retries.

Cons:
- It is a new implementation and will require significant effort to
stabilize the new implementation.
(Based on our PoC code, I think the event sourcing handlers are easier to
understand and follow the logic.
Also, this new load balancer will be pluggable(will be implemented in new
classes), so it should not break the existing load balance logic.
Users will be able to configure old/new broker load balancer.)


Thank you for sharing your questions about PIP-192 here. But I think this
PIP-215 is independent of PIP-192(though PIP-192 needs some of the features
in PIP-215).

Thanks,
Heesung





> Thanks,
> bo
>
> Heesung Sohn <he...@streamnative.io.invalid> 于2022年10月19日周三
> 07:54写道:
> >
> > Hi pulsar-dev community,
> >
> > I raised a pip to discuss : PIP-215: Configurable Topic Compaction
> Strategy
> >
> > PIP link: https://github.com/apache/pulsar/issues/18099
> >
> > Regards,
> > Heesung
>

Re: [DISCUSS] PIP-215: Configurable Topic Compaction Strategy

Posted by 丛搏 <co...@gmail.com>.
Hi, Heesung:
I have some doubts.
I review your PIP-192: New Pulsar Broker Load Balancer. I found that
unload topic uses the leader broker to do, (Assigning, Return) uses
the lookup request broker. why (Assigning, Return) not use a leader
broker?
I can think of a few reasons:
1. reduce leader broker pressure
2. does not strongly depend on the leader broker

If (Assigning, Return) does not depend on the leader, it will bring
the following problems:
1. leader clear bundle op and (Assigning, Return) will do at the same
time, It will cause many requests to be retried, and the broker will
be in chaos for a long time.
2. bundle State Channel(BSC) owner depends on the leader broker, this
also makes topic transfer strongly dependent on the leader.
3. the code becomes more complex and harder to maintain

What tradeoffs are the current implementations based on?

Thanks,
bo

Heesung Sohn <he...@streamnative.io.invalid> 于2022年10月19日周三 07:54写道:
>
> Hi pulsar-dev community,
>
> I raised a pip to discuss : PIP-215: Configurable Topic Compaction Strategy
>
> PIP link: https://github.com/apache/pulsar/issues/18099
>
> Regards,
> Heesung