Posted to dev@pulsar.apache.org by Michael Marshall <mm...@apache.org> on 2022/11/01 05:59:11 UTC

Re: [DISCUSS] PIP-215: Configurable Topic Compaction Strategy

> I think we lose a single linearized view.

Which linearized view are we losing, and what is the role of that
linearized view? I think I might be missing why it is important. I
agree that consumers won't know about each unsuccessful attempted
acquisition of a bundle, but that seems like unnecessary information
to broadcast to every broker in the cluster.

> I think the leader requires a write-through cache to compact messages based
> on the latest states.

This brings up an important point that I would like to clarify. If we
trust the write-ahead log as the source of truth, what happens when a
bundle has been validly owned by multiple brokers? As a broker starts
and consumes from the compacted topic, how do we prevent it from
incorrectly thinking that it owns a bundle for some short time period
in the case that the ownership topic hasn't yet been compacted to
remove old ownership state?

> Pulsar guarantees "a single writer".

I didn't think we were using a single writer in the PIP 192 design. I
thought we had many producers sending events to a compacted topic. My
proposal would still have many producers, but the writer to BookKeeper
would act as the single writer. It would technically be distinct from
a normal Pulsar topic producer.

I should highlight that I am only proposing "broker filtering before
write" in the context of PIP 192 and as an alternative to adding
pluggable compaction strategies. It would not be a generic feature.

> Could we clarify how to handle
> the following (edge cases and failure recovery)?
> 0. Is the un-compacted topic a persistent topic or a non-persistent topic?

It is a persistent topic.

> 1. How does the leader recover state from the two topics?

A leader would recover state by first consuming the whole compacted
topic and then by consuming from the current location of a cursor on
the first input topic. As stated elsewhere, this introduces latency
and could be an issue.
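The recovery order described above can be sketched as follows. This is a simplified model under stated assumptions, not Pulsar code: topics are in-memory lists, the input-topic cursor is just the index of the first unacknowledged message, events only claim ownership (no releases or transfers), and `LeaderRecovery` and `Claim` are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of leader recovery in the two-topic design.
final class LeaderRecovery {

    // One ownership claim: "broker wants to own bundle".
    static final class Claim {
        final String bundle;
        final String broker;

        Claim(String bundle, String broker) {
            this.bundle = bundle;
            this.broker = broker;
        }
    }

    // Phase 1: load every accepted ownership from the compacted (output) topic.
    // Phase 2: resume the input topic at the cursor, i.e. at the first claim the
    // previous leader had not acknowledged, applying "first claim wins".
    static Map<String, String> recover(List<Claim> compactedTopic,
                                       List<Claim> inputTopic,
                                       int cursor) {
        Map<String, String> owners = new HashMap<>();
        for (Claim accepted : compactedTopic) {
            owners.put(accepted.bundle, accepted.broker);
        }
        for (int i = cursor; i < inputTopic.size(); i++) {
            Claim claim = inputTopic.get(i);
            // A real leader would also persist newly accepted claims to the output
            // topic before acknowledging them; this sketch only rebuilds state.
            owners.putIfAbsent(claim.bundle, claim.broker);
        }
        return owners;
    }
}
```

Phase 1 has to finish before any new claim can be decided, which is where the recovery latency mentioned above comes from.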

> 2. How do we handle the case when the leader fails before writing messages
> to the compacted topic?

The leader would not acknowledge the message on the input topic until
it has successfully persisted the event on the compacted topic.
Publishing the same event to a compacted topic multiple times is
idempotent, so there is no risk of lost state. The real risk is
latency. However, I think we might have similar (though not the same)
latency risks in the current solution.
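A minimal sketch of "persist, then acknowledge", assuming the same kind of simplified in-memory model (hypothetical `AckAfterPersistRelay` class, claims as `{bundle, broker}` string pairs, compaction modeled as "latest value per key"). A crash between the write and the ack causes the claim to be redelivered; re-publishing the same owner adds a duplicate entry to the raw log but leaves the compacted view unchanged.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical two-topic relay: claims {bundle, broker} arrive on the input topic; the
// leader appends accepted ones to the output topic and only then acknowledges them.
final class AckAfterPersistRelay {
    final List<String[]> outputLog = new ArrayList<>(); // raw entries on the output topic
    Map<String, String> owners = new LinkedHashMap<>();  // leader's in-memory view
    int cursor = 0; // index of the first unacknowledged input message

    // If crashAt equals an index, the leader stops after persisting that message's
    // result but before acknowledging it (pass -1 for no crash).
    void run(List<String[]> input, int crashAt) {
        for (int i = cursor; i < input.size(); i++) {
            String bundle = input.get(i)[0];
            String claimant = input.get(i)[1];
            String current = owners.get(bundle);
            // Accept an unowned bundle, or re-publish when the claimant already owns it
            // (e.g. a redelivered message); a rejected claim is simply not persisted.
            if (current == null || current.equals(claimant)) {
                owners.put(bundle, claimant);
                outputLog.add(new String[] {bundle, claimant}); // 1. persist
            }
            if (i == crashAt) {
                return; // crashed: output written, input message still unacknowledged
            }
            cursor = i + 1; // 2. acknowledge only after the write above
        }
    }

    // After a crash, rebuild the in-memory view from the compacted output topic.
    void restart() {
        owners = compactedOutput();
    }

    // Compaction keeps the latest value per key, so duplicate entries collapse.
    Map<String, String> compactedOutput() {
        Map<String, String> view = new LinkedHashMap<>();
        for (String[] entry : outputLog) {
            view.put(entry[0], entry[1]);
        }
        return view;
    }
}
```

Running it once cleanly and once with a crash at the first message yields the same compacted view; the crashed run just has one extra raw entry.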

> Analysis: the "pre-filter + two-topics" option can reduce the number of
> messages to broadcast at the expense of the leader broker compaction.

My primary point is that with this PIP's design, the filter logic is
run on every broker and again during topic compaction. With the
alternative design, the filter is run once.
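A back-of-the-envelope count makes this difference concrete. Assume (a simplification, not a measurement) one bundle key receiving M conflicting messages, N brokers that each maintain a table view, a single compaction pass, and a strategy invoked once per (prev, cur) pair, so folding M values costs M - 1 calls. In the pre-filter design, consumers and the compactor keep the default latest-value behavior and never call the strategy:

```latex
\[
\begin{aligned}
C_{\text{post}} &= \underbrace{N\,(M-1)}_{\text{table views}} + \underbrace{(M-1)}_{\text{compactor}} = (N+1)(M-1),\\
C_{\text{pre}}  &= \underbrace{(M-1)}_{\text{leader}}.
\end{aligned}
\]
```

With the N = 3 brokers and the M = 3 conflicting messages (m1, m2, m3) of Example 1 in the quoted message below, this gives C_post = 4 × 2 = 8 strategy calls versus C_pre = 2. The model ignores leader failover, repeated compaction runs, and the cost of reading the extra topic.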

> 3. initially less complex to implement (leaderless conflict resolution and
> requires a single topic)

PIP 215 has its own complexity too. Coordinating filters on both the
client (table view) and the server (compaction) is non-trivial. The
proposed API includes hard-coded client configuration for
each component, which will make upgrading the version of the
compaction strategy complicated, and could lead to incorrect
interpretation of events in the stream. When a single broker is doing
the filtering, versioning is no longer a distributed problem. That
being said, I do not mean to suggest my solution is without
complexity.

> 4. it is not a "one-way door" decision (we could add the pre-filter logic
> as well later)

It's fair to say that we could add it later, but at that point, we
will have added this new API for compaction strategy. Are we confident
that pluggable compaction is independently an important addition to Pulsar's
features, or would it make sense to expose this API only in the broker?

Thanks,
Michael





On Sat, Oct 29, 2022 at 10:13 PM Heesung Sohn
<he...@streamnative.io.invalid> wrote:
>
> Hi,
>
> Also, I thought about some concurrent assignment scenarios between
> pre-filter vs post-filter.
>
> Example 1: When the topic broadcast is slower than the concurrent
> assignment requests
>
> With pre-filter + two-topics (non-compacted and compacted topics)
> t1: A -> non-compacted topic // broker A published a message to the
> non-compacted topic, m1: {broker A assigned bundle x to broker A}
> t2: B -> non-compacted topic // broker B published a message, m2: {broker B
> assigned bundle x to broker C}
> t3: C -> non-compacted topic // broker C published a message, m3: {broker C
> assigned bundle x to broker B}
> t4: non-compacted topic -> L // leader broker consumed the messages: m1, m2,
> and m3
> t5: L -> compacted topic // leader compacted the messages and broadcasted
> m1 to all consumers
> t6: compacted topic -> [A,B,C] // broker A,B,C consumed m1
>
> With post-filter + a single topic
> t1: A -> topic // broker A published a message to the non-compacted topic,
> m1: {broker A assigned bundle x to broker A}
> t2: B -> topic // broker B published a message, m2: {broker B assigned
> bundle x to broker C}
> t3: C -> topic // broker C published a message, m3: {broker C assigned
> bundle x to broker B}
> t4: topic -> [A,B,C] // broker A,B,C consumed the messages: m1, m2, and m3
> t5: [A,B,C] -> m1 // broker A,B,C individually compacted the messages to m1.
>
> Analysis: the "pre-filter + two-topics" option can reduce the number of
> messages to broadcast at the expense of the leader broker compaction.
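To make the post-filter side of Example 1 concrete, here is a simplified model in which every broker's table view folds the same ordered log with the same rule. The rule `KEEP_PREVIOUS_ASSIGNMENT` is a hypothetical stand-in for the PIP-192 bundle-state validation (the real states are richer), and `PostFilterReplay` is an illustrative name. The point is only that a deterministic rule over an identically ordered log lets A, B, and C converge on m1 without coordination, at the cost of delivering and filtering all three messages on every broker.

```java
import java.util.List;
import java.util.function.BiPredicate;

// Simplified model of Example 1 with post-filter + a single topic.
final class PostFilterReplay {

    // One message on the topic: "source assigned bundle x to destination".
    static final class Assignment {
        final String source;
        final String destination;

        Assignment(String source, String destination) {
            this.source = source;
            this.destination = destination;
        }

        @Override
        public String toString() {
            return source + " assigned x to " + destination;
        }
    }

    // Hypothetical rule: once an assignment is recorded for the bundle, later
    // concurrent assignments are rejected (the previous value is kept).
    static final BiPredicate<Assignment, Assignment> KEEP_PREVIOUS_ASSIGNMENT =
            (prev, cur) -> prev != null;

    // What one broker's table view (or the compactor) does: fold the log for bundle x.
    static Assignment replay(List<Assignment> log,
                             BiPredicate<Assignment, Assignment> keepPrev) {
        Assignment state = null;
        for (Assignment cur : log) {
            if (!keepPrev.test(state, cur)) {
                state = cur;
            }
        }
        return state;
    }

    public static void main(String[] args) {
        Assignment m1 = new Assignment("A", "A");
        Assignment m2 = new Assignment("B", "C");
        Assignment m3 = new Assignment("C", "B");
        List<Assignment> log = List.of(m1, m2, m3);
        // Every broker receives all three messages and filters them locally.
        for (String broker : List.of("A", "B", "C")) {
            System.out.println(broker + ": " + replay(log, KEEP_PREVIOUS_ASSIGNMENT));
        }
    }
}
```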
>
>
> Example 2: When the topic broadcast is faster than the concurrent
> assignment requests
>
> With pre-filter + two-topics (non-compacted and compacted topics)
> t1: A -> non-compacted topic // broker A published a message to the
> non-compacted topic, m1: {broker A assigned bundle x to broker A}
> t2: non-compacted topic -> L // leader broker consumed the message: m1
> t3: L -> compacted topic // leader compacted the message and broadcasted m1
> to all consumers
> t4: compacted topic -> [A,B,C] // broker A,B,C consumed m1
> t5: A -> own bundle // broker A knows that its assignment has been accepted,
> so proceeding to own the bundle (meanwhile deferring lookup requests)
> t6: B -> defer client lookups // broker B knows that bundle assignment is
> running (meanwhile deferring lookup requests)
> t7: C -> defer client lookups // broker C knows that bundle assignment is
> running (meanwhile deferring lookup requests)
>
> With post-filter + a single topic
> t1: A -> topic // broker A published a message to the non-compacted topic,
> m1: {broker A assigned bundle x to broker A}
> t2: topic -> [A,B,C] // broker A,B,C consumed the message: m1
> t3: A -> own bundle // broker A knows that its assignment has been
> accepted, so proceeding to own the bundle (meanwhile deferring lookup
> requests)
> t4: B -> defer client lookups // broker B knows that bundle assignment is
> running (meanwhile deferring lookup requests)
> t5: C -> defer client lookups // broker C knows that bundle assignment is
> running (meanwhile deferring lookup requests)
>
> Analysis: The "post-filter + a single topic" can perform ok in this case
> without the additional leader coordination and the secondary topic because
> the early broadcast can inform all brokers and prevent them from requesting
> other assignments for the same bundle.
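The "defer client lookups" step in Example 2 could look roughly like this on each broker. It is a sketch with hypothetical names (`LookupDeferral`, `onAssigning`, `onOwned`), not the PIP-192 implementation: a broadcast saying an assignment is in flight parks lookups for that bundle, and the later ownership broadcast releases them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical broker-side view driven by broadcasts on the bundle-state topic.
final class LookupDeferral {
    private final Map<String, String> owners = new HashMap<>();
    private final Map<String, List<CompletableFuture<String>>> pending = new HashMap<>();

    // Broadcast seen: an assignment for this bundle has started somewhere.
    synchronized void onAssigning(String bundle) {
        pending.computeIfAbsent(bundle, b -> new ArrayList<>());
    }

    // Broadcast seen: the bundle is now owned; release any deferred lookups.
    void onOwned(String bundle, String owner) {
        List<CompletableFuture<String>> waiters;
        synchronized (this) {
            owners.put(bundle, owner);
            waiters = pending.remove(bundle);
        }
        if (waiters != null) {
            for (CompletableFuture<String> waiter : waiters) {
                waiter.complete(owner); // completed outside the lock
            }
        }
    }

    // Client lookup: answer if owned, defer if an assignment is in flight.
    synchronized CompletableFuture<String> lookup(String bundle) {
        String owner = owners.get(bundle);
        if (owner != null) {
            return CompletableFuture.completedFuture(owner);
        }
        List<CompletableFuture<String>> waiters = pending.get(bundle);
        if (waiters != null) {
            CompletableFuture<String> deferred = new CompletableFuture<>();
            waiters.add(deferred);
            return deferred;
        }
        // Nothing known: in PIP-192 this broker would start its own assignment (not modeled).
        return CompletableFuture.failedFuture(
                new IllegalStateException("no assignment known for " + bundle));
    }
}
```

Completing the parked futures outside the lock avoids running client callbacks while holding the broker's state lock.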
>
> I think the post-filter option is initially not bad because:
>
> 1. it is safe in the worst case (in case the messages are not correctly
> pre-filtered at the leader)
> 2. it performs ok because the early broadcast can prevent
> concurrent assignment requests.
> 3. initially less complex to implement (leaderless conflict resolution and
> requires a single topic)
> 4. it is not a "one-way door" decision (we could add the pre-filter logic
> as well later)
>
> Regards,
> Heesung
>
>
>
> On Sat, Oct 29, 2022 at 1:03 PM Heesung Sohn <he...@streamnative.io>
> wrote:
>
> > Hi Michael,
> >
> > For the pre-filter (pre-compaction) option,
> > I think the leader requires a write-through cache to compact messages
> > based on the latest states. Otherwise, the leader needs to wait for the
> > last msg from the (compacted) topic before compacting the next msg for the
> > same bundle.
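The difference between consulting a write-through cache and consulting only the state read back from the compacted topic can be shown with a small model (hypothetical `LeaderStateView` class; the read-back delay is modeled by not calling `readBack()` until later). Without the cache, a burst of claims for the same bundle is judged against stale state, and two conflicting claims are both accepted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pre-filter leader: decides each claim against either a write-through
// cache or the state it has read back from its own compacted topic so far.
final class LeaderStateView {
    final List<String[]> topic = new ArrayList<>();                 // entries written so far
    private final Map<String, String> readBackState = new HashMap<>(); // lags behind writes
    private final Map<String, String> cache = new HashMap<>();        // write-through cache
    private final boolean writeThrough;
    private int readPosition = 0;

    LeaderStateView(boolean writeThrough) {
        this.writeThrough = writeThrough;
    }

    // Grants the claim only if the bundle looks unowned in the state the leader consults.
    boolean onClaim(String bundle, String claimant) {
        Map<String, String> state = writeThrough ? cache : readBackState;
        if (state.containsKey(bundle)) {
            return false; // conflicting claim dropped before the write
        }
        topic.add(new String[] {bundle, claimant});
        if (writeThrough) {
            cache.put(bundle, claimant); // updated together with the write
        }
        return true;
    }

    // The leader's own consumer catching up with its writes (later, in practice).
    void readBack() {
        while (readPosition < topic.size()) {
            String[] entry = topic.get(readPosition++);
            readBackState.put(entry[0], entry[1]);
        }
    }
}
```

A real leader would also have to decide what to do with the cache entry if the write fails; this synchronous model leaves that out.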
> >
> > Pulsar guarantees "a single writer". However, for the worst-case
> > scenario (due to network partitions, bugs in ZooKeeper or etcd leader
> > election, bugs in BookKeeper, data corruption), I think it is safe to
> > place the post-filter on the consumer side (compaction and table views)
> > as well in order to validate the state changes.
> >
> > For the two-topic approach,
> > I think we lose a single linearized view. Could we clarify how to handle
> > the following (edge cases and failure recovery)?
> > 0. Is the un-compacted topic a persistent topic or a non-persistent topic?
> > 1. How does the leader recover state from the two topics?
> > 2. How do we handle the case when the leader fails before writing messages
> > to the compacted topic?
> >
> > Regards,
> > Heesung
> >
> > On Fri, Oct 28, 2022 at 6:56 PM Michael Marshall <mm...@apache.org>
> > wrote:
> >
> >> Sharing some more thoughts. We could alternatively use two topics
> >> instead of one. In this design, the first topic is the unfiltered
> >> write ahead log that represents many writers (brokers) trying to
> >> acquire ownership of bundles. The second topic is the distilled log
> >> that represents the "winners" or the "owners" of the bundles. There is
> >> a single writer, the leader broker, that reads from the input topic
> >> and writes to the output topic. The first topic is normal and the
> >> second is compacted.
> >>
> >> The primary benefit in a two-topic solution is that it is easy for the
> >> leader broker to trade off ownership without needing to slow down
> >> writes to the input topic. The leader broker will start consuming from
> >> the input topic when it has fully consumed the table view on the
> >> output topic. In general, I don't think consumers know when they have
> >> "reached the end of a table view", but we should be able to trivially
> >> figure this out if we are the topic's only writer and the topic and
> >> writer are collocated on the same broker.
> >>
> >> In that design, it might make sense to use something like the
> >> replication cursor to keep track of this consumer's state.
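The "only writer, colocated with the topic" argument for detecting the end of the table view can be sketched as follows, assuming a hypothetical `ColocatedOutputTopic` class in which the owning broker can read the position of the last entry directly. A new leader is caught up once its consumed position reaches that last position, and only then starts consuming the input topic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical output topic owned by the same broker as its single writer, so the
// position of the last entry is known locally and cannot move while the leader replays.
final class ColocatedOutputTopic {
    private final List<String> entries = new ArrayList<>();

    void append(String entry) {
        entries.add(entry);
    }

    // -1 means the topic is empty.
    long lastEntryPosition() {
        return entries.size() - 1;
    }

    boolean caughtUp(long lastConsumedPosition) {
        return lastConsumedPosition >= lastEntryPosition();
    }

    // A new leader reads the table view until it is caught up.
    List<String> replayToEnd() {
        List<String> view = new ArrayList<>();
        long consumed = -1; // nothing consumed yet
        while (!caughtUp(consumed)) {
            consumed++;
            view.add(entries.get((int) consumed));
        }
        return view;
    }
}
```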
> >>
> >> - Michael
> >>
> >> On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall <mm...@apache.org>
> >> wrote:
> >> >
> >> > Thanks for your proposal, Heesung.
> >> >
> >> > Fundamentally, we have the problems listed in this PIP because we have
> >> > multiple writers instead of just one writer. Can we solve this problem
> >> > by changing our write pattern? What if we use the leader broker as the
> >> > single writer? That broker would intercept attempts to acquire
> >> > ownership on bundles and would grant ownership to the first broker to
> >> > claim an unassigned bundle. It could "grant ownership" by letting the
> >> > first write to claim an unassigned bundle get written to the ownership
> >> > topic. When a bundle is already owned, the leader won't persist that
> >> > event to BookKeeper. In this design, the log becomes a true
> >> > ownership log, which will correctly work with the existing topic
> >> > compaction and table view solutions. My proposal essentially moves the
> >> > conflict resolution to just before the write, and as a consequence, it
> >> > greatly reduces the need for post-processing of the event log. One
> >> > trade-off might be that the leader broker could slow down the write
> >> > path, but given that the leader would just need to verify the current
> >> > state of the bundle, I think it'd be performant enough.
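The core of this "filter before write" idea is small. In the sketch below (hypothetical names, claims as `{bundle, assigned broker}` pairs, no release or transfer events), only the first claim on an unassigned bundle is persisted, so plain latest-value compaction over the persisted log returns the correct owner. Latest-value compaction over the unfiltered claims from Example 1 (x to A, then x to C, then x to B) would instead pick the last claimant.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class FilterBeforeWrite {

    // Latest-value-per-key semantics, like default topic compaction and table views.
    static Map<String, String> latestWins(List<String[]> log) {
        Map<String, String> view = new LinkedHashMap<>();
        for (String[] entry : log) {
            view.put(entry[0], entry[1]);
        }
        return view;
    }

    // Hypothetical leader-side filter: persist only the first claim on an unassigned
    // bundle; later claims for an already-owned bundle are dropped before the write.
    static List<String[]> filterBeforeWrite(List<String[]> claims) {
        Set<String> owned = new HashSet<>();
        List<String[]> persisted = new ArrayList<>();
        for (String[] claim : claims) {
            if (owned.add(claim[0])) { // add() returns false if the bundle is already owned
                persisted.add(claim);
            }
        }
        return persisted;
    }
}
```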
> >> >
> >> > Additionally, we'd need the leader broker to be "caught up" on bundle
> >> > ownership in order to grant ownership of topics, but unless I am
> >> > mistaken, that is already a requirement of the current PIP 192
> >> > paradigm.
> >> >
> >> > Below are some additional thoughts that will be relevant if we move
> >> > forward with the design as it is currently proposed.
> >> >
> >> > I think it might be helpful to update the title to show that this
> >> > proposal will also affect table view. I didn't catch that at
> >> > first.
> >> >
> >> > Do you have any documentation describing how the
> >> > TopicCompactionStrategy will determine which states are valid in the
> >> > context of load balancing? I looked at
> >> > https://github.com/apache/pulsar/pull/18195, but I couldn't seem to
> >> > find anything for it. That would help make this proposal less
> >> > abstract.
> >> >
> >> > The proposed API seems very tied to the needs of PIP 192. For example,
> >> > `isValid` is not a term I associate with topic compaction. The
> >> > fundamental question for compaction is which value to keep (or build a
> >> > new value). I think we might be able to simplify the API by replacing
> >> > the "isValid", "isMergeEnabled", and "merge" methods with a single
> >> > method that lets the implementation handle one or all tasks. That
> >> > would also remove the need to deserialize payloads multiple times.
> >> >
> >> > I also feel like mentioning that after working with the PIP 105 broker
> >> > side filtering, I think we should avoid running UDFs in the broker as
> >> > much as possible. (I do not consider the load balancing logic to be a
> >> > UDF here.) I think it would be worth not making this a user-facing
> >> > feature unless there is demand for real use cases.
> >> >
> >> > Thanks!
> >> > Michael
> >> >
> >> > On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bo...@apache.org> wrote:
> >> > >
> >> > > +1(non-binding)
> >> > >
> >> > > thanks,
> >> > > bo
> >> > >
> >> > > On Wed, Oct 19, 2022 at 07:54 Heesung Sohn <he...@streamnative.io.invalid>
> >> > > wrote:
> >> > > >
> >> > > > Hi pulsar-dev community,
> >> > > >
> >> > > > I raised a PIP to discuss: PIP-215: Configurable Topic Compaction
> >> > > > Strategy
> >> > > >
> >> > > > PIP link: https://github.com/apache/pulsar/issues/18099
> >> > > >
> >> > > > Regards,
> >> > > > Heesung
> >>
> >

Re: [DISCUSS] PIP-215: Configurable Topic Compaction Strategy

Posted by Heesung Sohn <he...@streamnative.io.INVALID>.
Hi,

The vote is open in this email thread.

https://lists.apache.org/thread/6y8407pw4fv21n2n0cbrvsspg5tok0h7

Regards,
Heesung

On Fri, Nov 4, 2022 at 3:07 PM Heesung Sohn <he...@streamnative.io>
wrote:

> Hi,
>
> > I would like to understand what the performance tradeoffs are for the
> > changes that you (as an individual) and others are proposing.
>
> The problem that this PIP is trying to solve is how to support custom
> topic compaction logic. (Currently, Pulsar only keeps the latest message
> for each key in topic compaction and in table views on consumers.)
>
> As Michael and I discussed in the above emails, in the worst case, when
> there are many conflicting messages, this PIP can incur more repeated
> custom compaction than the alternative, as individual consumers need to
> compact messages (topic compaction and table views). However, one of the
> advantages of this proposal is that pub/sub is faster since it uses a
> single topic. For example, in PIP-192, if the "bundle assignment" broadcast
> is fast enough, conflicting bundle assignment requests can be
> reduced (broadcast filter effect).
>
>
> Post-Compaction (what this PIP proposes)
>
>    - Producers publish messages to a single topic.
>    - All consumers individually run the custom compaction logic when
>      consuming the topic (by table view).
>    - The compactor needs to run the custom compaction logic during
>      compaction.
>
> The alternative that Michael proposed instead compacts messages at an
> earlier stage, by a single writer, using two topics.
>
> Pre-Compaction (the alternative that Michael proposes)
>
>    - Producers publish messages to a non-compacted topic first.
>    - Only the leader consumes this non-compacted topic and runs the
>      custom compaction logic.
>    - Then, the leader publishes compacted messages to the compacted
>      topic (conflicts are resolved by the single writer).
>    - All consumers consume the compacted topic (no need to compact the
>      messages separately on the consumer side).
>    - The compactor does not need to run the custom compaction logic during
>      compaction.
>
>
>
> > It really seems that you are proposing a change to the default behavior
> > whether or not a user chooses to use the interface in PIP-192.
>
> The PIP does not change the default behavior of compaction and table view.
> I updated the goals in the PIP to clarify this.
>
> Thanks,
> Heesung
>
>
> On Fri, Nov 4, 2022 at 11:11 AM Dave Fisher <wa...@apache.org> wrote:
>
>>
>>
>> > On Nov 4, 2022, at 10:28 AM, Heesung Sohn <he...@streamnative.io.INVALID>
>> > wrote:
>> >
>> > Hi,
>> >
>> > I think `bool shouldKeepLeft(T prev, T cur)` is clearer. I updated the PIP.
>> >
>> > Hopefully, we provided enough context about this PIP and the design
>> > trade-off as well.
>>
>> I would like to understand what the performance tradeoffs are for the
>> changes that you (as an individual) and others are proposing.
>>
>> > Goal
>> >
>> >       • Create another topic compactor, StrategicTwoPhaseCompactor,
>> >         where we can configure a compaction strategy,
>> >         TopicCompactionStrategy.
>> >
>> >       • Update the TableViewConfigurationData to load and consider the
>> >         TopicCompactionStrategy when updating the internal key-value
>> >         map in TableView.
>> >
>> >       • Add TopicCompactionStrategy in Topic-level Policy to run
>> >         StrategicTwoPhaseCompactor instead of TwoPhaseCompactor when
>> >         executing compaction.
>>
>> It really seems that you are proposing a change to the default behavior
>> whether or not a user chooses to use the interface in PIP-192.
>>
>> >
>> > I will send out a vote email soon.
>> >
>> > Thank you,
>> > Heesung
>> >
>> >
>> >
>> >
>> >
>> >
>> > On Thu, Nov 3, 2022 at 9:59 PM Michael Marshall <mm...@apache.org>
>> > wrote:
>> >
>> >> Thank you for your detailed responses, Heesung.
>> >>
>> >>> We are not planning to expose this feature to users
>> >>> soon unless demanded and proven to be stable.
>> >>
>> >> In that case, I think we should move forward with this PIP. I have a
>> >> different opinion about the trade-offs for the two designs, but none
>> >> of my concerns are problems that could not be solved later if we
>> >> encounter problems.
>> >>
>> >> Just to say it succinctly, my concern is that broadcasting all
>> >> attempts to acquire ownership of every unclaimed bundle to all brokers
>> >> will generate a lot of unnecessary traffic.
>> >>
>> >>> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a
>> >>
>> >> Thank you for this reference. I missed it. That is great documentation!
>> >>
>> >>> In fact, with the `compact(T prev, T cur)` api only, it is not clear if
>> >>> prev => cur is a valid transition or not (if invalid, we should filter out
>> >>> the cur message instead of further compacting/merging). I think we still
>> >>> need to keep the `isValid()` and `merge()` separated.
>> >>
>> >> I was thinking that the result of `compact` would be the result put in
>> >> the table view or written to the compacted topic. The one issue might
>> >> be about keeping the memory utilization down for use cases that are
>> >> not updating the message's value but are only selecting "left" or
>> >> "right". I thought we could infer when to keep the message id vs keep
>> >> the message value, but that might be easy to implement.
>> >>
>> >> My final critique is that I think `isValid` could have a better name.
>> >> In the event this does become a public API, I don't think all use
>> >> cases will think about which event should be persisted in terms of
>> >> validity.
>> >>
>> >> The main name that comes to my mind is `bool shouldKeepLeft(T prev, T cur)`.
>> >> When true, prev wins. When false, cur wins. That nomenclature comes from
>> >> Akka Streams. It's not perfect, but it is easy to infer what the result
>> >> will do.
>> >>
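For reference, a sketch of the `shouldKeepLeft` shape and how a table view might apply it. The interface name `KeepLeftStrategy`, the `StrategicTableView` class, and the choice to skip the strategy when there is no previous value are assumptions for illustration; the interface in the PIP may differ.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the discussed shape: true keeps prev (left), false lets cur (right) win.
interface KeepLeftStrategy<T> {
    boolean shouldKeepLeft(T prev, T cur);
}

// Hypothetical table view that consults the strategy for every incoming message.
final class StrategicTableView<T> {
    private final Map<String, T> data = new HashMap<>();
    private final KeepLeftStrategy<T> strategy;

    StrategicTableView(KeepLeftStrategy<T> strategy) {
        this.strategy = strategy;
    }

    void onMessage(String key, T value) {
        T prev = data.get(key);
        // Assumption: with no previous value there is nothing to compare, so cur wins.
        if (prev == null || !strategy.shouldKeepLeft(prev, value)) {
            data.put(key, value);
        }
    }

    T get(String key) {
        return data.get(key);
    }
}
```

A strategy that always returns false reproduces the current latest-value behavior; a version check such as `(prev, cur) -> cur < prev` ignores out-of-date updates.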
>> >>> Regarding redundant deserialization, the input type `T` is the type of
>> >>> message value, so the input values are already deserialized.
>> >>
>> >> Great, I should have realized that. That takes care of that concern.
>> >>
>> >> Thanks,
>> >> Michael
>> >>
>> >> On Thu, Nov 3, 2022 at 7:37 PM Heesung Sohn
>> >> <he...@streamnative.io.invalid> wrote:
>> >>>
>> >>> Hi,
>> >>>
>> >>> I have a different thought about my previous comment.
>> >>>
>> >>> - Agreed with your point that we should merge CompactionStrategy APIs. I
>> >>> updated the interface proposal in the PIP. I replaced `"isValid",
>> >>> "isMergeEnabled", and "merge"` apis with "compact" api.
>> >>>
>> >>> boolean isValid(T prev, T cur)
>> >>> boolean isMergeEnabled()
>> >>> T merge(T prev, T cur)
>> >>>
>> >>> =>
>> >>>
>> >>> T compact(T prev, T cur)
>> >>>
>> >>> In fact, with the `compact(T prev, T cur)` api only, it is not clear if
>> >>> prev => cur is a valid transition or not (if invalid, we should filter out
>> >>> the cur message instead of further compacting/merging). I think we still
>> >>> need to keep the `isValid()` and `merge()` separated.
>> >>>
>> >>> Regarding redundant deserialization, the input type `T` is the type of
>> >>> message value, so the input values are already deserialized. We don't want
>> >>> to expose the Message<T> interface in this CompactionStrategy to avoid
>> >>> message serialization/deserialization dependencies in the
>> >>> CompactionStrategy.
>> >>>
>> >>> The `merge()` functionality is suggested for more complex use cases (merge
>> >>> values instead of just filtering), and to support this `merge()`, we need
>> >>> to internally create a new msg with the compacted value, metadata, and
>> >>> messageId copies. We could initially define `isValid()` only in
>> >>> CompactionStrategy, and add `isMergeEnabled()` and `merge()` later in the
>> >>> CompactionStrategy interface if requested.
>> >>>
>> >>> Regards,
>> >>> Heesung
>> >>>
>> >>>
>> >>> On Thu, Nov 3, 2022 at 9:50 AM Heesung Sohn <heesung.sohn@streamnative.io>
>> >>> wrote:
>> >>>
>> >>>> Oops! Michael, I apologize for the typo in your name.
>> >>>>
>> >>>> On Thu, Nov 3, 2022 at 9:47 AM Heesung Sohn <heesung.sohn@streamnative.io>
>> >>>> wrote:
>> >>>>
>> >>>>> Hi Machel,
>> >>>>>
>> >>>>> Here are my additional comments regarding your earlier email.
>> >>>>>
>> >>>>> - I updated the PIP title to show that this will impact table view as
>> >>>>> well.
>> >>>>>
>> >>>>> - PIP-192: https://github.com/apache/pulsar/issues/16691 shows the
>> >>>>> general idea of the states and their actions, and I defined the actual
>> >>>>> states here in the PR:
>> >>>>> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a
>> >>>>> I will further clarify the bundle state data validation logic when
>> >>>>> introducing the `BundleStateCompactionStrategy` class. This PIP is to
>> >>>>> support CompactionStrategy in general.
>> >>>>>
>> >>>>> - Agreed with your point that we should merge CompactionStrategy APIs. I
>> >>>>> updated the interface proposal in the PIP. I replaced `"isValid",
>> >>>>> "isMergeEnabled", and "merge"` apis with "compact" api.
>> >>>>>
>> >>>>>
>> >>>>> Thanks,
>> >>>>> Heesung
>> >>>>>
>> >>>>>
>> >>>>> On Tue, Nov 1, 2022 at 11:26 AM Heesung Sohn <heesung.sohn@streamnative.io>
>> >>>>> wrote:
>> >>>>>
>> >>>>>> Hi,
>> >>>>>> Thank you for the great comments.
>> >>>>>> Please find my comments inline too.
>> >>>>>>
>> >>>>>> Regards,
>> >>>>>> Heesung
>> >>>>>>
>> >>>>>> On Mon, Oct 31, 2022 at 10:59 PM Michael Marshall <mmarshall@apache.org>
>> >>>>>> wrote:
>> >>>>>>
>> >>>>>>>> I think we lose a single linearized view.
>> >>>>>>>
>> >>>>>>> Which linearized view are we losing, and what is the role of that
>> >>>>>>> linearized view? I think I might be missing why it is important. I
>> >>>>>>> agree that consumers won't know about each unsuccessful attempted
>> >>>>>>> acquisition of a bundle, but that seems like unnecessary information
>> >>>>>>> to broadcast to every broker in the cluster.
>> >>>>>>>
>> >>>>>>>
>> >>>>>>
>> >>>>>> PIP-192 proposes an assignment, transfer, and split protocol
>> >>>>>> (multi-phase state changes) that relies on early broadcast across
>> >>>>>> brokers, and all brokers react to their clients according to the
>> >>>>>> state change notifications: brokers could defer any client lookups
>> >>>>>> for bundle x if an assignment/transfer/split is ongoing for x
>> >>>>>> (broadcast early in the topic). One early broadcast example is the
>> >>>>>> one that I discussed above, "When the topic broadcast is faster than
>> >>>>>> the concurrent assignment requests." I think the pre-filter could
>> >>>>>> delay this early broadcast, as it needs to go through the additional
>> >>>>>> single-leader compaction path.
>> >>>>>>
>> >>>>>> The bundle state recovery process is simpler with a single linearized
>> >>>>>> view.
>> >>>>>>
>> >>>>>> A single linearized view also makes bundle states easier to debug. We
>> >>>>>> can more easily track where the assignment requests come from and how
>> >>>>>> they are compacted.
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>>>> I think the leader requires a write-through cache to compact messages
>> >>>>>>>> based on the latest states.
>> >>>>>>>
>> >>>>>>> This brings up an important point that I would like to clarify. If we
>> >>>>>>> trust the write ahead log as the source of truth, what happens when a
>> >>>>>>> bundle has been validly owned by multiple brokers? As a broker starts
>> >>>>>>> and consumes from the compacted topic, how do we prevent it from
>> >>>>>>> incorrectly thinking that it owns a bundle for some short time period
>> >>>>>>> in the case that the ownership topic hasn't yet been compacted to
>> >>>>>>> remove old ownership state?
>> >>>>>>>
>> >>>>>>
>> >>>>>> Since the multi-phase transfer protocol involves actions by both the
>> >>>>>> source and destination brokers, a successful transfer should leave
>> >>>>>> the source and destination brokers with the (near) latest state. For
>> >>>>>> example, if some brokers have old ownership states (network
>> >>>>>> partitioned or delayed), they will redirect clients to the source
>> >>>>>> (old) broker. However, by the transfer protocol, the source broker
>> >>>>>> should have the latest state, so it can redirect the client again to
>> >>>>>> the destination broker.
>> >>>>>>
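The redirect argument above can be illustrated with a small model in which each broker holds its own, possibly stale, belief about the bundle's owner (hypothetical `TransferRedirects` class, not PIP-192 code). A stale broker B points at the old source S, S already knows about the transfer and points at the destination D, and D serves the request.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: broker -> that broker's (possibly stale) view of the owner.
final class TransferRedirects {
    private final Map<String, String> believedOwner = new HashMap<>();

    void setView(String broker, String owner) {
        believedOwner.put(broker, owner);
    }

    // Follows redirects from the broker the client contacted first and returns the
    // visited path; the last broker believes it owns the bundle (or has no view).
    String lookupPath(String firstBroker, int maxHops) {
        StringBuilder path = new StringBuilder(firstBroker);
        String current = firstBroker;
        for (int hop = 0; hop < maxHops; hop++) {
            String next = believedOwner.get(current);
            if (next == null || next.equals(current)) {
                return path.toString();
            }
            path.append(" -> ").append(next);
            current = next;
        }
        throw new IllegalStateException("too many redirects: " + path);
    }
}
```

With views B: S, S: D, and D: D, a lookup that starts at B follows the path `B -> S -> D`.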
>> >>>>>> When a broker restarts, it won't start until its BSC state is caught
>> >>>>>> up to the (near) latest state (up to the last known messageId at that
>> >>>>>> time).
>> >>>>>>
>> >>>>>>
>> >>>>>>>> Pulsar guarantees "a single writer".
>> >>>>>>>
>> >>>>>>> I didn't think we were using a single writer in the PIP 192 design. I
>> >>>>>>> thought we had many producers sending events to a compacted topic. My
>> >>>>>>> proposal would still have many producers, but the writer to bookkeeper
>> >>>>>>> would act as the single writer. It would technically be distinct from
>> >>>>>>> a normal Pulsar topic producer.
>> >>>>>>>
>> >>>>>>> I should highlight that I am only proposing "broker filtering before
>> >>>>>>> write" in the context of PIP 192 and as an alternative to adding
>> >>>>>>> pluggable compaction strategies. It would not be a generic feature.
>> >>>>>>>
>> >>>>>>>
>> >>>>>> I was worried about the worst case where two producers (leaders)
>> >>>>>> happen to write to the compacted topic (although Pulsar can guarantee
>> >>>>>> "a single writer" or "a single producer" for a topic in normal
>> >>>>>> situations).
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>>>> Could we clarify how to handle
>> >>>>>>>> the following(edge cases and failure recovery)?
>> >>>>>>>> 0. Is the un-compacted topic a persistent topic or a non-persistent
>> >>>>>>>> topic?
>> >>>>>>>
>> >>>>>>> It is a persistent topic.
>> >>>>>>>
>> >>>>>>>> 1. How does the leader recover state from the two topics?
>> >>>>>>>
>> >>>>>>> A leader would recover state by first consuming the whole compacted
>> >>>>>>> topic and then by consuming from the current location of a cursor on
>> >>>>>>> the first input topic. As stated elsewhere, this introduces latency
>> >>>>>>> and could be an issue.
>> >>>>>>>
>> >>>>>>>> 2. How do we handle the case when the leader fails before writing
>> >>>>>>>> messages to the compacted topic
>> >>>>>>>
>> >>>>>>> The leader would not acknowledge the message on the input topic until
>> >>>>>>> it has successfully persisted the event on the compacted topic.
>> >>>>>>> Publishing the same event to a compacted topic multiple times is
>> >>>>>>> idempotent, so there is no risk of lost state. The real risk is
>> >>>>>>> latency. However, I think we might have similar (though not the same)
>> >>>>>>> latency risks in the current solution.
>> >>>>>>>
>> >>>>>>>> Analysis: the "pre-filter + two-topics" option can reduce the number
>> >>>>>>>> of messages to broadcast at the expense of the leader broker
>> >>>>>>>> compaction.
>> >>>>>>>
>> >>>>>>> My primary point is that with this PIP's design, the filter logic is
>> >>>>>>> run on every broker and again during topic compaction. With the
>> >>>>>>> alternative design, the filter is run once.
>> >>>>>>>
>> >>>>>> Thank you for the clarification.
>> >>>>>>
>> >>>>>> I think the difference is that the post-filter is an optimistic
>> >>>>>> approach, as it relies on the "broadcast-filter" effect (brokers will
>> >>>>>> defer client lookups if notified ahead that any assignment is ongoing
>> >>>>>> for bundle x). Yes, in the worst case, if the broadcast is slower,
>> >>>>>> each broker needs to individually compact the conflicting assignment
>> >>>>>> requests.
>> >>>>>>
>> >>>>>> Conversely, one downside of the pessimistic approach (single-leader
>> >>>>>> pre-filter) is that when there are few conflicting concurrent
>> >>>>>> assignment requests (assign for bundle a, assign for bundle b, assign
>> >>>>>> for bundle c...), the requests still need to go through the leader
>> >>>>>> compaction redundantly.
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>>>> 3. initially less complex to implement (leaderless conflict
>> >>>>>>>> resolution and requires a single topic)
>> >>>>>>>
>> >>>>>>> PIP 215 has its own complexity too. Coordinating filters
>> >>>>>>> on both the client (table view) and the server (compaction) is non
>> >>>>>>> trivial. The proposed API includes hard coded client configuration for
>> >>>>>>> each component, which will make upgrading the version of the
>> >>>>>>> compaction strategy complicated, and could lead to incorrect
>> >>>>>>> interpretation of events in the stream. When a single broker is doing
>> >>>>>>> the filtering, versioning is no longer a distributed problem. That
>> >>>>>>> being said, I do not mean to suggest my solution is without
>> >>>>>>> complexity.
>> >>>>>>>
>> >>>>>>>> 4. it is not a "one-way door" decision (we could add the pre-filter
>> >>>>>>>> logic as well later)
>> >>>>>>>
>> >>>>>>> It's fair to say that we could add it later, but at that point, we
>> >>>>>>> will have added this new API for compaction strategy. Are we confident
>> >>>>>>> that pluggable compaction is independently an important addition to
>> >>>>>>> Pulsar's features, or would it make sense to make this API only
>> >>>>>>> exposed in the broker?
>> >>>>>>>
>> >>>>>>>
>> >>>>>> The intention is that this compaction feature could be useful for
>> >>>>>> complex user applications (if they are trying to do a similar thing).
>> >>>>>> As I mentioned, this feature is closely tied to PIP-192 for now. We
>> >>>>>> are not planning to expose this feature to users soon unless it is
>> >>>>>> demanded and proven to be stable.
>> >>>>>>
>> >>>>>>
>> >>>>>>> Thanks,
>> >>>>>>> Michael
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On Sat, Oct 29, 2022 at 10:13 PM Heesung Sohn
>> >>>>>>> <he...@streamnative.io.invalid> wrote:
>> >>>>>>>>
>> >>>>>>>> Hi,
>> >>>>>>>>
>> >>>>>>>> Also, I thought about some concurrent assignment scenarios for
>> >>>>>>>> pre-filter vs. post-filter.
>> >>>>>>>>
>> >>>>>>>> Example 1: When the topic broadcast is slower than the concurrent
>> >>>>>>>> assignment requests
>> >>>>>>>>
>> >>>>>>>> With pre-filter + two-topics (non-compacted and compacted topics)
>> >>>>>>>> t1: A -> non-compacted topic // broker A published a message to the non-compacted topic, m1: {broker A assigned bundle x to broker A}
>> >>>>>>>> t2: B -> non-compacted topic // broker B published a message, m2: {broker B assigned bundle x to broker C}
>> >>>>>>>> t3: C -> non-compacted topic // broker C published a message, m3: {broker C assigned bundle x to broker B}
>> >>>>>>>> t4: non-compacted topic -> L // leader broker consumed the messages: m1, m2, and m3
>> >>>>>>>> t5: L -> compacted topic // leader compacted the messages and broadcasted m1 to all consumers
>> >>>>>>>> t6: compacted topic -> [A,B,C] // broker A,B,C consumed m1
>> >>>>>>>>
>> >>>>>>>> With post-filter + a single topic
>> >>>>>>>> t1: A -> topic // broker A published a message to the topic, m1: {broker A assigned bundle x to broker A}
>> >>>>>>>> t2: B -> topic // broker B published a message, m2: {broker B assigned bundle x to broker C}
>> >>>>>>>> t3: C -> topic // broker C published a message, m3: {broker C assigned bundle x to broker B}
>> >>>>>>>> t4: topic -> [A,B,C] // broker A,B,C consumed the messages: m1, m2, and m3
>> >>>>>>>> t5: [A,B,C] -> m1 // broker A,B,C individually compacted the messages to m1.
>> >>>>>>>>
>> >>>>>>>> Analysis: the "pre-filter + two-topics" option can reduce the number of
>> >>>>>>>> messages to broadcast at the expense of the leader broker compaction.
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> Example 2: When the topic broadcast is faster than the concurrent
>> >>>>>>>> assignment requests
>> >>>>>>>>
>> >>>>>>>> With pre-filter + two-topics (non-compacted and compacted topics)
>> >>>>>>>> t1: A -> non-compacted topic // broker A published a message to the non-compacted topic, m1: {broker A assigned bundle x to broker A}
>> >>>>>>>> t2: non-compacted topic -> L // leader broker consumed the messages: m1
>> >>>>>>>> t3: L -> compacted topic // leader compacted the message and broadcasted m1 to all consumers
>> >>>>>>>> t4: compacted topic -> [A,B,C] // broker A,B,C consumed m1
>> >>>>>>>> t5: A -> own bundle // broker A knows that its assignment has been accepted, so proceeding to own the bundle (meanwhile deferring lookup requests)
>> >>>>>>>> t6: B -> defer client lookups // broker B knows that bundle assignment is running (meanwhile deferring lookup requests)
>> >>>>>>>> t7: C -> defer client lookups // broker C knows that bundle assignment is running (meanwhile deferring lookup requests)
>> >>>>>>>>
>> >>>>>>>> With post-filter + a single topic
>> >>>>>>>> t1: A -> topic // broker A published a message to the topic, m1: {broker A assigned bundle x to broker A}
>> >>>>>>>> t2: topic -> [A,B,C] // broker A,B,C consumed the message: m1
>> >>>>>>>> t3: A -> own bundle // broker A knows that its assignment has been accepted, so proceeding to own the bundle (meanwhile deferring lookup requests)
>> >>>>>>>> t4: B -> defer client lookups // broker B knows that bundle assignment is running (meanwhile deferring lookup requests)
>> >>>>>>>> t5: C -> defer client lookups // broker C knows that bundle assignment is running (meanwhile deferring lookup requests)
>> >>>>>>>>
>> >>>>>>>> Analysis: The "post-filter + a single topic" option can perform ok in this case
>> >>>>>>>> without the additional leader coordination and the secondary topic because
>> >>>>>>>> the early broadcast can inform all brokers and prevent them from requesting
>> >>>>>>>> other assignments for the same bundle.
>> >>>>>>>>
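A minimal Java sketch of the two timelines above, for illustration only. `BroadcastFilterDemo`, `Broker`, and `Assignment` are hypothetical names, not Pulsar classes, the "topic" is an in-memory list, and each broker assigns the bundle to itself (a simplification of the timelines). With a fast broadcast, brokers B and C see A's assignment before they publish and defer instead; with a slow broadcast, all three publish and every broker resolves the conflict on its own, here by keeping the first assignment. Records require Java 16 or later.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of the two timelines; not Pulsar code.
class BroadcastFilterDemo {
    record Assignment(String bundle, String assignedBy, String owner) {}

    static class Broker {
        final String name;
        // This broker's view of the accepted assignment per bundle.
        final Map<String, Assignment> view = new HashMap<>();

        Broker(String name) {
            this.name = name;
        }

        // A client lookup for the bundle arrives. Returns an assignment to publish
        // (the broker assigns the bundle to itself), or null to defer the lookup.
        Assignment onLookup(String bundle) {
            if (view.containsKey(bundle)) {
                return null; // an assignment is already known: defer
            }
            return new Assignment(bundle, name, name);
        }

        // Post-filter on the consumer side: the first assignment for a bundle wins.
        void consume(Assignment a) {
            view.putIfAbsent(a.bundle(), a);
        }
    }

    // Example 2: each published message reaches every broker before the next lookup.
    static List<Assignment> runFastBroadcast(List<Broker> brokers, String bundle) {
        List<Assignment> topic = new ArrayList<>();
        for (Broker b : brokers) {
            Assignment a = b.onLookup(bundle);
            if (a != null) {
                topic.add(a);
                for (Broker consumer : brokers) {
                    consumer.consume(a);
                }
            }
        }
        return topic;
    }

    // Example 1: every broker publishes before any broker consumes.
    static List<Assignment> runSlowBroadcast(List<Broker> brokers, String bundle) {
        List<Assignment> topic = new ArrayList<>();
        for (Broker b : brokers) {
            Assignment a = b.onLookup(bundle);
            if (a != null) {
                topic.add(a);
            }
        }
        for (Broker consumer : brokers) {
            for (Assignment a : topic) {
                consumer.consume(a);
            }
        }
        return topic;
    }
}
```

In the fast case only one message is published; in the slow case three are published, and all brokers still converge on A because each applies the same rule.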
>> >>>>>>>> I think the post-filter option is initially not bad because:
>> >>>>>>>>
>> >>>>>>>> 1. it is safe in the worst case (in case the messages are not correctly
>> >>>>>>>> pre-filtered at the leader)
>> >>>>>>>> 2. it performs ok because the early broadcast can prevent
>> >>>>>>>> concurrent assignment requests.
>> >>>>>>>> 3. initially less complex to implement (leaderless conflict resolution and
>> >>>>>>>> requires a single topic)
>> >>>>>>>> 4. it is not a "one-way door" decision (we could add the pre-filter logic
>> >>>>>>>> as well later)
>> >>>>>>>>
>> >>>>>>>> Regards,
>> >>>>>>>> Heesung
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> On Sat, Oct 29, 2022 at 1:03 PM Heesung Sohn <heesung.sohn@streamnative.io>
>> >>>>>>>> wrote:
>> >>>>>>>>
>> >>>>>>>>> Hi Michael,
>> >>>>>>>>>
>> >>>>>>>>> For the pre-filter (pre-compaction) option,
>> >>>>>>>>> I think the leader requires a write-through cache to compact messages
>> >>>>>>>>> based on the latest states. Otherwise, the leader needs to wait for the
>> >>>>>>>>> last msg from the (compacted) topic before compacting the next msg for the
>> >>>>>>>>> same bundle.
>> >>>>>>>>>
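To illustrate the write-through cache point, here is a minimal sketch assuming a hypothetical `LeaderPreFilter` (not a Pulsar API) and an in-memory list standing in for the compacted topic. The leader updates its own cache at the moment it accepts a write, so the next request for the same bundle is checked against the latest state without waiting to read its own message back from the topic. It only models first-time assignment; transfers and releases are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the leader broker's pre-filter (not a Pulsar API).
class LeaderPreFilter {
    // Write-through cache: bundle -> owner broker, updated on every accepted write.
    private final Map<String, String> owners = new HashMap<>();
    // Stand-in for the compacted output topic: {bundle, owner} entries.
    private final List<String[]> compactedTopic = new ArrayList<>();

    // Returns true if the assignment was accepted and "published".
    boolean tryAssign(String bundle, String broker) {
        if (owners.containsKey(bundle)) {
            return false; // already assigned: drop the conflicting request
        }
        compactedTopic.add(new String[] {bundle, broker}); // publish
        owners.put(bundle, broker); // write through: no read-back from the topic needed
        return true;
    }

    String ownerOf(String bundle) {
        return owners.get(bundle);
    }

    int publishedCount() {
        return compactedTopic.size();
    }
}
```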
>> >>>>>>>>> Pulsar guarantees "a single writer". However, for the worst-case
>> >>>>>>>>> scenario (due to network partitions, bugs in zk or etcd leader election,
>> >>>>>>>>> bugs in bk, data corruption), I think it is safe to place the post-filter
>> >>>>>>>>> on the consumer side (compaction and table views) as well in order to
>> >>>>>>>>> validate the state changes.
>> >>>>>>>>>
>> >>>>>>>>> For the two-topic approach,
>> >>>>>>>>> I think we lose a single linearized view. Could we clarify how to handle
>> >>>>>>>>> the following (edge cases and failure recovery)?
>> >>>>>>>>> 0. Is the un-compacted topic a persistent topic or a non-persistent topic?
>> >>>>>>>>> 1. How does the leader recover state from the two topics?
>> >>>>>>>>> 2. How do we handle the case when the leader fails before writing messages
>> >>>>>>>>> to the compacted topic?
>> >>>>>>>>>
>> >>>>>>>>> Regards,
>> >>>>>>>>> Heesung
>> >>>>>>>>>
>> >>>>>>>>> On Fri, Oct 28, 2022 at 6:56 PM Michael Marshall <mmarshall@apache.org>
>> >>>>>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>>> Sharing some more thoughts. We could alternatively use two topics
>> >>>>>>>>>> instead of one. In this design, the first topic is the unfiltered
>> >>>>>>>>>> write ahead log that represents many writers (brokers) trying to
>> >>>>>>>>>> acquire ownership of bundles. The second topic is the distilled log
>> >>>>>>>>>> that represents the "winners" or the "owners" of the bundles. There is
>> >>>>>>>>>> a single writer, the leader broker, that reads from the input topic
>> >>>>>>>>>> and writes to the output topic. The first topic is normal and the
>> >>>>>>>>>> second is compacted.
>> >>>>>>>>>>
>> >>>>>>>>>> The primary benefit of a two-topic solution is that it is easy for the
>> >>>>>>>>>> leader broker to trade off ownership without needing to slow down
>> >>>>>>>>>> writes to the input topic. The leader broker will start consuming from
>> >>>>>>>>>> the input topic when it has fully consumed the table view on the
>> >>>>>>>>>> output topic. In general, I don't think consumers know when they have
>> >>>>>>>>>> "reached the end of a table view", but we should be able to trivially
>> >>>>>>>>>> figure this out if we are the topic's only writer and the topic and
>> >>>>>>>>>> writer are collocated on the same broker.
>> >>>>>>>>>>
>> >>>>>>>>>> In that design, it might make sense to use something like the
>> >>>>>>>>>> replication cursor to keep track of this consumer's state.
>> >>>>>>>>>>
>> >>>>>>>>>> - Michael
>> >>>>>>>>>>
>> >>>>>>>>>> On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall <mmarshall@apache.org>
>> >>>>>>>>>> wrote:
>> >>>>>>>>>>>
>> >>>>>>>>>>> Thanks for your proposal, Heesung.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Fundamentally, we have the problems listed in this PIP because we have
>> >>>>>>>>>>> multiple writers instead of just one writer. Can we solve this problem
>> >>>>>>>>>>> by changing our write pattern? What if we use the leader broker as the
>> >>>>>>>>>>> single writer? That broker would intercept attempts to acquire
>> >>>>>>>>>>> ownership of bundles and would grant ownership to the first broker to
>> >>>>>>>>>>> claim an unassigned bundle. It could "grant ownership" by letting the
>> >>>>>>>>>>> first write to claim an unassigned bundle get written to the ownership
>> >>>>>>>>>>> topic. When a bundle is already owned, the leader won't persist that
>> >>>>>>>>>>> event to BookKeeper. In this design, the log becomes a true
>> >>>>>>>>>>> ownership log, which will correctly work with the existing topic
>> >>>>>>>>>>> compaction and table view solutions. My proposal essentially moves the
>> >>>>>>>>>>> conflict resolution to just before the write, and as a consequence, it
>> >>>>>>>>>>> greatly reduces the need for post-processing of the event log. One
>> >>>>>>>>>>> trade-off might be that the leader broker could slow down the write
>> >>>>>>>>>>> path, but given that the leader would just need to verify the current
>> >>>>>>>>>>> state of the bundle, I think it'd be performant enough.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Additionally, we'd need the leader broker to be "caught up" on bundle
>> >>>>>>>>>>> ownership in order to grant ownership of topics, but unless I am
>> >>>>>>>>>>> mistaken, that is already a requirement of the current PIP 192
>> >>>>>>>>>>> paradigm.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Below are some additional thoughts that will be relevant if we move
>> >>>>>>>>>>> forward with the design as it is currently proposed.
>> >>>>>>>>>>>
>> >>>>>>>>>>> I think it might be helpful to update the title to show that this
>> >>>>>>>>>>> proposal will also affect table view. I didn't catch that at
>> >>>>>>>>>>> first.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Do you have any documentation describing how the
>> >>>>>>>>>>> TopicCompactionStrategy will determine which states are valid in the
>> >>>>>>>>>>> context of load balancing? I looked at
>> >>>>>>>>>>> https://github.com/apache/pulsar/pull/18195, but I couldn't seem to
>> >>>>>>>>>>> find anything for it. That would help make this proposal less
>> >>>>>>>>>>> abstract.
>> >>>>>>>>>>>
>> >>>>>>>>>>> The proposed API seems very tied to the needs of PIP 192. For example,
>> >>>>>>>>>>> `isValid` is not a term I associate with topic compaction. The
>> >>>>>>>>>>> fundamental question for compaction is which value to keep (or build a
>> >>>>>>>>>>> new value). I think we might be able to simplify the API by replacing
>> >>>>>>>>>>> the "isValid", "isMergeEnabled", and "merge" methods with a single
>> >>>>>>>>>>> method that lets the implementation handle one or all tasks. That
>> >>>>>>>>>>> would also remove the need to deserialize payloads multiple times.
>> >>>>>>>>>>>
>> >>>>>>>>>>> I also feel like mentioning that after working with the PIP 105 broker-side
>> >>>>>>>>>>> filtering, I think we should avoid running UDFs in the broker as
>> >>>>>>>>>>> much as possible. (I do not consider the load balancing logic to be a
>> >>>>>>>>>>> UDF here.) I think it would be worth not making this a user-facing
>> >>>>>>>>>>> feature unless there is demand for real use cases.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Thanks!
>> >>>>>>>>>>> Michael
>> >>>>>>>>>>>
>> >>>>>>>>>>> On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bo...@apache.org> wrote:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> +1(non-binding)
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> thanks,
>> >>>>>>>>>>>> bo
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Heesung Sohn <he...@streamnative.io.invalid> wrote on Wed, Oct 19, 2022 at 07:54:
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> Hi pulsar-dev community,
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> I raised a PIP for discussion: PIP-215: Configurable Topic Compaction Strategy
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> PIP link: https://github.com/apache/pulsar/issues/18099
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> Regards,
>> >>>>>>>>>>>>> Heesung
>> >>>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>
>> >>>>>>
>> >>
>>
>>

Re: [DISCUSS] PIP-215: Configurable Topic Compaction Strategy

Posted by Heesung Sohn <he...@streamnative.io.INVALID>.
Hi,

> I would like to understand what the performance tradeoffs are for the
> changes that you (as an individual) and others are proposing.
The problem that this PIP is trying to solve is how to support custom topic
compaction logic. (Currently, Pulsar only keeps the latest message for each
key in topic compaction and in table views (consumers).)

As Michael and I discussed in the earlier emails, in the worst case, when
there are many conflicting messages, this PIP can incur more repeated
custom compaction than the alternative, because each consumer (topic
compaction and every table view) needs to compact the messages individually.
However, one advantage of this proposal is that pub/sub is faster since it
uses a single topic. For example, in PIP-192, if the "bundle assignment"
broadcast is fast enough, conflicting bundle assignment requests can be
reduced (the broadcast filter effect).


Post-Compaction (what this PIP proposes)

   - Producers publish messages to a single topic.
   - All consumers individually run the custom compaction logic when
     consuming the topic (by table-view).
   - Compactor needs to run the custom compaction logic during compaction.
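
The post-compaction flow can be sketched as follows, assuming a hypothetical `StrategicTableView` and a local `KeepLeftStrategy` interface modeled on the `shouldKeepLeft(T prev, T cur)` method discussed in this thread (neither is the actual Pulsar API). Each consumer replays the same log and applies the strategy on its own, which is where the repeated compaction work comes from; a strategy that always returns false reproduces the default keep-latest behavior.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Local stand-in modeled on the shouldKeepLeft(prev, cur) method discussed in this thread.
interface KeepLeftStrategy<T> {
    // true: keep prev (left) and drop cur; false: replace prev with cur.
    boolean shouldKeepLeft(T prev, T cur);
}

// Hypothetical consumer-side ("post-compaction") table view, not Pulsar's TableView.
class StrategicTableView<T> {
    private final Map<String, T> data = new HashMap<>();
    private final KeepLeftStrategy<T> strategy;

    StrategicTableView(KeepLeftStrategy<T> strategy) {
        this.strategy = strategy;
    }

    // Applies the strategy to every incoming message for the key.
    void onMessage(String key, T value) {
        T prev = data.get(key);
        if (prev == null || !strategy.shouldKeepLeft(prev, value)) {
            data.put(key, value);
        }
    }

    T get(String key) {
        return data.get(key);
    }

    // Builds a view by replaying a whole log, as each consumer would.
    static <V> StrategicTableView<V> replay(KeepLeftStrategy<V> s, List<Map.Entry<String, V>> log) {
        StrategicTableView<V> view = new StrategicTableView<>(s);
        for (Map.Entry<String, V> e : log) {
            view.onMessage(e.getKey(), e.getValue());
        }
        return view;
    }
}
```

Modeling m1, m2 and m3 from Example 1 as values A, C and B for key x, every replay with a "first assignment wins" strategy converges on A, but each consumer repeats the same work.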



The alternative that Michael proposed instead compacts messages at an
earlier stage by a single writer, using two topics.

Pre-Compaction (the alternative that Michael proposes)

   - Producers publish messages to a non-compacted topic first.
   - Only the leader consumes this non-compacted topic and runs the
     custom compaction logic.
   - Then, the leader publishes compacted messages to the compacted
     topic (conflicts are resolved by the single writer).
   - All consumers consume the compacted topic (no need to compact the
     messages separately on the consumer side).
   - Compactor does not need to run the custom compaction logic during
     compaction.
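
For comparison, a minimal sketch of the pre-compaction flow under similar assumptions: `LeaderPreCompaction` is a hypothetical name, the topics are in-memory lists, and the `shouldKeepLeft` rule is passed as a `BiPredicate`. The strategy runs once on the leader, and consumers of the compacted topic only need the usual latest-value-per-key view.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical simulation of the leader-side ("pre-compaction") alternative.
class LeaderPreCompaction {
    // Reads the non-compacted input log, applies the strategy once, and returns
    // only the accepted messages (what the leader would write to the compacted topic).
    // shouldKeepLeft.test(prev, cur) == true means cur is dropped.
    static <T> List<Map.Entry<String, T>> filter(
            List<Map.Entry<String, T>> input, BiPredicate<T, T> shouldKeepLeft) {
        Map<String, T> state = new HashMap<>();
        List<Map.Entry<String, T>> output = new ArrayList<>();
        for (Map.Entry<String, T> msg : input) {
            T prev = state.get(msg.getKey());
            if (prev == null || !shouldKeepLeft.test(prev, msg.getValue())) {
                state.put(msg.getKey(), msg.getValue());
                output.add(msg);
            }
        }
        return output;
    }

    // Consumers of the compacted topic need no strategy: the latest value per key wins.
    static <T> Map<String, T> latestPerKey(List<Map.Entry<String, T>> topic) {
        Map<String, T> view = new HashMap<>();
        for (Map.Entry<String, T> msg : topic) {
            view.put(msg.getKey(), msg.getValue());
        }
        return view;
    }
}
```

With the three conflicting assignments from Example 1 as input, only one message reaches the compacted topic; the cost is that non-conflicting requests also pass through the leader.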



> It really seems that you are proposing a change to the default behavior
> whether or not a user chooses to use the interface in PIP-192.
The PIP does not change the default behavior of compaction and table-view.
I updated the goals in the PIP to clarify this.

Thanks,
Heesung


On Fri, Nov 4, 2022 at 11:11 AM Dave Fisher <wa...@apache.org> wrote:

>
>
> > On Nov 4, 2022, at 10:28 AM, Heesung Sohn <he...@streamnative.io.INVALID> wrote:
> >
> > Hi,
> >
> > I think `bool shouldKeepLeft(T prev, T cur)` is clearer. I updated the PIP.
> >
> > Hopefully, we provided enough context about this PIP and the design
> > trade-off as well.
>
> I would like to understand what the performance tradeoffs are for the
> changes that you (as an individual) and others are proposing.
>
> > Goal
> >
> >       • Create another topic compactor, StrategicTwoPhaseCompactor, where we
> >         can configure a compaction strategy, TopicCompactionStrategy.
> >
> >       • Update the TableViewConfigurationData to load and consider the
> >         TopicCompactionStrategy when updating the internal key-value map in
> >         TableView.
> >
> >       • Add TopicCompactionStrategy in Topic-level Policy to run
> >         StrategicTwoPhaseCompactor instead of TwoPhaseCompactor when
> >         executing compaction.
>
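A simplified, in-memory model of what a strategy-aware two-phase compaction could look like. As a simplification of the general two-phase idea, it assumes phase one decides which message id survives for each key and phase two re-reads the topic in order and keeps only those ids; `TwoPhaseModel` and `Msg` are hypothetical names, and the real compactor's reads and writes are not modeled. Passing a predicate that always returns false gives the default keep-latest result. Records require Java 16 or later.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Simplified in-memory model of a two-phase compaction pass; not Pulsar's compactor code.
class TwoPhaseModel {
    record Msg<T>(long id, String key, T value) {}

    static <T> List<Msg<T>> compact(List<Msg<T>> topic, BiPredicate<T, T> shouldKeepLeft) {
        // Phase one: decide which message survives for each key.
        Map<String, Msg<T>> kept = new HashMap<>();
        for (Msg<T> m : topic) {
            Msg<T> prev = kept.get(m.key());
            if (prev == null || !shouldKeepLeft.test(prev.value(), m.value())) {
                kept.put(m.key(), m);
            }
        }
        // Phase two: re-read the topic in order and keep only the surviving ids.
        List<Msg<T>> out = new ArrayList<>();
        for (Msg<T> m : topic) {
            if (kept.get(m.key()).id() == m.id()) {
                out.add(m);
            }
        }
        return out;
    }
}
```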
> It really seems that you are proposing a change to the default behavior
> whether or not a user chooses to use the interface in PIP-192.
>
> >
> > I will send out a vote email soon.
> >
> > Thank you,
> > Heesung
> >
> >
> >
> >
> >
> >
> > On Thu, Nov 3, 2022 at 9:59 PM Michael Marshall <mm...@apache.org> wrote:
> >
> >> Thank you for your detailed responses, Heesung.
> >>
> >>> We are not planning to expose this feature to users
> >>> soon unless demanded and proven to be stable.
> >>
> >> In that case, I think we should move forward with this PIP. I have a
> >> different opinion about the trade offs for the two designs, but none
> >> of my concerns are problems that could not be solved later if we
> >> encounter problems.
> >>
> >> Just to say it succinctly, my concern is that broadcasting all
> >> attempts to acquire ownership of every unclaimed bundle to all brokers
> >> will generate a lot of unnecessary traffic.
> >>
> >>> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a
> >>
> >> Thank you for this reference. I missed it. That is great documentation!
> >>
> >>> In fact, with the `compact(T prev, T cur)` api only, it is not clear if
> >>> prev => cur is a valid transition or not (if invalid, we should filter out
> >>> the cur message instead of further compacting/merging). I think we still
> >>> need to keep the `isValid()` and `merge()` separated.
> >>
> >> I was thinking that the result of `compact` would be the result put in
> >> the table view or written to the compacted topic. The one issue might
> >> be about keeping the memory utilization down for use cases that are
> >> not updating the message's value but are only selecting "left" or
> >> "right". I thought we could infer when to keep the message id vs keep
> >> the message value, but that might be easy to implement.
> >>
> >> My final critique is that I think `isValid` could have a better name.
> >> In the event this does become a public API, I don't think all use
> >> cases will think about which event should be persisted in terms of
> >> validity.
> >>
> >> The main name that comes to my mind is `bool shouldKeepLeft(T prev, T cur)`.
> >> When true, prev wins. When false, cur wins. That nomenclature
> >> comes from Akka Streams. It's not perfect, but it is easy to infer
> >> what the result will do.
> >>
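One way to read the proposed `shouldKeepLeft(T prev, T cur)` contract, as a sketch: the interface is declared locally so the snippet compiles on its own and may differ from what the PIP finally defines, and `Ownership` and `NewerVersionWins` are hypothetical. Returning true means prev (left) wins and cur is dropped; returning false means cur (right) replaces prev. Records require Java 16 or later.

```java
// Local declaration mirroring the proposed signature; not taken from the Pulsar codebase.
interface TopicCompactionStrategy<T> {
    // true: prev (left) wins and cur is dropped; false: cur (right) wins.
    boolean shouldKeepLeft(T prev, T cur);
}

// Illustrative value type: an owner plus a monotonically increasing version.
record Ownership(String owner, long version) {}

// Example strategy: a newer version wins; stale or duplicate updates are dropped.
class NewerVersionWins implements TopicCompactionStrategy<Ownership> {
    @Override
    public boolean shouldKeepLeft(Ownership prev, Ownership cur) {
        return cur.version() <= prev.version();
    }
}
```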
> >>> Regarding redundant deserialization, the input type `T` is the type of
> >>> message value, so the input values are already deserialized.
> >>
> >> Great, I should have realized that. That takes care of that concern.
> >>
> >> Thanks,
> >> Michael
> >>
> >> On Thu, Nov 3, 2022 at 7:37 PM Heesung Sohn
> >> <he...@streamnative.io.invalid> wrote:
> >>>
> >>> Hi,
> >>>
> >>> I have a different thought about my previous comment.
> >>>
> >>> - Agreed with your point that we should merge CompactionStrategy APIs. I
> >>> updated the interface proposal in the PIP. I replaced `"isValid",
> >>> "isMergeEnabled", and "merge"` APIs with a "compact" API.
> >>>
> >>> boolean isValid(T prev, T cur)
> >>> boolean isMergeEnabled()
> >>> T merge(T prev, T cur)
> >>>
> >>> =>
> >>>
> >>> T compact(T prev, T cur)
> >>>
> >>> In fact, with the `compact(T prev, T cur)` api only, it is not clear if
> >>> prev => cur is a valid transition or not (if invalid, we should filter out
> >>> the cur message instead of further compacting/merging). I think we still
> >>> need to keep the `isValid()` and `merge()` separated.
> >>>
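A sketch of why a separate validity check can help, using a hypothetical `ValidatingMergeStrategy` with the three methods listed above (not the final PIP interface) and a hypothetical `StrategyApplier`. Keeping `isValid` as its own boolean makes "reject cur" explicit; a single `T compact(T prev, T cur)` would have to signal rejection by returning prev, which a caller cannot tell apart from a merge whose result happens to equal prev.

```java
// Hypothetical interface holding the three methods discussed above; illustration only.
interface ValidatingMergeStrategy<T> {
    boolean isValid(T prev, T cur);

    default boolean isMergeEnabled() {
        return false;
    }

    default T merge(T prev, T cur) {
        return cur;
    }
}

class StrategyApplier {
    // Returns the value to keep for a key after cur arrives.
    static <T> T apply(ValidatingMergeStrategy<T> s, T prev, T cur) {
        if (prev == null) {
            return cur; // first value for the key
        }
        if (!s.isValid(prev, cur)) {
            return prev; // invalid transition: filter out cur
        }
        return s.isMergeEnabled() ? s.merge(prev, cur) : cur;
    }
}
```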
> >>> Regarding redundant deserialization, the input type `T` is the type of
> >>> message value, so the input values are already deserialized. We don't want
> >>> to expose the Message<T> interface in this CompactionStrategy to avoid
> >>> message serialization/deserialization dependencies in the
> >>> CompactionStrategy.
> >>>
> >>> The `merge()` functionality is suggested for more complex use cases (merge
> >>> values instead of just filtering), and to support this `merge()`, we need
> >>> to internally create a new msg with the compacted value, metadata, and
> >>> messageId copies. We could initially define `isValid()` only in
> >>> CompactionStrategy, and add `isMergeEnabled() and merge()` later in the
> >>> CompactionStrategy interface if requested.
> >>>
> >>> Regards,
> >>> Heesung
> >>>
> >>>
> >>> On Thu, Nov 3, 2022 at 9:50 AM Heesung Sohn <heesung.sohn@streamnative.io>
> >>> wrote:
> >>>
> >>>> Oops! Michael, I apologize for the typo in your name.
> >>>>
> >>>> On Thu, Nov 3, 2022 at 9:47 AM Heesung Sohn <heesung.sohn@streamnative.io>
> >>>> wrote:
> >>>>
> >>>>> Hi Machel,
> >>>>>
> >>>>> Here are my additional comments regarding your earlier email.
> >>>>>
> >>>>> - I updated the PIP title to show that this will impact table view as
> >>>>> well.
> >>>>>
> >>>>> - PIP-192 (https://github.com/apache/pulsar/issues/16691) shows the
> >>>>> general idea of the states and their actions, and I defined the actual
> >>>>> states in the PR here:
> >>>>> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a
> >>>>> I will further clarify the bundle state data validation logic when
> >>>>> introducing the `BundleStateCompactionStrategy` class. This PIP is to
> >>>>> support CompactionStrategy in general.
> >>>>>
> >>>>> - Agreed with your point that we should merge CompactionStrategy APIs. I
> >>>>> updated the interface proposal in the PIP. I replaced `"isValid",
> >>>>> "isMergeEnabled", and "merge"` APIs with a "compact" API.
> >>>>>
> >>>>>
> >>>>> Thanks,
> >>>>> Heesung
> >>>>>
> >>>>>
> >>>>> On Tue, Nov 1, 2022 at 11:26 AM Heesung Sohn <heesung.sohn@streamnative.io> wrote:
> >>>>>
> >>>>>> Hi,
> >>>>>> Thank you for the great comments.
> >>>>>> Please find my comments inline too.
> >>>>>>
> >>>>>> Regards,
> >>>>>> Heesung
> >>>>>>
> >>>>>> On Mon, Oct 31, 2022 at 10:59 PM Michael Marshall <mmarshall@apache.org>
> >>>>>> wrote:
> >>>>>>
> >>>>>>>> I think we lose a single linearized view.
> >>>>>>>
> >>>>>>> Which linearized view are we losing, and what is the role of that
> >>>>>>> linearized view? I think I might be missing why it is important. I
> >>>>>>> agree that consumers won't know about each unsuccessful attempted
> >>>>>>> acquisition of a bundle, but that seems like unnecessary information
> >>>>>>> to broadcast to every broker in the cluster.
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>> PIP-192 proposes an assignment, transfer, and split protocol (multi-phase
> >>>>>> state changes) that relies on early broadcast across brokers, and all
> >>>>>> brokers react to their clients according to the state change
> >>>>>> notifications: brokers could defer any client lookups for bundle x if an
> >>>>>> assignment/transfer/split is ongoing for x (broadcast early in the topic).
> >>>>>> One early broadcast example is the one that I discussed above, `When the
> >>>>>> topic broadcast is faster than the concurrent assignment requests.` I think
> >>>>>> the pre-filter could delay this early broadcast, as it needs to go through
> >>>>>> the additional single-leader compaction path.
> >>>>>>
> >>>>>> The bundle state recovery process is simpler with a single linearized
> >>>>>> view.
> >>>>>>
> >>>>>> A single linearized view also makes it easier to debug bundle states. We
> >>>>>> can more easily track where the assignment requests come from and how
> >>>>>> they are compacted.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>>> I think the leader requires a write-through cache to compact messages
> >>>>>>>> based on the latest states.
> >>>>>>>
> >>>>>>> This brings up an important point that I would like to clarify. If we
> >>>>>>> trust the write ahead log as the source of truth, what happens when a
> >>>>>>> bundle has been validly owned by multiple brokers? As a broker starts
> >>>>>>> and consumes from the compacted topic, how do we prevent it from
> >>>>>>> incorrectly thinking that it owns a bundle for some short time period
> >>>>>>> in the case that the ownership topic hasn't yet been compacted to
> >>>>>>> remove old ownership state?
> >>>>>>>
> >>>>>>
> >>>>>> Since the multi-phase transfer protocol involves actions by both the source
> >>>>>> and destination brokers, a successful transfer should leave the source and
> >>>>>> destination brokers with the (near) latest state. For example, if some
> >>>>>> brokers have old ownership states (network partitioned or delayed), they
> >>>>>> will redirect clients to the source (old) broker. However, by the transfer
> >>>>>> protocol, the source broker should have the latest state, so it can
> >>>>>> redirect the client again to the destination broker.
> >>>>>>
> >>>>>> When a broker restarts, it won't start until its BSC state has caught up
> >>>>>> to the (near) latest state (up to the last known messageId at that time).
> >>>>>>
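The restart rule in the last paragraph can be modeled roughly as follows, assuming a hypothetical `CatchUpOnStart` helper and numeric message ids: the broker notes the last message id at startup and only begins serving once it has replayed up to that id. Later messages are handled by the live consumer, which is not shown. Each event simply overwrites the previous owner here, so any compaction strategy is left out. Records require Java 16 or later.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of "don't serve until caught up to the last known message id".
class CatchUpOnStart {
    record Event(long messageId, String bundle, String owner) {}

    // Replays events in order up to the message id that was the latest when the
    // broker started; events appended after that snapshot are left for live consumption.
    static Map<String, String> catchUp(List<Event> log, long lastKnownMessageId) {
        Map<String, String> owners = new HashMap<>();
        for (Event e : log) {
            if (e.messageId() > lastKnownMessageId) {
                break; // caught up to the startup snapshot
            }
            owners.put(e.bundle(), e.owner()); // latest value wins in this sketch
        }
        return owners;
    }
}
```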
> >>>>>>
> >>>>>>>> Pulsar guarantees "a single writer".
> >>>>>>>
> >>>>>>> I didn't think we were using a single writer in the PIP 192 design. I
> >>>>>>> thought we had many producers sending events to a compacted topic. My
> >>>>>>> proposal would still have many producers, but the writer to bookkeeper
> >>>>>>> would act as the single writer. It would technically be distinct from
> >>>>>>> a normal Pulsar topic producer.
> >>>>>>>
> >>>>>>> I should highlight that I am only proposing "broker filtering before
> >>>>>>> write" in the context of PIP 192 and as an alternative to adding
> >>>>>>> pluggable compaction strategies. It would not be a generic feature.
> >>>>>>>
> >>>>>>>
> >>>>>> I was worried about the worst case where two producers (leaders) happen
> >>>>>> to write to the compacted topic (although Pulsar can guarantee "a single
> >>>>>> writer" or "a single producer" for a topic in normal situations).
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>>> Could we clarify how to handle
> >>>>>>>> the following (edge cases and failure recovery)?
> >>>>>>>> 0. Is the un-compacted topic a persistent topic or a non-persistent topic?
> >>>>>>>
> >>>>>>> It is a persistent topic.
> >>>>>>>
> >>>>>>>> 1. How does the leader recover state from the two topics?
> >>>>>>>
> >>>>>>> A leader would recover state by first consuming the whole compacted
> >>>>>>> topic and then by consuming from the current location of a cursor on
> >>>>>>> the first input topic. As stated elsewhere, this introduces latency
> >>>>>>> and could be an issue.
> >>>>>>>
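A rough model of the recovery order described in the answer to question 1, with hypothetical names (`LeaderRecovery`, `Claim`), lists standing in for the two topics, and an integer index standing in for the input topic's cursor position. First-claim-wins is assumed as the conflict rule. Records require Java 16 or later.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the leader recovery order; topics are plain lists.
class LeaderRecovery {
    record Claim(String bundle, String broker) {}

    // Rebuilt ownership state: bundle -> owner broker.
    final Map<String, String> owners = new HashMap<>();

    // Step 1: rebuild state from the whole compacted (output) topic.
    // Step 2: resume the input topic at the cursor (first unacknowledged entry),
    // accepting a claim only if the bundle is still unowned.
    // Returns the claims that would be newly written to the compacted topic.
    List<Claim> recover(List<Claim> compacted, List<Claim> input, int cursor) {
        for (Claim c : compacted) {
            owners.put(c.bundle(), c.broker());
        }
        List<Claim> newlyAccepted = new ArrayList<>();
        for (int i = cursor; i < input.size(); i++) {
            Claim c = input.get(i);
            if (owners.putIfAbsent(c.bundle(), c.broker()) == null) {
                newlyAccepted.add(c);
            }
        }
        return newlyAccepted;
    }
}
```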
> >>>>>>>> 2. How do we handle the case when the leader fails before writing messages
> >>>>>>>> to the compacted topic
> >>>>>>>
> >>>>>>> The leader would not acknowledge the message on the input topic until
> >>>>>>> it has successfully persisted the event on the compacted topic.
> >>>>>>> Publishing the same event to a compacted topic multiple times is
> >>>>>>> idempotent, so there is no risk of lost state. The real risk is
> >>>>>>> latency. However, I think we might have similar (though not the same)
> >>>>>>> latency risks in the current solution.
> >>>>>>>
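The ordering in the answer to question 2 (persist on the compacted topic first, acknowledge the input message second) can be sketched like this. `PersistThenAck` is a hypothetical name, both topics are in-memory structures, and conflict filtering is left out. A crash between the write and the acknowledgement causes the message to be processed again, which adds a duplicate entry but leaves the latest-value-per-key view unchanged.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: write to the compacted topic first, acknowledge the input second.
class PersistThenAck {
    final List<String[]> compactedTopic = new ArrayList<>(); // {bundle, owner} entries
    int ackedUpTo = 0; // input cursor: index of the first unacknowledged input message

    // Processes input from the cursor; crashAfterWriteAt simulates a crash
    // right after that index is written but before it is acked (-1 = no crash).
    void run(List<String[]> input, int crashAfterWriteAt) {
        for (int i = ackedUpTo; i < input.size(); i++) {
            compactedTopic.add(input.get(i)); // 1. persist on the compacted topic
            if (i == crashAfterWriteAt) {
                return; // crash: the ack below never happens
            }
            ackedUpTo = i + 1; // 2. acknowledge the input message
        }
    }

    // What a compacted-topic reader sees: the latest value per key.
    Map<String, String> view() {
        Map<String, String> v = new HashMap<>();
        for (String[] e : compactedTopic) {
            v.put(e[0], e[1]);
        }
        return v;
    }
}
```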
> >>>>>>>> Analysis: the "pre-filter + two-topics" option can reduce the number of
> >>>>>>>> messages to broadcast at the expense of the leader broker compaction.
> >>>>>>>
> >>>>>>> My primary point is that with this PIP's design, the filter logic is
> >>>>>>> run on every broker and again during topic compaction. With the
> >>>>>>> alternative design, the filter is run once.
> >>>>>>>
> >>>>>>> Thank you for the clarification.
> >>>>>>
> >>>>>> I think the difference is that the post-filter is an optimistic approach
> >>>>>> as it optimistically relies on the "broadcast-filter" effect (brokers will
> >>>>>> defer client lookups if notified ahead that any assignment is ongoing for
> >>>>>> bundle x). Yes, in the worst case, if the broadcast is slower, each broker
> >>>>>> needs to individually compact the conflicting assignment requests.
> >>>>>>
> >>>>>> Conversely, one downside of the pessimistic approach (single leader
> >>>>>> pre-filter) is that when there are not many conflicting concurrent assignment
> >>>>>> requests (assign for bundle a, assign for bundle b, assign for bundle c...),
> >>>>>> the requests need to redundantly go through the leader compaction.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>>> 3. initially less complex to implement (leaderless conflict resolution and
> >>>>>>>> requires a single topic)
> >>>>>>>
> >>>>>>> PIP 215 has its own complexity too. Coordinating filters
> >>>>>>> on both the client (table view) and the server (compaction) is non-trivial.
> >>>>>>> The proposed API includes hard-coded client configuration for
> >>>>>>> each component, which will make upgrading the version of the
> >>>>>>> compaction strategy complicated, and could lead to incorrect
> >>>>>>> interpretation of events in the stream. When a single broker is doing
> >>>>>>> the filtering, versioning is no longer a distributed problem. That
> >>>>>>> being said, I do not mean to suggest my solution is without
> >>>>>>> complexity.
> >>>>>>>
> >>>>>>>> 4. it is not a "one-way door" decision (we could add the pre-filter logic
> >>>>>>>> as well later)
> >>>>>>>
> >>>>>>> It's fair to say that we could add it later, but at that point, we
> >>>>>>> will have added this new API for compaction strategy. Are we confident
> >>>>>>> that pluggable compaction is independently an important addition to
> >>>>>>> Pulsar's features, or would it make sense to make this API only exposed
> >>>>>>> in the broker?
> >>>>>>>
> >>>>>>>
> >>>>>> The intention is that this compaction feature could be useful for
> >>>>>> complex user applications (if they are trying to do a similar thing). As I
> >>>>>> mentioned, this feature is closely tied to PIP-192 now. We are not
> >>>>>> planning to expose this feature to users soon unless demanded and proven to
> >>>>>> be stable.
> >>>>>>
> >>>>>>
> >>>>>>> Thanks,
> >>>>>>> Michael
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Sat, Oct 29, 2022 at 10:13 PM Heesung Sohn
> >>>>>>> <he...@streamnative.io.invalid> wrote:
> >>>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> Also, I thought about some concurrent assignment scenarios for
> >>>>>>>> pre-filter vs. post-filter.
> >>>>>>>>
> >>>>>>>> Example 1: When the topic broadcast is slower than the concurrent
> >>>>>>>> assignment requests
> >>>>>>>>
> >>>>>>>> With pre-filter + two-topics (non-compacted and compacted topics)
> >>>>>>>> t1: A -> non-compacted topic // broker A published a message to the non-compacted topic, m1: {broker A assigned bundle x to broker A}
> >>>>>>>> t2: B -> non-compacted topic // broker B published a message, m2: {broker B assigned bundle x to broker C}
> >>>>>>>> t3: C -> non-compacted topic // broker C published a message, m3: {broker C assigned bundle x to broker B}
> >>>>>>>> t4: non-compacted topic -> L // leader broker consumed the messages: m1, m2, and m3
> >>>>>>>> t5: L -> compacted topic // leader compacted the messages and broadcasted m1 to all consumers
> >>>>>>>> t6: compacted topic -> [A,B,C] // broker A,B,C consumed m1
> >>>>>>>>
> >>>>>>>> With post-filter + a single topic
> >>>>>>>> t1: A -> topic // broker A published a message to the topic, m1: {broker A assigned bundle x to broker A}
> >>>>>>>> t2: B -> topic // broker B published a message, m2: {broker B assigned bundle x to broker C}
> >>>>>>>> t3: C -> topic // broker C published a message, m3: {broker C assigned bundle x to broker B}
> >>>>>>>> t4: topic -> [A,B,C] // broker A,B,C consumed the messages: m1, m2, and m3
> >>>>>>>> t5: [A,B,C] -> m1 // broker A,B,C individually compacted the messages to m1.
> >>>>>>>>
> >>>>>>>> Analysis: the "pre-filter + two-topics" option can reduce the number
> >>>>>>>> of messages to broadcast at the expense of the leader broker
> >>>>>>>> compaction.
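The post-filter timeline only converges because every broker reads the same ordered log and applies the same deterministic rule to it. A minimal Java sketch of that property, assuming a hypothetical `Assignment` record and a simplified "first assignment for a bundle wins" rule (the real PIP-192 validation also has to handle release, transfer, and split states):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical assignment event: `source` broker assigned `bundle` to `target` broker.
record Assignment(String source, String bundle, String target) {}

final class PostFilterDemo {

    // Deterministic post-filter (an assumed rule, not the PIP-192 state machine):
    // the first assignment seen for a bundle wins, later conflicting ones are dropped.
    static Map<String, Assignment> replay(List<Assignment> log) {
        Map<String, Assignment> owners = new HashMap<>();
        for (Assignment m : log) {
            owners.putIfAbsent(m.bundle(), m);
        }
        return owners;
    }

    public static void main(String[] args) {
        // The same ordered log as Example 1: m1, m2 and m3 all target bundle x.
        List<Assignment> log = List.of(
                new Assignment("A", "x", "A"),   // m1
                new Assignment("B", "x", "C"),   // m2
                new Assignment("C", "x", "B"));  // m3

        // Brokers A, B and C each compact the log on their own and reach the same answer.
        for (String broker : List.of("A", "B", "C")) {
            Map<String, Assignment> view = replay(log);
            System.out.println(broker + " sees bundle x owned by " + view.get("x").target());
        }
    }
}
```

The cost shown in the analysis is visible here: every broker receives and evaluates m2 and m3, even though only m1 survives.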
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Example 2: When the topic broadcast is faster than the concurrent
> >>>>>>>> assignment requests
> >>>>>>>>
> >>>>>>>> With pre-filter + two-topics (non-compacted and compacted topics)
> >>>>>>>> t1: A -> non-compacted topic // broker A published a message to the non-compacted topic, m1: {broker A assigned bundle x to broker A}
> >>>>>>>> t2: non-compacted topic -> L // leader broker consumed the messages: m1
> >>>>>>>> t3: L -> compacted topic // leader compacted the message and broadcasted m1 to all consumers
> >>>>>>>> t4: compacted topic -> [A,B,C] // broker A,B,C consumed m1
> >>>>>>>> t5: A -> own bundle // broker A knows that its assignment has been accepted, so proceeding to own the bundle (meanwhile deferring lookup requests)
> >>>>>>>> t6: B -> defer client lookups // broker B knows that bundle assignment is running (meanwhile deferring lookup requests)
> >>>>>>>> t7: C -> defer client lookups // broker C knows that bundle assignment is running (meanwhile deferring lookup requests)
> >>>>>>>>
> >>>>>>>> With post-filter + a single topic
> >>>>>>>> t1: A -> topic // broker A published a message to the topic, m1: {broker A assigned bundle x to broker A}
> >>>>>>>> t2: topic -> [A,B,C] // broker A,B,C consumed the message: m1
> >>>>>>>> t3: A -> own bundle // broker A knows that its assignment has been accepted, so proceeding to own the bundle (meanwhile deferring lookup requests)
> >>>>>>>> t4: B -> defer client lookups // broker B knows that bundle assignment is running (meanwhile deferring lookup requests)
> >>>>>>>> t5: C -> defer client lookups // broker C knows that bundle assignment is running (meanwhile deferring lookup requests)
> >>>>>>>>
> >>>>>>>> Analysis: The "post-filter + a single topic" can perform ok in this
> >>>>>>>> case without the additional leader coordination and the secondary
> >>>>>>>> topic because the early broadcast can inform all brokers and prevent
> >>>>>>>> them from requesting other assignments for the same bundle.
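The "defer client lookups" steps in Example 2 amount to each broker parking lookups for a bundle while an assignment for it is in flight. A minimal sketch, assuming a hypothetical `LookupDeferral` class that keeps one `CompletableFuture` per bundle; the event-handler names are illustrative and are not Pulsar APIs:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class LookupDeferral {

    // One future per bundle; it completes with the owner once ownership is confirmed.
    private final Map<String, CompletableFuture<String>> owners = new ConcurrentHashMap<>();

    // Called when this broker consumes the early "assignment started" event for a bundle.
    void onAssigning(String bundle) {
        owners.putIfAbsent(bundle, new CompletableFuture<>());
    }

    // Called when this broker consumes the "bundle is now owned by X" event.
    void onOwned(String bundle, String owner) {
        owners.computeIfAbsent(bundle, b -> new CompletableFuture<>()).complete(owner);
    }

    // Client lookup: completes immediately if the owner is known, otherwise it stays
    // pending (deferred). Returns null when no assignment is known for the bundle,
    // meaning this broker may start one itself.
    CompletableFuture<String> lookup(String bundle) {
        return owners.get(bundle);
    }

    public static void main(String[] args) {
        LookupDeferral brokerB = new LookupDeferral();
        brokerB.onAssigning("x");                        // early broadcast of m1
        CompletableFuture<String> pending = brokerB.lookup("x");
        System.out.println(pending.isDone());            // false: the lookup is deferred
        brokerB.onOwned("x", "A");                       // broker A has taken ownership
        System.out.println(pending.join());              // A
    }
}
```

A production version would also need lookup timeouts and a way to reset the entry when ownership is released or an assignment fails.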
> >>>>>>>>
> >>>>>>>> I think the post-filter option is initially not bad because:
> >>>>>>>>
> >>>>>>>> 1. it is safe in the worst case (in case the messages are not
> >>>>>>>> correctly pre-filtered at the leader)
> >>>>>>>> 2. it performs ok because the early broadcast can prevent
> >>>>>>>> concurrent assignment requests.
> >>>>>>>> 3. initially less complex to implement (leaderless conflict
> >>>>>>>> resolution and requires a single topic)
> >>>>>>>> 4. it is not a "one-way door" decision (we could add the pre-filter
> >>>>>>>> logic as well later)
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Heesung
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Sat, Oct 29, 2022 at 1:03 PM Heesung Sohn <heesung.sohn@streamnative.io>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Michael,
> >>>>>>>>>
> >>>>>>>>> For the pre-filter (pre-compaction) option,
> >>>>>>>>> I think the leader requires a write-through cache to compact messages
> >>>>>>>>> based on the latest states. Otherwise, the leader needs to wait for
> >>>>>>>>> the last msg from the (compacted) topic before compacting the next msg
> >>>>>>>>> for the same bundle.
> >>>>>>>>>
> >>>>>>>>> Pulsar guarantees "a single writer". However, for the worst-case
> >>>>>>>>> scenario (due to network partitions, bugs in zk or etcd leader
> >>>>>>>>> election, bugs in bk, data corruption), I think it is safe to place
> >>>>>>>>> the post-filter on the consumer side (compaction and table views) as
> >>>>>>>>> well in order to validate the state changes.
> >>>>>>>>>
> >>>>>>>>> For the two-topic approach,
> >>>>>>>>> I think we lose a single linearized view. Could we clarify how to
> >>>>>>>>> handle the following (edge cases and failure recovery)?
> >>>>>>>>> 0. Is the un-compacted topic a persistent topic or a non-persistent
> >>>>>>>>> topic?
> >>>>>>>>> 1. How does the leader recover state from the two topics?
> >>>>>>>>> 2. How do we handle the case when the leader fails before writing
> >>>>>>>>> messages to the compacted topic?
> >>>>>>>>>
> >>>>>>>>> Regards,
> >>>>>>>>> Heesung
> >>>>>>>>>
> >>>>>>>>> On Fri, Oct 28, 2022 at 6:56 PM Michael Marshall <mmarshall@apache.org>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Sharing some more thoughts. We could alternatively use two topics
> >>>>>>>>>> instead of one. In this design, the first topic is the unfiltered
> >>>>>>>>>> write ahead log that represents many writers (brokers) trying to
> >>>>>>>>>> acquire ownership of bundles. The second topic is the distilled log
> >>>>>>>>>> that represents the "winners" or the "owners" of the bundles. There
> >>>>>>>>>> is a single writer, the leader broker, that reads from the input
> >>>>>>>>>> topic and writes to the output topic. The first topic is normal and
> >>>>>>>>>> the second is compacted.
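To make the two-topic data flow concrete, here is a minimal in-memory sketch. Plain lists stand in for the unfiltered input topic and the compacted output topic, an integer stands in for the leader's cursor, and `Claim` / `TwoTopicLeader` are hypothetical names. The "first claim on an unowned bundle wins" rule is an assumption for illustration:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical claim that any broker may append to the unfiltered input topic.
record Claim(String bundle, String broker) {}

final class TwoTopicLeader {
    final List<Claim> input = new ArrayList<>();        // normal (unfiltered) input topic
    final List<Claim> output = new ArrayList<>();       // compacted output topic, one writer
    final Map<String, String> owners = new HashMap<>(); // leader's table view of `output`
    int cursor = 0;                                     // leader's cursor on `input`

    // Processes every input message after the cursor; only winners reach `output`.
    void drain() {
        while (cursor < input.size()) {
            Claim c = input.get(cursor);
            if (!owners.containsKey(c.bundle())) {      // first claim on an unowned bundle
                output.add(c);                          // write to the output topic first ...
                owners.put(c.bundle(), c.broker());
            }
            cursor++;                                   // ... then acknowledge the input
        }
    }

    public static void main(String[] args) {
        TwoTopicLeader leader = new TwoTopicLeader();
        leader.input.add(new Claim("x", "A"));  // several brokers race for bundle x
        leader.input.add(new Claim("x", "C"));
        leader.input.add(new Claim("y", "B"));
        leader.drain();
        System.out.println(leader.output);     // only the winning claims for x and y
        System.out.println(leader.cursor);     // 3
    }
}
```

Writers to `input` never wait on the leader, which is the benefit described next; the price is that a claim becomes visible to other brokers only after the leader has processed it.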
> >>>>>>>>>>
> >>>>>>>>>> The primary benefit in a two topic solution is that it is easy for
> >>>>>>>>>> the leader broker to trade off ownership without needing to slow
> >>>>>>>>>> down writes to the input topic. The leader broker will start
> >>>>>>>>>> consuming from the input topic when it has fully consumed the table
> >>>>>>>>>> view on the output topic. In general, I don't think consumers know
> >>>>>>>>>> when they have "reached the end of a table view", but we should be
> >>>>>>>>>> able to trivially figure this out if we are the topic's only writer
> >>>>>>>>>> and the topic and writer are collocated on the same broker.
> >>>>>>>>>>
> >>>>>>>>>> In that design, it might make sense to use something like the
> >>>>>>>>>> replication cursor to keep track of this consumer's state.
> >>>>>>>>>>
> >>>>>>>>>> - Michael
> >>>>>>>>>>
> >>>>>>>>>> On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall <mmarshall@apache.org>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for your proposal, Heesung.
> >>>>>>>>>>>
> >>>>>>>>>>> Fundamentally, we have the problems listed in this PIP because we
> >>>>>>>>>>> have multiple writers instead of just one writer. Can we solve this
> >>>>>>>>>>> problem by changing our write pattern? What if we use the leader
> >>>>>>>>>>> broker as the single writer? That broker would intercept attempts to
> >>>>>>>>>>> acquire ownership on bundles and would grant ownership to the first
> >>>>>>>>>>> broker to claim an unassigned bundle. It could "grant ownership" by
> >>>>>>>>>>> letting the first write to claim an unassigned bundle get written to
> >>>>>>>>>>> the ownership topic. When a bundle is already owned, the leader won't
> >>>>>>>>>>> persist that event to the bookkeeper. In this design, the log becomes
> >>>>>>>>>>> a true ownership log, which will correctly work with the existing
> >>>>>>>>>>> topic compaction and table view solutions. My proposal essentially
> >>>>>>>>>>> moves the conflict resolution to just before the write, and as a
> >>>>>>>>>>> consequence, it greatly reduces the need for post processing of the
> >>>>>>>>>>> event log. One trade off might be that the leader broker could slow
> >>>>>>>>>>> down the write path, but given that the leader would just need to
> >>>>>>>>>>> verify the current state of the bundle, I think it'd be performant
> >>>>>>>>>>> enough.
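A minimal sketch of "conflict resolution just before the write", assuming a hypothetical `OwnershipEvent` with claim and release variants and a list standing in for the ownership topic. The leader updates its in-memory map as soon as it accepts an event (a write-through view), so the next request for the same bundle is checked without reading the topic back. This is an illustration of the idea, not a proposed implementation:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical request: `broker` claims (release == false) or releases `bundle`.
record OwnershipEvent(String bundle, String broker, boolean release) {}

final class PreWriteFilter {
    private final Map<String, String> owners = new HashMap<>();  // bundle -> current owner
    final List<OwnershipEvent> ownershipLog = new ArrayList<>(); // stands in for the topic

    // Persists the event and returns true if it is consistent with the current
    // ownership; otherwise drops it (it never reaches storage) and returns false.
    boolean offer(OwnershipEvent e) {
        String current = owners.get(e.bundle());
        boolean accept = e.release()
                ? e.broker().equals(current)  // only the current owner may release
                : current == null;            // only an unowned bundle may be claimed
        if (!accept) {
            return false;
        }
        ownershipLog.add(e);
        // Update the in-memory view right away (write-through), so the next request
        // for this bundle is checked without reading the topic back.
        if (e.release()) {
            owners.remove(e.bundle());
        } else {
            owners.put(e.bundle(), e.broker());
        }
        return true;
    }

    public static void main(String[] args) {
        PreWriteFilter leader = new PreWriteFilter();
        System.out.println(leader.offer(new OwnershipEvent("x", "A", false))); // true
        System.out.println(leader.offer(new OwnershipEvent("x", "C", false))); // false: owned by A
        System.out.println(leader.offer(new OwnershipEvent("x", "A", true)));  // true: A releases
        System.out.println(leader.offer(new OwnershipEvent("x", "C", false))); // true: now unowned
        System.out.println(leader.ownershipLog.size());                        // 3
    }
}
```

Because rejected events never reach the log, plain "latest value wins" compaction and table views see only valid ownership changes.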
> >>>>>>>>>>>
> >>>>>>>>>>> Additionally, we'd need the leader broker to be "caught up" on bundle
> >>>>>>>>>>> ownership in order to grant ownership of topics, but unless I am
> >>>>>>>>>>> mistaken, that is already a requirement of the current PIP 192
> >>>>>>>>>>> paradigm.
> >>>>>>>>>>>
> >>>>>>>>>>> Below are some additional thoughts that will be relevant if we move
> >>>>>>>>>>> forward with the design as it is currently proposed.
> >>>>>>>>>>>
> >>>>>>>>>>> I think it might be helpful to update the title to show that this
> >>>>>>>>>>> proposal will affect table view as well. I didn't catch that at
> >>>>>>>>>>> first.
> >>>>>>>>>>>
> >>>>>>>>>>> Do you have any documentation describing how the
> >>>>>>>>>>> TopicCompactionStrategy will determine which states are valid in the
> >>>>>>>>>>> context of load balancing? I looked at
> >>>>>>>>>>> https://github.com/apache/pulsar/pull/18195, but I couldn't seem to
> >>>>>>>>>>> find anything for it. That would help make this proposal less
> >>>>>>>>>>> abstract.
> >>>>>>>>>>>
> >>>>>>>>>>> The proposed API seems very tied to the needs of PIP 192. For
> >>>>>>>>>>> example, `isValid` is not a term I associate with topic compaction.
> >>>>>>>>>>> The fundamental question for compaction is which value to keep (or
> >>>>>>>>>>> build a new value). I think we might be able to simplify the API by
> >>>>>>>>>>> replacing the "isValid", "isMergeEnabled", and "merge" methods with
> >>>>>>>>>>> a single method that lets the implementation handle one or all
> >>>>>>>>>>> tasks. That would also remove the need to deserialize payloads
> >>>>>>>>>>> multiple times.
> >>>>>>>>>>>
> >>>>>>>>>>> I also feel like mentioning that after working with the PIP 105
> >>>>>>>>>>> broker side filtering, I think we should avoid running UDFs in the
> >>>>>>>>>>> broker as much as possible. (I do not consider the load balancing
> >>>>>>>>>>> logic to be a UDF here.) I think it would be worth not making this a
> >>>>>>>>>>> user facing feature unless there is demand for real use cases.
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks!
> >>>>>>>>>>> Michael
> >>>>>>>>>>>
> >>>>>>>>>>> On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bo...@apache.org> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> +1(non-binding)
> >>>>>>>>>>>>
> >>>>>>>>>>>> thanks,
> >>>>>>>>>>>> bo
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Wed, Oct 19, 2022 at 7:54 AM Heesung Sohn <he...@streamnative.io.invalid>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Hi pulsar-dev community,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I raised a PIP to discuss: PIP-215: Configurable Topic Compaction
> >>>>>>>>>>>>> Strategy
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> PIP link: https://github.com/apache/pulsar/issues/18099
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>> Heesung
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>>
> >>
>
>

Re: [DISCUSS] PIP-215: Configurable Topic Compaction Strategy

Posted by Dave Fisher <wa...@apache.org>.

> On Nov 4, 2022, at 10:28 AM, Heesung Sohn <he...@streamnative.io.INVALID> wrote:
> 
> Hi,
> 
> I think `bool shouldKeepLeft(T prev, T cur)` is clearer. I updated the PIP.
> 
> Hopefully, we provided enough context about this PIP and the design
> trade-off as well.

I would like to understand what the performance tradeoffs are for the changes that you (as an individual) and others are proposing.

> Goal
> 
> 	• Create another Topic compactor, StrategicTwoPhaseCompactor, where we can configure a compaction strategy,
> TopicCompactionStrategy
> 
> 	• Update the TableViewConfigurationData to load and consider the TopicCompactionStrategy when updating the internal key-value map in TableView.
> 
> 	• Add TopicCompactionStrategy in Topic-level Policy to run StrategicTwoPhaseCompactor instead of TwoPhaseCompactor when executing compaction.

It really seems that you are proposing a change to the default behavior whether or not a user chooses to use the interface in PIP-192.

> 
> I will send out a vote email soon.
> 
> Thank you,
> Heesung
> 
> 
> 
> 
> 
> 
> On Thu, Nov 3, 2022 at 9:59 PM Michael Marshall <mm...@apache.org>
> wrote:
> 
>> Thank you for your detailed responses, Heesung.
>> 
>>> We are not planning to expose this feature to users
>>> soon unless demanded and proven to be stable.
>> 
>> In that case, I think we should move forward with this PIP. I have a
>> different opinion about the trade offs for the two designs, but none
>> of my concerns are problems that could not be solved later if we
>> encounter problems.
>> 
>> Just to say it succinctly, my concern is that broadcasting all
>> attempts to acquire ownership of every unclaimed bundle to all brokers
>> will generate a lot of unnecessary traffic.
>> 
>>> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a
>> 
>> Thank you for this reference. I missed it. That is great documentation!
>> 
>>> In fact, with the `compact(T prev, T cur)` api only, it is not clear if
>>> prev => cur is a valid transition or not(if invalid, we should filter out
>>> the cur message instead of further compacting/merging). I think we still
>>> need to keep the `isValid()` and `merge()` separated.
>> 
>> I was thinking that the result of `compact` would be the result put in
>> the table view or written to the compacted topic. The one issue might
>> be about keeping the memory utilization down for use cases that are
>> not updating the message's value but are only selecting "left" or
>> "right". I thought we could infer when to keep the message id vs keep
>> the message value, but that might be easy to implement.
>> 
>> My final critique is that I think `isValid` could have a better name.
>> In the event this does become a public API, I don't think all use
>> cases will think about which event should be persisted in terms of
>> validity.
>> 
>> The main name that comes to my mind is
>> `bool shouldKeepLeft(T prev, T cur)`. When true, prev wins. When false,
>> cur wins. That nomenclature comes from Akka Streams. It's not perfect,
>> but it is easy to infer what the result will do.
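The `shouldKeepLeft` contract can be illustrated with a minimal Java sketch. `KeepLeftStrategy` below is a hypothetical single-method stand-in modeled on the signature quoted in the thread (it is not the exact interface from the PIP), and `compact` is a hypothetical helper showing how a compactor or table view could apply it per key:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-method strategy modeled on the signature discussed in the thread.
@FunctionalInterface
interface KeepLeftStrategy<T> {
    // true: keep prev (the "left" value); false: replace it with cur (the "right" value).
    boolean shouldKeepLeft(T prev, T cur);
}

final class KeepLeftDemo {

    // Applies a strategy per key, the way a compactor or a table view might.
    // The first value for a key is always kept, so prev is never null here.
    static <T> Map<String, T> compact(List<Map.Entry<String, T>> log, KeepLeftStrategy<T> strategy) {
        Map<String, T> view = new HashMap<>();
        for (Map.Entry<String, T> m : log) {
            T prev = view.get(m.getKey());
            if (prev == null || !strategy.shouldKeepLeft(prev, m.getValue())) {
                view.put(m.getKey(), m.getValue());
            }
        }
        return view;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> log = List.of(
                Map.entry("bundle-x", "A"),
                Map.entry("bundle-x", "C"),
                Map.entry("bundle-y", "B"));

        // Plain compaction semantics: the latest value always wins.
        KeepLeftStrategy<String> latestWins = (prev, cur) -> false;
        // First-claim semantics: once a key has a value, later values are dropped.
        KeepLeftStrategy<String> firstWins = (prev, cur) -> true;

        System.out.println(compact(log, latestWins).get("bundle-x")); // C
        System.out.println(compact(log, firstWins).get("bundle-x"));  // A
    }
}
```

Because the result is only "left" or "right", an implementation can keep a reference to an existing message rather than building a new value, which is the memory concern raised above. A real compactor would additionally need to handle deletes (empty payloads).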
>> 
>>> Regarding redundant deserialization, the input type `T` is the type of
>>> message value, so the input values are already deserialized.
>> 
>> Great, I should have realized that. That takes care of that concern.
>> 
>> Thanks,
>> Michael
>> 
>> On Thu, Nov 3, 2022 at 7:37 PM Heesung Sohn
>> <he...@streamnative.io.invalid> wrote:
>>> 
>>> Hi,
>>> 
>>> I have a different thought about my previous comment.
>>> 
>>> - Agreed with your point that we should merge CompactionStrategy APIs. I
>>> updated the interface proposal in the PIP. I replaced `"isValid",
>>> "isMergeEnabled", and "merge"` apis with "compact" api.
>>> 
>>> boolean isValid(T prev, T cur)
>>> boolean isMergeEnabled()
>>> T merge(T prev, T cur)
>>> 
>>> =>
>>> 
>>> T compact(T prev, T cur)
>>> 
>>> In fact, with the `compact(T prev, T cur)` api only, it is not clear if
>>> prev => cur is a valid transition or not(if invalid, we should filter out
>>> the cur message instead of further compacting/merging). I think we still
>>> need to keep the `isValid()` and `merge()` separated.
>>> 
>>> Regarding redundant deserialization, the input type `T` is the type of
>>> message value, so the input values are already deserialized. We don't
>>> want to expose the Message<T> interface in this CompactionStrategy to
>>> avoid message serialization/deserialization dependencies in the
>>> CompactionStrategy.
>>> 
>>> The `merge()` functionality is suggested for more complex use cases
>>> (merge values instead of just filtering), and to support this `merge()`,
>>> we need to internally create a new msg with the compacted value,
>>> metadata, and messageId copies. We could initially define `isValid()`
>>> only in CompactionStrategy, and add `isMergeEnabled()` and `merge()`
>>> later in the CompactionStrategy interface if requested.
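For comparison, the split API described above (`isValid()` now, `isMergeEnabled()` and `merge()` possibly later) can be sketched as follows. The default methods, the `reduce` helper, and the `BundleState` enum with its three transitions are illustrative assumptions, not the PIP-192 states. Keeping `isValid()` separate lets an invalid `cur` be dropped outright, a decision that a single `T compact(T prev, T cur)` could only express by returning `prev`, which looks the same as a merge whose result happens to equal `prev`:

```java
import java.util.List;

// Hypothetical interface following the method names discussed in the thread.
interface CompactionStrategy<T> {
    // Returns false when prev -> cur is not an allowed transition; cur is then dropped.
    boolean isValid(T prev, T cur);

    // Merging is opt-in; by default the newer valid value simply replaces the older one.
    default boolean isMergeEnabled() {
        return false;
    }

    default T merge(T prev, T cur) {
        throw new UnsupportedOperationException("merge is not enabled");
    }
}

// Hypothetical bundle states, used only for this example.
enum BundleState { FREE, ASSIGNED, OWNED }

final class SplitApiDemo {

    // Allowed transitions in this example: FREE -> ASSIGNED -> OWNED -> FREE.
    static final CompactionStrategy<BundleState> TRANSITIONS = (prev, cur) -> switch (prev) {
        case FREE -> cur == BundleState.ASSIGNED;
        case ASSIGNED -> cur == BundleState.OWNED;
        case OWNED -> cur == BundleState.FREE;
    };

    // Reduces all values published for one key, in log order.
    static <T> T reduce(List<T> values, CompactionStrategy<T> strategy) {
        T kept = null;
        for (T cur : values) {
            if (kept == null) {
                kept = cur;                                   // first value for the key
            } else if (strategy.isValid(kept, cur)) {
                kept = strategy.isMergeEnabled() ? strategy.merge(kept, cur) : cur;
            }                                                 // otherwise cur is filtered out
        }
        return kept;
    }

    public static void main(String[] args) {
        System.out.println(reduce(List.of(BundleState.FREE, BundleState.ASSIGNED,
                BundleState.ASSIGNED, BundleState.OWNED), TRANSITIONS));            // OWNED
        System.out.println(reduce(List.of(BundleState.FREE, BundleState.OWNED), TRANSITIONS)); // FREE

        // A merging strategy: negative deltas are invalid, valid ones are summed.
        CompactionStrategy<Integer> summing = new CompactionStrategy<>() {
            @Override public boolean isValid(Integer prev, Integer cur) { return cur >= 0; }
            @Override public boolean isMergeEnabled() { return true; }
            @Override public Integer merge(Integer prev, Integer cur) { return prev + cur; }
        };
        System.out.println(reduce(List.of(1, 2, -5, 3), summing));         // 6
    }
}
```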
>>> 
>>> Regards,
>>> Heesung
>>> 
>>> 
>>> On Thu, Nov 3, 2022 at 9:50 AM Heesung Sohn <heesung.sohn@streamnative.io>
>>> wrote:
>>> 
>>>> Oops! Michael, I apologize for the typo in your name.
>>>> 
>>>> On Thu, Nov 3, 2022 at 9:47 AM Heesung Sohn <heesung.sohn@streamnative.io>
>>>> wrote:
>>>> 
>>>>> Hi Machel,
>>>>> 
>>>>> Here are my additional comments regarding your earlier email.
>>>>> 
>>>>> - I updated the PIP title to show that this will impact table view as
>>>>> well.
>>>>> 
>>>>> - PIP-192: https://github.com/apache/pulsar/issues/16691 shows the
>>>>> general idea of the states and their actions, and I defined the actual
>>>>> states here in the PR:
>>>>> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a.
>>>>> I will further clarify the bundle state data validation logic when
>>>>> introducing the `BundleStateCompactionStrategy` class. This PIP is to
>>>>> support CompactionStrategy in general.
>>>>> 
>>>>> - Agreed with your point that we should merge CompactionStrategy APIs. I
>>>>> updated the interface proposal in the PIP. I replaced `"isValid",
>>>>> "isMergeEnabled", and "merge"` apis with "compact" api.
>>>>> 
>>>>> 
>>>>> Thanks,
>>>>> Heesung
>>>>> 
>>>>> 
>>>>> On Tue, Nov 1, 2022 at 11:26 AM Heesung Sohn <
>>>>> heesung.sohn@streamnative.io> wrote:
>>>>> 
>>>>>> Hi,
>>>>>> Thank you for the great comments.
>>>>>> Please find my comments inline too.
>>>>>> 
>>>>>> Regards,
>>>>>> Heesung
>>>>>> 
>>>>>> On Mon, Oct 31, 2022 at 10:59 PM Michael Marshall <mmarshall@apache.org>
>>>>>> wrote:
>>>>>> 
>>>>>>>> I think we lose a single linearized view.
>>>>>>> 
>>>>>>> Which linearized view are we losing, and what is the role of that
>>>>>>> linearized view? I think I might be missing why it is important. I
>>>>>>> agree that consumers won't know about each unsuccessful attempted
>>>>>>> acquisition of a bundle, but that seems like unnecessary information
>>>>>>> to broadcast to every broker in the cluster.
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>>> PIP-192 proposes an assignment, transfer, and split protocol
>>>>>> (multi-phase state changes), relying on early broadcast across brokers,
>>>>>> and all brokers react to their clients according to the state change
>>>>>> notifications -- brokers could defer any client lookups for bundle x if
>>>>>> an assignment/transfer/split is ongoing for x (broadcasted early in the
>>>>>> topic). One early broadcast example is the one that I discussed above,
>>>>>> `When the topic broadcast is faster than the concurrent assignment
>>>>>> requests.` I think the prefilter could delay this early broadcast, as it
>>>>>> needs to go through the additional single-leader compaction path.
>>>>>> 
>>>>>> The bundle state recovery process is simpler with a single linearized
>>>>>> view.
>>>>>>
>>>>>> A single linearized view also makes bundle states easier to debug. We
>>>>>> can more easily track where the assignment requests come from and how
>>>>>> they are compacted in a single linearized view.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>>>> I think the leader requires a write-through cache to compact messages
>>>>>>>> based on the latest states.
>>>>>>>
>>>>>>> This brings up an important point that I would like to clarify. If we
>>>>>>> trust the write ahead log as the source of truth, what happens when a
>>>>>>> bundle has been validly owned by multiple brokers? As a broker starts
>>>>>>> and consumes from the compacted topic, how do we prevent it from
>>>>>>> incorrectly thinking that it owns a bundle for some short time period
>>>>>>> in the case that the ownership topic hasn't yet been compacted to
>>>>>>> remove old ownership state?
>>>>>>> 
>>>>>> 
>>>>>> Since the multi-phase transfer protocol involves actions by both the
>>>>>> source and destination brokers, a successful transfer should leave both
>>>>>> of them with the (near) latest state. For example, if some brokers have
>>>>>> old ownership states (network partitioned or delayed), they will
>>>>>> redirect clients to the source (old) broker. However, by the transfer
>>>>>> protocol, the source broker should have the latest state, so it can
>>>>>> redirect the client again to the destination broker.
>>>>>> 
>>>>>> When a broker restarts, it won't start until its BSC state has caught
>>>>>> up to the (near) latest state (up to the last known messageId at that
>>>>>> time).
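The restart rule can be sketched in a few lines: record the id of the last message in the state topic at startup, replay until that id has been applied, and only then serve lookups. A minimal in-memory sketch, assuming hypothetical `Event` ids that increase monotonically; in Pulsar the ids would be `MessageId`s read from the bundle state topic, which this sketch does not model:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CatchUpOnRestart {

    // Hypothetical state-change event with a monotonically increasing id.
    record Event(long id, String bundle, String owner) {}

    final Map<String, String> view = new HashMap<>();  // local bundle -> owner view
    long lastApplied = -1;                              // id of the last applied event

    // Replays events in order until `targetId`, the last id known at startup, is applied.
    void catchUp(List<Event> log, long targetId) {
        for (Event e : log) {
            if (lastApplied >= targetId) {
                break;
            }
            if (e.id() > lastApplied) {
                view.put(e.bundle(), e.owner());
                lastApplied = e.id();
            }
        }
    }

    // The broker only starts serving lookups once it has reached the startup target.
    boolean readyToServe(long targetId) {
        return lastApplied >= targetId;
    }

    public static void main(String[] args) {
        List<Event> log = List.of(
                new Event(0, "x", "A"),
                new Event(1, "x", "B"),   // x was transferred from A to B
                new Event(2, "y", "C"));
        long lastKnownAtStartup = 1;      // event 2 arrived after the broker started

        CatchUpOnRestart broker = new CatchUpOnRestart();
        System.out.println(broker.readyToServe(lastKnownAtStartup)); // false
        broker.catchUp(log, lastKnownAtStartup);
        System.out.println(broker.readyToServe(lastKnownAtStartup)); // true
        System.out.println(broker.view.get("x"));                    // B
    }
}
```

Events newer than the startup target (event 2 here) keep streaming in after the broker starts; the rule only bounds how stale the view can be at the moment the broker begins serving.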
>>>>>> 
>>>>>> 
>>>>>>>> Pulsar guarantees "a single writer".
>>>>>>> 
>>>>>>> I didn't think we were using a single writer in the PIP 192 design. I
>>>>>>> thought we had many producers sending events to a compacted topic. My
>>>>>>> proposal would still have many producers, but the writer to bookkeeper
>>>>>>> would act as the single writer. It would technically be distinct from
>>>>>>> a normal Pulsar topic producer.
>>>>>>> 
>>>>>>> I should highlight that I am only proposing "broker filtering before
>>>>>>> write" in the context of PIP 192 and as an alternative to adding
>>>>>>> pluggable compaction strategies. It would not be a generic feature.
>>>>>>> 
>>>>>>> 
>>>>>> I was worried about the worst case where two producers (leaders) happen
>>>>>> to write to the compacted topic (although Pulsar can guarantee "a single
>>>>>> writer" or "a single producer" for a topic in normal situations).
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>>>> Could we clarify how to handle
>>>>>>>> the following(edge cases and failure recovery)?
>>>>>>>> 0. Is the un-compacted topic a persistent topic or a non-persistent
>>>>>>>> topic?
>>>>>>> 
>>>>>>> It is a persistent topic.
>>>>>>> 
>>>>>>>> 1. How does the leader recover state from the two topics?
>>>>>>> 
>>>>>>> A leader would recover state by first consuming the whole compacted
>>>>>>> topic and then by consuming from the current location of a cursor on
>>>>>>> the first input topic. As stated elsewhere, this introduces latency
>>>>>>> and could be an issue.
>>>>>>> 
>>>>>>>> 2. How do we handle the case when the leader fails before writing
>>>>>>>> messages to the compacted topic
>>>>>>> 
>>>>>>> The leader would not acknowledge the message on the input topic until
>>>>>>> it has successfully persisted the event on the compacted topic.
>>>>>>> Publishing the same event to a compacted topic multiple times is
>>>>>>> idempotent, so there is no risk of lost state. The real risk is
>>>>>>> latency. However, I think we might have similar (though not the same)
>>>>>>> latency risks in the current solution.
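The idempotence argument can be checked with a few lines. A minimal sketch, assuming a hypothetical `Msg` record and modeling compaction as "latest value per key". It relies on the leader handling input in order, so nothing newer for the same key was written between the crash and the replay:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class AckAfterWrite {

    // Hypothetical keyed message on the compacted output topic.
    record Msg(String key, String value) {}

    // What a compacted reader sees: the latest value for each key.
    static Map<String, String> compactedView(List<Msg> topic) {
        Map<String, String> view = new LinkedHashMap<>();
        for (Msg m : topic) {
            view.put(m.key(), m.value());
        }
        return view;
    }

    public static void main(String[] args) {
        List<Msg> output = new ArrayList<>();
        Msg winner = new Msg("bundle-x", "A");

        output.add(winner);                 // leader 1 persists the winner ...
        Map<String, String> before = compactedView(output);
        // ... then crashes before acknowledging the input message, so the next
        // leader receives it again and republishes the same key and value.
        output.add(winner);

        System.out.println(output.size());                          // 2 raw entries
        System.out.println(compactedView(output).equals(before));   // true
    }
}
```

The duplicate costs an extra write and some latency, but the compacted state is unchanged, which matches the trade-off described above.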
>>>>>>> 
>>>>>>>> Analysis: the "pre-filter + two-topics" option can reduce the number
>>>>>>>> of messages to broadcast at the expense of the leader broker
>>>>>>>> compaction.
>>>>>>> 
>>>>>>> My primary point is that with this PIP's design, the filter logic is
>>>>>>> run on every broker and again during topic compaction. With the
>>>>>>> alternative design, the filter is run once.
>>>>>>> 
>>>>>>> Thank you for the clarification.
>>>>>> 
>>>>>> I think the difference is that the post-filter is an optimistic approach
>>>>>> as it optimistically relies on the "broadcast-filter" effect (brokers
>>>>>> will defer client lookups if notified ahead that any assignment is
>>>>>> ongoing for bundle x). Yes, in the worst case, if the broadcast is
>>>>>> slower, each broker needs to individually compact the conflicting
>>>>>> assignment requests.
>>>>>> 
>>>>>> Conversely, one downside of the pessimistic approach (single leader
>>>>>> pre-filter) is that when there are not many conflicting concurrent
>>>>>> assignment requests (assign for bundle a, assign for bundle b, assign
>>>>>> for bundle c...), the requests need to redundantly go through the
>>>>>> leader compaction.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>>>> 3. initially less complex to implement (leaderless conflict
>>>>>>>> resolution and requires a single topic)
>>>>>>> 
>>>>>>> PIP 215 has its own complexity too. Coordinating filters
>>>>>>> on both the client (table view) and the server (compaction) is non
>>>>>>> trivial. The proposed API includes hard coded client configuration for
>>>>>>> each component, which will make upgrading the version of the
>>>>>>> compaction strategy complicated, and could lead to incorrect
>>>>>>> interpretation of events in the stream. When a single broker is doing
>>>>>>> the filtering, versioning is no longer a distributed problem. That
>>>>>>> being said, I do not mean to suggest my solution is without
>>>>>>> complexity.
>>>>>>> 
>>>>>>>> 4. it is not a "one-way door" decision (we could add the pre-filter
>>>>>>>> logic as well later)
>>>>>>> 
>>>>>>> It's fair to say that we could add it later, but at that point, we
>>>>>>> will have added this new API for compaction strategy. Are we confident
>>>>>>> that pluggable compaction is independently an important addition to
>>>>>>> Pulsar's features, or would it make sense to make this API only
>>>>>>> exposed in the broker?
>>>>>>> 
>>>>>>> 
>>>>>> The intention is that this compaction feature could be useful for
>>>>>> complex user applications (if they are trying to do a similar thing).
>>>>>> As I mentioned, this feature is closely tied to PIP-192 now. We are
>>>>>> not planning to expose this feature to users soon unless demanded and
>>>>>> proven to be stable.
>>>>>> 
>>>>>> 
>>>>>>> Thanks,
>>>>>>> Michael
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> On Sat, Oct 29, 2022 at 10:13 PM Heesung Sohn
>>>>>>> <he...@streamnative.io.invalid> wrote:
>>>>>>>> 
>>>>>>>> Hi,
>>>>>>>> 
>>>>>>>> Also, I thought about some concurrent assignment scenarios between
>>>>>>>> pre-filter vs post-filter.
>>>>>>>> 
>>>>>>>> Example 1: When the topic broadcast is slower than the concurrent
>>>>>>>> assignment requests
>>>>>>>> 
>>>>>>>> With pre-filter + two-topics (non-compacted and compacted topics)
>>>>>>>> t1: A -> non-compacted topic // broker A published a message to the non-compacted topic, m1: {broker A assigned bundle x to broker A}
>>>>>>>> t2: B -> non-compacted topic // broker B published a message, m2: {broker B assigned bundle x to broker C}
>>>>>>>> t3: C -> non-compacted topic // broker C published a message, m3: {broker C assigned bundle x to broker B}
>>>>>>>> t4: non-compacted topic -> L // leader broker consumed the messages: m1, m2, and m3
>>>>>>>> t5: L -> compacted topic // leader compacted the messages and broadcasted m1 to all consumers
>>>>>>>> t6: compacted topic -> [A,B,C] // broker A,B,C consumed m1
>>>>>>>> 
>>>>>>>> With post-filter + a single topic
>>>>>>>> t1: A -> topic // broker A published a message to the topic, m1: {broker A assigned bundle x to broker A}
>>>>>>>> t2: B -> topic // broker B published a message, m2: {broker B assigned bundle x to broker C}
>>>>>>>> t3: C -> topic // broker C published a message, m3: {broker C assigned bundle x to broker B}
>>>>>>>> t4: topic -> [A,B,C] // broker A,B,C consumed the messages: m1, m2, and m3
>>>>>>>> t5: [A,B,C] -> m1 // broker A,B,C individually compacted the messages to m1.
>>>>>>>> 
>>>>>>>> Analysis: the "pre-filter + two-topics" option can reduce the number
>>>>>>>> of messages to broadcast at the expense of the leader broker
>>>>>>>> compaction.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Example 2: When the topic broadcast is faster than the concurrent
>>>>>>>> assignment requests
>>>>>>>> 
>>>>>>>> With pre-filter + two-topics (non-compacted and compacted topics)
>>>>>>>> t1: A -> non-compacted topic // broker A published a message to the non-compacted topic, m1: {broker A assigned bundle x to broker A}
>>>>>>>> t2: non-compacted topic -> L // leader broker consumed the messages: m1
>>>>>>>> t3: L -> compacted topic // leader compacted the message and broadcasted m1 to all consumers
>>>>>>>> t4: compacted topic -> [A,B,C] // broker A,B,C consumed m1
>>>>>>>> t5: A -> own bundle // broker A knows that its assignment has been accepted, so proceeding to own the bundle (meanwhile deferring lookup requests)
>>>>>>>> t6: B -> defer client lookups // broker B knows that bundle assignment is running (meanwhile deferring lookup requests)
>>>>>>>> t7: C -> defer client lookups // broker C knows that bundle assignment is running (meanwhile deferring lookup requests)
>>>>>>>> 
>>>>>>>> With post-filter + a single topic
>>>>>>>> t1: A -> topic // broker A published a message to the topic, m1: {broker A assigned bundle x to broker A}
>>>>>>>> t2: topic -> [A,B,C] // broker A,B,C consumed the message: m1
>>>>>>>> t3: A -> own bundle // broker A knows that its assignment has been accepted, so proceeding to own the bundle (meanwhile deferring lookup requests)
>>>>>>>> t4: B -> defer client lookups // broker B knows that bundle assignment is running (meanwhile deferring lookup requests)
>>>>>>>> t5: C -> defer client lookups // broker C knows that bundle assignment is running (meanwhile deferring lookup requests)
>>>>>>>> 
>>>>>>>> Analysis: The "post-filter + a single topic" can perform ok in this
>>>>>>>> case without the additional leader coordination and the secondary
>>>>>>>> topic because the early broadcast can inform all brokers and prevent
>>>>>>>> them from requesting other assignments for the same bundle.
>>>>>>>> 
>>>>>>>> I think the post-filter option is initially not bad because:
>>>>>>>> 
>>>>>>>> 1. it is safe in the worst case (in case the messages are not
>>>>>>>> correctly pre-filtered at the leader)
>>>>>>>> 2. it performs ok because the early broadcast can prevent
>>>>>>>> concurrent assignment requests.
>>>>>>>> 3. initially less complex to implement (leaderless conflict
>>>>>>>> resolution and requires a single topic)
>>>>>>>> 4. it is not a "one-way door" decision (we could add the pre-filter
>>>>>>>> logic as well later)
>>>>>>>> 
>>>>>>>> Regards,
>>>>>>>> Heesung
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Sat, Oct 29, 2022 at 1:03 PM Heesung Sohn <heesung.sohn@streamnative.io>
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> Hi Michael,
>>>>>>>>> 
>>>>>>>>> For the pre-filter (pre-compaction) option,
>>>>>>>>> I think the leader requires a write-through cache to compact messages
>>>>>>>>> based on the latest states. Otherwise, the leader needs to wait for
>>>>>>>>> the last msg from the (compacted) topic before compacting the next msg
>>>>>>>>> for the same bundle.
>>>>>>>>> 
>>>>>>>>> Pulsar guarantees "a single writer". However, for the worst-case
>>>>>>>>> scenario (due to network partitions, bugs in zk or etcd leader
>>>>>>>>> election, bugs in bk, data corruption), I think it is safe to place
>>>>>>>>> the post-filter on the consumer side (compaction and table views) as
>>>>>>>>> well in order to validate the state changes.
>>>>>>>>> 
>>>>>>>>> For the two-topic approach,
>>>>>>>>> I think we lose a single linearized view. Could we clarify how to handle
>>>>>>>>> the following (edge cases and failure recovery)?
>>>>>>>>> 0. Is the un-compacted topic a persistent topic or a non-persistent topic?
>>>>>>>>> 1. How does the leader recover state from the two topics?
>>>>>>>>> 2. How do we handle the case when the leader fails before writing messages
>>>>>>>>> to the compacted topic?
>>>>>>>>> 
>>>>>>>>> Regards,
>>>>>>>>> Heesung
>>>>>>>>> 
>>>>>>>>> On Fri, Oct 28, 2022 at 6:56 PM Michael Marshall <mmarshall@apache.org>
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Sharing some more thoughts. We could alternatively use two topics
>>>>>>>>>> instead of one. In this design, the first topic is the unfiltered
>>>>>>>>>> write-ahead log that represents many writers (brokers) trying to
>>>>>>>>>> acquire ownership of bundles. The second topic is the distilled log
>>>>>>>>>> that represents the "winners" or the "owners" of the bundles. There is
>>>>>>>>>> a single writer, the leader broker, that reads from the input topic
>>>>>>>>>> and writes to the output topic. The first topic is normal and the
>>>>>>>>>> second is compacted.
>>>>>>>>>> 
>>>>>>>>>> The primary benefit in a two-topic solution is that it is easy for the
>>>>>>>>>> leader broker to trade off ownership without needing to slow down
>>>>>>>>>> writes to the input topic. The leader broker will start consuming from
>>>>>>>>>> the input topic when it has fully consumed the table view on the
>>>>>>>>>> output topic. In general, I don't think consumers know when they have
>>>>>>>>>> "reached the end of a table view", but we should be able to trivially
>>>>>>>>>> figure this out if we are the topic's only writer and the topic and
>>>>>>>>>> writer are collocated on the same broker.
>>>>>>>>>> 
>>>>>>>>>> In that design, it might make sense to use something like the
>>>>>>>>>> replication cursor to keep track of this consumer's state.
>>>>>>>>>> 
>>>>>>>>>> - Michael
>>>>>>>>>> 
>>>>>>>>>> On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall <mmarshall@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Thanks for your proposal, Heesung.
>>>>>>>>>>> 
>>>>>>>>>>> Fundamentally, we have the problems listed in this PIP because we have
>>>>>>>>>>> multiple writers instead of just one writer. Can we solve this problem
>>>>>>>>>>> by changing our write pattern? What if we use the leader broker as the
>>>>>>>>>>> single writer? That broker would intercept attempts to acquire
>>>>>>>>>>> ownership on bundles and would grant ownership to the first broker to
>>>>>>>>>>> claim an unassigned bundle. It could "grant ownership" by letting the
>>>>>>>>>>> first write to claim an unassigned bundle get written to the ownership
>>>>>>>>>>> topic. When a bundle is already owned, the leader won't persist that
>>>>>>>>>>> event to BookKeeper. In this design, the log becomes a true
>>>>>>>>>>> ownership log, which will correctly work with the existing topic
>>>>>>>>>>> compaction and table view solutions. My proposal essentially moves the
>>>>>>>>>>> conflict resolution to just before the write, and as a consequence, it
>>>>>>>>>>> greatly reduces the need for post-processing of the event log. One
>>>>>>>>>>> trade-off might be that the leader broker could slow down the write
>>>>>>>>>>> path, but given that the leader would just need to verify the current
>>>>>>>>>>> state of the bundle, I think it'd be performant enough.
>>>>>>>>>>> 
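The leader-side pre-filter proposed above ("the first write to claim an unassigned bundle wins") can be sketched as follows. This is a minimal, single-threaded sketch under stated assumptions: an in-memory list stands in for the ownership topic, and the hypothetical `ownerCache` is updated in the same step as the write, playing the role of the write-through cache mentioned elsewhere in the thread, so the next claim for the same bundle is checked against the latest state without re-reading the topic. A real implementation would persist asynchronously and update the cache only after the write succeeds.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical leader-side pre-filter: only winning claims reach the ownership topic.
final class LeaderPreFilter {
    // Write-through cache of the latest persisted owner per bundle.
    private final Map<String, String> ownerCache = new HashMap<>();
    // In-memory stand-in for the ownership topic (what would be written to BookKeeper).
    private final List<String> ownershipLog = new ArrayList<>();

    // Returns true if the claim was granted and persisted.
    boolean onClaim(String bundle, String broker) {
        if (ownerCache.containsKey(bundle)) {
            return false; // already owned: the claim is never persisted
        }
        ownershipLog.add(bundle + "=" + broker); // "persist" the winning claim
        ownerCache.put(bundle, broker);          // keep the cache in step with the log
        return true;
    }

    List<String> ownershipLog() {
        return List.copyOf(ownershipLog);
    }
}
```

With claims A, C, then B for bundle x followed by B for bundle y, only `x=A` and `y=B` are written, so consumers of the ownership topic never see the losing claims.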
>>>>>>>>>>> Additionally, we'd need the leader broker to be "caught up" on bundle
>>>>>>>>>>> ownership in order to grant ownership of topics, but unless I am
>>>>>>>>>>> mistaken, that is already a requirement of the current PIP 192
>>>>>>>>>>> paradigm.
>>>>>>>>>>> 
>>>>>>>>>>> Below are some additional thoughts that will be relevant if we move
>>>>>>>>>>> forward with the design as it is currently proposed.
>>>>>>>>>>> 
>>>>>>>>>>> I think it might be helpful to update the title to show that this
>>>>>>>>>>> proposal will also affect table view. I didn't catch that at
>>>>>>>>>>> first.
>>>>>>>>>>> 
>>>>>>>>>>> Do you have any documentation describing how the
>>>>>>>>>>> TopicCompactionStrategy will determine which states are valid in the
>>>>>>>>>>> context of load balancing? I looked at
>>>>>>>>>>> https://github.com/apache/pulsar/pull/18195, but I couldn't seem to
>>>>>>>>>>> find anything for it. That would help make this proposal less
>>>>>>>>>>> abstract.
>>>>>>>>>>> 
>>>>>>>>>>> The proposed API seems very tied to the needs of PIP 192. For example,
>>>>>>>>>>> `isValid` is not a term I associate with topic compaction. The
>>>>>>>>>>> fundamental question for compaction is which value to keep (or build a
>>>>>>>>>>> new value). I think we might be able to simplify the API by replacing
>>>>>>>>>>> the "isValid", "isMergeEnabled", and "merge" methods with a single
>>>>>>>>>>> method that lets the implementation handle one or all tasks. That
>>>>>>>>>>> would also remove the need to deserialize payloads multiple times.
>>>>>>>>>>> 
>>>>>>>>>>> I also feel like mentioning that after working with the PIP 105 broker-side
>>>>>>>>>>> filtering, I think we should avoid running UDFs in the broker as
>>>>>>>>>>> much as possible. (I do not consider the load balancing logic to be a
>>>>>>>>>>> UDF here.) I think it would be worth not making this a user-facing
>>>>>>>>>>> feature unless there is demand for real use cases.
>>>>>>>>>>> 
>>>>>>>>>>> Thanks!
>>>>>>>>>>> Michael
>>>>>>>>>>> 
>>>>>>>>>>> On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bo...@apache.org> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>> +1(non-binding)
>>>>>>>>>>>> 
>>>>>>>>>>>> thanks,
>>>>>>>>>>>> bo
>>>>>>>>>>>> 
>>>>>>>>>>>> Heesung Sohn <he...@streamnative.io.invalid> wrote on Wed, Oct 19, 2022 at 07:54:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi pulsar-dev community,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I raised a PIP for discussion: PIP-215: Configurable Topic Compaction Strategy
>>>>>>>>>>>>> 
>>>>>>>>>>>>> PIP link: https://github.com/apache/pulsar/issues/18099
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> Heesung
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>> 
>> 


Re: [DISCUSS] PIP-215: Configurable Topic Compaction Strategy

Posted by Heesung Sohn <he...@streamnative.io.INVALID>.
Hi,

I think `bool shouldKeepLeft(T prev, T cur)` is clearer. I updated the PIP.
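A minimal sketch of how a `shouldKeepLeft(prev, cur)` strategy could drive compaction or a table view: messages are folded in log order, and for each key the strategy decides whether the previously kept value (left) survives or the new value (right) replaces it. The `TopicCompactionStrategy` interface below is a simplified, hypothetical stand-in written in Java (so `boolean` rather than `bool`), and `Compactor` and `KeyedValue` are illustrative helpers, not Pulsar's compaction service.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical stand-in for the strategy interface discussed here.
interface TopicCompactionStrategy<T> {
    // true: keep prev (left) and drop cur; false: cur (right) replaces prev.
    boolean shouldKeepLeft(T prev, T cur);
}

// A message reduced to its compaction key and its already-deserialized value.
record KeyedValue<T>(String key, T value) {}

final class Compactor {
    // Folds messages in log order and keeps one surviving value per key.
    static <T> Map<String, T> compact(List<KeyedValue<T>> log, TopicCompactionStrategy<T> strategy) {
        Map<String, T> kept = new LinkedHashMap<>();
        for (KeyedValue<T> msg : log) {
            T prev = kept.get(msg.key());
            if (prev == null || !strategy.shouldKeepLeft(prev, msg.value())) {
                kept.put(msg.key(), msg.value());
            }
        }
        return kept;
    }
}
```

For a log of claims `x=A, x=C, x=B, y=B`, a first-claim-wins strategy `(prev, cur) -> !prev.equals(cur)` keeps `{x=A, y=B}`, while `(prev, cur) -> false` reproduces plain latest-value-wins compaction and keeps `{x=B, y=B}`.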

Hopefully, we have provided enough context about this PIP and the design
trade-offs as well.

I will send out a vote email soon.

Thank you,
Heesung






On Thu, Nov 3, 2022 at 9:59 PM Michael Marshall <mm...@apache.org>
wrote:

> Thank you for your detailed responses, Heesung.
>
> > We are not planning to expose this feature to users
> > soon unless demanded and proven to be stable.
>
> In that case, I think we should move forward with this PIP. I have a
> different opinion about the trade-offs of the two designs, but none
> of my concerns are problems that could not be solved later if we
> encounter them.
>
> Just to say it succinctly, my concern is that broadcasting all
> attempts to acquire ownership of every unclaimed bundle to all brokers
> will generate a lot of unnecessary traffic.
>
> > https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a
>
> Thank you for this reference. I missed it. That is great documentation!
>
> > In fact, with the `compact(T prev, T cur)` api only, it is not clear if
> > prev => cur is a valid transition or not(if invalid, we should filter out
> > the cur message instead of further compacting/merging). I think we still
> > need to keep the `isValid()` and `merge()` separated.
>
> I was thinking that the result of `compact` would be the result put in
> the table view or written to the compacted topic. The one issue might
> be about keeping the memory utilization down for use cases that are
> not updating the message's value but are only selecting "left" or
> "right". I thought we could infer when to keep the message id vs keep
> the message value, but that might be easy to implement.
>
> My final critique is that I think `isValid` could have a better name.
> In the event this does become a public API, I don't think all use
> cases will think about which event should be persisted in terms of
> validity.
>
> The main name that comes to my mind is `bool shouldKeepLeft(T prev, T
> cur)`. When true, prev wins. When false, cur wins. That nomenclature
> comes from Akka Streams. It's not perfect, but it is easy to infer
> what the result will do.
>
> > Regarding redundant deserialization, the input type `T` is the type of
> > message value, so the input values are already deserialized.
>
> Great, I should have realized that. That takes care of that concern.
>
> Thanks,
> Michael
>
> On Thu, Nov 3, 2022 at 7:37 PM Heesung Sohn
> <he...@streamnative.io.invalid> wrote:
> >
> > Hi,
> >
> > I have a different thought about my previous comment.
> >
> > - Agreed with your point that we should merge CompactionStrategy APIs. I
> > updated the interface proposal in the PIP. I replaced `"isValid",
> > "isMergeEnabled", and "merge"` apis with "compact" api.
> >
> > boolean isValid(T prev, T cur)
> > boolean isMergeEnabled()
> > T merge(T prev, T cur)
> >
> > =>
> >
> > T compact(T prev, T cur)
> >
> > In fact, with the `compact(T prev, T cur)` api only, it is not clear if
> > prev => cur is a valid transition or not(if invalid, we should filter out
> > the cur message instead of further compacting/merging). I think we still
> > need to keep the `isValid()` and `merge()` separated.
> >
> > Regarding redundant deserialization, the input type `T` is the type of
> > message value, so the input values are already deserialized. We don't want
> > to expose the Message<T> interface in this CompactionStrategy to avoid
> > message serialization/deserialization dependencies in the
> > CompactionStrategy.
> >
> > The `merge()` functionality is suggested for more complex use cases (merge
> > values instead of just filtering), and to support this `merge()`, we need
> > to internally create a new msg with the compacted value, metadata, and
> > messageId copies. We could initially define `isValid()` only in
> > CompactionStrategy, and add `isMergeEnabled()` and `merge()` later in the
> > CompactionStrategy interface if requested.
> >
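To make the `isValid()`/`merge()` split concrete, here is a minimal sketch of how a compactor might apply the three methods from the earlier proposal. The interface and `ValidatingCompactor` are hypothetical illustrations, not Pulsar code. An invalid `cur` is simply skipped, so the previously kept value survives untouched, and only when `isMergeEnabled()` returns true does the compactor build a new value with `merge()`, which is the step that would require creating a new message internally.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical three-method strategy mirroring isValid / isMergeEnabled / merge.
interface ValidatingCompactionStrategy<T> {
    // Is prev -> cur an allowed transition? If not, cur is filtered out.
    boolean isValid(T prev, T cur);

    default boolean isMergeEnabled() {
        return false;
    }

    // Only called when merging is enabled; builds a new value from prev and cur.
    default T merge(T prev, T cur) {
        return cur;
    }
}

final class ValidatingCompactor {
    static <T> Map<String, T> compact(List<Map.Entry<String, T>> log,
                                      ValidatingCompactionStrategy<T> strategy) {
        Map<String, T> kept = new LinkedHashMap<>();
        for (Map.Entry<String, T> msg : log) {
            T prev = kept.get(msg.getKey());
            T cur = msg.getValue();
            if (prev == null) {
                kept.put(msg.getKey(), cur); // first value for this key (assumed valid here)
            } else if (strategy.isValid(prev, cur)) {
                kept.put(msg.getKey(), strategy.isMergeEnabled() ? strategy.merge(prev, cur) : cur);
            }
            // Otherwise the transition is invalid: cur is dropped and prev survives.
        }
        return kept;
    }
}
```

With values `1, -5, 2` for key `x`, a filter-only strategy that rejects negatives keeps `2`, while a summing strategy that also rejects negatives keeps `3`.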
> > Regards,
> > Heesung
> >
> >
> > On Thu, Nov 3, 2022 at 9:50 AM Heesung Sohn <heesung.sohn@streamnative.io>
> > wrote:
> >
> > > Oops! Michael, I apologize for the typo in your name.
> > >
> > > On Thu, Nov 3, 2022 at 9:47 AM Heesung Sohn <heesung.sohn@streamnative.io>
> > > wrote:
> > >
> > >> Hi Machel,
> > >>
> > >> Here are my additional comments regarding your earlier email.
> > >>
> > >> - I updated the PIP title to show that this will impact table view as
> > >> well.
> > >>
> > >> - PIP-192 (https://github.com/apache/pulsar/issues/16691) shows the
> > >> general idea of the states and their actions, and I defined the actual
> > >> states here in the PR:
> > >> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a
> > >> I will further clarify the bundle state data validation logic when
> > >> introducing the `BundleStateCompactionStrategy` class. This PIP is to
> > >> support CompactionStrategy in general.
> > >>
> > >> - Agreed with your point that we should merge CompactionStrategy APIs. I
> > >> updated the interface proposal in the PIP. I replaced `"isValid",
> > >> "isMergeEnabled", and "merge"` apis with "compact" api.
> > >>
> > >>
> > >> Thanks,
> > >> Heesung
> > >>
> > >>
> > >> On Tue, Nov 1, 2022 at 11:26 AM Heesung Sohn <heesung.sohn@streamnative.io> wrote:
> > >>
> > >>> Hi,
> > >>> Thank you for the great comments.
> > >>> Please find my comments inline too.
> > >>>
> > >>> Regards,
> > >>> Heesung
> > >>>
> > >>> On Mon, Oct 31, 2022 at 10:59 PM Michael Marshall <mmarshall@apache.org>
> > >>> wrote:
> > >>>
> > >>>> > I think we lose a single linearized view.
> > >>>>
> > >>>> Which linearized view are we losing, and what is the role of that
> > >>>> linearized view? I think I might be missing why it is important. I
> > >>>> agree that consumers won't know about each unsuccessful attempted
> > >>>> acquisition of a bundle, but that seems like unnecessary information
> > >>>> to broadcast to every broker in the cluster.
> > >>>>
> > >>>>
> > >>>
> > >>> PIP-192 proposes an assignment, transfer, and split protocol (multi-phase
> > >>> state changes) that relies on early broadcast across brokers: all brokers
> > >>> react to their clients according to the state change notifications, so
> > >>> brokers can defer any client lookups for bundle x if an
> > >>> assignment/transfer/split is ongoing for x (broadcast early in the topic).
> > >>> One early broadcast example is the one that I discussed above, "When the
> > >>> topic broadcast is faster than the concurrent assignment requests." I think
> > >>> the pre-filter could delay this early broadcast, as it needs to go through
> > >>> the additional single-leader compaction path.
> > >>>
> > >>> The bundle state recovery process is simpler with a single linearized view.
> > >>>
> > >>> A single linearized view also makes bundle states easier to debug. We can
> > >>> more easily track where the assignment requests come from and how they are
> > >>> compacted in a single linearized view.
> > >>>
> > >>>
> > >>>
> > >>>> > I think the leader requires a write-through cache to compact messages based
> > >>>> > on the latest states.
> > >>>>
> > >>>> This brings up an important point that I would like to clarify. If we
> > >>>> trust the write ahead log as the source of truth, what happens when a
> > >>>> bundle has been validly owned by multiple brokers? As a broker starts
> > >>>> and consumes from the compacted topic, how do we prevent it from
> > >>>> incorrectly thinking that it owns a bundle for some short time period
> > >>>> in the case that the ownership topic hasn't yet been compacted to
> > >>>> remove old ownership state?
> > >>>>
> > >>>
> > >>> Since the multi-phase transfer protocol involves actions by both the source
> > >>> and destination brokers, a successful transfer should leave the source and
> > >>> destination brokers with the (near) latest state. For example, if
> > >>> some brokers have old ownership states (network partitioned or delayed),
> > >>> they will redirect clients to the source (old) broker. However, by the
> > >>> transfer protocol, the source broker should have the latest state, so it
> > >>> can redirect the client again to the destination broker.
> > >>>
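The redirect behavior described above (stale brokers send clients to the source broker, which then sends them to the destination) can be illustrated with a small sketch. Each broker is modeled only by the owner recorded in its own, possibly stale, view of the bundle; `RedirectChain.resolve` and its `maxHops` bound are hypothetical and not part of the Pulsar lookup API.

```java
import java.util.Map;

final class RedirectChain {
    // viewOwnerByBroker: for each broker, the bundle owner in that broker's (possibly stale) view.
    // Follows redirects from `first` until a broker names itself as the owner.
    static String resolve(Map<String, String> viewOwnerByBroker, String first, int maxHops) {
        String current = first;
        for (int hop = 0; hop <= maxHops; hop++) {
            String owner = viewOwnerByBroker.get(current);
            if (owner == null) {
                throw new IllegalStateException(current + " has no owner for the bundle");
            }
            if (owner.equals(current)) {
                return current; // this broker owns the bundle and serves the client
            }
            current = owner; // redirect the client to the owner in this broker's view
        }
        throw new IllegalStateException("too many redirects");
    }
}
```

If broker A still sees the source S as owner while S already knows the destination D owns the bundle, a client that starts at A reaches D after two redirects.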
> > >>> When a broker restarts, it won't start until its BSC state is caught up to
> > >>> the (near) latest state (up to the last known messageId at that time).
> > >>>
> > >>>
> > >>>> > Pulsar guarantees "a single writer".
> > >>>>
> > >>>> I didn't think we were using a single writer in the PIP 192 design. I
> > >>>> thought we had many producers sending events to a compacted topic. My
> > >>>> proposal would still have many producers, but the writer to bookkeeper
> > >>>> would act as the single writer. It would technically be distinct from
> > >>>> a normal Pulsar topic producer.
> > >>>>
> > >>>> I should highlight that I am only proposing "broker filtering before
> > >>>> write" in the context of PIP 192 and as an alternative to adding
> > >>>> pluggable compaction strategies. It would not be a generic feature.
> > >>>>
> > >>>>
> > >>> I was worried about the worst case where two producers (leaders) happen
> > >>> to write to the compacted topic (although Pulsar can guarantee "a single
> > >>> writer" or "a single producer" for a topic in normal situations).
> > >>>
> > >>>
> > >>>
> > >>>> > Could we clarify how to handle
> > >>>> > the following (edge cases and failure recovery)?
> > >>>> > 0. Is the un-compacted topic a persistent topic or a non-persistent topic?
> > >>>>
> > >>>> It is a persistent topic.
> > >>>>
> > >>>> > 1. How does the leader recover state from the two topics?
> > >>>>
> > >>>> A leader would recover state by first consuming the whole compacted
> > >>>> topic and then by consuming from the current location of a cursor on
> > >>>> the first input topic. As stated elsewhere, this introduces latency
> > >>>> and could be an issue.
> > >>>>
> > >>>> > 2. How do we handle the case when the leader fails before writing messages
> > >>>> > to the compacted topic?
> > >>>>
> > >>>> The leader would not acknowledge the message on the input topic until
> > >>>> it has successfully persisted the event on the compacted topic.
> > >>>> Publishing the same event to a compacted topic multiple times is
> > >>>> idempotent, so there is no risk of lost state. The real risk is
> > >>>> latency. However, I think we might have similar (though not the same)
> > >>>> latency risks in the current solution.
> > >>>>
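The two-topic failure handling described above (rebuild state from the compacted topic, resume from the cursor on the input topic, and acknowledge an input message only after its result is persisted) can be sketched as follows. This is a minimal, single-threaded model under stated assumptions: `DurableTopics` is a hypothetical in-memory stand-in for the input topic, the compacted output topic, and the leader's durable cursor, all of which survive a leader crash, and `crashAfterWriteAt` simulates a crash between persisting and acknowledging.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

record Claim(String bundle, String broker) {}

// Hypothetical stand-ins for the state that survives a leader crash.
final class DurableTopics {
    final List<Claim> input;                               // unfiltered claims (input topic)
    final Map<String, String> compacted = new HashMap<>(); // output topic, compacted by bundle
    int cursor;                                            // acknowledged position on the input topic

    DurableTopics(List<Claim> input) {
        this.input = input;
    }
}

final class TwoTopicLeader {
    // Runs one leader term; returns early to simulate a crash after the write at crashAfterWriteAt.
    static void run(DurableTopics topics, int crashAfterWriteAt) {
        // Recovery: first read the whole compacted topic...
        Map<String, String> owners = new HashMap<>(topics.compacted);
        // ...then resume from the cursor on the input topic.
        for (int i = topics.cursor; i < topics.input.size(); i++) {
            Claim claim = topics.input.get(i);
            if (!owners.containsKey(claim.bundle())) {
                owners.put(claim.bundle(), claim.broker());
                topics.compacted.put(claim.bundle(), claim.broker()); // persist the winner
            }
            if (i == crashAfterWriteAt) {
                return; // crash: persisted (if it won) but not acknowledged
            }
            topics.cursor = i + 1; // acknowledge only after the write
        }
    }
}
```

Replaying the unacknowledged claim after the crash leaves `x=A` unchanged, which is the idempotency the argument relies on; the cost is the extra replay latency.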
> > >>>> > Analysis: the "pre-filter + two-topics" option can reduce the number of
> > >>>> > messages to broadcast at the expense of the leader broker compaction.
> > >>>>
> > >>>> My primary point is that with this PIP's design, the filter logic is
> > >>>> run on every broker and again during topic compaction. With the
> > >>>> alternative design, the filter is run once.
> > >>>>
> > >>>> Thank you for the clarification.
> > >>>
> > >>> I think the difference is that the post-filter is an optimistic approach,
> > >>> as it relies on the "broadcast-filter" effect (brokers will defer client
> > >>> lookups if notified ahead that any assignment is ongoing for bundle x).
> > >>> Yes, in the worst case, if the broadcast is slower, each broker needs to
> > >>> individually compact the conflicting assignment requests.
> > >>>
> > >>> Conversely, one downside of the pessimistic approach (single-leader
> > >>> pre-filter) is that when there are few conflicting concurrent assignment
> > >>> requests (assign for bundle a, assign for bundle b, assign for bundle c...),
> > >>> the requests still need to go through the leader compaction, which is
> > >>> redundant.
> > >>>
> > >>>
> > >>>
> > >>>> > 3. initially less complex to implement (leaderless conflict resolution and
> > >>>> > requires a single topic)
> > >>>>
> > >>>> PIP 215 has its own complexity too. Coordinating filters
> > >>>> on both the client (table view) and the server (compaction) is
> > >>>> non-trivial. The proposed API includes hard-coded client configuration
> > >>>> for each component, which will make upgrading the version of the
> > >>>> compaction strategy complicated, and could lead to incorrect
> > >>>> interpretation of events in the stream. When a single broker is doing
> > >>>> the filtering, versioning is no longer a distributed problem. That
> > >>>> being said, I do not mean to suggest my solution is without
> > >>>> complexity.
> > >>>>
> > >>>> > 4. it is not a "one-way door" decision (we could add the pre-filter logic
> > >>>> > as well later)
> > >>>>
> > >>>> It's fair to say that we could add it later, but at that point, we
> > >>>> will have added this new API for compaction strategy. Are we confident
> > >>>> that pluggable compaction is independently an important addition to
> > >>>> Pulsar's features, or would it make sense to expose this API only in
> > >>>> the broker?
> > >>>>
> > >>>>
> > >>> The intention is that this compaction feature could be useful for
> > >>> complex user applications (if they are trying to do a similar thing). As I
> > >>> mentioned, this feature is closely tied to PIP-192 now. We are not
> > >>> planning to expose this feature to users soon unless demanded and proven to
> > >>> be stable.
> > >>>
> > >>>
> > >>>> Thanks,
> > >>>> Michael
> > >>>>
> > >>>>
> > >>>>
> > >>>>
> > >>>>
> > >>>> On Sat, Oct 29, 2022 at 10:13 PM Heesung Sohn
> > >>>> <he...@streamnative.io.invalid> wrote:
> > >>>> >
> > >>>> > Hi,
> > >>>> >
> > >>>> > Also, I thought about some concurrent assignment scenarios between
> > >>>> > pre-filter vs post-filter.
> > >>>> >
> > >>>> > Example 1: When the topic broadcast is slower than the concurrent
> > >>>> > assignment requests
> > >>>> >
> > >>>> > With pre-filter + two-topics (non-compacted and compacted topics)
> > >>>> > t1: A -> non-compacted topic // broker A published a message to the non-compacted topic, m1: {broker A assigned bundle x to broker A}
> > >>>> > t2: B -> non-compacted topic // broker B published a message, m2: {broker B assigned bundle x to broker C}
> > >>>> > t3: C -> non-compacted topic // broker C published a message, m3: {broker C assigned bundle x to broker B}
> > >>>> > t4: non-compacted topic -> L // leader broker consumed the messages: m1, m2, and m3
> > >>>> > t5: L -> compacted topic // leader compacted the messages and broadcast m1 to all consumers
> > >>>> > t6: compacted topic -> [A,B,C] // brokers A, B, and C consumed m1
> > >>>> >
> > >>>> > With post-filter + a single topic
> > >>>> > t1: A -> topic // broker A published a message to the non-compacted topic, m1: {broker A assigned bundle x to broker A}
> > >>>> > t2: B -> topic // broker B published a message, m2: {broker B assigned bundle x to broker C}
> > >>>> > t3: C -> topic // broker C published a message, m3: {broker C assigned bundle x to broker B}
> > >>>> > t4: topic -> [A,B,C] // brokers A, B, and C consumed the messages: m1, m2, and m3
> > >>>> > t5: [A,B,C] -> m1 // brokers A, B, and C individually compacted the messages to m1.
> > >>>> >
> > >>>> > Analysis: the "pre-filter + two-topics" option can reduce the number of
> > >>>> > messages to broadcast at the expense of the leader broker compaction.
> > >>>> >
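A rough way to compare the two options in Example 1 is to count message deliveries for a single contested bundle. This back-of-the-envelope sketch assumes that `claims` brokers race for the same bundle, that every claim message is delivered once to each consumer, and that exactly one claim wins; it ignores acknowledgements, compaction runs, and table-view reads. The helper names are hypothetical.

```java
// Hypothetical delivery counts for one contested bundle, under the assumptions stated above.
final class BroadcastCost {
    // Post-filter + a single topic: every broker consumes every claim.
    static int postFilterDeliveries(int claims, int brokers) {
        return claims * brokers;
    }

    // Pre-filter + two topics: the leader consumes every claim from the input topic,
    // then only the winning claim is broadcast to every broker.
    static int preFilterDeliveries(int claims, int brokers) {
        return claims + brokers;
    }
}
```

For Example 1 (three claims, three brokers) this gives 9 deliveries with the post-filter versus 6 with the pre-filter; for Example 2 (one claim, three brokers) it gives 3 versus 4. That is consistent with the thread's analysis: the pre-filter saves broadcast traffic under contention but adds a leader hop when there is none.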
> > >>>> >
> > >>>> > Example 2: When the topic broadcast is faster than the concurrent
> > >>>> > assignment requests
> > >>>> >
> > >>>> > With pre-filter + two-topics (non-compacted and compacted topics)
> > >>>> > t1: A -> non-compacted topic // broker A published a message to the non-compacted topic, m1: {broker A assigned bundle x to broker A}
> > >>>> > t2: non-compacted topic -> L // leader broker consumed the messages: m1
> > >>>> > t3: L -> compacted topic // leader compacted the message and broadcast m1 to all consumers
> > >>>> > t4: compacted topic -> [A,B,C] // brokers A, B, and C consumed m1
> > >>>> > t5: A -> own bundle // broker A knows that its assignment has been accepted, so proceeding to own the bundle (meanwhile deferring lookup requests)
> > >>>> > t6: B -> defer client lookups // broker B knows that bundle assignment is running (meanwhile deferring lookup requests)
> > >>>> > t7: C -> defer client lookups // broker C knows that bundle assignment is running (meanwhile deferring lookup requests)
> > >>>> >
> > >>>> > With post-filter + a single topic
> > >>>> > t1: A -> topic // broker A published a message to the non-compacted topic, m1: {broker A assigned bundle x to broker A}
> > >>>> > t2: topic -> [A,B,C] // brokers A, B, and C consumed the message: m1
> > >>>> > t3: A -> own bundle // broker A knows that its assignment has been accepted, so proceeding to own the bundle (meanwhile deferring lookup requests)
> > >>>> > t4: B -> defer client lookups // broker B knows that bundle assignment is running (meanwhile deferring lookup requests)
> > >>>> > t5: C -> defer client lookups // broker C knows that bundle assignment is running (meanwhile deferring lookup requests)
> > >>>> >
> > >>>> > Analysis: The "post-filter + a single topic" can perform ok in this case
> > >>>> > without the additional leader coordination and the secondary topic because
> > >>>> > the early broadcast can inform all brokers and prevent them from requesting
> > >>>> > other assignments for the same bundle.
> > >>>> >
> > >>>> > I think the post-filter option is initially not bad because:
> > >>>> >
> > >>>> > 1. it is safe in the worst case (in case the messages are not correctly
> > >>>> > pre-filtered at the leader)
> > >>>> > 2. it performs ok because the early broadcast can prevent
> > >>>> > concurrent assignment requests.
> > >>>> > 3. initially less complex to implement (leaderless conflict resolution and
> > >>>> > requires a single topic)
> > >>>> > 4. it is not a "one-way door" decision (we could add the pre-filter logic
> > >>>> > as well later)
> > >>>> >
> > >>>> > Regards,
> > >>>> > Heesung
> > >>>> >
> > >>>> >
> > >>>> >
> > >>>> > On Sat, Oct 29, 2022 at 1:03 PM Heesung Sohn <heesung.sohn@streamnative.io>
> > >>>> > wrote:
> > >>>> >
> > >>>> > > Hi Michael,
> > >>>> > >
> > >>>> > > For the pre-filter (pre-compaction) option,
> > >>>> > > I think the leader requires a write-through cache to compact messages
> > >>>> > > based on the latest states. Otherwise, the leader needs to wait for the
> > >>>> > > last msg from the (compacted) topic before compacting the next msg for the
> > >>>> > > same bundle.
> > >>>> > >
> > >>>> > > Pulsar guarantees "a single writer". However, for the worst-case
> > >>>> > > scenario (due to network partitions, bugs in zk or etcd leader election,
> > >>>> > > bugs in bk, data corruption), I think it is safe to place the post-filter
> > >>>> > > on the consumer side (compaction and table views) as well in order to
> > >>>> > > validate the state changes.
> > >>>> > >
> > >>>> > > For the two-topic approach,
> > >>>> > > I think we lose a single linearized view. Could we clarify how to handle
> > >>>> > > the following (edge cases and failure recovery)?
> > >>>> > > 0. Is the un-compacted topic a persistent topic or a non-persistent topic?
> > >>>> > > 1. How does the leader recover state from the two topics?
> > >>>> > > 2. How do we handle the case when the leader fails before writing messages
> > >>>> > > to the compacted topic?
> > >>>> > >
> > >>>> > > Regards,
> > >>>> > > Heesung
> > >>>> > >
> > >>>> > > On Fri, Oct 28, 2022 at 6:56 PM Michael Marshall <mmarshall@apache.org>
> > >>>> > > wrote:
> > >>>> > >
> > >>>> > >> Sharing some more thoughts. We could alternatively use two topics
> > >>>> > >> instead of one. In this design, the first topic is the unfiltered
> > >>>> > >> write-ahead log that represents many writers (brokers) trying to
> > >>>> > >> acquire ownership of bundles. The second topic is the distilled log
> > >>>> > >> that represents the "winners" or the "owners" of the bundles. There is
> > >>>> > >> a single writer, the leader broker, that reads from the input topic
> > >>>> > >> and writes to the output topic. The first topic is normal and the
> > >>>> > >> second is compacted.
> > >>>> > >>
> > >>>> > >> The primary benefit in a two-topic solution is that it is easy for the
> > >>>> > >> leader broker to trade off ownership without needing to slow down
> > >>>> > >> writes to the input topic. The leader broker will start consuming from
> > >>>> > >> the input topic when it has fully consumed the table view on the
> > >>>> > >> output topic. In general, I don't think consumers know when they have
> > >>>> > >> "reached the end of a table view", but we should be able to trivially
> > >>>> > >> figure this out if we are the topic's only writer and the topic and
> > >>>> > >> writer are collocated on the same broker.
> > >>>> > >>
> > >>>> > >> In that design, it might make sense to use something like the
> > >>>> > >> replication cursor to keep track of this consumer's state.
> > >>>> > >>
> > >>>> > >> - Michael
> > >>>> > >>
> > >>>> > >> On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall <mmarshall@apache.org>
> > >>>> > >> wrote:
> > >>>> > >> >
> > >>>> > >> > Thanks for your proposal, Heesung.
> > >>>> > >> >
> > >>>> > >> > Fundamentally, we have the problems listed in this PIP because we have
> > >>>> > >> > multiple writers instead of just one writer. Can we solve this problem
> > >>>> > >> > by changing our write pattern? What if we use the leader broker as the
> > >>>> > >> > single writer? That broker would intercept attempts to acquire
> > >>>> > >> > ownership on bundles and would grant ownership to the first broker to
> > >>>> > >> > claim an unassigned bundle. It could "grant ownership" by letting the
> > >>>> > >> > first write to claim an unassigned bundle get written to the ownership
> > >>>> > >> > topic. When a bundle is already owned, the leader won't persist that
> > >>>> > >> > event to BookKeeper. In this design, the log becomes a true
> > >>>> > >> > ownership log, which will correctly work with the existing topic
> > >>>> > >> > compaction and table view solutions. My proposal essentially moves the
> > >>>> > >> > conflict resolution to just before the write, and as a consequence, it
> > >>>> > >> > greatly reduces the need for post-processing of the event log. One
> > >>>> > >> > trade-off might be that the leader broker could slow down the write
> > >>>> > >> > path, but given that the leader would just need to verify the current
> > >>>> > >> > state of the bundle, I think it'd be performant enough.
> > >>>> > >> >
> > >>>> > >> > Additionally, we'd need the leader broker to be "caught up" on bundle
> > >>>> > >> > ownership in order to grant ownership of topics, but unless I am
> > >>>> > >> > mistaken, that is already a requirement of the current PIP 192
> > >>>> > >> > paradigm.
> > >>>> > >> >
> > >>>> > >> > Below are some additional thoughts that will be relevant if we move
> > >>>> > >> > forward with the design as it is currently proposed.
> > >>>> > >> >
> > >>>> > >> > I think it might be helpful to update the title to show that this
> > >>>> > >> > proposal will also affect table view. I didn't catch that at
> > >>>> > >> > first.
> > >>>> > >> >
> > >>>> > >> > Do you have any documentation describing how the
> > >>>> > >> > TopicCompactionStrategy will determine which states are valid in the
> > >>>> > >> > context of load balancing? I looked at
> > >>>> > >> > https://github.com/apache/pulsar/pull/18195, but I couldn't seem to
> > >>>> > >> > find anything for it. That would help make this proposal less
> > >>>> > >> > abstract.
> > >>>> > >> >
> > >>>> > >> > The proposed API seems very tied to the needs of PIP 192. For example,
> > >>>> > >> > `isValid` is not a term I associate with topic compaction. The
> > >>>> > >> > fundamental question for compaction is which value to keep (or build a
> > >>>> > >> > new value). I think we might be able to simplify the API by replacing
> > >>>> > >> > the "isValid", "isMergeEnabled", and "merge" methods with a single
> > >>>> > >> > method that lets the implementation handle one or all tasks. That
> > >>>> > >> > would also remove the need to deserialize payloads multiple times.
> > >>>> > >> >
> > >>>> > >> > I also feel like mentioning that after working with the PIP 105 broker-side
> > >>>> > >> > filtering, I think we should avoid running UDFs in the broker as
> > >>>> > >> > much as possible. (I do not consider the load balancing logic to be a
> > >>>> > >> > UDF here.) I think it would be worth not making this a user-facing
> > >>>> > >> > feature unless there is demand for real use cases.
> > >>>> > >> >
> > >>>> > >> > Thanks!
> > >>>> > >> > Michael
> > >>>> > >> >
> > >>>> > >> > On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bo...@apache.org> wrote:
> > >>>> > >> > >
> > >>>> > >> > > +1(non-binding)
> > >>>> > >> > >
> > >>>> > >> > > thanks,
> > >>>> > >> > > bo
> > >>>> > >> > >
> > >>>> > >> > > Heesung Sohn <he...@streamnative.io.invalid> wrote on Wed, Oct 19, 2022 at 07:54:
> > >>>> > >> > > >
> > >>>> > >> > > > Hi pulsar-dev community,
> > >>>> > >> > > >
> > >>>> > >> > > > I raised a pip to discuss : PIP-215: Configurable Topic Compaction Strategy
> > >>>> > >> > > >
> > >>>> > >> > > > PIP link: https://github.com/apache/pulsar/issues/18099
> > >>>> > >> > > >
> > >>>> > >> > > > Regards,
> > >>>> > >> > > > Heesung
> > >>>> > >>
> > >>>> > >
> > >>>>
> > >>>
>

Re: [DISCUSS] PIP-215: Configurable Topic Compaction Strategy

Posted by Michael Marshall <mm...@apache.org>.
Thank you for your detailed responses, Heesung.

> We are not planning to expose this feature to users
> soon unless demanded and proven to be stable.

In that case, I think we should move forward with this PIP. I have a
different opinion about the trade-offs between the two designs, but none
of my concerns are problems that we could not solve later if they come
up.

Just to say it succinctly, my concern is that broadcasting all
attempts to acquire ownership of every unclaimed bundle to all brokers
will generate a lot of unnecessary traffic.

> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a

Thank you for this reference. I missed it. That is great documentation!

> In fact, with only the `compact(T prev, T cur)` API, it is not clear
> whether prev => cur is a valid transition (if it is invalid, we should
> filter out the cur message instead of further compacting/merging it). I
> think we still need to keep `isValid()` and `merge()` separate.

I was thinking that the result of `compact` would be the value put in
the table view or written to the compacted topic. The one issue might
be keeping memory utilization down for use cases that do not update
the message's value but only select "left" or "right". I thought we
could infer when to keep the message id vs. the message value, but
that might be easy to implement.

My final critique is that I think `isValid` could have a better name.
In the event this does become a public API, I don't think all use
cases will frame the choice of which event to persist in terms of
validity.

The main name that comes to my mind is `boolean shouldKeepLeft(T prev, T cur)`.
When true, prev wins. When false, cur wins. That nomenclature comes
from Akka Streams. It's not perfect, but it is easy to infer what the
result will do.
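A minimal sketch of how a `shouldKeepLeft`-style strategy, as proposed above, could drive per-key compaction: for each key, the value kept so far ("left") is compared with the incoming one ("right"), and the boolean decides which survives. The interface `KeepLeftStrategy`, the record `Msg`, and the helper `compactByKey` are hypothetical names for illustration, not part of Pulsar's API; messages are modeled as plain key/value pairs.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class KeepLeftCompactor {
    // Hypothetical interface modeled on the proposed `shouldKeepLeft(prev, cur)` name.
    interface KeepLeftStrategy<T> {
        // true: keep prev (the "left" value); false: replace it with cur (the "right" value)
        boolean shouldKeepLeft(T prev, T cur);
    }

    // Simplified stand-in for a keyed message on a compacted topic.
    record Msg<T>(String key, T value) {}

    // Folds a log of keyed messages into one surviving value per key.
    static <T> Map<String, T> compactByKey(List<Msg<T>> log, KeepLeftStrategy<T> strategy) {
        Map<String, T> table = new LinkedHashMap<>();
        for (Msg<T> m : log) {
            T prev = table.get(m.key());
            // The first value for a key is always kept; later values go through the strategy.
            if (prev == null || !strategy.shouldKeepLeft(prev, m.value())) {
                table.put(m.key(), m.value());
            }
        }
        return table;
    }

    public static void main(String[] args) {
        // "First claim wins": once a bundle has an owner, always keep it.
        KeepLeftStrategy<String> firstWins = (prev, cur) -> true;
        List<Msg<String>> log = List.of(
                new Msg<>("bundle-x", "broker-A"),
                new Msg<>("bundle-x", "broker-C"),
                new Msg<>("bundle-y", "broker-B"));
        System.out.println(compactByKey(log, firstWins)); // {bundle-x=broker-A, bundle-y=broker-B}
    }
}
```

Because the strategy only picks a side, the compactor never has to build a new value, which is the memory-friendly "left or right" case described above.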

> Regarding redundant deserialization, the input type `T` is the type of
> message value, so the input values are already deserialized.

Great, I should have realized that. That takes care of that concern.

Thanks,
Michael

On Thu, Nov 3, 2022 at 7:37 PM Heesung Sohn
<he...@streamnative.io.invalid> wrote:
>
> Hi,
>
> I have a different thought about my previous comment.
>
> - Agreed with your point that we should merge the CompactionStrategy APIs. I
> updated the interface proposal in the PIP. I replaced the `isValid`,
> `isMergeEnabled`, and `merge` APIs with a `compact` API.
>
> boolean isValid(T prev, T cur)
> boolean isMergeEnabled()
> T merge(T prev, T cur)
>
> =>
>
> T compact(T prev, T cur)
>
> In fact, with only the `compact(T prev, T cur)` API, it is not clear
> whether prev => cur is a valid transition (if it is invalid, we should
> filter out the cur message instead of further compacting/merging it). I
> think we still need to keep `isValid()` and `merge()` separate.
>
> Regarding redundant deserialization, the input type `T` is the type of
> the message value, so the input values are already deserialized. We don't
> want to expose the Message<T> interface in this CompactionStrategy, to
> avoid message serialization/deserialization dependencies in the
> CompactionStrategy.
>
> The `merge()` functionality is intended for more complex use cases
> (merging values instead of just filtering), and to support `merge()`, we
> need to internally create a new message with the compacted value and
> copies of the metadata and messageId. We could initially define only
> `isValid()` in CompactionStrategy, and add `isMergeEnabled()` and
> `merge()` to the CompactionStrategy interface later if requested.
>
> Regards,
> Heesung
>
>
> On Thu, Nov 3, 2022 at 9:50 AM Heesung Sohn <he...@streamnative.io>
> wrote:
>
> > Oops! Michael, I apologize for the typo in your name.
> >
> > On Thu, Nov 3, 2022 at 9:47 AM Heesung Sohn <he...@streamnative.io>
> > wrote:
> >
> >> Hi Machel,
> >>
> >> Here are my additional comments regarding your earlier email.
> >>
> >> - I updated the PIP title to show that this will impact table view as
> >> well.
> >>
> >> - PIP-192 (https://github.com/apache/pulsar/issues/16691) shows the
> >> general idea of the states and their actions, and I defined the actual
> >> states in this PR:
> >> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a. I
> >> will further clarify the bundle state data validation logic when
> >> introducing the `BundleStateCompactionStrategy` class. This PIP is meant
> >> to support CompactionStrategy in general.
> >>
> >> - Agreed with your point that we should merge the CompactionStrategy APIs. I
> >> updated the interface proposal in the PIP. I replaced the `isValid`,
> >> `isMergeEnabled`, and `merge` APIs with a `compact` API.
> >>
> >>
> >> Thanks,
> >> Heesung
> >>
> >>
> >> On Tue, Nov 1, 2022 at 11:26 AM Heesung Sohn <
> >> heesung.sohn@streamnative.io> wrote:
> >>
> >>> Hi,
> >>> Thank you for the great comments.
> >>> Please find my comments inline too.
> >>>
> >>> Regards,
> >>> Heesung
> >>>
> >>> On Mon, Oct 31, 2022 at 10:59 PM Michael Marshall <mm...@apache.org>
> >>> wrote:
> >>>
> >>>> > I think we lose a single linearized view.
> >>>>
> >>>> Which linearized view are we losing, and what is the role of that
> >>>> linearized view? I think I might be missing why it is important. I
> >>>> agree that consumers won't know about each unsuccessful attempted
> >>>> acquisition of a bundle, but that seems like unnecessary information
> >>>> to broadcast to every broker in the cluster.
> >>>>
> >>>>
> >>>
> >>> PIP-192 proposes an assignment, transfer, and split protocol (multi-phase
> >>> state changes) that relies on early broadcast across brokers: all brokers
> >>> react to their clients according to the state change notifications. For
> >>> example, brokers could defer client lookups for bundle x if an
> >>> assignment/transfer/split is ongoing for x (broadcast early in the topic).
> >>> One early-broadcast example is the one that I discussed above, `When the
> >>> topic broadcast is faster than the concurrent assignment requests.` I think
> >>> the pre-filter could delay this early broadcast, as it needs to go through
> >>> the additional single-leader compaction path.
> >>>
> >>> The bundle state recovery process is simpler with a single linearized view.
> >>>
> >>> A single linearized view also makes bundle states easier to debug. We can
> >>> more easily track where the assignment requests come from and how they
> >>> are compacted.
> >>>
> >>>
> >>>
> >>>> > I think the leader requires a write-through cache to compact messages based
> >>>> > on the latest states.
> >>>>
> >>>> This brings up an important point that I would like to clarify. If we
> >>>> trust the write ahead log as the source of truth, what happens when a
> >>>> bundle has been validly owned by multiple brokers? As a broker starts
> >>>> and consumes from the compacted topic, how do we prevent it from
> >>>> incorrectly thinking that it owns a bundle for some short time period
> >>>> in the case that the ownership topic hasn't yet been compacted to
> >>>> remove old ownership state?
> >>>>
> >>>
> >>> Since the multi-phase transfer protocol involves actions by both the
> >>> source and destination brokers, a successful transfer should leave both
> >>> of them with the (near) latest state. For example, if some brokers have
> >>> stale ownership state (because they are network partitioned or delayed),
> >>> they will redirect clients to the source (old) broker. However, by the
> >>> transfer protocol, the source broker should have the latest state, so it
> >>> can redirect the client again to the destination broker.
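The redirect argument above can be sketched as a lookup that follows redirects: a stale broker points the client at the transfer source, and the source, which by the protocol has the latest state, points it at the destination. The belief map, broker names, hop limit, and the class `RedirectChain` are hypothetical; this only models the redirect reasoning, not Pulsar's lookup implementation.

```java
import java.util.Map;

final class RedirectChain {
    // `beliefs` maps each broker to the broker it currently believes owns the bundle.
    // A lookup follows redirects until it reaches a broker that names itself.
    static String resolveOwner(Map<String, String> beliefs, String firstBroker, int maxHops) {
        String current = firstBroker;
        for (int hop = 0; hop < maxHops; hop++) {
            String believedOwner = beliefs.get(current);
            if (believedOwner == null) {
                throw new IllegalStateException("no state for broker " + current);
            }
            if (believedOwner.equals(current)) {
                return current; // this broker believes it owns the bundle
            }
            current = believedOwner; // redirect the client
        }
        throw new IllegalStateException("too many redirects");
    }

    public static void main(String[] args) {
        // After a transfer S -> D: broker C is stale and still points at S,
        // but S, as the transfer source, already knows that D is the owner.
        Map<String, String> beliefs = Map.of("C", "S", "S", "D", "D", "D");
        System.out.println(resolveOwner(beliefs, "C", 5)); // D
    }
}
```

The stale broker costs the client one extra hop, but it does not cause a wrong answer as long as the source broker is up to date.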
> >>>
> >>> When a broker restarts, it won't start until its BSC state has caught up
> >>> to the (near) latest state (up to the last known messageId at that time).
> >>>
> >>>
> >>>> > Pulsar guarantees "a single writer".
> >>>>
> >>>> I didn't think we were using a single writer in the PIP 192 design. I
> >>>> thought we had many producers sending events to a compacted topic. My
> >>>> proposal would still have many producers, but the writer to bookkeeper
> >>>> would act as the single writer. It would technically be distinct from
> >>>> a normal Pulsar topic producer.
> >>>>
> >>>> I should highlight that I am only proposing "broker filtering before
> >>>> write" in the context of PIP 192 and as an alternative to adding
> >>>> pluggable compaction strategies. It would not be a generic feature.
> >>>>
> >>>>
> >>> I was worried about the worst case where two producers (leaders) happen
> >>> to write to the compacted topic (although Pulsar can guarantee "a single
> >>> writer" or "a single producer" for a topic in normal situations).
> >>>
> >>>
> >>>
> >>>> > Could we clarify how to handle
> >>>> > the following(edge cases and failure recovery)?
> >>>> > 0. Is the un-compacted topic a persistent topic or a non-persistent topic?
> >>>>
> >>>> It is a persistent topic.
> >>>>
> >>>> > 1. How does the leader recover state from the two topics?
> >>>>
> >>>> A leader would recover state by first consuming the whole compacted
> >>>> topic and then by consuming from the current location of a cursor on
> >>>> the first input topic. As stated elsewhere, this introduces latency
> >>>> and could be an issue.
> >>>>
> >>>> > 2. How do we handle the case when the leader fails before writing messages
> >>>> > to the compacted topic
> >>>>
> >>>> The leader would not acknowledge the message on the input topic until
> >>>> it has successfully persisted the event on the compacted topic.
> >>>> Publishing the same event to a compacted topic multiple times is
> >>>> idempotent, so there is no risk of lost state. The real risk is
> >>>> latency. However, I think we might have similar (though not the same)
> >>>> latency risks in the current solution.
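The recovery order (answer 1) and the ack-after-write rule (answer 2) can be sketched with in-memory stand-ins: a map for the compacted topic and an integer for the durable cursor on the input topic. This is a hypothetical model (the class `TwoTopicLeader`, first-claim-wins as the filter, synchronous writes), not Pulsar code; it shows that a crash between the write and the ack only causes a replay, and the replay leaves the compacted state unchanged.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TwoTopicLeader {
    record Claim(String bundle, String owner) {}

    // State that survives a leader crash: the compacted topic and the acked cursor.
    static final class Durable {
        final Map<String, String> compactedTopic = new TreeMap<>(); // latest owner per bundle
        int inputCursor = 0;                                         // next unacked input position
    }

    // One leader lifetime; crashAt >= 0 simulates a crash after handling that position.
    static void runLeader(Durable d, List<Claim> input, int crashAt) {
        // Recovery: load the whole compacted topic first, then resume the input from the cursor.
        Map<String, String> owners = new HashMap<>(d.compactedTopic);
        for (int i = d.inputCursor; i < input.size(); i++) {
            Claim c = input.get(i);
            if (!owners.containsKey(c.bundle())) {
                d.compactedTopic.put(c.bundle(), c.owner()); // persist the winner
                owners.put(c.bundle(), c.owner());
            }
            if (i == crashAt) {
                return; // crash: written, but the input message was never acked
            }
            d.inputCursor = i + 1; // ack only after the write above completed
        }
    }

    public static void main(String[] args) {
        List<Claim> input = List.of(new Claim("x", "A"), new Claim("x", "C"), new Claim("y", "B"));
        Durable d = new Durable();
        runLeader(d, input, 0);  // persists x=A, then crashes before acking m1
        runLeader(d, input, -1); // new leader replays m1; the state does not change
        System.out.println(d.compactedTopic + " cursor=" + d.inputCursor); // {x=A, y=B} cursor=3
    }
}
```

The replay of m1 is the latency cost mentioned above: the new leader must first load the compacted state before it can process anything from the input topic.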
> >>>>
> >>>> > Analysis: the "pre-filter + two-topics" option can reduce the number of
> >>>> > messages to broadcast at the expense of the leader broker compaction.
> >>>>
> >>>> My primary point is that with this PIP's design, the filter logic is
> >>>> run on every broker and again during topic compaction. With the
> >>>> alternative design, the filter is run once.
> >>>>
> >>>> Thank you for the clarification.
> >>>
> >>> I think the difference is that the post-filter is an optimistic approach,
> >>> as it relies on the "broadcast-filter" effect (brokers will defer client
> >>> lookups if notified in advance that an assignment is ongoing for bundle
> >>> x). Yes, in the worst case, if the broadcast is slower, each broker needs
> >>> to individually compact the conflicting assignment requests.
> >>>
> >>> Conversely, one downside of the pessimistic approach (single-leader
> >>> pre-filter) is that when there are few conflicting concurrent assignment
> >>> requests (assign bundle a, assign bundle b, assign bundle c, ...), the
> >>> requests still have to go through the leader compaction redundantly.
> >>>
> >>>
> >>>
> >>>> > 3. initially less complex to implement (leaderless conflict resolution and
> >>>> > requires a single topic)
> >>>>
> >>>> PIP 215 has its own complexity too. Coordinating filters
> >>>> on both the client (table view) and the server (compaction) is non
> >>>> trivial. The proposed API includes hard coded client configuration for
> >>>> each component, which will make upgrading the version of the
> >>>> compaction strategy complicated, and could lead to incorrect
> >>>> interpretation of events in the stream. When a single broker is doing
> >>>> the filtering, versioning is no longer a distributed problem. That
> >>>> being said, I do not mean to suggest my solution is without
> >>>> complexity.
> >>>>
> >>>> > 4. it is not a "one-way door" decision (we could add the pre-filter logic
> >>>> > as well later)
> >>>>
> >>>> It's fair to say that we could add it later, but at that point, we
> >>>> will have added this new API for compaction strategy. Are we confident
> >>>> that pluggable compaction is independently an important addition to
> >>>> Pulsar's features, or would it make sense to make this API only exposed
> >>>> in the broker?
> >>>>
> >>>>
> >>> The intention is that this compaction feature could be useful for
> >>> complex user applications (if they are trying to do a similar thing). As I
> >>> mentioned, this feature is closely tied to PIP-192 for now. We are not
> >>> planning to expose this feature to users soon unless it is demanded and
> >>> proven to be stable.
> >>>
> >>>
> >>>> Thanks,
> >>>> Michael
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> On Sat, Oct 29, 2022 at 10:13 PM Heesung Sohn
> >>>> <he...@streamnative.io.invalid> wrote:
> >>>> >
> >>>> > Hi,
> >>>> >
> >>>> > Also, I thought about some concurrent assignment scenarios, comparing
> >>>> > the pre-filter and post-filter options.
> >>>> >
> >>>> > Example 1: When the topic broadcast is slower than the concurrent
> >>>> > assignment requests
> >>>> >
> >>>> > With pre-filter + two-topics (non-compacted and compacted topics)
> >>>> > t1: A -> non-compacted topic // broker A published a message to the non-compacted topic, m1: {broker A assigned bundle x to broker A}
> >>>> > t2: B -> non-compacted topic // broker B published a message, m2: {broker B assigned bundle x to broker C}
> >>>> > t3: C -> non-compacted topic // broker C published a message, m3: {broker C assigned bundle x to broker B}
> >>>> > t4: non-compacted topic -> L // leader broker consumed the messages: m1, m2, and m3
> >>>> > t5: L -> compacted topic // leader compacted the messages and broadcast m1 to all consumers
> >>>> > t6: compacted topic -> [A,B,C] // brokers A, B, C consumed m1
> >>>> >
> >>>> > With post-filter + a single topic
> >>>> > t1: A -> topic // broker A published a message to the topic, m1: {broker A assigned bundle x to broker A}
> >>>> > t2: B -> topic // broker B published a message, m2: {broker B assigned bundle x to broker C}
> >>>> > t3: C -> topic // broker C published a message, m3: {broker C assigned bundle x to broker B}
> >>>> > t4: topic -> [A,B,C] // brokers A, B, C consumed the messages: m1, m2, and m3
> >>>> > t5: [A,B,C] -> m1 // brokers A, B, C individually compacted the messages to m1.
> >>>> >
> >>>> > Analysis: the "pre-filter + two-topics" option can reduce the number of
> >>>> > messages to broadcast at the expense of the leader broker compaction.
> >>>> >
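Example 1 can be replayed with a small simulation, sketched under simplifying assumptions: "first claim wins" stands in for the real conflict rule, every broker consumes every message on the topic it reads, and the class and method names are hypothetical. It counts the messages delivered to brokers A, B, and C in each design; the leader's own reads of m1 to m3 in the pre-filter design are not counted.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class FilterPlacementSim {
    // An assignment request: `requester` asks for `bundle` to be owned by `owner`.
    record Assign(String requester, String bundle, String owner) {}

    // Hypothetical conflict rule: keep only the first assignment seen per bundle.
    static List<Assign> firstClaimWins(List<Assign> msgs) {
        Map<String, Assign> winners = new LinkedHashMap<>();
        for (Assign a : msgs) {
            winners.putIfAbsent(a.bundle(), a);
        }
        return new ArrayList<>(winners.values());
    }

    public static void main(String[] args) {
        List<Assign> requests = List.of(
                new Assign("A", "x", "A"),   // m1
                new Assign("B", "x", "C"),   // m2
                new Assign("C", "x", "B"));  // m3
        int brokers = 3;

        // Pre-filter + two topics: the leader filters, and only the winner is
        // broadcast to brokers A, B, C via the compacted topic.
        List<Assign> compactedTopic = firstClaimWins(requests);
        int preFilterDeliveries = compactedTopic.size() * brokers;

        // Post-filter + one topic: every broker consumes all requests and
        // applies the same rule locally.
        List<Assign> localView = firstClaimWins(requests);
        int postFilterDeliveries = requests.size() * brokers;

        System.out.println(compactedTopic.get(0).owner() + " " + localView.get(0).owner()); // A A
        System.out.println(preFilterDeliveries + " vs " + postFilterDeliveries);          // 3 vs 9
    }
}
```

Both designs converge on m1 (broker A owns bundle x); they differ only in where the filter runs and how many messages each broker consumes.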
> >>>> >
> >>>> > Example 2: When the topic broadcast is faster than the concurrent
> >>>> > assignment requests
> >>>> >
> >>>> > With pre-filter + two-topics (non-compacted and compacted topics)
> >>>> > t1: A -> non-compacted topic // broker A published a message to the non-compacted topic, m1: {broker A assigned bundle x to broker A}
> >>>> > t2: non-compacted topic -> L // leader broker consumed the message: m1
> >>>> > t3: L -> compacted topic // leader compacted the message and broadcast m1 to all consumers
> >>>> > t4: compacted topic -> [A,B,C] // brokers A, B, C consumed m1
> >>>> > t5: A -> own bundle // broker A knows that its assignment has been accepted, so it proceeds to own the bundle (meanwhile deferring lookup requests)
> >>>> > t6: B -> defer client lookups // broker B knows that a bundle assignment is running (meanwhile deferring lookup requests)
> >>>> > t7: C -> defer client lookups // broker C knows that a bundle assignment is running (meanwhile deferring lookup requests)
> >>>> >
> >>>> > With post-filter + a single topic
> >>>> > t1: A -> topic // broker A published a message to the topic, m1: {broker A assigned bundle x to broker A}
> >>>> > t2: topic -> [A,B,C] // brokers A, B, C consumed the message: m1
> >>>> > t3: A -> own bundle // broker A knows that its assignment has been accepted, so it proceeds to own the bundle (meanwhile deferring lookup requests)
> >>>> > t4: B -> defer client lookups // broker B knows that a bundle assignment is running (meanwhile deferring lookup requests)
> >>>> > t5: C -> defer client lookups // broker C knows that a bundle assignment is running (meanwhile deferring lookup requests)
> >>>> >
> >>>> > Analysis: The "post-filter + a single topic" option can perform ok in this
> >>>> > case without the additional leader coordination and the secondary topic,
> >>>> > because the early broadcast can inform all brokers and prevent them from
> >>>> > requesting other assignments for the same bundle.
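The lookup deferral in Example 2 can be sketched as a broker-side table fed by the broadcast topic: while a bundle is in an in-flight phase, lookups are parked as futures and completed once the owner is known. The `Phase` values, the class `LookupDeferral`, and its methods are hypothetical and much simpler than PIP-192's actual state machine.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class LookupDeferral {
    enum Phase { ASSIGNING, OWNED }

    record State(Phase phase, String owner) {}

    private final Map<String, State> states = new HashMap<>();
    private final Map<String, List<CompletableFuture<String>>> pending = new HashMap<>();

    // Called for every state-change message consumed from the broadcast topic.
    synchronized void onStateChange(String bundle, State s) {
        states.put(bundle, s);
        if (s.phase() == Phase.OWNED) {
            List<CompletableFuture<String>> waiters = pending.remove(bundle);
            if (waiters != null) {
                waiters.forEach(f -> f.complete(s.owner())); // release deferred lookups
            }
        }
    }

    // Answers immediately once the bundle is owned; otherwise defers the lookup.
    // (Unknown bundles are deferred too here; a real broker would start an assignment.)
    synchronized CompletableFuture<String> lookup(String bundle) {
        State s = states.get(bundle);
        if (s != null && s.phase() == Phase.OWNED) {
            return CompletableFuture.completedFuture(s.owner());
        }
        CompletableFuture<String> f = new CompletableFuture<>();
        pending.computeIfAbsent(bundle, k -> new ArrayList<>()).add(f);
        return f;
    }

    public static void main(String[] args) {
        LookupDeferral brokerB = new LookupDeferral();
        brokerB.onStateChange("x", new State(Phase.ASSIGNING, "A")); // early broadcast of m1
        CompletableFuture<String> lookup = brokerB.lookup("x");      // deferred
        System.out.println(lookup.isDone());                          // false
        brokerB.onStateChange("x", new State(Phase.OWNED, "A"));     // assignment completes
        System.out.println(lookup.join());                            // A
    }
}
```

In this model, the earlier a broker sees the ASSIGNING message, the sooner it stops issuing competing assignments for the same bundle, which is the effect the analysis relies on.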
> >>>> >
> >>>> > I think the post-filter option is initially not bad because:
> >>>> >
> >>>> > 1. it is safe in the worst case (in case the messages are not correctly
> >>>> > pre-filtered at the leader)
> >>>> > 2. it performs ok because the early broadcast can prevent
> >>>> > concurrent assignment requests.
> >>>> > 3. initially less complex to implement (leaderless conflict resolution and
> >>>> > requires a single topic)
> >>>> > 4. it is not a "one-way door" decision (we could add the pre-filter logic
> >>>> > as well later)
> >>>> >
> >>>> > Regards,
> >>>> > Heesung
> >>>> >
> >>>> >
> >>>> >
> >>>> > On Sat, Oct 29, 2022 at 1:03 PM Heesung Sohn <
> >>>> heesung.sohn@streamnative.io>
> >>>> > wrote:
> >>>> >
> >>>> > > Hi Michael,
> >>>> > >
> >>>> > > For the pre-filter (pre-compaction) option,
> >>>> > > I think the leader requires a write-through cache to compact messages
> >>>> > > based on the latest states. Otherwise, the leader needs to wait for the
> >>>> > > last message from the (compacted) topic before compacting the next
> >>>> > > message for the same bundle.
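A minimal sketch of the write-through idea, assuming a hypothetical `Publisher` callback that stands in for the write to the compacted topic: the leader validates each claim against an in-memory map of the latest accepted owner and updates that map right after its own write, so the next message for the same bundle can be checked without waiting to read the previous write back from the topic. The publish is modeled as synchronous; if it throws, the cache is left untouched.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class WriteThroughLeader {
    // Hypothetical stand-in for producing to the compacted topic.
    interface Publisher {
        void publish(String bundle, String owner);
    }

    private final Map<String, String> ownerCache = new HashMap<>(); // latest accepted owner per bundle
    private final Publisher publisher;

    WriteThroughLeader(Publisher publisher) {
        this.publisher = publisher;
    }

    // Returns true if the claim was accepted and written; false if it was filtered out.
    synchronized boolean handleClaim(String bundle, String owner) {
        if (ownerCache.containsKey(bundle)) {
            return false; // already owned per the cache; no need to read the topic back
        }
        publisher.publish(bundle, owner); // write to the topic first...
        ownerCache.put(bundle, owner);    // ...then update the cache (skipped if publish throws)
        return true;
    }

    public static void main(String[] args) {
        List<String> topic = new ArrayList<>();
        WriteThroughLeader leader = new WriteThroughLeader((b, o) -> topic.add(b + "=" + o));
        leader.handleClaim("x", "A");
        leader.handleClaim("x", "C"); // rejected from the cache
        leader.handleClaim("y", "B");
        System.out.println(topic); // [x=A, y=B]
    }
}
```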
> >>>> > >
> >>>> > > Pulsar guarantees "a single writer". However, for the worst-case
> >>>> > > scenario (due to network partitions, bugs in ZooKeeper or etcd leader
> >>>> > > election, bugs in BookKeeper, data corruption), I think it is safer to
> >>>> > > also place the post-filter on the consumer side (compaction and table
> >>>> > > views) in order to validate the state changes.
> >>>> > >
> >>>> > > For the two-topic approach,
> >>>> > > I think we lose a single linearized view. Could we clarify how to handle
> >>>> > > the following (edge cases and failure recovery)?
> >>>> > > 0. Is the un-compacted topic a persistent topic or a non-persistent topic?
> >>>> > > 1. How does the leader recover state from the two topics?
> >>>> > > 2. How do we handle the case when the leader fails before writing messages
> >>>> > > to the compacted topic
> >>>> > >
> >>>> > > Regards,
> >>>> > > Heesung
> >>>> > >
> >>>> > > On Fri, Oct 28, 2022 at 6:56 PM Michael Marshall <mmarshall@apache.org>
> >>>> > > wrote:
> >>>> > >
> >>>> > >> Sharing some more thoughts. We could alternatively use two topics
> >>>> > >> instead of one. In this design, the first topic is the unfiltered
> >>>> > >> write ahead log that represents many writers (brokers) trying to
> >>>> > >> acquire ownership of bundles. The second topic is the distilled log
> >>>> > >> that represents the "winners" or the "owners" of the bundles. There is
> >>>> > >> a single writer, the leader broker, that reads from the input topic
> >>>> > >> and writes to the output topic. The first topic is normal and the
> >>>> > >> second is compacted.
> >>>> > >>
> >>>> > >> The primary benefit in a two topic solution is that it is easy for the
> >>>> > >> leader broker to trade off ownership without needing to slow down
> >>>> > >> writes to the input topic. The leader broker will start consuming from
> >>>> > >> the input topic when it has fully consumed the table view on the
> >>>> > >> output topic. In general, I don't think consumers know when they have
> >>>> > >> "reached the end of a table view", but we should be able to trivially
> >>>> > >> figure this out if we are the topic's only writer and the topic and
> >>>> > >> writer are collocated on the same broker.
> >>>> > >>
> >>>> > >> In that design, it might make sense to use something like the
> >>>> > >> replication cursor to keep track of this consumer's state.
> >>>> > >>
> >>>> > >> - Michael
> >>>> > >>
> >>>> > >> On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall <mmarshall@apache.org>
> >>>> > >> wrote:
> >>>> > >> >
> >>>> > >> > Thanks for your proposal, Heesung.
> >>>> > >> >
> >>>> > >> > Fundamentally, we have the problems listed in this PIP because we have
> >>>> > >> > multiple writers instead of just one writer. Can we solve this problem
> >>>> > >> > by changing our write pattern? What if we use the leader broker as the
> >>>> > >> > single writer? That broker would intercept attempts to acquire
> >>>> > >> > ownership on bundles and would grant ownership to the first broker to
> >>>> > >> > claim an unassigned bundle. It could "grant ownership" by letting the
> >>>> > >> > first write to claim an unassigned bundle get written to the ownership
> >>>> > >> > topic. When a bundle is already owned, the leader won't persist that
> >>>> > >> > event to the bookkeeper. In this design, the log becomes a true
> >>>> > >> > ownership log, which will correctly work with the existing topic
> >>>> > >> > compaction and table view solutions. My proposal essentially moves the
> >>>> > >> > conflict resolution to just before the write, and as a consequence, it
> >>>> > >> > greatly reduces the need for post processing of the event log. One
> >>>> > >> > trade off might be that the leader broker could slow down the write
> >>>> > >> > path, but given that the leader would just need to verify the current
> >>>> > >> > state of the bundle, I think it'd be performant enough.
> >>>> > >> >
> >>>> > >> > Additionally, we'd need the leader broker to be "caught up" on bundle
> >>>> > >> > ownership in order to grant ownership of topics, but unless I am
> >>>> > >> > mistaken, that is already a requirement of the current PIP 192
> >>>> > >> > paradigm.
> >>>> > >> >
> >>>> > >> > Below are some additional thoughts that will be relevant if we move
> >>>> > >> > forward with the design as it is currently proposed.
> >>>> > >> >
> >>>> > >> > I think it might be helpful to update the title to show that this
> >>>> > >> > proposal will also affect table view as well. I didn't catch that at
> >>>> > >> > first.
> >>>> > >> >
> >>>> > >> > Do you have any documentation describing how the
> >>>> > >> > TopicCompactionStrategy will determine which states are valid in the
> >>>> > >> > context of load balancing? I looked at
> >>>> > >> > https://github.com/apache/pulsar/pull/18195, but I couldn't seem to
> >>>> > >> > find anything for it. That would help make this proposal less
> >>>> > >> > abstract.
> >>>> > >> >
> >>>> > >> > The proposed API seems very tied to the needs of PIP 192. For example,
> >>>> > >> > `isValid` is not a term I associate with topic compaction. The
> >>>> > >> > fundamental question for compaction is which value to keep (or build a
> >>>> > >> > new value). I think we might be able to simplify the API by replacing
> >>>> > >> > the "isValid", "isMergeEnabled", and "merge" methods with a single
> >>>> > >> > method that lets the implementation handle one or all tasks. That
> >>>> > >> > would also remove the need to deserialize payloads multiple times too.
> >>>> > >> >
> >>>> > >> > I also feel like mentioning that after working with the PIP 105 broker
> >>>> > >> > side filtering, I think we should avoid running UDFs in the broker as
> >>>> > >> > much as possible. (I do not consider the load balancing logic to be a
> >>>> > >> > UDF here.) I think it would be worth not making this a user facing
> >>>> > >> > feature unless there is demand for real use cases.
> >>>> > >> >
> >>>> > >> > Thanks!
> >>>> > >> > Michael
> >>>> > >> >
> >>>> > >> > On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bo...@apache.org> wrote:
> >>>> > >> > >
> >>>> > >> > > +1(non-binding)
> >>>> > >> > >
> >>>> > >> > > thanks,
> >>>> > >> > > bo
> >>>> > >> > >
> >>>> > >> > > Heesung Sohn <he...@streamnative.io.invalid> wrote on Wed, Oct 19, 2022 at 07:54:
> >>>> > >> > > >
> >>>> > >> > > > Hi pulsar-dev community,
> >>>> > >> > > >
> >>>> > >> > > > I raised a pip to discuss : PIP-215: Configurable Topic Compaction Strategy
> >>>> > >> > > >
> >>>> > >> > > > PIP link: https://github.com/apache/pulsar/issues/18099
> >>>> > >> > > >
> >>>> > >> > > > Regards,
> >>>> > >> > > > Heesung
> >>>> > >>
> >>>> > >
> >>>>
> >>>

Re: [DISCUSS] PIP-215: Configurable Topic Compaction Strategy

Posted by Heesung Sohn <he...@streamnative.io.INVALID>.
Hi,

I have a different thought about my previous comment.

- Agreed with your point that we should merge the CompactionStrategy APIs. I
updated the interface proposal in the PIP. I replaced the `isValid`,
`isMergeEnabled`, and `merge` APIs with a `compact` API.

boolean isValid(T prev, T cur)
boolean isMergeEnabled()
T merge(T prev, T cur)

=>

T compact(T prev, T cur)

In fact, with only the `compact(T prev, T cur)` API, it is not clear
whether prev => cur is a valid transition (if it is invalid, we should
filter out the cur message instead of further compacting/merging it). I
think we still need to keep `isValid()` and `merge()` separate.
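To illustrate why a separate validity check is useful, here is a small sketch of one way a compactor could consume the three methods listed above: invalid transitions are dropped (prev is kept), and valid ones either replace prev or, if merging is enabled, are merged into it. The interface shape follows the method names in this thread, but the class `StrategyCompactor`, the `BundleState` record, and the ASSIGNED/RELEASED rule are hypothetical and are not the PIP's or Pulsar's implementation.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class StrategyCompactor {
    // Hypothetical interface using the three method names listed above.
    interface CompactionStrategy<T> {
        boolean isValid(T prev, T cur);                    // is prev => cur an allowed transition?
        default boolean isMergeEnabled() { return false; }
        default T merge(T prev, T cur) { return cur; }
    }

    // Illustrative bundle state: owning broker plus a coarse status.
    record BundleState(String owner, String status) {}

    // Keeps one value per key; invalid transitions are filtered out, valid ones replace or merge.
    static <T> Map<String, T> compact(List<Map.Entry<String, T>> log, CompactionStrategy<T> s) {
        Map<String, T> latest = new HashMap<>();
        for (Map.Entry<String, T> e : log) {
            T prev = latest.get(e.getKey());
            T cur = e.getValue();
            if (prev == null) {
                latest.put(e.getKey(), cur); // first value for this key
            } else if (s.isValid(prev, cur)) {
                latest.put(e.getKey(), s.isMergeEnabled() ? s.merge(prev, cur) : cur);
            } // else: invalid transition, drop cur and keep prev
        }
        return latest;
    }

    public static void main(String[] args) {
        // Made-up rule: an ASSIGNED bundle may only move to RELEASED, and only by its owner.
        CompactionStrategy<BundleState> rule = (prev, cur) ->
                !prev.status().equals("ASSIGNED")
                        || (cur.status().equals("RELEASED") && cur.owner().equals(prev.owner()));
        List<Map.Entry<String, BundleState>> log = List.of(
                Map.entry("x", new BundleState("A", "ASSIGNED")),
                Map.entry("x", new BundleState("C", "ASSIGNED")),  // conflicting claim: dropped
                Map.entry("x", new BundleState("A", "RELEASED")),  // owner releases: accepted
                Map.entry("x", new BundleState("B", "ASSIGNED"))); // new assignment: accepted
        System.out.println(compact(log, rule).get("x")); // BundleState[owner=B, status=ASSIGNED]
    }
}
```

With only a `compact(prev, cur)` method, the caller would have to infer "rejected" from the returned value; here the rejection is an explicit branch, and merging stays optional.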

Regarding redundant deserialization, the input type `T` is the type of
the message value, so the input values are already deserialized. We don't
want to expose the Message<T> interface in this CompactionStrategy, to
avoid message serialization/deserialization dependencies in the
CompactionStrategy.

The `merge()` functionality is intended for more complex use cases
(merging values instead of just filtering), and to support `merge()`, we
need to internally create a new message with the compacted value and
copies of the metadata and messageId. We could initially define only
`isValid()` in CompactionStrategy, and add `isMergeEnabled()` and
`merge()` to the CompactionStrategy interface later if requested.
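The extra work that `merge()` implies can be sketched as follows: the merged value has to be wrapped in a new message that also carries copied metadata and a message id. The record `StoredMessage` is a hypothetical stand-in (not Pulsar's `Message<T>`), and taking the metadata and id from the newer message is an assumption made only for this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

final class MergeSupport {
    // Hypothetical stand-in for a stored message; this is not Pulsar's Message<T>.
    record StoredMessage<T>(String messageId, Map<String, String> properties, T value) {}

    // Builds a new message that carries the merged value plus copies of cur's
    // properties and message id (assumption: the newer message's metadata wins).
    static <T> StoredMessage<T> mergeToNewMessage(StoredMessage<T> prev, StoredMessage<T> cur,
                                                 BinaryOperator<T> merge) {
        T merged = merge.apply(prev.value(), cur.value());
        Map<String, String> propertiesCopy = new HashMap<>(cur.properties()); // defensive copy
        return new StoredMessage<>(cur.messageId(), propertiesCopy, merged);
    }

    public static void main(String[] args) {
        StoredMessage<Integer> m1 = new StoredMessage<>("1:0", Map.of("producer", "A"), 3);
        StoredMessage<Integer> m2 = new StoredMessage<>("1:1", Map.of("producer", "B"), 4);
        StoredMessage<Integer> out = mergeToNewMessage(m1, m2, Integer::sum);
        System.out.println(out.messageId() + " " + out.value() + " " + out.properties()); // 1:1 7 {producer=B}
    }
}
```

A filter-only strategy (`isValid()` alone) never needs this step, because it can keep one of the existing messages as is.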

Regards,
Heesung


On Thu, Nov 3, 2022 at 9:50 AM Heesung Sohn <he...@streamnative.io>
wrote:

> Oops! Michael, I apologize for the typo in your name.
>
> On Thu, Nov 3, 2022 at 9:47 AM Heesung Sohn <he...@streamnative.io>
> wrote:
>
>> Hi Machel,
>>
>> Here are my additional comments regarding your earlier email.
>>
>> - I updated the PIP title to show that this will impact table view as
>> well.
>>
>> - PIP-192 (https://github.com/apache/pulsar/issues/16691) shows the
>> general idea of the states and their actions, and I defined the actual
>> states in this PR:
>> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a. I
>> will further clarify the bundle state data validation logic when
>> introducing the `BundleStateCompactionStrategy` class. This PIP is meant
>> to support CompactionStrategy in general.
>>
>> - Agreed with your point that we should merge the CompactionStrategy APIs. I
>> updated the interface proposal in the PIP. I replaced the `isValid`,
>> `isMergeEnabled`, and `merge` APIs with a `compact` API.
>>
>>
>> Thanks,
>> Heesung
>>
>>
>> On Tue, Nov 1, 2022 at 11:26 AM Heesung Sohn <
>> heesung.sohn@streamnative.io> wrote:
>>
>>> Hi,
>>> Thank you for the great comments.
>>> Please find my comments inline too.
>>>
>>> Regards,
>>> Heesung
>>>
>>> On Mon, Oct 31, 2022 at 10:59 PM Michael Marshall <mm...@apache.org>
>>> wrote:
>>>
>>>> > I think we lose a single linearized view.
>>>>
>>>> Which linearized view are we losing, and what is the role of that
>>>> linearized view? I think I might be missing why it is important. I
>>>> agree that consumers won't know about each unsuccessful attempted
>>>> acquisition of a bundle, but that seems like unnecessary information
>>>> to broadcast to every broker in the cluster.
>>>>
>>>>
>>>
>>> PIP-192 proposes an assignment, transfer, and split protocol (multi-phase
>>> state changes) that relies on early broadcast across brokers: all brokers
>>> react to their clients according to the state change notifications. For
>>> example, brokers could defer client lookups for bundle x if an
>>> assignment/transfer/split is ongoing for x (broadcast early in the topic).
>>> One early-broadcast example is the one that I discussed above, `When the
>>> topic broadcast is faster than the concurrent assignment requests.` I think
>>> the pre-filter could delay this early broadcast, as it needs to go through
>>> the additional single-leader compaction path.
>>>
>>> The bundle state recovery process is simpler with a single linearized view.
>>>
>>> A single linearized view also makes bundle states easier to debug. We can
>>> more easily track where the assignment requests come from and how they
>>> are compacted.
>>>
>>>
>>>
>>>> > I think the leader requires a write-through cache to compact messages based
>>>> > on the latest states.
>>>>
>>>> This brings up an important point that I would like to clarify. If we
>>>> trust the write ahead log as the source of truth, what happens when a
>>>> bundle has been validly owned by multiple brokers? As a broker starts
>>>> and consumes from the compacted topic, how do we prevent it from
>>>> incorrectly thinking that it owns a bundle for some short time period
>>>> in the case that the ownership topic hasn't yet been compacted to
>>>> remove old ownership state?
>>>>
>>>
>>> Since the multi-phase transfer protocol involves actions by both the
>>> source and destination brokers, a successful transfer should leave both
>>> of them with the (near) latest state. For example, if some brokers have
>>> stale ownership state (because they are network partitioned or delayed),
>>> they will redirect clients to the source (old) broker. However, by the
>>> transfer protocol, the source broker should have the latest state, so it
>>> can redirect the client again to the destination broker.
>>>
>>> When a broker restarts, it won't start until its BSC state has caught up
>>> to the (near) latest state (up to the last known messageId at that time).
>>>
>>>
>>>> > Pulsar guarantees "a single writer".
>>>>
>>>> I didn't think we were using a single writer in the PIP 192 design. I
>>>> thought we had many producers sending events to a compacted topic. My
>>>> proposal would still have many producers, but the writer to bookkeeper
>>>> would act as the single writer. It would technically be distinct from
>>>> a normal Pulsar topic producer.
>>>>
>>>> I should highlight that I am only proposing "broker filtering before
>>>> write" in the context of PIP 192 and as an alternative to adding
>>>> pluggable compaction strategies. It would not be a generic feature.
>>>>
>>>>
>>> I was worried about the worst case where two producers (leaders) happen
>>> to write to the compacted topic (although Pulsar can guarantee "a single
>>> writer" or "a single producer" for a topic in normal situations).
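One way a consumer-side post-filter could tolerate two leaders writing at once is an optimistic version check: each event carries the version it expects to create, and a consumer applies it only if it is exactly one past the version it already holds. This is a hypothetical sketch of that idea; the `(bundle, version, owner)` event layout is an assumption, not the PIP-215 or PIP-192 format.

```python
def apply_validated(events):
    """Apply (bundle, version, owner) events, rejecting out-of-order versions."""
    state = {}  # bundle -> (version, owner)
    rejected = []
    for bundle, version, owner in events:
        current_version = state.get(bundle, (0, None))[0]
        if version != current_version + 1:
            rejected.append((bundle, version, owner))  # stale or conflicting
            continue
        state[bundle] = (version, owner)
    return state, rejected


# Two leaders (L1 and L2) both believe they may assign bundle x.
events = [
    ("x", 1, "A"),  # L1 assigns x to A
    ("x", 1, "B"),  # L2 concurrently assigns x to B: same version, rejected
    ("x", 2, "C"),  # L1 later transfers x from A to C
]
state, rejected = apply_validated(events)
```

Because every consumer replays the same log order, each one reaches the same state without coordinating with the others.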
>>>
>>>
>>>
>>>> > Could we clarify how to handle
>>>> > the following (edge cases and failure recovery)?
>>>> > 0. Is the un-compacted topic a persistent topic or a non-persistent
>>>> > topic?
>>>>
>>>> It is a persistent topic.
>>>>
>>>> > 1. How does the leader recover state from the two topics?
>>>>
>>>> A leader would recover state by first consuming the whole compacted
>>>> topic and then by consuming from the current location of a cursor on
>>>> the first input topic. As stated elsewhere, this introduces latency
>>>> and could be an issue.
>>>>
>>>> > 2. How do we handle the case when the leader fails before writing
>>>> > messages to the compacted topic?
>>>>
>>>> The leader would not acknowledge the message on the input topic until
>>>> it has successfully persisted the event on the compacted topic.
>>>> Publishing the same event to a compacted topic multiple times is
>>>> idempotent, so there is no risk of lost state. The real risk is
>>>> latency. However, I think we might have similar (though not the same)
>>>> latency risks in the current solution.
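A minimal sketch of the two-topic leader loop described in these answers, assuming the compacted output topic can be modeled as a dict keyed by bundle and the input topic as a list with a durable cursor. `TwoTopicLeader` and its fields are hypothetical. A new leader first loads the compacted view, then resumes from the cursor; it acknowledges an input message only after the output write, so a crash in between causes a replay that leaves the compacted view unchanged.

```python
class TwoTopicLeader:
    """Hypothetical leader that filters the input topic into a compacted view."""

    def __init__(self, input_topic, compacted, cursor):
        self.input_topic = input_topic  # list of (bundle, owner) claims
        self.compacted = compacted      # bundle -> owner, the compacted topic
        self.owners = dict(compacted)   # step 1: load the whole compacted view
        self.cursor = cursor            # step 2: resume from the durable cursor

    def step(self, crash_before_ack=False):
        bundle, owner = self.input_topic[self.cursor["position"]]
        if bundle not in self.owners:       # first claim on a free bundle wins
            self.compacted[bundle] = owner  # persist to the output topic first
            self.owners[bundle] = owner
        if crash_before_ack:
            raise RuntimeError("leader crashed before acknowledging")
        self.cursor["position"] += 1        # acknowledge only after the write


input_topic = [("x", "A"), ("x", "C"), ("y", "B")]
compacted, cursor = {}, {"position": 0}

first = TwoTopicLeader(input_topic, compacted, cursor)
try:
    first.step(crash_before_ack=True)  # wrote x -> A, but never acked it
except RuntimeError:
    pass

second = TwoTopicLeader(input_topic, compacted, cursor)  # new leader
while cursor["position"] < len(input_topic):
    second.step()  # replays ("x", "A") without changing the compacted view
```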
>>>>
>>>> > Analysis: the "pre-filter + two-topics" option can reduce the number
>>>> > of messages to broadcast at the expense of the leader broker compaction.
>>>>
>>>> My primary point is that with this PIP's design, the filter logic is
>>>> run on every broker and again during topic compaction. With the
>>>> alternative design, the filter is run once.
>>>>
>>>> Thank you for the clarification.
>>>
>>> I think the difference is that the post-filter is an optimistic approach:
>>> it relies on the "broadcast-filter" effect (brokers will defer client
>>> lookups if they are notified early that an assignment is ongoing for
>>> bundle x). Yes, in the worst case, if the broadcast is slower, each broker
>>> needs to individually compact the conflicting assignment requests.
>>>
>>> Conversely, one downside of the pessimistic approach (single-leader
>>> pre-filter) is that when there are few conflicting concurrent assignment
>>> requests (assign bundle a, assign bundle b, assign bundle c, ...), the
>>> requests still need to go through the leader compaction redundantly.
>>>
>>>
>>>
>>>> > 3. initially less complex to implement (leaderless conflict
>>>> > resolution and requires a single topic)
>>>>
>>>> PIP 215 has its own complexity too. Coordinating filters
>>>> on both the client (table view) and the server (compaction) is non
>>>> trivial. The proposed API includes hard coded client configuration for
>>>> each component, which will make upgrading the version of the
>>>> compaction strategy complicated, and could lead to incorrect
>>>> interpretation of events in the stream. When a single broker is doing
>>>> the filtering, versioning is no longer a distributed problem. That
>>>> being said, I do not mean to suggest my solution is without
>>>> complexity.
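To make the versioning concern concrete: if the broker-side compactor and a client-side table view are configured with different strategy versions, they can disagree about the same stream. The two strategies below (`keep_first`, `keep_last`) are hypothetical stand-ins for a "v1" and "v2" of one strategy, not the PIP-215 API.

```python
def keep_first(prev, cur):
    """Hypothetical v1: once a bundle has an owner, ignore later claims."""
    return cur if prev is None else prev


def keep_last(prev, cur):
    """Hypothetical v2: the latest claim always wins."""
    return cur


def materialize(messages, strategy):
    """Fold (key, value) messages into a table using a compaction strategy."""
    table = {}
    for key, value in messages:
        table[key] = strategy(table.get(key), value)
    return table


stream = [("x", "A"), ("x", "C"), ("x", "B")]
compacted_by_broker = materialize(stream, keep_first)  # compactor on v1
client_table_view = materialize(stream, keep_last)     # table view on v2
```

With a single filtering broker, only one component evaluates the strategy, so this kind of skew between readers cannot arise.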
>>>>
>>>> > 4. it is not a "one-way door" decision (we could add the pre-filter
>>>> > logic as well later)
>>>>
>>>> It's fair to say that we could add it later, but at that point, we
>>>> will have added this new API for compaction strategy. Are we confident
>>>> that pluggable compaction is independently an important addition to
>>>> Pulsar's features, or would it make sense to make this API only exposed
>>>> in the broker?
>>>>
>>>>
>>> The intention is that this compaction feature could be useful for
>>> complex user applications (if they are trying to do something similar). As
>>> I mentioned, this feature is closely tied to PIP-192 for now. We are not
>>> planning to expose this feature to users soon unless it is demanded and
>>> proven to be stable.
>>>
>>>
>>>> Thanks,
>>>> Michael
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Sat, Oct 29, 2022 at 10:13 PM Heesung Sohn
>>>> <he...@streamnative.io.invalid> wrote:
>>>> >
>>>> > Hi,
>>>> >
>>>> > Also, I thought about some concurrent assignment scenarios comparing
>>>> > pre-filter and post-filter.
>>>> >
>>>> > Example 1: When the topic broadcast is slower than the concurrent
>>>> > assignment requests
>>>> >
>>>> > With pre-filter + two-topics (non-compacted and compacted topics)
>>>> > t1: A -> non-compacted topic // broker A published a message to the
>>>> > non-compacted topic, m1: {broker A assigned bundle x to broker A}
>>>> > t2: B -> non-compacted topic // broker B published a message, m2:
>>>> > {broker B assigned bundle x to broker C}
>>>> > t3: C -> non-compacted topic // broker C published a message, m3:
>>>> > {broker C assigned bundle x to broker B}
>>>> > t4: non-compacted topic -> L // leader broker consumed the messages:
>>>> > m1, m2, and m3
>>>> > t5: L -> compacted topic // leader compacted the messages and
>>>> > broadcasted m1 to all consumers
>>>> > t6: compacted topic -> [A,B,C] // broker A,B,C consumed m1
>>>> >
>>>> > With post-filter + a single topic
>>>> > t1: A -> topic // broker A published a message to the topic,
>>>> > m1: {broker A assigned bundle x to broker A}
>>>> > t2: B -> topic // broker B published a message, m2: {broker B assigned
>>>> > bundle x to broker C}
>>>> > t3: C -> topic // broker C published a message, m3: {broker C assigned
>>>> > bundle x to broker B}
>>>> > t4: topic -> [A,B,C] // broker A,B,C consumed the messages: m1, m2,
>>>> > and m3
>>>> > t5: [A,B,C] -> m1 // broker A,B,C individually compacted the messages
>>>> > to m1.
>>>> >
>>>> > Analysis: the "pre-filter + two-topics" option can reduce the number
>>>> > of messages to broadcast at the expense of the leader broker compaction.
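The post-filter step at t5 of the single-topic timeline can be sketched as a deterministic replay: every broker folds the same messages in the same log order and keeps an assignment only if the bundle is still unassigned. The message fields below are hypothetical, and real bundle states (transfer, split, release) are left out.

```python
def post_filter(messages):
    """Keep the first assignment per bundle; later conflicting ones are dropped."""
    owners = {}
    accepted = []
    for msg in messages:
        if msg["bundle"] in owners:
            continue  # conflicting concurrent assignment: ignore it
        owners[msg["bundle"]] = msg["dst"]
        accepted.append(msg["id"])
    return owners, accepted


log = [
    {"id": "m1", "src": "A", "bundle": "x", "dst": "A"},
    {"id": "m2", "src": "B", "bundle": "x", "dst": "C"},
    {"id": "m3", "src": "C", "bundle": "x", "dst": "B"},
]
# Brokers A, B, and C each run the filter on their own copy of the log.
results = {broker: post_filter(log) for broker in ("A", "B", "C")}
```

Every broker ends up with `m1` as the only accepted assignment, which is the convergence the single-topic design relies on.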
>>>> >
>>>> >
>>>> > Example 2: When the topic broadcast is faster than the concurrent
>>>> > assignment requests
>>>> >
>>>> > With pre-filter + two-topics (non-compacted and compacted topics)
>>>> > t1: A -> non-compacted topic // broker A published a message to the
>>>> > non-compacted topic, m1: {broker A assigned bundle x to broker A}
>>>> > t2: non-compacted topic -> L // leader broker consumed the message: m1
>>>> > t3: L -> compacted topic // leader compacted the message and
>>>> > broadcasted m1 to all consumers
>>>> > t4: compacted topic -> [A,B,C] // broker A,B,C consumed m1
>>>> > t5: A -> own bundle // broker A knows that its assignment has been
>>>> > accepted, so it proceeds to own the bundle (meanwhile deferring lookup
>>>> > requests)
>>>> > t6: B -> defer client lookups // broker B knows that the bundle
>>>> > assignment is running (meanwhile deferring lookup requests)
>>>> > t7: C -> defer client lookups // broker C knows that the bundle
>>>> > assignment is running (meanwhile deferring lookup requests)
>>>> >
>>>> > With post-filter + a single topic
>>>> > t1: A -> topic // broker A published a message to the topic,
>>>> > m1: {broker A assigned bundle x to broker A}
>>>> > t2: topic -> [A,B,C] // broker A,B,C consumed the message: m1
>>>> > t3: A -> own bundle // broker A knows that its assignment has been
>>>> > accepted, so it proceeds to own the bundle (meanwhile deferring lookup
>>>> > requests)
>>>> > t4: B -> defer client lookups // broker B knows that the bundle
>>>> > assignment is running (meanwhile deferring lookup requests)
>>>> > t5: C -> defer client lookups // broker C knows that the bundle
>>>> > assignment is running (meanwhile deferring lookup requests)
>>>> >
>>>> > Analysis: The "post-filter + a single topic" option can perform ok in
>>>> > this case without the additional leader coordination and the secondary
>>>> > topic, because the early broadcast can inform all brokers and prevent
>>>> > them from requesting other assignments for the same bundle.
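A hypothetical sketch of how a broker could use an early-broadcast state to decide whether to answer or defer a lookup. The phase names `assigning` and `owned` are simplified placeholders, not the states defined for PIP-192.

```python
from enum import Enum


class Action(Enum):
    ASSIGN = "assign"      # no state yet: this broker may start an assignment
    DEFER = "defer"        # an assignment is in flight: hold the lookup
    REDIRECT = "redirect"  # the bundle has a settled owner


def handle_lookup(bundle, view):
    """Decide what to do with a client lookup, given this broker's view."""
    state = view.get(bundle)
    if state is None:
        return Action.ASSIGN, None
    if state["phase"] == "assigning":
        return Action.DEFER, None  # avoids issuing a competing assignment
    return Action.REDIRECT, state["owner"]


# Broker B has already consumed m1 ("A is assigning x to A") at t2.
view_b = {"x": {"phase": "assigning", "owner": "A"}}
decision_before = handle_lookup("x", view_b)
view_b["x"] = {"phase": "owned", "owner": "A"}  # A finished taking ownership
decision_after = handle_lookup("x", view_b)
```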
>>>> >
>>>> > I think the post-filter option is initially not bad because:
>>>> >
>>>> > 1. it is safe in the worst case (in case the messages are not
>>>> > correctly pre-filtered at the leader)
>>>> > 2. it performs ok because the early broadcast can prevent
>>>> > concurrent assignment requests.
>>>> > 3. initially less complex to implement (leaderless conflict
>>>> > resolution and requires a single topic)
>>>> > 4. it is not a "one-way door" decision (we could add the pre-filter
>>>> > logic as well later)
>>>> >
>>>> > Regards,
>>>> > Heesung
>>>> >
>>>> >
>>>> >
>>>> > On Sat, Oct 29, 2022 at 1:03 PM Heesung Sohn
>>>> > <heesung.sohn@streamnative.io> wrote:
>>>> >
>>>> > > Hi Michael,
>>>> > >
>>>> > > For the pre-filter (pre-compaction) option,
>>>> > > I think the leader requires a write-through cache to compact
>>>> > > messages based on the latest states. Otherwise, the leader needs to
>>>> > > wait for the last msg from the (compacted) topic before compacting
>>>> > > the next msg for the same bundle.
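The difference the write-through cache makes can be sketched with a leader that checks its own view before persisting a claim. Without the cache, the view only changes once the leader reads its own writes back, so two claims for the same bundle that arrive before that read-back both pass. `Leader` and its fields are hypothetical.

```python
class Leader:
    """Hypothetical pre-filtering leader writing to a compacted output topic."""

    def __init__(self, write_through):
        self.write_through = write_through
        self.output = []  # messages persisted to the compacted topic
        self.view = {}    # bundle -> owner, as the leader currently sees it

    def handle(self, bundle, owner):
        if bundle in self.view:
            return False  # bundle already owned: drop the claim
        self.output.append((bundle, owner))  # persist the winning claim
        if self.write_through:
            self.view[bundle] = owner  # update the cache at write time
        return True

    def read_back(self):
        """Consume the leader's own writes from the output topic."""
        for bundle, owner in self.output:
            self.view.setdefault(bundle, owner)


lagging = Leader(write_through=False)
lagging_results = [lagging.handle("x", "A"), lagging.handle("x", "C")]

cached = Leader(write_through=True)
cached_results = [cached.handle("x", "A"), cached.handle("x", "C")]
```

Without the cache, the leader would have to call `read_back` between two claims for the same bundle, which is the wait described above.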
>>>> > >
>>>> > > Pulsar guarantees "a single writer". However, for the worst-case
>>>> > > scenario (due to network partitions, bugs in ZooKeeper or etcd leader
>>>> > > election, bugs in BookKeeper, data corruption), I think it is safe to
>>>> > > place the post-filter on the consumer side (compaction and table
>>>> > > views) as well in order to validate the state changes.
>>>> > >
>>>> > > For the two-topic approach,
>>>> > > I think we lose a single linearized view. Could we clarify how to
>>>> > > handle the following (edge cases and failure recovery)?
>>>> > > 0. Is the un-compacted topic a persistent topic or a non-persistent
>>>> > > topic?
>>>> > > 1. How does the leader recover state from the two topics?
>>>> > > 2. How do we handle the case when the leader fails before writing
>>>> > > messages to the compacted topic?
>>>> > >
>>>> > > Regards,
>>>> > > Heesung
>>>> > >
>>>> > > On Fri, Oct 28, 2022 at 6:56 PM Michael Marshall
>>>> > > <mmarshall@apache.org> wrote:
>>>> > >
>>>> > >> Sharing some more thoughts. We could alternatively use two topics
>>>> > >> instead of one. In this design, the first topic is the unfiltered
>>>> > >> write ahead log that represents many writers (brokers) trying to
>>>> > >> acquire ownership of bundles. The second topic is the distilled log
>>>> > >> that represents the "winners" or the "owners" of the bundles. There
>>>> > >> is a single writer, the leader broker, that reads from the input
>>>> > >> topic and writes to the output topic. The first topic is normal and
>>>> > >> the second is compacted.
>>>> > >>
>>>> > >> The primary benefit in a two topic solution is that it is easy for
>>>> > >> the leader broker to trade off ownership without needing to slow
>>>> > >> down writes to the input topic. The leader broker will start
>>>> > >> consuming from the input topic when it has fully consumed the table
>>>> > >> view on the output topic. In general, I don't think consumers know
>>>> > >> when they have "reached the end of a table view", but we should be
>>>> > >> able to trivially figure this out if we are the topic's only writer
>>>> > >> and the topic and writer are collocated on the same broker.
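A small sketch of why "reaching the end of the table view" becomes trivial when the leader is the topic's only writer: it can remember the id of its own last write and compare that with how far it has read. `OwnedTopic` is hypothetical and ignores the cases excluded above (other producers, or writer and topic on different brokers).

```python
class OwnedTopic:
    """Hypothetical output topic whose only writer is also its reader."""

    def __init__(self):
        self.entries = []
        self.last_written = -1  # id of the most recent append by this writer
        self.last_read = -1     # id of the most recent entry consumed

    def write(self, entry):
        self.entries.append(entry)
        self.last_written = len(self.entries) - 1

    def read_next(self):
        self.last_read += 1
        return self.entries[self.last_read]

    def caught_up(self):
        # Only sound because nobody else can append after last_written.
        return self.last_read >= self.last_written


topic = OwnedTopic()
topic.write(("x", "A"))
topic.write(("y", "B"))
before = topic.caught_up()
while not topic.caught_up():
    topic.read_next()
after = topic.caught_up()
```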
>>>> > >>
>>>> > >> In that design, it might make sense to use something like the
>>>> > >> replication cursor to keep track of this consumer's state.
>>>> > >>
>>>> > >> - Michael
>>>> > >>
>>>> > >> On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall
>>>> > >> <mmarshall@apache.org> wrote:
>>>> > >> >
>>>> > >> > Thanks for your proposal, Heesung.
>>>> > >> >
>>>> > >> > Fundamentally, we have the problems listed in this PIP because
>>>> > >> > we have multiple writers instead of just one writer. Can we solve
>>>> > >> > this problem by changing our write pattern? What if we use the
>>>> > >> > leader broker as the single writer? That broker would intercept
>>>> > >> > attempts to acquire ownership of bundles and would grant ownership
>>>> > >> > to the first broker to claim an unassigned bundle. It could "grant
>>>> > >> > ownership" by letting the first write to claim an unassigned
>>>> > >> > bundle get written to the ownership topic. When a bundle is
>>>> > >> > already owned, the leader won't persist that event to BookKeeper.
>>>> > >> > In this design, the log becomes a true ownership log, which will
>>>> > >> > correctly work with the existing topic compaction and table view
>>>> > >> > solutions. My proposal essentially moves the conflict resolution
>>>> > >> > to just before the write, and as a consequence, it greatly reduces
>>>> > >> > the need for post processing of the event log. One trade-off
>>>> > >> > might be that the leader broker could slow down the write path,
>>>> > >> > but given that the leader would just need to verify the current
>>>> > >> > state of the bundle, I think it'd be performant enough.
>>>> > >> >
>>>> > >> > Additionally, we'd need the leader broker to be "caught up" on
>>>> > >> > bundle ownership in order to grant ownership of topics, but unless
>>>> > >> > I am mistaken, that is already a requirement of the current PIP 192
>>>> > >> > paradigm.
>>>> > >> >
>>>> > >> > Below are some additional thoughts that will be relevant if we
>>>> > >> > move forward with the design as it is currently proposed.
>>>> > >> >
>>>> > >> > I think it might be helpful to update the title to show that this
>>>> > >> > proposal will also affect table view. I didn't catch that at
>>>> > >> > first.
>>>> > >> >
>>>> > >> > Do you have any documentation describing how the
>>>> > >> > TopicCompactionStrategy will determine which states are valid in
>>>> > >> > the context of load balancing? I looked at
>>>> > >> > https://github.com/apache/pulsar/pull/18195, but I couldn't seem
>>>> > >> > to find anything for it. That would help make this proposal less
>>>> > >> > abstract.
>>>> > >> >
>>>> > >> > The proposed API seems very tied to the needs of PIP 192. For
>>>> > >> > example, `isValid` is not a term I associate with topic
>>>> > >> > compaction. The fundamental question for compaction is which
>>>> > >> > value to keep (or build a new value). I think we might be able to
>>>> > >> > simplify the API by replacing the "isValid", "isMergeEnabled", and
>>>> > >> > "merge" methods with a single method that lets the implementation
>>>> > >> > handle one or all tasks. That would also remove the need to
>>>> > >> > deserialize payloads multiple times.
>>>> > >> >
>>>> > >> > I also feel like mentioning that after working with the PIP 105
>>>> > >> > broker side filtering, I think we should avoid running UDFs in the
>>>> > >> > broker as much as possible. (I do not consider the load balancing
>>>> > >> > logic to be a UDF here.) I think it would be worth not making this
>>>> > >> > a user facing feature unless there is demand for real use cases.
>>>> > >> >
>>>> > >> > Thanks!
>>>> > >> > Michael
>>>> > >> >
>>>> > >> > On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bo...@apache.org> wrote:
>>>> > >> > >
>>>> > >> > > +1(non-binding)
>>>> > >> > >
>>>> > >> > > thanks,
>>>> > >> > > bo
>>>> > >> > >
>>>> > >> > > Heesung Sohn <he...@streamnative.io.invalid> wrote on Wed,
>>>> > >> > > Oct 19, 2022 at 07:54:
>>>> > >> > > >
>>>> > >> > > > Hi pulsar-dev community,
>>>> > >> > > >
>>>> > >> > > > I raised a PIP for discussion: PIP-215: Configurable Topic
>>>> > >> > > > Compaction Strategy
>>>> > >> > > >
>>>> > >> > > > PIP link: https://github.com/apache/pulsar/issues/18099
>>>> > >> > > >
>>>> > >> > > > Regards,
>>>> > >> > > > Heesung
>>>> > >>
>>>> > >
>>>>
>>>

Re: [DISCUSS] PIP-215: Configurable Topic Compaction Strategy

Posted by Heesung Sohn <he...@streamnative.io.INVALID>.
Oops! Michael, I apologize for the typo in your name.

On Thu, Nov 3, 2022 at 9:47 AM Heesung Sohn <he...@streamnative.io>
wrote:

> Hi Machel,
>
> Here are my additional comments regarding your earlier email.
>
> - I updated the PIP title to show that this will impact table view as well.
>
> - PIP-192 (https://github.com/apache/pulsar/issues/16691) shows the
> general idea of the states and their actions, and I defined the actual
> states in this PR:
> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a.
> I will further clarify the bundle state data validation logic when
> introducing the `BundleStateCompactionStrategy` class. This PIP is to
> support CompactionStrategy in general.
>
> - I agree with your point that we should merge the CompactionStrategy APIs.
> I updated the interface proposal in the PIP, replacing the "isValid",
> "isMergeEnabled", and "merge" APIs with a single "compact" API.
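A hypothetical sketch of what a single-method strategy could look like: one `compact(prev, cur)` call decides which value to keep, so the payload is deserialized once and validation and merging live in the same place. This is not the interface in the PIP; the class names and the `(owner, expected_previous_owner)` value layout are assumptions for illustration.

```python
from abc import ABC, abstractmethod


class CompactionStrategy(ABC):
    """Hypothetical single-method compaction strategy."""

    @abstractmethod
    def compact(self, prev, cur):
        """Return the value to keep for a key (prev may be None)."""


class TransferAwareStrategy(CompactionStrategy):
    """Accept a value only if it names the current owner as its predecessor."""

    def compact(self, prev, cur):
        _, expected_previous_owner = cur
        current_owner = prev[0] if prev is not None else None
        if expected_previous_owner == current_owner:
            return cur  # a fresh assignment (None -> X) or a transfer (X -> Y)
        return prev     # a conflicting claim: keep what we already had


def compact_topic(messages, strategy):
    table = {}
    for key, value in messages:
        kept = strategy.compact(table.get(key), value)
        if kept is not None:
            table[key] = kept
    return table


messages = [
    ("x", ("A", None)),  # assign x to A
    ("x", ("C", None)),  # conflicting assignment: x is no longer free
    ("x", ("B", "A")),   # transfer x from A to B
]
table = compact_topic(messages, TransferAwareStrategy())
```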
>
>
> Thanks,
> Heesung
>
>
> On Tue, Nov 1, 2022 at 11:26 AM Heesung Sohn <he...@streamnative.io>
> wrote:
>
>> Hi,
>> Thank you for the great comments.
>> Please find my comments inline too.
>>
>> Regards,
>> Heesung
>>
>> On Mon, Oct 31, 2022 at 10:59 PM Michael Marshall <mm...@apache.org>
>> wrote:
>>
>>> > I think we lose a single linearized view.
>>>
>>> Which linearized view are we losing, and what is the role of that
>>> linearized view? I think I might be missing why it is important. I
>>> agree that consumers won't know about each unsuccessful attempted
>>> acquisition of a bundle, but that seems like unnecessary information
>>> to broadcast to every broker in the cluster.
>>>
>>>
>>
>> PIP-192 proposes assignment, transfer, and split protocols (multi-phase
>> state changes) that rely on early broadcast across brokers, and all
>> brokers react to their clients according to the state change
>> notifications: brokers could defer any client lookups for bundle x if an
>> assignment/transfer/split is ongoing for x (broadcast early in the topic).
>> One early broadcast example is the one that I discussed above, `When the
>> topic broadcast is faster than the concurrent assignment requests.` I think
>> the pre-filter could delay this early broadcast, as it needs to go through
>> the additional single-leader compaction path.
>>
>> The bundle state recovery process is simpler with a single linearized view.
>>
>> A single linearized view also makes bundle states easier to debug: we can
>> more easily track where the assignment requests come from and how they
>> are compacted.
>>
>>
>>
>>> > I think the leader requires a write-through cache to compact messages
>>> > based on the latest states.
>>>
>>> This brings up an important point that I would like to clarify. If we
>>> trust the write ahead log as the source of truth, what happens when a
>>> bundle has been validly owned by multiple brokers? As a broker starts
>>> and consumes from the compacted topic, how do we prevent it from
>>> incorrectly thinking that it owns a bundle for some short time period
>>> in the case that the ownership topic hasn't yet been compacted to
>>> remove old ownership state?
>>>
>>
>> Since the multi-phase transfer protocol involves actions by both the source
>> and destination brokers, a successful transfer should leave the source and
>> destination brokers with the (near) latest state. For example, if some
>> brokers have stale ownership state (because they are network partitioned or
>> delayed), they will redirect clients to the source (old) broker. However,
>> by the transfer protocol, the source broker should have the latest state, so
>> it can redirect the client again to the destination broker.
>>
>> When a broker restarts, it won't start until its BSC state has caught up
>> to the (near) latest state (up to the last known messageId at that time).
>>
>>
>>> > Pulsar guarantees "a single writer".
>>>
>>> I didn't think we were using a single writer in the PIP 192 design. I
>>> thought we had many producers sending events to a compacted topic. My
>>> proposal would still have many producers, but the writer to bookkeeper
>>> would act as the single writer. It would technically be distinct from
>>> a normal Pulsar topic producer.
>>>
>>> I should highlight that I am only proposing "broker filtering before
>>> write" in the context of PIP 192 and as an alternative to adding
>>> pluggable compaction strategies. It would not be a generic feature.
>>>
>>>
>> I was worried about the worst case where two producers (leaders) happen to
>> write to the compacted topic (although Pulsar can guarantee "a single
>> writer" or "a single producer" for a topic in normal situations).
>>
>>
>>
>>> > Could we clarify how to handle
>>> > the following (edge cases and failure recovery)?
>>> > 0. Is the un-compacted topic a persistent topic or a non-persistent
>>> > topic?
>>>
>>> It is a persistent topic.
>>>
>>> > 1. How does the leader recover state from the two topics?
>>>
>>> A leader would recover state by first consuming the whole compacted
>>> topic and then by consuming from the current location of a cursor on
>>> the first input topic. As stated elsewhere, this introduces latency
>>> and could be an issue.
>>>
>>> > 2. How do we handle the case when the leader fails before writing
>>> > messages to the compacted topic?
>>>
>>> The leader would not acknowledge the message on the input topic until
>>> it has successfully persisted the event on the compacted topic.
>>> Publishing the same event to a compacted topic multiple times is
>>> idempotent, so there is no risk of lost state. The real risk is
>>> latency. However, I think we might have similar (though not the same)
>>> latency risks in the current solution.
>>>
>>> > Analysis: the "pre-filter + two-topics" option can reduce the number of
>>> > messages to broadcast at the expense of the leader broker compaction.
>>>
>>> My primary point is that with this PIP's design, the filter logic is
>>> run on every broker and again during topic compaction. With the
>>> alternative design, the filter is run once.
>>>
>>> Thank you for the clarification.
>>
>> I think the difference is that the post-filter is an optimistic approach:
>> it relies on the "broadcast-filter" effect (brokers will defer client
>> lookups if they are notified early that an assignment is ongoing for
>> bundle x). Yes, in the worst case, if the broadcast is slower, each broker
>> needs to individually compact the conflicting assignment requests.
>>
>> Conversely, one downside of the pessimistic approach (single-leader
>> pre-filter) is that when there are few conflicting concurrent assignment
>> requests (assign bundle a, assign bundle b, assign bundle c, ...), the
>> requests still need to go through the leader compaction redundantly.
>>
>>
>>
>>> > 3. initially less complex to implement (leaderless conflict resolution
>>> > and requires a single topic)
>>>
>>> PIP 215 has its own complexity too. Coordinating filters
>>> on both the client (table view) and the server (compaction) is non
>>> trivial. The proposed API includes hard coded client configuration for
>>> each component, which will make upgrading the version of the
>>> compaction strategy complicated, and could lead to incorrect
>>> interpretation of events in the stream. When a single broker is doing
>>> the filtering, versioning is no longer a distributed problem. That
>>> being said, I do not mean to suggest my solution is without
>>> complexity.
>>>
>>> > 4. it is not a "one-way door" decision (we could add the pre-filter
>>> > logic as well later)
>>>
>>> It's fair to say that we could add it later, but at that point, we
>>> will have added this new API for compaction strategy. Are we confident
>>> that pluggable compaction is independently an important addition to
>>> Pulsar's features, or would it make sense to make this API only exposed
>>> in the broker?
>>>
>>>
>> The intention is that this compaction feature could be useful for complex
>> user applications (if they are trying to do something similar). As I
>> mentioned, this feature is closely tied to PIP-192 for now. We are not
>> planning to expose this feature to users soon unless it is demanded and
>> proven to be stable.
>>
>>
>>> Thanks,
>>> Michael
>>>
>>>
>>>
>>>
>>>
>>> On Sat, Oct 29, 2022 at 10:13 PM Heesung Sohn
>>> <he...@streamnative.io.invalid> wrote:
>>> >
>>> > Hi,
>>> >
>>> > Also, I thought about some concurrent assignment scenarios comparing
>>> > pre-filter and post-filter.
>>> >
>>> > Example 1: When the topic broadcast is slower than the concurrent
>>> > assignment requests
>>> >
>>> > With pre-filter + two-topics (non-compacted and compacted topics)
>>> > t1: A -> non-compacted topic // broker A published a message to the
>>> > non-compacted topic, m1: {broker A assigned bundle x to broker A}
>>> > t2: B -> non-compacted topic // broker B published a message, m2:
>>> > {broker B assigned bundle x to broker C}
>>> > t3: C -> non-compacted topic // broker C published a message, m3:
>>> > {broker C assigned bundle x to broker B}
>>> > t4: non-compacted topic -> L // leader broker consumed the messages:
>>> > m1, m2, and m3
>>> > t5: L -> compacted topic // leader compacted the messages and
>>> > broadcasted m1 to all consumers
>>> > t6: compacted topic -> [A,B,C] // broker A,B,C consumed m1
>>> >
>>> > With post-filter + a single topic
>>> > t1: A -> topic // broker A published a message to the topic,
>>> > m1: {broker A assigned bundle x to broker A}
>>> > t2: B -> topic // broker B published a message, m2: {broker B assigned
>>> > bundle x to broker C}
>>> > t3: C -> topic // broker C published a message, m3: {broker C assigned
>>> > bundle x to broker B}
>>> > t4: topic -> [A,B,C] // broker A,B,C consumed the messages: m1, m2, and
>>> > m3
>>> > t5: [A,B,C] -> m1 // broker A,B,C individually compacted the messages
>>> > to m1.
>>> >
>>> > Analysis: the "pre-filter + two-topics" option can reduce the number of
>>> > messages to broadcast at the expense of the leader broker compaction.
>>> >
>>> >
>>> > Example 2: When the topic broadcast is faster than the concurrent
>>> > assignment requests
>>> >
>>> > With pre-filter + two-topics (non-compacted and compacted topics)
>>> > t1: A -> non-compacted topic // broker A published a message to the
>>> > non-compacted topic, m1: {broker A assigned bundle x to broker A}
>>> > t2: non-compacted topic -> L // leader broker consumed the message: m1
>>> > t3: L -> compacted topic // leader compacted the message and
>>> > broadcasted m1 to all consumers
>>> > t4: compacted topic -> [A,B,C] // broker A,B,C consumed m1
>>> > t5: A -> own bundle // broker A knows that its assignment has been
>>> > accepted, so it proceeds to own the bundle (meanwhile deferring lookup
>>> > requests)
>>> > t6: B -> defer client lookups // broker B knows that the bundle
>>> > assignment is running (meanwhile deferring lookup requests)
>>> > t7: C -> defer client lookups // broker C knows that the bundle
>>> > assignment is running (meanwhile deferring lookup requests)
>>> >
>>> > With post-filter + a single topic
>>> > t1: A -> topic // broker A published a message to the topic,
>>> > m1: {broker A assigned bundle x to broker A}
>>> > t2: topic -> [A,B,C] // broker A,B,C consumed the message: m1
>>> > t3: A -> own bundle // broker A knows that its assignment has been
>>> > accepted, so it proceeds to own the bundle (meanwhile deferring lookup
>>> > requests)
>>> > t4: B -> defer client lookups // broker B knows that the bundle
>>> > assignment is running (meanwhile deferring lookup requests)
>>> > t5: C -> defer client lookups // broker C knows that the bundle
>>> > assignment is running (meanwhile deferring lookup requests)
>>> >
>>> > Analysis: The "post-filter + a single topic" option can perform ok in
>>> > this case without the additional leader coordination and the secondary
>>> > topic, because the early broadcast can inform all brokers and prevent
>>> > them from requesting other assignments for the same bundle.
>>> >
>>> > I think the post-filter option is initially not bad because:
>>> >
>>> > 1. it is safe in the worst case (in case the messages are not correctly
>>> > pre-filtered at the leader)
>>> > 2. it performs ok because the early broadcast can prevent
>>> > concurrent assignment requests.
>>> > 3. initially less complex to implement (leaderless conflict resolution
>>> > and requires a single topic)
>>> > 4. it is not a "one-way door" decision (we could add the pre-filter
>>> > logic as well later)
>>> >
>>> > Regards,
>>> > Heesung
>>> >
>>> >
>>> >
>>> > On Sat, Oct 29, 2022 at 1:03 PM Heesung Sohn
>>> > <heesung.sohn@streamnative.io> wrote:
>>> >
>>> > > Hi Michael,
>>> > >
>>> > > For the pre-filter (pre-compaction) option,
>>> > > I think the leader requires a write-through cache to compact messages
>>> > > based on the latest states. Otherwise, the leader needs to wait for
>>> > > the last msg from the (compacted) topic before compacting the next
>>> > > msg for the same bundle.
>>> > >
>>> > > Pulsar guarantees "a single writer". However, for the worst-case
>>> > > scenario (due to network partitions, bugs in ZooKeeper or etcd leader
>>> > > election, bugs in BookKeeper, data corruption), I think it is safe to
>>> > > place the post-filter on the consumer side (compaction and table
>>> > > views) as well in order to validate the state changes.
>>> > >
>>> > > For the two-topic approach,
>>> > > I think we lose a single linearized view. Could we clarify how to
>>> > > handle the following (edge cases and failure recovery)?
>>> > > 0. Is the un-compacted topic a persistent topic or a non-persistent
>>> > > topic?
>>> > > 1. How does the leader recover state from the two topics?
>>> > > 2. How do we handle the case when the leader fails before writing
>>> > > messages to the compacted topic?
>>> > >
>>> > > Regards,
>>> > > Heesung
>>> > >
>>> > > On Fri, Oct 28, 2022 at 6:56 PM Michael Marshall
>>> > > <mmarshall@apache.org> wrote:
>>> > >
>>> > >> Sharing some more thoughts. We could alternatively use two topics
>>> > >> instead of one. In this design, the first topic is the unfiltered
>>> > >> write ahead log that represents many writers (brokers) trying to
>>> > >> acquire ownership of bundles. The second topic is the distilled log
>>> > >> that represents the "winners" or the "owners" of the bundles. There
>>> > >> is a single writer, the leader broker, that reads from the input
>>> > >> topic and writes to the output topic. The first topic is normal and
>>> > >> the second is compacted.
>>> > >>
>>> > >> The primary benefit in a two topic solution is that it is easy for
>>> > >> the leader broker to trade off ownership without needing to slow
>>> > >> down writes to the input topic. The leader broker will start
>>> > >> consuming from the input topic when it has fully consumed the table
>>> > >> view on the output topic. In general, I don't think consumers know
>>> > >> when they have "reached the end of a table view", but we should be
>>> > >> able to trivially figure this out if we are the topic's only writer
>>> > >> and the topic and writer are collocated on the same broker.
>>> > >>
>>> > >> In that design, it might make sense to use something like the
>>> > >> replication cursor to keep track of this consumer's state.
>>> > >>
>>> > >> - Michael
>>> > >>
>>> > >> On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall
>>> > >> <mmarshall@apache.org> wrote:
>>> > >> >
>>> > >> > Thanks for your proposal, Heesung.
>>> > >> >
>>> > >> > Fundamentally, we have the problems listed in this PIP because we
>>> > >> > have multiple writers instead of just one writer. Can we solve
>>> > >> > this problem by changing our write pattern? What if we use the
>>> > >> > leader broker as the single writer? That broker would intercept
>>> > >> > attempts to acquire ownership of bundles and would grant ownership
>>> > >> > to the first broker to claim an unassigned bundle. It could "grant
>>> > >> > ownership" by letting the first write to claim an unassigned
>>> > >> > bundle get written to the ownership topic. When a bundle is
>>> > >> > already owned, the leader won't persist that event to BookKeeper.
>>> > >> > In this design, the log becomes a true ownership log, which will
>>> > >> > correctly work with the existing topic compaction and table view
>>> > >> > solutions. My proposal essentially moves the conflict resolution
>>> > >> > to just before the write, and as a consequence, it greatly reduces
>>> > >> > the need for post processing of the event log. One trade-off
>>> > >> > might be that the leader broker could slow down the write path,
>>> > >> > but given that the leader would just need to verify the current
>>> > >> > state of the bundle, I think it'd be performant enough.
>>> > >> >
>>> > >> > Additionally, we'd need the leader broker to be "caught up" on bundle
>>> > >> > ownership in order to grant ownership of topics, but unless I am
>>> > >> > mistaken, that is already a requirement of the current PIP 192
>>> > >> > paradigm.
>>> > >> >
>>> > >> > Below are some additional thoughts that will be relevant if we move
>>> > >> > forward with the design as it is currently proposed.
>>> > >> >
>>> > >> > I think it might be helpful to update the title to show that this
>>> > >> > proposal will also affect table view as well. I didn't catch that at
>>> > >> > first.
>>> > >> >
>>> > >> > Do you have any documentation describing how the
>>> > >> > TopicCompactionStrategy will determine which states are valid in the
>>> > >> > context of load balancing? I looked at
>>> > >> > https://github.com/apache/pulsar/pull/18195, but I couldn't seem to
>>> > >> > find anything for it. That would help make this proposal less
>>> > >> > abstract.
>>> > >> >
>>> > >> > The proposed API seems very tied to the needs of PIP 192. For example,
>>> > >> > `isValid` is not a term I associate with topic compaction. The
>>> > >> > fundamental question for compaction is which value to keep (or build a
>>> > >> > new value). I think we might be able to simplify the API by replacing
>>> > >> > the "isValid", "isMergeEnabled", and "merge" methods with a single
>>> > >> > method that lets the implementation handle one or all tasks. That
>>> > >> > would also remove the need to deserialize payloads multiple times too.
>>> > >> >
>>> > >> > I also feel like mentioning that after working with the PIP 105 broker
>>> > >> > side filtering, I think we should avoid running UDFs in the broker as
>>> > >> > much as possible. (I do not consider the load balancing logic to be a
>>> > >> > UDF here.) I think it would be worth not making this a user facing
>>> > >> > feature unless there is demand for real use cases.
>>> > >> >
>>> > >> > Thanks!
>>> > >> > Michael
>>> > >> >
>>> > >> > On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bo...@apache.org> wrote:
>>> > >> > >
>>> > >> > > +1(non-binding)
>>> > >> > >
>>> > >> > > thanks,
>>> > >> > > bo
>>> > >> > >
>>> > >> > > On Wed, Oct 19, 2022 at 07:54 Heesung Sohn <he...@streamnative.io.invalid>
>>> > >> > > wrote:
>>> > >> > > >
>>> > >> > > > Hi pulsar-dev community,
>>> > >> > > >
>>> > >> > > > I raised a PIP to discuss: PIP-215: Configurable Topic Compaction Strategy
>>> > >> > > >
>>> > >> > > > PIP link: https://github.com/apache/pulsar/issues/18099
>>> > >> > > >
>>> > >> > > > Regards,
>>> > >> > > > Heesung
>>> > >>
>>> > >
>>>
>>
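The leader-side pre-filter discussed in this thread (a single writer that persists only the first claim on an unassigned bundle, plus the write-through cache Heesung says the leader would need) can be sketched in memory as follows. `LeaderPreFilter`, `Claim`, and the list standing in for the compacted output topic are hypothetical names, not Pulsar APIs; a real implementation would use Pulsar consumers, producers, and cursors. The sketch assumes a simple first-claim-wins rule.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a leader-side pre-filter; none of these names are Pulsar APIs.
class LeaderPreFilter {
    // An ownership claim read from the unfiltered input topic.
    record Claim(String bundle, String requester, String owner) {}

    // Write-through cache: updated in the same step as the write, so the next
    // claim for the same bundle sees the new owner without waiting to read it
    // back from the compacted output topic.
    private final Map<String, String> ownerByBundle = new HashMap<>();

    // In-memory stand-in for the compacted output topic.
    private final List<Claim> outputTopic = new ArrayList<>();

    // Number of input-topic messages acknowledged so far.
    private int acked = 0;

    /** Handles one claim from the input topic; returns true if it won. */
    boolean process(Claim claim) {
        boolean wins = !ownerByBundle.containsKey(claim.bundle());
        if (wins) {
            outputTopic.add(claim);                           // 1. persist the winner
            ownerByBundle.put(claim.bundle(), claim.owner()); // 2. update the cache
        }
        // 3. Ack only after the write. If the leader crashed before this point,
        // the claim would be re-read; with the cache rebuilt from the output
        // topic, a re-read winner is simply seen as already owned.
        acked++;
        return wins;
    }

    List<Claim> output() { return outputTopic; }
    String ownerOf(String bundle) { return ownerByBundle.get(bundle); }
    int ackedCount() { return acked; }

    public static void main(String[] args) {
        LeaderPreFilter leader = new LeaderPreFilter();
        leader.process(new Claim("x", "A", "A")); // m1 wins
        leader.process(new Claim("x", "B", "C")); // m2 dropped: x is already owned
        leader.process(new Claim("x", "C", "B")); // m3 dropped
        System.out.println(leader.output().size() + " " + leader.ownerOf("x")); // prints "1 A"
    }
}
```

Only m1 reaches the output topic, so every consumer of that topic sees a single owner without any filtering of its own.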

Re: [DISCUSS] PIP-215: Configurable Topic Compaction Strategy

Posted by Heesung Sohn <he...@streamnative.io.INVALID>.
Hi Michael,

Here are my additional comments regarding your earlier email.

- I updated the PIP title to show that this will impact table view as well.

- PIP-192 (https://github.com/apache/pulsar/issues/16691) shows the general
idea of the states and their actions, and I defined the actual states in
the PR,
https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a.
I will further clarify the bundle state data validation logic when
introducing the `BundleStateCompactionStrategy` class. This PIP is meant to
support CompactionStrategy in general.

- I agree with your point that we should merge the CompactionStrategy APIs.
I updated the interface proposal in the PIP, replacing the "isValid",
"isMergeEnabled", and "merge" APIs with a single "compact" API.


Thanks,
Heesung



Re: [DISCUSS] PIP-215: Configurable Topic Compaction Strategy

Posted by Heesung Sohn <he...@streamnative.io.INVALID>.
Hi,
Thank you for the great comments.
Please find my comments inline too.

Regards,
Heesung

On Mon, Oct 31, 2022 at 10:59 PM Michael Marshall <mm...@apache.org>
wrote:

> > I think we lose a single linearized view.
>
> Which linearized view are we losing, and what is the role of that
> linearized view? I think I might be missing why it is important. I
> agree that consumers won't know about each unsuccessful attempted
> acquisition of a bundle, but that seems like unnecessary information
> to broadcast to every broker in the cluster.
>
>

PIP-192 proposes an assignment, transfer, and split protocol (multi-phase
state changes) that relies on early broadcast across brokers: all brokers
react to their clients according to the state change notifications. For
example, brokers can defer any client lookups for bundle x while an
assignment/transfer/split is ongoing for x (broadcast early in the topic).
One early broadcast example is the one that I discussed above, `When the
topic broadcast is faster than the concurrent assignment requests.` I think
the pre-filter could delay this early broadcast, as it needs to go through
the additional single-leader compaction path.
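The deferral behavior described above can be sketched as a broker-side view that answers lookups for settled bundles right away and parks lookups for bundles with an in-flight operation until an ownership event arrives. `LookupDeferral` and its `State` values are hypothetical and much simpler than the PIP-192 state machine.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical broker-side view; the state names are illustrative, not PIP-192's.
class LookupDeferral {
    enum State { ASSIGNING, TRANSFERRING, SPLITTING, OWNED }

    private final Map<String, State> stateByBundle = new HashMap<>();
    private final Map<String, String> ownerByBundle = new HashMap<>();
    private final Map<String, CompletableFuture<String>> deferred = new HashMap<>();

    // Invoked for every state change read from the bundle state topic.
    synchronized void onStateChange(String bundle, State state, String owner) {
        stateByBundle.put(bundle, state);
        ownerByBundle.put(bundle, owner);
        if (state == State.OWNED) {
            CompletableFuture<String> waiting = deferred.remove(bundle);
            if (waiting != null) {
                waiting.complete(owner); // release every lookup deferred for this bundle
            }
        }
    }

    // Answers immediately when the bundle is settled; otherwise defers the lookup.
    // (Unknown bundles are deferred too; a real broker would also start an assignment.)
    synchronized CompletableFuture<String> lookup(String bundle) {
        if (stateByBundle.get(bundle) == State.OWNED) {
            return CompletableFuture.completedFuture(ownerByBundle.get(bundle));
        }
        return deferred.computeIfAbsent(bundle, b -> new CompletableFuture<>());
    }

    public static void main(String[] args) {
        LookupDeferral broker = new LookupDeferral();
        broker.onStateChange("x", State.ASSIGNING, "A"); // early broadcast from broker A
        CompletableFuture<String> lookup = broker.lookup("x");
        System.out.println(lookup.isDone());             // prints "false"
        broker.onStateChange("x", State.OWNED, "A");
        System.out.println(lookup.join());               // prints "A"
    }
}
```

The earlier the ASSIGNING event reaches a broker, the sooner that broker stops answering (and possibly racing on) lookups for the bundle, which is why delaying the broadcast matters here.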

A single linearized view also makes the bundle state recovery process simpler.

A single linearized view also makes bundle states easier to debug. We can
more easily track where the assignment requests come from and how they are
compacted.
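To illustrate the recovery and debugging argument, here is a hypothetical replay of one ordered log using Example 1's messages (m1, m2, m3) and an assumed first-assignment-wins rule. Names such as `LinearizedReplay` and `Decision` are illustrative only; the point is that replaying the same log always yields the same owners plus an audit trail explaining why each request was accepted or rejected.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical replay of a single, totally ordered log of assignment requests.
class LinearizedReplay {
    record Request(long offset, String bundle, String requester, String target) {}
    record Decision(Request request, boolean accepted, String reason) {}

    final Map<String, String> ownerByBundle = new HashMap<>();
    final List<Decision> audit = new ArrayList<>();

    // Recovery is a replay from the start; every broker replaying the same log
    // reaches the same state and the same audit trail.
    void replay(List<Request> log) {
        for (Request r : log) {
            String current = ownerByBundle.get(r.bundle());
            if (current == null) {
                ownerByBundle.put(r.bundle(), r.target());
                audit.add(new Decision(r, true, "first assignment"));
            } else {
                audit.add(new Decision(r, false, "already assigned to " + current));
            }
        }
    }

    public static void main(String[] args) {
        List<Request> log = List.of(
                new Request(0, "x", "A", "A"),  // m1
                new Request(1, "x", "B", "C"),  // m2
                new Request(2, "x", "C", "B")); // m3
        LinearizedReplay replay = new LinearizedReplay();
        replay.replay(log);
        for (Decision d : replay.audit) {
            System.out.println(d.request().offset() + " from " + d.request().requester()
                    + ": " + (d.accepted() ? "accepted" : "rejected") + " (" + d.reason() + ")");
        }
    }
}
```

With a two-topic design, the rejected m2 and m3 would only be visible on the input topic, so the same trail would have to be reconstructed across both topics.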



> > I think the leader requires a write-through cache to compact messages based
> > on the latest states.
>
> This brings up an important point that I would like to clarify. If we
> trust the write ahead log as the source of truth, what happens when a
> bundle has been validly owned by multiple brokers? As a broker starts
> and consumes from the compacted topic, how do we prevent it from
> incorrectly thinking that it owns a bundle for some short time period
> in the case that the ownership topic hasn't yet been compacted to
> remove old ownership state?
>

Since the multi-phase transfer protocol involves actions by both the source
and destination brokers, a successful transfer should leave the source and
destination brokers with the (near) latest state. For example, if some
brokers have old ownership states (because they are network partitioned or
delayed), they will redirect clients to the source (old) broker. However,
by the transfer protocol, the source broker should have the latest state, so
it can redirect the client again to the destination broker.
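The redirect path described above can be modeled as a client following each broker's (possibly stale) view until it reaches a broker that believes it owns the bundle. This is a simplified, hypothetical model (`RedirectChain` is not a Pulsar class) that ignores timeouts and in-flight transfer states.

```java
import java.util.Map;

// Hypothetical model of a client following redirects during a bundle transfer.
class RedirectChain {
    /**
     * Follows at most maxRedirects redirects starting at startBroker. viewByBroker
     * maps each broker to the owner it currently believes in (possibly stale).
     * Assumes every broker has some view of the bundle.
     */
    static String resolve(Map<String, String> viewByBroker, String startBroker, int maxRedirects) {
        String broker = startBroker;
        for (int redirects = 0; ; redirects++) {
            String believedOwner = viewByBroker.get(broker);
            if (believedOwner.equals(broker)) {
                return broker;      // this broker believes it owns the bundle: serve here
            }
            if (redirects == maxRedirects) {
                throw new IllegalStateException("too many redirects");
            }
            broker = believedOwner; // redirect the client
        }
    }

    public static void main(String[] args) {
        // Bundle x moved from source B to destination C. Broker A is stale and
        // still points at B; B, as the transfer source, knows the destination.
        Map<String, String> views = Map.of("A", "B", "B", "C", "C", "C");
        System.out.println(resolve(views, "A", 2)); // prints "C"
    }
}
```

A stale broker costs the client one extra hop, as long as the source broker's view is current.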

When a broker restarts, it won't start serving until its BSC state has
caught up to the (near) latest state (up to the last known messageId at
that time).
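A sketch of the restart rule, assuming the broker records the topic's last message id at startup and only starts serving once its replay reaches that id. `RestartCatchUp` and `Event` are hypothetical; an in-memory iterator stands in for a Pulsar reader on the bundle state topic.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical restart catch-up; a real broker would read the topic with a Pulsar reader.
class RestartCatchUp {
    record Event(long messageId, String bundle, String owner) {}

    final Map<String, String> ownerByBundle = new HashMap<>();
    boolean ready = false;

    /**
     * Replays events until reaching lastKnownId, the topic's last message id
     * captured when the broker started (-1 for an empty topic). Only then is
     * the broker marked ready; later events are left for normal tailing.
     */
    void catchUp(Iterator<Event> reader, long lastKnownId) {
        if (lastKnownId < 0) {
            ready = true;
            return;
        }
        while (!ready && reader.hasNext()) {
            Event e = reader.next();
            ownerByBundle.put(e.bundle(), e.owner());
            if (e.messageId() >= lastKnownId) {
                ready = true;
            }
        }
    }

    public static void main(String[] args) {
        List<Event> topic = List.of(
                new Event(0, "x", "A"), new Event(1, "y", "B"),
                new Event(2, "x", "C"),  // last id known at startup
                new Event(3, "y", "D")); // published after startup
        Iterator<Event> reader = topic.iterator();
        RestartCatchUp broker = new RestartCatchUp();
        broker.catchUp(reader, 2);
        System.out.println(broker.ready + " " + broker.ownerByBundle.get("x") + " " + reader.hasNext());
        // prints "true C true"
    }
}
```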


> > Pulsar guarantees "a single writer".
>
> I didn't think we were using a single writer in the PIP 192 design. I
> thought we had many producers sending events to a compacted topic. My
> proposal would still have many producers, but the writer to bookkeeper
> would act as the single writer. It would technically be distinct from
> a normal Pulsar topic producer.
>
> I should highlight that I am only proposing "broker filtering before
> write" in the context of PIP 192 and as an alternative to adding
> pluggable compaction strategies. It would not be a generic feature.
>
>
I was worried about the worst case where two producers (leaders) happen to
write to the compacted topic (although Pulsar can guarantee "a single writer"
or "a single producer" for a topic in normal situations).
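As a sketch of why a consumer-side post-filter stays useful even if two leaders ever write, a consumer can reject any event that is not a valid transition from the state it already holds. The states and transition table below are illustrative assumptions, not the ones defined for PIP-192, and `TransitionValidator` is a hypothetical name.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical consumer-side validation; these states are illustrative, not PIP-192's.
class TransitionValidator {
    enum State { FREE, ASSIGNING, OWNED, RELEASING }

    record BundleState(State state, String broker) {}

    // Allowed transitions for one bundle.
    static final Map<State, Set<State>> ALLOWED = Map.of(
            State.FREE, EnumSet.of(State.ASSIGNING),
            State.ASSIGNING, EnumSet.of(State.OWNED),
            State.OWNED, EnumSet.of(State.RELEASING),
            State.RELEASING, EnumSet.of(State.FREE, State.ASSIGNING));

    static boolean isValid(BundleState prev, BundleState next) {
        State from = prev == null ? State.FREE : prev.state();
        if (!ALLOWED.get(from).contains(next.state())) {
            return false;
        }
        // Only the broker being assigned may complete the assignment.
        return from != State.ASSIGNING || prev.broker().equals(next.broker());
    }

    // Applies the log, skipping any event that is not a valid transition.
    static BundleState apply(List<BundleState> log) {
        BundleState current = null;
        for (BundleState next : log) {
            if (isValid(current, next)) {
                current = next;
            }
        }
        return current;
    }

    public static void main(String[] args) {
        // A second, erroneous writer interleaves conflicting events for broker B.
        List<BundleState> log = List.of(
                new BundleState(State.ASSIGNING, "A"),
                new BundleState(State.OWNED, "A"),
                new BundleState(State.ASSIGNING, "B"), // rejected: OWNED -> ASSIGNING
                new BundleState(State.OWNED, "B"));    // rejected: OWNED -> OWNED
        BundleState result = apply(log);
        System.out.println(result.state() + " " + result.broker()); // prints "OWNED A"
    }
}
```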



> > Could we clarify how to handle
> > the following(edge cases and failure recovery)?
> > 0. Is the un-compacted topic a persistent topic or a non-persistent topic?
>
> It is a persistent topic.
>
> > 1. How does the leader recover state from the two topics?
>
> A leader would recover state by first consuming the whole compacted
> topic and then by consuming from the current location of a cursor on
> the first input topic. As stated elsewhere, this introduces latency
> and could be an issue.
>
> > 2. How do we handle the case when the leader fails before writing messages
> > to the compacted topic
>
> The leader would not acknowledge the message on the input topic until
> it has successfully persisted the event on the compacted topic.
> Publishing the same event to a compacted topic multiple times is
> idempotent, so there is no risk of lost state. The real risk is
> latency. However, I think we might have similar (though not the same)
> latency risks in the current solution.
>
> > Analysis: the "pre-filter + two-topics" option can reduce the number of
> > messages to broadcast at the expense of the leader broker compaction.
>
> My primary point is that with this PIP's design, the filter logic is
> run on every broker and again during topic compaction. With the
> alternative design, the filter is run once.
>
Thank you for the clarification.

I think the difference is that the post-filter is an optimistic approach, as
it relies on the "broadcast-filter" effect (brokers will defer client
lookups if notified ahead of time that an assignment is ongoing for bundle
x). Yes, in the worst case, if the broadcast is slower, each broker needs to
individually compact the conflicting assignment requests.

Conversely, one downside of the pessimistic approach (single-leader
pre-filter) is that when there are few conflicting concurrent assignment
requests (assign bundle a, assign bundle b, assign bundle c, ...), the
requests still need to go through the leader compaction redundantly.
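A back-of-the-envelope count makes this trade-off concrete for the two examples quoted below (three brokers A, B, C). The model is a simplification that counts only message deliveries and publish/consume hops, not latency or payload size, and `BroadcastCostModel` is a hypothetical name.

```java
// Simplified cost model of Examples 1 and 2; it counts only deliveries to
// brokers and publish/consume hops, not latency or bytes.
class BroadcastCostModel {
    // Post-filter: every request is broadcast and every broker filters it.
    static int postFilterDeliveries(int requests, int brokers) {
        return requests * brokers;
    }

    // Pre-filter: the leader consumes every request, then broadcasts only the winners.
    static int preFilterDeliveries(int requests, int winners, int brokers) {
        return requests + winners * brokers;
    }

    // Hops before other brokers see an event.
    static final int POST_FILTER_HOPS = 2; // broker -> topic -> brokers
    static final int PRE_FILTER_HOPS = 4;  // broker -> input topic -> leader -> compacted topic -> brokers

    public static void main(String[] args) {
        int brokers = 3;
        // Example 1: three conflicting requests for bundle x, one winner.
        System.out.println("Example 1: post=" + postFilterDeliveries(3, brokers)
                + " pre=" + preFilterDeliveries(3, 1, brokers)); // post=9 pre=6
        // Example 2: a single, non-conflicting request.
        System.out.println("Example 2: post=" + postFilterDeliveries(1, brokers)
                + " pre=" + preFilterDeliveries(1, 1, brokers)); // post=3 pre=4
        System.out.println("Hops: post=" + POST_FILTER_HOPS + " pre=" + PRE_FILTER_HOPS);
    }
}
```

Under these assumptions, the pre-filter saves deliveries when requests conflict (6 vs. 9 in Example 1) but adds a delivery and two hops when they do not (4 vs. 3 in Example 2).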



> > 3. initially less complex to implement (leaderless conflict resolution and
> > requires a single topic)
>
> PIP 215 has its own complexity too. Coordinating filters
> on both the client (table view) and the server (compaction) is non-trivial.
> The proposed API includes hard-coded client configuration for
> each component, which will make upgrading the version of the
> compaction strategy complicated, and could lead to incorrect
> interpretation of events in the stream. When a single broker is doing
> the filtering, versioning is no longer a distributed problem. That
> being said, I do not mean to suggest my solution is without
> complexity.
>
> > 4. it is not a "one-way door" decision (we could add the pre-filter logic
> > as well later)
>
> It's fair to say that we could add it later, but at that point, we
> will have added this new API for compaction strategy. Are we confident
> that pluggable compaction is independently an important addition to
> Pulsar's features, or would it make sense to make this API only exposed
> in the broker?
>
>
The intention is that this compaction feature could be useful for complex
user applications (if they are trying to do something similar). As I
mentioned, this feature is closely tied to PIP-192 for now. We are not
planning to expose it to users soon unless it is demanded and proven to be
stable.


> Thanks,
> Michael
>
>
>
>
>
> On Sat, Oct 29, 2022 at 10:13 PM Heesung Sohn
> <he...@streamnative.io.invalid> wrote:
> >
> > Hi,
> >
> > Also, I thought about some concurrent assignment scenarios between
> > pre-filter vs post-filter.
> >
> > Example 1: When the topic broadcast is slower than the concurrent
> > assignment requests
> >
> > With pre-filter + two-topics (non-compacted and compacted topics)
> > t1: A -> non-compacted topic // broker A published a message to the
> > non-compacted topic, m1: {broker A assigned bundle x to broker A}
> > t2: B -> non-compacted topic // broker B published a message, m2: {broker B
> > assigned bundle x to broker C}
> > t3: C -> non-compacted topic // broker C published a message, m3: {broker C
> > assigned bundle x to broker B}
> > t4: non-compacted topic -> L // leader broker consumed the messages: m1, m2,
> > and m3
> > t5: L -> compacted topic // leader compacted the messages and broadcasted
> > m1 to all consumers
> > t6: compacted topic -> [A,B,C] // broker A,B,C consumed m1
> >
> > With post-filter + a single topic
> > t1: A -> topic // broker A published a message to the topic,
> > m1: {broker A assigned bundle x to broker A}
> > t2: B -> topic // broker B published a message, m2: {broker B assigned
> > bundle x to broker C}
> > t3: C -> topic // broker C published a message, m3: {broker C assigned
> > bundle x to broker B}
> > t4: topic -> [A,B,C] // broker A,B,C consumed the messages: m1, m2, and m3
> > t5: [A,B,C] -> m1 // broker A,B,C individually compacted the messages to
> > m1.
> >
> > Analysis: the "pre-filter + two-topics" option can reduce the number of
> > messages to broadcast at the expense of the leader broker compaction.
> >
> >
> > Example 2: When the topic broadcast is faster than the concurrent
> > assignment requests
> >
> > With pre-filter + two-topics (non-compacted and compacted topics)
> > t1: A -> non-compacted topic // broker A published a message to the
> > non-compacted topic, m1: {broker A assigned bundle x to broker A}
> > t2: non-compacted topic -> L // leader broker consumed the message: m1
> > t3: L -> compacted topic // leader compacted the message and broadcasted
> > m1 to all consumers
> > t4: compacted topic -> [A,B,C] // broker A,B,C consumed m1
> > t5: A -> own bundle // broker A knows that its assignment has been
> > accepted, so it proceeds to own the bundle (meanwhile deferring lookup
> > requests)
> > t6: B -> defer client lookups // broker B knows that the bundle assignment
> > is running (meanwhile deferring lookup requests)
> > t7: C -> defer client lookups // broker C knows that the bundle assignment
> > is running (meanwhile deferring lookup requests)
> >
> > With post-filter + a single topic
> > t1: A -> topic // broker A published a message to the topic,
> > m1: {broker A assigned bundle x to broker A}
> > t2: topic -> [A,B,C] // broker A,B,C consumed the message: m1
> > t3: A -> own bundle // broker A knows that its assignment has been
> > accepted, so it proceeds to own the bundle (meanwhile deferring lookup
> > requests)
> > t4: B -> defer client lookups // broker B knows that the bundle assignment is
> > in progress (meanwhile deferring lookup requests)
> > t5: C -> defer client lookups // broker C knows that the bundle assignment is
> > in progress (meanwhile deferring lookup requests)
> >
> > Analysis: The "post-filter + a single topic" option can perform ok in this case
> > without the additional leader coordination and the secondary topic, because
> > the early broadcast can inform all brokers and prevent them from requesting
> > other assignments for the same bundle.
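A rough sketch of the broker-side reaction in Example 2, assuming a hypothetical `BundleAssignmentListener` that is invoked when the broadcast of an accepted assignment arrives (the class, enum, and method names are illustrative, not PIP 192 APIs). The assignee starts owning the bundle, every broker defers lookups while the assignment is in progress, and that in-progress marker is what keeps the other brokers from requesting a competing assignment.

```java
import java.util.HashSet;
import java.util.Set;

// What a broker does locally with an accepted assignment (hypothetical).
enum Reaction { OWN_BUNDLE, DEFER_LOOKUPS }

final class BundleAssignmentListener {
    private final String self;
    // Bundles with an accepted assignment whose ownership transfer is still running.
    private final Set<String> inProgress = new HashSet<>();

    BundleAssignmentListener(String self) {
        this.self = self;
    }

    // Called when the broadcast of an accepted assignment reaches this broker.
    // The assignee starts owning the bundle; like every other broker, it also
    // defers lookups for the bundle until ownership is established.
    Reaction onAssignmentAccepted(String bundle, String owner) {
        inProgress.add(bundle);
        return self.equals(owner) ? Reaction.OWN_BUNDLE : Reaction.DEFER_LOOKUPS;
    }

    // While this returns true, client lookups wait and the broker does not
    // publish a competing assignment for the same bundle.
    boolean isAssignmentInProgress(String bundle) {
        return inProgress.contains(bundle);
    }

    // Called once ownership is established; deferred lookups can be answered.
    void onOwnershipEstablished(String bundle) {
        inProgress.remove(bundle);
    }
}
```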
> >
> > I think the post-filter option is initially not bad because:
> >
> > 1. it is safe in the worst case (in case the messages are not correctly
> > pre-filtered at the leader)
> > 2. it performs ok because the early broadcast can prevent
> > concurrent assignment requests.
> > 3. initially less complex to implement (leaderless conflict resolution and
> > requires a single topic)
> > 4. it is not a "one-way door" decision (we could add the pre-filter logic
> > as well later)
> >
> > Regards,
> > Heesung
> >
> >
> >
> > On Sat, Oct 29, 2022 at 1:03 PM Heesung Sohn <heesung.sohn@streamnative.io>
> > wrote:
> >
> > > Hi Michael,
> > >
> > > For the pre-filter (pre-compaction) option,
> > > I think the leader requires a write-through cache to compact messages
> > > based on the latest states. Otherwise, the leader needs to wait for the
> > > last msg from the (compacted) topic before compacting the next msg for
> > > the same bundle.
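A minimal sketch of the leader-side pre-filter with a write-through cache, as discussed here and in the single-writer proposal quoted further down. `LeaderPreFilter`, `OwnershipEvent`, and the in-memory list standing in for the compacted topic are hypothetical. A real leader would write asynchronously and would have to evict the cached entry if the write failed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical ownership event written by the leader.
record OwnershipEvent(String bundle, String owner) {}

final class LeaderPreFilter {
    // Write-through cache of bundle -> owner, updated together with the write.
    private final Map<String, String> owners = new HashMap<>();
    // Stand-in for the compacted topic; a real leader would use a producer.
    final List<OwnershipEvent> compactedTopic = new ArrayList<>();

    // Returns true if the claim was accepted and written.
    boolean onClaim(String bundle, String owner) {
        if (owners.containsKey(bundle)) {
            return false; // bundle already owned: the claim is never persisted
        }
        // Update the cache as part of the write, so the next claim for the same
        // bundle is checked against this state without waiting to read the
        // message back from the compacted topic.
        owners.put(bundle, owner);
        compactedTopic.add(new OwnershipEvent(bundle, owner));
        return true;
    }
}
```

With Example 1's claims for bundle x (A, then C, then B), only A's claim is written.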
> > >
> > > Pulsar guarantees "a single writer". However, for the worst-case
> > > scenario (due to network partitions, bugs in zk or etcd leader election,
> > > bugs in bk, data corruption), I think it is safe to place the post-filter
> > > on the consumer side (compaction and table views) as well in order to
> > > validate the state changes.
> > >
> > > For the two-topic approach,
> > > I think we lose a single linearized view. Could we clarify how to handle
> > > the following (edge cases and failure recovery)?
> > > 0. Is the un-compacted topic a persistent topic or a non-persistent topic?
> > > 1. How does the leader recover state from the two topics?
> > > 2. How do we handle the case when the leader fails before writing messages
> > > to the compacted topic?
> > >
> > > Regards,
> > > Heesung
> > >
> > > On Fri, Oct 28, 2022 at 6:56 PM Michael Marshall <mmarshall@apache.org>
> > > wrote:
> > >
> > >> Sharing some more thoughts. We could alternatively use two topics
> > >> instead of one. In this design, the first topic is the unfiltered
> > >> write ahead log that represents many writers (brokers) trying to
> > >> acquire ownership of bundles. The second topic is the distilled log
> > >> that represents the "winners" or the "owners" of the bundles. There is
> > >> a single writer, the leader broker, that reads from the input topic
> > >> and writes to the output topic. The first topic is normal and the
> > >> second is compacted.
> > >>
> > >> The primary benefit in a two topic solution is that it is easy for the
> > >> leader broker to trade off ownership without needing to slow down
> > >> writes to the input topic. The leader broker will start consuming from
> > >> the input topic when it has fully consumed the table view on the
> > >> output topic. In general, I don't think consumers know when they have
> > >> "reached the end of a table view", but we should be able to trivially
> > >> figure this out if we are the topic's only writer and the topic and
> > >> writer are collocated on the same broker.
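A sketch of the recovery order described above, assuming both topics can be modeled as in-memory lists and that ownership is never released (so "first claim wins" suffices). `TwoTopicLeader` and `OwnerEvent` are hypothetical names. The leader first replays the compacted output topic to its end, and only then consumes the input topic, forwarding claims for bundles that are still unassigned.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical ownership claim (input topic) / ownership record (output topic).
record OwnerEvent(String bundle, String owner) {}

final class TwoTopicLeader {
    // Bundle -> owner, rebuilt from the compacted output topic.
    private final Map<String, String> owners = new HashMap<>();

    // Phase 1: replay the output (compacted) topic to its end. In this design
    // the leader is the only writer, so it can tell when it reached the last entry.
    void recover(List<OwnerEvent> outputTopic) {
        for (OwnerEvent e : outputTopic) {
            owners.put(e.bundle(), e.owner());
        }
    }

    // Phase 2: only after recovery, consume the input (write-ahead) topic and
    // forward the first claim for each unassigned bundle to the output topic.
    void process(List<OwnerEvent> inputTopic, List<OwnerEvent> outputTopic) {
        for (OwnerEvent e : inputTopic) {
            if (owners.putIfAbsent(e.bundle(), e.owner()) == null) {
                outputTopic.add(e);
            }
        }
    }
}
```

In this simplified model, re-reading input messages that a previous leader already forwarded is harmless, because the recovered state already contains their bundles.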
> > >>
> > >> In that design, it might make sense to use something like the
> > >> replication cursor to keep track of this consumer's state.
> > >>
> > >> - Michael
> > >>
> > >> On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall <mmarshall@apache.org>
> > >> wrote:
> > >> >
> > >> > Thanks for your proposal, Heesung.
> > >> >
> > >> > Fundamentally, we have the problems listed in this PIP because we have
> > >> > multiple writers instead of just one writer. Can we solve this problem
> > >> > by changing our write pattern? What if we use the leader broker as the
> > >> > single writer? That broker would intercept attempts to acquire
> > >> > ownership of bundles and would grant ownership to the first broker to
> > >> > claim an unassigned bundle. It could "grant ownership" by letting the
> > >> > first write to claim an unassigned bundle get written to the ownership
> > >> > topic. When a bundle is already owned, the leader won't persist that
> > >> > event to BookKeeper. In this design, the log becomes a true
> > >> > ownership log, which will correctly work with the existing topic
> > >> > compaction and table view solutions. My proposal essentially moves the
> > >> > conflict resolution to just before the write, and as a consequence, it
> > >> > greatly reduces the need for post-processing of the event log. One
> > >> > trade-off might be that the leader broker could slow down the write
> > >> > path, but given that the leader would just need to verify the current
> > >> > state of the bundle, I think it'd be performant enough.
> > >> >
> > >> > Additionally, we'd need the leader broker to be "caught up" on bundle
> > >> > ownership in order to grant ownership of topics, but unless I am
> > >> > mistaken, that is already a requirement of the current PIP 192
> > >> > paradigm.
> > >> >
> > >> > Below are some additional thoughts that will be relevant if we move
> > >> > forward with the design as it is currently proposed.
> > >> >
> > >> > I think it might be helpful to update the title to show that this
> > >> > proposal will also affect table views. I didn't catch that at first.
> > >> >
> > >> > Do you have any documentation describing how the
> > >> > TopicCompactionStrategy will determine which states are valid in the
> > >> > context of load balancing? I looked at
> > >> > https://github.com/apache/pulsar/pull/18195, but I couldn't seem to
> > >> > find anything for it. That would help make this proposal less
> > >> > abstract.
> > >> >
> > >> > The proposed API seems very tied to the needs of PIP 192. For example,
> > >> > `isValid` is not a term I associate with topic compaction. The
> > >> > fundamental question for compaction is which value to keep (or how to
> > >> > build a new value). I think we might be able to simplify the API by
> > >> > replacing the "isValid", "isMergeEnabled", and "merge" methods with a
> > >> > single method that lets the implementation handle one or all tasks.
> > >> > That would also remove the need to deserialize payloads multiple times.
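The actual `TopicCompactionStrategy` signatures from PR 18195 are not quoted in this thread, so the following is only a hypothetical Java sketch of the single-method alternative. `compact(previous, incoming)` returns the value to keep: rejecting (return `previous`), replacing (return `incoming`), and merging (return a new value) are all one call on already-deserialized values. The `CompactionStrategy` interface and `Compactor` helper are illustrative names, and returning `null` to drop a key is an assumption.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-method strategy replacing isValid/isMergeEnabled/merge.
// `previous` is the value currently kept for the key (null if none).
@FunctionalInterface
interface CompactionStrategy<T> {
    T compact(T previous, T incoming);
}

final class Compactor {
    // Folds an ordered key/value log into the values kept per key.
    static <T> Map<String, T> compact(List<Map.Entry<String, T>> log,
                                      CompactionStrategy<T> strategy) {
        Map<String, T> kept = new HashMap<>();
        for (Map.Entry<String, T> entry : log) {
            T previous = kept.get(entry.getKey());
            T result = strategy.compact(previous, entry.getValue());
            if (result == null) {
                kept.remove(entry.getKey()); // assumed: null drops the key
            } else {
                kept.put(entry.getKey(), result);
            }
        }
        return kept;
    }
}
```

For example, the log for key x (A, then C, then B) compacts to B with a "latest wins" strategy `(prev, in) -> in`, and to A with a "first wins" strategy `(prev, in) -> prev == null ? in : prev`.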
> > >> >
> > >> > I also feel like mentioning that after working with the PIP 105
> > >> > broker-side filtering, I think we should avoid running UDFs in the
> > >> > broker as much as possible. (I do not consider the load balancing logic
> > >> > to be a UDF here.) I think it would be better not to make this a
> > >> > user-facing feature unless there is demand from real use cases.
> > >> >
> > >> > Thanks!
> > >> > Michael
> > >> >
> > >> > On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bo...@apache.org> wrote:
> > >> > >
> > >> > > +1(non-binding)
> > >> > >
> > >> > > thanks,
> > >> > > bo
> > >> > >
> > >> > > Heesung Sohn <he...@streamnative.io.invalid> wrote on Wed, Oct 19, 2022
> > >> > > at 07:54:
> > >> > > >
> > >> > > > Hi pulsar-dev community,
> > >> > > >
> > >> > > > I raised a PIP to discuss: PIP-215: Configurable Topic Compaction
> > >> > > > Strategy
> > >> > > >
> > >> > > > PIP link: https://github.com/apache/pulsar/issues/18099
> > >> > > >
> > >> > > > Regards,
> > >> > > > Heesung
> > >>
> > >
>