You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pulsar.apache.org by ZJ H <sh...@gmail.com> on 2021/10/25 11:24:48 UTC

[DISCUSSION] PIP-108: Add method to help user judge if consumer queue has message

https://github.com/apache/pulsar/issues/12479

--- Pasted here for quoting convenience ---

## Motivation
Currently, I have an application that manages ten thousand of consumers,
and a logic to schedule consumers's receive. It would be helpful to know if
one of the consumers have message to recive.

## Goal
To make `Consumer` can judge if there are unreceiving messages

## API Changes

Add `hasMessageInReceiverQueue` on the `Consumer` interface.

## Implementation

For `ZeroQueueConsumerImpl` return false, Others, judge the
`receiveQueueSize` greater than zero.


## Reject Alternatives

No alternatives yet.

Re: [DISCUSSION] PIP-108: Add method to help user judge if consumer queue has message

Posted by PengHui Li <pe...@apache.org>.
LGTM +1

Thanks,
Penghui

On Mon, Oct 25, 2021 at 7:25 PM ZJ H <sh...@gmail.com> wrote:

> https://github.com/apache/pulsar/issues/12479
>
> --- Pasted here for quoting convenience ---
>
> ## Motivation
> Currently, I have an application that manages ten thousand of consumers,
> and a logic to schedule consumers's receive. It would be helpful to know if
> one of the consumers have message to recive.
>
> ## Goal
> To make `Consumer` can judge if there are unreceiving messages
>
> ## API Changes
>
> Add `hasMessageInReceiverQueue` on the `Consumer` interface.
>
> ## Implementation
>
> For `ZeroQueueConsumerImpl` return false, Others, judge the
> `receiveQueueSize` greater than zero.
>
>
> ## Reject Alternatives
>
> No alternatives yet.
>

Re: [DISCUSSION] PIP-108: Add method to help user judge if consumer queue has message

Posted by PengHui Li <pe...@apache.org>.
Thanks for the explanation, hasLocalMessages looks good.

Penghui

On Mon, Nov 8, 2021 at 10:39 PM Enrico Olivelli <eo...@gmail.com> wrote:

> Il giorno lun 8 nov 2021 alle ore 15:01 ZhangJian He <sh...@gmail.com>
> ha scritto:
>
> > Hello, matteo, About my PIP 108, the reason I want to have method
> > `hasMessageInReceiverQueue` is that
> > We need to control consumer at different rate. For example, consumer A
> > 10msg/s, consumer B 100 msg/s
> > so I can't use the `listener` mode.
> > the `receive(0)` method will remove the message from queue. currently my
> > work flow is 1) check if has message
> > 2) apply for flow quota 3) receive messages. I can not put the mssage
> back
> > to the queue, can't use the `receive(0)`.
> > And `apply for flow quota` is a costly action.
> > And discussed with PengHui Li and Hang Chen, we think `hasLocalMessages`
> is
> > better than hasMessageInReceiverQueue
> >
>
> "hasLocalMessages" works for me
>
> Enrico
>
>
>
> >
> > ZhangJian He <sh...@gmail.com> 于2021年10月27日周三 下午7:47写道:
> >
> > > Some users want to use this api to judge if there's messages to
> receive,
> > > like that pseudo code:
> > > if (consumer.hasMessage()) {
> > >    .submit(() -> {
> > >       consumer.pollMessagesAccordingToTheDistributedFlowControl()
> > >    })
> > > }
> > >
> > > don't want to remove the message from queue.
> > >
> > > PengHui Li <pe...@apache.org> 于2021年10月27日周三 下午7:43写道:
> > >
> > >> @ZhangJian He, as Matteo mentioned, Use `consumer.receive(0,
> > >> TimeUnit.SECONDS)` can achieve the same purpose for checking if there
> > are
> > >> messages in the local cache.
> > >>
> > >> Thanks
> > >> Penghui
> > >>
> > >> On Tue, Oct 26, 2021 at 2:35 PM ZhangJian He <sh...@gmail.com>
> > wrote:
> > >>
> > >> > If some users need the message content to do user-defined actions,
> we
> > >> need
> > >> > to ensure the user can't use the `peekMessage` to do things like ack
> > >> > because the message are still in the blockingQueue, return just a
> > >> > content-copy?
> > >> >
> > >> > introduced `localBuffer` might be good ? `hasMessagesInLocalBuffer`
> > >> >
> > >> > JiangHaiting <ji...@foxmail.com> 于2021年10月26日周二 下午2:20写道:
> > >> >
> > >> > > I'm totally +1 for the feature to check if we can get
> > >> > > message&nbsp;immediately from consumer, this is to say we have
> > message
> > >> > > locally.
> > >> > >
> > >> > >
> > >> > > In my understanding, it's useful to implement some user-defined
> > order
> > >> to
> > >> > > consume messages among different topics, in your case, the
> > >> "distributed
> > >> > > flow control ability".
> > >> > > But in the past few years, I've met some users have defined the
> > >> consume
> > >> > > order of different topics by part of the message content, like
> > >> > > some&nbsp;critical property value.&nbsp;
> > >> > > In these situations, a `peek` method is more suitable.
> > >> > >
> > >> > >
> > >> > > Further more, peek is not effectively equals to
> `consumer.receive(0,
> > >> > > TimeUnit.SECONDS)`. As you will have to store the message
> somewhere
> > >> else
> > >> > if
> > >> > > you find that it's not the most priority message to process.
> > >> > >
> > >> > >
> > >> > > One last thing, put the concept of "receiverQueue" in the api of
> > >> > consumer,
> > >> > > seems a little bit strange, IMHO.
> > >> > >
> > >> > >
> > >> > >
> > >> > >
> > >> > > ------------------&nbsp;Original&nbsp;------------------
> > >> > > From:
> > >> > >                                                   "dev"
> > >> > >                                                                 <
> > >> > > shoothzj@gmail.com&gt;;
> > >> > > Date:&nbsp;Tue, Oct 26, 2021 12:54 PM
> > >> > > To:&nbsp;"dev"<dev@pulsar.apache.org&gt;;
> > >> > >
> > >> > > Subject:&nbsp;Re: [DISCUSSION] PIP-108: Add method to help user
> > judge
> > >> if
> > >> > > consumer queue has message
> > >> > >
> > >> > >
> > >> > >
> > >> > > 3. Our solution implements the distributed flow control ability at
> > >> client
> > >> > > side, so we don't use the listener way.
> > >> > > 2. Per customer per consumer in different tenants and namespace,
> and
> > >> the
> > >> > > `flow-control` need(Some of our customer's machines can't work on
> > high
> > >> > > traffic), So `Multi-topic` can't use.
> > >> > > 1. We want to use this api to judge if there's messages to
> receive,
> > >> like
> > >> > > that pseudo code
> > >> > > if (consumer.hasMessage()) {
> > >> > >  .submit(() -&gt; {
> > >> > >  consumer.pollMessagesAccordingToTheDistributedFlowControl()
> > >> > >  })
> > >> > > }
> > >> > >
> > >> > > Matteo Merli <matteo.merli@gmail.com&gt; 于2021年10月26日周二
> 下午12:15写道:
> > >> > >
> > >> > > &gt; I'm a bit hesitant about this because I think there are
> already
> > >> at
> > >> > > &gt; least 3 different ways to handle similar scenarios.
> > >> > > &gt;
> > >> > > &gt;&nbsp; 1. Using listener and avoid calling receive directly
> > >> > > &gt;&nbsp; 2. Use multi-topic consumer, so there's a single
> > `Consumer`
> > >> > > instance
> > >> > > &gt; exposed
> > >> > > &gt;&nbsp; 3. Use `consumer.receive(0, TimeUnit.SECONDS)` to probe
> > for
> > >> > > message
> > >> > > &gt;
> > >> > > &gt;
> > >> > > &gt; --
> > >> > > &gt; Matteo Merli
> > >> > > &gt; <matteo.merli@gmail.com&gt;
> > >> > > &gt;
> > >> > > &gt; On Mon, Oct 25, 2021 at 7:34 PM ZhangJian He <
> > shoothzj@gmail.com
> > >> > &gt;
> > >> > > wrote:
> > >> > > &gt; &gt;
> > >> > > &gt; &gt; I think it's better to add the method to Consumer
> > interface
> > >> > > instead of
> > >> > > &gt; let
> > >> > > &gt; &gt; user casting it to `ConsumerBase`.
> > >> > > &gt; &gt; `peek` is most complexly,&nbsp; for the reason, I can
> use
> > >> the
> > >> > > `peek` object to
> > >> > > &gt; &gt; ack、negative ack, but when to remove from the
> > >> `BlockingQueue`?
> > >> > > &gt; &gt; IMHO, people use this api are just to judge if has the
> > >> message,
> > >> > > &gt; otherwise,
> > >> > > &gt; &gt; they can just use `receive(0,TimeUnit)
> > >> > > &gt; &gt;
> > >> > > &gt; &gt; JiangHaiting <jianghaiting@foxmail.com&gt;
> 于2021年10月26日周二
> > >> > > 上午10:19写道:
> > >> > > &gt; &gt;
> > >> > > &gt; &gt; &gt; Can this method
> > >> > > &gt; &gt; &gt;
> > >> > >
> > "org.apache.pulsar.client.impl.ConsumerBase#getTotalIncomingMessages"
> > >> > > &gt; do
> > >> > > &gt; &gt; &gt; the trick? Though you have to change the type to
> > >> > > ConsumerBase.
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt; And maybe `peek` is more suitable and useful to add
> > to
> > >> the
> > >> > > Consumer
> > >> > > &gt; &gt; &gt; interface?
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt;
> > >> > > ------------------&amp;nbsp;Original&amp;nbsp;------------------
> > >> > > &gt; &gt; &gt; From:
> > >> > > &gt; &gt;
> > >> > >
> > >> >
> > >>
> >
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> > >> > > "dev"
> > >> > > &gt; &gt;
> > >> > >
> > >> >
> > >>
> >
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> > >> > > <
> > >> > > &gt; &gt; &gt; shoothzj@gmail.com&amp;gt;;
> > >> > > &gt; &gt; &gt; Date:&amp;nbsp;Mon, Oct 25, 2021 07:24 PM
> > >> > > &gt; &gt; &gt; To:&amp;nbsp;"dev"<dev@pulsar.apache.org&amp;gt;;
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt; Subject:&amp;nbsp;[DISCUSSION] PIP-108: Add method
> to
> > >> help
> > >> > > user judge if
> > >> > > &gt; &gt; &gt; consumer queue has message
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt; https://github.com/apache/pulsar/issues/12479
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt; --- Pasted here for quoting convenience ---
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt; ## Motivation
> > >> > > &gt; &gt; &gt; Currently, I have an application that manages ten
> > >> thousand
> > >> > > of
> > >> > > &gt; consumers,
> > >> > > &gt; &gt; &gt; and a logic to schedule consumers's receive. It
> would
> > >> be
> > >> > > helpful to
> > >> > > &gt; know if
> > >> > > &gt; &gt; &gt; one of the consumers have message to recive.
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt; ## Goal
> > >> > > &gt; &gt; &gt; To make `Consumer` can judge if there are
> unreceiving
> > >> > > messages
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt; ## API Changes
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt; Add `hasMessageInReceiverQueue` on the `Consumer`
> > >> > interface.
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt; ## Implementation
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt; For `ZeroQueueConsumerImpl` return false, Others,
> > judge
> > >> > the
> > >> > > &gt; &gt; &gt; `receiveQueueSize` greater than zero.
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt; ## Reject Alternatives
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt; No alternatives yet.
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt; ---
> > >> > > &gt; &gt; &gt; Thanks,
> > >> > > &gt; &gt; &gt; Haiting Jiang (Github: Jason918)
> > >> > > &gt;
> > >> > >
> > >> > >
> > >> > >
> > >> > >
> > >> > > ---
> > >> > > Thanks,
> > >> > > Haiting Jiang (Github: Jason918)
> > >> >
> > >>
> > >
> >
>

Re: [DISCUSSION] PIP-108: Add method to help user judge if consumer queue has message

Posted by Enrico Olivelli <eo...@gmail.com>.
Il giorno lun 8 nov 2021 alle ore 15:01 ZhangJian He <sh...@gmail.com>
ha scritto:

> Hello, matteo, About my PIP 108, the reason I want to have method
> `hasMessageInReceiverQueue` is that
> We need to control consumer at different rate. For example, consumer A
> 10msg/s, consumer B 100 msg/s
> so I can't use the `listener` mode.
> the `receive(0)` method will remove the message from queue. currently my
> work flow is 1) check if has message
> 2) apply for flow quota 3) receive messages. I can not put the mssage back
> to the queue, can't use the `receive(0)`.
> And `apply for flow quota` is a costly action.
> And discussed with PengHui Li and Hang Chen, we think `hasLocalMessages` is
> better than hasMessageInReceiverQueue
>

"hasLocalMessages" works for me

Enrico



>
> ZhangJian He <sh...@gmail.com> 于2021年10月27日周三 下午7:47写道:
>
> > Some users want to use this api to judge if there's messages to receive,
> > like that pseudo code:
> > if (consumer.hasMessage()) {
> >    .submit(() -> {
> >       consumer.pollMessagesAccordingToTheDistributedFlowControl()
> >    })
> > }
> >
> > don't want to remove the message from queue.
> >
> > PengHui Li <pe...@apache.org> 于2021年10月27日周三 下午7:43写道:
> >
> >> @ZhangJian He, as Matteo mentioned, Use `consumer.receive(0,
> >> TimeUnit.SECONDS)` can achieve the same purpose for checking if there
> are
> >> messages in the local cache.
> >>
> >> Thanks
> >> Penghui
> >>
> >> On Tue, Oct 26, 2021 at 2:35 PM ZhangJian He <sh...@gmail.com>
> wrote:
> >>
> >> > If some users need the message content to do user-defined actions, we
> >> need
> >> > to ensure the user can't use the `peekMessage` to do things like ack
> >> > because the message are still in the blockingQueue, return just a
> >> > content-copy?
> >> >
> >> > introduced `localBuffer` might be good ? `hasMessagesInLocalBuffer`
> >> >
> >> > JiangHaiting <ji...@foxmail.com> 于2021年10月26日周二 下午2:20写道:
> >> >
> >> > > I'm totally +1 for the feature to check if we can get
> >> > > message&nbsp;immediately from consumer, this is to say we have
> message
> >> > > locally.
> >> > >
> >> > >
> >> > > In my understanding, it's useful to implement some user-defined
> order
> >> to
> >> > > consume messages among different topics, in your case, the
> >> "distributed
> >> > > flow control ability".
> >> > > But in the past few years, I've met some users have defined the
> >> consume
> >> > > order of different topics by part of the message content, like
> >> > > some&nbsp;critical property value.&nbsp;
> >> > > In these situations, a `peek` method is more suitable.
> >> > >
> >> > >
> >> > > Further more, peek is not effectively equals to `consumer.receive(0,
> >> > > TimeUnit.SECONDS)`. As you will have to store the message somewhere
> >> else
> >> > if
> >> > > you find that it's not the most priority message to process.
> >> > >
> >> > >
> >> > > One last thing, put the concept of "receiverQueue" in the api of
> >> > consumer,
> >> > > seems a little bit strange, IMHO.
> >> > >
> >> > >
> >> > >
> >> > >
> >> > > ------------------&nbsp;Original&nbsp;------------------
> >> > > From:
> >> > >                                                   "dev"
> >> > >                                                                 <
> >> > > shoothzj@gmail.com&gt;;
> >> > > Date:&nbsp;Tue, Oct 26, 2021 12:54 PM
> >> > > To:&nbsp;"dev"<dev@pulsar.apache.org&gt;;
> >> > >
> >> > > Subject:&nbsp;Re: [DISCUSSION] PIP-108: Add method to help user
> judge
> >> if
> >> > > consumer queue has message
> >> > >
> >> > >
> >> > >
> >> > > 3. Our solution implements the distributed flow control ability at
> >> client
> >> > > side, so we don't use the listener way.
> >> > > 2. Per customer per consumer in different tenants and namespace, and
> >> the
> >> > > `flow-control` need(Some of our customer's machines can't work on
> high
> >> > > traffic), So `Multi-topic` can't use.
> >> > > 1. We want to use this api to judge if there's messages to receive,
> >> like
> >> > > that pseudo code
> >> > > if (consumer.hasMessage()) {
> >> > >  .submit(() -&gt; {
> >> > >  consumer.pollMessagesAccordingToTheDistributedFlowControl()
> >> > >  })
> >> > > }
> >> > >
> >> > > Matteo Merli <matteo.merli@gmail.com&gt; 于2021年10月26日周二 下午12:15写道:
> >> > >
> >> > > &gt; I'm a bit hesitant about this because I think there are already
> >> at
> >> > > &gt; least 3 different ways to handle similar scenarios.
> >> > > &gt;
> >> > > &gt;&nbsp; 1. Using listener and avoid calling receive directly
> >> > > &gt;&nbsp; 2. Use multi-topic consumer, so there's a single
> `Consumer`
> >> > > instance
> >> > > &gt; exposed
> >> > > &gt;&nbsp; 3. Use `consumer.receive(0, TimeUnit.SECONDS)` to probe
> for
> >> > > message
> >> > > &gt;
> >> > > &gt;
> >> > > &gt; --
> >> > > &gt; Matteo Merli
> >> > > &gt; <matteo.merli@gmail.com&gt;
> >> > > &gt;
> >> > > &gt; On Mon, Oct 25, 2021 at 7:34 PM ZhangJian He <
> shoothzj@gmail.com
> >> > &gt;
> >> > > wrote:
> >> > > &gt; &gt;
> >> > > &gt; &gt; I think it's better to add the method to Consumer
> interface
> >> > > instead of
> >> > > &gt; let
> >> > > &gt; &gt; user casting it to `ConsumerBase`.
> >> > > &gt; &gt; `peek` is most complexly,&nbsp; for the reason, I can use
> >> the
> >> > > `peek` object to
> >> > > &gt; &gt; ack、negative ack, but when to remove from the
> >> `BlockingQueue`?
> >> > > &gt; &gt; IMHO, people use this api are just to judge if has the
> >> message,
> >> > > &gt; otherwise,
> >> > > &gt; &gt; they can just use `receive(0,TimeUnit)
> >> > > &gt; &gt;
> >> > > &gt; &gt; JiangHaiting <jianghaiting@foxmail.com&gt; 于2021年10月26日周二
> >> > > 上午10:19写道:
> >> > > &gt; &gt;
> >> > > &gt; &gt; &gt; Can this method
> >> > > &gt; &gt; &gt;
> >> > >
> "org.apache.pulsar.client.impl.ConsumerBase#getTotalIncomingMessages"
> >> > > &gt; do
> >> > > &gt; &gt; &gt; the trick? Though you have to change the type to
> >> > > ConsumerBase.
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt; And maybe `peek` is more suitable and useful to add
> to
> >> the
> >> > > Consumer
> >> > > &gt; &gt; &gt; interface?
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt;
> >> > > ------------------&amp;nbsp;Original&amp;nbsp;------------------
> >> > > &gt; &gt; &gt; From:
> >> > > &gt; &gt;
> >> > >
> >> >
> >>
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >> > > "dev"
> >> > > &gt; &gt;
> >> > >
> >> >
> >>
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >> > > <
> >> > > &gt; &gt; &gt; shoothzj@gmail.com&amp;gt;;
> >> > > &gt; &gt; &gt; Date:&amp;nbsp;Mon, Oct 25, 2021 07:24 PM
> >> > > &gt; &gt; &gt; To:&amp;nbsp;"dev"<dev@pulsar.apache.org&amp;gt;;
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt; Subject:&amp;nbsp;[DISCUSSION] PIP-108: Add method to
> >> help
> >> > > user judge if
> >> > > &gt; &gt; &gt; consumer queue has message
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt; https://github.com/apache/pulsar/issues/12479
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt; --- Pasted here for quoting convenience ---
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt; ## Motivation
> >> > > &gt; &gt; &gt; Currently, I have an application that manages ten
> >> thousand
> >> > > of
> >> > > &gt; consumers,
> >> > > &gt; &gt; &gt; and a logic to schedule consumers's receive. It would
> >> be
> >> > > helpful to
> >> > > &gt; know if
> >> > > &gt; &gt; &gt; one of the consumers have message to recive.
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt; ## Goal
> >> > > &gt; &gt; &gt; To make `Consumer` can judge if there are unreceiving
> >> > > messages
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt; ## API Changes
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt; Add `hasMessageInReceiverQueue` on the `Consumer`
> >> > interface.
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt; ## Implementation
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt; For `ZeroQueueConsumerImpl` return false, Others,
> judge
> >> > the
> >> > > &gt; &gt; &gt; `receiveQueueSize` greater than zero.
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt; ## Reject Alternatives
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt; No alternatives yet.
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt; ---
> >> > > &gt; &gt; &gt; Thanks,
> >> > > &gt; &gt; &gt; Haiting Jiang (Github: Jason918)
> >> > > &gt;
> >> > >
> >> > >
> >> > >
> >> > >
> >> > > ---
> >> > > Thanks,
> >> > > Haiting Jiang (Github: Jason918)
> >> >
> >>
> >
>

Re: [DISCUSSION] PIP-108: Add method to help user judge if consumer queue has message

Posted by ZhangJian He <sh...@gmail.com>.
Hello, matteo, About my PIP 108, the reason I want to have method
`hasMessageInReceiverQueue` is that
We need to control consumer at different rate. For example, consumer A
10msg/s, consumer B 100 msg/s
so I can't use the `listener` mode.
the `receive(0)` method will remove the message from queue. currently my
work flow is 1) check if has message
2) apply for flow quota 3) receive messages. I can not put the mssage back
to the queue, can't use the `receive(0)`.
And `apply for flow quota` is a costly action.
And discussed with PengHui Li and Hang Chen, we think `hasLocalMessages` is
better than hasMessageInReceiverQueue

ZhangJian He <sh...@gmail.com> 于2021年10月27日周三 下午7:47写道:

> Some users want to use this api to judge if there's messages to receive,
> like that pseudo code:
> if (consumer.hasMessage()) {
>    .submit(() -> {
>       consumer.pollMessagesAccordingToTheDistributedFlowControl()
>    })
> }
>
> don't want to remove the message from queue.
>
> PengHui Li <pe...@apache.org> 于2021年10月27日周三 下午7:43写道:
>
>> @ZhangJian He, as Matteo mentioned, Use `consumer.receive(0,
>> TimeUnit.SECONDS)` can achieve the same purpose for checking if there are
>> messages in the local cache.
>>
>> Thanks
>> Penghui
>>
>> On Tue, Oct 26, 2021 at 2:35 PM ZhangJian He <sh...@gmail.com> wrote:
>>
>> > If some users need the message content to do user-defined actions, we
>> need
>> > to ensure the user can't use the `peekMessage` to do things like ack
>> > because the message are still in the blockingQueue, return just a
>> > content-copy?
>> >
>> > introduced `localBuffer` might be good ? `hasMessagesInLocalBuffer`
>> >
>> > JiangHaiting <ji...@foxmail.com> 于2021年10月26日周二 下午2:20写道:
>> >
>> > > I'm totally +1 for the feature to check if we can get
>> > > message&nbsp;immediately from consumer, this is to say we have message
>> > > locally.
>> > >
>> > >
>> > > In my understanding, it's useful to implement some user-defined order
>> to
>> > > consume messages among different topics, in your case, the
>> "distributed
>> > > flow control ability".
>> > > But in the past few years, I've met some users have defined the
>> consume
>> > > order of different topics by part of the message content, like
>> > > some&nbsp;critical property value.&nbsp;
>> > > In these situations, a `peek` method is more suitable.
>> > >
>> > >
>> > > Further more, peek is not effectively equals to `consumer.receive(0,
>> > > TimeUnit.SECONDS)`. As you will have to store the message somewhere
>> else
>> > if
>> > > you find that it's not the most priority message to process.
>> > >
>> > >
>> > > One last thing, put the concept of "receiverQueue" in the api of
>> > consumer,
>> > > seems a little bit strange, IMHO.
>> > >
>> > >
>> > >
>> > >
>> > > ------------------&nbsp;Original&nbsp;------------------
>> > > From:
>> > >                                                   "dev"
>> > >                                                                 <
>> > > shoothzj@gmail.com&gt;;
>> > > Date:&nbsp;Tue, Oct 26, 2021 12:54 PM
>> > > To:&nbsp;"dev"<dev@pulsar.apache.org&gt;;
>> > >
>> > > Subject:&nbsp;Re: [DISCUSSION] PIP-108: Add method to help user judge
>> if
>> > > consumer queue has message
>> > >
>> > >
>> > >
>> > > 3. Our solution implements the distributed flow control ability at
>> client
>> > > side, so we don't use the listener way.
>> > > 2. Per customer per consumer in different tenants and namespace, and
>> the
>> > > `flow-control` need(Some of our customer's machines can't work on high
>> > > traffic), So `Multi-topic` can't use.
>> > > 1. We want to use this api to judge if there's messages to receive,
>> like
>> > > that pseudo code
>> > > if (consumer.hasMessage()) {
>> > >  .submit(() -&gt; {
>> > >  consumer.pollMessagesAccordingToTheDistributedFlowControl()
>> > >  })
>> > > }
>> > >
>> > > Matteo Merli <matteo.merli@gmail.com&gt; 于2021年10月26日周二 下午12:15写道:
>> > >
>> > > &gt; I'm a bit hesitant about this because I think there are already
>> at
>> > > &gt; least 3 different ways to handle similar scenarios.
>> > > &gt;
>> > > &gt;&nbsp; 1. Using listener and avoid calling receive directly
>> > > &gt;&nbsp; 2. Use multi-topic consumer, so there's a single `Consumer`
>> > > instance
>> > > &gt; exposed
>> > > &gt;&nbsp; 3. Use `consumer.receive(0, TimeUnit.SECONDS)` to probe for
>> > > message
>> > > &gt;
>> > > &gt;
>> > > &gt; --
>> > > &gt; Matteo Merli
>> > > &gt; <matteo.merli@gmail.com&gt;
>> > > &gt;
>> > > &gt; On Mon, Oct 25, 2021 at 7:34 PM ZhangJian He <shoothzj@gmail.com
>> > &gt;
>> > > wrote:
>> > > &gt; &gt;
>> > > &gt; &gt; I think it's better to add the method to Consumer interface
>> > > instead of
>> > > &gt; let
>> > > &gt; &gt; user casting it to `ConsumerBase`.
>> > > &gt; &gt; `peek` is most complexly,&nbsp; for the reason, I can use
>> the
>> > > `peek` object to
>> > > &gt; &gt; ack、negative ack, but when to remove from the
>> `BlockingQueue`?
>> > > &gt; &gt; IMHO, people use this api are just to judge if has the
>> message,
>> > > &gt; otherwise,
>> > > &gt; &gt; they can just use `receive(0,TimeUnit)
>> > > &gt; &gt;
>> > > &gt; &gt; JiangHaiting <jianghaiting@foxmail.com&gt; 于2021年10月26日周二
>> > > 上午10:19写道:
>> > > &gt; &gt;
>> > > &gt; &gt; &gt; Can this method
>> > > &gt; &gt; &gt;
>> > > "org.apache.pulsar.client.impl.ConsumerBase#getTotalIncomingMessages"
>> > > &gt; do
>> > > &gt; &gt; &gt; the trick? Though you have to change the type to
>> > > ConsumerBase.
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt; And maybe `peek` is more suitable and useful to add to
>> the
>> > > Consumer
>> > > &gt; &gt; &gt; interface?
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt;
>> > > ------------------&amp;nbsp;Original&amp;nbsp;------------------
>> > > &gt; &gt; &gt; From:
>> > > &gt; &gt;
>> > >
>> >
>> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
>> > > "dev"
>> > > &gt; &gt;
>> > >
>> >
>> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
>> > > <
>> > > &gt; &gt; &gt; shoothzj@gmail.com&amp;gt;;
>> > > &gt; &gt; &gt; Date:&amp;nbsp;Mon, Oct 25, 2021 07:24 PM
>> > > &gt; &gt; &gt; To:&amp;nbsp;"dev"<dev@pulsar.apache.org&amp;gt;;
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt; Subject:&amp;nbsp;[DISCUSSION] PIP-108: Add method to
>> help
>> > > user judge if
>> > > &gt; &gt; &gt; consumer queue has message
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt; https://github.com/apache/pulsar/issues/12479
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt; --- Pasted here for quoting convenience ---
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt; ## Motivation
>> > > &gt; &gt; &gt; Currently, I have an application that manages ten
>> thousand
>> > > of
>> > > &gt; consumers,
>> > > &gt; &gt; &gt; and a logic to schedule consumers's receive. It would
>> be
>> > > helpful to
>> > > &gt; know if
>> > > &gt; &gt; &gt; one of the consumers have message to recive.
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt; ## Goal
>> > > &gt; &gt; &gt; To make `Consumer` can judge if there are unreceiving
>> > > messages
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt; ## API Changes
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt; Add `hasMessageInReceiverQueue` on the `Consumer`
>> > interface.
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt; ## Implementation
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt; For `ZeroQueueConsumerImpl` return false, Others, judge
>> > the
>> > > &gt; &gt; &gt; `receiveQueueSize` greater than zero.
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt; ## Reject Alternatives
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt; No alternatives yet.
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt; ---
>> > > &gt; &gt; &gt; Thanks,
>> > > &gt; &gt; &gt; Haiting Jiang (Github: Jason918)
>> > > &gt;
>> > >
>> > >
>> > >
>> > >
>> > > ---
>> > > Thanks,
>> > > Haiting Jiang (Github: Jason918)
>> >
>>
>

Re: [DISCUSSION] PIP-108: Add method to help user judge if consumer queue has message

Posted by ZhangJian He <sh...@gmail.com>.
Some users want to use this api to judge if there's messages to receive,
like that pseudo code:
if (consumer.hasMessage()) {
   .submit(() -> {
      consumer.pollMessagesAccordingToTheDistributedFlowControl()
   })
}

don't want to remove the message from queue.

PengHui Li <pe...@apache.org> 于2021年10月27日周三 下午7:43写道:

> @ZhangJian He, as Matteo mentioned, Use `consumer.receive(0,
> TimeUnit.SECONDS)` can achieve the same purpose for checking if there are
> messages in the local cache.
>
> Thanks
> Penghui
>
> On Tue, Oct 26, 2021 at 2:35 PM ZhangJian He <sh...@gmail.com> wrote:
>
> > If some users need the message content to do user-defined actions, we
> need
> > to ensure the user can't use the `peekMessage` to do things like ack
> > because the message are still in the blockingQueue, return just a
> > content-copy?
> >
> > introduced `localBuffer` might be good ? `hasMessagesInLocalBuffer`
> >
> > JiangHaiting <ji...@foxmail.com> 于2021年10月26日周二 下午2:20写道:
> >
> > > I'm totally +1 for the feature to check if we can get
> > > message&nbsp;immediately from consumer, this is to say we have message
> > > locally.
> > >
> > >
> > > In my understanding, it's useful to implement some user-defined order
> to
> > > consume messages among different topics, in your case, the "distributed
> > > flow control ability".
> > > But in the past few years, I've met some users have defined the consume
> > > order of different topics by part of the message content, like
> > > some&nbsp;critical property value.&nbsp;
> > > In these situations, a `peek` method is more suitable.
> > >
> > >
> > > Further more, peek is not effectively equals to `consumer.receive(0,
> > > TimeUnit.SECONDS)`. As you will have to store the message somewhere
> else
> > if
> > > you find that it's not the most priority message to process.
> > >
> > >
> > > One last thing, put the concept of "receiverQueue" in the api of
> > consumer,
> > > seems a little bit strange, IMHO.
> > >
> > >
> > >
> > >
> > > ------------------&nbsp;Original&nbsp;------------------
> > > From:
> > >                                                   "dev"
> > >                                                                 <
> > > shoothzj@gmail.com&gt;;
> > > Date:&nbsp;Tue, Oct 26, 2021 12:54 PM
> > > To:&nbsp;"dev"<dev@pulsar.apache.org&gt;;
> > >
> > > Subject:&nbsp;Re: [DISCUSSION] PIP-108: Add method to help user judge
> if
> > > consumer queue has message
> > >
> > >
> > >
> > > 3. Our solution implements the distributed flow control ability at
> client
> > > side, so we don't use the listener way.
> > > 2. Per customer per consumer in different tenants and namespace, and
> the
> > > `flow-control` need(Some of our customer's machines can't work on high
> > > traffic), So `Multi-topic` can't use.
> > > 1. We want to use this api to judge if there's messages to receive,
> like
> > > that pseudo code
> > > if (consumer.hasMessage()) {
> > >  .submit(() -&gt; {
> > >  consumer.pollMessagesAccordingToTheDistributedFlowControl()
> > >  })
> > > }
> > >
> > > Matteo Merli <matteo.merli@gmail.com&gt; 于2021年10月26日周二 下午12:15写道:
> > >
> > > &gt; I'm a bit hesitant about this because I think there are already at
> > > &gt; least 3 different ways to handle similar scenarios.
> > > &gt;
> > > &gt;&nbsp; 1. Using listener and avoid calling receive directly
> > > &gt;&nbsp; 2. Use multi-topic consumer, so there's a single `Consumer`
> > > instance
> > > &gt; exposed
> > > &gt;&nbsp; 3. Use `consumer.receive(0, TimeUnit.SECONDS)` to probe for
> > > message
> > > &gt;
> > > &gt;
> > > &gt; --
> > > &gt; Matteo Merli
> > > &gt; <matteo.merli@gmail.com&gt;
> > > &gt;
> > > &gt; On Mon, Oct 25, 2021 at 7:34 PM ZhangJian He <shoothzj@gmail.com
> > &gt;
> > > wrote:
> > > &gt; &gt;
> > > &gt; &gt; I think it's better to add the method to Consumer interface
> > > instead of
> > > &gt; let
> > > &gt; &gt; user casting it to `ConsumerBase`.
> > > &gt; &gt; `peek` is most complexly,&nbsp; for the reason, I can use the
> > > `peek` object to
> > > &gt; &gt; ack、negative ack, but when to remove from the
> `BlockingQueue`?
> > > &gt; &gt; IMHO, people use this api are just to judge if has the
> message,
> > > &gt; otherwise,
> > > &gt; &gt; they can just use `receive(0,TimeUnit)
> > > &gt; &gt;
> > > &gt; &gt; JiangHaiting <jianghaiting@foxmail.com&gt; 于2021年10月26日周二
> > > 上午10:19写道:
> > > &gt; &gt;
> > > &gt; &gt; &gt; Can this method
> > > &gt; &gt; &gt;
> > > "org.apache.pulsar.client.impl.ConsumerBase#getTotalIncomingMessages"
> > > &gt; do
> > > &gt; &gt; &gt; the trick? Though you have to change the type to
> > > ConsumerBase.
> > > &gt; &gt; &gt;
> > > &gt; &gt; &gt;
> > > &gt; &gt; &gt; And maybe `peek` is more suitable and useful to add to
> the
> > > Consumer
> > > &gt; &gt; &gt; interface?
> > > &gt; &gt; &gt;
> > > &gt; &gt; &gt;
> > > &gt; &gt; &gt;
> > > &gt; &gt; &gt;
> > > &gt; &gt; &gt;
> > > &gt; &gt; &gt;
> > > &gt; &gt; &gt;
> > > ------------------&amp;nbsp;Original&amp;nbsp;------------------
> > > &gt; &gt; &gt; From:
> > > &gt; &gt;
> > >
> >
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> > > "dev"
> > > &gt; &gt;
> > >
> >
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> > > <
> > > &gt; &gt; &gt; shoothzj@gmail.com&amp;gt;;
> > > &gt; &gt; &gt; Date:&amp;nbsp;Mon, Oct 25, 2021 07:24 PM
> > > &gt; &gt; &gt; To:&amp;nbsp;"dev"<dev@pulsar.apache.org&amp;gt;;
> > > &gt; &gt; &gt;
> > > &gt; &gt; &gt; Subject:&amp;nbsp;[DISCUSSION] PIP-108: Add method to
> help
> > > user judge if
> > > &gt; &gt; &gt; consumer queue has message
> > > &gt; &gt; &gt;
> > > &gt; &gt; &gt;
> > > &gt; &gt; &gt;
> > > &gt; &gt; &gt; https://github.com/apache/pulsar/issues/12479
> > > &gt; &gt; &gt;
> > > &gt; &gt; &gt; --- Pasted here for quoting convenience ---
> > > &gt; &gt; &gt;
> > > &gt; &gt; &gt; ## Motivation
> > > &gt; &gt; &gt; Currently, I have an application that manages ten
> thousand
> > > of
> > > &gt; consumers,
> > > &gt; &gt; &gt; and a logic to schedule consumers's receive. It would be
> > > helpful to
> > > &gt; know if
> > > &gt; &gt; &gt; one of the consumers have message to recive.
> > > &gt; &gt; &gt;
> > > &gt; &gt; &gt; ## Goal
> > > &gt; &gt; &gt; To make `Consumer` can judge if there are unreceiving
> > > messages
> > > &gt; &gt; &gt;
> > > &gt; &gt; &gt; ## API Changes
> > > &gt; &gt; &gt;
> > > &gt; &gt; &gt; Add `hasMessageInReceiverQueue` on the `Consumer`
> > interface.
> > > &gt; &gt; &gt;
> > > &gt; &gt; &gt; ## Implementation
> > > &gt; &gt; &gt;
> > > &gt; &gt; &gt; For `ZeroQueueConsumerImpl` return false, Others, judge
> > the
> > > &gt; &gt; &gt; `receiveQueueSize` greater than zero.
> > > &gt; &gt; &gt;
> > > &gt; &gt; &gt;
> > > &gt; &gt; &gt; ## Reject Alternatives
> > > &gt; &gt; &gt;
> > > &gt; &gt; &gt; No alternatives yet.
> > > &gt; &gt; &gt;
> > > &gt; &gt; &gt;
> > > &gt; &gt; &gt;
> > > &gt; &gt; &gt;
> > > &gt; &gt; &gt; ---
> > > &gt; &gt; &gt; Thanks,
> > > &gt; &gt; &gt; Haiting Jiang (Github: Jason918)
> > > &gt;
> > >
> > >
> > >
> > >
> > > ---
> > > Thanks,
> > > Haiting Jiang (Github: Jason918)
> >
>

Re: [DISCUSSION] PIP-108: Add method to help user judge if consumer queue has message

Posted by PengHui Li <pe...@apache.org>.
@ZhangJian He, as Matteo mentioned, Use `consumer.receive(0,
TimeUnit.SECONDS)` can achieve the same purpose for checking if there are
messages in the local cache.

Thanks
Penghui

On Tue, Oct 26, 2021 at 2:35 PM ZhangJian He <sh...@gmail.com> wrote:

> If some users need the message content to do user-defined actions, we need
> to ensure the user can't use the `peekMessage` to do things like ack
> because the message are still in the blockingQueue, return just a
> content-copy?
>
> introduced `localBuffer` might be good ? `hasMessagesInLocalBuffer`
>
> JiangHaiting <ji...@foxmail.com> 于2021年10月26日周二 下午2:20写道:
>
> > I'm totally +1 for the feature to check if we can get
> > message&nbsp;immediately from consumer, this is to say we have message
> > locally.
> >
> >
> > In my understanding, it's useful to implement some user-defined order to
> > consume messages among different topics, in your case, the "distributed
> > flow control ability".
> > But in the past few years, I've met some users have defined the consume
> > order of different topics by part of the message content, like
> > some&nbsp;critical property value.&nbsp;
> > In these situations, a `peek` method is more suitable.
> >
> >
> > Further more, peek is not effectively equals to `consumer.receive(0,
> > TimeUnit.SECONDS)`. As you will have to store the message somewhere else
> if
> > you find that it's not the most priority message to process.
> >
> >
> > One last thing, put the concept of "receiverQueue" in the api of
> consumer,
> > seems a little bit strange, IMHO.
> >
> >
> >
> >
> > ------------------&nbsp;Original&nbsp;------------------
> > From:
> >                                                   "dev"
> >                                                                 <
> > shoothzj@gmail.com&gt;;
> > Date:&nbsp;Tue, Oct 26, 2021 12:54 PM
> > To:&nbsp;"dev"<dev@pulsar.apache.org&gt;;
> >
> > Subject:&nbsp;Re: [DISCUSSION] PIP-108: Add method to help user judge if
> > consumer queue has message
> >
> >
> >
> > 3. Our solution implements the distributed flow control ability at client
> > side, so we don't use the listener way.
> > 2. Per customer per consumer in different tenants and namespace, and the
> > `flow-control` need(Some of our customer's machines can't work on high
> > traffic), So `Multi-topic` can't use.
> > 1. We want to use this api to judge if there's messages to receive, like
> > that pseudo code
> > if (consumer.hasMessage()) {
> >  .submit(() -&gt; {
> >  consumer.pollMessagesAccordingToTheDistributedFlowControl()
> >  })
> > }
> >
> > Matteo Merli <matteo.merli@gmail.com&gt; 于2021年10月26日周二 下午12:15写道:
> >
> > &gt; I'm a bit hesitant about this because I think there are already at
> > &gt; least 3 different ways to handle similar scenarios.
> > &gt;
> > &gt;&nbsp; 1. Using listener and avoid calling receive directly
> > &gt;&nbsp; 2. Use multi-topic consumer, so there's a single `Consumer`
> > instance
> > &gt; exposed
> > &gt;&nbsp; 3. Use `consumer.receive(0, TimeUnit.SECONDS)` to probe for
> > message
> > &gt;
> > &gt;
> > &gt; --
> > &gt; Matteo Merli
> > &gt; <matteo.merli@gmail.com&gt;
> > &gt;
> > &gt; On Mon, Oct 25, 2021 at 7:34 PM ZhangJian He <shoothzj@gmail.com
> &gt;
> > wrote:
> > &gt; &gt;
> > &gt; &gt; I think it's better to add the method to Consumer interface
> > instead of
> > &gt; let
> > &gt; &gt; user casting it to `ConsumerBase`.
> > &gt; &gt; `peek` is most complexly,&nbsp; for the reason, I can use the
> > `peek` object to
> > &gt; &gt; ack、negative ack, but when to remove from the `BlockingQueue`?
> > &gt; &gt; IMHO, people use this api are just to judge if has the message,
> > &gt; otherwise,
> > &gt; &gt; they can just use `receive(0,TimeUnit)
> > &gt; &gt;
> > &gt; &gt; JiangHaiting <jianghaiting@foxmail.com&gt; 于2021年10月26日周二
> > 上午10:19写道:
> > &gt; &gt;
> > &gt; &gt; &gt; Can this method
> > &gt; &gt; &gt;
> > "org.apache.pulsar.client.impl.ConsumerBase#getTotalIncomingMessages"
> > &gt; do
> > &gt; &gt; &gt; the trick? Though you have to change the type to
> > ConsumerBase.
> > &gt; &gt; &gt;
> > &gt; &gt; &gt;
> > &gt; &gt; &gt; And maybe `peek` is more suitable and useful to add to the
> > Consumer
> > &gt; &gt; &gt; interface?
> > &gt; &gt; &gt;
> > &gt; &gt; &gt;
> > &gt; &gt; &gt;
> > &gt; &gt; &gt;
> > &gt; &gt; &gt;
> > &gt; &gt; &gt;
> > &gt; &gt; &gt;
> > ------------------&amp;nbsp;Original&amp;nbsp;------------------
> > &gt; &gt; &gt; From:
> > &gt; &gt;
> >
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> > "dev"
> > &gt; &gt;
> >
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> > <
> > &gt; &gt; &gt; shoothzj@gmail.com&amp;gt;;
> > &gt; &gt; &gt; Date:&amp;nbsp;Mon, Oct 25, 2021 07:24 PM
> > &gt; &gt; &gt; To:&amp;nbsp;"dev"<dev@pulsar.apache.org&amp;gt;;
> > &gt; &gt; &gt;
> > &gt; &gt; &gt; Subject:&amp;nbsp;[DISCUSSION] PIP-108: Add method to help
> > user judge if
> > &gt; &gt; &gt; consumer queue has message
> > &gt; &gt; &gt;
> > &gt; &gt; &gt;
> > &gt; &gt; &gt;
> > &gt; &gt; &gt; https://github.com/apache/pulsar/issues/12479
> > &gt; &gt; &gt;
> > &gt; &gt; &gt; --- Pasted here for quoting convenience ---
> > &gt; &gt; &gt;
> > &gt; &gt; &gt; ## Motivation
> > &gt; &gt; &gt; Currently, I have an application that manages ten thousand
> > of
> > &gt; consumers,
> > &gt; &gt; &gt; and a logic to schedule consumers's receive. It would be
> > helpful to
> > &gt; know if
> > &gt; &gt; &gt; one of the consumers have message to recive.
> > &gt; &gt; &gt;
> > &gt; &gt; &gt; ## Goal
> > &gt; &gt; &gt; To make `Consumer` can judge if there are unreceiving
> > messages
> > &gt; &gt; &gt;
> > &gt; &gt; &gt; ## API Changes
> > &gt; &gt; &gt;
> > &gt; &gt; &gt; Add `hasMessageInReceiverQueue` on the `Consumer`
> interface.
> > &gt; &gt; &gt;
> > &gt; &gt; &gt; ## Implementation
> > &gt; &gt; &gt;
> > &gt; &gt; &gt; For `ZeroQueueConsumerImpl` return false, Others, judge
> the
> > &gt; &gt; &gt; `receiveQueueSize` greater than zero.
> > &gt; &gt; &gt;
> > &gt; &gt; &gt;
> > &gt; &gt; &gt; ## Reject Alternatives
> > &gt; &gt; &gt;
> > &gt; &gt; &gt; No alternatives yet.
> > &gt; &gt; &gt;
> > &gt; &gt; &gt;
> > &gt; &gt; &gt;
> > &gt; &gt; &gt;
> > &gt; &gt; &gt; ---
> > &gt; &gt; &gt; Thanks,
> > &gt; &gt; &gt; Haiting Jiang (Github: Jason918)
> > &gt;
> >
> >
> >
> >
> > ---
> > Thanks,
> > Haiting Jiang (Github: Jason918)
>

Re: [DISCUSSION] PIP-108: Add method to help user judge if consumer queue has message

Posted by ZhangJian He <sh...@gmail.com>.
If some users need the message content to do user-defined actions, we need
to ensure the user can't use the `peekMessage` to do things like ack
because the message are still in the blockingQueue, return just a
content-copy?

introduced `localBuffer` might be good ? `hasMessagesInLocalBuffer`

JiangHaiting <ji...@foxmail.com> 于2021年10月26日周二 下午2:20写道:

> I'm totally +1 for the feature to check if we can get
> message&nbsp;immediately from consumer, this is to say we have message
> locally.
>
>
> In my understanding, it's useful to implement some user-defined order to
> consume messages among different topics, in your case, the "distributed
> flow control ability".
> But in the past few years, I've met some users have defined the consume
> order of different topics by part of the message content, like
> some&nbsp;critical property value.&nbsp;
> In these situations, a `peek` method is more suitable.
>
>
> Further more, peek is not effectively equals to `consumer.receive(0,
> TimeUnit.SECONDS)`. As you will have to store the message somewhere else if
> you find that it's not the most priority message to process.
>
>
> One last thing, put the concept of "receiverQueue" in the api of consumer,
> seems a little bit strange, IMHO.
>
>
>
>
> ------------------&nbsp;Original&nbsp;------------------
> From:
>                                                   "dev"
>                                                                 <
> shoothzj@gmail.com&gt;;
> Date:&nbsp;Tue, Oct 26, 2021 12:54 PM
> To:&nbsp;"dev"<dev@pulsar.apache.org&gt;;
>
> Subject:&nbsp;Re: [DISCUSSION] PIP-108: Add method to help user judge if
> consumer queue has message
>
>
>
> 3. Our solution implements the distributed flow control ability at client
> side, so we don't use the listener way.
> 2. Per customer per consumer in different tenants and namespace, and the
> `flow-control` need(Some of our customer's machines can't work on high
> traffic), So `Multi-topic` can't use.
> 1. We want to use this api to judge if there's messages to receive, like
> that pseudo code
> if (consumer.hasMessage()) {
>  .submit(() -&gt; {
>  consumer.pollMessagesAccordingToTheDistributedFlowControl()
>  })
> }
>
> Matteo Merli <matteo.merli@gmail.com&gt; 于2021年10月26日周二 下午12:15写道:
>
> &gt; I'm a bit hesitant about this because I think there are already at
> &gt; least 3 different ways to handle similar scenarios.
> &gt;
> &gt;&nbsp; 1. Using listener and avoid calling receive directly
> &gt;&nbsp; 2. Use multi-topic consumer, so there's a single `Consumer`
> instance
> &gt; exposed
> &gt;&nbsp; 3. Use `consumer.receive(0, TimeUnit.SECONDS)` to probe for
> message
> &gt;
> &gt;
> &gt; --
> &gt; Matteo Merli
> &gt; <matteo.merli@gmail.com&gt;
> &gt;
> &gt; On Mon, Oct 25, 2021 at 7:34 PM ZhangJian He <shoothzj@gmail.com&gt;
> wrote:
> &gt; &gt;
> &gt; &gt; I think it's better to add the method to Consumer interface
> instead of
> &gt; let
> &gt; &gt; user casting it to `ConsumerBase`.
> &gt; &gt; `peek` is most complexly,&nbsp; for the reason, I can use the
> `peek` object to
> &gt; &gt; ack、negative ack, but when to remove from the `BlockingQueue`?
> &gt; &gt; IMHO, people use this api are just to judge if has the message,
> &gt; otherwise,
> &gt; &gt; they can just use `receive(0,TimeUnit)
> &gt; &gt;
> &gt; &gt; JiangHaiting <jianghaiting@foxmail.com&gt; 于2021年10月26日周二
> 上午10:19写道:
> &gt; &gt;
> &gt; &gt; &gt; Can this method
> &gt; &gt; &gt;
> "org.apache.pulsar.client.impl.ConsumerBase#getTotalIncomingMessages"
> &gt; do
> &gt; &gt; &gt; the trick? Though you have to change the type to
> ConsumerBase.
> &gt; &gt; &gt;
> &gt; &gt; &gt;
> &gt; &gt; &gt; And maybe `peek` is more suitable and useful to add to the
> Consumer
> &gt; &gt; &gt; interface?
> &gt; &gt; &gt;
> &gt; &gt; &gt;
> &gt; &gt; &gt;
> &gt; &gt; &gt;
> &gt; &gt; &gt;
> &gt; &gt; &gt;
> &gt; &gt; &gt;
> ------------------&amp;nbsp;Original&amp;nbsp;------------------
> &gt; &gt; &gt; From:
> &gt; &gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> "dev"
> &gt; &gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> <
> &gt; &gt; &gt; shoothzj@gmail.com&amp;gt;;
> &gt; &gt; &gt; Date:&amp;nbsp;Mon, Oct 25, 2021 07:24 PM
> &gt; &gt; &gt; To:&amp;nbsp;"dev"<dev@pulsar.apache.org&amp;gt;;
> &gt; &gt; &gt;
> &gt; &gt; &gt; Subject:&amp;nbsp;[DISCUSSION] PIP-108: Add method to help
> user judge if
> &gt; &gt; &gt; consumer queue has message
> &gt; &gt; &gt;
> &gt; &gt; &gt;
> &gt; &gt; &gt;
> &gt; &gt; &gt; https://github.com/apache/pulsar/issues/12479
> &gt; &gt; &gt;
> &gt; &gt; &gt; --- Pasted here for quoting convenience ---
> &gt; &gt; &gt;
> &gt; &gt; &gt; ## Motivation
> &gt; &gt; &gt; Currently, I have an application that manages ten thousand
> of
> &gt; consumers,
> &gt; &gt; &gt; and a logic to schedule consumers's receive. It would be
> helpful to
> &gt; know if
> &gt; &gt; &gt; one of the consumers have message to recive.
> &gt; &gt; &gt;
> &gt; &gt; &gt; ## Goal
> &gt; &gt; &gt; To make `Consumer` can judge if there are unreceiving
> messages
> &gt; &gt; &gt;
> &gt; &gt; &gt; ## API Changes
> &gt; &gt; &gt;
> &gt; &gt; &gt; Add `hasMessageInReceiverQueue` on the `Consumer` interface.
> &gt; &gt; &gt;
> &gt; &gt; &gt; ## Implementation
> &gt; &gt; &gt;
> &gt; &gt; &gt; For `ZeroQueueConsumerImpl` return false, Others, judge the
> &gt; &gt; &gt; `receiveQueueSize` greater than zero.
> &gt; &gt; &gt;
> &gt; &gt; &gt;
> &gt; &gt; &gt; ## Reject Alternatives
> &gt; &gt; &gt;
> &gt; &gt; &gt; No alternatives yet.
> &gt; &gt; &gt;
> &gt; &gt; &gt;
> &gt; &gt; &gt;
> &gt; &gt; &gt;
> &gt; &gt; &gt; ---
> &gt; &gt; &gt; Thanks,
> &gt; &gt; &gt; Haiting Jiang (Github: Jason918)
> &gt;
>
>
>
>
> ---
> Thanks,
> Haiting Jiang (Github: Jason918)

Re: [DISCUSSION] PIP-108: Add method to help user judge if consumer queue has message

Posted by JiangHaiting <ji...@foxmail.com>.
I'm totally +1 for the feature to check if we can get message&nbsp;immediately from consumer, this is to say we have message locally.


In my understanding, it's useful to implement some user-defined order to consume messages among different topics, in your case, the "distributed flow control ability".
But in the past few years, I've met some users have defined the consume order of different topics by part of the message content, like some&nbsp;critical property value.&nbsp;
In these situations, a `peek` method is more suitable.


Further more, peek is not effectively equals to `consumer.receive(0, TimeUnit.SECONDS)`. As you will have to store the message somewhere else if you find that it's not the most priority message to process.


One last thing, put the concept of "receiverQueue" in the api of consumer, seems a little bit strange, IMHO.




------------------&nbsp;Original&nbsp;------------------
From:                                                                                                                        "dev"                                                                                    <shoothzj@gmail.com&gt;;
Date:&nbsp;Tue, Oct 26, 2021 12:54 PM
To:&nbsp;"dev"<dev@pulsar.apache.org&gt;;

Subject:&nbsp;Re: [DISCUSSION] PIP-108: Add method to help user judge if consumer queue has message



3. Our solution implements the distributed flow control ability at client
side, so we don't use the listener way.
2. Per customer per consumer in different tenants and namespace, and the
`flow-control` need(Some of our customer's machines can't work on high
traffic), So `Multi-topic` can't use.
1. We want to use this api to judge if there's messages to receive, like
that pseudo code
if (consumer.hasMessage()) {
 .submit(() -&gt; {
 consumer.pollMessagesAccordingToTheDistributedFlowControl()
 })
}

Matteo Merli <matteo.merli@gmail.com&gt; 于2021年10月26日周二 下午12:15写道:

&gt; I'm a bit hesitant about this because I think there are already at
&gt; least 3 different ways to handle similar scenarios.
&gt;
&gt;&nbsp; 1. Using listener and avoid calling receive directly
&gt;&nbsp; 2. Use multi-topic consumer, so there's a single `Consumer` instance
&gt; exposed
&gt;&nbsp; 3. Use `consumer.receive(0, TimeUnit.SECONDS)` to probe for message
&gt;
&gt;
&gt; --
&gt; Matteo Merli
&gt; <matteo.merli@gmail.com&gt;
&gt;
&gt; On Mon, Oct 25, 2021 at 7:34 PM ZhangJian He <shoothzj@gmail.com&gt; wrote:
&gt; &gt;
&gt; &gt; I think it's better to add the method to Consumer interface instead of
&gt; let
&gt; &gt; user casting it to `ConsumerBase`.
&gt; &gt; `peek` is most complexly,&nbsp; for the reason, I can use the `peek` object to
&gt; &gt; ack、negative ack, but when to remove from the `BlockingQueue`?
&gt; &gt; IMHO, people use this api are just to judge if has the message,
&gt; otherwise,
&gt; &gt; they can just use `receive(0,TimeUnit)
&gt; &gt;
&gt; &gt; JiangHaiting <jianghaiting@foxmail.com&gt; 于2021年10月26日周二 上午10:19写道:
&gt; &gt;
&gt; &gt; &gt; Can this method
&gt; &gt; &gt; "org.apache.pulsar.client.impl.ConsumerBase#getTotalIncomingMessages"
&gt; do
&gt; &gt; &gt; the trick? Though you have to change the type to ConsumerBase.
&gt; &gt; &gt;
&gt; &gt; &gt;
&gt; &gt; &gt; And maybe `peek` is more suitable and useful to add to the Consumer
&gt; &gt; &gt; interface?
&gt; &gt; &gt;
&gt; &gt; &gt;
&gt; &gt; &gt;
&gt; &gt; &gt;
&gt; &gt; &gt;
&gt; &gt; &gt;
&gt; &gt; &gt; ------------------&amp;nbsp;Original&amp;nbsp;------------------
&gt; &gt; &gt; From:
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "dev"
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <
&gt; &gt; &gt; shoothzj@gmail.com&amp;gt;;
&gt; &gt; &gt; Date:&amp;nbsp;Mon, Oct 25, 2021 07:24 PM
&gt; &gt; &gt; To:&amp;nbsp;"dev"<dev@pulsar.apache.org&amp;gt;;
&gt; &gt; &gt;
&gt; &gt; &gt; Subject:&amp;nbsp;[DISCUSSION] PIP-108: Add method to help user judge if
&gt; &gt; &gt; consumer queue has message
&gt; &gt; &gt;
&gt; &gt; &gt;
&gt; &gt; &gt;
&gt; &gt; &gt; https://github.com/apache/pulsar/issues/12479
&gt; &gt; &gt;
&gt; &gt; &gt; --- Pasted here for quoting convenience ---
&gt; &gt; &gt;
&gt; &gt; &gt; ## Motivation
&gt; &gt; &gt; Currently, I have an application that manages ten thousand of
&gt; consumers,
&gt; &gt; &gt; and a logic to schedule consumers's receive. It would be helpful to
&gt; know if
&gt; &gt; &gt; one of the consumers have message to recive.
&gt; &gt; &gt;
&gt; &gt; &gt; ## Goal
&gt; &gt; &gt; To make `Consumer` can judge if there are unreceiving messages
&gt; &gt; &gt;
&gt; &gt; &gt; ## API Changes
&gt; &gt; &gt;
&gt; &gt; &gt; Add `hasMessageInReceiverQueue` on the `Consumer` interface.
&gt; &gt; &gt;
&gt; &gt; &gt; ## Implementation
&gt; &gt; &gt;
&gt; &gt; &gt; For `ZeroQueueConsumerImpl` return false, Others, judge the
&gt; &gt; &gt; `receiveQueueSize` greater than zero.
&gt; &gt; &gt;
&gt; &gt; &gt;
&gt; &gt; &gt; ## Reject Alternatives
&gt; &gt; &gt;
&gt; &gt; &gt; No alternatives yet.
&gt; &gt; &gt;
&gt; &gt; &gt;
&gt; &gt; &gt;
&gt; &gt; &gt;
&gt; &gt; &gt; ---
&gt; &gt; &gt; Thanks,
&gt; &gt; &gt; Haiting Jiang (Github: Jason918)
&gt;




---
Thanks,
Haiting Jiang (Github: Jason918)

Re: [DISCUSSION] PIP-108: Add method to help user judge if consumer queue has message

Posted by ZhangJian He <sh...@gmail.com>.
3. Our solution implements the distributed flow control ability at client
side, so we don't use the listener way.
2. Per customer per consumer in different tenants and namespace, and the
`flow-control` need(Some of our customer's machines can't work on high
traffic), So `Multi-topic` can't use.
1. We want to use this api to judge if there's messages to receive, like
that pseudo code
if (consumer.hasMessage()) {
   .submit(() -> {
      consumer.pollMessagesAccordingToTheDistributedFlowControl()
   })
}

Matteo Merli <ma...@gmail.com> 于2021年10月26日周二 下午12:15写道:

> I'm a bit hesitant about this because I think there are already at
> least 3 different ways to handle similar scenarios.
>
>  1. Using listener and avoid calling receive directly
>  2. Use multi-topic consumer, so there's a single `Consumer` instance
> exposed
>  3. Use `consumer.receive(0, TimeUnit.SECONDS)` to probe for message
>
>
> --
> Matteo Merli
> <ma...@gmail.com>
>
> On Mon, Oct 25, 2021 at 7:34 PM ZhangJian He <sh...@gmail.com> wrote:
> >
> > I think it's better to add the method to Consumer interface instead of
> let
> > user casting it to `ConsumerBase`.
> > `peek` is most complexly,  for the reason, I can use the `peek` object to
> > ack、negative ack, but when to remove from the `BlockingQueue`?
> > IMHO, people use this api are just to judge if has the message,
> otherwise,
> > they can just use `receive(0,TimeUnit)
> >
> > JiangHaiting <ji...@foxmail.com> 于2021年10月26日周二 上午10:19写道:
> >
> > > Can this method
> > > "org.apache.pulsar.client.impl.ConsumerBase#getTotalIncomingMessages"
> do
> > > the trick? Though you have to change the type to ConsumerBase.
> > >
> > >
> > > And maybe `peek` is more suitable and useful to add to the Consumer
> > > interface?
> > >
> > >
> > >
> > >
> > >
> > >
> > > ------------------&nbsp;Original&nbsp;------------------
> > > From:
> > >                                                   "dev"
> > >                                                                 <
> > > shoothzj@gmail.com&gt;;
> > > Date:&nbsp;Mon, Oct 25, 2021 07:24 PM
> > > To:&nbsp;"dev"<dev@pulsar.apache.org&gt;;
> > >
> > > Subject:&nbsp;[DISCUSSION] PIP-108: Add method to help user judge if
> > > consumer queue has message
> > >
> > >
> > >
> > > https://github.com/apache/pulsar/issues/12479
> > >
> > > --- Pasted here for quoting convenience ---
> > >
> > > ## Motivation
> > > Currently, I have an application that manages ten thousand of
> consumers,
> > > and a logic to schedule consumers's receive. It would be helpful to
> know if
> > > one of the consumers have message to recive.
> > >
> > > ## Goal
> > > To make `Consumer` can judge if there are unreceiving messages
> > >
> > > ## API Changes
> > >
> > > Add `hasMessageInReceiverQueue` on the `Consumer` interface.
> > >
> > > ## Implementation
> > >
> > > For `ZeroQueueConsumerImpl` return false, Others, judge the
> > > `receiveQueueSize` greater than zero.
> > >
> > >
> > > ## Reject Alternatives
> > >
> > > No alternatives yet.
> > >
> > >
> > >
> > >
> > > ---
> > > Thanks,
> > > Haiting Jiang (Github: Jason918)
>

Re: [DISCUSSION] PIP-108: Add method to help user judge if consumer queue has message

Posted by Matteo Merli <ma...@gmail.com>.
I'm a bit hesitant about this because I think there are already at
least 3 different ways to handle similar scenarios.

 1. Using listener and avoid calling receive directly
 2. Use multi-topic consumer, so there's a single `Consumer` instance exposed
 3. Use `consumer.receive(0, TimeUnit.SECONDS)` to probe for message


--
Matteo Merli
<ma...@gmail.com>

On Mon, Oct 25, 2021 at 7:34 PM ZhangJian He <sh...@gmail.com> wrote:
>
> I think it's better to add the method to Consumer interface instead of let
> user casting it to `ConsumerBase`.
> `peek` is most complexly,  for the reason, I can use the `peek` object to
> ack、negative ack, but when to remove from the `BlockingQueue`?
> IMHO, people use this api are just to judge if has the message, otherwise,
> they can just use `receive(0,TimeUnit)
>
> JiangHaiting <ji...@foxmail.com> 于2021年10月26日周二 上午10:19写道:
>
> > Can this method
> > "org.apache.pulsar.client.impl.ConsumerBase#getTotalIncomingMessages" do
> > the trick? Though you have to change the type to ConsumerBase.
> >
> >
> > And maybe `peek` is more suitable and useful to add to the Consumer
> > interface?
> >
> >
> >
> >
> >
> >
> > ------------------&nbsp;Original&nbsp;------------------
> > From:
> >                                                   "dev"
> >                                                                 <
> > shoothzj@gmail.com&gt;;
> > Date:&nbsp;Mon, Oct 25, 2021 07:24 PM
> > To:&nbsp;"dev"<dev@pulsar.apache.org&gt;;
> >
> > Subject:&nbsp;[DISCUSSION] PIP-108: Add method to help user judge if
> > consumer queue has message
> >
> >
> >
> > https://github.com/apache/pulsar/issues/12479
> >
> > --- Pasted here for quoting convenience ---
> >
> > ## Motivation
> > Currently, I have an application that manages ten thousand of consumers,
> > and a logic to schedule consumers's receive. It would be helpful to know if
> > one of the consumers have message to recive.
> >
> > ## Goal
> > To make `Consumer` can judge if there are unreceiving messages
> >
> > ## API Changes
> >
> > Add `hasMessageInReceiverQueue` on the `Consumer` interface.
> >
> > ## Implementation
> >
> > For `ZeroQueueConsumerImpl` return false, Others, judge the
> > `receiveQueueSize` greater than zero.
> >
> >
> > ## Reject Alternatives
> >
> > No alternatives yet.
> >
> >
> >
> >
> > ---
> > Thanks,
> > Haiting Jiang (Github: Jason918)

Re: [DISCUSSION] PIP-108: Add method to help user judge if consumer queue has message

Posted by ZhangJian He <sh...@gmail.com>.
I think it's better to add the method to Consumer interface instead of let
user casting it to `ConsumerBase`.
`peek` is most complexly,  for the reason, I can use the `peek` object to
ack、negative ack, but when to remove from the `BlockingQueue`?
IMHO, people use this api are just to judge if has the message, otherwise,
they can just use `receive(0,TimeUnit)

JiangHaiting <ji...@foxmail.com> 于2021年10月26日周二 上午10:19写道:

> Can this method
> "org.apache.pulsar.client.impl.ConsumerBase#getTotalIncomingMessages" do
> the trick? Though you have to change the type to ConsumerBase.
>
>
> And maybe `peek` is more suitable and useful to add to the Consumer
> interface?
>
>
>
>
>
>
> ------------------&nbsp;Original&nbsp;------------------
> From:
>                                                   "dev"
>                                                                 <
> shoothzj@gmail.com&gt;;
> Date:&nbsp;Mon, Oct 25, 2021 07:24 PM
> To:&nbsp;"dev"<dev@pulsar.apache.org&gt;;
>
> Subject:&nbsp;[DISCUSSION] PIP-108: Add method to help user judge if
> consumer queue has message
>
>
>
> https://github.com/apache/pulsar/issues/12479
>
> --- Pasted here for quoting convenience ---
>
> ## Motivation
> Currently, I have an application that manages ten thousand of consumers,
> and a logic to schedule consumers's receive. It would be helpful to know if
> one of the consumers have message to recive.
>
> ## Goal
> To make `Consumer` can judge if there are unreceiving messages
>
> ## API Changes
>
> Add `hasMessageInReceiverQueue` on the `Consumer` interface.
>
> ## Implementation
>
> For `ZeroQueueConsumerImpl` return false, Others, judge the
> `receiveQueueSize` greater than zero.
>
>
> ## Reject Alternatives
>
> No alternatives yet.
>
>
>
>
> ---
> Thanks,
> Haiting Jiang (Github: Jason918)

Re:[DISCUSSION] PIP-108: Add method to help user judge if consumer queue has message

Posted by JiangHaiting <ji...@foxmail.com>.
Can this method "org.apache.pulsar.client.impl.ConsumerBase#getTotalIncomingMessages" do the trick? Though you have to change the type to ConsumerBase.


And maybe `peek` is more suitable and useful to add to the Consumer interface?






------------------&nbsp;Original&nbsp;------------------
From:                                                                                                                        "dev"                                                                                    <shoothzj@gmail.com&gt;;
Date:&nbsp;Mon, Oct 25, 2021 07:24 PM
To:&nbsp;"dev"<dev@pulsar.apache.org&gt;;

Subject:&nbsp;[DISCUSSION] PIP-108: Add method to help user judge if consumer queue has message



https://github.com/apache/pulsar/issues/12479

--- Pasted here for quoting convenience ---

## Motivation
Currently, I have an application that manages ten thousand of consumers,
and a logic to schedule consumers's receive. It would be helpful to know if
one of the consumers have message to recive.

## Goal
To make `Consumer` can judge if there are unreceiving messages

## API Changes

Add `hasMessageInReceiverQueue` on the `Consumer` interface.

## Implementation

For `ZeroQueueConsumerImpl` return false, Others, judge the
`receiveQueueSize` greater than zero.


## Reject Alternatives

No alternatives yet.




---
Thanks,
Haiting Jiang (Github: Jason918)