You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pulsar.apache.org by Joe F <jo...@gmail.com> on 2019/04/29 20:56:53 UTC

Re: [DISCUSS] PIP 33: Replicated subscriptions

I have suggestions for an alternate solution.

If source message-ids were known for replicated messages, a composite
cursor can be maintained for replicated subscriptions as an n-tuple.  Since
messages are ordered from a source, it would be possible to restart from a
known cursor n-tuple in any cluster by  a combination of cursor
positioning  _and_ filtering

A simple way to approximate this is for each cluster to insert its own
ticker marks into the topic. A ticker carries the messsage id as the
message body. The ticker mark can be inserted every 't ' time interval or
every 'n' messages as needed.

The n-tuple of the tickers from each cluster is a well-known state  that
can be re-started anywhere by proper positioning and filtering

That is a simpler solution for users to understand and trouble-shoot.  It
would be resilient to cluster failures, and does NOT require all clusters
to be up, to determine cursor position. No cross-cluster
communication/ordering is needed.

 But it will require skipping messages from specific sources as needed, and
storing  the n-tuple as part of cursor state

Joe

On Mon, Mar 25, 2019 at 10:24 PM Sijie Guo <gu...@gmail.com> wrote:

> On Mon, Mar 25, 2019 at 4:14 PM Matteo Merli <mm...@apache.org> wrote:
>
> > On Sun, Mar 24, 2019 at 9:54 PM Sijie Guo <gu...@gmail.com> wrote:
> > >
> > > Ivan, Matteo, thank you for the writeup!
> > >
> > > I have a few more questions
> > >
> > > - How does this handle ack individual vs ack cumulatively? It seems it
> is
> > > ignored at this moment. But it is good to have some discussions around
> > how
> > > to extend the approach to support them and how easy to do that.
> >
> > Yes, it's stated that current proposal only replicates the mark-delete
> > position.
> > (Will clarify it better in the wiki)
> >
> > Of course the markers approach works well with cumulative acks (since
> they
> > just moves the mark-delete position), but it will work with
> > individual-acks too
> > in most of the scenarios.
> >
> > Keep in mind that in all cases a cluster failover will involve some
> number
> > of duplicates (configurable with the frequency of the snapshot).
> >
> > With individual acks, if all messages are acked within a short amount of
> > time,
> > for example, 1 second, comparable to the snapshot frequency, then there
> > will be no problem and no practical difference from the cumulative ack
> > scenario.
> >
> > Conversely, if some messages can stay unacked for much longer amount
> > of time,  while other messages are being acked, that will lead to a
> larger
> > amount of duplicates during cluster failover.
> >
> > Regarding at how support this case better, I replied below in the
> > "alternative
> > design" answer.
> >
> > > - Do we need to change the dispatcher, and what are the changes?
> >
> > This approach does not require any change in the dispatcher code. The
> > only change in the consumer handling is to filter out the marker messages
> > since they don't need to go back to consumers.
> >
>
>
> How does this
>
>
> >
> > > - If a region goes down, the approach can't take any snapshots. Does it
> > > mean "acknowledge" will be kind of suspended until the the region is
> > > brought back? I guess it is related to how dispatcher is changed to
> react
> > > this snapshot.  It it unclear to me from the proposal. It would be good
> > if
> > > we have more clarifications around it.
> >
> > First off, to clarify, this issue is only relevant when there are 3 or
> > more clusters
> > in the replication set.
> >
> > If one of the cluster is not reachable, the snapshots will not be
> > taken. A consumer
> > will still keep acknowledging locally but these acks won't be
> > replicated in the other
> > clusters. Therefore in case of a cluster failover, the subscription
> > will be rolled back
> > to a much earlier position.
>
>
> > This is not a problem with 2 clusters since if the other cluster is down,
> > we
> > we cannot failover to it anyway.
> >
>
> The question here will be more about how to fail back. If a snapshot is not
> taken, then nothing is *exchanged*
> between the clusters. How does this proposal handle failing back?
>
> In other words, what are the sequences for people to do failover and
> failback?
> It would be good to have an example to demonstrate the sequences, so that
> users will have a clear picture on how to use this feature.
>
>
> >
> > When we have 3+ clusters, though, we can only sustain 1 cluster
> > failure because, after that,
> > the snapshot will not make progress.
> >
> > Typically, though, the purpose of replicated subscriptions is to have
> > the option to fail out
> > of a failed cluster, which in this case it will work.
> >
> > What it won't work would be failing over from A to C when cluster B is
> > down. To define "won't work"
> > is that consumer will go to C but will find the cursor to an older
> > position. No data loss, but potentially
> > a big number of dups.
> >
> > This is to protect for the case of messages exchanged between B and C
> > clusters before B cluster
> > went down.
> >
> > In practice we can have operation tools to do a manually override and
> > ensure snapshots are
> > taken. It would be interesting to see how this feature would be
> > getting used and what the
> > operational pain points will be, before overthinking these problems
> > upfront and dig too much
> > in a direction that might not be too relevant in practice.
> >
> > > Have you guys considered any other alternatives? If you have considered
> > > other alternatives, it might be worth listing out the alternatives for
> > > comparisons.
> >
> > (good point will add to the wiki)
> >
> > Yes, the main other approach would be to attach the "Original-Message-Id"
> > when
> > replicating a message.
> > That would allow to basically keep an additional range-set based on
> > the Original-Message-Id
> > as well as the local message id.
> >
> > The main drawbacks of this approach (compared to the proposal) are:
> >   * It would only work for individual-acks but not for cumulative acks
> >   * We need to make more changes to propagate the OriginalMessageId to
> > consumers,
> >      so that when it acks we don't have to maintain a mapping
> >
> > This require some bit of changes in the rangeset tracker. It shouldn't
> > be terribly hard to
> > do though, and it's a very "localized" change (easy to create
> > substantial amount of unit
> > tests around this custom data structure).
> >
> > The idea is that this approach would be a good candidate to extend the
> > current proposal
> > to support individual-acks on top of the markers to move the
> > mark-delete position.
> >
> > > A lot of the challenges are introduced because messages are interleaved
> > > during cross-region replication. Interleaving might be working for some
> > > cross-region setup and failover strategies. But it also has challenges
> in
> > > supporting other features. I think it might be a good time to rethink
> the
> > > cross-region replication and allow different types of cross-region
> > > replication mechanism exists. So we can introduce a new cross-region
> > > replication mechanism which avoid interleaving (e.g. via having a
> > > per-region partition), so that people can choose which replication
> > > mechanism to use based on their cross-region setup and requirements.
> > >
> > > In a non-interleaved cross-region replication mechanism, we can do
> > > precisely entry-to-entry replication at managed ledger level (via BK's
> > > LedgerHandleAdv) and maintain 1:1 message id mapping during
> replication.
> > > Once we can maintain 1:1 message id mapping, the replicated
> subscription
> > > can be simply done via replicating cursors. This approach can also
> > provide
> > > a few other benefits like a replicated subscription can selectively
> > consume
> > > messages from a given region only (local region or a remote region). It
> > > also provides more flexibilities for consumers to do region failover.
> >
> > I think there are few challenges in the per-region approach:
> >
> >  1. This would remove a fundamental guarantee that currently Pulsar
> > replicated topics
> >      provide: consistency within a single region.
> >      If we have N logs instead of 1 single log, there's no way to
> > replay the log and
> >      reach the same state, or to have 2 consumers in same region to be
> > consistent.
> >      This would break several applications currently running on Pulsar.
> >
>
> currently Pulsar only guarantees per-region and per partition ordering.
> so technically it doesn't really change any behaviors. However if people
> needs an aggregated order of messages from multiple regions, then you
> required interleaved replication. that's why I didn't propose changing
> the current geo-replication mechanism, instead I propose adding a new
> replication mechanism.
>
>
>
> >  2. The topic metadata size will get multiplied by N, with N being the
> > number of clusters.
> >      With 8 clusters and lot of topics that would break some existing
> > deployments.
> >
>
> Correct. For most of deployments, N is bound to 2~3 clusters. So adding a
> new replication mechanism
> can probably address the problems for most of the deployments.
>
>
> >  3. Even if we specify the same entryId when republishing, we would
> > have to ensure
> >      to use a perfectly mirrored ledger in all the regions. What would
> > happen if a
> >      ledger is fenced in one of the regions?
> >
> > This would be on top of a very substantial amount of changes in core
> parts
> > of the code that would have be to tested at scale and lastly to figure
> out
> > a compatible live migration path.
> >
>
> Sure. That's why I propose as adding a new replication mechanism not
> changing original one.
>
>
> >
> > >  so that people can choose which replication
> > > mechanism to use based on their cross-region setup and requirements.
> >
> > Applications can already chose to keep separated the topic, if they wish
> > so. You
> > just need to make sure to write to a different topic in each region
> > and subscribe
> > to all.
> >
>
> Sure. Applications can do so by themselves.
>
>
> >
> >
> > I believe the current proposal is a much less risky approach that can be
> > easily
> > implemented on top of the existing replication infrastructure, providing
> a
> > solution that will benefits a lot of use cases from day 1, with, of
> > course, room
> > to get improved down the road.
> >
>
> Sure. I didn't mean proposing another alternative.
>

Re: [DISCUSS] PIP 33: Replicated subscriptions

Posted by Ivan Kelly <iv...@apache.org>.
Joe, Matteo

What was the conclusion of this discussion? Was there a conclusion?
Has the algorithm used in the implementation changed from the initial
draft?

-Ivan

On Fri, May 3, 2019 at 4:00 AM Joe F <jo...@apache.org> wrote:
>
> Let me try to explain my suggestion better.
>
> First, about positions in an ordered stream:  Consider a simple stream  of
> events when there is no identifier  on each event about its relative
> position in the stream. But after every 'n' events there is a ticker event
> carrying a monotonically increasing sequence id.
>
>                       For eg: after, say, every 4th event,  the stream
> generator inserts a ticker  into the stream.
>                       Then the stream will be like a, a, a,  a,  (a1),  a,
> a, a, a, (a2), a, a, a, a (a3) a .... and so on.
>                     A reader reading can establish it's position based on
> these "ticker" events. (like freeway mile markers)
>
> Assertion 1: A ticker position in the stream is deterministic across all
> copies of the stream, if all copies have the same event order .
> This means reading can be  resumed across copies of the same stream , since
> positions are deterministic. For eg: if reader on copy X says I am at the
> ticker (a2), then the readers position is at (a2) in every other copy, Y or
> Z  . The reader can   stop at reading at (a8) in copy X and resume at (a8)
> in copy Y,  and do so without loss of events.
>
>  Second, consider a  merge operator that merges 'n'  _ordered_ input
> streams and produce one output stream.
>
> The operator can be modeled as being fed with with 'n' input readers and
> emitting one output. There is no buffering, If the operator gets an input,
> it has to write it to the output before it accepts another input from  any
> of its feeders.
>
> This merge operator has 2 properties
>           (1)Input order: the merge operator maintains input order in the
> output
>                            i.e. if input A had (a-x) preceding (a-y), then
> the output of the merge operator has (a-x) preceding (a-y).
>           (2)No output order: different merge operators can produce
> arbitrary output orders across the same input feeds
>                             i.e. No assumptions can be made that, in the
> output,  (a-x) will precede (b-y),   [..or (b-z)  or  (c-y) or (c-z) or ..]
>
> Assertion 2: The merge output can then be  represented as  an n-tuple of
> 'n' individual input  positions;  Since each input is an ordered stream,
> the position  within that input sub-stream  is deterministic, and a
> combination of positions on all inputs is deterministic.
>
> It follows that (1) the set of input positions can be transferred from one
> operator to another, and (2) the output will not lose events across such
> transfers  and (3) output order may change across such transfer
>
> Note that there is no assumption or assertions about the _output_ of the
> merge operator. We are only asserting this about the input.
>
> Example
> --------------
> For eg: readers P, Q R, are each  reading output of different merge
> operators. They all process the same three event streams, one generated
> from A, one from B, and one from C.
>
> Then P's merge operator can be represented as three input stream readers
> Pa, Pb, Pc who feed into  the merge operator P.   The operator for P may
> produce a different output than the operator for Q,  (say because input
> readers may progress at different speeds in each operator), but each  input
> stream reader position is deterministic (by Assertion 1)
>
> If P has a position at [ Pa(a8), Pb(b1), Pc(c3)] ,  Q has an equivalent
> position [Qa(a8), Qb(b1), Qc(c1)] for its input readers.
>           Reader on Q can set up input readers .. Qa to a(8), Qb to b(1)
> and Qc to c(3), to start  feeding into Q
>
> -----------
>
> These two assertions are the invariants in my suggestion. The rest is about
> solving two implementation issues,
>
> (1)  How to map the  input n-tuple of the merge operator into  a  specific
> position in the output of the merge operator
>                         eg: how to map [Pa(x), Pb(y) Pc(z)]  ===>  P(j)
>
> (2) How to resume a reader  across  the _outputs_ of  two merge operators,
> without loss of events (and as little duplication),  when there are fed
> with the same input, but at different rates.
>                      eg: If  [Pa(x), Pb(y) Pc(z)] ==> P(j) , then find
>  position Q(r) <==== [Pa(x), Pb(y) Pc(z)],
>
> And my thinking is that these two things can be solved similar to the
> existing proposal.
>
> On Wed, May 1, 2019 at 4:10 PM Matteo Merli <mm...@apache.org> wrote:
>
> > On Mon, Apr 29, 2019 at 1:57 PM Joe F <jo...@gmail.com> wrote:
> > >
> > > I have suggestions for an alternate solution.
> > >
> > > If source message-ids were known for replicated messages, a composite
> > > cursor can be maintained for replicated subscriptions as an n-tuple.
> > Since
> > > messages are ordered from a source, it would be possible to restart from
> > a
> > > known cursor n-tuple in any cluster by  a combination of cursor
> > > positioning  _and_ filtering
> >
> > Knowing the source message id alone is not enough to establish the
> > order relationship across all the clusters. I think that would only
> > work in the 2 clusters scenario.
> >
> > In general, for each message being written in region A, both locally
> > published or replicated from other regions, we'd have to associate the
> > highest (original from the source) message id. While that could be
> > easy in the simplest case (broker maintains hashmap of the highest
> > source message id from each region), it becomes more difficult to
> > consider failure cases in which we have to reconstruct that hashmap
> > from the log.
> >
> > Also, that would require to modify each message before persisting it,
> > in the target region.
> >
> > Finally, the N-tuple of message ids would have to either:
> >  * Have a mapping in the broker (local-message -> N-tuple)
> >  * Be pushed to consumers so that they will ack with the complete context
> >
> > >
> > > A simple way to approximate this is for each cluster to insert its own
> > > ticker marks into the topic. A ticker carries the messsage id as the
> > > message body. The ticker mark can be inserted every 't ' time interval or
> > > every 'n' messages as needed.
> > >
> > > The n-tuple of the tickers from each cluster is a well-known state  that
> > > can be re-started anywhere by proper positioning and filtering
> > >
> > > That is a simpler solution for users to understand and trouble-shoot.  It
> > > would be resilient to cluster failures, and does NOT require all clusters
> > > to be up, to determine cursor position. No cross-cluster
> > > communication/ordering is needed.
> >
> > I don't think this approach can remove the requirement of "all the
> > clusters to be up" because the information in A won't have any context
> > on what was exchanged between B and C.
> >
> > One quick example:
> >  * Message c2 was replicated to B but not A. Then C goes offline.
> >  * A tuple will be something like (a3, b4, c1)
> >  * In region B, b4 was actually written *after* c2, but c2 doesn't get
> > sent from B to A, since it was already replicated itself.
> >  * If we do a failover from A to B considering a3 ~= b4 we would be
> > missing the message c2
> >

Re: [DISCUSS] PIP 33: Replicated subscriptions

Posted by Joe F <jo...@apache.org>.
Let me try to explain my suggestion better.

First, about positions in an ordered stream:  Consider a simple stream  of
events when there is no identifier  on each event about its relative
position in the stream. But after every 'n' events there is a ticker event
carrying a monotonically increasing sequence id.

                      For eg: after, say, every 4th event,  the stream
generator inserts a ticker  into the stream.
                      Then the stream will be like a, a, a,  a,  (a1),  a,
a, a, a, (a2), a, a, a, a (a3) a .... and so on.
                    A reader reading can establish it's position based on
these "ticker" events. (like freeway mile markers)

Assertion 1: A ticker position in the stream is deterministic across all
copies of the stream, if all copies have the same event order .
This means reading can be  resumed across copies of the same stream , since
positions are deterministic. For eg: if reader on copy X says I am at the
ticker (a2), then the readers position is at (a2) in every other copy, Y or
Z  . The reader can   stop at reading at (a8) in copy X and resume at (a8)
in copy Y,  and do so without loss of events.

 Second, consider a  merge operator that merges 'n'  _ordered_ input
streams and produce one output stream.

The operator can be modeled as being fed with with 'n' input readers and
emitting one output. There is no buffering, If the operator gets an input,
it has to write it to the output before it accepts another input from  any
of its feeders.

This merge operator has 2 properties
          (1)Input order: the merge operator maintains input order in the
output
                           i.e. if input A had (a-x) preceding (a-y), then
the output of the merge operator has (a-x) preceding (a-y).
          (2)No output order: different merge operators can produce
arbitrary output orders across the same input feeds
                            i.e. No assumptions can be made that, in the
output,  (a-x) will precede (b-y),   [..or (b-z)  or  (c-y) or (c-z) or ..]

Assertion 2: The merge output can then be  represented as  an n-tuple of
'n' individual input  positions;  Since each input is an ordered stream,
the position  within that input sub-stream  is deterministic, and a
combination of positions on all inputs is deterministic.

It follows that (1) the set of input positions can be transferred from one
operator to another, and (2) the output will not lose events across such
transfers  and (3) output order may change across such transfer

Note that there is no assumption or assertions about the _output_ of the
merge operator. We are only asserting this about the input.

Example
--------------
For eg: readers P, Q R, are each  reading output of different merge
operators. They all process the same three event streams, one generated
from A, one from B, and one from C.

Then P's merge operator can be represented as three input stream readers
Pa, Pb, Pc who feed into  the merge operator P.   The operator for P may
produce a different output than the operator for Q,  (say because input
readers may progress at different speeds in each operator), but each  input
stream reader position is deterministic (by Assertion 1)

If P has a position at [ Pa(a8), Pb(b1), Pc(c3)] ,  Q has an equivalent
position [Qa(a8), Qb(b1), Qc(c1)] for its input readers.
          Reader on Q can set up input readers .. Qa to a(8), Qb to b(1)
and Qc to c(3), to start  feeding into Q

-----------

These two assertions are the invariants in my suggestion. The rest is about
solving two implementation issues,

(1)  How to map the  input n-tuple of the merge operator into  a  specific
position in the output of the merge operator
                        eg: how to map [Pa(x), Pb(y) Pc(z)]  ===>  P(j)

(2) How to resume a reader  across  the _outputs_ of  two merge operators,
without loss of events (and as little duplication),  when there are fed
with the same input, but at different rates.
                     eg: If  [Pa(x), Pb(y) Pc(z)] ==> P(j) , then find
 position Q(r) <==== [Pa(x), Pb(y) Pc(z)],

And my thinking is that these two things can be solved similar to the
existing proposal.

On Wed, May 1, 2019 at 4:10 PM Matteo Merli <mm...@apache.org> wrote:

> On Mon, Apr 29, 2019 at 1:57 PM Joe F <jo...@gmail.com> wrote:
> >
> > I have suggestions for an alternate solution.
> >
> > If source message-ids were known for replicated messages, a composite
> > cursor can be maintained for replicated subscriptions as an n-tuple.
> Since
> > messages are ordered from a source, it would be possible to restart from
> a
> > known cursor n-tuple in any cluster by  a combination of cursor
> > positioning  _and_ filtering
>
> Knowing the source message id alone is not enough to establish the
> order relationship across all the clusters. I think that would only
> work in the 2 clusters scenario.
>
> In general, for each message being written in region A, both locally
> published or replicated from other regions, we'd have to associate the
> highest (original from the source) message id. While that could be
> easy in the simplest case (broker maintains hashmap of the highest
> source message id from each region), it becomes more difficult to
> consider failure cases in which we have to reconstruct that hashmap
> from the log.
>
> Also, that would require to modify each message before persisting it,
> in the target region.
>
> Finally, the N-tuple of message ids would have to either:
>  * Have a mapping in the broker (local-message -> N-tuple)
>  * Be pushed to consumers so that they will ack with the complete context
>
> >
> > A simple way to approximate this is for each cluster to insert its own
> > ticker marks into the topic. A ticker carries the messsage id as the
> > message body. The ticker mark can be inserted every 't ' time interval or
> > every 'n' messages as needed.
> >
> > The n-tuple of the tickers from each cluster is a well-known state  that
> > can be re-started anywhere by proper positioning and filtering
> >
> > That is a simpler solution for users to understand and trouble-shoot.  It
> > would be resilient to cluster failures, and does NOT require all clusters
> > to be up, to determine cursor position. No cross-cluster
> > communication/ordering is needed.
>
> I don't think this approach can remove the requirement of "all the
> clusters to be up" because the information in A won't have any context
> on what was exchanged between B and C.
>
> One quick example:
>  * Message c2 was replicated to B but not A. Then C goes offline.
>  * A tuple will be something like (a3, b4, c1)
>  * In region B, b4 was actually written *after* c2, but c2 doesn't get
> sent from B to A, since it was already replicated itself.
>  * If we do a failover from A to B considering a3 ~= b4 we would be
> missing the message c2
>

Re: [DISCUSS] PIP 33: Replicated subscriptions

Posted by Matteo Merli <mm...@apache.org>.
On Mon, Apr 29, 2019 at 1:57 PM Joe F <jo...@gmail.com> wrote:
>
> I have suggestions for an alternate solution.
>
> If source message-ids were known for replicated messages, a composite
> cursor can be maintained for replicated subscriptions as an n-tuple.  Since
> messages are ordered from a source, it would be possible to restart from a
> known cursor n-tuple in any cluster by  a combination of cursor
> positioning  _and_ filtering

Knowing the source message id alone is not enough to establish the
order relationship across all the clusters. I think that would only
work in the 2 clusters scenario.

In general, for each message being written in region A, both locally
published or replicated from other regions, we'd have to associate the
highest (original from the source) message id. While that could be
easy in the simplest case (broker maintains hashmap of the highest
source message id from each region), it becomes more difficult to
consider failure cases in which we have to reconstruct that hashmap
from the log.

Also, that would require to modify each message before persisting it,
in the target region.

Finally, the N-tuple of message ids would have to either:
 * Have a mapping in the broker (local-message -> N-tuple)
 * Be pushed to consumers so that they will ack with the complete context

>
> A simple way to approximate this is for each cluster to insert its own
> ticker marks into the topic. A ticker carries the messsage id as the
> message body. The ticker mark can be inserted every 't ' time interval or
> every 'n' messages as needed.
>
> The n-tuple of the tickers from each cluster is a well-known state  that
> can be re-started anywhere by proper positioning and filtering
>
> That is a simpler solution for users to understand and trouble-shoot.  It
> would be resilient to cluster failures, and does NOT require all clusters
> to be up, to determine cursor position. No cross-cluster
> communication/ordering is needed.

I don't think this approach can remove the requirement of "all the
clusters to be up" because the information in A won't have any context
on what was exchanged between B and C.

One quick example:
 * Message c2 was replicated to B but not A. Then C goes offline.
 * A tuple will be something like (a3, b4, c1)
 * In region B, b4 was actually written *after* c2, but c2 doesn't get
sent from B to A, since it was already replicated itself.
 * If we do a failover from A to B considering a3 ~= b4 we would be
missing the message c2