Posted to dev@pulsar.apache.org by Baodi Shi <ba...@apache.org> on 2023/03/13 02:52:46 UTC

Re: [DISCUSS] PIP 194 : Pulsar client: seek command add epoch

Any updates on this?

On 2022/09/11 04:18:07 Qiang Huang wrote:
> Hi Asaf, thank you for the very detailed reply.
> 
> > The problem we have today is that while we have sent a request to reset
> the subscription position, the broker decides to:
> > 1. Close the TCP connection which in turn causes the client to clear any
> pending messages it has in the queue.
> > 2. Continue to send messages from the previous position, up to a certain
> point where the broker "shifts gear" and starts sending messages from the
> new position.
> 
> > Since Pulsar doesn't follow a request-response model but has a
> bi-directional protocol, the client can send a command to fetch messages
> using a new session
> > sequence number, while the server can still send messages using the old
> session number. Using the Session Sequence Number the client can't tell the
> > difference between the messages being pushed from the server to it.
> I totally agree with you. I became aware of something wrong in the PIP when I
> re-read this part of the code.
> 
> 
> > # What are the issues with this PIP?
> > 1. The PIP decides to solve the problem listed above *only* for exclusive
> > and failover subscriptions where you have only a single consumer. The
> > problem still remains at large with Shared or Key Shared subscriptions.
> > 2. The cost of solving a small portion of the problem is high:
> >     Added Complexity - Adding another field to the protocol, and another
> > thing to check. I believe we should aim to reduce the cognitive load of the
> > developers of Pulsar.
> > 3. There are no rejected solutions - We always need to examine all
> > available options and list why we decided against them.
> > 4. Lack of background knowledge (context) - it's super hard IMO to grasp
> > the idea without so much context missing: The client-server protocol
> > pertaining to this PIP, including its async nature, what is an epoch and
> > why it was introduced, what are flow permits. I'm not saying explain all of
> > Pulsar in this doc, but just include a brief explanation of that
> > terminology.
> >
> > # What We Suggest
> >
> > Rethink the solution.
> > 1. The consumer (one of many) will send a seek command to the broker, and
> > at the same time clear its internal queue and wait for a response from the
> > broker.
> > 2. The broker, upon receiving the seek command, will:
> >      a. Stop dispatching messages to consumers.
> >      b. Notify all consumers via a command (new) that the subscription
> > position was asked to be reset. Consumers receiving this command will clear
> > their internal queue. The broker will no longer close the TCP connection
> > (with its adverse effects on other consumers and producers "riding" on that
> > connection).
> >      c. Reset the cursor to the newly requested position.
> >      d. Continue dispatching messages from the newly requested position to
> > consumers.
> >
> 
> Good suggestions. I'll look into these issues and rethink the solution. I
> will rewrite this pip according to your suggestions.
> Thanks again for your review.
> 
> Asaf Mesika <as...@gmail.com> wrote on Wed, Sep 7, 2022 at 23:12:
> 
> > Hi Qiang,
> >
> > We had a brainstorming session on this PIP over Zoom with Penghui, Hang,
> > and more people, and I'm jotting down our feedback here.
> >
> > Before I do that, I just want to write my own understanding of the
> > document, for other readers:
> >
> > # Context
> > Pulsar, as opposed to other distributed / streaming systems, took the
> > approach of a push model. The client (consumer that is) asks for 1000
> > messages (that's the consumer's remaining capacity in its internal queue)
> > from the broker (that process is named flow permits). The broker is now
> > permitted to send 1000 messages to the client, and it uses the TCP
> > connection to send those 1000 messages as they become ready to be sent.
> >
> > The consumer has the ability to request the subscription to reset its
> > position to the requested new position.
> > The problem we have today is that while we have sent a request to reset the
> > subscription position, the broker decides to:
> > 1. Close the TCP connection which in turn causes the client to clear any
> > pending messages it has in the queue.
> > 2. Continue to send messages from the previous position, up to a certain
> > point where the broker "shifts gear" and starts sending messages from the
> > new position.
> >
> > So the problem is that you would expect that after the connection was
> > reset, only messages from the new position will be sent to the consumer,
> > but that doesn't happen.
> >
> > We have to keep in mind, that we have effectively two scenarios here from
> > the point of view of the consumer:
> > 1. Single consumer - It can be due to using an Exclusive subscription, or
> > being a consumer attached to a single topic since the subscription is of
> > type Failover.
> > 2. Multiple consumers - With the Shared or Key Shared subscription types. In
> > this case, one of those consumers can decide to reset the position of the
> > *subscription*. When that happens, the broker decides, again, to reset all
> > existing TCP connections to all consumers upon receiving the seek command,
> > and you would expect any messages sent afterward to be from the new
> > position, which again doesn't happen.
> >
> > Another really important piece of information we need to bring to the
> > context of the reader here is the notion of an epoch. First, the epoch in
> > Pulsar PIPs was introduced in PIP-84. The idea is that every time the
> > client starts a "session" of requesting and receiving messages in response,
> > the client will send a Session Sequence Number, and the server responds to
> > those message requests with the same session sequence number. Since Pulsar
> > doesn't follow a request-response model but has a bi-directional protocol,
> > the client can send a command to fetch messages using a new session
> > sequence number, while the server can still send messages using the old
> > session number. Using the Session Sequence Number the client can't tell the
> > difference between the messages being pushed from the server to it. That
> > Session Sequence Number is the one referred to as Epoch in PIP-84 and also
> > in this PIP.
> > The idea was somehow to demarcate the responses coming from the server
> > based on the commands the client sends as they are *independent* (async).
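
A minimal sketch of the epoch idea described above, assuming a toy in-memory consumer. The names `Message`, `EpochConsumer`, `seek`, and `on_message` are hypothetical and are not the Pulsar client API; the sketch only shows how stamping messages with an epoch could let a client drop messages that belong to an older session.

```python
from collections import deque
from dataclasses import dataclass


@dataclass
class Message:
    payload: str
    epoch: int  # epoch stamped on the message by the broker (hypothetical field)


class EpochConsumer:
    """Toy consumer that drops messages stamped with a stale epoch."""

    def __init__(self):
        self.epoch = 0
        self.queue = deque()

    def seek(self):
        # Start a new "session": bump the epoch and clear buffered messages.
        self.epoch += 1
        self.queue.clear()
        return self.epoch  # would travel to the broker with the seek command

    def on_message(self, msg):
        # Messages still in flight from the old session carry an older epoch.
        if msg.epoch < self.epoch:
            return False
        self.queue.append(msg)
        return True


consumer = EpochConsumer()
consumer.on_message(Message("m1", epoch=0))
new_epoch = consumer.seek()
stale_accepted = consumer.on_message(Message("m2", epoch=0))
fresh_accepted = consumer.on_message(Message("m100", epoch=new_epoch))
```

In this sketch the message sent under the old epoch is rejected after the seek, while the one carrying the new epoch is queued.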
> >
> > # What are the issues with this PIP?
> > 1. The PIP decides to solve the problem listed above *only* for exclusive
> > and failover subscriptions where you have only a single consumer. The
> > problem still remains at large with Shared or Key Shared subscriptions.
> > 2. The cost of solving a small portion of the problem is high:
> >     Added Complexity - Adding another field to the protocol, and another
> > thing to check. I believe we should aim to reduce the cognitive load of the
> > developers of Pulsar.
> > 3. There are no rejected solutions - We always need to examine all
> > available options and list why we decided against them.
> > 4. Lack of background knowledge (context) - it's super hard IMO to grasp
> > the idea without so much context missing: The client-server protocol
> > pertaining to this PIP, including its async nature, what is an epoch and
> > why it was introduced, what are flow permits. I'm not saying explain all
> > Pulsar in this doc, but just include a brief explanation of that
> > terminology.
> >
> > # What We Suggest
> >
> > Rethink the solution.
> > 1. The consumer (one of many) will send a seek command to the broker, and
> > at the same time clear its internal queue and wait for a response from the
> > broker.
> > 2. The broker, upon receiving the seek command, will:
> >      a. Stop dispatching messages to consumers.
> >      b. Notify all consumers via a command (new) that the subscription
> > position was asked to be reset. Consumers receiving this command will clear
> > their internal queue. The broker will no longer close the TCP connection
> > (with its adverse effects on other consumers and producers "riding" on that
> > connection).
> >      c. Reset the cursor to the newly requested position.
> >      d. Continue dispatching messages from the newly requested position to
> > consumers.
> >
> > The disadvantage here is that we need to alter the client so that it
> > recognizes a new command and acts accordingly, yet I think that is accidental
> > complexity stemming from the client-server protocol being bi-directional
> > rather than request-response.
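
The suggested broker-side sequence (steps a to d above) could be sketched roughly as follows. This is an illustration under stated assumptions, not broker code: `Consumer`, `Subscription`, `on_position_reset`, and the round-robin dispatch are all hypothetical stand-ins for a Shared subscription.

```python
class Consumer:
    def __init__(self, name):
        self.name = name
        self.queue = []

    def on_message(self, msg):
        self.queue.append(msg)

    def on_position_reset(self):
        # Hypothetical new command: drop everything buffered locally.
        self.queue.clear()


class Subscription:
    def __init__(self, log, consumers):
        self.log = log              # messages in the topic, oldest first
        self.cursor = 0             # index of the next message to dispatch
        self.consumers = consumers
        self.dispatching = True

    def dispatch(self, count):
        # Round-robin by position, as a simple stand-in for Shared dispatch.
        sent = 0
        while self.dispatching and sent < count and self.cursor < len(self.log):
            target = self.consumers[self.cursor % len(self.consumers)]
            target.on_message(self.log[self.cursor])
            self.cursor += 1
            sent += 1
        return sent

    def seek(self, position):
        self.dispatching = False          # a. stop dispatching
        for consumer in self.consumers:   # b. tell every consumer to clear its queue
            consumer.on_position_reset()
        self.cursor = position            # c. reset the cursor
        self.dispatching = True           # d. resume from the new position


a, b = Consumer("a"), Consumer("b")
sub = Subscription([f"m{i}" for i in range(10)], [a, b])
sub.dispatch(4)   # a and b each buffer two messages from the old position
sub.seek(7)       # both queues are cleared, no connection is closed
sub.dispatch(2)   # only messages from the new position arrive
```

Note that no connection is torn down in this sketch: consumers are told to clear their queues over the existing connection, which is the point of the suggestion.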
> >
> > Thanks,
> >
> > Asaf
> >
> > On Mon, Aug 1, 2022 at 6:43 AM Qiang Huang <qi...@gmail.com>
> > wrote:
> >
> > > Sure. You can refer to pip-84:
> > >
> > >
> > https://github.com/apache/pulsar/wiki/PIP-84-:-Pulsar-client:-Redeliver-command-add-epoch
> > > .
> > >
> > > Zike Yang <zi...@apache.org> wrote on Fri, Jul 29, 2022 at 10:22:
> > >
> > > > Hi, Qiang
> > > >
> > > > > It is necessary to check the current cursor status when handling a
> > > > > flowPermits request on the server side. If the server is handling a seek
> > > > > request, it should ignore the flowPermits request because the request is
> > > > > illegal.
> > > >
> > > > Thanks for your explanation. I think it's better to add this
> > > > explanation to the PIP.
> > > >
> > > > > The reconnected consumer can be regarded as a new consumer with a new epoch.
> > > >
> > > > The consumer will reconnect to the broker during the seek operation.
> > > > And this will change the existing behavior. It doesn't seem to make
> > > > sense. Please correct me if I have misunderstood.
> > > >
> > > > Thanks,
> > > > Zike Yang
> > > >
> > > > On Wed, Jul 27, 2022 at 8:06 PM Qiang Huang <qiang.huang1991@gmail.com>
> > > > wrote:
> > > > >
> > > > > Thanks Zike.
> > > > > > > - stage 1: Check the current cursor status when handling flowPermits
> > > > > > > from the server side.
> > > > >
> > > > > > > Could you explain more details on this step? It looks like there is
> > > > > > > not much described above. What kind of status needs to be checked, and
> > > > > > > what kind of behavior will the broker take?
> > > > > It is necessary to check the current cursor status when handling a
> > > > > flowPermits request on the server side. If the server is handling a seek
> > > > > request, it should ignore the flowPermits request because the request is
> > > > > illegal.
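
As a rough illustration of the check described here, assuming a toy dispatcher: flow permits are accepted normally, but ignored while a seek is in progress. `Dispatcher`, `begin_seek`, `end_seek`, and `on_flow` are hypothetical names, not the broker's actual classes or methods.

```python
class Dispatcher:
    """Toy broker-side dispatcher that tracks flow permits for one consumer."""

    def __init__(self):
        self.permits = 0
        self.seeking = False

    def begin_seek(self):
        self.seeking = True

    def end_seek(self):
        self.seeking = False

    def on_flow(self, permits):
        # While the cursor is being reset, a flow request is treated as illegal
        # and ignored instead of adding permits.
        if self.seeking:
            return False
        self.permits += permits
        return True


dispatcher = Dispatcher()
accepted_before = dispatcher.on_flow(1000)
dispatcher.begin_seek()
accepted_during = dispatcher.on_flow(500)
dispatcher.end_seek()
accepted_after = dispatcher.on_flow(500)
```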
> > > > >
> > > > >
> > > > > > > 1. Consumer reconnect needs to reset the epoch.
> > > > > >> Why do we need to reset the epoch when the consumer reconnects?
> > > > > The reconnected consumer can be regarded as a new consumer with a new epoch.
> > > >
> > >
> > >
> > > --
> > > BR,
> > > Qiang Huang
> > >
> >
> 
> 
> -- 
> BR,
> Qiang Huang
>