Posted to dev@pulsar.apache.org by 丛搏 <bo...@apache.org> on 2022/09/08 02:55:57 UTC

[DISCUSS] Consumer reconnection causes repeated consumption messages

Hello, Pulsar community:


Currently, the consumer does not filter out messages that it has already
consumed. After a consumer reconnects, the broker dispatches messages to
it again, starting from the markDeletePosition. With the Failover and
Exclusive subscription types, all messages in a topic are dispatched to
the same consumer. Consider the following example:

```
    @Test
    public void testConsumerReconnectionRepeatedConsumeMessage() throws Exception {
        final String topic = "testConsumerReconnectionRepeatedConsumeMessage";

        @Cleanup
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic(topic)
                .sendTimeout(0, TimeUnit.SECONDS)
                .enableBatching(false)
                .create();

        @Cleanup
        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                .subscriptionType(SubscriptionType.Exclusive)
                .topic(topic)
                .subscriptionName("test")
                .subscribe();

        // Send 5 messages.
        for (int i = 0; i < 5; i++) {
            producer.send("test" + i);
        }

        // The consumer receives the 5 messages but does not acknowledge them yet.
        for (int i = 0; i < 5; i++) {
            consumer.receive();
        }

        // Unloading the topic disconnects the consumer and makes it reconnect.
        admin.topics().unload(topic);

        // After reconnecting, the consumer receives the same 5 messages again.
        Message<String> message = null;
        for (int i = 0; i < 5; i++) {
            message = consumer.receive();
        }

        // Only now are all 5 messages acknowledged, with one cumulative ack.
        consumer.acknowledgeCumulativeAsync(message.getMessageId());
    }
```

In the above example, the consumer consumes the 5 messages sent by the
producer twice and only then acknowledges them with a cumulative ack.
If the application sends a cumulative ack only once every 1,000 or
10,000 messages, a consumer reconnection can cause a large amount of
repeated consumption. Although this does not violate at-least-once
semantics, it causes a lot of unnecessary overhead.
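
To make the cost concrete, here is a minimal sketch that counts how many
messages would be dispatched again after a reconnection. It is only a
model under stated assumptions: positions are plain `long` indexes rather
than real MessageIds, the class and method names are hypothetical, and
the cumulative ack is assumed to be applied right after every
`ackInterval`-th message.

```java
// Hypothetical model, not Pulsar code: positions are simple indexes 0, 1, 2, ...
final class RedeliverySketch {

    /**
     * Returns how many already-received messages would be dispatched again if
     * the consumer reconnects right after receiving {@code received} messages,
     * while sending one cumulative ack every {@code ackInterval} messages.
     */
    static long duplicatesAfterReconnect(long received, long ackInterval) {
        long markDelete = -1; // nothing has been acknowledged yet
        for (long i = 0; i < received; i++) {
            // A cumulative ack is sent after every ackInterval-th message.
            if ((i + 1) % ackInterval == 0) {
                markDelete = i;
            }
        }
        // After the reconnection, dispatching restarts at markDelete + 1,
        // so everything received after the last ack is delivered again.
        return received - (markDelete + 1);
    }
}
```

In this model, a reconnection just before the next ack repeats up to
`ackInterval - 1` messages, which is the overhead described above.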


Most importantly, it breaks the exactly-once semantics of Pulsar transactions.


I want to discuss whether we should fix normal and transactional
cumulative acks in the same way: prevent repeated consumption caused by
consumer reconnection by filtering out messages that the user has
already received through `consumer.receive()`. Or should we only
guarantee exactly-once semantics, that is, only guarantee that a
consumer using a transaction will not receive the same messages again
when it acks cumulatively with the transaction?
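
One possible shape of such a filter, as a rough sketch only: the client
remembers the highest position it has handed to the application and
drops anything at or below it. `SketchPosition` and `MaxReceivedFilter`
are hypothetical names, and ordering by (ledgerId, entryId) is a
simplifying assumption that ignores batches and partitions.

```java
// Hypothetical stand-in for a MessageId, ordered by (ledgerId, entryId).
final class SketchPosition implements Comparable<SketchPosition> {
    final long ledgerId;
    final long entryId;

    SketchPosition(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    @Override
    public int compareTo(SketchPosition other) {
        int byLedger = Long.compare(ledgerId, other.ledgerId);
        return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
    }
}

// Drops every message that is not newer than the newest one already received.
final class MaxReceivedFilter {
    private SketchPosition maxReceived; // null until the first message arrives

    /** Returns true if the message should be passed on to the application. */
    boolean accept(SketchPosition position) {
        if (maxReceived != null && position.compareTo(maxReceived) <= 0) {
            return false; // already received before the reconnection
        }
        maxReceived = position;
        return true;
    }
}
```

The filter only works while positions arrive in increasing order, so it
fits the Exclusive and Failover cases discussed here, where one consumer
receives all messages of the topic.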


Please leave your opinion, thanks! :)



Thanks,

Bo

Re: [DISCUSS] Consumer reconnection causes repeated consumption messages

Posted by Xiangying Meng <xi...@apache.org>.
Hi, Bo
I totally agree with this approach.
If we implement deduplication for normal messages on the client side,
there is no need to add other logic, which might introduce breaking API
changes, to guarantee exactly-once semantics for transactions.
Yours,
Xiangying

On Fri, Sep 9, 2022 at 10:17 PM 丛搏 <co...@gmail.com> wrote:

> Hi all,
>
> You can find a description of the problem, with comments, in a Google Doc.
>
> Google Doc link:
>
> https://docs.google.com/document/d/1J1xGcj8YORrdlCa_XDt28uV0TMp03gSmX_z43ZRhwZo/edit?usp=sharing
>
> Thanks!
> Bo

Re: [DISCUSS] Consumer reconnection causes repeated consumption messages

Posted by 丛搏 <co...@gmail.com>.
Hi all,

You can find a description of the problem, with comments, in a Google Doc.

Google Doc link:
https://docs.google.com/document/d/1J1xGcj8YORrdlCa_XDt28uV0TMp03gSmX_z43ZRhwZo/edit?usp=sharing

Thanks!
Bo


Re: [DISCUSS] Consumer reconnection causes repeated consumption messages

Posted by 丛搏 <co...@gmail.com>.
Hi Haiting,

Good point! We need to pay attention to all operations that reset the
cursor. I think this should be a configurable optimization, because some
users don't call `redeliverUnacknowledgedMessages()`, which makes the
optimization a breaking change.
I will find all the operations that reset the cursor, work out a
solution, and post an update to this thread later.
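
As a starting point, a sketch of what "configurable" could look like. The
names are hypothetical, positions are plain `long` values, and how the
client would learn about a reset done on the admin side is left open
here; `onCursorReset()` only marks the place where such a signal would
be handled.

```java
// Hypothetical sketch: an opt-in duplicate filter that forgets its state
// whenever something rewinds the cursor.
final class ConfigurableDedupSketch {
    private final boolean filterEnabled; // hypothetical client option, off keeps today's behavior
    private long maxReceived = -1;       // highest position handed to the application

    ConfigurableDedupSketch(boolean filterEnabled) {
        this.filterEnabled = filterEnabled;
    }

    /** Returns true if the message should be passed on to the application. */
    boolean accept(long position) {
        if (filterEnabled && position <= maxReceived) {
            return false; // duplicate caused by a reconnection
        }
        maxReceived = Math.max(maxReceived, position);
        return true;
    }

    /**
     * To be called for every operation that rewinds the cursor, for example
     * redeliverUnacknowledgedMessages() or a cursor reset.
     */
    void onCursorReset() {
        maxReceived = -1;
    }
}
```

With the filter disabled, duplicates pass through exactly as they do
today, so existing users who rely on redelivery are not affected.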

Thanks!
Bo

On Fri, Sep 9, 2022 at 10:33, Haiting Jiang <ji...@gmail.com> wrote:
>
> Hi Bo,
>
> Overall it makes sense to me.
> It is basically the same as the broker-side deduplication mechanism
> for produced messages, which uses `sequenceId`.
> In your case, the messageId is used for deduplication. It should work
> as long as the received messageId increases monotonically.
>
> So we should be careful with any operation that resets the cursor,
> for example, when the user resets the cursor with the admin client. We
> need more detailed information on this matter.
> And I am not sure if there are other operations that would reset the
> cursor implicitly.
>
> Thanks,
> Haiting
>
> On Thu, Sep 8, 2022 at 11:36 PM 丛搏 <co...@gmail.com> wrote:
> >
> > Hi Haiting,
> >
> > When using cumulative ack, we can save the maximum received MessageId
> > on the consumer client side to filter out the duplicate messages
> > caused by reconnection. If the consumer client process restarts, the
> > maximum received MessageId no longer exists in the consumer client.
> > This requires the user to be responsible for the received messages:
> > if the user wants to re-consume them, they need to call
> > `redeliverUnacknowledgedMessages()`, which then clears the maximum
> > received MessageId from the consumer client.
> >
> > Thanks,
> > Bo
> >
> > On Thu, Sep 8, 2022 at 14:51, Haiting Jiang <ji...@gmail.com> wrote:
> > >
> > > From the user's perspective, I think we should always avoid delivering
> > > repeated messages.
> > > But can the broker tell whether the reconnection is caused by topic
> > > unloading or by a restart of the consumer client process?
> > > In the latter case, the messages should be redelivered; that is the
> > > whole point of explicit acking by the user.
> > >
> > > Thanks,
> > > Haiting

Re: [DISCUSS] Consumer reconnection causes repeated consumption messages

Posted by 丛搏 <co...@gmail.com>.
Hi Haiting,

In this case, the user wants to reset the cursor to the first message
and re-consume all the messages, but the consumer can't receive any
message anymore. The reset may be issued over HTTP, for example with
`admin.topics().resetCursor(topic, subName, MessageId.earliest)`, so I
consider this wrong behavior.

Thanks,
Bo

On Thu, Sep 15, 2022 at 11:27, Haiting Jiang <ji...@gmail.com> wrote:
>
> Hi Bo,
>
> > In this case, the consumer ends up unable to receive any messages
> > again. We have to fix this because it causes message loss.
> > I think we need to redefine how cumulative acks are used, not just
> > solve the problem of receiving messages repeatedly.
> >
>
> There seems to be nothing wrong in this case: you acked the last
> message, even though you had reset the cursor before that.
> Currently, we place no restriction on the message IDs a user can ack.
> They don't have to come from a valid message; they can even be
> created directly by the user.
>
> Thanks,
> Haiting
>
> On Wed, Sep 14, 2022 at 8:34 PM 丛搏 <co...@gmail.com> wrote:
> >
> > Hi Haiting, Michael,
> >
> > I tested an example, and there may be some problems when a
> > subscription cursor reset and a cumulative ack are used at the same
> > time, e.g.:
> >
> > ```
> >
> > Message<byte[]> message = null;
> > for (int i = 0; i < 3; i++) {
> >     message = consumer.receive();
> > }
> > admin.topics().resetCursor(topic, subName, MessageId.earliest);
> >
> > consumer.acknowledgeCumulative(message.getMessageId());
> >
> > admin.topics().unload(topic);
> > consumer.receive();
> >
> > ```
> > In this case, the consumer ends up unable to receive any messages
> > again. We have to fix this because it causes message loss.
> > I think we need to redefine how cumulative acks are used, not just
> > solve the problem of receiving messages repeatedly.
> >
> > Thanks!
> > Bo
> >
> > On Fri, Sep 9, 2022 at 12:05, Michael Marshall <mm...@apache.org> wrote:
> > >
> > > I agree that reducing unnecessary duplicates is a good goal.
> > >
> > > For the topic unloading case, it might help to think about how we can
> > > improve the protocol. The current handover is very blunt. A new
> > > solution could focus on decreasing duplicate message delivery while
> > > also focusing on decreasing time where the topic is unavailable.
> > >
> > > This thread is probably a good requirement when thinking about ways to
> > > improve the topic handover logic, which has been discussed as a
> > > potential load balancing enhancement or a 3.0 enhancement.
> > >
> > > > So we should be careful of any operations that would reset the cursor.
> > > > For example, if the user resets the cursor with the admin client. We
> > > > need more detail info on this matter.
> > >
> > > This is a great point! In this case, it seems important to redeliver
> > > these messages. (For those unfamiliar, I just confirmed that the
> > > broker disconnects consumers when a cursor is reset.)
> > >
> > > Thanks,
> > > Michael

Re: [DISCUSS] Consumer reconnection causes repeated consumption messages

Posted by Haiting Jiang <ji...@gmail.com>.
Hi Bo,

> In this case, the consumer ends up unable to receive any messages
> again. We have to fix this because it causes message loss.
> I think we need to redefine how cumulative acks are used, not just
> solve the problem of receiving messages repeatedly.
>

There seems to be nothing wrong in this case: you acked the last
message, even though you had reset the cursor before that.
Currently, we place no restriction on the message IDs a user can ack.
They don't have to come from a valid message; they can even be
created directly by the user.

Thanks,
Haiting


Re: [DISCUSS] Consumer reconnection causes repeated consumption messages

Posted by 丛搏 <co...@gmail.com>.
Hi Haiting, Michael,

I tested an example, and there may be some problems when a subscription
cursor reset and a cumulative ack are used at the same time, e.g.:

```

Message<byte[]> message = null;
for (int i = 0; i < 3; i++) {
    message = consumer.receive();
}
admin.topics().resetCursor(topic, subName, MessageId.earliest);

consumer.acknowledgeCumulative(message.getMessageId());

admin.topics().unload(topic);
consumer.receive();

```
In this case, the consumer ends up unable to receive any messages
again. We have to fix this because it causes message loss.
I think we need to redefine how cumulative acks are used, not just
solve the problem of receiving messages repeatedly.

Thanks!
Bo

Michael Marshall <mm...@apache.org> wrote on Fri, Sep 9, 2022 at 12:05:
>
> I agree that reducing unnecessary duplicates is a good goal.
>
> For the topic unloading case, it might help to think about how we can
> improve the protocol. The current handover is very blunt. A new
> solution could focus on decreasing duplicate message delivery while
> also focusing on decreasing time where the topic is unavailable.
>
> This thread is probably a good requirement when thinking about ways to
> improve the topic handover logic, which has been discussed as a
> potential load balancing enhancement or a 3.0 enhancement.
>
> > So we should be careful of any operations that would reset the cursor.
> > For example, if the user resets the cursor with the admin client. We
> > need more detail info on this matter.
>
> This is a great point! In this case, it seems important to redeliver
> these messages. (For those unfamiliar, I just confirmed that the
> broker disconnects consumers when a cursor is reset.)
>
> Thanks,
> Michael

Re: [DISCUSS] Consumer reconnection causes repeated consumption messages

Posted by 丛搏 <co...@gmail.com>.
Hi Michael,

You are right, the current handover is very blunt. We can implement a
flexible and efficient solution by changing the protocol as a 3.0
enhancement. However, we also need this optimization in 2.0, and
ideally it should not be too complicated.

Thanks,
Bo

Michael Marshall <mm...@apache.org> wrote on Fri, Sep 9, 2022 at 12:05:
>
> I agree that reducing unnecessary duplicates is a good goal.
>
> For the topic unloading case, it might help to think about how we can
> improve the protocol. The current handover is very blunt. A new
> solution could focus on decreasing duplicate message delivery while
> also focusing on decreasing time where the topic is unavailable.
>
> This thread is probably a good requirement when thinking about ways to
> improve the topic handover logic, which has been discussed as a
> potential load balancing enhancement or a 3.0 enhancement.
>
> > So we should be careful of any operations that would reset the cursor.
> > For example, if the user resets the cursor with the admin client. We
> > need more detail info on this matter.
>
> This is a great point! In this case, it seems important to redeliver
> these messages. (For those unfamiliar, I just confirmed that the
> broker disconnects consumers when a cursor is reset.)
>
> Thanks,
> Michael

Re: [DISCUSS] Consumer reconnection causes repeated consumption messages

Posted by Michael Marshall <mm...@apache.org>.
I agree that reducing unnecessary duplicates is a good goal.

For the topic unloading case, it might help to think about how we can
improve the protocol. The current handover is very blunt. A new
solution could focus on decreasing duplicate message delivery while
also decreasing the time during which the topic is unavailable.

This thread is probably a good source of requirements when thinking
about ways to improve the topic handover logic, which has been
discussed as a potential load balancing enhancement or a 3.0
enhancement.

> So we should be careful of any operations that would reset the cursor.
> For example, if the user resets the cursor with the admin client. We
> need more detail info on this matter.

This is a great point! In this case, it seems important to redeliver
these messages. (For those unfamiliar, I just confirmed that the
broker disconnects consumers when a cursor is reset.)

Thanks,
Michael

On Thu, Sep 8, 2022 at 9:33 PM Haiting Jiang <ji...@gmail.com> wrote:
>
> Hi Bo,
>
> Overall it makes sense to me.
> It is basically the same as broker side deduplication mechanism when
> producing messages, which uses `sequenceId`.
> In your case, messageId is used for deduplication. It should work as
> long as the received messageId increases monotonically.
>
> So we should be careful of any operations that would reset the cursor.
> For example, if the user resets the cursor with the admin client. We
> need more detail info on this matter.
> And I am not sure if there are other operations that would reset the
> cursor implicitly.
>
> Thanks,
> Haiting

Re: [DISCUSS] Consumer reconnection causes repeated consumption messages

Posted by Haiting Jiang <ji...@gmail.com>.
Hi Bo,

Overall it makes sense to me.
It is basically the same as the broker-side deduplication mechanism for
produced messages, which uses `sequenceId`.
In your case, the messageId is used for deduplication. It should work as
long as the received messageId increases monotonically.

So we should be careful with any operation that resets the cursor, for
example when the user resets the cursor with the admin client. We need
more detailed information on this matter.
I am also not sure whether other operations reset the cursor
implicitly.
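
To illustrate the concern, here is a small simulation. It is only a
sketch, not Pulsar code: the class and method names are hypothetical,
and message IDs are modeled as plain increasing `long` values. A filter
that only remembers the maximum received ID drops everything that is
replayed after a cursor reset.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration; not part of the Pulsar client or broker. */
final class CursorResetDemo {

    /**
     * Simulates deduplication by "maximum received ID": an ID is accepted
     * only if it is strictly greater than every ID accepted before it.
     */
    static List<Long> filterByMaxReceived(List<Long> deliveries) {
        List<Long> accepted = new ArrayList<>();
        long maxReceived = Long.MIN_VALUE;
        for (long id : deliveries) {
            if (id > maxReceived) {
                maxReceived = id;
                accepted.add(id);
            }
            // Otherwise the ID is treated as a duplicate and dropped,
            // even if the user asked for it again via a cursor reset.
        }
        return accepted;
    }
}
```

If the deliveries are 0, 1, 2, followed by a reset to the earliest
position and a replay of 0, 1, 2, 3, the filter accepts only 0, 1, 2, 3.
The replayed 0 to 2, which the user explicitly asked for, are silently
dropped unless the filter state is cleared when the cursor is reset.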

Thanks,
Haiting

On Thu, Sep 8, 2022 at 11:36 PM 丛搏 <co...@gmail.com> wrote:
>
> Hi Haiting
>
> When using cumulative ack, we can save the maximum received MessageId
> on the consumer client side to filter the message duplication caused
> by reconnection, if the consumer client process restarts the maximum
> received MessageId will not exist in the consumer client. This
> requires the user to be responsible for the received messages, and if
> the user wants to re-consume the received messages, they need to call
> `void redeliverUnacknowledgedMessages().` then clear the maximum
> received MessageId from the consumer client
>
> Thanks,
> Bo

Re: [DISCUSS] Consumer reconnection causes repeated consumption messages

Posted by 丛搏 <co...@gmail.com>.
Hi Haiting,

When using cumulative ack, we can save the maximum received MessageId
on the consumer client side and use it to filter out the duplicates
caused by reconnection. If the consumer client process restarts, the
maximum received MessageId no longer exists in the client, so nothing
is filtered after a restart. This requires the user to be responsible
for the received messages: if the user wants to re-consume them, they
need to call `redeliverUnacknowledgedMessages()`, which then clears the
maximum received MessageId from the consumer client.
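
A minimal sketch of this idea, under some assumptions: `Position` is a
hypothetical stand-in for a MessageId ordered by (ledgerId, entryId),
and `MaxReceivedFilter` with its `accept`/`clear` methods is an
illustrative name, not an existing Pulsar client class. Batch indexes
and partitions are ignored here.

```java
/** Hypothetical stand-in for a MessageId, ordered by (ledgerId, entryId). */
final class Position implements Comparable<Position> {
    final long ledgerId;
    final long entryId;

    Position(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    @Override
    public int compareTo(Position other) {
        int byLedger = Long.compare(ledgerId, other.ledgerId);
        return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
    }
}

/** Hypothetical client-side filter that lives only in consumer memory. */
final class MaxReceivedFilter {
    // Highest position handed to the application; null until the first message.
    private Position maxReceived;

    /** Returns true if the message is new and should be handed to the application. */
    synchronized boolean accept(Position id) {
        if (maxReceived != null && id.compareTo(maxReceived) <= 0) {
            return false; // already received before the reconnection
        }
        maxReceived = id;
        return true;
    }

    /** Forgets the maximum, e.g. when the user requests redelivery. */
    synchronized void clear() {
        maxReceived = null;
    }
}
```

In this sketch, the consumer would call `accept` for every message that
arrives after a reconnection and `clear` from
`redeliverUnacknowledgedMessages()`. Because the state is kept only in
memory, a restarted client process starts with an empty filter.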

Thanks,
Bo

Haiting Jiang <ji...@gmail.com> wrote on Thu, Sep 8, 2022 at 14:51:
>
> From the user's perspective, I think we should always avoid delivering
> repeated messages.
> But can the broker tell if the reconnection is caused by topic
> unloading or consumer client process restarting?
> For the latter case, the message should be redelivered, it's the whole
> point of user explicit acking.
>
> Thanks,
> Haiting

Re: [DISCUSS] Consumer reconnection causes repeated consumption messages

Posted by Haiting Jiang <ji...@gmail.com>.
From the user's perspective, I think we should always avoid delivering
repeated messages.
But can the broker tell whether the reconnection is caused by topic
unloading or by a restart of the consumer client process?
In the latter case, the messages should be redelivered; that is the
whole point of explicit acking by the user.

Thanks,
Haiting

On Thu, Sep 8, 2022 at 10:56 AM 丛搏 <bo...@apache.org> wrote:
>
> Hello, Pulsar community:
>
>
> Now the consumer does not filter messages that have already been
> consumed. After consumer reconnection, the broker will dispatch
> messages to the consumer from the markDeletePosition. In Failover and
> Exclusive subscription type, all messages in a topic will be
> dispatched to the same consumer. Let's look at the following example:
>
> ```
>     @Test
>     public void testConsumerReconnectionRepeatedConsumeMessage() throws Exception {
>         final String topic = "testConsumerReconnectionRepeatedConsumeMessage";
>
>         @Cleanup
>         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
>                 .topic(topic).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();
>
>         @Cleanup
>         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
>                 .subscriptionType(SubscriptionType.Exclusive)
>                 .topic(topic).subscriptionName("test").subscribe();
>
>         // send 5 messages
>         for (int i = 0; i < 5; i++) {
>             producer.send("test" + i);
>         }
>
>         // the consumer receives the 5 messages
>         for (int i = 0; i < 5; i++) {
>             consumer.receive();
>         }
>
>         admin.topics().unload(topic);
>
>         // after the unload, the consumer can receive the same 5 messages again
>         Message<String> message = null;
>         for (int i = 0; i < 5; i++) {
>             message = consumer.receive();
>         }
>         consumer.acknowledgeCumulativeAsync(message.getMessageId());
>     }
> ```
>
> In the example above, the consumer consumes the 5 messages sent by the
> producer twice and then acknowledges them with a cumulative ack. If the
> consumer sends a cumulative ack only once per 1,000 or 10,000 messages,
> a consumer reconnection can cause a lot of repeated consumption.
> Although this does not violate at-least-once semantics, it causes a lot
> of useless overhead.
>
>
> Most importantly, it breaks the exactly-once semantics of Pulsar transactions.
>
>
> I want to discuss whether we should fix normal and transactional
> cumulative acks in the same way: prevent repeated consumption of
> messages due to consumer reconnection by filtering out messages that
> users have already received through `consumer.receive()`. Or should we
> only guarantee exactly-once semantics, that is, only guarantee that a
> consumer using a transaction will not receive the same messages again
> when it cumulatively acks with the transaction?
>
>
> Please leave your opinion, thanks! :)
>
>
>
> Thanks,
>
> Bo