Posted to dev@pulsar.apache.org by Qiang Huang <qi...@gmail.com> on 2022/07/24 14:24:18 UTC

[DISCUSS] PIP 194 : Pulsar client: seek command add epoch

Hi Pulsar community:
I have opened a PIP to discuss "Pulsar client: seek command add epoch".
Proposal link:

   - issue link: https://github.com/apache/pulsar/issues/16757

--
## Motivation
`Reader` uses an exclusive subscription with a `nonDurable` cursor. After
receiving messages, `Reader` immediately acks them cumulatively.
`flowPermits` requests are triggered in multiple scenarios on the client
side, independently of the `Consumer`'s `seek`. Therefore, it is possible
for a `flowPermits` request to execute after a `seek` on the client side,
as in the following flow chart.

[image: image.png]

When `handleSeek` processing is delayed on the server side, the
`MarkDelete position` is modified incorrectly.
The expected result is that `Reader` can re-consume messages from
`mark delete:(1,1)` after `seek`, but this does not happen.

In Pulsar, reading messages and seeking to a position are not synchronized
with each other, so a seek request can't prevent an in-progress entry read.
As a result, the client can still receive messages past the seek position.

To synchronize the read and seek operations, this proposal adds an epoch to
both the server-side and the client-side consumer. When the client consumer
(reader) invokes `seek`, it increases its epoch by 1 and sends a `seek`
command carrying the new epoch; the server-side consumer then updates its
epoch to match. When the dispatcher sends messages to the client, each
message carries the epoch that was current when the cursor read it. The
client consumer filters out any message whose epoch is smaller than its
current epoch.
In this way, once the client consumer has sent the `seek` command
successfully, epoch filtering ensures that it will not receive stale
messages that were read before the seek and lie beyond the position the
user sought to.
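
The client-side half of this epoch handshake can be sketched as follows. This is a minimal illustration under stated assumptions, not Pulsar's actual client code: the class `EpochFilterSketch` and its methods `beginSeek`, `onMessage` and `queued` are hypothetical names, a message is reduced to an entry id plus the epoch stamped on it, and clearing the local queue on seek is an assumption of the sketch.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical client-side consumer state; all names are illustrative only.
class EpochFilterSketch {
    private long consumerEpoch = 0;                               // local epoch
    private final Deque<Long> receiverQueue = new ArrayDeque<>(); // queued entry ids

    // On seek: bump the epoch and return the value the seek command would carry.
    // Assumption: the local queue is cleared at the same time.
    synchronized long beginSeek() {
        receiverQueue.clear();
        consumerEpoch += 1;
        return consumerEpoch;
    }

    // On an incoming message: accept it only if its epoch is not older than ours.
    synchronized boolean onMessage(long entryId, long messageEpoch) {
        if (messageEpoch < consumerEpoch) {
            return false; // stale: read by the broker before the seek took effect
        }
        receiverQueue.add(entryId);
        return true;
    }

    synchronized int queued() {
        return receiverQueue.size();
    }

    public static void main(String[] args) {
        EpochFilterSketch consumer = new EpochFilterSketch();
        consumer.onMessage(5, 0);                             // accepted before any seek
        long seekEpoch = consumer.beginSeek();                // epoch becomes 1
        System.out.println(consumer.onMessage(6, 0));         // false: old epoch
        System.out.println(consumer.onMessage(1, seekEpoch)); // true: new epoch
    }
}
```

The comparison is deliberately `<` rather than `!=`, matching the statement that only messages with a smaller epoch are filtered.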


### Current implementation details
#### CommandSeek Protocol
```proto
// Reset an existing consumer to a particular message id
message CommandSeek {
    required uint64 consumer_id = 1;
    required uint64 request_id  = 2;

    optional MessageIdData message_id = 3;
    optional uint64 message_publish_time = 4;
}
```
#### CommandMessage
```proto
message CommandMessage {
    required uint64 consumer_id       = 1;
    required MessageIdData message_id = 2;
    optional uint32 redelivery_count  = 3 [default = 0];
    repeated int64 ack_set = 4;
    optional uint64 epoch = 5 [default = 0];
}
```
`CommandMessage` already carries an epoch, added by [PIP-84](
https://github.com/apache/pulsar/wiki/PIP-84-:-Pulsar-client:-Redeliver-command-add-epoch).
When the client receives a `CommandMessage`, it compares the command's
epoch with its local epoch to decide how to handle the command.

## Goal
Add epoch into seek command.

## API Changes
### Protocol change: CommandSeek
```proto
// Reset an existing consumer to a particular message id
message CommandSeek {
    required uint64 consumer_id = 1;
    required uint64 request_id  = 2;

    optional MessageIdData message_id = 3;
    optional uint64 message_publish_time = 4;
    optional uint64 consumer_epoch = 5;
}
```
The `CommandSeek` command gains an epoch field. When the client sends a
seek command to the server successfully, the server sets the server-side
consumer epoch to the command's epoch. The epoch can only increase: a new
value must be larger than the epoch currently held by the server. The
client can then filter out messages that carry a smaller consumer epoch.
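
The "epoch can only increase" rule on the server side can be sketched as below. This is an illustration only: `ServerConsumerEpochSketch`, `onSeek` and `currentEpoch` are hypothetical names rather than broker APIs, and silently keeping the old epoch when a smaller value arrives is an assumption, since the proposal does not say how such a seek command is answered.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical server-side consumer state; names are illustrative only.
class ServerConsumerEpochSketch {
    private final AtomicLong consumerEpoch = new AtomicLong(0);

    // Apply the epoch carried by a seek command. The stored epoch never moves
    // backwards: it becomes max(current, commandEpoch). Returns the epoch in
    // effect after the update.
    long onSeek(long commandEpoch) {
        return consumerEpoch.updateAndGet(current -> Math.max(current, commandEpoch));
    }

    // Epoch that would be stamped on messages dispatched from now on.
    long currentEpoch() {
        return consumerEpoch.get();
    }

    public static void main(String[] args) {
        ServerConsumerEpochSketch server = new ServerConsumerEpochSketch();
        System.out.println(server.onSeek(2)); // 2: epoch advances
        System.out.println(server.onSeek(1)); // 2: smaller epoch is not applied
        System.out.println(server.currentEpoch());
    }
}
```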

## Implementation
- stage 1: Check the current cursor status when handling flowPermits on
the server side.
- stage 2: Add an epoch to the seek command, and have the server update the
consumer epoch. This guards against an entry read that is still in progress
when the seek request arrives.
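
Stage 1 can be pictured as a small guard on the broker. The sketch below is a minimal one, assuming that "checking the cursor status" means ignoring flow requests that arrive while a seek is being handled; `FlowPermitGuardSketch` and all of its methods are hypothetical names, not broker code.

```java
// Hypothetical broker-side guard; names are illustrative only.
class FlowPermitGuardSketch {
    private boolean seekInProgress = false;
    private int availablePermits = 0;

    // Mark the start of seek handling.
    synchronized void beginSeek() {
        seekInProgress = true;
    }

    // Mark the end of seek handling (cursor has been reset).
    synchronized void completeSeek() {
        seekInProgress = false;
    }

    // Handle a flow request: ignore it while a seek is in progress,
    // otherwise add the permits. Returns true if the permits were applied.
    synchronized boolean onFlow(int permits) {
        if (seekInProgress) {
            return false;
        }
        availablePermits += permits;
        return true;
    }

    synchronized int availablePermits() {
        return availablePermits;
    }

    public static void main(String[] args) {
        FlowPermitGuardSketch guard = new FlowPermitGuardSketch();
        guard.beginSeek();
        System.out.println(guard.onFlow(1000)); // false: ignored during seek
        guard.completeSeek();
        System.out.println(guard.onFlow(1000)); // true: applied after seek
        System.out.println(guard.availablePermits());
    }
}
```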

## Rejected Alternatives
None yet.

## Note
1. A consumer needs to reset its epoch when it reconnects.

-- 
BR,
Qiang Huang

Re: [DISCUSS] PIP 194 : Pulsar client: seek command add epoch

Posted by Baodi Shi <ba...@apache.org>.
Any updates on this?

On 2022/09/11 04:18:07 Qiang Huang wrote:
> Hi Asaf, thank you for the very detailed reply.
> 
> > The problem we have today is that while we have sent a request to reset
> > the subscription position, the broker decides to:
> > 1. Close the TCP connection which in turn causes the client to clear any
> > pending messages it has in the queue.
> > 2. Continue to send messages from the previous position, up to a certain
> > point where the broker "shifts gear" and starts sending messages from the
> > new position.
> 
> > Since Pulsar doesn't follow a request-response model but has a
> > bi-directional protocol, the client can send a command to fetch messages
> > using a new session sequence number, while the server can still send
> > messages using the old session number. Using the Session Sequence Number
> > the client can't tell the difference between the messages being pushed
> > from the server to it.
> I totally agree with you. I am aware of something wrong in the pip when I
> re-read this part of the code.
> 
> 
> > # What are the issues with this PIP?
> > 1. The PIP decides to solve the problem listed above *only* for exclusive
> > and failover subscriptions where you have only a single consumer. The
> > problem still remains at large with Shared or Key Shared subscriptions.
> > 2. The cost of solving a small portion of the problem is high:
> >     Added Complexity - Adding another field to the protocol, and another
> > thing to check. I believe we should aim to reduce the cognitive load of the
> > developers of Pulsar.
> > 3. There are no rejected solutions - We always need to examine all
> > available options and list why we decided against them.
> > 4. Lack of background knowledge (context) - it's super hard IMO to grasp
> > the idea without so much context missing: The client-server protocol
> > pertaining to this PIP, including its async nature, what is an epoch and
> > why it was introduced, what are flow permits. I'm not saying explain all
> > pulsar in this doc, but just include a brief explanation of that
> > terminology.
> >
> > # What We Suggest
> >
> > Rethink the solution.
> > 1. The consumer (one of many) will send a seek command to the broker, and
> > at the same time clear its internal queue and wait for a response from the
> > broker.
> > 2. The broker upon receiving the seek command, will
> >      a. Stop dispatching messages to consumers.
> >      b. Notify all consumers via a command (new) that the subscription
> > position was asked to be reset. Consumers receiving this command will clear
> > their internal queue. The broker will no longer close the TCP connection
> > (with its adverse effects on other consumers and producers "riding" on that
> > connection)
> >      c. Reset the cursor to the newly requested position.
> >      d. Continue dispatching messages from newly requested positions to
> > consumers.
> >
> 
> Good suggestions. I'll look into these issues and rethink the solution. I
> will rewrite this pip according to your suggestions.
> Thanks again for your review.
> 
> On Wed, Sep 7, 2022 at 23:12, Asaf Mesika <as...@gmail.com> wrote:
> 
> > Hi Qiang,
> >
> > We had a brainstorming session on this PIP over Zoom with Penghui, Hang,
> > and more people, and I'm jotting down our feedback here.
> >
> > Before I do that, I just want to write my own understanding of the
> > document, for other readers:
> >
> > # Context
> > Pulsar, as opposed to other distributed / streaming systems, took the
> > approach of a push model. The client (consumer that is) asks for 1000
> > messages (that's the consumer's remaining capacity in its internal queue)
> > from the broker (that process is named flow permits). The broker was now
> > given permission to send 1000 messages to the client, hence utilizing the
> > TCP connection to send those 1000 messages as they were ready to be sent.
> >
> > The consumer has the ability to request the subscription to reset its
> > position to the requested new position.
> > The problem we have today is that while we have sent a request to reset the
> > subscription position, the broker decides to:
> > 1. Close the TCP connection which in turn causes the client to clear any
> > pending messages it has in the queue.
> > 2. Continue to send messages from the previous position, up to a certain
> > point where the broker "shifts gear" and starts sending messages from the
> > new position.
> >
> > So the problem is that you would expect that after the connection was
> > reset, only messages from the new position will be sent to the consumer,
> > but that doesn't happen.
> >
> > We have to keep in mind, that we have effectively two scenarios here from
> > the point of view of the consumer:
> > 1. Single consumer - It can be due to using an Exclusive subscription, or
> > being a consumer attached to a single topic since the subscription is of
> > type Failover.
> > 2. Multiple consumers - In a Shared or Key Shared subscription types. In
> > this case, one of those consumers can decide to reset the position of the
> > *subscription*. When that happens, the broker decides, again, to reset all
> > existing TCP connections to all consumers upon receiving the seek command,
> > and you would expect any messages sent afterward to be from the new
> > position, which again doesn't happen.
> >
> > Another really important piece of information we need to bring to the
> > context of the reader here is the notion of an epoch. First, the epoch in
> > Pulsar PIPs was introduced in PIP-84. The idea is that every time the
> > client starts a "session" of requesting and receiving messages in response,
> > the client will send a Session Sequence Number, and the server responds to
> > those message requests with the same session sequence number. Since Pulsar
> > doesn't follow a request-response model but has a bi-directional protocol,
> > the client can send a command to fetch messages using a new session
> > sequence number, while the server can still send messages using the old
> > session number. Using the Session Sequence Number the client can't tell the
> > difference between the messages being pushed from the server to it. That
> > Session Sequence Number is the one referred to as Epoch in PIP-84 and also
> > in this PIP.
> > The idea was somehow to demarcate the responses coming from the server
> > based on the commands the client sends as they are *independent* (async).
> >
> > # What are the issues with this PIP?
> > 1. The PIP decides to solve the problem listed above *only* for exclusive
> > and failover subscriptions where you have only a single consumer. The
> > problem still remains at large with Shared or Key Shared subscriptions.
> > 2. The cost of solving a small portion of the problem is high:
> >     Added Complexity - Adding another field to the protocol, and another
> > thing to check. I believe we should aim to reduce the cognitive load of the
> > developers of Pulsar.
> > 3. There are no rejected solutions - We always need to examine all
> > available options and list why we decided against them.
> > 4. Lack of background knowledge (context) - it's super hard IMO to grasp
> > the idea without so much context missing: The client-server protocol
> > pertaining to this PIP, including its async nature, what is an epoch and
> > why it was introduced, what are flow permits. I'm not saying explain all
> > pulsar in this doc, but just include a brief explanation of that
> > terminology.
> >
> > # What We Suggest
> >
> > Rethink the solution.
> > 1. The consumer (one of many) will send a seek command to the broker, and
> > at the same time clear its internal queue and wait for a response from the
> > broker.
> > 2. The broker upon receiving the seek command, will
> >      a. Stop dispatching messages to consumers.
> >      b. Notify all consumers via a command (new) that the subscription
> > position was asked to be reset. Consumers receiving this command will clear
> > their internal queue. The broker will no longer close the TCP connection
> > (with its adverse effects on other consumers and producers "riding" on that
> > connection)
> >      c. Reset the cursor to the newly requested position.
> >      d. Continue dispatching messages from newly requested positions to
> > consumers.
> >
> > The disadvantage here is that we need to alter the client to recognize
> > a new command and act accordingly, yet I think that is accidental
> > complexity stemming from the client-server protocol being bi-directional
> > rather than request-response.
> >
> > Thanks,
> >
> > Asaf
> >
> > On Mon, Aug 1, 2022 at 6:43 AM Qiang Huang <qi...@gmail.com>
> > wrote:
> >
> > > Sure. You can refer to pip-84:
> > >
> > >
> > https://github.com/apache/pulsar/wiki/PIP-84-:-Pulsar-client:-Redeliver-command-add-epoch
> > > .
> > >
> > > On Fri, Jul 29, 2022 at 10:22, Zike Yang <zi...@apache.org> wrote:
> > >
> > > > Hi, Qiang
> > > >
> > > > > It is necessary to check the current cursor status when handling
> > > > > flowPermits request from the server side. If the server is handling
> > > > > seek request, it should ignore flowPermits request because the
> > > > > request is illegal.
> > > >
> > > > Thanks for your explanation. I think it's better to add this
> > > > explanation to the PIP.
> > > >
> > > > > The reconnected consumer can be regarded as a new consumer with a new epoch.
> > > >
> > > > The consumer will reconnect to the broker during the seek operation.
> > > > And this will change the existing behavior. It doesn't seem to make
> > > > sense. Please correct me if I have misunderstood.
> > > >
> > > > Thanks,
> > > > Zike Yang
> > > >
> > > > On Wed, Jul 27, 2022 at 8:06 PM Qiang Huang <qiang.huang1991@gmail.com>
> > > > wrote:
> > > > >
> > > > > Thanks Zike.
> > > > > > > > - stage 1: Check the current cursor status when handling
> > > > > > > > flowPermits from the server side.
> > > > > >
> > > > > > > > Could you explain more details on this step? It looks like there
> > > > > > > > is not much described above. What kind of status needs to be
> > > > > > > > checked, and what kind of behavior will the broker take?
> > > > > > It is necessary to check the current cursor status when handling
> > > > > > flowPermits request from the server side. If the server is handling
> > > > > > seek request, it should ignore flowPermits request because the
> > > > > > request is illegal.
> > > > > >
> > > > > >
> > > > > > > > 1. Consumer reconnect need reset epoch.
> > > > > > > > Why do we need to reset the epoch when the consumer reconnects?
> > > > > > The reconnected consumer can be regarded as a new consumer with a new epoch.
> > > >
> > >
> > >
> > > --
> > > BR,
> > > Qiang Huang
> > >
> >
> 
> 
> -- 
> BR,
> Qiang Huang
> 

Re: [DISCUSS] PIP 194 : Pulsar client: seek command add epoch

Posted by Asaf Mesika <as...@gmail.com>.
Did you end up fixing the PIP? I see it wasn’t implemented in the end.



Re: [DISCUSS] PIP 194 : Pulsar client: seek command add epoch

Posted by Qiang Huang <qi...@gmail.com>.
Hi Asaf, thank you for the very detailed reply.

> The problem we have today is that while we have sent a request to reset
> the subscription position, the broker decides to:
> 1. Close the TCP connection which in turn causes the client to clear any
> pending messages it has in the queue.
> 2. Continue to send messages from the previous position, up to a certain
> point where the broker "shifts gear" and starts sending messages from the
> new position.

> Since Pulsar doesn't follow a request-response model but has a
> bi-directional protocol, the client can send a command to fetch messages
> using a new session sequence number, while the server can still send
> messages using the old session number. Using the Session Sequence Number
> the client can't tell the difference between the messages being pushed
> from the server to it.

I totally agree with you. I became aware that something is wrong in the PIP
when I re-read this part of the code.


> # What are the issues with this PIP?
> 1. The PIP decides to solve the problem listed above *only* for exclusive
> and failover subscriptions where you have only a single consumer. The
> problem still remains at large with Shared or Key Shared subscriptions.
> 2. The cost of solving a small portion of the problem is high:
>     Added Complexity - Adding another field to the protocol, and another
> thing to check. I believe we should aim to reduce the cognitive load of the
> developers of Pulsar.
> 3. There are no rejected solutions - We always need to examine all
> available options and list why we decided against them.
> 4. Lack of background knowledge (context) - it's super hard IMO to grasp
> the idea without so much context missing: The client-server protocol
> pertaining to this PIP, including its async nature, what is an epoch and
> why it was introduced, what are flow permits. I'm not saying explain all
> pulsar in this doc, but just include a brief explanation of that
> terminology.
>
> # What We Suggest
>
> Rethink the solution.
> 1. The consumer (one of many) will send a seek command to the broker, and
> at the same time clear its internal queue and wait for a response from the
> broker.
> 2. The broker upon receiving the seek command, will
>      a. Stop dispatching messages to consumers.
>      b. Notify all consumers via a command (new) that the subscription
> position was asked to be reset. Consumers receiving this command will clear
> their internal queue. The broker will no longer close the TCP connection
> (with its adverse effects on other consumers and producers "riding" on that
> connection)
>      c. Reset the cursor to the newly requested position.
>      d. Continue dispatching messages from newly requested positions to
> consumers.
>

Good suggestions. I'll look into these issues and rethink the solution. I
will rewrite this PIP according to your suggestions.
Thanks again for your review.

On Wed, Sep 7, 2022 at 23:12, Asaf Mesika <as...@gmail.com> wrote:

> Hi Qiang,
>
> We had a brainstorming session on this PIP over Zoom with Penghui, Hang,
> and more people, and I'm jotting down our feedback here.
>
> Before I do that, I just want to write my own understanding of the
> document, for other readers:
>
> # Context
> Pulsar, as opposed to other distributed / streaming systems, took the
> approach of a push model. The client (consumer that is) asks for 1000
> messages (that's the consumer's remaining capacity in its internal queue)
> from the broker (that process is named flow permits). The broker was now
> given permission to send 1000 messages to the client, hence utilizing the
> TCP connection to send those 1000 messages as they were ready to be sent.
>
> The consumer has the ability to request the subscription to reset its
> position to the requested new position.
> The problem we have today is that while we have sent a request to reset the
> subscription position, the broker decides to:
> 1. Close the TCP connection which in turn causes the client to clear any
> pending messages it has in the queue.
> 2. Continue to send messages from the previous position, up to a certain
> point where the broker "shifts gear" and starts sending messages from the
> new position.
>
> So the problem is that you would expect that after the connection was
> reset, only messages from the new position will be sent to the consumer,
> but that doesn't happen.
>
> We have to keep in mind, that we have effectively two scenarios here from
> the point of view of the consumer:
> 1. Single consumer - It can be due to using an Exclusive subscription, or
> being a consumer attached to a single topic since the subscription is of
> type Failover.
> 2. Multiple consumers - In a Shared or Key Shared subscription types. In
> this case, one of those consumers can decide to reset the position of the
> *subscription*. When that happens, the broker decides, again, to reset all
> existing TCP connections to all consumers upon receiving the seek command,
> and you would expect any messages sent afterward to be from the new
> position, which again doesn't happen.
>
> Another really important piece of information we need to bring to the
> context of the reader here is the notion of an epoch. First, the epoch in
> Pulsar PIPs was introduced in PIP-84. The idea is that every time the
> client starts a "session" of requesting and receiving messages in response,
> the client will send a Session Sequence Number, and the server responds to
> those message requests with the same session sequence number. Since Pulsar
> doesn't follow a request-response model but has a bi-directional protocol,
> the client can send a command to fetch messages using a new session
> sequence number, while the server can still send messages using the old
> session number. Using the Session Sequence Number the client can't tell the
> difference between the messages being pushed from the server to it. That
> Session Sequence Number is the one referred to as Epoch in PIP-84 and also
> in this PIP.
> The idea was somehow to demarcate the responses coming from the server
> based on the commands the client sends as they are *independent* (async).
>
> # What are the issues with this PIP?
> 1. The PIP decides to solve the problem listed above *only* for exclusive
> and failover subscriptions where you have only a single consumer. The
> problem still remains at large with Shared or Key Shared subscriptions.
> 2. The cost of solving a small portion of the problem is high:
>     Added Complexity - Adding another field to the protocol, and another
> thing to check. I believe we should aim to reduce the cognitive load of the
> developers of Pulsar.
> 3. There are no rejected solutions - We always need to examine all
> available options and list why we decided against them.
> 4. Lack of background knowledge (context) - it's super hard IMO to grasp
> the idea with so much context missing: the client-server protocol
> pertaining to this PIP, including its async nature, what an epoch is and
> why it was introduced, and what flow permits are. I'm not saying to explain
> all of Pulsar in this doc, but just include a brief explanation of that
> terminology.
>
> # What We Suggest
>
> Rethink the solution.
> 1. The consumer (one of many) will send a seek command to the broker, and
> at the same time clear its internal queue and wait for a response from the
> broker.
> 2. The broker, upon receiving the seek command, will:
>      a. Stop dispatching messages to consumers.
>      b. Notify all consumers via a (new) command that the subscription
> position was asked to be reset. Consumers receiving this command will clear
> their internal queue. The broker will no longer close the TCP connection
> (with its adverse effects on other consumers and producers "riding" on that
> connection).
>      c. Reset the cursor to the newly requested position.
>      d. Continue dispatching messages to consumers from the newly requested
> position.
>
> The disadvantage here is that we need to alter the client so that it knows
> the new command and acts accordingly, yet I think that is accidental
> complexity stemming from the client-server protocol being bi-directional
> rather than request-response.
>
> Thanks,
>
> Asaf
>


-- 
BR,
Qiang Huang

Re: [DISCUSS] PIP 194 : Pulsar client: seek command add epoch

Posted by Asaf Mesika <as...@gmail.com>.
Hi Qiang,

We had a brainstorming session on this PIP over Zoom with Penghui, Hang,
and others, and I'm jotting down our feedback here.

Before I do that, I just want to write my own understanding of the
document, for other readers:

# Context
Pulsar, as opposed to other distributed / streaming systems, took the
approach of a push model. The client (the consumer, that is) asks the broker
for 1000 messages (the remaining capacity of the consumer's internal queue);
that request is called flow permits. The broker now has permission to send
1000 messages to the client, and it uses the TCP connection to push those
1000 messages as they become ready to be sent.
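
As a rough illustration of flow permits, here is a toy model in Python. It
is not Pulsar's actual code: `ToyBroker`, `ToyConsumer`, and the queue size
of 4 are made up for the example. The consumer grants as many permits as its
internal queue has free slots, and the broker pushes only while permits
remain.

```python
from collections import deque


class ToyBroker:
    """Pushes messages only while the consumer has granted permits."""

    def __init__(self, backlog):
        self.backlog = deque(backlog)  # messages waiting to be dispatched
        self.permits = 0               # how many messages may still be pushed

    def on_flow(self, permits, consumer):
        # A flow request adds to the outstanding permits, then we push.
        self.permits += permits
        self.dispatch(consumer)

    def dispatch(self, consumer):
        while self.permits > 0 and self.backlog:
            consumer.on_message(self.backlog.popleft())
            self.permits -= 1


class ToyConsumer:
    def __init__(self, queue_size):
        self.queue_size = queue_size
        self.queue = deque()

    def request_more(self, broker):
        # Ask for exactly as many messages as the internal queue can hold.
        free = self.queue_size - len(self.queue)
        if free > 0:
            broker.on_flow(free, self)

    def on_message(self, msg):
        self.queue.append(msg)


broker = ToyBroker(backlog=range(10))
consumer = ToyConsumer(queue_size=4)
consumer.request_more(broker)
print(list(consumer.queue))   # [0, 1, 2, 3]
consumer.queue.popleft()      # the application takes one message
consumer.request_more(broker)
print(list(consumer.queue))   # [1, 2, 3, 4]
```

The point to notice is that the broker, not the consumer, decides when each
message goes out; the consumer only bounds how many may be in flight.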

The consumer has the ability to request the subscription to reset its
position to the requested new position.
The problem we have today is that while we have sent a request to reset the
subscription position, the broker decides to:
1. Close the TCP connection which in turn causes the client to clear any
pending messages it has in the queue.
2. Continue to send messages from the previous position, up to a certain
point where the broker "shifts gear" and starts sending messages from the
new position.

So the problem is that you would expect that after the connection was
reset, only messages from the new position will be sent to the consumer,
but that doesn't happen.

We have to keep in mind, that we have effectively two scenarios here from
the point of view of the consumer:
1. Single consumer - It can be due to using an Exclusive subscription, or
being a consumer attached to a single topic since the subscription is of
type Failover.
2. Multiple consumers - With the Shared or Key Shared subscription types. In
this case, one of those consumers can decide to reset the position of the
*subscription*. When that happens, the broker decides, again, to reset all
existing TCP connections to all consumers upon receiving the seek command,
and you would expect any messages sent afterward to be from the new
position, which again doesn't happen.

Another really important piece of information we need to bring to the
context of the reader here is the notion of an epoch. First, the epoch in
Pulsar PIPs was introduced in PIP-84. The idea is that every time the
client starts a "session" of requesting and receiving messages in response,
the client will send a Session Sequence Number, and the server responds to
those message requests with the same session sequence number. Since Pulsar
doesn't follow a request-response model but has a bi-directional protocol,
the client can send a command to fetch messages using a new session
sequence number, while the server can still send messages using the old
session number. Using the Session Sequence Number the client can tell the
difference between the messages being pushed from the server to it. That
Session Sequence Number is the one referred to as Epoch in PIP-84 and also
in this PIP.
The idea was somehow to demarcate the responses coming from the server
based on the commands the client sends as they are *independent* (async).
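
A minimal sketch of the epoch idea, under stated assumptions: the class and
method names below are hypothetical and this is not the Pulsar client or
broker code. The client bumps its epoch when it seeks, the broker-side
consumer adopts the epoch only if it is larger (as the PIP text requires),
and the client drops any message stamped with an older epoch.

```python
class ClientConsumer:
    def __init__(self):
        self.epoch = 0
        self.received = []

    def seek(self):
        # Start a new "session": bump the epoch and drop what is buffered.
        self.epoch += 1
        self.received.clear()
        return self.epoch  # value that would travel with the seek command

    def on_message(self, msg_id, epoch):
        if epoch < self.epoch:
            return False   # stale: dispatched before the broker saw the seek
        self.received.append(msg_id)
        return True


class BrokerSideConsumer:
    """Broker-side view of one consumer (illustrative only)."""

    def __init__(self):
        self.epoch = 0

    def on_seek(self, epoch):
        # The epoch may only grow; an older value is ignored.
        if epoch > self.epoch:
            self.epoch = epoch


client = ClientConsumer()
broker_side = BrokerSideConsumer()

in_flight = ("msg-7", broker_side.epoch)   # read before the seek, still on the wire
broker_side.on_seek(client.seek())         # client epoch 0 -> 1, broker follows
after_seek = ("msg-2", broker_side.epoch)  # read after the cursor was reset

print(client.on_message(*in_flight))   # False
print(client.on_message(*after_seek))  # True
print(client.received)                 # ['msg-2']
```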

# What are the issues with this PIP?
1. The PIP decides to solve the problem listed above *only* for exclusive
and failover subscriptions where you have only a single consumer. The
problem still remains at large with Shared or Key Shared subscriptions.
2. The cost of solving a small portion of the problem is high:
    Added Complexity - Adding another field to the protocol, and another
thing to check. I believe we should aim to reduce the cognitive load of the
developers of Pulsar.
3. There are no rejected solutions - We always need to examine all
available options and list why we decided against them.
4. Lack of background knowledge (context) - it's super hard IMO to grasp
the idea with so much context missing: the client-server protocol
pertaining to this PIP, including its async nature, what an epoch is and
why it was introduced, and what flow permits are. I'm not saying to explain
all of Pulsar in this doc, but just include a brief explanation of that
terminology.

# What We Suggest

Rethink the solution.
1. The consumer (one of many) will send a seek command to the broker, and
at the same time clear its internal queue and wait for a response from the
broker.
2. The broker, upon receiving the seek command, will:
     a. Stop dispatching messages to consumers.
     b. Notify all consumers via a (new) command that the subscription
position was asked to be reset. Consumers receiving this command will clear
their internal queue. The broker will no longer close the TCP connection
(with its adverse effects on other consumers and producers "riding" on that
connection).
     c. Reset the cursor to the newly requested position.
     d. Continue dispatching messages to consumers from the newly requested
position.
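
The steps above can be sketched as a small single-threaded simulation. This
is only an illustration under assumptions: `Subscription`, `Consumer`, and
`on_position_reset` (standing in for the proposed new command) are
hypothetical names, and message positions are plain integers.

```python
class Consumer:
    def __init__(self, name):
        self.name = name
        self.queue = []

    def on_message(self, position):
        self.queue.append(position)

    def on_position_reset(self):
        # Hypothetical new command: drop everything buffered locally.
        self.queue.clear()


class Subscription:
    def __init__(self, consumers, cursor=0):
        self.consumers = consumers
        self.cursor = cursor
        self.paused = False

    def dispatch(self, count):
        # Hand out `count` messages round-robin, unless dispatching is paused.
        sent = 0
        while not self.paused and sent < count:
            target = self.consumers[sent % len(self.consumers)]
            target.on_message(self.cursor)
            self.cursor += 1
            sent += 1
        return sent

    def seek(self, position):
        self.paused = True                 # a. stop dispatching
        for c in self.consumers:           # b. notify every consumer
            c.on_position_reset()
        self.cursor = position             # c. reset the cursor
        self.paused = False                # d. resume from the new position


a, b = Consumer("a"), Consumer("b")
sub = Subscription([a, b])
sub.dispatch(4)          # a gets 0 and 2, b gets 1 and 3
sub.seek(1)              # one consumer asked to reset to position 1
sub.dispatch(2)
print(a.queue, b.queue)  # [1] [2]
```

No connection is closed anywhere in this flow; every consumer, including the
one that asked for the seek, ends up holding only messages from the new
position.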

The disadvantage here is that we need to alter the client so that it knows
the new command and acts accordingly, yet I think that is accidental
complexity stemming from the client-server protocol being bi-directional
rather than request-response.

Thanks,

Asaf

On Mon, Aug 1, 2022 at 6:43 AM Qiang Huang <qi...@gmail.com>
wrote:

> Sure. You can refer to pip-84:
>
> https://github.com/apache/pulsar/wiki/PIP-84-:-Pulsar-client:-Redeliver-command-add-epoch
> .
>

Re: [DISCUSS] PIP 194 : Pulsar client: seek command add epoch

Posted by Qiang Huang <qi...@gmail.com>.
Sure. You can refer to PIP-84:
https://github.com/apache/pulsar/wiki/PIP-84-:-Pulsar-client:-Redeliver-command-add-epoch

Zike Yang <zi...@apache.org> wrote on Fri, Jul 29, 2022 at 10:22:

> Hi, Qiang
>
> > It is necessary to check the current cursor status when handling a
> > flowPermits request on the server side. If the server is handling a seek
> > request, it should ignore the flowPermits request because the request is
> > illegal.
>
> Thanks for your explanation. I think it's better to add this
> explanation to the PIP.
>
> > The reconnected consumer can be regarded as a new consumer with a new epoch.
>
> The consumer will reconnect to the broker during the seek operation.
> And this will change the existing behavior. It doesn't seem to make
> sense. Please correct me if I have misunderstood.
>
> Thanks,
> Zike Yang
>


-- 
BR,
Qiang Huang

Re: [DISCUSS] PIP 194 : Pulsar client: seek command add epoch

Posted by Zike Yang <zi...@apache.org>.
Hi, Qiang

> It is necessary to check the current cursor status when handling a flowPermits
> request on the server side. If the server is handling a seek request, it
> should ignore the flowPermits request because the request is illegal.

Thanks for your explanation. I think it's better to add this
explanation to the PIP.

> The reconnected consumer can be regarded as a new consumer with a new epoch.

The consumer will reconnect to the broker during the seek operation.
And this will change the existing behavior. It doesn't seem to make
sense. Please correct me if I have misunderstood.

Thanks,
Zike Yang

On Wed, Jul 27, 2022 at 8:06 PM Qiang Huang <qi...@gmail.com> wrote:
>
> Thanks Zike.
> > > - stage 1: Check the current cursor status when handling flowPermits
> > > from the server side.
>
> > Could you explain more details on this step? It looks like there is
> > not much described above. What kind of status needs to be checked, and
> > what kind of behavior will the broker take?
> It is necessary to check the current cursor status when handling a
> flowPermits request on the server side. If the server is handling a seek
> request, it should ignore the flowPermits request because the request is
> illegal.
>
>
> > > 1. Consumer reconnect need reset epoch.
> > Why do we need to reset the epoch when the consumer reconnects?
> The reconnected consumer can be regarded as a new consumer with a new epoch.

Re: [DISCUSS] PIP 194 : Pulsar client: seek command add epoch

Posted by Qiang Huang <qi...@gmail.com>.
Thanks Zike.
> > - stage 1: Check the current cursor status when handling flowPermits from
> > the server side.

> Could you explain more details on this step? It looks like there is
> not much described above. What kind of status needs to be checked, and
> what kind of behavior will the broker take?
It is necessary to check the current cursor status when handling a flowPermits
request on the server side. If the server is handling a seek request, it
should ignore the flowPermits request because the request is illegal.
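
A minimal sketch of that check, assuming a simple boolean stands in for "the
cursor is currently being reset". `BrokerConsumer` and its fields are
hypothetical names for illustration, not the broker's real classes.

```python
class BrokerConsumer:
    def __init__(self):
        self.seeking = False   # stand-in for "a seek is being handled"
        self.permits = 0       # permits accepted so far
        self.ignored = 0       # permits dropped because a seek was in progress

    def begin_seek(self):
        self.seeking = True

    def finish_seek(self):
        self.seeking = False

    def on_flow(self, permits):
        # A flow request that arrives during a seek is ignored.
        if self.seeking:
            self.ignored += permits
            return False
        self.permits += permits
        return True


c = BrokerConsumer()
c.on_flow(100)        # accepted
c.begin_seek()
c.on_flow(50)         # ignored: arrives while the seek is being handled
c.finish_seek()
c.on_flow(25)         # accepted again
print(c.permits, c.ignored)  # 125 50
```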


> > 1. Consumer reconnect need reset epoch.
> Why do we need to reset the epoch when the consumer reconnects?
The reconnected consumer can be regarded as a new consumer with a new epoch.

Re: [DISCUSS] PIP 194 : Pulsar client: seek command add epoch

Posted by Zike Yang <zi...@apache.org>.
Hi, Qiang Huang.
This is a good proposal to solve the seek issue of readers. Overall
looks good to me. Left some comments here. Thanks.

> > - stage 1: Check the current cursor status when handling flowPermits from
> > the server side.

Could you explain more details on this step? It looks like there is
not much described above. What kind of status needs to be checked, and
what kind of behavior will the broker take?

> > 1. Consumer reconnect need reset epoch.

Why do we need to reset the epoch when the consumer reconnects?

Thanks!

Zike Yang

On Tue, Jul 26, 2022 at 11:51 AM Anon Hxy <an...@gmail.com> wrote:
>
> +1, Good work.
>
> Thanks,
> Xiaoyu Hou
>

Re: [DISCUSS] PIP 194 : Pulsar client: seek command add epoch

Posted by Anon Hxy <an...@gmail.com>.
+1, Good work.

Thanks,
Xiaoyu Hou


Re: [DISCUSS] PIP 194 : Pulsar client: seek command add epoch

Posted by Zixuan Liu <no...@gmail.com>.
+1, good idea!

Thanks,
Zixuan


