Posted to dev@pulsar.apache.org by 丛搏 <co...@gmail.com> on 2023/03/20 14:21:05 UTC

[DISCUSS] PIP-260: Client consumer filter received messages

Hi, Pulsar community:

I started a PIP about `Client consumer filter received messages`.

PIP: https://github.com/apache/pulsar/issues/19864

Thanks,
Bo

Re: [DISCUSS] PIP-260: Client consumer filter received messages

Posted by Yunze Xu <yz...@streamnative.io.INVALID>.
Thanks for your explanation. It now makes sense to me. So I suggest:
1. Document this use case in the PIP
2. Document the possible consequences of resetting the cursor in the API
doc of this configuration
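
A minimal sketch of how such an API doc might read, assuming the transaction-related consumer configuration proposed in this thread. The class name `ConsumerTransactionConfigurationSketch`, its `defaults()` helper, and the Javadoc wording are illustrative assumptions, not existing Pulsar API:

```java
/**
 * Hypothetical sketch of the transaction-related consumer configuration
 * discussed in this thread. It is not an existing Pulsar class.
 */
final class ConsumerTransactionConfigurationSketch {
    private final boolean filterReceivedMessagesEnabled;

    ConsumerTransactionConfigurationSketch(boolean filterReceivedMessagesEnabled) {
        this.filterReceivedMessagesEnabled = filterReceivedMessagesEnabled;
    }

    /** Filtering is off by default, matching the default proposed in the thread. */
    static ConsumerTransactionConfigurationSketch defaults() {
        return new ConsumerTransactionConfigurationSketch(false);
    }

    /**
     * Whether the client consumer drops messages it has already received.
     *
     * <p>Caveat to document: if the cursor is reset on the broker while this
     * option is enabled, the consumer should be closed and restarted after the
     * reset succeeds. Otherwise it filters the replayed messages as duplicates
     * and the application does not receive the history messages.
     */
    boolean isFilterReceivedMessagesEnabled() {
        return filterReceivedMessagesEnabled;
    }
}
```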

Thanks,
Yunze

On Wed, Mar 29, 2023 at 9:11 PM 丛搏 <co...@gmail.com> wrote:
>
> Hi, Yunze:
>
> > It's better to describe how it could bring the benefit to transaction
> > use cases, since now it's designed to be a configuration related to
> > the transaction.
> Sorry that I haven't explained in detail why the transaction needs it.
> Let's look at a simple example:
>
> ```
> Transaction txn = getTxn();
> int num = 0;
> MessageId messageId = null;
> while (num < 10) {
>     messageId = consumer.receive(5, TimeUnit.SECONDS).getMessageId();
>     producer.newMessage(txn).value(messageId.toString()).sendAsync();
>     num++;
> }
> // Ack within the transaction so the ack and the 10 produced messages commit atomically.
> consumer.acknowledgeCumulativeAsync(messageId, txn);
> txn.commit();
> ```
> This example shows a transaction that atomically acks and produces
> 10 messages.
> If the messages we receive are duplicates, the messages we
> produce will also be duplicates. Therefore, we need to ensure that
> the messages we receive are ordered and never repeated in the
> failover and exclusive subscription modes. But the client consumer
> does not currently provide this guarantee. The guarantee must be exact;
> otherwise, it will break the exactly-once semantics.
>
>
> > With this proposal and the option enabled, all these cases will filter
> > the messages. That's why I think we have to consider the case for
> > resetting cursors because it makes things worse.
>
> Yes, this configuration may make the reset cursor more
> difficult to use. But without this configuration, it is difficult to guarantee
> the correctness of the transaction. Although we make the reset
> cursor worse, we ensure correctness.
>
> For transactions, we must first consider correctness, and only then
> which features to support (e.g. reset cursor).
>
> Thanks,
> Bo
> >
> > The three cases above do not involve transaction operations. So it
> > would be easier to understand the benefit if you could show some typical
> > cases that involve transaction operations.
> >
> > Thanks,
> > Yunze
> >
> > On Wed, Mar 29, 2023 at 12:02 PM 丛搏 <co...@gmail.com> wrote:
> > >
> > > Hi, all:
> > >
> > > Thanks to everyone who joined the discussion.
> > >
> > > Our current concerns include the following:
> > >
> > > 1. Filtering in the client consumer is not as efficient as
> > > handling this directly through startMessageId
> > > 2. It does not support reset cursor
> > >
> > > My previous PIP description added the configuration
> > > to consumerBuilder. The definition of that configuration was not
> > > clear, and it would cause great trouble to users.
> > >
> > > We can add a separate configuration that is only used for
> > > acks with transactions. Simple example:
> > >
> > > ```
> > > ConsumerBuilder<T> transactionConfiguration(ConsumerTransactionConfiguration config);
> > >
> > > @Builder
> > > @Data
> > > @NoArgsConstructor
> > > @AllArgsConstructor
> > > @InterfaceAudience.Public
> > > @InterfaceStability.Stable
> > >
> > > public class ConsumerTransactionConfiguration {
> > >    boolean isFilterReceivedMessagesEnabled = false;
> > > }
> > >
> > > ```
> > >
> > > If the design of startMessageId can provide this feature,
> > > we can remove the configuration. If there is already a closed-loop
> > > solution based on startMessageId, I agree to use startMessageId.
> > >
> > > As for the reset cursor, I think it is a separate problem,
> > > not related to this PIP.
> > >
> > > Thanks,
> > > Bo
> > >
> > > 丛搏 <co...@gmail.com> wrote on Fri, Mar 24, 2023 at 18:53:
> > > >
> > > > Hi, Michael:
> > > >
> > > > I thought about it carefully, and using 'startMessageId'
> > > > is indeed a good idea. But it is more complicated: we
> > > > need to ensure its absolute correctness and take
> > > > performance into consideration. If you can come up
> > > > with a closed-loop solution based on 'startMessageId',
> > > > I will support it. If it cannot satisfy both performance
> > > > and correctness, I think we should combine
> > > > our two solutions. Yours would ensure, to a certain
> > > > degree, that messages are not redelivered, which
> > > > reduces the overhead caused by the repeated delivery
> > > > of many messages. Mine would be responsible for
> > > > the final consistency.
> > > >
> > > > Thanks,
> > > > Bo
> > > >
> > > > Michael Marshall <mm...@apache.org> wrote on Wed, Mar 22, 2023 at 14:22:
> > > > >
> > > > > Because we already send the `startMessageId`, there is a chance that
> > > > > we might not even need to update the protocol for the
> > > > > CommandSubscribe. In light of that, I quickly put together a PR
> > > > > showing how that field might be used to inform the broker where to
> > > > > start the read position for the cursor.
> > > > >
> > > > > https://github.com/apache/pulsar/pull/19892
> > > > >
> > > > > The PR is not complete, but it does convey the general idea. I wrote
> > > > > additional details in the draft's description.
> > > > >
> > > > > Thanks,
> > > > > Michael
> > > > >
> > > > > On Tue, Mar 21, 2023 at 11:31 PM Michael Marshall <mm...@apache.org> wrote:
> > > > > >
> > > > > > I am not following your objections to the protocol solution. It might
> > > > > > be more productive if I provided a draft PR with a sample
> > > > > > implementation. I'm not sure that I'll have time, but I'll try to put
> > > > > > something together this week.
> > > > > >
> > > > > > > At least it will simplify the process of using cumulative ack with the
> > > > > > > transaction.
> > > > > >
> > > > > > Is this the underlying motivation for the PIP?
> > > > > >
> > > > > > From my perspective, the PIP is seeking to decrease duplicate messages
> > > > > > experienced due to disconnections from the broker.
> > > > > >
> > > > > > > The problem of the resetting cursor can be optimized in the future
> > > > > >
> > > > > > Why should we push off solving this problem? It seems fundamental to
> > > > > > this PIP and should not be ignored. At the very least, I think we need
> > > > > > to have an idea of what the future solution would be before we defer
> > > > > > its implementation.
> > > > > >
> > > > > > Thanks,
> > > > > > Michael
> > > > > >
> > > > > >
> > > > > > On Tue, Mar 21, 2023 at 10:52 PM 丛搏 <co...@gmail.com> wrote:
> > > > > > >
> > > > > > > Hi, Michael
> > > > > > > > In this case, the consumer does not have the source of truth for the
> > > > > > > > readPosition. It would leave the new protocol field for `readPosition`
> > > > > > > > empty and the broker would use its source of truth for the read
> > > > > > > > position.
> > > > > > > If the application thread has already received all the messages, we still need a
> > > > > > > correct `startPosition`, right? But with your approach, we would assume that
> > > > > > > the consumer hasn't received any messages.
> > > > > > >
> > > > > > > >
> > > > > > > > > why do we need to invoke `BlockingQueue.take` and `synchronized` in the
> > > > > > > > > same logic? It's bad code.
> > > > > > > >
> > > > > > > > We don't need to synchronize this code here because the logic will
> > > > > > > > come after the consumer has been disconnected from broker a and before
> > > > > > > > it is connected to broker b.
> > > > > > > If the application takes a message from the queue and the consumer then reconnects,
> > > > > > > can the Subscribe command use the right startPosition? Example:
> > > > > > > 1. The application receives one message with `MessageId = 1`.
> > > > > > > 2. The consumer reconnects and discovers that the queue is empty, and the
> > > > > > > `lastDequeuedMessageId` doesn't change.
> > > > > > > 3. The consumer sends a Subscribe command with MessageId.earliest, so `MessageId = 1`
> > > > > > > will be redelivered from the broker to the client consumer, right?
> > > > > > >
> > > > > > > As we can see in the example, the application can receive
> > > > > > > `MessageId = 1` again, right?
> > > > > > > > We would not need to lock here because we do not enqueue new messages
> > > > > > > > after we've been disconnected from the broker and before we've sent
> > > > > > > > CommandSubscribe.
> > > > > > > As we can see in the code [0], the thread has changed.
> > > > > > > Where do we guarantee that no new messages will come in?
> > > > > > >
> > > > > > > >
> > > > > > > > Ultimately, I think a protocol solution will yield better results,
> > > > > > > > especially since we'll want to implement this feature in the other
> > > > > > > > client languages.
> > > > > > > The problem of the resetting cursor can be optimized in the future,
> > > > > > > but can you ensure the correctness of all the cases I mentioned above?
> > > > > > > IMO, my design only changes the client, and the broker doesn't need
> > > > > > > any changes. It's simple and easy to implement, and I can make sure
> > > > > > > it's completely correct. In your design, I currently do not see a
> > > > > > > closed-loop implementation that can achieve this, at least in the Java client.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Bo
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Michael
> > > > > > > >
> > > > > > > > On Tue, Mar 21, 2023 at 9:29 PM 丛搏 <co...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > Hi, Michael:
> > > > > > > > >
> > > > > > > > > Michael Marshall <mm...@apache.org> wrote on Tue, Mar 21, 2023 at 23:17:
> > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > One more point. Instead of keeping track of the latest message seen by
> > > > > > > > > > the application, the logic in my solution would actually just check
> > > > > > > > > > the last message in the `incomingMessages` queue (as in the most
> > > > > > > > > > recently added), and use that as the read position in the subscribe
> > > > > > > > > > command. If we made this change, we would have to change this code [0]
> > > > > > > > > > to not drop the `incomingMessages` queue.
> > > > > > > > >
> > > > > > > > > case 1:
> > > > > > > > > How do we define the message that the application has seen?
> > > > > > > > > I think it is [0]. When the `incomingMessages` queue is empty,
> > > > > > > > > how do we get the correct `startPosition`?
> > > > > > > > > I think we would have to lock the receive logic in [1]:
> > > > > > > > > ```
> > > > > > > > > synchronized (this) {
> > > > > > > > >     message = incomingMessages.take();
> > > > > > > > >     messageProcessed(message);
> > > > > > > > > }
> > > > > > > > > ```
> > > > > > > > > why do we need to invoke `BlockingQueue.take` and `synchronized` in the
> > > > > > > > > same logic? It's bad code.
> > > > > > > > >
> > > > > > > > > case 2:
> > > > > > > > > If we sub with `startMessageId`, we also should lock any enqueue
> > > > > > > > > logic, like [2] and
> > > > > > > > > check to consumer's current state
> > > > > > > > > ```
> > > > > > > > > synchronized (this) {
> > > > > > > > >     if (consumer.isConnected) {
> > > > > > > > >         if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
> > > > > > > > >             // After we have enqueued the messages on `incomingMessages` queue, we cannot touch the message
> > > > > > > > >             // instance anymore, since for pooled messages, this instance was possibly already been released
> > > > > > > > >             // and recycled.
> > > > > > > > >             INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
> > > > > > > > >             getMemoryLimitController().ifPresent(limiter -> limiter.forceReserveMemory(messageSize));
> > > > > > > > >             updateAutoScaleReceiverQueueHint();
> > > > > > > > >         }
> > > > > > > > >     }
> > > > > > > > > }
> > > > > > > > > ```
> > > > > > > > > case 3:
> > > > > > > > > Suppose the Subscribe command is sent to the broker with `startMessageId = 1`, and
> > > > > > > > > the messages pushed by the broker have not yet entered the `incomingMessages`
> > > > > > > > > queue when the application invokes redeliver. In this case, the filtering
> > > > > > > > > would not be correct, right?
> > > > > > > > >
> > > > > > > > > These are some cases that I quickly thought of, and there must be
> > > > > > > > > others that I haven't thought of. Are you sure we can handle these
> > > > > > > > > problems correctly?
> > > > > > > > >
> > > > > > > > > > The problem of "the consumer doesn't know" seems like something that
> > > > > > > > > > is reasonably within the protocol's responsibilities. In this case, an
> > > > > > > > > > event happens on the broker, and the broker can tell the consumer.
> > > > > > > > >
> > > > > > > > > I don't think a simple protocol change can solve these problems.
> > > > > > > > > We can't promise that every consumer will receive the broker's reset
> > > > > > > > > cursor request.
> > > > > > > > > When the consumer reconnects, the broker can't send the reset cursor request to
> > > > > > > > > the client consumers, right? In this case, the consumer is still unaware, right?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > [0] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L135
> > > > > > > > > [1] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L440-L454
> > > > > > > > > [2] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java#L875-L892
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Michael
> > > > > > > > > >
> > > > > > > > > > [0] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
> > > > > > > > > >
> > > > > > > > > > On Tue, Mar 21, 2023 at 9:46 AM Michael Marshall <mm...@apache.org> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > if we add the new field in CommandSubscribe, we should ensure
> > > > > > > > > > > > the synchronization between consumer reconnection and user
> > > > > > > > > > > > calling receive and redeliverUnack method. it will affect the performance
> > > > > > > > > > > > of receive. Exposing synchronization to hot paths is not a good idea.
> > > > > > > > > > >
> > > > > > > > > > > I don't think this is a valid objection. I am pretty sure we already
> > > > > > > > > > > synchronize in the relevant places in the consumer to solve the exact
> > > > > > > > > > > race condition you're concerned about: [0] [1].
> > > > > > > > > > >
> > > > > > > > > > > My proposed operation is to keep track of the latest message id that
> > > > > > > > > > > the application has seen, and then tell the broker that id when
> > > > > > > > > > > sending the Subscribe command. We already do similar logic here [2]
> > > > > > > > > > > [3], but instead of getting the first message id the consumer hasn't
> > > > > > > > > > > seen, we'll get the latest message id seen.
> > > > > > > > > > >
> > > > > > > > > > > Regarding performance, the PIP doesn't touch on how it will filter out
> > > > > > > > > > > messages. What is the planned approach? In my understanding, the
> > > > > > > > > > > client will keep track of the latest message id that the application
> > > > > > > > > > > has seen and then will need to compare that message id against every
> > > > > > > > > > > new message. As such, it seems like telling the broker where to start
> > > > > > > > > > > instead of naively checking a filter on every message would be
> > > > > > > > > > > cheaper.
> > > > > > > > > > >
> > > > > > > > > > > > As described in Compatibility in PIP. Client consumer doesn't know
> > > > > > > > > > > > Pulsar Admin reset cursor.
> > > > > > > > > > >
> > > > > > > > > > > The problem of "the consumer doesn't know" seems like something that
> > > > > > > > > > > is reasonably within the protocol's responsibilities. In this case, an
> > > > > > > > > > > event happens on the broker, and the broker can tell the consumer.
> > > > > > > > > > >
> > > > > > > > > > > > * <p>Consumers should close when the server resets the cursor,
> > > > > > > > > > > > * when the cursor reset success, and then restart. Otherwise,
> > > > > > > > > > > > * the consumer will not receive the history messages.
> > > > > > > > > > >
> > > > > > > > > > > This is introducing a confusing edge case that requires reading a
> > > > > > > > > > > Javadoc in order to understand. That seems risky to me, and I do not
> > > > > > > > > > > think we should add such an edge case. A new protocol message would
> > > > > > > > > > > easily handle it and make it transparent to the application.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Michael
> > > > > > > > > > >
> > > > > > > > > > > [0] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L826-L912
> > > > > > > > > > > [1] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1870-L1876
> > > > > > > > > > > [2] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
> > > > > > > > > > > [3] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L922-L960
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Mar 21, 2023 at 8:58 AM Yubiao Feng
> > > > > > > > > > > <yu...@streamnative.io.invalid> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > +1
> > > > > > > > > > > >
> > > > > > > > > > > > Hi, Bo :
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks for your explanation. That makes sense to me.
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Yubiao Feng
> > > > > > > > > > > >
> > > > > > > > > > > > On Mon, Mar 20, 2023 at 10:21 PM 丛搏 <co...@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi, pulsar community:
> > > > > > > > > > > > >
> > > > > > > > > > > > > I started a PIP about `Client consumer filter received messages`.
> > > > > > > > > > > > >
> > > > > > > > > > > > > PIP: https://github.com/apache/pulsar/issues/19864
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > Bo
> > > > > > > > > > > > >

Re: [DISCUSS] PIP-260: Client consumer filter received messages

Posted by 丛搏 <co...@gmail.com>.
Hi, Yunze:

> It's better to describe how it could bring the benefit to transaction
> use cases, since now it's designed to be a configuration related to
> the transaction.
Sorry that I haven't explained in detail why the transaction needs it.
Let's look at a simple example:

```
Transaction txn = getTxn();
int num = 0;
MessageId messageId = null;
while (num < 10) {
    messageId = consumer.receive(5, TimeUnit.SECONDS).getMessageId();
    producer.newMessage(txn).value(messageId.toString()).sendAsync();
    num++;
}
// Ack within the transaction so the ack and the 10 produced messages commit atomically.
consumer.acknowledgeCumulativeAsync(messageId, txn);
txn.commit();
```
This example shows a transaction that atomically acks and produces
10 messages.
If the messages we receive are duplicates, the messages we
produce will also be duplicates. Therefore, we need to ensure that
the messages we receive are ordered and never repeated in the
failover and exclusive subscription modes. But the client consumer
does not currently provide this guarantee. The guarantee must be exact;
otherwise, it will break the exactly-once semantics.
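
A minimal sketch of the kind of client-side filter this implies, assuming that a message ID can be reduced to an ordered (ledgerId, entryId) pair and ignoring batch indexes and partitions. `Position` and `ReceivedMessageFilter` are hypothetical names for illustration, not classes in the Pulsar client:

```java
// Hypothetical sketch, not the Pulsar client implementation.
final class Position implements Comparable<Position> {
    final long ledgerId;
    final long entryId;

    Position(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    @Override
    public int compareTo(Position other) {
        // Order by ledger first, then by entry within the ledger.
        int byLedger = Long.compare(ledgerId, other.ledgerId);
        return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
    }
}

final class ReceivedMessageFilter {
    // Highest position already handed to the application; null until the first message.
    private Position lastReceived;

    // Returns true if the message is new and should be delivered,
    // false if it is at or before the last delivered position (a duplicate).
    synchronized boolean accept(Position position) {
        if (lastReceived != null && position.compareTo(lastReceived) <= 0) {
            return false;
        }
        lastReceived = position;
        return true;
    }
}
```

With a filter like this in front of `receive`, a message redelivered after a reconnect would be dropped before the application sees it, so the loop above would not produce it a second time.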


> With this proposal and the option enabled, all these cases will filter
> the messages. That's why I think we have to consider the case for
> resetting cursors because it makes things worse.

Yes, this configuration may make the reset cursor more
difficult to use. But without this configuration, it is difficult to guarantee
the correctness of the transaction. Although we make the reset
cursor worse, we ensure correctness.

For transactions, we must first consider correctness, and only then
which features to support (e.g. reset cursor).
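
To make the trade-off concrete, here is a small simulation, assuming entry IDs are plain increasing numbers. `FilteringConsumer` and `ResetCursorDemo` are hypothetical names, not Pulsar APIs:

```java
// Hypothetical sketch of the trade-off; names are illustrative, not Pulsar APIs.
final class FilteringConsumer {
    private long lastDelivered = -1; // highest entry id handed to the application

    // Delivers an entry id to the application unless it was already delivered.
    boolean deliver(long entryId) {
        if (entryId <= lastDelivered) {
            return false; // filtered as a duplicate
        }
        lastDelivered = entryId;
        return true;
    }
}

final class ResetCursorDemo {
    // Counts how many of the entries in [from, to] reach the application.
    static int replay(FilteringConsumer consumer, long from, long to) {
        int delivered = 0;
        for (long id = from; id <= to; id++) {
            if (consumer.deliver(id)) {
                delivered++;
            }
        }
        return delivered;
    }
}
```

A running consumer that has already seen entries 0 to 9 delivers none of them when they are replayed after a cursor reset, while a restarted consumer (a fresh `FilteringConsumer`) delivers all ten. That is the reset-cursor cost that is accepted here in exchange for correctness.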

Thanks,
Bo
>
> The three cases above do not involve transaction operations. So it
> would be easier to understand the benefit if you could show some typical
> cases that involve transaction operations.
>
> Thanks,
> Yunze
>
> On Wed, Mar 29, 2023 at 12:02 PM 丛搏 <co...@gmail.com> wrote:
> >
> > Hi, all:
> >
> > Thanks to everyone who joined the discussion.
> >
> > Our current concerns include the following:
> >
> > 1. Filtering in the client consumer is not as efficient as
> > handling this directly through startMessageId
> > 2. It does not support reset cursor
> >
> > My previous PIP description added the configuration
> > to consumerBuilder. The definition of that configuration was not
> > clear, and it would cause great trouble to users.
> >
> > We can add a separate configuration that is only used for
> > acks with transactions. Simple example:
> >
> > ```
> > ConsumerBuilder<T> transactionConfiguration(ConsumerTransactionConfiguration config);
> >
> > @Builder
> > @Data
> > @NoArgsConstructor
> > @AllArgsConstructor
> > @InterfaceAudience.Public
> > @InterfaceStability.Stable
> >
> > public class ConsumerTransactionConfiguration {
> >    boolean isFilterReceivedMessagesEnabled = false;
> > }
> >
> > ```
> >
> > If the design of startMessageId can provide this feature,
> > we can remove the configuration. If there is already a closed-loop
> > solution based on startMessageId, I agree to use startMessageId.
> >
> > As for the reset cursor, I think it is a separate problem,
> > not related to this PIP.
> >
> > Thanks,
> > Bo
> >
> > 丛搏 <co...@gmail.com> wrote on Fri, Mar 24, 2023 at 18:53:
> > >
> > > Hi, Michael:
> > >
> > > I thought about it carefully, and using 'startMessageId'
> > > is indeed a good idea. But it is more complicated: we
> > > need to ensure its absolute correctness and take
> > > performance into consideration. If you can come up
> > > with a closed-loop solution based on 'startMessageId',
> > > I will support it. If it cannot satisfy both performance
> > > and correctness, I think we should combine
> > > our two solutions. Yours would ensure, to a certain
> > > degree, that messages are not redelivered, which
> > > reduces the overhead caused by the repeated delivery
> > > of many messages. Mine would be responsible for
> > > the final consistency.
> > >
> > > Thanks,
> > > Bo
> > >
> > > Michael Marshall <mm...@apache.org> wrote on Wed, Mar 22, 2023 at 14:22:
> > > >
> > > > Because we already send the `startMessageId`, there is a chance that
> > > > we might not even need to update the protocol for the
> > > > CommandSubscribe. In light of that, I quickly put together a PR
> > > > showing how that field might be used to inform the broker where to
> > > > start the read position for the cursor.
> > > >
> > > > https://github.com/apache/pulsar/pull/19892
> > > >
> > > > The PR is not complete, but it does convey the general idea. I wrote
> > > > additional details in the draft's description.
> > > >
> > > > Thanks,
> > > > Michael
> > > >
> > > > On Tue, Mar 21, 2023 at 11:31 PM Michael Marshall <mm...@apache.org> wrote:
> > > > >
> > > > > I am not following your objections to the protocol solution. It might
> > > > > be more productive if I provided a draft PR with a sample
> > > > > implementation. I'm not sure that I'll have time, but I'll try to put
> > > > > something together this week.
> > > > >
> > > > > > At least it will simplify the process of using cumulative ack with the
> > > > > > transaction.
> > > > >
> > > > > Is this the underlying motivation for the PIP?
> > > > >
> > > > > From my perspective, the PIP is seeking to decrease duplicate messages
> > > > > experienced due to disconnections from the broker.
> > > > >
> > > > > > The problem of the resetting cursor can be optimized in the future
> > > > >
> > > > > Why should we push off solving this problem? It seems fundamental to
> > > > > this PIP and should not be ignored. At the very least, I think we need
> > > > > to have an idea of what the future solution would be before we defer
> > > > > its implementation.
> > > > >
> > > > > Thanks,
> > > > > Michael
> > > > >
> > > > >
> > > > > On Tue, Mar 21, 2023 at 10:52 PM 丛搏 <co...@gmail.com> wrote:
> > > > > >
> > > > > > Hi, Michael
> > > > > > > In this case, the consumer does not have the source of truth for the
> > > > > > > readPosition. It would leave the new protocol field for `readPosition`
> > > > > > > empty and the broker would use its source of truth for the read
> > > > > > > position.
> > > > > > If the application thread has already received all the messages, we still need a
> > > > > > correct `startPosition`, right? But with your approach, we would assume that
> > > > > > the consumer hasn't received any messages.
> > > > > >
> > > > > > >
> > > > > > > > why do we need to invoke `BlockingQueue.take` and `synchronized` in the
> > > > > > > > same logic? It's bad code.
> > > > > > >
> > > > > > > We don't need to synchronize this code here because the logic will
> > > > > > > come after the consumer has been disconnected from broker a and before
> > > > > > > it is connected to broker b.
> > > > > > If the application takes a message from the queue and the consumer then reconnects,
> > > > > > can the Subscribe command use the right startPosition? Example:
> > > > > > 1. The application receives one message with `MessageId = 1`.
> > > > > > 2. The consumer reconnects and discovers that the queue is empty, and the
> > > > > > `lastDequeuedMessageId` doesn't change.
> > > > > > 3. The consumer sends a Subscribe command with MessageId.earliest, so `MessageId = 1`
> > > > > > will be redelivered from the broker to the client consumer, right?
> > > > > >
> > > > > > As we can see in the example, the application can receive
> > > > > > `MessageId = 1` again, right?
> > > > > > > We would not need to lock here because we do not enqueue new messages
> > > > > > > after we've been disconnected from the broker and before we've sent
> > > > > > > CommandSubscribe.
> > > > > > As we can see in the code [0], the thread has changed.
> > > > > > Where do we guarantee that no new messages will come in?
> > > > > >
> > > > > > >
> > > > > > > Ultimately, I think a protocol solution will yield better results,
> > > > > > > especially since we'll want to implement this feature in the other
> > > > > > > client languages.
> > > > > > The problem of the resetting cursor can be optimized in the future,
> > > > > > but can you ensure the correctness of all the cases I mentioned above?
> > > > > > IMO, my design only changes the client, and the broker doesn't need
> > > > > > any changes. It's simple and easy to implement, and I can make sure
> > > > > > it's completely correct. In your design, I currently do not see a
> > > > > > closed-loop implementation that can achieve this, at least in the Java client.
> > > > > >
> > > > > > Thanks,
> > > > > > Bo
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Michael
> > > > > > >
> > > > > > > On Tue, Mar 21, 2023 at 9:29 PM 丛搏 <co...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > Hi, Michael:
> > > > > > > >
> > > > > > > > Michael Marshall <mm...@apache.org> wrote on Tue, Mar 21, 2023 at 23:17:
> > > > > > > >
> > > > > > > > >
> > > > > > > > > One more point. Instead of keeping track of the latest message seen by
> > > > > > > > > the application, the logic in my solution would actually just check
> > > > > > > > > the last message in the `incomingMessages` queue (as in the most
> > > > > > > > > recently added), and use that as the read position in the subscribe
> > > > > > > > > command. If we made this change, we would have to change this code [0]
> > > > > > > > > to not drop the `incomingMessages` queue.
> > > > > > > >
> > > > > > > > case 1:
> > > > > > > > How do we define the message that the application has seen?
> > > > > > > > I think it is [0]. When the `incomingMessages` queue is empty,
> > > > > > > > how do we get the correct `startPosition`?
> > > > > > > > I think we would have to lock the receive logic in [1]:
> > > > > > > > ```
> > > > > > > > synchronized (this) {
> > > > > > > >     message = incomingMessages.take();
> > > > > > > >     messageProcessed(message);
> > > > > > > > }
> > > > > > > > ```
> > > > > > > > why do we need to invoke `BlockingQueue.take` and `synchronized` in the
> > > > > > > > same logic? It's bad code.
> > > > > > > >
> > > > > > > > case 2:
> > > > > > > > If we sub with `startMessageId`, we also should lock any enqueue
> > > > > > > > logic, like [2] and
> > > > > > > > check to consumer's current state
> > > > > > > > ```
> > > > > > > > synchronized (this) {
> > > > > > > >     if (consumer.isConnected) {
> > > > > > > >         if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
> > > > > > > >             // After we have enqueued the messages on `incomingMessages` queue, we cannot touch the message
> > > > > > > >             // instance anymore, since for pooled messages, this instance was possibly already been released
> > > > > > > >             // and recycled.
> > > > > > > >             INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
> > > > > > > >             getMemoryLimitController().ifPresent(limiter -> limiter.forceReserveMemory(messageSize));
> > > > > > > >             updateAutoScaleReceiverQueueHint();
> > > > > > > >         }
> > > > > > > >     }
> > > > > > > > }
> > > > > > > > ```
> > > > > > > > case 3:
> > > > > > > > Suppose the Subscribe command is sent to the broker with `startMessageId = 1`, and
> > > > > > > > the messages pushed by the broker have not yet entered the `incomingMessages`
> > > > > > > > queue when the application invokes redeliver. In this case, the filtering
> > > > > > > > would not be correct, right?
> > > > > > > >
> > > > > > > > These are some cases that I quickly thought of, and there must be
> > > > > > > > others that I haven't thought of. Are you sure we can handle these
> > > > > > > > problems correctly?
> > > > > > > >
> > > > > > > > > The problem of "the consumer doesn't know" seems like something that
> > > > > > > > > is reasonably within the protocol's responsibilities. In this case, an
> > > > > > > > > event happens on the broker, and the broker can tell the consumer.
> > > > > > > >
> > > > > > > > I don't think a simple protocol change can solve these problems.
> > > > > > > > We can't promise that every consumer will receive the broker's reset
> > > > > > > > cursor request.
> > > > > > > > When the consumer reconnects, the broker can't send the reset cursor request to
> > > > > > > > the client consumers, right? In this case, the consumer is still unaware, right?
> > > > > > > >
> > > > > > > >
> > > > > > > > [0] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L135
> > > > > > > > [1] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L440-L454
> > > > > > > > [2] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java#L875-L892
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Michael
> > > > > > > > >
> > > > > > > > > [0] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
> > > > > > > > >
> > > > > > > > > On Tue, Mar 21, 2023 at 9:46 AM Michael Marshall <mm...@apache.org> wrote:
> > > > > > > > > >
> > > > > > > > > > > > If we add the new field in CommandSubscribe, we must ensure
> > > > > > > > > > > > synchronization between consumer reconnection and the user calling
> > > > > > > > > > > > the receive and redeliverUnack methods. That will affect the performance
> > > > > > > > > > > > of receive. Exposing synchronization to hot paths is not a good idea.
> > > > > > > > > >
> > > > > > > > > > I don't think this is a valid objection. I am pretty sure we already
> > > > > > > > > > synchronize in the relevant places in the consumer to solve the exact
> > > > > > > > > > race condition you're concerned about: [0] [1].
> > > > > > > > > >
> > > > > > > > > > My proposed operation is to keep track of the latest message id that
> > > > > > > > > > the application has seen, and then tell the broker that id when
> > > > > > > > > > sending the Subscribe command. We already do similar logic here [2]
> > > > > > > > > > [3], but instead of getting the first message id the consumer hasn't
> > > > > > > > > > seen, we'll get the latest message id seen.
> > > > > > > > > >
> > > > > > > > > > Regarding performance, the PIP doesn't touch on how it will filter out
> > > > > > > > > > messages. What is the planned approach? In my understanding, the
> > > > > > > > > > client will keep track of the latest message id that the application
> > > > > > > > > > has seen and then will need to compare that message id against every
> > > > > > > > > > > new message. As such, it seems like telling the broker where to start
> > > > > > > > > > instead of naively checking a filter on every message would be
> > > > > > > > > > cheaper.
> > > > > > > > > >
> > > > > > > > > > > As described in Compatibility in PIP. Client consumer doesn't know
> > > > > > > > > > > Pulsar Admin reset cursor.
> > > > > > > > > >
> > > > > > > > > > The problem of "the consumer doesn't know" seems like something that
> > > > > > > > > > is reasonably within the protocol's responsibilities. In this case, an
> > > > > > > > > > event happens on the broker, and the broker can tell the consumer.
> > > > > > > > > >
> > > > > > > > > > > * <p>Consumers should close when the server resets the cursor,
> > > > > > > > > > > * when the cursor reset success, and then restart. Otherwise,
> > > > > > > > > > > * the consumer will not receive the history messages.
> > > > > > > > > >
> > > > > > > > > > This is introducing a confusing edge case that requires reading a
> > > > > > > > > > Javadoc in order to understand. That seems risky to me, and I do not
> > > > > > > > > > think we should add such an edge case. A new protocol message would
> > > > > > > > > > easily handle it and make it transparent to the application.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Michael
> > > > > > > > > >
> > > > > > > > > > [0] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L826-L912
> > > > > > > > > > [1] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1870-L1876
> > > > > > > > > > [2] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
> > > > > > > > > > [3] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L922-L960
> > > > > > > > > >
> > > > > > > > > > On Tue, Mar 21, 2023 at 8:58 AM Yubiao Feng
> > > > > > > > > > <yu...@streamnative.io.invalid> wrote:
> > > > > > > > > > >
> > > > > > > > > > > +1
> > > > > > > > > > >
> > > > > > > > > > > Hi, Bo :
> > > > > > > > > > >
> > > > > > > > > > > Thanks for your explanation. That makes sense to me.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Yubiao Feng
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Mar 20, 2023 at 10:21 PM 丛搏 <co...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi, pulsar community:
> > > > > > > > > > > >
> > > > > > > > > > > > I started a PIP about `Client consumer filter received messages`.
> > > > > > > > > > > >
> > > > > > > > > > > > PIP: https://github.com/apache/pulsar/issues/19864
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Bo
> > > > > > > > > > > >

Re: [DISCUSS] PIP-260: Client consumer filter received messages

Posted by Yunze Xu <yz...@streamnative.io.INVALID>.
It would be better to describe how this benefits transaction
use cases, since it is now designed as a configuration related to
transactions.

I've thought about some cases:
1. A consumer received N messages, then the cursor was reset to the earliest.
2. A consumer received N messages and acknowledged all of them, then
the cursor was reset to the earliest.
2.1 The acknowledgment is flushed
2.2 The acknowledgment is not flushed

Without this proposal, only 2.2 will filter all these messages, because
the MessageIds are cached in
`PersistentAcknowledgmentsGroupingTracker#pendingIndividualAcks`. It's
an existing bug, and I agree that we can discuss how to solve it in
another proposal (e.g., by distinguishing a normal network issue from a
cursor reset).

With this proposal and the option enabled, all these cases will filter
the messages. That's why I think we have to consider the case for
resetting cursors because it makes things worse.
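
To make case 1 concrete, here is a minimal sketch of the effect, not the
actual client code. It assumes the filter remembers the largest message
id handed to the application and drops anything at or below it. Plain
`long` ids stand in for real MessageIds, and `CursorResetSketch` and
`secondPass` are hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of case 1: the consumer receives n messages,
// then the cursor is reset to the earliest position.
final class CursorResetSketch {

    // Returns the ids the application sees after the reset.
    static List<Long> secondPass(int n, boolean filterEnabled) {
        long lastReceived = -1;
        // First pass: the application receives ids 0..n-1.
        for (long id = 0; id < n; id++) {
            lastReceived = id;
        }
        // After the reset the broker redelivers 0..n-1, then one new message n.
        List<Long> seen = new ArrayList<>();
        for (long id = 0; id <= n; id++) {
            if (filterEnabled && id <= lastReceived) {
                continue; // treated as "already received" and dropped
            }
            lastReceived = Math.max(lastReceived, id);
            seen.add(id);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("filter on:  " + secondPass(3, true));
        System.out.println("filter off: " + secondPass(3, false));
    }
}
```

With the filter on, the application only sees the new message, so the
reset has no visible effect on the history messages.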

The three cases above do not involve transaction operations. It would
be easier to understand the benefit if you could show some typical
cases that involve transaction operations.

Thanks,
Yunze

On Wed, Mar 29, 2023 at 12:02 PM 丛搏 <co...@gmail.com> wrote:
>
> Hi, all :
>
> Thanks to everyone who discussed it.
>
> The current concerns are:
>
> 1. Filtering in the client consumer is less efficient than
> handling this directly through startMessageId
> 2. The design does not support cursor reset
>
> My previous PIP description added a configuration option to
> ConsumerBuilder. The definition of that option was not
> clear, and it would cause users a lot of trouble.
>
> We can add a separate configuration that is only used for
> acks with transactions. Simple example:
>
> ```
> // Proposed new method on ConsumerBuilder<T>
> ConsumerBuilder<T> transactionConfiguration(ConsumerTransactionConfiguration config);
>
> @Builder
> @Data
> @NoArgsConstructor
> @AllArgsConstructor
> @InterfaceAudience.Public
> @InterfaceStability.Stable
> public class ConsumerTransactionConfiguration {
>     // When true, the consumer filters out messages it has already received.
>     boolean isFilterReceivedMessagesEnabled = false;
> }
>
> ```
>
> If the startMessageId design can provide this feature,
> we can remove the configuration. And if there is already a
> closed-loop solution based on startMessageId, I agree to use it.
>
> As for cursor reset, I think it is a separate problem,
> not related to this PIP.
>
> Thanks,
> Bo
>
> On Fri, Mar 24, 2023 at 6:53 PM 丛搏 <co...@gmail.com> wrote:
> >
> > Hi, Michael:
> >
> > I thought about it carefully, and using 'startMessageId'
> > is indeed a good idea. But it is more complicated: we
> > need to ensure its absolute correctness and take
> > performance into consideration. If you can come up
> > with a closed-loop solution based on 'startMessageId',
> > I will support it. If it cannot satisfy both performance
> > and correctness, I think we should combine our two
> > solutions. Yours would ensure that a portion of the
> > messages are not redelivered, which reduces the overhead
> > caused by the repeated delivery of many messages. Mine
> > would be responsible for the final consistency.
> >
> > Thanks,
> > Bo
> >
> > On Wed, Mar 22, 2023 at 2:22 PM Michael Marshall <mm...@apache.org> wrote:
> > >
> > > Because we already send the `startMessageId`, there is a chance where
> > > we might not even need to update the protocol for the
> > > CommandSubscribe. In light of that, I quickly put together a PR
> > > showing how that field might be used to inform the broker where to
> > > start the read position for the cursor.
> > >
> > > https://github.com/apache/pulsar/pull/19892
> > >
> > > The PR is not complete, but it does convey the general idea. I wrote
> > > additional details in the draft's description.
> > >
> > > Thanks,
> > > Michael
> > >
> > > On Tue, Mar 21, 2023 at 11:31 PM Michael Marshall <mm...@apache.org> wrote:
> > > >
> > > > I am not following your objections to the protocol solution. It might
> > > > be more productive if I provided a draft PR with a sample
> > > > implementation. I'm not sure that I'll have time, but I'll try to put
> > > > something together this week.
> > > >
> > > > > At least it will simplify the process of using cumulative ack with the
> > > > > transaction.
> > > >
> > > > Is this the underlying motivation for the PIP?
> > > >
> > > > From my perspective, the PIP is seeking to decrease duplicate messages
> > > > experienced due to disconnections from the broker.
> > > >
> > > > > The problem of the resetting cursor can be optimized in the future
> > > >
> > > > Why should we push off solving this problem? It seems fundamental to
> > > > this PIP and should not be ignored. At the very least, I think we need
> > > > to have an idea of what the future solution would be before we defer
> > > > its implementation.
> > > >
> > > > Thanks,
> > > > Michael
> > > >
> > > >
> > > > On Tue, Mar 21, 2023 at 10:52 PM 丛搏 <co...@gmail.com> wrote:
> > > > >
> > > > > Hi, Michael
> > > > > > In this case, the consumer does not have the source of truth for the
> > > > > > readPosition. It would leave the new protocol field for `readPosition`
> > > > > > empty and the broker would use its source of truth for the read
> > > > > > position.
> > > > > Suppose the application thread has already received all the messages.
> > > > > We still need a correct `startPosition`, right? But with your approach,
> > > > > we would assume that the consumer hasn't received any messages.
> > > > >
> > > > > >
> > > > > > > Why do we need to invoke `BlockingQueue.take` and `synchronized` in the
> > > > > > > same logic? It's bad code.
> > > > > >
> > > > > > We don't need to synchronize this code here because the logic will
> > > > > > come after the consumer has been disconnected from broker a and before
> > > > > > it is connected to broker b.
> > > > > If the application takes a message from the queue and the consumer then
> > > > > reconnects, can CommandSubscribe use the right startPosition? Example:
> > > > > 1. The application receives one message with `MessageId = 1`.
> > > > > 2. The consumer reconnects, finds the queue empty, and the
> > > > > lastDequeuedMessageId doesn't change.
> > > > > 3. The consumer sends CommandSubscribe with MessageId.earliest, so the
> > > > > broker redelivers `MessageId = 1` to the client consumer, right?
> > > > >
> > > > > As the example shows, the application can receive `MessageId = 1`
> > > > > again, right?
> > > > > > We would not need to lock here because we do not enqueue new messages
> > > > > > after we've been disconnected from the broker and before we've sent
> > > > > > CommandSubscribe.
> > > > > As the code in [0] shows, the thread has changed.
> > > > > Where do we guarantee that no new messages will come in?
> > > > >
> > > > > >
> > > > > > Ultimately, I think a protocol solution will yield better results,
> > > > > > especially since we'll want to implement this feature in the other
> > > > > > client languages.
> > > > > The problem of the resetting cursor can be optimized in the future,
> > > > > but can you ensure correctness in all the cases I mentioned above?
> > > > > IMO, my design only changes the client, so the broker does not need
> > > > > any changes. It's simple and easy to implement, and I can make sure
> > > > > it's completely correct. In your design, I currently do not see a
> > > > > closed-loop implementation that achieves this, at least in the Java
> > > > > client.
> > > > >
> > > > > Thanks,
> > > > > Bo
> > > > > >
> > > > > > Thanks,
> > > > > > Michael
> > > > > >
> > > > > > On Tue, Mar 21, 2023 at 9:29 PM 丛搏 <co...@gmail.com> wrote:
> > > > > > >
> > > > > > > Hi, Michael:
> > > > > > >
> > > > > > > On Tue, Mar 21, 2023 at 11:17 PM Michael Marshall <mm...@apache.org> wrote:
> > > > > > >
> > > > > > > >
> > > > > > > > One more point. Instead of keeping track of the latest message seen by
> > > > > > > > the application, the logic in my solution would actually just check
> > > > > > > > the last message in the `incomingMessages` queue (as in the most
> > > > > > > > recently added), and use that as the read position in the subscribe
> > > > > > > > command. If we made this change, we would have to change this code [0]
> > > > > > > > to not drop the `incomingMessages` queue.
> > > > > > >
> > > > > > > case 1:
> > > > > > > How do we define the message that the application has seen?
> > > > > > > I think it is [0]. When the `incomingMessages` queue is empty,
> > > > > > > how do we get the correct `startPosition`?
> > > > > > > I think we would have to lock the receive logic in [1]:
> > > > > > > ```
> > > > > > > synchronized (this) {
> > > > > > >     message = incomingMessages.take();
> > > > > > >     messageProcessed(message);
> > > > > > > }
> > > > > > > ```
> > > > > > > Why do we need to invoke `BlockingQueue.take` and `synchronized` in the
> > > > > > > same logic? It's bad code.
> > > > > > >
> > > > > > > case 2:
> > > > > > > If we subscribe with `startMessageId`, we should also lock any enqueue
> > > > > > > logic, like [2], and check the consumer's current state:
> > > > > > > ```
> > > > > > > synchronized (this) {
> > > > > > >     if (consumer.isConnected) {
> > > > > > >         if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
> > > > > > >             // After we have enqueued the messages on `incomingMessages` queue,
> > > > > > >             // we cannot touch the message instance anymore, since for pooled
> > > > > > >             // messages, this instance was possibly already been released
> > > > > > >             // and recycled.
> > > > > > >             INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
> > > > > > >             getMemoryLimitController().ifPresent(limiter ->
> > > > > > >                     limiter.forceReserveMemory(messageSize));
> > > > > > >             updateAutoScaleReceiverQueueHint();
> > > > > > >         }
> > > > > > >     }
> > > > > > > }
> > > > > > > ```
> > > > > > > case 3:
> > > > > > > Suppose we send the Subscribe command to the broker with
> > > > > > > `startMessageId = 1`, and the application invokes redeliver before
> > > > > > > the messages pushed by the broker have entered the `incomingMessages`
> > > > > > > queue. In that case, it is correct that we don't filter the
> > > > > > > messages, right?
> > > > > > >
> > > > > > > These are some cases that I simply thought of, and there must be
> > > > > > > others that I haven't thought
> > > > > > > of. Are you sure we can handle these problems correctly?
> > > > > > >
> > > > > > > > The problem of "the consumer doesn't know" seems like something that
> > > > > > > > is reasonably within the protocol's responsibilities. In this case, an
> > > > > > > > event happens on the broker, and the broker can tell the consumer.
> > > > > > >
> > > > > > > I don't think a simple protocol change can solve these problems.
> > > > > > > We can't promise that every consumer will receive the broker's reset
> > > > > > > cursor request.
> > > > > > > When the consumer reconnects, the broker can't send the reset cursor request to
> > > > > > > the client consumers, right? In this case, the consumer is still unaware, right?
> > > > > > >
> > > > > > >
> > > > > > > [0] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L135
> > > > > > > [1] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L440-L454
> > > > > > > [2] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java#L875-L892
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Michael
> > > > > > > >
> > > > > > > > [0] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
> > > > > > > >
> > > > > > > > On Tue, Mar 21, 2023 at 9:46 AM Michael Marshall <mm...@apache.org> wrote:
> > > > > > > > >
> > > > > > > > > > If we add the new field in CommandSubscribe, we must ensure
> > > > > > > > > > synchronization between consumer reconnection and the user calling
> > > > > > > > > > the receive and redeliverUnack methods. That will affect the performance
> > > > > > > > > > of receive. Exposing synchronization to hot paths is not a good idea.
> > > > > > > > >
> > > > > > > > > I don't think this is a valid objection. I am pretty sure we already
> > > > > > > > > synchronize in the relevant places in the consumer to solve the exact
> > > > > > > > > race condition you're concerned about: [0] [1].
> > > > > > > > >
> > > > > > > > > My proposed operation is to keep track of the latest message id that
> > > > > > > > > the application has seen, and then tell the broker that id when
> > > > > > > > > sending the Subscribe command. We already do similar logic here [2]
> > > > > > > > > [3], but instead of getting the first message id the consumer hasn't
> > > > > > > > > seen, we'll get the latest message id seen.
> > > > > > > > >
> > > > > > > > > Regarding performance, the PIP doesn't touch on how it will filter out
> > > > > > > > > messages. What is the planned approach? In my understanding, the
> > > > > > > > > client will keep track of the latest message id that the application
> > > > > > > > > has seen and then will need to compare that message id against every
> > > > > > > > > new message. As such, it seems like telling the broker where to start
> > > > > > > > > instead of naively checking a filter on every message would be
> > > > > > > > > cheaper.
> > > > > > > > >
> > > > > > > > > > As described in Compatibility in PIP. Client consumer doesn't know
> > > > > > > > > > Pulsar Admin reset cursor.
> > > > > > > > >
> > > > > > > > > The problem of "the consumer doesn't know" seems like something that
> > > > > > > > > is reasonably within the protocol's responsibilities. In this case, an
> > > > > > > > > event happens on the broker, and the broker can tell the consumer.
> > > > > > > > >
> > > > > > > > > > * <p>Consumers should close when the server resets the cursor,
> > > > > > > > > > * when the cursor reset success, and then restart. Otherwise,
> > > > > > > > > > * the consumer will not receive the history messages.
> > > > > > > > >
> > > > > > > > > This is introducing a confusing edge case that requires reading a
> > > > > > > > > Javadoc in order to understand. That seems risky to me, and I do not
> > > > > > > > > think we should add such an edge case. A new protocol message would
> > > > > > > > > easily handle it and make it transparent to the application.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Michael
> > > > > > > > >
> > > > > > > > > [0] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L826-L912
> > > > > > > > > [1] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1870-L1876
> > > > > > > > > [2] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
> > > > > > > > > [3] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L922-L960
> > > > > > > > >
> > > > > > > > > On Tue, Mar 21, 2023 at 8:58 AM Yubiao Feng
> > > > > > > > > <yu...@streamnative.io.invalid> wrote:
> > > > > > > > > >
> > > > > > > > > > +1
> > > > > > > > > >
> > > > > > > > > > Hi, Bo :
> > > > > > > > > >
> > > > > > > > > > Thanks for your explanation. That makes sense to me.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Yubiao Feng
> > > > > > > > > >
> > > > > > > > > > On Mon, Mar 20, 2023 at 10:21 PM 丛搏 <co...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi, pulsar community:
> > > > > > > > > > >
> > > > > > > > > > > I started a PIP about `Client consumer filter received messages`.
> > > > > > > > > > >
> > > > > > > > > > > PIP: https://github.com/apache/pulsar/issues/19864
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Bo
> > > > > > > > > > >

Re: [DISCUSS] PIP-260: Client consumer filter received messages

Posted by 丛搏 <co...@gmail.com>.
Hi, all :

Thanks to everyone who discussed it.

The current concerns are:

1. Filtering in the client consumer is less efficient than
handling this directly through startMessageId
2. The design does not support cursor reset

My previous PIP description added a configuration option to
ConsumerBuilder. The definition of that option was not
clear, and it would cause users a lot of trouble.

We can add a separate configuration that is only used for
acks with transactions. Simple example:

```
// Proposed new method on ConsumerBuilder<T>
ConsumerBuilder<T> transactionConfiguration(ConsumerTransactionConfiguration config);

@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
@InterfaceAudience.Public
@InterfaceStability.Stable
public class ConsumerTransactionConfiguration {
    // When true, the consumer filters out messages it has already received.
    boolean isFilterReceivedMessagesEnabled = false;
}

```

If the startMessageId design can provide this feature,
we can remove the configuration. And if there is already a
closed-loop solution based on startMessageId, I agree to use it.
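
For illustration, the filtering that this option would enable could look
roughly like the sketch below. It is only an assumption about the
approach, not the actual ConsumerImpl logic: `Position`,
`ReceivedMessageFilter`, and `FilterSketch` are hypothetical names, and a
(ledgerId, entryId) pair stands in for a real MessageId.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a message position (ledgerId, entryId).
final class Position implements Comparable<Position> {
    final long ledgerId;
    final long entryId;

    Position(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    @Override
    public int compareTo(Position other) {
        int byLedger = Long.compare(ledgerId, other.ledgerId);
        return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
    }
}

// Hypothetical filter: remembers the last position handed to the
// application and rejects anything at or before it.
final class ReceivedMessageFilter {
    private final boolean enabled;
    private Position lastReceived; // null until the first message is accepted

    ReceivedMessageFilter(boolean enabled) {
        this.enabled = enabled;
    }

    // Returns true when the message should be handed to the application.
    synchronized boolean accept(Position incoming) {
        if (enabled && lastReceived != null && incoming.compareTo(lastReceived) <= 0) {
            return false; // duplicate, e.g. redelivered after a reconnect
        }
        lastReceived = incoming;
        return true;
    }
}

final class FilterSketch {
    // Simulates a reconnect where the broker redelivers entries 1 and 2.
    static List<Long> deliver(boolean enabled) {
        ReceivedMessageFilter filter = new ReceivedMessageFilter(enabled);
        long[] entries = {0, 1, 2, 1, 2, 3};
        List<Long> seen = new ArrayList<>();
        for (long entry : entries) {
            if (filter.accept(new Position(7, entry))) {
                seen.add(entry);
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("enabled:  " + deliver(true));
        System.out.println("disabled: " + deliver(false));
    }
}
```

The comparison runs on every received message, which is the per-message
cost raised in concern 1.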

As for cursor reset, I think it is a separate problem,
not related to this PIP.

Thanks,
Bo

On Fri, Mar 24, 2023 at 6:53 PM 丛搏 <co...@gmail.com> wrote:
>
> Hi, Michael:
>
> I thought about it carefully, and using 'startMessageId'
> is indeed a good idea. But it is more complicated: we
> need to ensure its absolute correctness and take
> performance into consideration. If you can come up
> with a closed-loop solution based on 'startMessageId',
> I will support it. If it cannot satisfy both performance
> and correctness, I think we should combine our two
> solutions. Yours would ensure that a portion of the
> messages are not redelivered, which reduces the overhead
> caused by the repeated delivery of many messages. Mine
> would be responsible for the final consistency.
>
> Thanks,
> Bo
>
> On Wed, Mar 22, 2023 at 2:22 PM Michael Marshall <mm...@apache.org> wrote:
> >
> > Because we already send the `startMessageId`, there is a chance where
> > we might not even need to update the protocol for the
> > CommandSubscribe. In light of that, I quickly put together a PR
> > showing how that field might be used to inform the broker where to
> > start the read position for the cursor.
> >
> > https://github.com/apache/pulsar/pull/19892
> >
> > The PR is not complete, but it does convey the general idea. I wrote
> > additional details in the draft's description.
> >
> > Thanks,
> > Michael
> >
> > On Tue, Mar 21, 2023 at 11:31 PM Michael Marshall <mm...@apache.org> wrote:
> > >
> > > I am not following your objections to the protocol solution. It might
> > > be more productive if I provided a draft PR with a sample
> > > implementation. I'm not sure that I'll have time, but I'll try to put
> > > something together this week.
> > >
> > > > At least it will simplify the process of using cumulative ack with the
> > > > transaction.
> > >
> > > Is this the underlying motivation for the PIP?
> > >
> > > From my perspective, the PIP is seeking to decrease duplicate messages
> > > experienced due to disconnections from the broker.
> > >
> > > > The problem of the resetting cursor can be optimized in the future
> > >
> > > Why should we push off solving this problem? It seems fundamental to
> > > this PIP and should not be ignored. At the very least, I think we need
> > > to have an idea of what the future solution would be before we defer
> > > its implementation.
> > >
> > > Thanks,
> > > Michael
> > >
> > >
> > > On Tue, Mar 21, 2023 at 10:52 PM 丛搏 <co...@gmail.com> wrote:
> > > >
> > > > Hi, Michael
> > > > > In this case, the consumer does not have the source of truth for the
> > > > > readPosition. It would leave the new protocol field for `readPosition`
> > > > > empty and the broker would use its source of truth for the read
> > > > > position.
> > > > Suppose the application thread has already received all the messages.
> > > > We still need a correct `startPosition`, right? But with your approach,
> > > > we would assume that the consumer hasn't received any messages.
> > > >
> > > > >
> > > > > > Why do we need to invoke `BlockingQueue.take` and `synchronized` in the
> > > > > > same logic? It's bad code.
> > > > >
> > > > > We don't need to synchronize this code here because the logic will
> > > > > come after the consumer has been disconnected from broker a and before
> > > > > it is connected to broker b.
> > > > If the application takes a message from the queue and the consumer then
> > > > reconnects, can CommandSubscribe use the right startPosition? Example:
> > > > 1. The application receives one message with `MessageId = 1`.
> > > > 2. The consumer reconnects, finds the queue empty, and the
> > > > lastDequeuedMessageId doesn't change.
> > > > 3. The consumer sends CommandSubscribe with MessageId.earliest, so the
> > > > broker redelivers `MessageId = 1` to the client consumer, right?
> > > >
> > > > As the example shows, the application can receive `MessageId = 1`
> > > > again, right?
> > > > > We would not need to lock here because we do not enqueue new messages
> > > > > after we've been disconnected from the broker and before we've sent
> > > > > CommandSubscribe.
> > > > As the code in [0] shows, the thread has changed.
> > > > Where do we guarantee that no new messages will come in?
> > > >
> > > > >
> > > > > Ultimately, I think a protocol solution will yield better results,
> > > > > especially since we'll want to implement this feature in the other
> > > > > client languages.
> > > > The problem of the resetting cursor can be optimized in the future,
> > > > but can you ensure correctness in all the cases I mentioned above?
> > > > IMO, my design only changes the client, so the broker does not need
> > > > any changes. It's simple and easy to implement, and I can make sure
> > > > it's completely correct. In your design, I currently do not see a
> > > > closed-loop implementation that achieves this, at least in the Java
> > > > client.
> > > >
> > > > Thanks,
> > > > Bo
> > > > >
> > > > > Thanks,
> > > > > Michael
> > > > >
> > > > > On Tue, Mar 21, 2023 at 9:29 PM 丛搏 <co...@gmail.com> wrote:
> > > > > >
> > > > > > Hi, Michael:
> > > > > >
> > > > > > On Tue, Mar 21, 2023 at 11:17 PM Michael Marshall <mm...@apache.org> wrote:
> > > > > >
> > > > > > >
> > > > > > > One more point. Instead of keeping track of the latest message seen by
> > > > > > > the application, the logic in my solution would actually just check
> > > > > > > the last message in the `incomingMessages` queue (as in the most
> > > > > > > recently added), and use that as the read position in the subscribe
> > > > > > > command. If we made this change, we would have to change this code [0]
> > > > > > > to not drop the `incomingMessages` queue.
> > > > > >
> > > > > > case 1:
> > > > > > How do we define the message that the application has seen?
> > > > > > I think it is [0]. When the `incomingMessages` queue is empty,
> > > > > > how do we get the correct `startPosition`?
> > > > > > I think we would have to lock the receive logic in [1]:
> > > > > > ```
> > > > > > synchronized (this) {
> > > > > >     message = incomingMessages.take();
> > > > > >     messageProcessed(message);
> > > > > > }
> > > > > > ```
> > > > > > Why do we need to invoke `BlockingQueue.take` and `synchronized` in the
> > > > > > same logic? It's bad code.
> > > > > >
> > > > > > case 2:
> > > > > > If we subscribe with `startMessageId`, we should also lock any enqueue
> > > > > > logic, like [2], and check the consumer's current state:
> > > > > > ```
> > > > > > synchronized (this) {
> > > > > >     if (consumer.isConnected) {
> > > > > >         if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
> > > > > >             // After we have enqueued the messages on `incomingMessages` queue,
> > > > > >             // we cannot touch the message instance anymore, since for pooled
> > > > > >             // messages, this instance was possibly already been released
> > > > > >             // and recycled.
> > > > > >             INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
> > > > > >             getMemoryLimitController().ifPresent(limiter ->
> > > > > >                     limiter.forceReserveMemory(messageSize));
> > > > > >             updateAutoScaleReceiverQueueHint();
> > > > > >         }
> > > > > >     }
> > > > > > }
> > > > > > ```
> > > > > > case 3:
> > > > > > Suppose we send the Subscribe command to the broker with
> > > > > > `startMessageId = 1`, and the application invokes redeliver before
> > > > > > the messages pushed by the broker have entered the `incomingMessages`
> > > > > > queue. In that case, it is correct that we don't filter the
> > > > > > messages, right?
> > > > > >
> > > > > > These are some cases that I simply thought of, and there must be
> > > > > > others that I haven't thought
> > > > > > of. Are you sure we can handle these problems correctly?
> > > > > >
> > > > > > > The problem of "the consumer doesn't know" seems like something that
> > > > > > > is reasonably within the protocol's responsibilities. In this case, an
> > > > > > > event happens on the broker, and the broker can tell the consumer.
> > > > > >
> > > > > > I don't think a simple protocol change can solve these problems.
> > > > > > We can't promise that every consumer will receive the broker's reset
> > > > > > cursor request.
> > > > > > When the consumer reconnects, the broker can't send the reset cursor request to
> > > > > > the client consumers, right? In this case, the consumer is still unaware, right?
> > > > > >
> > > > > >
> > > > > > [0] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L135
> > > > > > [1] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L440-L454
> > > > > > [2] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java#L875-L892
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Michael
> > > > > > >
> > > > > > > [0] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
> > > > > > >
> > > > > > > On Tue, Mar 21, 2023 at 9:46 AM Michael Marshall <mm...@apache.org> wrote:
> > > > > > > >
> > > > > > > > > if we add the new field in CommandSubscribe, we should ensure
> > > > > > > > > the synchronization between consumer reconnection and user
> > > > > > > > > > calling receive and redeliverUnack method. It will affect the performance
> > > > > > > > > > of receive. Exposing synchronization to hot paths is not a good idea.
> > > > > > > >
> > > > > > > > I don't think this is a valid objection. I am pretty sure we already
> > > > > > > > synchronize in the relevant places in the consumer to solve the exact
> > > > > > > > race condition you're concerned about: [0] [1].
> > > > > > > >
> > > > > > > > My proposed operation is to keep track of the latest message id that
> > > > > > > > the application has seen, and then tell the broker that id when
> > > > > > > > sending the Subscribe command. We already do similar logic here [2]
> > > > > > > > [3], but instead of getting the first message id the consumer hasn't
> > > > > > > > seen, we'll get the latest message id seen.
> > > > > > > >
> > > > > > > > Regarding performance, the PIP doesn't touch on how it will filter out
> > > > > > > > messages. What is the planned approach? In my understanding, the
> > > > > > > > client will keep track of the latest message id that the application
> > > > > > > > has seen and then will need to compare that message id against every
> > > > > > > > > new message. As such, it seems like telling the broker where to start
> > > > > > > > instead of naively checking a filter on every message would be
> > > > > > > > cheaper.
> > > > > > > >
> > > > > > > > > As described in Compatibility in PIP. Client consumer doesn't know
> > > > > > > > > Pulsar Admin reset cursor.
> > > > > > > >
> > > > > > > > The problem of "the consumer doesn't know" seems like something that
> > > > > > > > is reasonably within the protocol's responsibilities. In this case, an
> > > > > > > > event happens on the broker, and the broker can tell the consumer.
> > > > > > > >
> > > > > > > > > * <p>Consumers should close when the server resets the cursor,
> > > > > > > > > * when the cursor reset success, and then restart. Otherwise,
> > > > > > > > > * the consumer will not receive the history messages.
> > > > > > > >
> > > > > > > > This is introducing a confusing edge case that requires reading a
> > > > > > > > Javadoc in order to understand. That seems risky to me, and I do not
> > > > > > > > think we should add such an edge case. A new protocol message would
> > > > > > > > easily handle it and make it transparent to the application.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Michael
> > > > > > > >
> > > > > > > > [0] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L826-L912
> > > > > > > > [1] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1870-L1876
> > > > > > > > [2] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
> > > > > > > > [3] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L922-L960
> > > > > > > >
> > > > > > > > On Tue, Mar 21, 2023 at 8:58 AM Yubiao Feng
> > > > > > > > <yu...@streamnative.io.invalid> wrote:
> > > > > > > > >
> > > > > > > > > +1
> > > > > > > > >
> > > > > > > > > Hi, Bo :
> > > > > > > > >
> > > > > > > > > Thanks for your explanation. That makes sense to me.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Yubiao Feng
> > > > > > > > >
> > > > > > > > > On Mon, Mar 20, 2023 at 10:21 PM 丛搏 <co...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hi, pulsar community:
> > > > > > > > > >
> > > > > > > > > > I started a PIP about `Client consumer filter received messages`.
> > > > > > > > > >
> > > > > > > > > > PIP: https://github.com/apache/pulsar/issues/19864
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Bo
> > > > > > > > > >

Re: [DISCUSS] PIP-260: Client consumer filter received messages

Posted by 丛搏 <co...@gmail.com>.
Hi, Michael:

I thought about it carefully, and using 'startMessageId'
is indeed a good idea. But it is more complicated: we
need to ensure it is absolutely correct and also take
performance into consideration. If you can come up
with a closed-loop solution based on 'startMessageId',
I will support it. If it cannot achieve both performance
and correctness, I think we can combine our two
solutions. Your design would ensure that most
messages are not re-delivered, which reduces the
overhead caused by repeatedly delivering many
messages. My design would be responsible for
the final consistency.
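
To make the "final consistency" part more concrete, here is a rough
sketch of a client-side filter. It is only an illustration under
assumptions: `Position` and `ReceivedMessageFilter` are hypothetical
names, not classes from the Pulsar client or the PIP, and it assumes
ordered delivery as in exclusive or failover subscriptions.

```java
/** Hypothetical stand-in for a message id; not the Pulsar MessageId class. */
final class Position implements Comparable<Position> {
    final long ledgerId;
    final long entryId;

    Position(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    @Override
    public int compareTo(Position other) {
        int byLedger = Long.compare(ledgerId, other.ledgerId);
        return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
    }
}

/** Hypothetical filter: drops anything at or before the last accepted position. */
final class ReceivedMessageFilter {
    private Position lastAccepted; // null until the first message is accepted

    /** Returns true if the message should be handed to the application. */
    synchronized boolean accept(Position incoming) {
        if (lastAccepted != null && incoming.compareTo(lastAccepted) <= 0) {
            return false; // already seen, e.g. pushed again after a reconnect
        }
        lastAccepted = incoming;
        return true;
    }

    /** Assumed hook: forget the record when all unacked messages are redelivered. */
    synchronized void reset() {
        lastAccepted = null;
    }
}
```

With a filter like this, a `startMessageId` based optimization would
only reduce how many duplicates reach `accept`; the comparison itself
would still decide what the application sees.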

Thanks,
Bo

On Wed, Mar 22, 2023 at 14:22, Michael Marshall <mm...@apache.org> wrote:
>
> Because we already send the `startMessageId`, there is a chance that
> we might not even need to update the protocol for
> CommandSubscribe. In light of that, I quickly put together a PR
> showing how that field might be used to inform the broker where to
> start the read position for the cursor.
>
> https://github.com/apache/pulsar/pull/19892
>
> The PR is not complete, but it does convey the general idea. I wrote
> additional details in the draft's description.
>
> Thanks,
> Michael
>
> On Tue, Mar 21, 2023 at 11:31 PM Michael Marshall <mm...@apache.org> wrote:
> >
> > I am not following your objections to the protocol solution. It might
> > be more productive if I provided a draft PR with a sample
> > implementation. I'm not sure that I'll have time, but I'll try to put
> > something together this week.
> >
> > > At least it will simplify the process of using cumulative ack with the
> > > transaction.
> >
> > Is this the underlying motivation for the PIP?
> >
> > From my perspective, the PIP is seeking to decrease duplicate messages
> > experienced due to disconnections from the broker.
> >
> > > The problem of the resetting cursor can be optimized in the future
> >
> > Why should we push off solving this problem? It seems fundamental to
> > this PIP and should not be ignored. At the very least, I think we need
> > to have an idea of what the future solution would be before we defer
> > its implementation.
> >
> > Thanks,
> > Michael
> >
> >
> > On Tue, Mar 21, 2023 at 10:52 PM 丛搏 <co...@gmail.com> wrote:
> > >
> > > Hi, Michael
> > > > In this case, the consumer does not have the source of truth for the
> > > > readPosition. It would leave the new protocol field for `readPosition`
> > > > empty and the broker would use its source of truth for the read
> > > > position.
> > > > Suppose the application thread has already received all the messages. We
> > > > still need a correct `startPosition`, right? But with your approach, we
> > > > would assume that the consumer hasn't received any messages.
> > >
> > > >
> > > > > why do we need to invoke `BlockingQueue.take` and `synchronized` in the
> > > > > same logic? it's a bad code.
> > > >
> > > > We don't need to synchronize this code here because the logic will
> > > > come after the consumer has been disconnected from broker a and before
> > > > it is connected to broker b.
> > > > If the application takes a message from the queue and the consumer then
> > > > reconnects, can the subscribe command use the right start position? Example:
> > > > 1. The application receives one message with `MessageId = 1`.
> > > > 2. The consumer reconnects, discovers the queue is empty, and the
> > > > lastDequeMessageId doesn't change.
> > > > 3. The consumer sends a subscribe command with MessageId.earliest, so
> > > > `MessageId = 1` will be redelivered from the broker to the client
> > > > consumer, right?
> > > >
> > > > As we can see in the example, the application can receive
> > > > `MessageId = 1` again, right?
> > > > We would not need to lock here because we do not enqueue new messages
> > > > after we've been disconnected from the broker and before we've sent
> > > > CommandSubscribe.
> > > we can see the code [0], the thread has changed.
> > > Where do we guarantee that no new messages will come in?
> > >
> > > >
> > > > Ultimately, I think a protocol solution will yield better results,
> > > > especially since we'll want to implement this feature in the other
> > > > client languages.
> > > > The problem of resetting the cursor can be optimized in the future,
> > > > but can you ensure the correctness of all the cases I mentioned above?
> > > > IMO, my design only changes the client, and the broker doesn't need any
> > > > changes. It's simple and easy to implement, and I can make sure it's
> > > > completely correct. For your design, I currently do not see a closed-loop
> > > > implementation, at least in the Java client.
> > >
> > > Thanks,
> > > Bo
> > > >
> > > > Thanks,
> > > > Michael
> > > >
> > > > On Tue, Mar 21, 2023 at 9:29 PM 丛搏 <co...@gmail.com> wrote:
> > > > >
> > > > > Hi, Michael:
> > > > >
> > > > > > On Tue, Mar 21, 2023 at 23:17, Michael Marshall <mm...@apache.org> wrote:
> > > > >
> > > > > >
> > > > > > One more point. Instead of keeping track of the latest message seen by
> > > > > > the application, the logic in my solution would actually just check
> > > > > > the last message in the `incomingMessages` queue (as in the most
> > > > > > recently added), and use that as the read position in the subscribe
> > > > > > command. If we made this change, we would have to change this code [0]
> > > > > > to not drop the `incomingMessages` queue.
> > > > >
> > > > > case 1:
> > > > > > How do we define the message that the application has seen?
> > > > > > I think it is [0]. When the `incomingMessages` queue is empty,
> > > > > > how do we get the correct `startPosition`?
> > > > > > I think we would have to lock the receive logic in [1]:
> > > > > ```
> > > > > synchronized (this) {
> > > > >     message = incomingMessages.take();
> > > > >     messageProcessed(message);
> > > > > }
> > > > > ```
> > > > > why do we need to invoke `BlockingQueue.take` and `synchronized` in the
> > > > > same logic? it's a bad code.
> > > > >
> > > > > case 2:
> > > > > > If we subscribe with `startMessageId`, we should also lock any enqueue
> > > > > > logic, like [2], and check the consumer's current state:
> > > > > ```
> > > > > synchronized (this) {
> > > > >     if (consumer.isConnected) {
> > > > >         if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
> > > > > >             // After we have enqueued the messages on the
> > > > > >             // `incomingMessages` queue, we cannot touch the message
> > > > > >             // instance anymore, since for pooled messages, this
> > > > > >             // instance was possibly already released and recycled.
> > > > > >             INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
> > > > > >             getMemoryLimitController().ifPresent(limiter ->
> > > > > >                     limiter.forceReserveMemory(messageSize));
> > > > >             updateAutoScaleReceiverQueueHint();
> > > > >         }
> > > > >     }
> > > > > }
> > > > > ```
> > > > > case 3:
> > > > > > when the subscribe command is sent to the broker with
> > > > > > `startMessageId = 1` and the message pushed by the broker has not
> > > > > > yet entered the `incomingMessages` queue, the application invokes
> > > > > > redeliver. In this case, it is correct that we don't filter the
> > > > > > messages, right?
> > > > >
> > > > > These are some cases that I simply thought of, and there must be
> > > > > others that I haven't thought
> > > > > of. Are you sure we can handle these problems correctly?
> > > > >
> > > > > > The problem of "the consumer doesn't know" seems like something that
> > > > > > is reasonably within the protocol's responsibilities. In this case, an
> > > > > > event happens on the broker, and the broker can tell the consumer.
> > > > >
> > > > > > I don't think a simple protocol change can solve these problems.
> > > > > > We can't promise that every consumer can receive the broker's reset
> > > > > > cursor request.
> > > > > When the consumer reconnects, the broker can't send the reset cursor request to
> > > > > the client consumers, right? In this case, the consumer is still unaware, right?
> > > > >
> > > > >
> > > > > [0] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L135
> > > > > [1] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L440-L454
> > > > > [2] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java#L875-L892
> > > > > >
> > > > > > Thanks,
> > > > > > Michael
> > > > > >
> > > > > > [0] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
> > > > > >
> > > > > > On Tue, Mar 21, 2023 at 9:46 AM Michael Marshall <mm...@apache.org> wrote:
> > > > > > >
> > > > > > > > if we add the new field in CommandSubscribe, we should ensure
> > > > > > > > the synchronization between consumer reconnection and user
> > > > > > > > > calling receive and redeliverUnack method. It will affect the performance
> > > > > > > > > of receive. Exposing synchronization to hot paths is not a good idea.
> > > > > > >
> > > > > > > I don't think this is a valid objection. I am pretty sure we already
> > > > > > > synchronize in the relevant places in the consumer to solve the exact
> > > > > > > race condition you're concerned about: [0] [1].
> > > > > > >
> > > > > > > My proposed operation is to keep track of the latest message id that
> > > > > > > the application has seen, and then tell the broker that id when
> > > > > > > sending the Subscribe command. We already do similar logic here [2]
> > > > > > > [3], but instead of getting the first message id the consumer hasn't
> > > > > > > seen, we'll get the latest message id seen.
> > > > > > >
> > > > > > > Regarding performance, the PIP doesn't touch on how it will filter out
> > > > > > > messages. What is the planned approach? In my understanding, the
> > > > > > > client will keep track of the latest message id that the application
> > > > > > > has seen and then will need to compare that message id against every
> > > > > > > > new message. As such, it seems like telling the broker where to start
> > > > > > > instead of naively checking a filter on every message would be
> > > > > > > cheaper.
> > > > > > >
> > > > > > > > As described in Compatibility in PIP. Client consumer doesn't know
> > > > > > > > Pulsar Admin reset cursor.
> > > > > > >
> > > > > > > The problem of "the consumer doesn't know" seems like something that
> > > > > > > is reasonably within the protocol's responsibilities. In this case, an
> > > > > > > event happens on the broker, and the broker can tell the consumer.
> > > > > > >
> > > > > > > > * <p>Consumers should close when the server resets the cursor,
> > > > > > > > * when the cursor reset success, and then restart. Otherwise,
> > > > > > > > * the consumer will not receive the history messages.
> > > > > > >
> > > > > > > This is introducing a confusing edge case that requires reading a
> > > > > > > Javadoc in order to understand. That seems risky to me, and I do not
> > > > > > > think we should add such an edge case. A new protocol message would
> > > > > > > easily handle it and make it transparent to the application.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Michael
> > > > > > >
> > > > > > > [0] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L826-L912
> > > > > > > [1] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1870-L1876
> > > > > > > [2] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
> > > > > > > [3] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L922-L960
> > > > > > >
> > > > > > > On Tue, Mar 21, 2023 at 8:58 AM Yubiao Feng
> > > > > > > <yu...@streamnative.io.invalid> wrote:
> > > > > > > >
> > > > > > > > +1
> > > > > > > >
> > > > > > > > Hi, Bo :
> > > > > > > >
> > > > > > > > Thanks for your explanation. That makes sense to me.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Yubiao Feng
> > > > > > > >
> > > > > > > > On Mon, Mar 20, 2023 at 10:21 PM 丛搏 <co...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi, pulsar community:
> > > > > > > > >
> > > > > > > > > I started a PIP about `Client consumer filter received messages`.
> > > > > > > > >
> > > > > > > > > PIP: https://github.com/apache/pulsar/issues/19864
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Bo
> > > > > > > > >

Re: [DISCUSS] PIP-260: Client consumer filter received messages

Posted by Michael Marshall <mm...@apache.org>.
Because we already send the `startMessageId`, there is a chance that
we might not even need to update the protocol for
CommandSubscribe. In light of that, I quickly put together a PR
showing how that field might be used to inform the broker where to
start the read position for the cursor.

https://github.com/apache/pulsar/pull/19892

The PR is not complete, but it does convey the general idea. I wrote
additional details in the draft's description.
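
For readers who don't want to open the draft, the client side of the
idea can be sketched roughly as below. This is a simplified illustration,
not the code in the PR: `ResubscribeStartPosition` is a hypothetical
class, message ids are reduced to plain `long` values, and the fallback
to the last dequeued id when the queue is empty is an assumption added
here for the sketch.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.OptionalLong;

/** Hypothetical sketch of picking the id to send when re-subscribing. */
final class ResubscribeStartPosition {
    private final Deque<Long> incomingMessages = new ArrayDeque<>();
    private long lastDequeuedId = -1; // -1 means nothing was dequeued yet

    /** Called when the broker pushes a message to this consumer. */
    synchronized void enqueue(long messageId) {
        incomingMessages.addLast(messageId);
    }

    /** Called when the application takes the next message. */
    synchronized long receive() {
        long messageId = incomingMessages.removeFirst();
        lastDequeuedId = messageId;
        return messageId;
    }

    /**
     * Latest id this consumer holds or has handed out. Empty means the
     * consumer has no information and the broker uses its own read position.
     */
    synchronized OptionalLong startMessageIdForResubscribe() {
        if (!incomingMessages.isEmpty()) {
            return OptionalLong.of(incomingMessages.peekLast());
        }
        return lastDequeuedId >= 0 ? OptionalLong.of(lastDequeuedId) : OptionalLong.empty();
    }
}
```

The sketch keeps the queue across the reconnect instead of dropping it,
which is the behavior change mentioned earlier in the thread.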

Thanks,
Michael

On Tue, Mar 21, 2023 at 11:31 PM Michael Marshall <mm...@apache.org> wrote:
>
> I am not following your objections to the protocol solution. It might
> be more productive if I provided a draft PR with a sample
> implementation. I'm not sure that I'll have time, but I'll try to put
> something together this week.
>
> > At least it will simplify the process of using cumulative ack with the
> > transaction.
>
> Is this the underlying motivation for the PIP?
>
> From my perspective, the PIP is seeking to decrease duplicate messages
> experienced due to disconnections from the broker.
>
> > The problem of the resetting cursor can be optimized in the future
>
> Why should we push off solving this problem? It seems fundamental to
> this PIP and should not be ignored. At the very least, I think we need
> to have an idea of what the future solution would be before we defer
> its implementation.
>
> Thanks,
> Michael
>
>
> On Tue, Mar 21, 2023 at 10:52 PM 丛搏 <co...@gmail.com> wrote:
> >
> > Hi, Michael
> > > In this case, the consumer does not have the source of truth for the
> > > readPosition. It would leave the new protocol field for `readPosition`
> > > empty and the broker would use its source of truth for the read
> > > position.
> > Suppose the application thread has already received all the messages. We
> > still need a correct `startPosition`, right? But with your approach, we
> > would assume that the consumer hasn't received any messages.
> >
> > >
> > > > why do we need to invoke `BlockingQueue.take` and `synchronized` in the
> > > > same logic? it's a bad code.
> > >
> > > We don't need to synchronize this code here because the logic will
> > > come after the consumer has been disconnected from broker a and before
> > > it is connected to broker b.
> > If the application takes a message from the queue and the consumer then
> > reconnects, can the subscribe command use the right start position? Example:
> > 1. The application receives one message with `MessageId = 1`.
> > 2. The consumer reconnects, discovers the queue is empty, and the
> > lastDequeMessageId doesn't change.
> > 3. The consumer sends a subscribe command with MessageId.earliest, so
> > `MessageId = 1` will be redelivered from the broker to the client
> > consumer, right?
> >
> > As we can see in the example, the application can receive
> > `MessageId = 1` again, right?
> > > We would not need to lock here because we do not enqueue new messages
> > > after we've been disconnected from the broker and before we've sent
> > > CommandSubscribe.
> > we can see the code [0], the thread has changed.
> > Where do we guarantee that no new messages will come in?
> >
> > >
> > > Ultimately, I think a protocol solution will yield better results,
> > > especially since we'll want to implement this feature in the other
> > > client languages.
> > The problem of resetting the cursor can be optimized in the future,
> > but can you ensure the correctness of all the cases I mentioned above?
> > IMO, my design only changes the client, and the broker doesn't need any
> > changes. It's simple and easy to implement, and I can make sure it's
> > completely correct. For your design, I currently do not see a closed-loop
> > implementation, at least in the Java client.
> >
> > Thanks,
> > Bo
> > >
> > > Thanks,
> > > Michael
> > >
> > > On Tue, Mar 21, 2023 at 9:29 PM 丛搏 <co...@gmail.com> wrote:
> > > >
> > > > Hi, Michael:
> > > >
> > > > > On Tue, Mar 21, 2023 at 23:17, Michael Marshall <mm...@apache.org> wrote:
> > > >
> > > > >
> > > > > One more point. Instead of keeping track of the latest message seen by
> > > > > the application, the logic in my solution would actually just check
> > > > > the last message in the `incomingMessages` queue (as in the most
> > > > > recently added), and use that as the read position in the subscribe
> > > > > command. If we made this change, we would have to change this code [0]
> > > > > to not drop the `incomingMessages` queue.
> > > >
> > > > case 1:
> > > > > How do we define the message that the application has seen?
> > > > > I think it is [0]. When the `incomingMessages` queue is empty,
> > > > > how do we get the correct `startPosition`?
> > > > > I think we would have to lock the receive logic in [1]:
> > > > ```
> > > > synchronized (this) {
> > > >     message = incomingMessages.take();
> > > >     messageProcessed(message);
> > > > }
> > > > ```
> > > > why do we need to invoke `BlockingQueue.take` and `synchronized` in the
> > > > same logic? it's a bad code.
> > > >
> > > > case 2:
> > > > > If we subscribe with `startMessageId`, we should also lock any enqueue
> > > > > logic, like [2], and check the consumer's current state:
> > > > ```
> > > > synchronized (this) {
> > > >     if (consumer.isConnected) {
> > > >         if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
> > > > >             // After we have enqueued the messages on the
> > > > >             // `incomingMessages` queue, we cannot touch the message
> > > > >             // instance anymore, since for pooled messages, this
> > > > >             // instance was possibly already released and recycled.
> > > > >             INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
> > > > >             getMemoryLimitController().ifPresent(limiter ->
> > > > >                     limiter.forceReserveMemory(messageSize));
> > > >             updateAutoScaleReceiverQueueHint();
> > > >         }
> > > >     }
> > > > }
> > > > ```
> > > > case 3:
> > > > > when the subscribe command is sent to the broker with
> > > > > `startMessageId = 1` and the message pushed by the broker has not
> > > > > yet entered the `incomingMessages` queue, the application invokes
> > > > > redeliver. In this case, it is correct that we don't filter the
> > > > > messages, right?
> > > >
> > > > These are some cases that I simply thought of, and there must be
> > > > others that I haven't thought
> > > > of. Are you sure we can handle these problems correctly?
> > > >
> > > > > The problem of "the consumer doesn't know" seems like something that
> > > > > is reasonably within the protocol's responsibilities. In this case, an
> > > > > event happens on the broker, and the broker can tell the consumer.
> > > >
> > > > > I don't think a simple protocol change can solve these problems.
> > > > > We can't promise that every consumer can receive the broker's reset
> > > > > cursor request.
> > > > When the consumer reconnects, the broker can't send the reset cursor request to
> > > > the client consumers, right? In this case, the consumer is still unaware, right?
> > > >
> > > >
> > > > [0] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L135
> > > > [1] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L440-L454
> > > > [2] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java#L875-L892
> > > > >
> > > > > Thanks,
> > > > > Michael
> > > > >
> > > > > [0] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
> > > > >
> > > > > On Tue, Mar 21, 2023 at 9:46 AM Michael Marshall <mm...@apache.org> wrote:
> > > > > >
> > > > > > > if we add the new field in CommandSubscribe, we should ensure
> > > > > > > the synchronization between consumer reconnection and user
> > > > > > > > calling receive and redeliverUnack method. It will affect the performance
> > > > > > > > of receive. Exposing synchronization to hot paths is not a good idea.
> > > > > >
> > > > > > I don't think this is a valid objection. I am pretty sure we already
> > > > > > synchronize in the relevant places in the consumer to solve the exact
> > > > > > race condition you're concerned about: [0] [1].
> > > > > >
> > > > > > My proposed operation is to keep track of the latest message id that
> > > > > > the application has seen, and then tell the broker that id when
> > > > > > sending the Subscribe command. We already do similar logic here [2]
> > > > > > [3], but instead of getting the first message id the consumer hasn't
> > > > > > seen, we'll get the latest message id seen.
> > > > > >
> > > > > > Regarding performance, the PIP doesn't touch on how it will filter out
> > > > > > messages. What is the planned approach? In my understanding, the
> > > > > > client will keep track of the latest message id that the application
> > > > > > has seen and then will need to compare that message id against every
> > > > > > > new message. As such, it seems like telling the broker where to start
> > > > > > instead of naively checking a filter on every message would be
> > > > > > cheaper.
> > > > > >
> > > > > > > As described in Compatibility in PIP. Client consumer doesn't know
> > > > > > > Pulsar Admin reset cursor.
> > > > > >
> > > > > > The problem of "the consumer doesn't know" seems like something that
> > > > > > is reasonably within the protocol's responsibilities. In this case, an
> > > > > > event happens on the broker, and the broker can tell the consumer.
> > > > > >
> > > > > > > * <p>Consumers should close when the server resets the cursor,
> > > > > > > * when the cursor reset success, and then restart. Otherwise,
> > > > > > > * the consumer will not receive the history messages.
> > > > > >
> > > > > > This is introducing a confusing edge case that requires reading a
> > > > > > Javadoc in order to understand. That seems risky to me, and I do not
> > > > > > think we should add such an edge case. A new protocol message would
> > > > > > easily handle it and make it transparent to the application.
> > > > > >
> > > > > > Thanks,
> > > > > > Michael
> > > > > >
> > > > > > [0] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L826-L912
> > > > > > [1] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1870-L1876
> > > > > > [2] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
> > > > > > [3] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L922-L960
> > > > > >
> > > > > > On Tue, Mar 21, 2023 at 8:58 AM Yubiao Feng
> > > > > > <yu...@streamnative.io.invalid> wrote:
> > > > > > >
> > > > > > > +1
> > > > > > >
> > > > > > > Hi, Bo :
> > > > > > >
> > > > > > > Thanks for your explanation. That makes sense to me.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Yubiao Feng
> > > > > > >
> > > > > > > On Mon, Mar 20, 2023 at 10:21 PM 丛搏 <co...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi, pulsar community:
> > > > > > > >
> > > > > > > > I started a PIP about `Client consumer filter received messages`.
> > > > > > > >
> > > > > > > > PIP: https://github.com/apache/pulsar/issues/19864
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Bo
> > > > > > > >

Re: [DISCUSS] PIP-260: Client consumer filter received messages

Posted by 丛搏 <co...@gmail.com>.
Hi, Yunze :

>
> > At least it will simplify the process of using cumulative ack with the transaction.
>
> So the transaction use cases assume the cursor will never be reset by
> other applications? We cannot ignore the impact brought by this
> proposal that resetting the cursor could have an unexpected effect.

I think the reset-cursor problem and this proposal are two separate
problems. Even without this proposal, there is still a problem
with resetting the cursor.

>
> If the transaction use case is the underlying motivation, you should
> not add the configuration for regular use. Instead, you should explain
> how this feature could simplify the transaction use case, rather than
> pretending to solve a problem for regular scenarios.
>
> And I agree with Michael that the problem of the resetting cursor
> should not be ignored. I've seen many times that someone said "let's
> fix this issue later" and he never went back to the issue.

Transactions are just one aspect. What we want to guarantee is that
messages are not delivered repeatedly, which is not only a transaction
requirement.
>
> I'm also curious about whether adding the position in
> `CommandSubscribe` could solve the problem with resetting the cursor.
> I'm wondering if the broker can send the position to all connected
> consumers when a consumer seeks a position or an admin resets the
> cursor. Then consumers can update the internal received position that
> is added in this proposal.
This is difficult to solve. It may require an epoch to be added when
the message is pushed. Simply notifying the client cannot handle the
messages that the application has already received from the consumer.
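
As a rough illustration of what an epoch could look like on the client
side, here is a minimal sketch. Everything in it is an assumption made
for the example: `EpochAwareFilter` is a hypothetical class, message ids
are plain `long` values, and it assumes the broker would bump the epoch
on every cursor reset and attach it to each pushed message.

```java
/** Hypothetical filter that combines an epoch with the last accepted id. */
final class EpochAwareFilter {
    private long currentEpoch = 0;
    private long lastAcceptedId = -1; // -1 means nothing accepted in this epoch

    /** Returns true if the message should be handed to the application. */
    synchronized boolean accept(long epoch, long messageId) {
        if (epoch < currentEpoch) {
            return false; // pushed before the latest cursor reset
        }
        if (epoch > currentEpoch) {
            // A reset happened: older ids are valid again, so forget the record.
            currentEpoch = epoch;
            lastAcceptedId = -1;
        }
        if (messageId <= lastAcceptedId) {
            return false; // duplicate within the same epoch
        }
        lastAcceptedId = messageId;
        return true;
    }
}
```

This only covers messages that are still on their way to the
application. Messages the application already holds from the old epoch
are not affected by it.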

Thanks,
Bo
>
> Thanks,
> Yunze
>
> On Fri, Mar 24, 2023 at 10:16 AM PengHui Li <pe...@apache.org> wrote:
> >
> > Hi Bo,
> >
> > > Only support Consumer#redeliverUnacknowledgedMessages()
> > >
> > > If we redeliver individual messages, they will be filtered. Because we
> > > can't clear the record latest message in the consumer when redelivering
> > > individual messages. It will make this config unclear, and if every
> > > redeliver method changes, it will bring a lot of redundant code, which is
> > > difficult to maintain. If there is a need in the future, just support it.
> >
> > I think it's not correct, right? If we redeliver individual messages,
> > we should not filter out any messages.
> >
> > And it also will be an issue for redeliverUnacknowledgedMessages()?
> > The application received messages from the internal queue of the consumer,
> > but the message processing failed due to some temporary failures.
> > So, they want to redeliver all the received messages and try again.
> > In this case, we should not filter the messages.
> >
> > And for a failover subscription. The active consumer might be changed after
> > the disconnection. We can't ensure the consumer will not receive
> > duplicate messages, right? If yes, we should mention it in the proposal.
> >
> > Thanks,
> > Penghui
> >
> > On Wed, Mar 22, 2023 at 3:28 PM 丛搏 <co...@gmail.com> wrote:
> >
> > > Hi, Michael:
> > >
> > > >
> > > > Is this the underlying motivation for the PIP?
> > > >
> > > > From my perspective, the PIP is seeking to decrease duplicate messages
> > > > experienced due to disconnections from the broker.
> > > This PIP is not aimed at reducing message duplication but at
> > > completely preventing it.
> > > Therefore we have to consider all edge cases, including redeliver,
> > > reconnection, and receive.
> > > We must fully guarantee that the message will not be received repeatedly.
> > > >
> > > > > The problem of the resetting cursor can be optimized in the future
> > > >
> > > > Why should we push off solving this problem? It seems fundamental to
> > > > this PIP and should not be ignored. At the very least, I think we need
> > > > to have an idea of what the future solution would be before we defer
> > > > its implementation.
> > > the reset cursor is difficult to guarantee correctness. when we use
> > > cumulative ack,
> > > We may generate some errors, after reset cursor then client consumer
> > > cumulative ack,
> > > we will lose some messages.
> > > Here are more edge cases, I just want to make the problem simple,
> > > at least under the transaction I think it is very good, the current
> > > design will not make mistakes, and it will not affect your current
> > > thinking.
> > > You can do what you want, I can do what I want.
> > >
> > > Thanks,
> > > Bo
> > > >
> > > > Thanks,
> > > > Michael
> > > >
> > > >
> > > > On Tue, Mar 21, 2023 at 10:52 PM 丛搏 <co...@gmail.com> wrote:
> > > > >
> > > > > Hi, Michael
> > > > > > In this case, the consumer does not have the source of truth for the
> > > > > > readPosition. It would leave the new protocol field for
> > > `readPosition`
> > > > > > empty and the broker would use its source of truth for the read
> > > > > > position.
> > > > > application has received all the messages by application thread. we
> > > also need a
> > > > > correct `startPosition`, right? but in your way, we will think about
> > > > > the consumer
> > > > > hasn't received any messages.
> > > > >
> > > > > >
> > > > > > > why do we need to invoke `BlockingQueue.take` and `synchronized`
> > > in the
> > > > > > > same logic? it's a bad code.
> > > > > >
> > > > > > We don't need to synchronize this code here because the logic will
> > > > > > come after the consumer has been disconnected from broker a and
> > > before
> > > > > > it is connected to broker b.
> > > > > The application takes a message from the queue then reconnect,
> > > > > > the SubCommand can use the right startPosition? example:
> > > > > 1. application receives one message with `MessageId = 1`
> > > > > 2. consumer reconnect discovers the queue is empty, and the
> > > > > lastDequeMessageId doesn't change.
> > > > > 3. consumer sends a subcommand with MessageId.earliest, the `MessageId
> > > = 1`
> > > > > will redeliver from broker to client consumer, right?
> > > > >
> > > > > As we can see in the example, the application also can receive
> > > > > `MessageId = 1`, right?
> > > > > > We would not need to lock here because we do not enqueue new messages
> > > > > > after we've been disconnected from the broker and before we've sent
> > > > > > CommandSubscribe.
> > > > > we can see the code [0], the thread has changed.
> > > > > Where do we guarantee that no new messages will come in?
> > > > >
> > > > > >
> > > > > > Ultimately, I think a protocol solution will yield better results,
> > > > > > especially since we'll want to implement this feature in the other
> > > > > > client languages.
> > > > > The problem of the resetting cursor can be optimized in the future,
> > > > > but can you ensure the
> > > > > correctness of all the cases I mentioned above? IMO, if we use my
> > > > > design, client change,
> > > > > we don't need the broker to make any changes. its simple and it's easy
> > > > > to implement.
> > > > > I can make sure it's completely correct, I can make sure it's
> > > > > completely correct. In your design,
> > > > > I currently do not see a closed-loop implementation that can achieve
> > > > > at least in the java client.
> > > > >
> > > > > Thanks,
> > > > > Bo
> > > > > >
> > > > > > Thanks,
> > > > > > Michael
> > > > > >
> > > > > > On Tue, Mar 21, 2023 at 9:29 PM 丛搏 <co...@gmail.com> wrote:
> > > > > > >
> > > > > > > Hi, Michael:
> > > > > > >
> > > > > > > Michael Marshall <mm...@apache.org> 于2023年3月21日周二 23:17写道:
> > > > > > >
> > > > > > > >
> > > > > > > > One more point. Instead of keeping track of the latest message
> > > seen by
> > > > > > > > the application, the logic in my solution would actually just
> > > check
> > > > > > > > the last message in the `incomingMessages` queue (as in the most
> > > > > > > > recently added), and use that as the read position in the
> > > subscribe
> > > > > > > > command. If we made this change, we would have to change this
> > > code [0]
> > > > > > > > to not drop the `incomingMessages` queue.
> > > > > > >
> > > > > > > case 1:
> > > > > > > What we define the message that the application has seen?
> > > > > > > I think it is the[0], when the `incomingMessages` queue is empty,
> > > > > > > how do we get the correct `startPosition`?
> > > > > > > What I think we should lock the receive logic in [1]
> > > > > > > ```
> > > > > > > synchronized (this) {
> > > > > > >     message = incomingMessages.take();
> > > > > > >     messageProcessed(message);
> > > > > > > }
> > > > > > > ```
> > > > > > > why do we need to invoke `BlockingQueue.take` and `synchronized`
> > > in the
> > > > > > > same logic? it's a bad code.
> > > > > > >
> > > > > > > case 2:
> > > > > > > If we sub with `startMessageId`, we also should lock any enqueue
> > > > > > > logic, like [2] and
> > > > > > > check to consumer's current state
> > > > > > > ```
> > > > > > > > synchronized (this) {
> > > > > > > >     if (consumer.isConnected) {
> > > > > > > >         if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
> > > > > > > >             // After we have enqueued the messages on `incomingMessages` queue, we cannot touch the message
> > > > > > > >             // instance anymore, since for pooled messages, this instance was possibly already been released
> > > > > > > >             // and recycled.
> > > > > > > >             INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
> > > > > > > >             getMemoryLimitController().ifPresent(limiter -> limiter.forceReserveMemory(messageSize));
> > > > > > > >             updateAutoScaleReceiverQueueHint();
> > > > > > > >         }
> > > > > > > >     }
> > > > > > > > }
> > > > > > > ```
> > > > > > > case 3:
> > > > > > > when we subcommand sends to broker with `startMessageId = 1`, then
> > > the
> > > > > > > broker push message
> > > > > > > has not yet entered `incommingQueue`, the application invokes
> > > > > > > redeliver. in this way, we don't
> > > > > > > filter messages are correct, right?
> > > > > > >
> > > > > > > These are some cases that I simply thought of, and there must be
> > > > > > > others that I haven't thought
> > > > > > > of. Are you sure we can handle these problems correctly?
> > > > > > >
> > > > > > > > The problem of "the consumer doesn't know" seems like something
> > > that
> > > > > > > > is reasonably within the protocol's responsibilities. In this
> > > case, an
> > > > > > > > event happens on the broker, and the broker can tell the
> > > consumer.
> > > > > > >
> > > > > > > I don't think a simple change protocol can solve these problems,
> > > > > > > We can't promise that every consumer can receive the broker reset
> > > > > > > cursor request.
> > > > > > > When the consumer reconnects, the broker can't send the reset
> > > cursor request to
> > > > > > > the client consumers, right? In this case, the consumer is still
> > > unaware, right?
> > > > > > >
> > > > > > >
> > > > > > > [0]
> > > https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L135
> > > > > > > [1]
> > > https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L440-L454
> > > > > > > [2]
> > > https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java#L875-L892
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Michael
> > > > > > > >
> > > > > > > > [0]
> > > https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
> > > > > > > >
> > > > > > > > On Tue, Mar 21, 2023 at 9:46 AM Michael Marshall <
> > > mmarshall@apache.org> wrote:
> > > > > > > > >
> > > > > > > > > > if we add the new field in CommandSubscribe, we should ensure
> > > > > > > > > > the synchronization between consumer reconnection and user
> > > > > > > > > > calling receive and redeliverUnack method. it will affect
> > > the performance
> > > > > > > > > > > of receive. expose synchronization to hot paths is not a good idea.
> > > > > > > > >
> > > > > > > > > I don't think this is a valid objection. I am pretty sure we
> > > already
> > > > > > > > > synchronize in the relevant places in the consumer to solve
> > > the exact
> > > > > > > > > race condition you're concerned about: [0] [1].
> > > > > > > > >
> > > > > > > > > My proposed operation is to keep track of the latest message
> > > id that
> > > > > > > > > the application has seen, and then tell the broker that id when
> > > > > > > > > sending the Subscribe command. We already do similar logic
> > > here [2]
> > > > > > > > > [3], but instead of getting the first message id the consumer
> > > hasn't
> > > > > > > > > seen, we'll get the latest message id seen.
> > > > > > > > >
> > > > > > > > > Regarding performance, the PIP doesn't touch on how it will
> > > filter out
> > > > > > > > > messages. What is the planned approach? In my understanding,
> > > the
> > > > > > > > > client will keep track of the latest message id that the
> > > application
> > > > > > > > > has seen and then will need to compare that message id against
> > > every
> > > > > > > > > > new message. As such, it seems like telling the broker where to
> > > start
> > > > > > > > > instead of naively checking a filter on every message would be
> > > > > > > > > cheaper.
> > > > > > > > >
> > > > > > > > > > As described in Compatibility in PIP. Client consumer
> > > doesn't know
> > > > > > > > > > Pulsar Admin reset cursor.
> > > > > > > > >
> > > > > > > > > The problem of "the consumer doesn't know" seems like
> > > something that
> > > > > > > > > is reasonably within the protocol's responsibilities. In this
> > > case, an
> > > > > > > > > event happens on the broker, and the broker can tell the
> > > consumer.
> > > > > > > > >
> > > > > > > > > > * <p>Consumers should close when the server resets the
> > > cursor,
> > > > > > > > > > * when the cursor reset success, and then restart. Otherwise,
> > > > > > > > > > * the consumer will not receive the history messages.
> > > > > > > > >
> > > > > > > > > This is introducing a confusing edge case that requires
> > > reading a
> > > > > > > > > Javadoc in order to understand. That seems risky to me, and I
> > > do not
> > > > > > > > > think we should add such an edge case. A new protocol message
> > > would
> > > > > > > > > easily handle it and make it transparent to the application.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Michael
> > > > > > > > >
> > > > > > > > > [0]
> > > https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L826-L912
> > > > > > > > > [1]
> > > https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1870-L1876
> > > > > > > > > [2]
> > > https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
> > > > > > > > > [3]
> > > https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L922-L960
> > > > > > > > >
> > > > > > > > > On Tue, Mar 21, 2023 at 8:58 AM Yubiao Feng
> > > > > > > > > <yu...@streamnative.io.invalid> wrote:
> > > > > > > > > >
> > > > > > > > > > +1
> > > > > > > > > >
> > > > > > > > > > Hi, Bo :
> > > > > > > > > >
> > > > > > > > > > Thanks for your explanation. That makes sense to me.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Yubiao Feng
> > > > > > > > > >
> > > > > > > > > > On Mon, Mar 20, 2023 at 10:21 PM 丛搏 <co...@gmail.com>
> > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi, pulsar community:
> > > > > > > > > > >
> > > > > > > > > > > I started a PIP about `Client consumer filter received
> > > messages`.
> > > > > > > > > > >
> > > > > > > > > > > PIP: https://github.com/apache/pulsar/issues/19864
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Bo
> > > > > > > > > > >
> > >

Re: [DISCUSS] PIP-260: Client consumer filter received messages

Posted by Yunze Xu <yz...@streamnative.io.INVALID>.
> At least it will simplify the process of using cumulative ack with the transaction.

So the transaction use cases assume the cursor will never be reset by
other applications? We cannot ignore the impact brought by this
proposal that resetting the cursor could have an unexpected effect.

If the transaction use case is the underlying motivation, you should
not add the configuration for regular use. Instead, you should explain
how this feature could simplify the transaction use case, rather than
pretending to solve a problem for regular scenarios.

And I agree with Michael that the problem of resetting the cursor
should not be ignored. I've seen many times that someone says "let's
fix this issue later" and then never goes back to it.

I'm also curious about whether adding the position in
`CommandSubscribe` could solve the problem with resetting the cursor.
I'm wondering if the broker can send the position to all connected
consumers when a consumer seeks a position or an admin resets the
cursor. Then consumers can update the internal received position that
is added in this proposal.
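
To make the idea concrete, here is a rough sketch under assumptions, not a
proposal for the real implementation. `ReceivedPositionTracker` and
`onBrokerCursorReset` are hypothetical names, the broker notification is
only the idea discussed here, and positions are simplified to increasing
long values where the reset position is the next message to be delivered.

```java
// Hypothetical sketch: the broker-to-consumer reset notification and all
// names below are assumptions made for illustration.
final class ReceivedPositionTracker {
    private long lastReceived = -1; // the internal received position

    /** Returns true if the message is new and should reach the application. */
    synchronized boolean shouldDeliver(long position) {
        if (position <= lastReceived) {
            return false; // already seen, filter it
        }
        lastReceived = position;
        return true;
    }

    /** Invoked when the broker reports a seek or an admin cursor reset. */
    synchronized void onBrokerCursorReset(long resetPosition) {
        // Move the received position back so replayed messages are not filtered.
        lastReceived = resetPosition - 1;
    }
}
```

Without the call to `onBrokerCursorReset`, every message replayed after the
reset would be at or below `lastReceived` and would be dropped silently.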

Thanks,
Yunze

On Fri, Mar 24, 2023 at 10:16 AM PengHui Li <pe...@apache.org> wrote:
>
> Hi Bo,
>
> > Only support Consumer#redeliverUnacknowledgedMessages()
> >
> > If we redeliver individual messages, they will be filtered. Because we
> > can't clear the record latest message in the consumer when redelivering
> > individual messages. It will make this config unclear, and if every
> > redeliver method changes, it will bring a lot of redundant code, which is
> > difficult to maintain. If there is a need in the future, just support it.
>
> I think it's not correct, right? If we redeliver individual messages,
> we should not filter out any messages.
>
> And it also will be an issue for redeliverUnacknowledgedMessages()?
> The application received messages from the internal queue of the consumer,
> but the message processing failed due to some temporary failures.
> So, they want to redeliver all the received messages and try again.
> In this case, we should not filter the messages.
>
> And for a failover subscription. The active consumer might be changed after
> the disconnection. We can't ensure the consumer will not receive
> duplicate messages, right? If yes, we should mention it in the proposal.
>
> Thanks,
> Penghui
>
> On Wed, Mar 22, 2023 at 3:28 PM 丛搏 <co...@gmail.com> wrote:
>
> > Hi, Michael:
> >
> > >
> > > Is this the underlying motivation for the PIP?
> > >
> > > From my perspective, the PIP is seeking to decrease duplicate messages
> > > experienced due to disconnections from the broker.
> > This PIP is not aimed at reducing message duplication but at
> > completely preventing it.
> > Therefore we have to consider all edge cases, including redeliver,
> > reconnection, and receive.
> > We must fully guarantee that the message will not be received repeatedly.
> > >
> > > > The problem of the resetting cursor can be optimized in the future
> > >
> > > Why should we push off solving this problem? It seems fundamental to
> > > this PIP and should not be ignored. At the very least, I think we need
> > > to have an idea of what the future solution would be before we defer
> > > its implementation.
> > the reset cursor is difficult to guarantee correctness. when we use
> > cumulative ack,
> > We may generate some errors, after reset cursor then client consumer
> > cumulative ack,
> > we will lose some messages.
> > Here are more edge cases, I just want to make the problem simple,
> > at least under the transaction I think it is very good, the current
> > design will not make mistakes, and it will not affect your current
> > thinking.
> > You can do what you want, I can do what I want.
> >
> > Thanks,
> > Bo
> > >
> > > Thanks,
> > > Michael
> > >
> > >
> > > On Tue, Mar 21, 2023 at 10:52 PM 丛搏 <co...@gmail.com> wrote:
> > > >
> > > > Hi, Michael
> > > > > In this case, the consumer does not have the source of truth for the
> > > > > readPosition. It would leave the new protocol field for
> > `readPosition`
> > > > > empty and the broker would use its source of truth for the read
> > > > > position.
> > > > application has received all the messages by application thread. we
> > also need a
> > > > correct `startPosition`, right? but in your way, we will think about
> > > > the consumer
> > > > hasn't received any messages.
> > > >
> > > > >
> > > > > > why do we need to invoke `BlockingQueue.take` and `synchronized`
> > in the
> > > > > > same logic? it's a bad code.
> > > > >
> > > > > We don't need to synchronize this code here because the logic will
> > > > > come after the consumer has been disconnected from broker a and
> > before
> > > > > it is connected to broker b.
> > > > The application takes a message from the queue then reconnect,
> > > > > the SubCommand can use the right startPosition? example:
> > > > 1. application receives one message with `MessageId = 1`
> > > > 2. consumer reconnect discovers the queue is empty, and the
> > > > lastDequeMessageId doesn't change.
> > > > 3. consumer sends a subcommand with MessageId.earliest, the `MessageId
> > = 1`
> > > > will redeliver from broker to client consumer, right?
> > > >
> > > > As we can see in the example, the application also can receive
> > > > `MessageId = 1`, right?
> > > > > We would not need to lock here because we do not enqueue new messages
> > > > > after we've been disconnected from the broker and before we've sent
> > > > > CommandSubscribe.
> > > > we can see the code [0], the thread has changed.
> > > > Where do we guarantee that no new messages will come in?
> > > >
> > > > >
> > > > > Ultimately, I think a protocol solution will yield better results,
> > > > > especially since we'll want to implement this feature in the other
> > > > > client languages.
> > > > The problem of the resetting cursor can be optimized in the future,
> > > > but can you ensure the
> > > > correctness of all the cases I mentioned above? IMO, if we use my
> > > > design, client change,
> > > > we don't need the broker to make any changes. its simple and it's easy
> > > > to implement.
> > > > I can make sure it's completely correct, I can make sure it's
> > > > completely correct. In your design,
> > > > I currently do not see a closed-loop implementation that can achieve
> > > > at least in the java client.
> > > >
> > > > Thanks,
> > > > Bo
> > > > >
> > > > > Thanks,
> > > > > Michael
> > > > >
> > > > > On Tue, Mar 21, 2023 at 9:29 PM 丛搏 <co...@gmail.com> wrote:
> > > > > >
> > > > > > Hi, Michael:
> > > > > >
> > > > > > Michael Marshall <mm...@apache.org> 于2023年3月21日周二 23:17写道:
> > > > > >
> > > > > > >
> > > > > > > One more point. Instead of keeping track of the latest message
> > seen by
> > > > > > > the application, the logic in my solution would actually just
> > check
> > > > > > > the last message in the `incomingMessages` queue (as in the most
> > > > > > > recently added), and use that as the read position in the
> > subscribe
> > > > > > > command. If we made this change, we would have to change this
> > code [0]
> > > > > > > to not drop the `incomingMessages` queue.
> > > > > >
> > > > > > case 1:
> > > > > > What we define the message that the application has seen?
> > > > > > I think it is the[0], when the `incomingMessages` queue is empty,
> > > > > > how do we get the correct `startPosition`?
> > > > > > What I think we should lock the receive logic in [1]
> > > > > > ```
> > > > > > synchronized (this) {
> > > > > >     message = incomingMessages.take();
> > > > > >     messageProcessed(message);
> > > > > > }
> > > > > > ```
> > > > > > why do we need to invoke `BlockingQueue.take` and `synchronized`
> > in the
> > > > > > same logic? it's a bad code.
> > > > > >
> > > > > > case 2:
> > > > > > If we sub with `startMessageId`, we also should lock any enqueue
> > > > > > logic, like [2] and
> > > > > > check to consumer's current state
> > > > > > ```
> > > > > > > synchronized (this) {
> > > > > > >     if (consumer.isConnected) {
> > > > > > >         if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
> > > > > > >             // After we have enqueued the messages on `incomingMessages` queue, we cannot touch the message
> > > > > > >             // instance anymore, since for pooled messages, this instance was possibly already been released
> > > > > > >             // and recycled.
> > > > > > >             INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
> > > > > > >             getMemoryLimitController().ifPresent(limiter -> limiter.forceReserveMemory(messageSize));
> > > > > > >             updateAutoScaleReceiverQueueHint();
> > > > > > >         }
> > > > > > >     }
> > > > > > > }
> > > > > > ```
> > > > > > case 3:
> > > > > > when we subcommand sends to broker with `startMessageId = 1`, then
> > the
> > > > > > broker push message
> > > > > > has not yet entered `incommingQueue`, the application invokes
> > > > > > redeliver. in this way, we don't
> > > > > > filter messages are correct, right?
> > > > > >
> > > > > > These are some cases that I simply thought of, and there must be
> > > > > > others that I haven't thought
> > > > > > of. Are you sure we can handle these problems correctly?
> > > > > >
> > > > > > > The problem of "the consumer doesn't know" seems like something
> > that
> > > > > > > is reasonably within the protocol's responsibilities. In this
> > case, an
> > > > > > > event happens on the broker, and the broker can tell the
> > consumer.
> > > > > >
> > > > > > I don't think a simple change protocol can solve these problems,
> > > > > > We can't promise that every consumer can receive the broker reset
> > > > > > cursor request.
> > > > > > When the consumer reconnects, the broker can't send the reset
> > cursor request to
> > > > > > the client consumers, right? In this case, the consumer is still
> > unaware, right?
> > > > > >
> > > > > >
> > > > > > [0]
> > https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L135
> > > > > > [1]
> > https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L440-L454
> > > > > > [2]
> > https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java#L875-L892
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Michael
> > > > > > >
> > > > > > > [0]
> > https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
> > > > > > >
> > > > > > > On Tue, Mar 21, 2023 at 9:46 AM Michael Marshall <
> > mmarshall@apache.org> wrote:
> > > > > > > >
> > > > > > > > > if we add the new field in CommandSubscribe, we should ensure
> > > > > > > > > the synchronization between consumer reconnection and user
> > > > > > > > > calling receive and redeliverUnack method. it will affect
> > the performance
> > > > > > > > > > of receive. expose synchronization to hot paths is not a good idea.
> > > > > > > >
> > > > > > > > I don't think this is a valid objection. I am pretty sure we
> > already
> > > > > > > > synchronize in the relevant places in the consumer to solve
> > the exact
> > > > > > > > race condition you're concerned about: [0] [1].
> > > > > > > >
> > > > > > > > My proposed operation is to keep track of the latest message
> > id that
> > > > > > > > the application has seen, and then tell the broker that id when
> > > > > > > > sending the Subscribe command. We already do similar logic
> > here [2]
> > > > > > > > [3], but instead of getting the first message id the consumer
> > hasn't
> > > > > > > > seen, we'll get the latest message id seen.
> > > > > > > >
> > > > > > > > Regarding performance, the PIP doesn't touch on how it will
> > filter out
> > > > > > > > messages. What is the planned approach? In my understanding,
> > the
> > > > > > > > client will keep track of the latest message id that the
> > application
> > > > > > > > has seen and then will need to compare that message id against
> > every
> > > > > > > > > new message. As such, it seems like telling the broker where to
> > start
> > > > > > > > instead of naively checking a filter on every message would be
> > > > > > > > cheaper.
> > > > > > > >
> > > > > > > > > As described in Compatibility in PIP. Client consumer
> > doesn't know
> > > > > > > > > Pulsar Admin reset cursor.
> > > > > > > >
> > > > > > > > The problem of "the consumer doesn't know" seems like
> > something that
> > > > > > > > is reasonably within the protocol's responsibilities. In this
> > case, an
> > > > > > > > event happens on the broker, and the broker can tell the
> > consumer.
> > > > > > > >
> > > > > > > > > * <p>Consumers should close when the server resets the
> > cursor,
> > > > > > > > > * when the cursor reset success, and then restart. Otherwise,
> > > > > > > > > * the consumer will not receive the history messages.
> > > > > > > >
> > > > > > > > This is introducing a confusing edge case that requires
> > reading a
> > > > > > > > Javadoc in order to understand. That seems risky to me, and I
> > do not
> > > > > > > > think we should add such an edge case. A new protocol message
> > would
> > > > > > > > easily handle it and make it transparent to the application.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Michael
> > > > > > > >
> > > > > > > > [0]
> > https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L826-L912
> > > > > > > > [1]
> > https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1870-L1876
> > > > > > > > [2]
> > https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
> > > > > > > > [3]
> > https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L922-L960
> > > > > > > >
> > > > > > > > On Tue, Mar 21, 2023 at 8:58 AM Yubiao Feng
> > > > > > > > <yu...@streamnative.io.invalid> wrote:
> > > > > > > > >
> > > > > > > > > +1
> > > > > > > > >
> > > > > > > > > Hi, Bo :
> > > > > > > > >
> > > > > > > > > Thanks for your explanation. That makes sense to me.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Yubiao Feng
> > > > > > > > >
> > > > > > > > > On Mon, Mar 20, 2023 at 10:21 PM 丛搏 <co...@gmail.com>
> > wrote:
> > > > > > > > >
> > > > > > > > > > Hi, pulsar community:
> > > > > > > > > >
> > > > > > > > > > I started a PIP about `Client consumer filter received
> > messages`.
> > > > > > > > > >
> > > > > > > > > > PIP: https://github.com/apache/pulsar/issues/19864
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Bo
> > > > > > > > > >
> >

Re: [DISCUSS] PIP-260: Client consumer filter received messages

Posted by 丛搏 <co...@gmail.com>.
Hi, PengHui

PengHui Li <pe...@apache.org> 于2023年3月24日周五 10:16写道:
>
> Hi Bo,
>
> > Only support Consumer#redeliverUnacknowledgedMessages()
> >
> > If we redeliver individual messages, they will be filtered. Because we
> > can't clear the record latest message in the consumer when redelivering
> > individual messages. It will make this config unclear, and if every
> > redeliver method changes, it will bring a lot of redundant code, which is
> > difficult to maintain. If there is a need in the future, just support it.
>
> I think it's not correct, right? If we redeliver individual messages,
> we should not filter out any messages.
>
> And it also will be an issue for redeliverUnacknowledgedMessages()?
> The application received messages from the internal queue of the consumer,
> but the message processing failed due to some temporary failures.
> So, they want to redeliver all the received messages and try again.
> In this case, we should not filter the messages.
Only redeliverUnacknowledgedMessages() clears lastDequeMessage, so
messages redelivered through redeliverUnacknowledgedMessages() are not
filtered. The individual redeliver methods do not clear lastDequeMessage,
so the messages they redeliver are still filtered. Supporting individual
redelivery in the filter is difficult to implement and does not fit the
cumulative ack scenario.
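
A minimal sketch of that behavior, assuming message ids can be modeled as
increasing long values. `DequeueFilter` and its methods are hypothetical
names used only for illustration and are not the ConsumerImpl code;
`lastDequeued` stands in for lastDequeMessage.

```java
// Hypothetical sketch of the filtering rule described above; not ConsumerImpl.
final class DequeueFilter {
    private long lastDequeued = -1; // stands in for lastDequeMessage

    /** Returns true if the message is handed to the application. */
    synchronized boolean onReceive(long messageId) {
        if (messageId <= lastDequeued) {
            return false; // at or before the last dequeued message, filtered
        }
        lastDequeued = messageId;
        return true;
    }

    /** Redelivering everything clears the record, so nothing is filtered. */
    synchronized void redeliverUnacknowledgedMessages() {
        lastDequeued = -1;
    }

    /** Individual redelivery leaves the record untouched. */
    synchronized void redeliverIndividual(long messageId) {
        // Intentionally no state change: the redelivered message stays filtered.
    }
}
```

With this rule, a message redelivered individually is dropped when it comes
back, while everything replayed after redeliverUnacknowledgedMessages()
passes through again.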

>
> And for a failover subscription. The active consumer might be changed after
> the disconnection. We can't ensure the consumer will not receive
> duplicate messages, right? If yes, we should mention it in the proposal.

Good point!
Yes, I will add it to the proposal.

Thanks,
Bo
>
> Thanks,
> Penghui
>
> On Wed, Mar 22, 2023 at 3:28 PM 丛搏 <co...@gmail.com> wrote:
>
> > Hi, Michael:
> >
> > >
> > > Is this the underlying motivation for the PIP?
> > >
> > > From my perspective, the PIP is seeking to decrease duplicate messages
> > > experienced due to disconnections from the broker.
> > This PIP is not aimed at reducing message duplication but at
> > completely preventing it.
> > Therefore we have to consider all edge cases, including redeliver,
> > reconnection, and receive.
> > We must fully guarantee that the message will not be received repeatedly.
> > >
> > > > The problem of the resetting cursor can be optimized in the future
> > >
> > > Why should we push off solving this problem? It seems fundamental to
> > > this PIP and should not be ignored. At the very least, I think we need
> > > to have an idea of what the future solution would be before we defer
> > > its implementation.
> > the reset cursor is difficult to guarantee correctness. when we use
> > cumulative ack,
> > We may generate some errors, after reset cursor then client consumer
> > cumulative ack,
> > we will lose some messages.
> > Here are more edge cases, I just want to make the problem simple,
> > at least under the transaction I think it is very good, the current
> > design will not make mistakes, and it will not affect your current
> > thinking.
> > You can do what you want, I can do what I want.
> >
> > Thanks,
> > Bo
> > >
> > > Thanks,
> > > Michael
> > >
> > >
> > > On Tue, Mar 21, 2023 at 10:52 PM 丛搏 <co...@gmail.com> wrote:
> > > >
> > > > Hi, Michael
> > > > > In this case, the consumer does not have the source of truth for the
> > > > > readPosition. It would leave the new protocol field for
> > `readPosition`
> > > > > empty and the broker would use its source of truth for the read
> > > > > position.
> > > > application has received all the messages by application thread. we
> > also need a
> > > > correct `startPosition`, right? but in your way, we will think about
> > > > the consumer
> > > > hasn't received any messages.
> > > >
> > > > >
> > > > > > why do we need to invoke `BlockingQueue.take` and `synchronized`
> > in the
> > > > > > same logic? it's a bad code.
> > > > >
> > > > > We don't need to synchronize this code here because the logic will
> > > > > come after the consumer has been disconnected from broker a and
> > before
> > > > > it is connected to broker b.
> > > > The application takes a message from the queue then reconnect,
> > > > the SubCommond can use the right startPostion? example:
> > > > 1. application receives one message with `MessageId = 1`
> > > > 2. consumer reconnect discovers the queue is empty, and the
> > > > lastDequeMessageId doesn't change.
> > > > 3. consumer sends a subcommand with MessageId.earliest, the `MessageId
> > = 1`
> > > > will redeliver from broker to client consumer, right?
> > > >
> > > > As we can see in the example, the application also can receive
> > > > `MessageId = 1`, right?
> > > > > We would not need to lock here because we do not enqueue new messages
> > > > > after we've been disconnected from the broker and before we've sent
> > > > > CommandSubscribe.
> > > > we can see the code [0], the thread has changed.
> > > > Where do we guarantee that no new messages will come in?
> > > >
> > > > >
> > > > > Ultimately, I think a protocol solution will yield better results,
> > > > > especially since we'll want to implement this feature in the other
> > > > > client languages.
> > > > The problem of the resetting cursor can be optimized in the future,
> > > > but can you ensure the
> > > > correctness of all the cases I mentioned above? IMO, if we use my
> > > > design, client change,
> > > > we don't need the broker to make any changes. its simple and it's easy
> > > > to implement.
> > > > I can make sure it's completely correct, I can make sure it's
> > > > completely correct. In your design,
> > > > I currently do not see a closed-loop implementation that can achieve
> > > > at least in the java client.
> > > >
> > > > Thanks,
> > > > Bo
> > > > >
> > > > > Thanks,
> > > > > Michael
> > > > >
> > > > > On Tue, Mar 21, 2023 at 9:29 PM 丛搏 <co...@gmail.com> wrote:
> > > > > >
> > > > > > Hi, Michael:
> > > > > >
> > > > > > > Michael Marshall <mm...@apache.org> wrote on Tue, Mar 21, 2023 at 23:17:
> > > > > >
> > > > > > >
> > > > > > > One more point. Instead of keeping track of the latest message
> > seen by
> > > > > > > the application, the logic in my solution would actually just
> > check
> > > > > > > the last message in the `incomingMessages` queue (as in the most
> > > > > > > recently added), and use that as the read position in the
> > subscribe
> > > > > > > command. If we made this change, we would have to change this
> > code [0]
> > > > > > > to not drop the `incomingMessages` queue.
> > > > > >
> > > > > > case 1:
> > > > > > What we define the message that the application has seen?
> > > > > > I think it is the[0], when the `incomingMessages` queue is empty,
> > > > > > how do we get the correct `startPosition`?
> > > > > > What I think we should lock the receive logic in [1]
> > > > > > ```
> > > > > > synchronized (this) {
> > > > > >     message = incomingMessages.take();
> > > > > >     messageProcessed(message);
> > > > > > }
> > > > > > ```
> > > > > > why do we need to invoke `BlockingQueue.take` and `synchronized`
> > in the
> > > > > > same logic? it's a bad code.
> > > > > >
> > > > > > case 2:
> > > > > > If we sub with `startMessageId`, we also should lock any enqueue
> > > > > > logic, like [2] and
> > > > > > check to consumer's current state
> > > > > > ```
> > > > > > synchronized (this) {
> > > > > >     if (consumer.isConnected) {
> > > > > >         if (canEnqueueMessage(message) &&
> > incomingMessages.offer(message)) {
> > > > > >             // After we have enqueued the messages on
> > > > > > `incomingMessages` queue, we cannot touch the message
> > > > > >             // instance anymore, since for pooled messages, this
> > > > > > instance was possibly already been released
> > > > > >             // and recycled.
> > > > > >             INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this,
> > messageSize);
> > > > > >             getMemoryLimitController().ifPresent(limiter ->
> > > > > > limiter.forceReserveMemory(messageSize));
> > > > > >             updateAutoScaleReceiverQueueHint();
> > > > > >         }
> > > > > >     }
> > > > > > }
> > > > > > ```
> > > > > > case 3:
> > > > > > when we subcommand sends to broker with `startMessageId = 1`, then
> > the
> > > > > > broker push message
> > > > > > has not yet entered `incommingQueue`, the application invokes
> > > > > > redeliver. in this way, we don't
> > > > > > filter messages are correct, right?
> > > > > >
> > > > > > These are some cases that I simply thought of, and there must be
> > > > > > others that I haven't thought
> > > > > > of. Are you sure we can handle these problems correctly?
> > > > > >
> > > > > > > The problem of "the consumer doesn't know" seems like something
> > that
> > > > > > > is reasonably within the protocol's responsibilities. In this
> > case, an
> > > > > > > event happens on the broker, and the broker can tell the
> > consumer.
> > > > > >
> > > > > > I don't think a simple change protocol can solve these problems,
> > > > > > We can't promise that every consumer can receive the broker reset
> > > > > > cursor request.
> > > > > > When the consumer reconnects, the broker can't send the reset
> > cursor request to
> > > > > > the client consumers, right? In this case, the consumer is still
> > unaware, right?
> > > > > >
> > > > > >
> > > > > > [0]
> > https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L135
> > > > > > [1]
> > https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L440-L454
> > > > > > [2]
> > https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java#L875-L892
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Michael
> > > > > > >
> > > > > > > [0]
> > https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
> > > > > > >
> > > > > > > On Tue, Mar 21, 2023 at 9:46 AM Michael Marshall <
> > mmarshall@apache.org> wrote:
> > > > > > > >
> > > > > > > > > if we add the new field in CommandSubscribe, we should ensure
> > > > > > > > > the synchronization between consumer reconnection and user
> > > > > > > > > calling receive and redeliverUnack method. it will affect
> > the performance
> > > > > > > > > of receive. expose synchronization to hot paths it not a
> > good idea.
> > > > > > > >
> > > > > > > > I don't think this is a valid objection. I am pretty sure we
> > already
> > > > > > > > synchronize in the relevant places in the consumer to solve
> > the exact
> > > > > > > > race condition you're concerned about: [0] [1].
> > > > > > > >
> > > > > > > > My proposed operation is to keep track of the latest message
> > id that
> > > > > > > > the application has seen, and then tell the broker that id when
> > > > > > > > sending the Subscribe command. We already do similar logic
> > here [2]
> > > > > > > > [3], but instead of getting the first message id the consumer
> > hasn't
> > > > > > > > seen, we'll get the latest message id seen.
> > > > > > > >
> > > > > > > > > Regarding performance, the PIP doesn't touch on how it will filter out
> > > > > > > > > messages. What is the planned approach? In my understanding, the
> > > > > > > > > client will keep track of the latest message id that the application
> > > > > > > > > has seen and then will need to compare that message id against every
> > > > > > > > > new message. As such, it seems like telling the broker where to start
> > > > > > > > > instead of naively checking a filter on every message would be
> > > > > > > > > cheaper.
> > > > > > > >
> > > > > > > > > As described in Compatibility in PIP. Client consumer
> > doesn't know
> > > > > > > > > Pulsar Admin reset cursor.
> > > > > > > >
> > > > > > > > The problem of "the consumer doesn't know" seems like
> > something that
> > > > > > > > is reasonably within the protocol's responsibilities. In this
> > case, an
> > > > > > > > event happens on the broker, and the broker can tell the
> > consumer.
> > > > > > > >
> > > > > > > > > * <p>Consumers should close when the server resets the
> > cursor,
> > > > > > > > > * when the cursor reset success, and then restart. Otherwise,
> > > > > > > > > * the consumer will not receive the history messages.
> > > > > > > >
> > > > > > > > This is introducing a confusing edge case that requires
> > reading a
> > > > > > > > Javadoc in order to understand. That seems risky to me, and I
> > do not
> > > > > > > > think we should add such an edge case. A new protocol message
> > would
> > > > > > > > easily handle it and make it transparent to the application.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Michael
> > > > > > > >
> > > > > > > > [0]
> > https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L826-L912
> > > > > > > > [1]
> > https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1870-L1876
> > > > > > > > [2]
> > https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
> > > > > > > > [3]
> > https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L922-L960
> > > > > > > >
> > > > > > > > On Tue, Mar 21, 2023 at 8:58 AM Yubiao Feng
> > > > > > > > <yu...@streamnative.io.invalid> wrote:
> > > > > > > > >
> > > > > > > > > +1
> > > > > > > > >
> > > > > > > > > Hi, Bo :
> > > > > > > > >
> > > > > > > > > Thanks for your explanation. That makes sense to me.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Yubiao Feng
> > > > > > > > >
> > > > > > > > > On Mon, Mar 20, 2023 at 10:21 PM 丛搏 <co...@gmail.com>
> > wrote:
> > > > > > > > >
> > > > > > > > > > Hi, pulsar community:
> > > > > > > > > >
> > > > > > > > > > I started a PIP about `Client consumer filter received
> > messages`.
> > > > > > > > > >
> > > > > > > > > > PIP: https://github.com/apache/pulsar/issues/19864
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Bo
> > > > > > > > > >
> >

Re: [DISCUSS] PIP-260: Client consumer filter received messages

Posted by PengHui Li <pe...@apache.org>.
Hi Bo,

> Only support Consumer#redeliverUnacknowledgedMessages()
>
> If we redeliver individual messages, they will be filtered. Because we
> can't clear the record latest message in the consumer when redelivering
> individual messages. It will make this config unclear, and if every
> redeliver method changes, it will bring a lot of redundant code, which is
> difficult to maintain. If there is a need in the future, just support it.

I don't think this is correct. If we redeliver individual messages,
we should not filter out any messages.

Won't it also be an issue for redeliverUnacknowledgedMessages()?
The application received messages from the internal queue of the consumer,
but the message processing failed due to some temporary failures.
So, they want to redeliver all the received messages and try again.
In this case, we should not filter the messages.

And for a failover subscription. The active consumer might be changed after
the disconnection. We can't ensure the consumer will not receive
duplicate messages, right? If yes, we should mention it in the proposal.

Thanks,
Penghui

On Wed, Mar 22, 2023 at 3:28 PM 丛搏 <co...@gmail.com> wrote:

> Hi, Michael:
>
> >
> > Is this the underlying motivation for the PIP?
> >
> > From my perspective, the PIP is seeking to decrease duplicate messages
> > experienced due to disconnections from the broker.
> This PIP is not aimed at reducing message duplication but at
> completely preventing it.
> Therefore we have to consider all edge cases, including redeliver,
> reconnection, and receive.
> We must fully guarantee that the message will not be received repeatedly.
> >
> > > The problem of the resetting cursor can be optimized in the future
> >
> > Why should we push off solving this problem? It seems fundamental to
> > this PIP and should not be ignored. At the very least, I think we need
> > to have an idea of what the future solution would be before we defer
> > its implementation.
> the reset cursor is difficult to guarantee correctness. when we use
> cumulative ack,
> We may generate some errors, after reset cursor then client consumer
> cumulative ack,
> we will lose some messages.
> Here are more edge cases, I just want to make the problem simple,
> at least under the transaction I think it is very good, the current
> design will not make mistakes, and it will not affect your current
> thinking.
> You can do what you want, I can do what I want.
>
> Thanks,
> Bo
> >
> > Thanks,
> > Michael
> >
> >
> > On Tue, Mar 21, 2023 at 10:52 PM 丛搏 <co...@gmail.com> wrote:
> > >
> > > Hi, Michael
> > > > In this case, the consumer does not have the source of truth for the
> > > > readPosition. It would leave the new protocol field for
> `readPosition`
> > > > empty and the broker would use its source of truth for the read
> > > > position.
> > > application has received all the messages by application thread. we
> also need a
> > > correct `startPosition`, right? but in your way, we will think about
> > > the consumer
> > > hasn't received any messages.
> > >
> > > >
> > > > > why do we need to invoke `BlockingQueue.take` and `synchronized`
> in the
> > > > > same logic? it's a bad code.
> > > >
> > > > We don't need to synchronize this code here because the logic will
> > > > come after the consumer has been disconnected from broker a and
> before
> > > > it is connected to broker b.
> > > The application takes a message from the queue then reconnect,
> > > the SubCommond can use the right startPostion? example:
> > > 1. application receives one message with `MessageId = 1`
> > > 2. consumer reconnect discovers the queue is empty, and the
> > > lastDequeMessageId doesn't change.
> > > 3. consumer sends a subcommand with MessageId.earliest, the `MessageId
> = 1`
> > > will redeliver from broker to client consumer, right?
> > >
> > > As we can see in the example, the application also can receive
> > > `MessageId = 1`, right?
> > > > We would not need to lock here because we do not enqueue new messages
> > > > after we've been disconnected from the broker and before we've sent
> > > > CommandSubscribe.
> > > we can see the code [0], the thread has changed.
> > > Where do we guarantee that no new messages will come in?
> > >
> > > >
> > > > Ultimately, I think a protocol solution will yield better results,
> > > > especially since we'll want to implement this feature in the other
> > > > client languages.
> > > The problem of the resetting cursor can be optimized in the future,
> > > but can you ensure the
> > > correctness of all the cases I mentioned above? IMO, if we use my
> > > design, client change,
> > > we don't need the broker to make any changes. its simple and it's easy
> > > to implement.
> > > I can make sure it's completely correct, I can make sure it's
> > > completely correct. In your design,
> > > I currently do not see a closed-loop implementation that can achieve
> > > at least in the java client.
> > >
> > > Thanks,
> > > Bo
> > > >
> > > > Thanks,
> > > > Michael
> > > >
> > > > On Tue, Mar 21, 2023 at 9:29 PM 丛搏 <co...@gmail.com> wrote:
> > > > >
> > > > > Hi, Michael:
> > > > >
> > > > > > Michael Marshall <mm...@apache.org> wrote on Tue, Mar 21, 2023 at 23:17:
> > > > >
> > > > > >
> > > > > > One more point. Instead of keeping track of the latest message
> seen by
> > > > > > the application, the logic in my solution would actually just
> check
> > > > > > the last message in the `incomingMessages` queue (as in the most
> > > > > > recently added), and use that as the read position in the
> subscribe
> > > > > > command. If we made this change, we would have to change this
> code [0]
> > > > > > to not drop the `incomingMessages` queue.
> > > > >
> > > > > case 1:
> > > > > What we define the message that the application has seen?
> > > > > I think it is the[0], when the `incomingMessages` queue is empty,
> > > > > how do we get the correct `startPosition`?
> > > > > What I think we should lock the receive logic in [1]
> > > > > ```
> > > > > synchronized (this) {
> > > > >     message = incomingMessages.take();
> > > > >     messageProcessed(message);
> > > > > }
> > > > > ```
> > > > > why do we need to invoke `BlockingQueue.take` and `synchronized`
> in the
> > > > > same logic? it's a bad code.
> > > > >
> > > > > case 2:
> > > > > If we sub with `startMessageId`, we also should lock any enqueue
> > > > > logic, like [2] and
> > > > > check to consumer's current state
> > > > > ```
> > > > > synchronized (this) {
> > > > >     if (consumer.isConnected) {
> > > > >         if (canEnqueueMessage(message) &&
> incomingMessages.offer(message)) {
> > > > >             // After we have enqueued the messages on
> > > > > `incomingMessages` queue, we cannot touch the message
> > > > >             // instance anymore, since for pooled messages, this
> > > > > instance was possibly already been released
> > > > >             // and recycled.
> > > > >             INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this,
> messageSize);
> > > > >             getMemoryLimitController().ifPresent(limiter ->
> > > > > limiter.forceReserveMemory(messageSize));
> > > > >             updateAutoScaleReceiverQueueHint();
> > > > >         }
> > > > >     }
> > > > > }
> > > > > ```
> > > > > case 3:
> > > > > when we subcommand sends to broker with `startMessageId = 1`, then
> the
> > > > > broker push message
> > > > > has not yet entered `incommingQueue`, the application invokes
> > > > > redeliver. in this way, we don't
> > > > > filter messages are correct, right?
> > > > >
> > > > > These are some cases that I simply thought of, and there must be
> > > > > others that I haven't thought
> > > > > of. Are you sure we can handle these problems correctly?
> > > > >
> > > > > > The problem of "the consumer doesn't know" seems like something
> that
> > > > > > is reasonably within the protocol's responsibilities. In this
> case, an
> > > > > > event happens on the broker, and the broker can tell the
> consumer.
> > > > >
> > > > > I don't think a simple change protocol can solve these problems,
> > > > > We can't promise that every consumer can receive the broker reset
> > > > > cursor request.
> > > > > When the consumer reconnects, the broker can't send the reset
> cursor request to
> > > > > the client consumers, right? In this case, the consumer is still
> unaware, right?
> > > > >
> > > > >
> > > > > [0]
> https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L135
> > > > > [1]
> https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L440-L454
> > > > > [2]
> https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java#L875-L892
> > > > > >
> > > > > > Thanks,
> > > > > > Michael
> > > > > >
> > > > > > [0]
> https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
> > > > > >
> > > > > > On Tue, Mar 21, 2023 at 9:46 AM Michael Marshall <
> mmarshall@apache.org> wrote:
> > > > > > >
> > > > > > > > if we add the new field in CommandSubscribe, we should ensure
> > > > > > > > the synchronization between consumer reconnection and user
> > > > > > > > calling receive and redeliverUnack method. it will affect
> the performance
> > > > > > > > of receive. expose synchronization to hot paths it not a
> good idea.
> > > > > > >
> > > > > > > I don't think this is a valid objection. I am pretty sure we
> already
> > > > > > > synchronize in the relevant places in the consumer to solve
> the exact
> > > > > > > race condition you're concerned about: [0] [1].
> > > > > > >
> > > > > > > My proposed operation is to keep track of the latest message
> id that
> > > > > > > the application has seen, and then tell the broker that id when
> > > > > > > sending the Subscribe command. We already do similar logic
> here [2]
> > > > > > > [3], but instead of getting the first message id the consumer
> hasn't
> > > > > > > seen, we'll get the latest message id seen.
> > > > > > >
> > > > > > > > Regarding performance, the PIP doesn't touch on how it will filter out
> > > > > > > > messages. What is the planned approach? In my understanding, the
> > > > > > > > client will keep track of the latest message id that the application
> > > > > > > > has seen and then will need to compare that message id against every
> > > > > > > > new message. As such, it seems like telling the broker where to start
> > > > > > > > instead of naively checking a filter on every message would be
> > > > > > > > cheaper.
> > > > > > >
> > > > > > > > As described in Compatibility in PIP. Client consumer
> doesn't know
> > > > > > > > Pulsar Admin reset cursor.
> > > > > > >
> > > > > > > The problem of "the consumer doesn't know" seems like
> something that
> > > > > > > is reasonably within the protocol's responsibilities. In this
> case, an
> > > > > > > event happens on the broker, and the broker can tell the
> consumer.
> > > > > > >
> > > > > > > > * <p>Consumers should close when the server resets the
> cursor,
> > > > > > > > * when the cursor reset success, and then restart. Otherwise,
> > > > > > > > * the consumer will not receive the history messages.
> > > > > > >
> > > > > > > This is introducing a confusing edge case that requires
> reading a
> > > > > > > Javadoc in order to understand. That seems risky to me, and I
> do not
> > > > > > > think we should add such an edge case. A new protocol message
> would
> > > > > > > easily handle it and make it transparent to the application.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Michael
> > > > > > >
> > > > > > > [0]
> https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L826-L912
> > > > > > > [1]
> https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1870-L1876
> > > > > > > [2]
> https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
> > > > > > > [3]
> https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L922-L960
> > > > > > >
> > > > > > > On Tue, Mar 21, 2023 at 8:58 AM Yubiao Feng
> > > > > > > <yu...@streamnative.io.invalid> wrote:
> > > > > > > >
> > > > > > > > +1
> > > > > > > >
> > > > > > > > Hi, Bo :
> > > > > > > >
> > > > > > > > Thanks for your explanation. That makes sense to me.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Yubiao Feng
> > > > > > > >
> > > > > > > > On Mon, Mar 20, 2023 at 10:21 PM 丛搏 <co...@gmail.com>
> wrote:
> > > > > > > >
> > > > > > > > > Hi, pulsar community:
> > > > > > > > >
> > > > > > > > > I started a PIP about `Client consumer filter received
> messages`.
> > > > > > > > >
> > > > > > > > > PIP: https://github.com/apache/pulsar/issues/19864
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Bo
> > > > > > > > >
>

Re: [DISCUSS] PIP-260: Client consumer filter received messages

Posted by 丛搏 <co...@gmail.com>.
Hi, Michael:

>
> Is this the underlying motivation for the PIP?
>
> From my perspective, the PIP is seeking to decrease duplicate messages
> experienced due to disconnections from the broker.
This PIP is not aimed at reducing message duplication but at
completely preventing it.
Therefore, we have to consider all edge cases, including redelivery,
reconnection, and receive.
We must fully guarantee that no message is received more than once.
>
> > The problem of the resetting cursor can be optimized in the future
>
> Why should we push off solving this problem? It seems fundamental to
> this PIP and should not be ignored. At the very least, I think we need
> to have an idea of what the future solution would be before we defer
> its implementation.
Correctness is difficult to guarantee when the cursor is reset. With
cumulative ack, errors are possible: if the cursor is reset and the
client consumer then sends a cumulative ack, we will lose some messages.
There are more edge cases here, and I just want to keep the problem simple.
At least for transactions, I think the current design works well: it
will not make mistakes, and it does not conflict with your approach.
The two approaches can move forward independently.
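
One possible reading of the reset cursor risk, as a hedged sketch: it
assumes the client keeps filtering by the last dequeued id after the
cursor has been reset, and it models message ids as plain longs. The
class and method names are hypothetical, not client APIs.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; not part of the Pulsar client. */
public class ResetCursorSketch {

    /**
     * Returns the ids the application actually sees when the broker
     * replays ids in [from, to] while the client still remembers
     * lastDequeued from before the reset.
     */
    static List<Long> deliver(long lastDequeued, long from, long to) {
        List<Long> seen = new ArrayList<>();
        long last = lastDequeued;
        for (long id = from; id <= to; id++) {
            if (id <= last) {
                continue; // filtered as "already received"
            }
            last = id;
            seen.add(id);
        }
        return seen;
    }

    public static void main(String[] args) {
        // The application consumed ids 1..10, then the cursor is reset
        // to 1 and the broker replays 1..12.
        List<Long> seen = deliver(10, 1, 12);
        System.out.println("delivered after reset: " + seen);

        // A cumulative ack on the last delivered id also covers the
        // replayed ids 1..10, which were never reprocessed.
        long cumulativeAck = seen.get(seen.size() - 1);
        System.out.println("cumulative ack up to: " + cumulativeAck);
    }
}
```

In this sketch the replayed history is dropped by the filter and then
acknowledged cumulatively, which is the kind of loss described above.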

Thanks,
Bo
>
> Thanks,
> Michael
>
>
> On Tue, Mar 21, 2023 at 10:52 PM 丛搏 <co...@gmail.com> wrote:
> >
> > Hi, Michael
> > > In this case, the consumer does not have the source of truth for the
> > > readPosition. It would leave the new protocol field for `readPosition`
> > > empty and the broker would use its source of truth for the read
> > > position.
> > What if the application thread has already received all the messages? We still
> > need a correct `startPosition`, right? But with your approach, we would assume
> > that the consumer hasn't received any messages.
> >
> > >
> > > > why do we need to invoke `BlockingQueue.take` and `synchronized` in the
> > > > same logic? it's a bad code.
> > >
> > > We don't need to synchronize this code here because the logic will
> > > come after the consumer has been disconnected from broker a and before
> > > it is connected to broker b.
> > If the application takes a message from the queue and the consumer then reconnects,
> > can the Subscribe command use the right startPosition? Example:
> > 1. The application receives one message with `MessageId = 1`.
> > 2. The consumer reconnects, discovers that the queue is empty, and the
> > lastDequeMessageId doesn't change.
> > 3. The consumer sends a Subscribe command with MessageId.earliest, so `MessageId = 1`
> > will be redelivered from the broker to the client consumer, right?
> >
> > As we can see in the example, the application can receive
> > `MessageId = 1` again, right?
> > > We would not need to lock here because we do not enqueue new messages
> > > after we've been disconnected from the broker and before we've sent
> > > CommandSubscribe.
> > We can see in the code [0] that the thread has changed.
> > Where do we guarantee that no new messages will come in?
> >
> > >
> > > Ultimately, I think a protocol solution will yield better results,
> > > especially since we'll want to implement this feature in the other
> > > client languages.
> > The problem of resetting the cursor can be optimized in the future,
> > but can you ensure the correctness of all the cases I mentioned above?
> > IMO, my design only changes the client, so we don't need the broker to
> > make any changes. It's simple and easy to implement, and
> > I can make sure it's completely correct. In your design,
> > I currently do not see a closed-loop implementation that can achieve this,
> > at least in the Java client.
> >
> > Thanks,
> > Bo
> > >
> > > Thanks,
> > > Michael
> > >
> > > On Tue, Mar 21, 2023 at 9:29 PM 丛搏 <co...@gmail.com> wrote:
> > > >
> > > > Hi, Michael:
> > > >
> > > > Michael Marshall <mm...@apache.org> wrote on Tue, Mar 21, 2023 at 23:17:
> > > >
> > > > >
> > > > > One more point. Instead of keeping track of the latest message seen by
> > > > > the application, the logic in my solution would actually just check
> > > > > the last message in the `incomingMessages` queue (as in the most
> > > > > recently added), and use that as the read position in the subscribe
> > > > > command. If we made this change, we would have to change this code [0]
> > > > > to not drop the `incomingMessages` queue.
> > > >
> > > > case 1:
> > > > How do we define the message that the application has seen?
> > > > I think it is [0]. When the `incomingMessages` queue is empty,
> > > > how do we get the correct `startPosition`?
> > > > I think we would have to lock the receive logic in [1]:
> > > > ```
> > > > synchronized (this) {
> > > >     message = incomingMessages.take();
> > > >     messageProcessed(message);
> > > > }
> > > > ```
> > > > Why would we need to invoke `BlockingQueue.take` inside `synchronized`
> > > > in the same logic? That would be bad code.
> > > >
> > > > case 2:
> > > > If we subscribe with `startMessageId`, we should also lock any enqueue
> > > > logic, like [2], and
> > > > check the consumer's current state:
> > > > ```
> > > > synchronized (this) {
> > > >     if (consumer.isConnected) {
> > > >         if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
> > > >             // After we have enqueued the messages on the
> > > >             // `incomingMessages` queue, we cannot touch the message
> > > >             // instance anymore, since for pooled messages, this
> > > >             // instance was possibly already released
> > > >             // and recycled.
> > > >             INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
> > > >             getMemoryLimitController().ifPresent(limiter ->
> > > >                     limiter.forceReserveMemory(messageSize));
> > > >             updateAutoScaleReceiverQueueHint();
> > > >         }
> > > >     }
> > > > }
> > > > ```
> > > > case 3:
> > > > Suppose the Subscribe command has been sent to the broker with
> > > > `startMessageId = 1`, and the messages pushed by the broker
> > > > have not yet entered the `incomingMessages` queue when the application
> > > > invokes redeliver. In this case, not filtering
> > > > the messages is correct, right?
> > > >
> > > > These are some cases that I simply thought of, and there must be
> > > > others that I haven't thought
> > > > of. Are you sure we can handle these problems correctly?
> > > >
> > > > > The problem of "the consumer doesn't know" seems like something that
> > > > > is reasonably within the protocol's responsibilities. In this case, an
> > > > > event happens on the broker, and the broker can tell the consumer.
> > > >
> > > > I don't think a simple protocol change can solve these problems.
> > > > We can't promise that every consumer will receive the broker's reset
> > > > cursor request.
> > > > When the consumer reconnects, the broker can't send the reset cursor request to
> > > > the client consumers, right? In this case, the consumer is still unaware, right?
> > > >
> > > >
> > > > [0] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L135
> > > > [1] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L440-L454
> > > > [2] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java#L875-L892
> > > > >
> > > > > Thanks,
> > > > > Michael
> > > > >
> > > > > [0] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
> > > > >
> > > > > On Tue, Mar 21, 2023 at 9:46 AM Michael Marshall <mm...@apache.org> wrote:
> > > > > >
> > > > > > > if we add the new field in CommandSubscribe, we should ensure
> > > > > > > the synchronization between consumer reconnection and user
> > > > > > > calling receive and redeliverUnack method. it will affect the performance
> > > > > > > of receive. expose synchronization to hot paths it not a good idea.
> > > > > >
> > > > > > I don't think this is a valid objection. I am pretty sure we already
> > > > > > synchronize in the relevant places in the consumer to solve the exact
> > > > > > race condition you're concerned about: [0] [1].
> > > > > >
> > > > > > My proposed operation is to keep track of the latest message id that
> > > > > > the application has seen, and then tell the broker that id when
> > > > > > sending the Subscribe command. We already do similar logic here [2]
> > > > > > [3], but instead of getting the first message id the consumer hasn't
> > > > > > seen, we'll get the latest message id seen.
> > > > > >
> > > > > > Regarding performance, the PIP doesn't touch on how it will filter out
> > > > > > messages. What is the planned approach? In my understanding, the
> > > > > > client will keep track of the latest message id that the application
> > > > > > has seen and then will need to compare that message id against every
> > > > > > new mess. As such, it seems like telling the broker where to start
> > > > > > instead of naively checking a filter on every message would be
> > > > > > cheaper.
> > > > > >
> > > > > > > As described in Compatibility in PIP. Client consumer doesn't know
> > > > > > > Pulsar Admin reset cursor.
> > > > > >
> > > > > > The problem of "the consumer doesn't know" seems like something that
> > > > > > is reasonably within the protocol's responsibilities. In this case, an
> > > > > > event happens on the broker, and the broker can tell the consumer.
> > > > > >
> > > > > > > * <p>Consumers should close when the server resets the cursor,
> > > > > > > * when the cursor reset success, and then restart. Otherwise,
> > > > > > > * the consumer will not receive the history messages.
> > > > > >
> > > > > > This is introducing a confusing edge case that requires reading a
> > > > > > Javadoc in order to understand. That seems risky to me, and I do not
> > > > > > think we should add such an edge case. A new protocol message would
> > > > > > easily handle it and make it transparent to the application.
> > > > > >
> > > > > > Thanks,
> > > > > > Michael
> > > > > >
> > > > > > [0] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L826-L912
> > > > > > [1] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1870-L1876
> > > > > > [2] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
> > > > > > [3] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L922-L960
> > > > > >
> > > > > > On Tue, Mar 21, 2023 at 8:58 AM Yubiao Feng
> > > > > > <yu...@streamnative.io.invalid> wrote:
> > > > > > >
> > > > > > > +1
> > > > > > >
> > > > > > > Hi, Bo :
> > > > > > >
> > > > > > > Thanks for your explanation. That makes sense to me.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Yubiao Feng
> > > > > > >
> > > > > > > On Mon, Mar 20, 2023 at 10:21 PM 丛搏 <co...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi, pulsar community:
> > > > > > > >
> > > > > > > > I started a PIP about `Client consumer filter received messages`.
> > > > > > > >
> > > > > > > > PIP: https://github.com/apache/pulsar/issues/19864
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Bo
> > > > > > > >

Re: [DISCUSS] PIP-260: Client consumer filter received messages

Posted by Michael Marshall <mm...@apache.org>.
I am not following your objections to the protocol solution. It might
be more productive if I provided a draft PR with a sample
implementation. I'm not sure that I'll have time, but I'll try to put
something together this week.

> At least it will simplify the process of using cumulative ack with the
> transaction.

Is this the underlying motivation for the PIP?

From my perspective, the PIP is seeking to decrease duplicate messages
experienced due to disconnections from the broker.

> The problem of the resetting cursor can be optimized in the future

Why should we push off solving this problem? It seems fundamental to
this PIP and should not be ignored. At the very least, I think we need
to have an idea of what the future solution would be before we defer
its implementation.

Thanks,
Michael


On Tue, Mar 21, 2023 at 10:52 PM 丛搏 <co...@gmail.com> wrote:
>
> Hi, Michael
> > In this case, the consumer does not have the source of truth for the
> > readPosition. It would leave the new protocol field for `readPosition`
> > empty and the broker would use its source of truth for the read
> > position.
> application has received all the messages by application thread. we also need a
> correct `startPosition`, right? but in your way, we will think about
> the consumer
> hasn't received any messages.
>
> >
> > > why do we need to invoke `BlockingQueue.take` and `synchronized` in the
> > > same logic? it's a bad code.
> >
> > We don't need to synchronize this code here because the logic will
> > come after the consumer has been disconnected from broker a and before
> > it is connected to broker b.
> The application takes a message from the queue then reconnect,
> the SubCommond can use the right startPostion? example:
> 1. application receives one message with `MessageId = 1`
> 2. consumer reconnect discovers the queue is empty, and the
> lastDequeMessageId doesn't change.
> 3. consumer sends a subcommand with MessageId.earliest, the `MessageId = 1`
> will redeliver from broker to client consumer, right?
>
> As we can see in the example, the application also can receive
> `MessageId = 1`, right?
> > We would not need to lock here because we do not enqueue new messages
> > after we've been disconnected from the broker and before we've sent
> > CommandSubscribe.
> we can see the code [0], the thread has changed.
> Where do we guarantee that no new messages will come in?
>
> >
> > Ultimately, I think a protocol solution will yield better results,
> > especially since we'll want to implement this feature in the other
> > client languages.
> The problem of the resetting cursor can be optimized in the future,
> but can you ensure the
> correctness of all the cases I mentioned above? IMO, if we use my
> design, client change,
> we don't need the broker to make any changes. its simple and it's easy
> to implement.
> I can make sure it's completely correct, I can make sure it's
> completely correct. In your design,
> I currently do not see a closed-loop implementation that can achieve
> at least in the java client.
>
> Thanks,
> Bo

Re: [DISCUSS] PIP-260: Client consumer filter received messages

Posted by 丛搏 <co...@gmail.com>.
Hi, Michael
> In this case, the consumer does not have the source of truth for the
> readPosition. It would leave the new protocol field for `readPosition`
> empty and the broker would use its source of truth for the read
> position.
Suppose the application thread has already received all the messages. We
still need a correct `startPosition`, right? But with your approach, we would
treat the consumer as if it hadn't received any messages.

>
> > why do we need to invoke `BlockingQueue.take` and `synchronized` in the
> > same logic? it's a bad code.
>
> We don't need to synchronize this code here because the logic will
> come after the consumer has been disconnected from broker a and before
> it is connected to broker b.
If the application takes a message from the queue and the consumer then
reconnects, can the subscribe command use the right start position? For example:
1. The application receives one message with `MessageId = 1`.
2. The consumer reconnects and discovers that the queue is empty, and
`lastDequeuedMessageId` doesn't change.
3. The consumer sends a subscribe command with `MessageId.earliest`, so the
message with `MessageId = 1` will be redelivered from the broker to the
client consumer, right?

As we can see in this example, the application can receive
`MessageId = 1` again, right?
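
A minimal sketch of this example, not the real client code. It assumes the
start position on reconnect is taken from the tail of the incoming queue and
falls back to earliest when the queue is empty. `EmptyQueueReconnectSketch`,
`EARLIEST`, and `startPositionOnReconnect` are hypothetical names used only
for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class EmptyQueueReconnectSketch {
    // Hypothetical marker standing in for MessageId.earliest.
    static final long EARLIEST = -1L;

    // Assumed rule: on reconnect, use the most recently enqueued message id
    // as the start position, or fall back to earliest if the queue is empty.
    static long startPositionOnReconnect(Deque<Long> incomingMessages) {
        Long tail = incomingMessages.peekLast();
        return tail == null ? EARLIEST : tail;
    }

    public static void main(String[] args) {
        Deque<Long> incomingMessages = new ArrayDeque<>();
        incomingMessages.addLast(1L);                   // broker pushes MessageId = 1
        long seenByApp = incomingMessages.pollFirst();  // step 1: application receives it
        long start = startPositionOnReconnect(incomingMessages); // steps 2 and 3
        System.out.println("app saw " + seenByApp + ", resubscribe from " + start);
    }
}
```

Under these assumptions the resubscribe starts from earliest even though the
application has already seen message 1, so message 1 can be delivered again.
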
> We would not need to lock here because we do not enqueue new messages
> after we've been disconnected from the broker and before we've sent
> CommandSubscribe.
Looking at the code in [0], the thread has changed.
Where do we guarantee that no new messages will come in?

>
> Ultimately, I think a protocol solution will yield better results,
> especially since we'll want to implement this feature in the other
> client languages.
The problem of the resetting cursor can be optimized in the future,
but can you ensure the correctness of all the cases I mentioned above?
IMO, my design is a client-only change, so we don't need the broker to
make any changes. It's simple and easy to implement, and I can make sure
it's completely correct. In your design, I currently do not see a
closed-loop implementation that can achieve this, at least in the Java client.

Thanks,
Bo

Re: [DISCUSS] PIP-260: Client consumer filter received messages

Posted by Michael Marshall <mm...@apache.org>.
Good questions. There were some gaps in my description above.

> I think it is the[0], when the `incomingMessages` queue is empty,
> how do we get the correct `startPosition`?

In this case, the consumer does not have the source of truth for the
readPosition. It would leave the new protocol field for `readPosition`
empty and the broker would use its source of truth for the read
position.

> why do we need to invoke `BlockingQueue.take` and `synchronized` in the
> same logic? it's a bad code.

We don't need to synchronize this code here because the logic will
come after the consumer has been disconnected from broker a and before
it is connected to broker b.

> If we sub with `startMessageId`, we also should lock any enqueue
> logic, like [2] and check to consumer's current state

We would not need to lock here because we do not enqueue new messages
after we've been disconnected from the broker and before we've sent
CommandSubscribe.

The logic is that when a client sends CommandSubscribe, it will tell
the broker the last message id it received in the last session. The
broker will set the readPosition to that message id + 1.
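
A minimal sketch of this rule, not a real implementation. `Position` and
`nextReadPosition` are hypothetical names, not existing Pulsar APIs, and the
sketch assumes "+ 1" simply means the next entry in the same ledger (a real
broker would also have to deal with ledger rollover and batch indexes). When
the client leaves the field empty, the broker keeps its own read position.

```java
import java.util.Optional;

final class ReadPositionSketch {
    // Hypothetical stand-in for a (ledgerId, entryId) position; not the Pulsar class.
    static final class Position {
        final long ledgerId;
        final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public String toString() {
            return ledgerId + ":" + entryId;
        }
    }

    // Broker side (assumed): if the subscribe command carries the last received
    // position, resume right after it; otherwise keep the broker's own read position.
    static Position nextReadPosition(Optional<Position> lastReceived, Position brokerReadPosition) {
        return lastReceived
                .map(p -> new Position(p.ledgerId, p.entryId + 1))
                .orElse(brokerReadPosition);
    }

    public static void main(String[] args) {
        Position brokerView = new Position(7, 3);
        // Client reports 7:9 as the last message it received.
        System.out.println(nextReadPosition(Optional.of(new Position(7, 9)), brokerView));
        // Client leaves the field empty.
        System.out.println(nextReadPosition(Optional.empty(), brokerView));
    }
}
```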

> We can't promise that every consumer can receive the broker reset
> cursor request.

This is a valid concern. In the normal case where we do not drop the
connection, we trivially guarantee the client receives the protocol
command before receiving the next messages because TCP connections
guarantee order. Interestingly, the current design disconnects
consumers when we reset the cursor, but if we add a protocol message
to tell consumers what is happening, we can avoid this unnecessary
disconnection.

In the case where we drop a connection, there are two options.

1. We could implement some kind of reconciliation logic using a vector clock.
2. We could say that in the network failure scenario, the consumer
should not tell the broker its readPosition because it might have
missed an event on the broker. In that case, we would accept that some
duplicates will happen and we can "let it fail". The advantage to
letting it fail is that we have simpler code.

I think 2 would be the right initial implementation, and we could
implement 1 if we find 2 is a problem.
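
A rough sketch of what option 2 could look like on the client side. The
`readPositionHint` helper and the `connectionDropped` flag are hypothetical;
how the client would actually distinguish a dropped connection from a clean
close is an assumption left open here.

```java
import java.util.OptionalLong;

final class SubscribeHintSketch {
    // Option 2 (assumed shape): only report the last received id when the
    // previous session ended cleanly. After a dropped connection, report
    // nothing, let the broker use its own read position, and accept duplicates.
    static OptionalLong readPositionHint(boolean connectionDropped, long lastReceivedId) {
        return connectionDropped ? OptionalLong.empty() : OptionalLong.of(lastReceivedId);
    }

    public static void main(String[] args) {
        System.out.println(readPositionHint(false, 42L)); // clean close: hint is sent
        System.out.println(readPositionHint(true, 42L));  // dropped connection: no hint
    }
}
```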

My biggest concern with the current proposal is that cursor resets
will not work correctly when this feature is used. If an administrator
resets a cursor to earliest and a client is using this setting, it
will filter out every message. That could be an expensive and
confusing mistake. I think we need to find a better solution than
simple filtering.
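
To make the concern concrete, here is a small sketch of a naive client-side
filter. It is an assumption about what "simple filtering" could look like,
not the PIP's actual implementation; message ids are modelled as plain longs
and `deliver` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FilterAfterResetSketch {
    // Naive filter (assumed shape): hand a message to the application only
    // if its id is greater than the last id the application has seen.
    static List<Long> deliver(List<Long> fromBroker, long lastSeenId) {
        List<Long> delivered = new ArrayList<>();
        long lastSeen = lastSeenId;
        for (long id : fromBroker) {
            if (id > lastSeen) {
                delivered.add(id);
                lastSeen = id;
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        // The application has already seen ids 1..5, then an administrator
        // resets the cursor to earliest and the broker replays 1..5.
        List<Long> replayed = Arrays.asList(1L, 2L, 3L, 4L, 5L);
        System.out.println(deliver(replayed, 5L)); // prints []
    }
}
```

In this sketch every replayed message is dropped, because the client never
learns that the cursor was reset.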

Ultimately, I think a protocol solution will yield better results,
especially since we'll want to implement this feature in the other
client languages.

Thanks,
Michael

On Tue, Mar 21, 2023 at 9:29 PM 丛搏 <co...@gmail.com> wrote:
>
> Hi, Michael:
>
> Michael Marshall <mm...@apache.org> 于2023年3月21日周二 23:17写道:
>
> >
> > One more point. Instead of keeping track of the latest message seen by
> > the application, the logic in my solution would actually just check
> > the last message in the `incomingMessages` queue (as in the most
> > recently added), and use that as the read position in the subscribe
> > command. If we made this change, we would have to change this code [0]
> > to not drop the `incomingMessages` queue.
>
> case 1:
> What we define the message that the application has seen?
> I think it is the[0], when the `incomingMessages` queue is empty,
> how do we get the correct `startPosition`?
> What I think we should lock the receive logic in [1]
> ```
> synchronized (this) {
>     message = incomingMessages.take();
>     messageProcessed(message);
> }
> ```
> why do we need to invoke `BlockingQueue.take` and `synchronized` in the
> same logic? it's a bad code.
>
> case 2:
> If we sub with `startMessageId`, we also should lock any enqueue
> logic, like [2] and
> check to consumer's current state
> ```
> synchronized (this) {
>     if (consumer.isConnected) {
>         if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
>             // After we have enqueued the messages on `incomingMessages` queue,
>             // we cannot touch the message instance anymore, since for pooled
>             // messages, this instance was possibly already been released
>             // and recycled.
>             INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
>             getMemoryLimitController().ifPresent(limiter ->
>                     limiter.forceReserveMemory(messageSize));
>             updateAutoScaleReceiverQueueHint();
>         }
>     }
> }
> ```
> case 3:
> when we subcommand sends to broker with `startMessageId = 1`, then the
> broker push message
> has not yet entered `incommingQueue`, the application invokes
> redeliver. in this way, we don't
> filter messages are correct, right?
>
> These are some cases that I simply thought of, and there must be
> others that I haven't thought
> of. Are you sure we can handle these problems correctly?
>
> > The problem of "the consumer doesn't know" seems like something that
> > is reasonably within the protocol's responsibilities. In this case, an
> > event happens on the broker, and the broker can tell the consumer.
>
> I don't think a simple change protocol can solve these problems,
> We can't promise that every consumer can receive the broker reset
> cursor request.
> When the consumer reconnects, the broker can't send the reset cursor request to
> the client consumers, right? In this case, the consumer is still unaware, right?
>
>
> [0] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L135
> [1] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L440-L454
> [2] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java#L875-L892
> >
> > Thanks,
> > Michael
> >
> > [0] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795

Re: [DISCUSS] PIP-260: Client consumer filter received messages

Posted by 丛搏 <co...@gmail.com>.
Hi, Yunze:

It is true that hiding these details in the SDK still requires users to have a
certain level of understanding of the configuration. However, this approach
can still be helpful for users who want to use the feature but want
to avoid dealing with the nitty-gritty details of implementation.

At least it will simplify the process of using cumulative ack with the
transaction.

Thanks,
Bo

Yunze Xu <yz...@streamnative.io.invalid> wrote on Wed, Mar 22, 2023 at 10:32:
>
> I just missed the point that the reset cursor operations do not work
> for the consumer. IIUC, the seek operation does not work either. Then
> I think the option is not user-friendly as the PIP says:
>
> >  It needs to be enabled with a complete understanding of this configuration.
>
> If users want, they can also record the latest position for each
> consumer at the application side and filter the messages by the public
> `MessageId#compareTo` API. If hiding these details in SDK still
> requires users to know these details, I think it would not be better
> than doing that explicitly in the application.
>
> Thanks,
> Yunze

Re: [DISCUSS] PIP-260: Client consumer filter received messages

Posted by Yunze Xu <yz...@streamnative.io.INVALID>.
I just missed the point that the reset cursor operations do not work
for the consumer. IIUC, the seek operation does not work either. Then
I think the option is not user-friendly as the PIP says:

>  It needs to be enabled with a complete understanding of this configuration.

If users want, they can also record the latest position for each
consumer at the application side and filter the messages by the public
`MessageId#compareTo` API. If hiding these details in SDK still
requires users to know these details, I think it would not be better
than doing that explicitly in the application.
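
To illustrate what "doing that explicitly in the application" could look
like, here is a minimal sketch. It is an assumption-laden illustration, not
Pulsar code: `AppSideDedupFilter` and `PositionId` are hypothetical names, and
`PositionId` is only a stand-in for a real `MessageId` so that the example
runs without the client library. With the real client, the comparison would
go through the public `MessageId#compareTo` instead.

```java
/** Application-side duplicate filter; all names here are hypothetical. */
final class AppSideDedupFilter {

    /** Stand-in for a message id, ordered by ledger id and then entry id. */
    static final class PositionId implements Comparable<PositionId> {
        final long ledgerId;
        final long entryId;

        PositionId(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public int compareTo(PositionId other) {
            int byLedger = Long.compare(ledgerId, other.ledgerId);
            return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
        }
    }

    private PositionId latestSeen; // null until the first message is accepted

    /** Returns true if the id is newer than everything accepted so far. */
    boolean accept(PositionId id) {
        if (latestSeen != null && id.compareTo(latestSeen) <= 0) {
            return false; // duplicate or older redelivery: skip it
        }
        latestSeen = id;
        return true;
    }

    /** The application has to call this itself after an intentional seek or cursor reset. */
    void reset() {
        latestSeen = null;
    }

    public static void main(String[] args) {
        AppSideDedupFilter filter = new AppSideDedupFilter();
        PositionId[] delivered = {
            new PositionId(1, 0), new PositionId(1, 1),
            new PositionId(1, 0), // same message delivered again
            new PositionId(1, 2)
        };
        int processed = 0;
        for (PositionId id : delivered) {
            if (filter.accept(id)) {
                processed++;
            }
        }
        System.out.println("processed " + processed + " of " + delivered.length);
    }
}
```

The `reset()` method is the part that matters for this discussion: the
application knows when it seeks or resets the cursor on purpose, so it can
clear the remembered position itself.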

Thanks,
Yunze

On Wed, Mar 22, 2023 at 10:29 AM 丛搏 <co...@gmail.com> wrote:
>
> Hi, Michael:
>
> Michael Marshall <mm...@apache.org> wrote on Tue, Mar 21, 2023 at 23:17:
>
> >
> > One more point. Instead of keeping track of the latest message seen by
> > the application, the logic in my solution would actually just check
> > the last message in the `incomingMessages` queue (as in the most
> > recently added), and use that as the read position in the subscribe
> > command. If we made this change, we would have to change this code [0]
> > to not drop the `incomingMessages` queue.
>
> case 1:
> How do we define the message that the application has seen?
> I think it is [0]. When the `incomingMessages` queue is empty,
> how do we get the correct `startPosition`?
> I think we would have to lock the receive logic in [1]:
> ```
> synchronized (this) {
>     message = incomingMessages.take();
>     messageProcessed(message);
> }
> ```
> Why would we need to invoke `BlockingQueue.take` inside `synchronized`?
> That is bad code.
>
> case 2:
> If we subscribe with `startMessageId`, we should also lock any enqueue
> logic, like [2], and check the consumer's current state:
> ```
> synchronized (this) {
>     if (consumer.isConnected) {
>         if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
>             // After we have enqueued the messages on `incomingMessages` queue, we cannot touch the message
>             // instance anymore, since for pooled messages, this instance was possibly already been released
>             // and recycled.
>             INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
>             getMemoryLimitController().ifPresent(limiter -> limiter.forceReserveMemory(messageSize));
>             updateAutoScaleReceiverQueueHint();
>         }
>     }
> }
> ```
> case 3:
> When the Subscribe command is sent to the broker with `startMessageId = 1` and
> the messages the broker pushes have not yet entered the `incomingMessages` queue,
> the application invokes redeliver. In that case we would not
> filter the messages correctly, right?
>
> These are some cases that I simply thought of, and there must be
> others that I haven't thought
> of. Are you sure we can handle these problems correctly?
>
> > The problem of "the consumer doesn't know" seems like something that
> > is reasonably within the protocol's responsibilities. In this case, an
> > event happens on the broker, and the broker can tell the consumer.
>
> I don't think a simple protocol change can solve these problems.
> We can't promise that every consumer will receive the broker's reset
> cursor request.
> When the consumer reconnects, the broker can't send the reset cursor request to
> the client consumers, right? In this case, the consumer is still unaware, right?
>
>
> [0] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L135
> [1] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L440-L454
> [2] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java#L875-L892
> >
> > Thanks,
> > Michael
> >
> > [0] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795

Re: [DISCUSS] PIP-260: Client consumer filter received messages

Posted by 丛搏 <co...@gmail.com>.
Hi, Michael:

Michael Marshall <mm...@apache.org> wrote on Tue, Mar 21, 2023 at 23:17:

>
> One more point. Instead of keeping track of the latest message seen by
> the application, the logic in my solution would actually just check
> the last message in the `incomingMessages` queue (as in the most
> recently added), and use that as the read position in the subscribe
> command. If we made this change, we would have to change this code [0]
> to not drop the `incomingMessages` queue.

case 1:
How do we define the message that the application has seen?
I think it is [0]. When the `incomingMessages` queue is empty,
how do we get the correct `startPosition`?
I think we would have to lock the receive logic in [1]:
```
synchronized (this) {
    message = incomingMessages.take();
    messageProcessed(message);
}
```
Why would we need to invoke `BlockingQueue.take` inside `synchronized`?
That is bad code.
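
One way to read the concern in case 1, as a minimal sketch rather than the
actual `ConsumerImpl` code: dequeuing a message and recording it as "seen"
have to happen atomically with respect to the reconnect path, which means
every `receive` takes a lock. Calling `BlockingQueue.take` inside
`synchronized` would additionally keep holding the monitor while blocked, so
the sketch below uses `wait`/`notifyAll`, which releases the monitor while
waiting. The class name, the `long` ids, and `startPositionForReconnect` are
all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sketch: dequeue and "last seen" bookkeeping under one lock. */
final class LockedReceiveSketch {
    private final Object lock = new Object();
    private final Deque<Long> incomingMessages = new ArrayDeque<>(); // guarded by lock
    private long lastSeen = -1; // guarded by lock; -1 means nothing delivered yet

    void enqueue(long id) {
        synchronized (lock) {
            incomingMessages.addLast(id);
            lock.notifyAll(); // wake up a receiver that is waiting for a message
        }
    }

    /** Dequeues and records the id atomically; this lock sits on the receive hot path. */
    long receive() {
        synchronized (lock) {
            while (incomingMessages.isEmpty()) {
                try {
                    lock.wait(); // releases the monitor while waiting
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting", e);
                }
            }
            long id = incomingMessages.pollFirst();
            lastSeen = id;
            return id;
        }
    }

    /** What a reconnect would read; it can never see a dequeued-but-unrecorded message. */
    long startPositionForReconnect() {
        synchronized (lock) {
            return lastSeen;
        }
    }

    public static void main(String[] args) {
        LockedReceiveSketch consumer = new LockedReceiveSketch();
        consumer.enqueue(1);
        consumer.enqueue(2);
        long received = consumer.receive();
        System.out.println("received " + received
                + ", reconnect would start after " + consumer.startPositionForReconnect());
    }
}
```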

case 2:
If we subscribe with `startMessageId`, we should also lock any enqueue
logic, like [2], and check the consumer's current state:
```
synchronized (this) {
    if (consumer.isConnected) {
        if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
            // After we have enqueued the messages on `incomingMessages` queue, we cannot touch the message
            // instance anymore, since for pooled messages, this instance was possibly already been released
            // and recycled.
            INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
            getMemoryLimitController().ifPresent(limiter -> limiter.forceReserveMemory(messageSize));
            updateAutoScaleReceiverQueueHint();
        }
    }
}
```
case 3:
When the Subscribe command is sent to the broker with `startMessageId = 1` and
the messages the broker pushes have not yet entered the `incomingMessages` queue,
the application invokes redeliver. In that case we would not
filter the messages correctly, right?

These are some cases that I simply thought of, and there must be
others that I haven't thought
of. Are you sure we can handle these problems correctly?

> The problem of "the consumer doesn't know" seems like something that
> is reasonably within the protocol's responsibilities. In this case, an
> event happens on the broker, and the broker can tell the consumer.

I don't think a simple protocol change can solve these problems.
We can't promise that every consumer will receive the broker's reset
cursor request.
When the consumer reconnects, the broker can't send the reset cursor request to
the client consumers, right? In this case, the consumer is still unaware, right?


[0] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L135
[1] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L440-L454
[2] https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java#L875-L892
>
> Thanks,
> Michael
>
> [0] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795

Re: [DISCUSS] PIP-260: Client consumer filter received messages

Posted by Michael Marshall <mm...@apache.org>.
One more point. Instead of keeping track of the latest message seen by
the application, the logic in my solution would actually just check
the last message in the `incomingMessages` queue (as in the most
recently added), and use that as the read position in the subscribe
command. If we made this change, we would have to change this code [0]
to not drop the `incomingMessages` queue.
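
A rough sketch of that idea, under stated assumptions: the queue below is a
plain `ArrayDeque` standing in for `incomingMessages`, message ids are plain
`long` values, and `resumeAfter` is a hypothetical name for "the position to
put into the subscribe command". It is not the client's real code.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.OptionalLong;

/** Hypothetical sketch: derive the reconnect position from the tail of the receive queue. */
final class ResumePositionSketch {
    // Stand-in for the consumer's `incomingMessages` queue.
    private final Deque<Long> incomingMessages = new ArrayDeque<>();

    void enqueue(long messageId) {
        incomingMessages.addLast(messageId);
    }

    /** Hands the oldest queued message to the application, if there is one. */
    OptionalLong receive() {
        Long head = incomingMessages.pollFirst();
        return head == null ? OptionalLong.empty() : OptionalLong.of(head);
    }

    /**
     * Most recently enqueued id, to be used as the read position on reconnect.
     * The queue is kept as-is instead of being cleared.
     */
    OptionalLong resumeAfter() {
        Long tail = incomingMessages.peekLast();
        return tail == null ? OptionalLong.empty() : OptionalLong.of(tail);
    }

    public static void main(String[] args) {
        ResumePositionSketch consumer = new ResumePositionSketch();
        consumer.enqueue(5);
        consumer.enqueue(6);
        consumer.receive(); // the application takes message 5
        System.out.println("resume after: " + consumer.resumeAfter());
    }
}
```

The sketch returns an empty result when the queue is empty; which position
to use in that situation is left open here.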

Thanks,
Michael

[0] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795

On Tue, Mar 21, 2023 at 9:46 AM Michael Marshall <mm...@apache.org> wrote:
>
> > If we add the new field in CommandSubscribe, we have to ensure
> > synchronization between consumer reconnection and the user
> > calling the receive and redeliverUnack methods. It will affect the performance
> > of receive. Exposing synchronization on hot paths is not a good idea.
>
> I don't think this is a valid objection. I am pretty sure we already
> synchronize in the relevant places in the consumer to solve the exact
> race condition you're concerned about: [0] [1].
>
> My proposed operation is to keep track of the latest message id that
> the application has seen, and then tell the broker that id when
> sending the Subscribe command. We already do similar logic here [2]
> [3], but instead of getting the first message id the consumer hasn't
> seen, we'll get the latest message id seen.
>
> Regarding performance, the PIP doesn't touch on how it will filter out
> messages. What is the planned approach? In my understanding, the
> client will keep track of the latest message id that the application
> has seen and then will need to compare that message id against every
> new message. As such, it seems like telling the broker where to start
> instead of naively checking a filter on every message would be
> cheaper.
>
> > As described in the Compatibility section of the PIP, the client consumer
> > doesn't know about a Pulsar Admin cursor reset.
>
> The problem of "the consumer doesn't know" seems like something that
> is reasonably within the protocol's responsibilities. In this case, an
> event happens on the broker, and the broker can tell the consumer.
>
> > * <p>Consumers should close when the server resets the cursor,
> > * when the cursor reset success, and then restart. Otherwise,
> > * the consumer will not receive the history messages.
>
> This is introducing a confusing edge case that requires reading a
> Javadoc in order to understand. That seems risky to me, and I do not
> think we should add such an edge case. A new protocol message would
> easily handle it and make it transparent to the application.
>
> Thanks,
> Michael
>
> [0] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L826-L912
> [1] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1870-L1876
> [2] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
> [3] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L922-L960

Re: [DISCUSS] PIP-260: Client consumer filter received messages

Posted by Michael Marshall <mm...@apache.org>.
> If we add the new field in CommandSubscribe, we have to ensure
> synchronization between consumer reconnection and the user
> calling the receive and redeliverUnack methods. It will affect the performance
> of receive. Exposing synchronization on hot paths is not a good idea.

I don't think this is a valid objection. I am pretty sure we already
synchronize in the relevant places in the consumer to solve the exact
race condition you're concerned about: [0] [1].

My proposed operation is to keep track of the latest message id that
the application has seen, and then tell the broker that id when
sending the Subscribe command. We already do similar logic here [2]
[3], but instead of getting the first message id the consumer hasn't
seen, we'll get the latest message id seen.

Regarding performance, the PIP doesn't touch on how it will filter out
messages. What is the planned approach? In my understanding, the
client will keep track of the latest message id that the application
has seen and then will need to compare that message id against every
new message. As such, it seems like telling the broker where to start
instead of naively checking a filter on every message would be
cheaper.

> As described in the Compatibility section of the PIP, the client consumer
> doesn't know about a Pulsar Admin cursor reset.

The problem of "the consumer doesn't know" seems like something that
is reasonably within the protocol's responsibilities. In this case, an
event happens on the broker, and the broker can tell the consumer.

> * <p>Consumers should close when the server resets the cursor,
> * when the cursor reset success, and then restart. Otherwise,
> * the consumer will not receive the history messages.

This is introducing a confusing edge case that requires reading a
Javadoc in order to understand. That seems risky to me, and I do not
think we should add such an edge case. A new protocol message would
easily handle it and make it transparent to the application.
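
As a rough sketch of what I mean, assuming a new broker-to-client
notification that does not exist in the protocol today (the
`onCursorReset` handler and the class name are hypothetical, and message
ids are simplified to `long`):

```java
/** Hypothetical consumer-side state; the cursor-reset notification is an assumption. */
final class ResetAwareFilter {
    private long latestSeen = -1; // -1 means nothing has been seen yet

    /** Called for every incoming message id; returns true if it should be delivered. */
    synchronized boolean accept(long messageId) {
        if (messageId <= latestSeen) {
            return false;
        }
        latestSeen = messageId;
        return true;
    }

    /** Called when the (hypothetical) broker notification arrives. */
    synchronized void onCursorReset() {
        latestSeen = -1; // forget the record so replayed messages pass the filter
    }
}
```

With something like this, the application would not need to close and
restart the consumer around a cursor reset.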

Thanks,
Michael

[0] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L826-L912
[1] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1870-L1876
[2] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
[3] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L922-L960

On Tue, Mar 21, 2023 at 8:58 AM Yubiao Feng
<yu...@streamnative.io.invalid> wrote:
>
> +1
>
> Hi, Bo :
>
> Thanks for your explanation. That makes sense to me.
>
> Thanks,
> Yubiao Feng
>
> On Mon, Mar 20, 2023 at 10:21 PM 丛搏 <co...@gmail.com> wrote:
>
> > Hi, pulsar community:
> >
> > I started a PIP about `Client consumer filter received messages`.
> >
> > PIP: https://github.com/apache/pulsar/issues/19864
> >
> > Thanks,
> > Bo
> >

Re: [DISCUSS] PIP-260: Client consumer filter received messages

Posted by Yubiao Feng <yu...@streamnative.io.INVALID>.
+1

Hi, Bo :

Thanks for your explanation. That makes sense to me.

Thanks,
Yubiao Feng

On Mon, Mar 20, 2023 at 10:21 PM 丛搏 <co...@gmail.com> wrote:

> Hi, pulsar community:
>
> I started a PIP about `Client consumer filter received messages`.
>
> PIP: https://github.com/apache/pulsar/issues/19864
>
> Thanks,
> Bo
>

Re: [DISCUSS] PIP-260: Client consumer filter received messages

Posted by 丛搏 <co...@gmail.com>.
Hi, Yubiao :

Yubiao Feng <yu...@streamnative.io.invalid> wrote on Mon, Mar 20, 2023 at 22:53:
>
> Hi Bo
>
> I think this is a good way to filter messages that the client has received.
>
> And I have two questions:
>
> 1. This is more powerful than the original way
> (`acknowledgmentsGroupingTracker.isDuplicate(msgId)) to filter out
> duplicated messages.
>  Is it possible to turn off the original de-replay logic to improve
> performance after enabling this new feature?
>
Good question! Yes, the original check
(`acknowledgmentsGroupingTracker.isDuplicate(msgId)`) filters
duplicated messages, and we can turn it off to improve performance
once this feature is enabled. This PIP only covers the new
functionality; that improvement can be done later.

> 2. There should be a typo in the article
>
> > ## Only support Consumer#redeliverUnacknowledgedMessages()
> > If we redeliver individual messages, they will be filtered. Because we
> can't clear the record latest message
> >in the consumer when redelivering individual messages. It will make this
> config unclear, and if every redeliver
> > method changes, it will bring a lot of redundant code, which is difficult
> to maintain. If there is a need in the
> > future, just support it.
>
> I suppose you want to say not support `redeliverUnacknowledgedMessages`,
> right?
No, only `redeliverUnacknowledgedMessages` is supported: it clears the
record in the client consumer. No other redeliver method is
supported.
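
A minimal sketch of that rule, only to illustrate the behavior.
`RedeliverSketch` is a hypothetical model, not the actual `ConsumerImpl`
code, and message ids are simplified to `long`:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the rule; not the actual ConsumerImpl code. */
final class RedeliverSketch {
    private long latestReceived = -1;               // record of the latest message id received
    final List<Long> delivered = new ArrayList<>(); // what the application actually sees

    /** The broker pushes a message; it is dropped if it is not newer than the record. */
    void onMessage(long id) {
        if (id <= latestReceived) {
            return; // filtered, including individually redelivered messages
        }
        latestReceived = id;
        delivered.add(id);
    }

    /** Supported: clears the record, so the redelivered messages pass the filter. */
    void redeliverUnacknowledgedMessages() {
        latestReceived = -1;
    }
}
```

An individually redelivered message arrives through `onMessage` with an
old id and is dropped, which is why only the full redelivery is
supported.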
>
>
> Thanks
> Yubiao Feng
>
> On Mon, Mar 20, 2023 at 10:21 PM 丛搏 <co...@gmail.com> wrote:
>
> > Hi, pulsar community:
> >
> > I started a PIP about `Client consumer filter received messages`.
> >
> > PIP: https://github.com/apache/pulsar/issues/19864
> >
> > Thanks,
> > Bo
> >

Re: [DISCUSS] PIP-260: Client consumer filter received messages

Posted by 丛搏 <co...@gmail.com>.
Hi, Xiangying

Xiangying Meng <xi...@apache.org> wrote on Mon, Mar 20, 2023 at 23:56:
>
> Hi Congbo,
> I think this is a great idea.
> This is more efficient in filtering duplicate messages for a single
> consumer.
> And maybe more details about implementation should be shown in the proposal.

The PIP adds the public interface; the code details should be reviewed in the PR, not in the PIP.

>
> Best regards,
> Xiangying
>
> On Mon, Mar 20, 2023 at 10:53 PM Yubiao Feng
> <yu...@streamnative.io.invalid> wrote:
>
> > Hi Bo
> >
> > I think this is a good way to filter messages that the client has received.
> >
> > And I have two questions:
> >
> > 1. This is more powerful than the original way
> > (`acknowledgmentsGroupingTracker.isDuplicate(msgId)) to filter out
> > duplicated messages.
> >  Is it possible to turn off the original de-replay logic to improve
> > performance after enabling this new feature?
> >
> > 2. There should be a typo in the article
> >
> > > ## Only support Consumer#redeliverUnacknowledgedMessages()
> > > If we redeliver individual messages, they will be filtered. Because we
> > can't clear the record latest message
> > >in the consumer when redelivering individual messages. It will make this
> > config unclear, and if every redeliver
> > > method changes, it will bring a lot of redundant code, which is difficult
> > to maintain. If there is a need in the
> > > future, just support it.
> >
> > I suppose you want to say not support `redeliverUnacknowledgedMessages`,
> > right?
> >
> >
> > Thanks
> > Yubiao Feng
> >
> > On Mon, Mar 20, 2023 at 10:21 PM 丛搏 <co...@gmail.com> wrote:
> >
> > > Hi, pulsar community:
> > >
> > > I started a PIP about `Client consumer filter received messages`.
> > >
> > > PIP: https://github.com/apache/pulsar/issues/19864
> > >
> > > Thanks,
> > > Bo
> > >
> >

Re: [DISCUSS] PIP-260: Client consumer filter received messages

Posted by 丛搏 <co...@gmail.com>.
Hi, Aloys:

Yes, it will work with `consumer.seek()`.
Sorry, I missed this, I will add this description to the PIP.

But the current seek method has some problems; the details are in
https://lists.apache.org/thread/97o9t4ltkds5pfq41l9xbbd31t41qm8w,
so I am not sure whether it makes sense to support the seek method in this PIP.

Thanks,
Bo

Aloys Zhang <al...@apache.org> wrote on Tue, Mar 21, 2023 at 19:08:
>
> Nice proposal.
>
> I'm interested in a point
> >  So when we need to reset the cursor, the client consumer should all be
> closed, and then reset the cursor then restart the consumer.
>
> Does this requirement apply to `consumer.seek`?
> Because in some scenarios, we need to create consumers first and then seek
> a position or timestamp.
>
>
> Yunze Xu <yz...@streamnative.io.invalid> wrote on Tue, Mar 21, 2023 at 17:19:
>
> > First, I agree with Yubiao that we can avoid calling the `isDuplicate`
> > method once this option is enabled.
> >
> > Then, I'm wondering in which case would users want to disable this
> > option? What's the disadvantage to disable the option? I think we can
> > just record the latest position (ledger id, entry id, batch index) of
> > the message received if the subscription type is Exclusive or
> > Failover.
> >
> > Is there any breaking change if we just apply this filter without
> > adding a configuration option?
> >
> > Thanks,
> > Yunze
> >
> > On Tue, Mar 21, 2023 at 2:26 PM 丛搏 <co...@gmail.com> wrote:
> > >
> > > Hi, Michael
> > >
> > > Michael Marshall <mm...@apache.org> wrote on Tue, Mar 21, 2023 at 13:03:
> > > >
> > > > This is a great problem to improve.
> > > >
> > > > What if we instead expand the CommandSubscribe [0] protocol message
> > > > with a new field to represent the client's desired read position? This
> > > > way, the client can tell the second broker where to start sending
> > > > messages, and there is no need to send the messages twice.
> > > >
> > > > I like the protocol expansion because it saves on unnecessary network
> > > > transfer in several places and because it will be more straightforward
> > > > for clients in other languages to implement.
> > > >
> > > > What do you think?
> > > if we add the new field in CommandSubscribe, we should ensure
> > > the synchronization between consumer reconnection and user
> > > calling receive and redeliverUnack method. it will affect the performance
> > > of receive. expose synchronization to hot paths it not a good idea.
> > > Although the message is re-delivered twice, I don't think it
> > > will cause too much performance loss.
> > >
> > > This filtering is rigorous, and there cannot be some race condition
> > problems
> > > because it involves transactions. I want it to be simple and efficient,
> > > and I don't want it to become complicated and difficult to maintain.
> > >
> > > Of course, if the failover and exclusive consumers are changed to pull
> > mode,
> > > I believe that the change protocol is a very good idea. But at present,
> > > there is obviously no sufficient reason to do so.
> > >
> > > Thanks,
> > > Bo
> > >
> > > >
> > > > Thanks,
> > > > Michael
> > > >
> > > > [0]
> > https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-common/src/main/proto/PulsarApi.proto#L339-L400
> > > >
> > > >
> > > > On Mon, Mar 20, 2023 at 10:56 AM Xiangying Meng <xi...@apache.org>
> > wrote:
> > > > >
> > > > > Hi Congbo,
> > > > > I think this is a great idea.
> > > > > This is more efficient in filtering duplicate messages for a single
> > > > > consumer.
> > > > > And maybe more details about implementation should be shown in the
> > proposal.
> > > > >
> > > > > Best regards,
> > > > > Xiangying
> > > > >
> > > > > On Mon, Mar 20, 2023 at 10:53 PM Yubiao Feng
> > > > > <yu...@streamnative.io.invalid> wrote:
> > > > >
> > > > > > Hi Bo
> > > > > >
> > > > > > I think this is a good way to filter messages that the client has
> > received.
> > > > > >
> > > > > > And I have two questions:
> > > > > >
> > > > > > 1. This is more powerful than the original way
> > > > > > (`acknowledgmentsGroupingTracker.isDuplicate(msgId)) to filter out
> > > > > > duplicated messages.
> > > > > >  Is it possible to turn off the original de-replay logic to improve
> > > > > > performance after enabling this new feature?
> > > > > >
> > > > > > 2. There should be a typo in the article
> > > > > >
> > > > > > > ## Only support Consumer#redeliverUnacknowledgedMessages()
> > > > > > > If we redeliver individual messages, they will be filtered.
> > Because we
> > > > > > can't clear the record latest message
> > > > > > >in the consumer when redelivering individual messages. It will
> > make this
> > > > > > config unclear, and if every redeliver
> > > > > > > method changes, it will bring a lot of redundant code, which is
> > difficult
> > > > > > to maintain. If there is a need in the
> > > > > > > future, just support it.
> > > > > >
> > > > > > I suppose you want to say not support
> > `redeliverUnacknowledgedMessages`,
> > > > > > right?
> > > > > >
> > > > > >
> > > > > > Thanks
> > > > > > Yubiao Feng
> > > > > >
> > > > > > On Mon, Mar 20, 2023 at 10:21 PM 丛搏 <co...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi, pulsar community:
> > > > > > >
> > > > > > > I started a PIP about `Client consumer filter received messages`.
> > > > > > >
> > > > > > > PIP: https://github.com/apache/pulsar/issues/19864
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Bo
> > > > > > >
> > > > > >
> >

Re: [DISCUSS] PIP-260: Client consumer filter received messages

Posted by Aloys Zhang <al...@apache.org>.
Nice proposal.

I'm interested in one point:
> So when we need to reset the cursor, the client consumer should all be
> closed, and then reset the cursor then restart the consumer.

Does this requirement apply to `consumer.seek`?
Because in some scenarios, we need to create consumers first and then seek
a position or timestamp.


Yunze Xu <yz...@streamnative.io.invalid> wrote on Tue, Mar 21, 2023 at 17:19:

> First, I agree with Yubiao that we can avoid calling the `isDuplicate`
> method once this option is enabled.
>
> Then, I'm wondering in which case would users want to disable this
> option? What's the disadvantage to disable the option? I think we can
> just record the latest position (ledger id, entry id, batch index) of
> the message received if the subscription type is Exclusive or
> Failover.
>
> Is there any breaking change if we just apply this filter without
> adding a configuration option?
>
> Thanks,
> Yunze
>
> On Tue, Mar 21, 2023 at 2:26 PM 丛搏 <co...@gmail.com> wrote:
> >
> > Hi, Michael
> >
> > Michael Marshall <mm...@apache.org> wrote on Tue, Mar 21, 2023 at 13:03:
> > >
> > > This is a great problem to improve.
> > >
> > > What if we instead expand the CommandSubscribe [0] protocol message
> > > with a new field to represent the client's desired read position? This
> > > way, the client can tell the second broker where to start sending
> > > messages, and there is no need to send the messages twice.
> > >
> > > I like the protocol expansion because it saves on unnecessary network
> > > transfer in several places and because it will be more straightforward
> > > for clients in other languages to implement.
> > >
> > > What do you think?
> > if we add the new field in CommandSubscribe, we should ensure
> > the synchronization between consumer reconnection and user
> > calling receive and redeliverUnack method. it will affect the performance
> > of receive. expose synchronization to hot paths it not a good idea.
> > Although the message is re-delivered twice, I don't think it
> > will cause too much performance loss.
> >
> > This filtering is rigorous, and there cannot be some race condition
> problems
> > because it involves transactions. I want it to be simple and efficient,
> > and I don't want it to become complicated and difficult to maintain.
> >
> > Of course, if the failover and exclusive consumers are changed to pull
> mode,
> > I believe that the change protocol is a very good idea. But at present,
> > there is obviously no sufficient reason to do so.
> >
> > Thanks,
> > Bo
> >
> > >
> > > Thanks,
> > > Michael
> > >
> > > [0]
> https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-common/src/main/proto/PulsarApi.proto#L339-L400
> > >
> > >
> > > On Mon, Mar 20, 2023 at 10:56 AM Xiangying Meng <xi...@apache.org>
> wrote:
> > > >
> > > > Hi Congbo,
> > > > I think this is a great idea.
> > > > This is more efficient in filtering duplicate messages for a single
> > > > consumer.
> > > > And maybe more details about implementation should be shown in the
> proposal.
> > > >
> > > > Best regards,
> > > > Xiangying
> > > >
> > > > On Mon, Mar 20, 2023 at 10:53 PM Yubiao Feng
> > > > <yu...@streamnative.io.invalid> wrote:
> > > >
> > > > > Hi Bo
> > > > >
> > > > > I think this is a good way to filter messages that the client has
> received.
> > > > >
> > > > > And I have two questions:
> > > > >
> > > > > 1. This is more powerful than the original way
> > > > > (`acknowledgmentsGroupingTracker.isDuplicate(msgId)) to filter out
> > > > > duplicated messages.
> > > > >  Is it possible to turn off the original de-replay logic to improve
> > > > > performance after enabling this new feature?
> > > > >
> > > > > 2. There should be a typo in the article
> > > > >
> > > > > > ## Only support Consumer#redeliverUnacknowledgedMessages()
> > > > > > If we redeliver individual messages, they will be filtered.
> Because we
> > > > > can't clear the record latest message
> > > > > >in the consumer when redelivering individual messages. It will
> make this
> > > > > config unclear, and if every redeliver
> > > > > > method changes, it will bring a lot of redundant code, which is
> difficult
> > > > > to maintain. If there is a need in the
> > > > > > future, just support it.
> > > > >
> > > > > I suppose you want to say not support
> `redeliverUnacknowledgedMessages`,
> > > > > right?
> > > > >
> > > > >
> > > > > Thanks
> > > > > Yubiao Feng
> > > > >
> > > > > On Mon, Mar 20, 2023 at 10:21 PM 丛搏 <co...@gmail.com> wrote:
> > > > >
> > > > > > Hi, pulsar community:
> > > > > >
> > > > > > I started a PIP about `Client consumer filter received messages`.
> > > > > >
> > > > > > PIP: https://github.com/apache/pulsar/issues/19864
> > > > > >
> > > > > > Thanks,
> > > > > > Bo
> > > > > >
> > > > >
>

Re: [DISCUSS] PIP-260: Client consumer filter received messages

Posted by 丛搏 <co...@gmail.com>.
Hi, Yunze:

Yunze Xu <yz...@streamnative.io.invalid> wrote on Tue, Mar 21, 2023 at 17:19:
>
>
> Is there any breaking change if we just apply this filter without
> adding a configuration option?

Without this configuration, a cursor reset through Pulsar Admin would
cause wrong behavior: the consumer would filter out the messages that
the reset is supposed to replay.

As described in the Compatibility section of the PIP, the client
consumer doesn't know that Pulsar Admin reset the cursor.
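
A small simulation of the problem, as a sketch only. `ResetCursorDemo`
is a hypothetical helper, message ids are simplified to `long`, and the
stream models the broker replaying messages 2 to 5 after an admin
cursor reset:

```java
import java.util.ArrayList;
import java.util.List;

final class ResetCursorDemo {
    /** Runs a stream of message ids through the filter and returns what the application sees. */
    static List<Long> deliver(long[] stream, boolean filterEnabled) {
        List<Long> seen = new ArrayList<>();
        long latest = -1; // latest message id received so far
        for (long id : stream) {
            if (filterEnabled && id <= latest) {
                continue; // looks like a duplicate, so it is filtered
            }
            latest = Math.max(latest, id);
            seen.add(id);
        }
        return seen;
    }
}
```

For the stream `1,2,3,4,5,2,3,4,5`, the filter delivers only
`1,2,3,4,5` and the replayed messages are lost, while with the filter
disabled all nine messages are delivered. That is why the behavior has
to be opt-in.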
>
> Thanks,
> Yunze
>
> On Tue, Mar 21, 2023 at 2:26 PM 丛搏 <co...@gmail.com> wrote:
> >
> > Hi, Michael
> >
> > Michael Marshall <mm...@apache.org> wrote on Tue, Mar 21, 2023 at 13:03:
> > >
> > > This is a great problem to improve.
> > >
> > > What if we instead expand the CommandSubscribe [0] protocol message
> > > with a new field to represent the client's desired read position? This
> > > way, the client can tell the second broker where to start sending
> > > messages, and there is no need to send the messages twice.
> > >
> > > I like the protocol expansion because it saves on unnecessary network
> > > transfer in several places and because it will be more straightforward
> > > for clients in other languages to implement.
> > >
> > > What do you think?
> > if we add the new field in CommandSubscribe, we should ensure
> > the synchronization between consumer reconnection and user
> > calling receive and redeliverUnack method. it will affect the performance
> > of receive. expose synchronization to hot paths it not a good idea.
> > Although the message is re-delivered twice, I don't think it
> > will cause too much performance loss.
> >
> > This filtering is rigorous, and there cannot be some race condition problems
> > because it involves transactions. I want it to be simple and efficient,
> > and I don't want it to become complicated and difficult to maintain.
> >
> > Of course, if the failover and exclusive consumers are changed to pull mode,
> > I believe that the change protocol is a very good idea. But at present,
> > there is obviously no sufficient reason to do so.
> >
> > Thanks,
> > Bo
> >
> > >
> > > Thanks,
> > > Michael
> > >
> > > [0] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-common/src/main/proto/PulsarApi.proto#L339-L400
> > >
> > >
> > > On Mon, Mar 20, 2023 at 10:56 AM Xiangying Meng <xi...@apache.org> wrote:
> > > >
> > > > Hi Congbo,
> > > > I think this is a great idea.
> > > > This is more efficient in filtering duplicate messages for a single
> > > > consumer.
> > > > And maybe more details about implementation should be shown in the proposal.
> > > >
> > > > Best regards,
> > > > Xiangying
> > > >
> > > > On Mon, Mar 20, 2023 at 10:53 PM Yubiao Feng
> > > > <yu...@streamnative.io.invalid> wrote:
> > > >
> > > > > Hi Bo
> > > > >
> > > > > I think this is a good way to filter messages that the client has received.
> > > > >
> > > > > And I have two questions:
> > > > >
> > > > > 1. This is more powerful than the original way
> > > > > (`acknowledgmentsGroupingTracker.isDuplicate(msgId)) to filter out
> > > > > duplicated messages.
> > > > >  Is it possible to turn off the original de-replay logic to improve
> > > > > performance after enabling this new feature?
> > > > >
> > > > > 2. There should be a typo in the article
> > > > >
> > > > > > ## Only support Consumer#redeliverUnacknowledgedMessages()
> > > > > > If we redeliver individual messages, they will be filtered. Because we
> > > > > can't clear the record latest message
> > > > > >in the consumer when redelivering individual messages. It will make this
> > > > > config unclear, and if every redeliver
> > > > > > method changes, it will bring a lot of redundant code, which is difficult
> > > > > to maintain. If there is a need in the
> > > > > > future, just support it.
> > > > >
> > > > > I suppose you want to say not support `redeliverUnacknowledgedMessages`,
> > > > > right?
> > > > >
> > > > >
> > > > > Thanks
> > > > > Yubiao Feng
> > > > >
> > > > > On Mon, Mar 20, 2023 at 10:21 PM 丛搏 <co...@gmail.com> wrote:
> > > > >
> > > > > > Hi, pulsar community:
> > > > > >
> > > > > > I started a PIP about `Client consumer filter received messages`.
> > > > > >
> > > > > > PIP: https://github.com/apache/pulsar/issues/19864
> > > > > >
> > > > > > Thanks,
> > > > > > Bo
> > > > > >
> > > > >

Re: [DISCUSS] PIP-260: Client consumer filter received messages

Posted by Yunze Xu <yz...@streamnative.io.INVALID>.
First, I agree with Yubiao that we can avoid calling the `isDuplicate`
method once this option is enabled.

Then, I'm wondering in which cases users would want to disable this
option. What's the disadvantage of disabling it? I think we can
just record the latest position (ledger id, entry id, batch index) of
the message received if the subscription type is Exclusive or
Failover.

Is there any breaking change if we just apply this filter without
adding a configuration option?

Thanks,
Yunze

On Tue, Mar 21, 2023 at 2:26 PM 丛搏 <co...@gmail.com> wrote:
>
> Hi, Michael
>
> Michael Marshall <mm...@apache.org> wrote on Tue, Mar 21, 2023 at 13:03:
> >
> > This is a great problem to improve.
> >
> > What if we instead expand the CommandSubscribe [0] protocol message
> > with a new field to represent the client's desired read position? This
> > way, the client can tell the second broker where to start sending
> > messages, and there is no need to send the messages twice.
> >
> > I like the protocol expansion because it saves on unnecessary network
> > transfer in several places and because it will be more straightforward
> > for clients in other languages to implement.
> >
> > What do you think?
> if we add the new field in CommandSubscribe, we should ensure
> the synchronization between consumer reconnection and user
> calling receive and redeliverUnack method. it will affect the performance
> of receive. expose synchronization to hot paths it not a good idea.
> Although the message is re-delivered twice, I don't think it
> will cause too much performance loss.
>
> This filtering is rigorous, and there cannot be some race condition problems
> because it involves transactions. I want it to be simple and efficient,
> and I don't want it to become complicated and difficult to maintain.
>
> Of course, if the failover and exclusive consumers are changed to pull mode,
> I believe that the change protocol is a very good idea. But at present,
> there is obviously no sufficient reason to do so.
>
> Thanks,
> Bo
>
> >
> > Thanks,
> > Michael
> >
> > [0] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-common/src/main/proto/PulsarApi.proto#L339-L400
> >
> >
> > On Mon, Mar 20, 2023 at 10:56 AM Xiangying Meng <xi...@apache.org> wrote:
> > >
> > > Hi Congbo,
> > > I think this is a great idea.
> > > This is more efficient in filtering duplicate messages for a single
> > > consumer.
> > > And maybe more details about implementation should be shown in the proposal.
> > >
> > > Best regards,
> > > Xiangying
> > >
> > > On Mon, Mar 20, 2023 at 10:53 PM Yubiao Feng
> > > <yu...@streamnative.io.invalid> wrote:
> > >
> > > > Hi Bo
> > > >
> > > > I think this is a good way to filter messages that the client has received.
> > > >
> > > > And I have two questions:
> > > >
> > > > 1. This is more powerful than the original way
> > > > (`acknowledgmentsGroupingTracker.isDuplicate(msgId)) to filter out
> > > > duplicated messages.
> > > >  Is it possible to turn off the original de-replay logic to improve
> > > > performance after enabling this new feature?
> > > >
> > > > 2. There should be a typo in the article
> > > >
> > > > > ## Only support Consumer#redeliverUnacknowledgedMessages()
> > > > > If we redeliver individual messages, they will be filtered. Because we
> > > > can't clear the record latest message
> > > > >in the consumer when redelivering individual messages. It will make this
> > > > config unclear, and if every redeliver
> > > > > method changes, it will bring a lot of redundant code, which is difficult
> > > > to maintain. If there is a need in the
> > > > > future, just support it.
> > > >
> > > > I suppose you want to say not support `redeliverUnacknowledgedMessages`,
> > > > right?
> > > >
> > > >
> > > > Thanks
> > > > Yubiao Feng
> > > >
> > > > On Mon, Mar 20, 2023 at 10:21 PM 丛搏 <co...@gmail.com> wrote:
> > > >
> > > > > Hi, pulsar community:
> > > > >
> > > > > I started a PIP about `Client consumer filter received messages`.
> > > > >
> > > > > PIP: https://github.com/apache/pulsar/issues/19864
> > > > >
> > > > > Thanks,
> > > > > Bo
> > > > >
> > > >

Re: [DISCUSS] PIP-260: Client consumer filter received messages

Posted by 丛搏 <co...@gmail.com>.
Hi, Michael

Michael Marshall <mm...@apache.org> wrote on Tue, Mar 21, 2023 at 13:03:
>
> This is a great problem to improve.
>
> What if we instead expand the CommandSubscribe [0] protocol message
> with a new field to represent the client's desired read position? This
> way, the client can tell the second broker where to start sending
> messages, and there is no need to send the messages twice.
>
> I like the protocol expansion because it saves on unnecessary network
> transfer in several places and because it will be more straightforward
> for clients in other languages to implement.
>
> What do you think?
If we add the new field to CommandSubscribe, we have to synchronize
consumer reconnection with user calls to the receive and
redeliverUnack methods. That will affect the performance of receive;
exposing synchronization on hot paths is not a good idea.
Although the messages are delivered twice, I don't think it
will cause too much performance loss.

This filtering must be rigorous, with no race conditions,
because it involves transactions. I want it to be simple and efficient,
and I don't want it to become complicated and difficult to maintain.

Of course, if the failover and exclusive consumers are changed to pull mode,
I believe that changing the protocol is a very good idea. But at present,
there is no sufficient reason to do so.

Thanks,
Bo

>
> Thanks,
> Michael
>
> [0] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-common/src/main/proto/PulsarApi.proto#L339-L400
>
>
> On Mon, Mar 20, 2023 at 10:56 AM Xiangying Meng <xi...@apache.org> wrote:
> >
> > Hi Congbo,
> > I think this is a great idea.
> > This is more efficient in filtering duplicate messages for a single
> > consumer.
> > And maybe more details about implementation should be shown in the proposal.
> >
> > Best regards,
> > Xiangying
> >
> > On Mon, Mar 20, 2023 at 10:53 PM Yubiao Feng
> > <yu...@streamnative.io.invalid> wrote:
> >
> > > Hi Bo
> > >
> > > I think this is a good way to filter messages that the client has received.
> > >
> > > And I have two questions:
> > >
> > > 1. This is more powerful than the original way
> > > (`acknowledgmentsGroupingTracker.isDuplicate(msgId)) to filter out
> > > duplicated messages.
> > >  Is it possible to turn off the original de-replay logic to improve
> > > performance after enabling this new feature?
> > >
> > > 2. There should be a typo in the article
> > >
> > > > ## Only support Consumer#redeliverUnacknowledgedMessages()
> > > > If we redeliver individual messages, they will be filtered. Because we
> > > can't clear the record latest message
> > > >in the consumer when redelivering individual messages. It will make this
> > > config unclear, and if every redeliver
> > > > method changes, it will bring a lot of redundant code, which is difficult
> > > to maintain. If there is a need in the
> > > > future, just support it.
> > >
> > > I suppose you want to say not support `redeliverUnacknowledgedMessages`,
> > > right?
> > >
> > >
> > > Thanks
> > > Yubiao Feng
> > >
> > > On Mon, Mar 20, 2023 at 10:21 PM 丛搏 <co...@gmail.com> wrote:
> > >
> > > > Hi, pulsar community:
> > > >
> > > > I started a PIP about `Client consumer filter received messages`.
> > > >
> > > > PIP: https://github.com/apache/pulsar/issues/19864
> > > >
> > > > Thanks,
> > > > Bo
> > > >
> > >

Re: [DISCUSS] PIP-260: Client consumer filter received messages

Posted by Michael Marshall <mm...@apache.org>.
This is a great problem to improve.

What if we instead expand the CommandSubscribe [0] protocol message
with a new field to represent the client's desired read position? This
way, the client can tell the second broker where to start sending
messages, and there is no need to send the messages twice.

I like the protocol expansion because it saves on unnecessary network
transfer in several places and because it will be more straightforward
for clients in other languages to implement.
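
A minimal sketch of the broker-side effect I have in mind, under
assumptions: `lastSeenId` stands for the hypothetical new
CommandSubscribe field, `SubscribeResumeSketch` is an illustrative name,
and message ids are simplified to `long`.

```java
import java.util.ArrayList;
import java.util.List;

final class SubscribeResumeSketch {
    /**
     * Models a broker choosing what to dispatch after a (re)subscribe.
     * lastSeenId is the hypothetical new CommandSubscribe field; null means "not set".
     */
    static List<Long> dispatchAfterSubscribe(List<Long> unackedBacklog, Long lastSeenId) {
        List<Long> toSend = new ArrayList<>();
        for (long id : unackedBacklog) {
            if (lastSeenId == null || id > lastSeenId) {
                toSend.add(id); // only messages the client has not seen yet
            }
        }
        return toSend;
    }
}
```

An older client that never sets the field would get the whole backlog,
as it does today.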

What do you think?

Thanks,
Michael

[0] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-common/src/main/proto/PulsarApi.proto#L339-L400


On Mon, Mar 20, 2023 at 10:56 AM Xiangying Meng <xi...@apache.org> wrote:
>
> Hi Congbo,
> I think this is a great idea.
> This is more efficient in filtering duplicate messages for a single
> consumer.
> And maybe more details about implementation should be shown in the proposal.
>
> Best regards,
> Xiangying
>
> On Mon, Mar 20, 2023 at 10:53 PM Yubiao Feng
> <yu...@streamnative.io.invalid> wrote:
>
> > Hi Bo
> >
> > I think this is a good way to filter messages that the client has received.
> >
> > And I have two questions:
> >
> > 1. This is more powerful than the original way
> > (`acknowledgmentsGroupingTracker.isDuplicate(msgId)) to filter out
> > duplicated messages.
> >  Is it possible to turn off the original de-replay logic to improve
> > performance after enabling this new feature?
> >
> > 2. There should be a typo in the article
> >
> > > ## Only support Consumer#redeliverUnacknowledgedMessages()
> > > If we redeliver individual messages, they will be filtered. Because we
> > can't clear the record latest message
> > >in the consumer when redelivering individual messages. It will make this
> > config unclear, and if every redeliver
> > > method changes, it will bring a lot of redundant code, which is difficult
> > to maintain. If there is a need in the
> > > future, just support it.
> >
> > I suppose you want to say not support `redeliverUnacknowledgedMessages`,
> > right?
> >
> >
> > Thanks
> > Yubiao Feng
> >
> > On Mon, Mar 20, 2023 at 10:21 PM 丛搏 <co...@gmail.com> wrote:
> >
> > > Hi, pulsar community:
> > >
> > > I started a PIP about `Client consumer filter received messages`.
> > >
> > > PIP: https://github.com/apache/pulsar/issues/19864
> > >
> > > Thanks,
> > > Bo
> > >
> >

Re: [DISCUSS] PIP-260: Client consumer filter received messages

Posted by Xiangying Meng <xi...@apache.org>.
Hi Congbo,
I think this is a great idea.
This is more efficient in filtering duplicate messages for a single
consumer.
And maybe more details about implementation should be shown in the proposal.

Best regards,
Xiangying

On Mon, Mar 20, 2023 at 10:53 PM Yubiao Feng
<yu...@streamnative.io.invalid> wrote:

> Hi Bo
>
> I think this is a good way to filter messages that the client has received.
>
> And I have two questions:
>
> 1. This is more powerful than the original way
> (`acknowledgmentsGroupingTracker.isDuplicate(msgId)) to filter out
> duplicated messages.
>  Is it possible to turn off the original de-replay logic to improve
> performance after enabling this new feature?
>
> 2. There should be a typo in the article
>
> > ## Only support Consumer#redeliverUnacknowledgedMessages()
> > If we redeliver individual messages, they will be filtered. Because we
> can't clear the record latest message
> >in the consumer when redelivering individual messages. It will make this
> config unclear, and if every redeliver
> > method changes, it will bring a lot of redundant code, which is difficult
> to maintain. If there is a need in the
> > future, just support it.
>
> I suppose you want to say not support `redeliverUnacknowledgedMessages`,
> right?
>
>
> Thanks
> Yubiao Feng
>
> On Mon, Mar 20, 2023 at 10:21 PM 丛搏 <co...@gmail.com> wrote:
>
> > Hi, pulsar community:
> >
> > I started a PIP about `Client consumer filter received messages`.
> >
> > PIP: https://github.com/apache/pulsar/issues/19864
> >
> > Thanks,
> > Bo
> >
>

Re: [DISCUSS] PIP-260: Client consumer filter received messages

Posted by Yubiao Feng <yu...@streamnative.io.INVALID>.
Hi Bo

I think this is a good way to filter messages that the client has received.

And I have two questions:

1. This is more powerful than the original way
(`acknowledgmentsGroupingTracker.isDuplicate(msgId)`) to filter out
duplicated messages.
 Is it possible to turn off the original de-replay logic to improve
performance after enabling this new feature?

2. There seems to be a typo in the PIP

> ## Only support Consumer#redeliverUnacknowledgedMessages()
> If we redeliver individual messages, they will be filtered. Because we
> can't clear the record latest message in the consumer when redelivering
> individual messages. It will make this config unclear, and if every
> redeliver method changes, it will bring a lot of redundant code, which is
> difficult to maintain. If there is a need in the future, just support it.

I suppose you mean that `redeliverUnacknowledgedMessages` is not
supported, right?


Thanks
Yubiao Feng

On Mon, Mar 20, 2023 at 10:21 PM 丛搏 <co...@gmail.com> wrote:

> Hi, pulsar community:
>
> I started a PIP about `Client consumer filter received messages`.
>
> PIP: https://github.com/apache/pulsar/issues/19864
>
> Thanks,
> Bo
>