Posted to dev@pulsar.apache.org by 丛搏 <co...@gmail.com> on 2022/11/21 13:03:54 UTC

[DISCUSS] The use of consumer redeliverUnacknowledgedMessages method

Hello, Pulsar community:

Currently, the client consumer's `void redeliverUnacknowledgedMessages();`
works asynchronously, but it has no return value: it only calls
`writeAndFlush` for the redeliver command and then returns.

`ConsumerImpl`:
https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1907-L1909

`MultiTopicsConsumerImpl`:
https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L667-L677

In the Shared subscription type, I think the caller doesn't need a response
from `void redeliverUnacknowledgedMessages()`, and the name
`redeliverUnacknowledgedMessages` is fine.

But in the Failover and Exclusive subscription types, if we don't wait for
the response, the user may still receive messages from the `incomingQueue`,
and then the message order will be broken. If
`redeliverUnacknowledgedMessages` times out, we should try again, but
`redeliverUnacknowledgedMessages` neither throws an exception nor retries.
Also, the name `redeliverUnacknowledgedMessages` is not accurate for the
Failover and Exclusive subscription types; `rewind` would be a more
suitable name.
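To make the ordering problem concrete, here is a minimal toy model in Java. It is not Pulsar client code: the class `ToyFireAndForgetConsumer`, its fields, and the explicit `brokerDispatch`/`networkDeliver`/`brokerHandleRedeliver` steps are hypothetical, and the model deliberately ignores any safeguards the real client may have. It only shows how a redeliver request that returns before the broker has acted on it can let stale messages reach the application ahead of the rewound ones.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical toy model; not the Pulsar client implementation.
class ToyFireAndForgetConsumer {
    // Messages already in the client's receive buffer.
    final Deque<Integer> incomingQueue = new ArrayDeque<>();
    // Messages the broker has written to the connection but the client
    // has not read yet.
    final Deque<Integer> inFlight = new ArrayDeque<>();
    int markDelete = 0;     // last cumulatively acknowledged message id
    int nextToDispatch = 1; // broker-side read position

    void brokerDispatch(int count) {
        for (int i = 0; i < count; i++) {
            inFlight.add(nextToDispatch++);
        }
    }

    void networkDeliver() {
        while (!inFlight.isEmpty()) {
            incomingQueue.add(inFlight.poll());
        }
    }

    // Fire-and-forget: clear the local buffer, "write" the command, return.
    void redeliverUnacknowledgedMessages() {
        incomingQueue.clear();
    }

    // Later, the broker handles the command and rewinds its read position.
    void brokerHandleRedeliver() {
        nextToDispatch = markDelete + 1;
    }

    Integer receive() {
        return incomingQueue.poll();
    }

    // Drains everything currently buffered, in receive order.
    List<Integer> drain() {
        List<Integer> out = new ArrayList<>();
        for (Integer m = receive(); m != null; m = receive()) {
            out.add(m);
        }
        return out;
    }
}
```

For example, with messages 1 to 6 buffered, the application receives 1, 2, 3 without acknowledging them and calls redeliver while 7 and 8 are still in flight; after the broker rewinds, the application sees 7, 8, 1, 2, 3.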

So I suggest deprecating `redeliverUnacknowledgedMessages` for the
Failover and Exclusive subscription types and adding new, similar async
and sync methods called `rewind` for those subscription types.
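A minimal sketch of what the proposed async and sync `rewind` methods could look like, assuming the broker sends a confirmation for the rewind command. `RewindableConsumer`, `rewindAsync()`, and the `rewind(Duration, int)` signature are hypothetical names for illustration, not part of the Pulsar client: the async variant returns a future that completes on confirmation, and the sync variant blocks on it and re-sends the command after each timeout.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical API shape for the proposal; not part of the Pulsar client.
interface RewindableConsumer {
    // Sends the rewind command; the future completes once the broker
    // confirms it, so no stale message can be received afterwards.
    CompletableFuture<Void> rewindAsync();

    // Blocking variant: waits for confirmation and re-sends the command
    // whenever an attempt times out, up to maxAttempts times. Other
    // failures are not retried and surface as ExecutionException.
    default void rewind(Duration timeout, int maxAttempts)
            throws InterruptedException, ExecutionException, TimeoutException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        TimeoutException lastTimeout = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                rewindAsync().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
                return; // broker confirmed the rewind
            } catch (TimeoutException e) {
                lastTimeout = e; // no confirmation in time: try again
            }
        }
        throw lastTimeout;
    }
}
```

Keeping the retry loop in a default method means every implementation gets the same timeout-and-retry behavior, while the async variant stays available for callers that want to compose futures.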

Please leave your comments or suggestions, thanks!

Thanks,
bo

Re: [DISCUSS] The use of consumer redeliverUnacknowledgedMessages method

Posted by 丛搏 <co...@gmail.com>.
Hi, Baodi

> Afterwards, when the user calls `redeliverUnacknowledgedMessages` under the Failover or Exclusive subscription type,
> they get a warning that the messages may be out of order, right?
>
Yes. In the Failover and Exclusive subscription types, we should suggest that
users use a new API to redeliver the messages, and warn them that the messages
may be out of order.
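The warning discussed here could be as simple as the following sketch: the deprecated method checks the subscription type and produces a warning for Exclusive and Failover only. The enum mirrors three of the names of Pulsar's subscription types but is defined locally; `DeprecationWarningSketch` and `redeliverWarning` are hypothetical, and the sketch returns the warning text (which a client would then log) so that it can be tested without a logger.

```java
import java.util.Optional;

// Local stand-in mirroring the names of three Pulsar subscription types.
enum SubType { Exclusive, Shared, Failover }

// Hypothetical helper; not part of the Pulsar client.
final class DeprecationWarningSketch {
    private DeprecationWarningSketch() {}

    // Returns the warning a client could log when the deprecated
    // redeliverUnacknowledgedMessages() is called on an ordered subscription.
    static Optional<String> redeliverWarning(SubType type) {
        switch (type) {
            case Exclusive:
            case Failover:
                return Optional.of("redeliverUnacknowledgedMessages() on a "
                        + type + " subscription may deliver messages out of order;"
                        + " use the new rewind API instead.");
            default:
                return Optional.empty(); // Shared: redelivery order does not matter
        }
    }
}
```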

Re: [DISCUSS] The use of consumer redeliverUnacknowledgedMessages method

Posted by Baodi Shi <ba...@icloud.com.INVALID>.
Hi, Congbo:

Thanks for your explanation. I agree with adding a new interface for the Failover and Exclusive subscription types.

>>> I suggest `redeliverUnacknowledgedMessages` be deprecated under
>>> failover and exclusive subType and add a new similar async and sync
>>> method called `rewind` for failover and exclusive subType.

Afterwards, when the user calls `redeliverUnacknowledgedMessages` under the Failover or Exclusive subscription type,
they get a warning that the messages may be out of order, right?

Thanks,
Baodi Shi

> On 22 Nov 2022, at 00:33, 丛搏 <co...@gmail.com> wrote:
> [...]


Re: [DISCUSS] The use of consumer redeliverUnacknowledgedMessages method

Posted by 丛搏 <co...@gmail.com>.
Hi, Baodi:

I don't think it will confuse users. Compare individual ack and cumulative
ack: in the Shared subscription type, cumulative ack cannot be used. Since
we have both individual ack and cumulative ack, we should also have
different redeliver methods to go with them.
I think adding new methods makes sense, unless we separate cumulative ack
and individual ack into different consumer APIs, such as `ShareConsumer`
and `FailoverConsumer` (just an example).
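To illustrate the alternative mentioned here, a minimal sketch that splits the consumer API by subscription type, so that each type only exposes the operations that make sense for it. `BaseConsumer`, `ShareConsumer`, `FailoverConsumer`, `ApiSurface`, and all method signatures are hypothetical (Pulsar's real `Consumer` is a single interface). The point is that calling cumulative ack on a Shared consumer becomes a compile-time error rather than a runtime surprise. Which operations belong on which side is still debated later in the thread; the placement of nack here is only one option.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical API split; Pulsar's real Consumer is one interface.
// M is the message type.
interface BaseConsumer<M> {
    M receive() throws Exception;
    void acknowledge(M message); // individual ack, available to every type
}

// Shared: unordered, so nack and redelivery of unacked messages fit here.
interface ShareConsumer<M> extends BaseConsumer<M> {
    void negativeAcknowledge(M message);
    void redeliverUnacknowledgedMessages();
}

// Failover/Exclusive: ordered, so cumulative ack and a confirmed rewind.
interface FailoverConsumer<M> extends BaseConsumer<M> {
    void acknowledgeCumulative(M message);
    CompletableFuture<Void> rewindAsync();
    void rewind() throws Exception;
}

final class ApiSurface {
    private ApiSurface() {}

    // Public method names an interface exposes, inherited ones included.
    static Set<String> methodNames(Class<?> type) {
        return Arrays.stream(type.getMethods())
                .map(m -> m.getName())
                .collect(Collectors.toSet());
    }
}
```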

Thanks,
bo

Baodi Shi <ba...@icloud.com.invalid> wrote on Mon, 21 Nov 2022 at 22:43:
> [...]

Re: [DISCUSS] The use of consumer redeliverUnacknowledgedMessages method

Posted by Baodi Shi <ba...@icloud.com.INVALID>.
Hi, Congbo.

The subscription type is an internal property of the consumer. If two APIs are provided, `redeliverUnacknowledgedMessages` (which only works for Shared) and `rewind` (which only works for Exclusive and Failover), this can be confusing for users.

Back to the issue: given the current usage scenarios, the `redeliverUnacknowledgedMessages` method should be synchronous. Maybe we can modify it to be synchronous and support retrying.

Thanks,
Baodi Shi

> On 21 Nov 2022, at 21:03, 丛搏 <co...@gmail.com> wrote:
> [...]


Re: [DISCUSS] The use of consumer redeliverUnacknowledgedMessages method

Posted by 丛搏 <bo...@apache.org>.
Hi, Baodi,

> Alternatively, can it be understood that when users want to process messages in order, they cannot call the `reconsumeLater` and `negativeAcknowledge` methods?

Yes, I think so. This is something users need to think about. We need
to provide use cases, along with notes telling users what they should do
to keep messages in order.

Thanks,
Bo

Baodi Shi <ba...@icloud.com.invalid> wrote on Fri, 23 Dec 2022 at 16:24:
> [...]

Re: [DISCUSS] The use of consumer redeliverUnacknowledgedMessages method

Posted by Baodi Shi <ba...@icloud.com.INVALID>.
Hi, Congbo:


> Both of the above solutions keep messages in order. But in the first
> solution, we don't know how many messages the user processes before
> acknowledging them cumulatively. If there are 10,000,000 messages, the
> user may not be able to keep them in memory for reprocessing, so users
> need a method to redeliver these messages.

I agree with adding the `rewind` interface.

> Failover can also use individual ack, so we can't disable
> `reconsumeLater` and `negativeAcknowledge`.

In the Failover subscription type, what is the scenario for individual ack?

Alternatively, can it be understood that when users want to process messages in order, they cannot call the `reconsumeLater` and `negativeAcknowledge` methods?


> On 23 Dec 2022, at 11:30, 丛搏 <bo...@apache.org> wrote:
> [...]
> Asaf Mesika <as...@gmail.com> wrote on Sun, 18 Dec 2022 at 18:36:
>>
>> Hi Baodi,
>>
>> Yes, in effect, I suggest that we have new Consumer interfaces, perhaps one
>> per subscription type; then we can “correct” the current interface without
>> breaking backward compatibility.
>>
>> For Exclusive/Failover, since the idea of those subscription types is to
>> maintain order, it makes sense to offer the following:
>>
>>   - receive() - get the next message
>>   - cumulativeAck(msgId) - acknowledge all messages up to msgId.
>>      - Maybe we can try to come up with a self-explanatory name like
>>      ackAllUpTo(msgId).
>>
>> Like you, I'm interested in knowing what the experienced folks in the
>> community think about this.
>>
>> On 30 Nov 2022 at 4:43:22, Baodi Shi <ba...@icloud.com.invalid> wrote:
>>
>>> Hi, Asaf:
>>>
>>> Thank you for the comprehensive summary.
>>>
>>>> So in effect, what you would have expected here is that nack(4) in
>>>> exclusive/failover will happen immediately - clear queue, write redeliver
>>>> command to broker async and return immediately, hence next receive() will
>>>> block until messages have been received.
>>>
>>> In that case, the nack interface doesn't make sense for the
>>> Exclusive/Failover subscription types either.
>>>
>>> Take messages 1, 2, 3, 4, 5: if processing message 3 fails, the
>>> application can choose to wait for a period of time and process it again,
>>> or ack it directly (skip it).
>>>
>>> I think we should disable nack under the Exclusive/Failover subscription
>>> types.
>>>
>>>> 4. reconsumeLater(msg): ack existing message and write it to the same topic
>>>> or a different one. This is an explicit out-of-order consumption, but it
>>>> can be clearly stated in docs.
>>>
>>> Same as above, we should also disable it under the Exclusive/Failover
>>> subscription types.
>>>
>>>> I think we should have a different consumer interface holding those
>>>> commands above.
>>>
>>> It's a transformative idea, +1 from me. Let's see what other contributors
>>> think.
>>>
>>> On 30 Nov 2022, at 00:19, Asaf Mesika <as...@gmail.com> wrote:
>>>
>>>> Ok, I'll try to summarize what I read here to make sure we're all on the
>>>> same page :)
>>>>
>>>> Exclusive and Failover subscription types are subscriptions that guarantee
>>>> two things:
>>>> 1. A single active consumer per topic (partition).
>>>> 2. Messages are processed in the order they were written to the
>>>> topic (partition).
>>>>
>>>> (1) is guaranteed by the broker by allowing only a single active consumer
>>>> per topic.
>>>> (2) is guaranteed by the broker. Since we only have a single consumer, the
>>>> only thing for the broker to take care of is delivering messages in
>>>> precisely the same order they were received.
>>>> Normal dispatching dispatches messages in the order they were written to
>>>> the topic. When the consumer calls redeliverUnacknowledgedMessages(), it
>>>> clears the incoming queue, and the broker rewinds the cursor to the
>>>> mark-delete position, disregarding any individual acks done after the
>>>> mark-delete position. So messages are always delivered without any gaps.
>>>>
>>>> Since the queue is empty, the next receive() call will block until the
>>>> broker redelivers the messages and fills the consumer's internal queue.
>>>>
>>>> One problem not yet raised in this discussion thread is the client
>>>> implementation of negativeAcknowledge().
>>>>
>>>> Negative acknowledgment in today's implementation:
>>>>
>>>> It adds the negatively acked message to the NegativeAckTracker and sets a
>>>> timer, if one is not already present, to send all pending negative acks in
>>>> X seconds. Once that time is up, it sees that the negative ack belongs to
>>>> an Exclusive/Failover subscription type and hence translates it into
>>>> redeliverUnacknowledgedMessages(). So in X seconds, it clears the queue and
>>>> asks for messages to be redelivered. Since adding to the NegativeAckTracker
>>>> is an immediate action (add a message to the queue and return), it just
>>>> returns. If you receive() 1,2,3, call nack(4) and then receive() and get
>>>> 4,5,6,7,... After X seconds pass, your next receive suddenly gives you
>>>> 4,5,6 again.
>>>>
>>>> So in effect, what you would have expected here is that nack(4) in
>>>> exclusive/failover will happen immediately - clear queue, write redeliver
>>>> command to broker async and return immediately, hence next receive() will
>>>> block until messages have been received.
>>>>
>>>> I do side with the suggestion to change the API for exclusive / failover
>>>> to be clearer.
>>>> In those types of subscriptions, it seems that the only actions you are
>>>> supposed to do are:
>>>>
>>>> 1. receive(): get the next message.
>>>> 2. cumulativeAck(msg): acknowledge that all messages up to msg have been
>>>> successfully processed.
>>>> 3. redeliverUnacknowledgedMessages(): clear the internal queue and ask the
>>>> broker to resend messages from the last mark-delete position.
>>>>
>>>> There is one additional action in which you explicitly push the messages to
>>>> a different topic or even the same topic, and that is:
>>>> 4. reconsumeLater(msg): ack the existing message and write it to the same
>>>> topic or a different one. This is an explicit out-of-order consumption, but
>>>> it can be clearly stated in docs.
>>>>
>>>> I think we should have a different consumer interface holding those
>>>> commands above.
>>>>
>>>> On Thu, Nov 24, 2022 at 1:43 PM 丛搏 <co...@gmail.com> wrote:
>>>>
>>>>> Hi, Joe:
>>>>>
>>>>>> This "brokenness" is not clear to me.
>>>>>
>>>>> https://github.com/apache/pulsar/pull/10478 This PIP solves some
>>>>> problems of "brokenness".
>>>>>
>>>>>> The sequence 3,4,5,6,7,8,9,10,11,12,13,14,15,16,9,10,11,12,13,14,15,16,17,18,19,20 ...
>>>>>> does not break the ordering guarantees of Pulsar
>>>>>
>>>>> If we don't use transaction ack, this order is fine. But when we use
>>>>> transaction ack, messages 9 and 10 will be handled twice in this case.
>>>>> Therefore, we need redeliver and receive to be synchronized, so that
>>>>> messages received before the redeliver stay in order and are not
>>>>> consumed again after the redeliver. To achieve this, redeliver needs to
>>>>> be a synchronous method instead of an async one, and it needs to retry
>>>>> automatically.
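The behavior Asaf describes above (a rewind to the mark-delete position, and a nack on an ordered subscription turned into a delayed redeliver) can be reproduced with a small toy model. It is a sketch under simplifying assumptions, not the actual broker or client code: `ToyOrderedSubscription` is hypothetical, message ids are consecutive integers on an unbounded topic, time is a logical clock that advances one tick per `receive()`, and `nackDelayTicks` stands in for the "X seconds" delay.

```java
// Hypothetical toy model of an Exclusive/Failover subscription; not Pulsar code.
class ToyOrderedSubscription {
    private final long nackDelayTicks; // stands in for the "X seconds" nack delay
    private int markDelete = 0;        // last cumulatively acknowledged id
    private int next = 1;              // next id the consumer will receive
    private long now = 0;              // logical clock: one tick per receive()
    private long redeliverAt = -1;     // when the pending nack timer fires, or -1

    ToyOrderedSubscription(long nackDelayTicks) {
        this.nackDelayTicks = nackDelayTicks;
    }

    int receive() {
        now++;
        if (redeliverAt >= 0 && now >= redeliverAt) {
            // Timer fired: on an ordered subscription the nack is turned
            // into a redeliver of everything after the mark-delete position.
            redeliverUnacknowledgedMessages();
        }
        return next++;
    }

    void acknowledgeCumulative(int id) {
        markDelete = Math.max(markDelete, id);
    }

    // Returns immediately; the redeliver happens only when the timer fires.
    void negativeAcknowledge(int id) {
        if (redeliverAt < 0) {
            redeliverAt = now + nackDelayTicks;
        }
    }

    // Rewind to just after the mark-delete position, so redelivery has no gaps.
    void redeliverUnacknowledgedMessages() {
        next = markDelete + 1;
        redeliverAt = -1;
    }
}
```

With a delay of three ticks, receiving 1 to 4, acknowledging cumulatively up to 3, and nacking 4 yields 5 and 6 next, and then 4, 5, 6 again: the surprise described in the summary.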


Re: [DISCUSS] The use of consumer redeliverUnacknowledgedMessages method

Posted by 丛搏 <bo...@apache.org>.
Hi, Asaf, Baodi:

I'm very sorry for my late reply. Thanks for your discussion.

> - receive() - get the following message
>   - cumulativeAck(msgId) - acknowledge all messages up to msgId.
>     - Maybe we can try to come up with a self-explanatory name like
>     ackAllUpTo(msgId).

If the user wants the messages in order, `receive()` and
`cumulativeAck()` must be called from a single thread. Otherwise,
`cumulativeAck` loses its meaning.

If users write cumulative-ack code like this:
```
while (true) {
    Message<String> message = consumer.receive();
    process(message);
    consumer.acknowledgeCumulative(message.getMessageId());
}
```
I don't think this is a good way to use `acknowledgeCumulative`: a
cumulative ack after every single message is meaningless. Users should
use `acknowledgeCumulative` like this:
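```
while (true) {
    Messages<String> messages = consumer.batchReceive();
    process(messages);
    // cumulatively ack up to the last message of the batch
    MessageId last = null;
    for (Message<String> m : messages) last = m.getMessageId();
    if (last != null) consumer.acknowledgeCumulative(last);
}
```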
Then we should think about how the user reprocesses these messages when
`process(messages)` throws an exception.

1. The user reprocesses the messages in place, with `process(messages)`
written like:
```
private void process(Messages<String> messages) {
    while (true) {
        try {
            // do something with the messages
            return;
        } catch (Exception e) {
            // processing failed: try the same batch again
        }
    }
}
```
In this way, the consumer doesn't need to do anything.

2. Pulsar rewinds the cursor and redelivers these messages:

```
while (true) {
    Messages<String> messages = consumer.batchReceive();
    try {
        process(messages);
    } catch (Exception e) {
        // Proposed method (name not settled): redelivers these messages.
        // Until it succeeds, the consumer must not call batchReceive() again.
        consumer.rewind();
        continue;
    }
    // cumulatively ack up to the last message of the batch
    MessageId last = null;
    for (Message<String> m : messages) last = m.getMessageId();
    if (last != null) consumer.acknowledgeCumulative(last);
}
```
In this way, the consumer needs a method that can redeliver these
messages. `redeliverUnacknowledgedMessages` is an async method that
can't guarantee the messages stay in order, so we need a new method,
and it must be synchronous.

Both solutions keep messages in order. But in the first one, we don't
know how many messages the user processes before each cumulative ack.
If there are 10,000,000 messages, the user may not be able to keep them
all in memory for reprocessing, so users need a method that redelivers
these messages.
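A minimal Java sketch of the synchronous, retrying redeliver argued for here, assuming the client had some async call that completes when the broker confirms the rewind. `BlockingRewind` and the `rewindAsync` supplier are hypothetical names for illustration, not existing Pulsar client APIs.

```
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical helper, not part of the Pulsar client: turns an async
// "rewind" request into a blocking call that retries on timeout or failure.
final class BlockingRewind {
    private BlockingRewind() {}

    /**
     * @param rewindAsync sends the (hypothetical) rewind command and returns a
     *                    future that completes when the broker responds
     * @param maxAttempts how many times to send the command before giving up
     * @param timeoutMs   how long to wait for each response
     */
    static void rewind(Supplier<CompletableFuture<Void>> rewindAsync,
                       int maxAttempts, long timeoutMs) {
        Exception lastError = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                // Block until the broker confirms; a single-threaded caller
                // therefore cannot call receive() in the meantime.
                rewindAsync.get().get(timeoutMs, TimeUnit.MILLISECONDS);
                return;
            } catch (TimeoutException | ExecutionException e) {
                lastError = e; // no response or an error: send the command again
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while rewinding", e);
            }
        }
        throw new IllegalStateException(
                "rewind failed after " + maxAttempts + " attempts", lastError);
    }
}
```

Retrying this way assumes that re-sending the rewind command is harmless (idempotent), which would need to be confirmed for the real protocol.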

> I think we should disable nack under Exclusive/Failover subscription.

Failover subscriptions can also use individual acks, so we can't disable
`reconsumeLater` and `negativeAcknowledge`.

Thanks,
Bo

On Sun, Dec 18, 2022 at 18:36, Asaf Mesika <as...@gmail.com> wrote:
>
> Hi Baodi,
>
> Yes, in effect, I suggest that we have new Consumer interfaces, one per
> subscription type perhaps then we can “correct” the current interface
> without breaking backward compatibility.
>
> For Exclusive/Failover, since the idea in those subscription types was to
> maintain order, it makes sense we would offer the following:
>
>
>    - receive() - get the following message
>    - cumulativeAck(msgId) - acknowledge all messages up to msgId.
>       - Maybe we can try to come up with a self-explanatory name like
>       ackAllUpTo(msgId).
>
>
>
> Like you I’m interested in knowing what the experienced folks in the
> community think about this.
>

Re: [DISCUSS] The use of consumer redeliverUnacknowledgedMessages method

Posted by Asaf Mesika <as...@gmail.com>.
Hi Baodi,

Yes, in effect I suggest that we have new Consumer interfaces, perhaps one
per subscription type. Then we can “correct” the current interface
without breaking backward compatibility.

For Exclusive/Failover, since the idea in those subscription types was to
maintain order, it makes sense we would offer the following:


   - receive() - get the following message
   - cumulativeAck(msgId) - acknowledge all messages up to msgId.
      - Maybe we can try to come up with a self-explanatory name like
      ackAllUpTo(msgId).



Like you I’m interested in knowing what the experienced folks in the
community think about this.


On 30 Nov 2022 at 4:43:22, Baodi Shi <ba...@icloud.com.invalid> wrote:

> Hi, Asaf:
>
> Thank you for the comprehensive summary.
>
> > So in effect, what you would have expected here is that nack(4) in
> > exclusive/shared will happen immediately - clear queue, write redeliver
> > command to broker async and return immediately, hence next receive() will
> > block until messages have been received.
>
> In this way, the nack interface in Exclusive/Failover subscriptions also
> doesn't make sense.
>
> For 1, 2, 3, 4, 5:
> If processing message 3 fails, the application can choose to wait for a
> period of time and process it again, or to ack it directly (skipping it).
>
> I think we should disable nack under Exclusive/Failover subscription.
>
> > 4. reconsumeLater(msg): ack existing message and write it to the same topic
> > or a different one. This is an explicit out-of-order consumption, but it
> > can be clearly stated in docs.
>
> Same as above, we should also disable it in Exclusive/Failover
> subscription.
>
> > I think we should have a different consumer interface holding those
> > commands above.
>
> It's a transformative idea, and I'd like to +1 it. Let's see what other
> contributors think.

Re: [DISCUSS] The use of consumer redeliverUnacknowledgedMessages method

Posted by Baodi Shi <ba...@icloud.com.INVALID>.
Hi, Asaf:

Thank you for the comprehensive summary.

> So in effect, what you would have expected here is that nack(4) in
> exclusive/shared will happen immediately - clear queue, write redeliver
> command to broker async and return immediately, hence next receive() will
> block until messages have been received.

In this way, the nack interface in Exclusive/Failover subscriptions also doesn't make sense.

For 1, 2, 3, 4, 5:
If processing message 3 fails, the application can choose to wait for a period of time and process it again, or to ack it directly (skipping it).
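A minimal Java sketch of that application-side handling for an ordered subscription without nack: retry the failing message in place with a pause between attempts, and once retries run out report it as skipped so the caller can acknowledge it and move on. `RetryOrSkip`, the attempt limit and the back-off are illustrative assumptions, not Pulsar APIs.

```
import java.util.function.Predicate;

// Illustrative only: retry a message in place, then skip it.
final class RetryOrSkip {
    enum Outcome { PROCESSED, SKIPPED }

    private RetryOrSkip() {}

    // `process` returns true when the message was handled successfully.
    static <T> Outcome handle(T message, Predicate<T> process,
                              int maxAttempts, long backoffMs) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (process.test(message)) {
                return Outcome.PROCESSED;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMs); // wait before retrying the same message
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while retrying", e);
                }
            }
        }
        // The caller acknowledges the message anyway (i.e. skips it), so the
        // following messages are still consumed in order.
        return Outcome.SKIPPED;
    }
}
```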

I think we should disable nack under Exclusive/Failover subscription.

> 4. reconsumeLater(msg): ack existing message and write it to the same topic
> or a different one. This is an explicit out-of-order consumption, but it
> can be clearly stated in docs.

Same as above, we should also disable it in Exclusive/Failover subscription.

> I think we should have a different consumer interface holding those
> commands above.

It's a transformative idea, and I'd like to +1 it. Let's see what other contributors think.


> On Nov 30, 2022, at 00:19, Asaf Mesika <as...@gmail.com> wrote:
> 
> Ok, I'll try to summarize what I read here to make sure we're all on the
> same page :)
> 
> Exclusive and Failover subscription types are subscriptions that guarantee
> two things:
> 1. Single active consumer per topic (partition).
> 2. Message processing in the order they were written to the
> topic (partition).
> 
> (1) is guaranteed by the broker by allowing only a single consumer per
> topic.
> (2) is guaranteed by the broker. Since we only have a single consumer, the
> only thing for the broker to take care of is delivery to messages precisely
> in the same order they received.
> Normal dispatching dispatches messages in the order written to the topic.
> When the consumer calls redeliverUnacknowledgedMessages(), it clears the
> incoming queue, and the broker rewinds the cursor to the mark delete
> position, disregarding any individual acks done after the mark delete. So
> messages are always delivered without any gaps.
> 
> Since the queue is empty, the next receive() call will block until the
> broker redelivers the messages and fills the consumer's internal queue.
> 
> The problem not raised in this discussion thread is the client
> implementation of negativeAcknowledgment().
> Negative Acknowledgment in today's implementation
> 
> Adds the negatively-acked message into the NegativeAckTracker, and sets a
> timer, if not already present, to send all pending acks in X seconds. Once
> that time is up, it sees that negative ack belongs on an Exclusive/Failover
> subscription type and hence translates that into
> redeliverUnacknowledgedMessages(). So in X seconds, it clears the queue and
> asks for messages to be redelivered. Since adding to NegativeAckTracker is
> an immediate action (add a message to the queue and return), it just
> returns. If you receive() 1,2,3, call nack(4) and then receive() and get
> 4,5,6,7,... After X seconds pass, your next receive suddenly gives you
> 4,5,6 again.
> 
> So in effect, what you would have expected here is that nack(4) in
> exclusive/shared will happen immediately - clear queue, write redeliver
> command to broker async and return immediately, hence next receive() will
> block until messages have been received.
> 
> 
> I do side with the suggestion to change the API for exclusive / shared to
> be more clear.
> In those types of subscriptions, it seems that the only actions you are
> supposed to do are:
> 
> 1. receive(): get the next message.
> 2. cumulativeAck(msg): acknowledge all messages up to msg have been
> successfully processed.
> 3. redeliverUnacknowledgedMessages() - clear the internal queue and ask the
> broker to resend messages from the last mark delete position.
> 
> There is one additional action in which you explicitly push the messages to
> a different topic or even the same topic, and that is:
> 4. reconsumeLater(msg): ack existing message and write it to the same topic
> or a different one. This is an explicit out-of-order consumption, but it
> can be clearly stated in docs.
> 
> I think we should have a different consumer interface holding those
> commands above.


Re: [DISCUSS] The use of consumer redeliverUnacknowledgedMessages method

Posted by Asaf Mesika <as...@gmail.com>.
Ok, I'll try to summarize what I read here to make sure we're all on the
same page :)

Exclusive and Failover subscription types are subscriptions that guarantee
two things:
1. Single active consumer per topic (partition).
2. Message processing in the order they were written to the
topic (partition).

(1) is guaranteed by the broker by allowing only a single consumer per
topic.
(2) is guaranteed by the broker. Since we only have a single consumer, the
only thing for the broker to take care of is delivering messages in precisely
the order it received them.
Normal dispatching dispatches messages in the order written to the topic.
When the consumer calls redeliverUnacknowledgedMessages(), it clears the
incoming queue, and the broker rewinds the cursor to the mark delete
position, disregarding any individual acks done after the mark delete. So
messages are always delivered without any gaps.

Since the queue is empty, the next receive() call will block until the
broker redelivers the messages and fills the consumer's internal queue.
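The rewind behavior described here can be modeled with a small in-memory Java class. This is a toy model under simplifying assumptions (integer message ids, a "broker" that dispatches only when asked), not Pulsar's implementation; `ToyOrderedSubscription` and its methods are hypothetical.

```
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of an Exclusive/Failover subscription; ids are 1..lastId.
final class ToyOrderedSubscription {
    private final int lastId;
    private int markDelete = 0;        // all ids <= markDelete are acked
    private int readPosition = 1;      // next id the "broker" dispatches
    private final Set<Integer> individualAcks = new HashSet<>();
    private final Deque<Integer> incomingQueue = new ArrayDeque<>();

    ToyOrderedSubscription(int lastId) { this.lastId = lastId; }

    // Broker pushes up to `permits` messages, in topic order, to the client.
    void dispatch(int permits) {
        for (int i = 0; i < permits && readPosition <= lastId; i++) {
            incomingQueue.addLast(readPosition++);
        }
    }

    // Returns null when the queue is empty (a real receive() would block).
    Integer receive() { return incomingQueue.pollFirst(); }

    List<Integer> drain() {
        List<Integer> out = new ArrayList<>();
        for (Integer id = receive(); id != null; id = receive()) out.add(id);
        return out;
    }

    void ackCumulative(int id) { markDelete = Math.max(markDelete, id); }

    void ackIndividual(int id) { individualAcks.add(id); }

    // redeliverUnacknowledgedMessages(): clear the client queue and rewind the
    // cursor to just after the mark-delete position. Individual acks above
    // the mark-delete position are not consulted, so those ids come back too.
    void redeliver() {
        incomingQueue.clear();
        readPosition = markDelete + 1;
    }
}
```

In this model, after receiving 1 to 5, cumulatively acking 2 and individually acking 4, a redeliver brings back 3, 4, 5, ...: id 4 returns even though it was acked, which is the "no gaps" property described above.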

A problem not yet raised in this thread is the client implementation of
negativeAcknowledge().

In today's implementation, negative acknowledgment adds the negatively-acked
message to the NegativeAckTracker and sets a timer, if one is not already
present, to send all pending negative acks in X seconds. Once that time is
up, the tracker sees that the negative ack belongs to an Exclusive/Failover
subscription and hence translates it into
redeliverUnacknowledgedMessages(). So after X seconds, it clears the queue and
asks for messages to be redelivered. Since adding to the NegativeAckTracker is
an immediate action (add a message to the queue and return), nack just
returns. If you receive() 1,2,3, call nack(4), and then receive() and get
4,5,6,7,..., then after X seconds pass your next receive suddenly gives you
4,5,6 again.
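This timeline can be reproduced with a toy Java model in which each receive() advances a logical clock by one tick and the nack timer fires after a fixed number of ticks. It assumes message 4 was received before being nacked and that 1 to 3 were cumulatively acknowledged, so the rewind restarts at 4. `DelayedNackTimeline` and `nackImmediately` are hypothetical names, and the model is not the client's actual NegativeAckTracker.

```
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model (not Pulsar code) of nack on an Exclusive/Failover subscription.
final class DelayedNackTimeline {
    private final Deque<Integer> queue = new ArrayDeque<>();
    private int markDelete = 0;       // last cumulatively acked id
    private int nextToDispatch = 1;   // broker read position
    private long now = 0;             // logical clock: one tick per receive()
    private long redeliverAt = -1;    // tick when the nack timer fires; -1 = none

    int receive() {
        now++;
        if (redeliverAt >= 0 && now >= redeliverAt) {
            rewind(); // timer fired: behaves like redeliverUnacknowledgedMessages()
        }
        if (queue.isEmpty()) {
            queue.addLast(nextToDispatch++); // broker dispatches the next id
        }
        return queue.pollFirst();
    }

    void ackCumulative(int id) { markDelete = Math.max(markDelete, id); }

    // Current behavior as described: only schedules the rewind. The id itself
    // does not matter, because the rewind restarts after the mark-delete position.
    void nack(int id, long delayTicks) {
        if (redeliverAt < 0) {
            redeliverAt = now + delayTicks; // set the timer only if absent
        }
    }

    // Behavior the thread expected instead: rewind right away.
    void nackImmediately(int id) { rewind(); }

    private void rewind() {
        queue.clear();
        nextToDispatch = markDelete + 1;
        redeliverAt = -1;
    }
}
```

With a 4-tick delay, receiving four messages, acking 3, calling nack(4, 4) and receiving six more yields 1,2,3,4,5,6,7,4,5,6; with nackImmediately(4) the next receives are 4, 5, ... straight away.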

So in effect, what you would have expected here is that nack(4) in
exclusive/shared will happen immediately - clear queue, write redeliver
command to broker async and return immediately, hence next receive() will
block until messages have been received.


I do side with the suggestion to change the API for exclusive / shared to
be more clear.
In those types of subscriptions, it seems that the only actions you are
supposed to do are:

1. receive(): get the next message.
2. cumulativeAck(msg): acknowledge that all messages up to msg have been
successfully processed.
3. redeliverUnacknowledgedMessages() - clear the internal queue and ask the
broker to resend messages from the last mark delete position.

There is one additional action in which you explicitly push the messages to
a different topic or even the same topic, and that is:
4. reconsumeLater(msg): ack existing message and write it to the same topic
or a different one. This is an explicit out-of-order consumption, but it
can be clearly stated in docs.

I think we should have a different consumer interface holding those
commands above.
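One possible shape for such an interface, as a hedged Java sketch. The names `OrderedConsumer`, `ackAllUpTo` and `rewind` follow suggestions made in this thread but are hypothetical, not existing Pulsar client APIs. reconsumeLater is left out because the thread has not agreed whether it belongs on ordered subscriptions, and the in-memory class exists only to exercise the interface.

```
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical consumer type for Exclusive/Failover subscriptions that
// exposes only order-preserving operations. Not an existing Pulsar API.
interface OrderedConsumer<T> {
    T receive();               // next message in topic order, null if none
    void ackAllUpTo(long id);  // cumulative ack: everything <= id is done
    void rewind();             // drop the local queue, restart after last ack
}

// In-memory stand-in, used only to exercise the interface.
final class InMemoryOrderedConsumer implements OrderedConsumer<Long> {
    private final long lastId;        // the "topic" holds ids 1..lastId
    private final int prefetch;       // capacity of the local receiver queue
    private final Deque<Long> queue = new ArrayDeque<>();
    private long markDelete = 0;      // last id acknowledged cumulatively
    private long readPosition = 1;    // next id to fetch from the "broker"

    InMemoryOrderedConsumer(long lastId, int prefetch) {
        this.lastId = lastId;
        this.prefetch = prefetch;
    }

    @Override
    public Long receive() {
        if (queue.isEmpty()) {
            while (queue.size() < prefetch && readPosition <= lastId) {
                queue.addLast(readPosition++); // refill in topic order
            }
        }
        return queue.pollFirst(); // null once the topic is exhausted
    }

    @Override
    public void ackAllUpTo(long id) {
        markDelete = Math.max(markDelete, id);
    }

    @Override
    public void rewind() {
        // Synchronous in this model: once it returns, no stale message
        // fetched before the rewind can be received.
        queue.clear();
        readPosition = markDelete + 1;
    }
}
```

Making rewind() synchronous is the property the thread asks for: once it returns, nothing prefetched before it can reach the application.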



On Thu, Nov 24, 2022 at 1:43 PM 丛搏 <co...@gmail.com> wrote:

> Hi, Joe:
>
> > This "brokenness" is not clear to me.
> https://github.com/apache/pulsar/pull/10478 This PIP solves some
> problems of "brokenness",
> >The sequence 3,4,5,6,7,8,9,10,11 12,13,14,15, 16
> ,9,10,11,12,13,14,15,16,17, 18, 19, 20 ...does not break
> > the ordering guarantees of Pulsar
> If don't use transaction ack, this order is fine. but when we use
> transaction ack, in this case, message 9 and message 10 will be
> handled twice. Therefore, we need redeliver and receive to be
> synchronized to ensure that messages received before redeliver will
> not be repeated and ordered, and will not be repeatedly consumed after
> redeliver. To achieve these goals, we need to redeliver to be a
> synchronous method instead of async and need to retry automatically.
>

Re: [DISCUSS] The use of consumer redeliverUnacknowledgedMessages method

Posted by 丛搏 <co...@gmail.com>.
Hi, Joe:

> This "brokenness" is not clear to me.
https://github.com/apache/pulsar/pull/10478 This PIP solves some
problems of "brokenness",
>The sequence 3,4,5,6,7,8,9,10,11 12,13,14,15, 16 ,9,10,11,12,13,14,15,16,17, 18, 19, 20 ...does not break
> the ordering guarantees of Pulsar
Without transaction acks, this order is fine. But with transaction acks,
messages 9 and 10 will be handled twice in this case. Therefore, redeliver
and receive need to be synchronized, so that messages received before the
redeliver stay in order and are not repeated, and are not consumed again
after the redeliver. To achieve these goals, redeliver needs to be a
synchronous method instead of an async one, and it needs to retry
automatically.

Re: [DISCUSS] The use of consumer redeliverUnacknowledgedMessages method

Posted by Joe F <jo...@gmail.com>.
I am not familiar with all the changes since its original implementation,
nor can I speak for all the changes that came after.

The original concept was simple and rigorous. For Shared subs, all unacked
messages were redelivered, and for Exclusive subs, the cursor was rewound
and everything after the rewind point was redelivered to preserve order.

> but in failover and exclusive subType, if we don't get the response,
> the user will receive the message from the `incomingQueue` then the
> order of the message will be broken.

This "brokenness" is not clear to me. The sequence 3,4,5,6,7,8,9,10,11,
12,13,14,15,16,9,10,11,12,13,14,15,16,17,18,19,20... does not break
the ordering guarantees of Pulsar.
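A small Java check makes the two positions concrete under one reading of the ordering guarantee (an assumption, not a definition from the Pulsar docs): a sequence keeps order if it never jumps forward past an id, while restarting from an earlier id is allowed. Joe's example passes that check, yet ids 9 to 16 are delivered twice, which is the kind of duplicate processing the thread raises for transaction acks. `SequenceCheck` is a hypothetical helper.

```
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for inspecting a delivery sequence of message ids.
final class SequenceCheck {
    private SequenceCheck() {}

    // True if no id is skipped: each id is either the successor of the
    // previous one or a restart at an earlier (or the same) position.
    static boolean noForwardGaps(List<Integer> seq) {
        for (int i = 1; i < seq.size(); i++) {
            if (seq.get(i) > seq.get(i - 1) + 1) {
                return false; // jumped over at least one id
            }
        }
        return true;
    }

    // Ids delivered more than once, in the order their repeat was seen.
    static Set<Integer> repeated(List<Integer> seq) {
        Set<Integer> seen = new HashSet<>();
        Set<Integer> dup = new LinkedHashSet<>();
        for (int id : seq) {
            if (!seen.add(id)) {
                dup.add(id);
            }
        }
        return dup;
    }
}
```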

On Tue, Nov 22, 2022 at 5:47 PM PengHui Li <pe...@apache.org> wrote:

> Hi, Bo
>
> Thanks for starting the discussion.
>
> I have no idea about the initial motivation for supporting message
> redelivery for
> Failover or Exclusive subscription. The redelivered messages will go to the
> same
> consumer under a single active consumer subscription mode.
>
> Or maybe it is only designed for the Shared subscription?
>
> It's better to get some feedback from Matteo, Joe, or anyone who knows the
> background
> about this part.
>
> Thanks,
> Penghui

Re: [DISCUSS] The use of consumer redeliverUnacknowledgedMessages method

Posted by PengHui Li <pe...@apache.org>.
Hi, Bo

Thanks for starting the discussion.

I have no idea about the initial motivation for supporting message
redelivery for Failover or Exclusive subscription. The redelivered
messages will go to the same consumer under a single active consumer
subscription mode.

Or maybe it is only designed for the Shared subscription?

It's better to get some feedback from Matteo, Joe, or anyone who knows the
background about this part.

Thanks,
Penghui

On Tue, Nov 22, 2022 at 7:36 PM Yubiao Feng
<yu...@streamnative.io.invalid> wrote:

> Hi Congbo
>
> I think it is a good idea.
>
> Thanks
> Yubiao
> Yu

Re: [DISCUSS] The use of consumer redeliverUnacknowledgedMessages method

Posted by Yubiao Feng <yu...@streamnative.io.INVALID>.
Hi Congbo

I think it is a good idea.

Thanks
Yubiao
Yu

On Mon, Nov 21, 2022 at 9:04 PM 丛搏 <co...@gmail.com> wrote:

> Hello, Pulsar community:
>
> Now client consumer `void redeliverUnacknowledgedMessages();` is an
> async interface, but it doesn't have the return value. only
> `writeAndFlush` the redeliver command then finishes.
>
> `ConsumerImpl`:
>
> https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1907-L1909
>
> `MultiTopicsConsumerImpl`:
>
> https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L667-L677
>
> in the shared subType, I think it doesn't need the response of the
> `void redeliverUnacknowledgedMessages()`, and naming the
> `redeliverUnacknowledgedMessages` is ok.
>
> but in failover and exclusive subType, if we don't get the response,
> the user will receive the message from the `incomingQueue` then the
> order of the message will be broken.  If the
> `redeliverUnacknowledgedMessages` timeout, we should try again. but
> `redeliverUnacknowledgedMessages` doesn't throw any exception or
> retry. and the `redeliverUnacknowledgedMessages` name is not accurate
> for failover and exclusive subType. it is named `rewind` is more
> suitable.
>
> So I suggest `redeliverUnacknowledgedMessages` be deprecated under
> failover and exclusive subType and add a new similar async and sync
> method called `rewind` for failover and exclusive subType.
>
> Please leave your comments or suggestions, thanks!
>
> Thanks,
> bo
>