You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pulsar.apache.org by ZhangJian He <sh...@gmail.com> on 2021/11/08 14:01:19 UTC

Re: [DISCUSSION] PIP-108: Add method to help user judge if consumer queue has message

Hello, matteo, About my PIP 108, the reason I want to have method
`hasMessageInReceiverQueue` is that
We need to control consumer at different rate. For example, consumer A
10msg/s, consumer B 100 msg/s
so I can't use the `listener` mode.
the `receive(0)` method will remove the message from queue. currently my
work flow is 1) check if has message
2) apply for flow quota 3) receive messages. I can not put the mssage back
to the queue, can't use the `receive(0)`.
And `apply for flow quota` is a costly action.
And discussed with PengHui Li and Hang Chen, we think `hasLocalMessages` is
better than hasMessageInReceiverQueue

ZhangJian He <sh...@gmail.com> 于2021年10月27日周三 下午7:47写道:

> Some users want to use this api to judge if there's messages to receive,
> like that pseudo code:
> if (consumer.hasMessage()) {
>    .submit(() -> {
>       consumer.pollMessagesAccordingToTheDistributedFlowControl()
>    })
> }
>
> don't want to remove the message from queue.
>
> PengHui Li <pe...@apache.org> 于2021年10月27日周三 下午7:43写道:
>
>> @ZhangJian He, as Matteo mentioned, Use `consumer.receive(0,
>> TimeUnit.SECONDS)` can achieve the same purpose for checking if there are
>> messages in the local cache.
>>
>> Thanks
>> Penghui
>>
>> On Tue, Oct 26, 2021 at 2:35 PM ZhangJian He <sh...@gmail.com> wrote:
>>
>> > If some users need the message content to do user-defined actions, we
>> need
>> > to ensure the user can't use the `peekMessage` to do things like ack
>> > because the message are still in the blockingQueue, return just a
>> > content-copy?
>> >
>> > introduced `localBuffer` might be good ? `hasMessagesInLocalBuffer`
>> >
>> > JiangHaiting <ji...@foxmail.com> 于2021年10月26日周二 下午2:20写道:
>> >
>> > > I'm totally +1 for the feature to check if we can get
>> > > message&nbsp;immediately from consumer, this is to say we have message
>> > > locally.
>> > >
>> > >
>> > > In my understanding, it's useful to implement some user-defined order
>> to
>> > > consume messages among different topics, in your case, the
>> "distributed
>> > > flow control ability".
>> > > But in the past few years, I've met some users have defined the
>> consume
>> > > order of different topics by part of the message content, like
>> > > some&nbsp;critical property value.&nbsp;
>> > > In these situations, a `peek` method is more suitable.
>> > >
>> > >
>> > > Further more, peek is not effectively equals to `consumer.receive(0,
>> > > TimeUnit.SECONDS)`. As you will have to store the message somewhere
>> else
>> > if
>> > > you find that it's not the most priority message to process.
>> > >
>> > >
>> > > One last thing, put the concept of "receiverQueue" in the api of
>> > consumer,
>> > > seems a little bit strange, IMHO.
>> > >
>> > >
>> > >
>> > >
>> > > ------------------&nbsp;Original&nbsp;------------------
>> > > From:
>> > >                                                   "dev"
>> > >                                                                 <
>> > > shoothzj@gmail.com&gt;;
>> > > Date:&nbsp;Tue, Oct 26, 2021 12:54 PM
>> > > To:&nbsp;"dev"<dev@pulsar.apache.org&gt;;
>> > >
>> > > Subject:&nbsp;Re: [DISCUSSION] PIP-108: Add method to help user judge
>> if
>> > > consumer queue has message
>> > >
>> > >
>> > >
>> > > 3. Our solution implements the distributed flow control ability at
>> client
>> > > side, so we don't use the listener way.
>> > > 2. Per customer per consumer in different tenants and namespace, and
>> the
>> > > `flow-control` need(Some of our customer's machines can't work on high
>> > > traffic), So `Multi-topic` can't use.
>> > > 1. We want to use this api to judge if there's messages to receive,
>> like
>> > > that pseudo code
>> > > if (consumer.hasMessage()) {
>> > >  .submit(() -&gt; {
>> > >  consumer.pollMessagesAccordingToTheDistributedFlowControl()
>> > >  })
>> > > }
>> > >
>> > > Matteo Merli <matteo.merli@gmail.com&gt; 于2021年10月26日周二 下午12:15写道:
>> > >
>> > > &gt; I'm a bit hesitant about this because I think there are already
>> at
>> > > &gt; least 3 different ways to handle similar scenarios.
>> > > &gt;
>> > > &gt;&nbsp; 1. Using listener and avoid calling receive directly
>> > > &gt;&nbsp; 2. Use multi-topic consumer, so there's a single `Consumer`
>> > > instance
>> > > &gt; exposed
>> > > &gt;&nbsp; 3. Use `consumer.receive(0, TimeUnit.SECONDS)` to probe for
>> > > message
>> > > &gt;
>> > > &gt;
>> > > &gt; --
>> > > &gt; Matteo Merli
>> > > &gt; <matteo.merli@gmail.com&gt;
>> > > &gt;
>> > > &gt; On Mon, Oct 25, 2021 at 7:34 PM ZhangJian He <shoothzj@gmail.com
>> > &gt;
>> > > wrote:
>> > > &gt; &gt;
>> > > &gt; &gt; I think it's better to add the method to Consumer interface
>> > > instead of
>> > > &gt; let
>> > > &gt; &gt; user casting it to `ConsumerBase`.
>> > > &gt; &gt; `peek` is most complexly,&nbsp; for the reason, I can use
>> the
>> > > `peek` object to
>> > > &gt; &gt; ack、negative ack, but when to remove from the
>> `BlockingQueue`?
>> > > &gt; &gt; IMHO, people use this api are just to judge if has the
>> message,
>> > > &gt; otherwise,
>> > > &gt; &gt; they can just use `receive(0,TimeUnit)
>> > > &gt; &gt;
>> > > &gt; &gt; JiangHaiting <jianghaiting@foxmail.com&gt; 于2021年10月26日周二
>> > > 上午10:19写道:
>> > > &gt; &gt;
>> > > &gt; &gt; &gt; Can this method
>> > > &gt; &gt; &gt;
>> > > "org.apache.pulsar.client.impl.ConsumerBase#getTotalIncomingMessages"
>> > > &gt; do
>> > > &gt; &gt; &gt; the trick? Though you have to change the type to
>> > > ConsumerBase.
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt; And maybe `peek` is more suitable and useful to add to
>> the
>> > > Consumer
>> > > &gt; &gt; &gt; interface?
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt;
>> > > ------------------&amp;nbsp;Original&amp;nbsp;------------------
>> > > &gt; &gt; &gt; From:
>> > > &gt; &gt;
>> > >
>> >
>> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
>> > > "dev"
>> > > &gt; &gt;
>> > >
>> >
>> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
>> > > <
>> > > &gt; &gt; &gt; shoothzj@gmail.com&amp;gt;;
>> > > &gt; &gt; &gt; Date:&amp;nbsp;Mon, Oct 25, 2021 07:24 PM
>> > > &gt; &gt; &gt; To:&amp;nbsp;"dev"<dev@pulsar.apache.org&amp;gt;;
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt; Subject:&amp;nbsp;[DISCUSSION] PIP-108: Add method to
>> help
>> > > user judge if
>> > > &gt; &gt; &gt; consumer queue has message
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt; https://github.com/apache/pulsar/issues/12479
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt; --- Pasted here for quoting convenience ---
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt; ## Motivation
>> > > &gt; &gt; &gt; Currently, I have an application that manages ten
>> thousand
>> > > of
>> > > &gt; consumers,
>> > > &gt; &gt; &gt; and a logic to schedule consumers's receive. It would
>> be
>> > > helpful to
>> > > &gt; know if
>> > > &gt; &gt; &gt; one of the consumers have message to recive.
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt; ## Goal
>> > > &gt; &gt; &gt; To make `Consumer` can judge if there are unreceiving
>> > > messages
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt; ## API Changes
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt; Add `hasMessageInReceiverQueue` on the `Consumer`
>> > interface.
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt; ## Implementation
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt; For `ZeroQueueConsumerImpl` return false, Others, judge
>> > the
>> > > &gt; &gt; &gt; `receiveQueueSize` greater than zero.
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt; ## Reject Alternatives
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt; No alternatives yet.
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt;
>> > > &gt; &gt; &gt; ---
>> > > &gt; &gt; &gt; Thanks,
>> > > &gt; &gt; &gt; Haiting Jiang (Github: Jason918)
>> > > &gt;
>> > >
>> > >
>> > >
>> > >
>> > > ---
>> > > Thanks,
>> > > Haiting Jiang (Github: Jason918)
>> >
>>
>

Re: [DISCUSSION] PIP-108: Add method to help user judge if consumer queue has message

Posted by PengHui Li <pe...@apache.org>.
Thanks for the explanation, hasLocalMessages looks good.

Penghui

On Mon, Nov 8, 2021 at 10:39 PM Enrico Olivelli <eo...@gmail.com> wrote:

> Il giorno lun 8 nov 2021 alle ore 15:01 ZhangJian He <sh...@gmail.com>
> ha scritto:
>
> > Hello, matteo, About my PIP 108, the reason I want to have method
> > `hasMessageInReceiverQueue` is that
> > We need to control consumer at different rate. For example, consumer A
> > 10msg/s, consumer B 100 msg/s
> > so I can't use the `listener` mode.
> > the `receive(0)` method will remove the message from queue. currently my
> > work flow is 1) check if has message
> > 2) apply for flow quota 3) receive messages. I can not put the mssage
> back
> > to the queue, can't use the `receive(0)`.
> > And `apply for flow quota` is a costly action.
> > And discussed with PengHui Li and Hang Chen, we think `hasLocalMessages`
> is
> > better than hasMessageInReceiverQueue
> >
>
> "hasLocalMessages" works for me
>
> Enrico
>
>
>
> >
> > ZhangJian He <sh...@gmail.com> 于2021年10月27日周三 下午7:47写道:
> >
> > > Some users want to use this api to judge if there's messages to
> receive,
> > > like that pseudo code:
> > > if (consumer.hasMessage()) {
> > >    .submit(() -> {
> > >       consumer.pollMessagesAccordingToTheDistributedFlowControl()
> > >    })
> > > }
> > >
> > > don't want to remove the message from queue.
> > >
> > > PengHui Li <pe...@apache.org> 于2021年10月27日周三 下午7:43写道:
> > >
> > >> @ZhangJian He, as Matteo mentioned, Use `consumer.receive(0,
> > >> TimeUnit.SECONDS)` can achieve the same purpose for checking if there
> > are
> > >> messages in the local cache.
> > >>
> > >> Thanks
> > >> Penghui
> > >>
> > >> On Tue, Oct 26, 2021 at 2:35 PM ZhangJian He <sh...@gmail.com>
> > wrote:
> > >>
> > >> > If some users need the message content to do user-defined actions,
> we
> > >> need
> > >> > to ensure the user can't use the `peekMessage` to do things like ack
> > >> > because the message are still in the blockingQueue, return just a
> > >> > content-copy?
> > >> >
> > >> > introduced `localBuffer` might be good ? `hasMessagesInLocalBuffer`
> > >> >
> > >> > JiangHaiting <ji...@foxmail.com> 于2021年10月26日周二 下午2:20写道:
> > >> >
> > >> > > I'm totally +1 for the feature to check if we can get
> > >> > > message&nbsp;immediately from consumer, this is to say we have
> > message
> > >> > > locally.
> > >> > >
> > >> > >
> > >> > > In my understanding, it's useful to implement some user-defined
> > order
> > >> to
> > >> > > consume messages among different topics, in your case, the
> > >> "distributed
> > >> > > flow control ability".
> > >> > > But in the past few years, I've met some users have defined the
> > >> consume
> > >> > > order of different topics by part of the message content, like
> > >> > > some&nbsp;critical property value.&nbsp;
> > >> > > In these situations, a `peek` method is more suitable.
> > >> > >
> > >> > >
> > >> > > Further more, peek is not effectively equals to
> `consumer.receive(0,
> > >> > > TimeUnit.SECONDS)`. As you will have to store the message
> somewhere
> > >> else
> > >> > if
> > >> > > you find that it's not the most priority message to process.
> > >> > >
> > >> > >
> > >> > > One last thing, put the concept of "receiverQueue" in the api of
> > >> > consumer,
> > >> > > seems a little bit strange, IMHO.
> > >> > >
> > >> > >
> > >> > >
> > >> > >
> > >> > > ------------------&nbsp;Original&nbsp;------------------
> > >> > > From:
> > >> > >                                                   "dev"
> > >> > >                                                                 <
> > >> > > shoothzj@gmail.com&gt;;
> > >> > > Date:&nbsp;Tue, Oct 26, 2021 12:54 PM
> > >> > > To:&nbsp;"dev"<dev@pulsar.apache.org&gt;;
> > >> > >
> > >> > > Subject:&nbsp;Re: [DISCUSSION] PIP-108: Add method to help user
> > judge
> > >> if
> > >> > > consumer queue has message
> > >> > >
> > >> > >
> > >> > >
> > >> > > 3. Our solution implements the distributed flow control ability at
> > >> client
> > >> > > side, so we don't use the listener way.
> > >> > > 2. Per customer per consumer in different tenants and namespace,
> and
> > >> the
> > >> > > `flow-control` need(Some of our customer's machines can't work on
> > high
> > >> > > traffic), So `Multi-topic` can't use.
> > >> > > 1. We want to use this api to judge if there's messages to
> receive,
> > >> like
> > >> > > that pseudo code
> > >> > > if (consumer.hasMessage()) {
> > >> > >  .submit(() -&gt; {
> > >> > >  consumer.pollMessagesAccordingToTheDistributedFlowControl()
> > >> > >  })
> > >> > > }
> > >> > >
> > >> > > Matteo Merli <matteo.merli@gmail.com&gt; 于2021年10月26日周二
> 下午12:15写道:
> > >> > >
> > >> > > &gt; I'm a bit hesitant about this because I think there are
> already
> > >> at
> > >> > > &gt; least 3 different ways to handle similar scenarios.
> > >> > > &gt;
> > >> > > &gt;&nbsp; 1. Using listener and avoid calling receive directly
> > >> > > &gt;&nbsp; 2. Use multi-topic consumer, so there's a single
> > `Consumer`
> > >> > > instance
> > >> > > &gt; exposed
> > >> > > &gt;&nbsp; 3. Use `consumer.receive(0, TimeUnit.SECONDS)` to probe
> > for
> > >> > > message
> > >> > > &gt;
> > >> > > &gt;
> > >> > > &gt; --
> > >> > > &gt; Matteo Merli
> > >> > > &gt; <matteo.merli@gmail.com&gt;
> > >> > > &gt;
> > >> > > &gt; On Mon, Oct 25, 2021 at 7:34 PM ZhangJian He <
> > shoothzj@gmail.com
> > >> > &gt;
> > >> > > wrote:
> > >> > > &gt; &gt;
> > >> > > &gt; &gt; I think it's better to add the method to Consumer
> > interface
> > >> > > instead of
> > >> > > &gt; let
> > >> > > &gt; &gt; user casting it to `ConsumerBase`.
> > >> > > &gt; &gt; `peek` is most complexly,&nbsp; for the reason, I can
> use
> > >> the
> > >> > > `peek` object to
> > >> > > &gt; &gt; ack、negative ack, but when to remove from the
> > >> `BlockingQueue`?
> > >> > > &gt; &gt; IMHO, people use this api are just to judge if has the
> > >> message,
> > >> > > &gt; otherwise,
> > >> > > &gt; &gt; they can just use `receive(0,TimeUnit)
> > >> > > &gt; &gt;
> > >> > > &gt; &gt; JiangHaiting <jianghaiting@foxmail.com&gt;
> 于2021年10月26日周二
> > >> > > 上午10:19写道:
> > >> > > &gt; &gt;
> > >> > > &gt; &gt; &gt; Can this method
> > >> > > &gt; &gt; &gt;
> > >> > >
> > "org.apache.pulsar.client.impl.ConsumerBase#getTotalIncomingMessages"
> > >> > > &gt; do
> > >> > > &gt; &gt; &gt; the trick? Though you have to change the type to
> > >> > > ConsumerBase.
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt; And maybe `peek` is more suitable and useful to add
> > to
> > >> the
> > >> > > Consumer
> > >> > > &gt; &gt; &gt; interface?
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt;
> > >> > > ------------------&amp;nbsp;Original&amp;nbsp;------------------
> > >> > > &gt; &gt; &gt; From:
> > >> > > &gt; &gt;
> > >> > >
> > >> >
> > >>
> >
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> > >> > > "dev"
> > >> > > &gt; &gt;
> > >> > >
> > >> >
> > >>
> >
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> > >> > > <
> > >> > > &gt; &gt; &gt; shoothzj@gmail.com&amp;gt;;
> > >> > > &gt; &gt; &gt; Date:&amp;nbsp;Mon, Oct 25, 2021 07:24 PM
> > >> > > &gt; &gt; &gt; To:&amp;nbsp;"dev"<dev@pulsar.apache.org&amp;gt;;
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt; Subject:&amp;nbsp;[DISCUSSION] PIP-108: Add method
> to
> > >> help
> > >> > > user judge if
> > >> > > &gt; &gt; &gt; consumer queue has message
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt; https://github.com/apache/pulsar/issues/12479
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt; --- Pasted here for quoting convenience ---
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt; ## Motivation
> > >> > > &gt; &gt; &gt; Currently, I have an application that manages ten
> > >> thousand
> > >> > > of
> > >> > > &gt; consumers,
> > >> > > &gt; &gt; &gt; and a logic to schedule consumers's receive. It
> would
> > >> be
> > >> > > helpful to
> > >> > > &gt; know if
> > >> > > &gt; &gt; &gt; one of the consumers have message to recive.
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt; ## Goal
> > >> > > &gt; &gt; &gt; To make `Consumer` can judge if there are
> unreceiving
> > >> > > messages
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt; ## API Changes
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt; Add `hasMessageInReceiverQueue` on the `Consumer`
> > >> > interface.
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt; ## Implementation
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt; For `ZeroQueueConsumerImpl` return false, Others,
> > judge
> > >> > the
> > >> > > &gt; &gt; &gt; `receiveQueueSize` greater than zero.
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt; ## Reject Alternatives
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt; No alternatives yet.
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt;
> > >> > > &gt; &gt; &gt; ---
> > >> > > &gt; &gt; &gt; Thanks,
> > >> > > &gt; &gt; &gt; Haiting Jiang (Github: Jason918)
> > >> > > &gt;
> > >> > >
> > >> > >
> > >> > >
> > >> > >
> > >> > > ---
> > >> > > Thanks,
> > >> > > Haiting Jiang (Github: Jason918)
> > >> >
> > >>
> > >
> >
>

Re: [DISCUSSION] PIP-108: Add method to help user judge if consumer queue has message

Posted by Enrico Olivelli <eo...@gmail.com>.
Il giorno lun 8 nov 2021 alle ore 15:01 ZhangJian He <sh...@gmail.com>
ha scritto:

> Hello, matteo, About my PIP 108, the reason I want to have method
> `hasMessageInReceiverQueue` is that
> We need to control consumer at different rate. For example, consumer A
> 10msg/s, consumer B 100 msg/s
> so I can't use the `listener` mode.
> the `receive(0)` method will remove the message from queue. currently my
> work flow is 1) check if has message
> 2) apply for flow quota 3) receive messages. I can not put the mssage back
> to the queue, can't use the `receive(0)`.
> And `apply for flow quota` is a costly action.
> And discussed with PengHui Li and Hang Chen, we think `hasLocalMessages` is
> better than hasMessageInReceiverQueue
>

"hasLocalMessages" works for me

Enrico



>
> ZhangJian He <sh...@gmail.com> 于2021年10月27日周三 下午7:47写道:
>
> > Some users want to use this api to judge if there's messages to receive,
> > like that pseudo code:
> > if (consumer.hasMessage()) {
> >    .submit(() -> {
> >       consumer.pollMessagesAccordingToTheDistributedFlowControl()
> >    })
> > }
> >
> > don't want to remove the message from queue.
> >
> > PengHui Li <pe...@apache.org> 于2021年10月27日周三 下午7:43写道:
> >
> >> @ZhangJian He, as Matteo mentioned, Use `consumer.receive(0,
> >> TimeUnit.SECONDS)` can achieve the same purpose for checking if there
> are
> >> messages in the local cache.
> >>
> >> Thanks
> >> Penghui
> >>
> >> On Tue, Oct 26, 2021 at 2:35 PM ZhangJian He <sh...@gmail.com>
> wrote:
> >>
> >> > If some users need the message content to do user-defined actions, we
> >> need
> >> > to ensure the user can't use the `peekMessage` to do things like ack
> >> > because the message are still in the blockingQueue, return just a
> >> > content-copy?
> >> >
> >> > introduced `localBuffer` might be good ? `hasMessagesInLocalBuffer`
> >> >
> >> > JiangHaiting <ji...@foxmail.com> 于2021年10月26日周二 下午2:20写道:
> >> >
> >> > > I'm totally +1 for the feature to check if we can get
> >> > > message&nbsp;immediately from consumer, this is to say we have
> message
> >> > > locally.
> >> > >
> >> > >
> >> > > In my understanding, it's useful to implement some user-defined
> order
> >> to
> >> > > consume messages among different topics, in your case, the
> >> "distributed
> >> > > flow control ability".
> >> > > But in the past few years, I've met some users have defined the
> >> consume
> >> > > order of different topics by part of the message content, like
> >> > > some&nbsp;critical property value.&nbsp;
> >> > > In these situations, a `peek` method is more suitable.
> >> > >
> >> > >
> >> > > Further more, peek is not effectively equals to `consumer.receive(0,
> >> > > TimeUnit.SECONDS)`. As you will have to store the message somewhere
> >> else
> >> > if
> >> > > you find that it's not the most priority message to process.
> >> > >
> >> > >
> >> > > One last thing, put the concept of "receiverQueue" in the api of
> >> > consumer,
> >> > > seems a little bit strange, IMHO.
> >> > >
> >> > >
> >> > >
> >> > >
> >> > > ------------------&nbsp;Original&nbsp;------------------
> >> > > From:
> >> > >                                                   "dev"
> >> > >                                                                 <
> >> > > shoothzj@gmail.com&gt;;
> >> > > Date:&nbsp;Tue, Oct 26, 2021 12:54 PM
> >> > > To:&nbsp;"dev"<dev@pulsar.apache.org&gt;;
> >> > >
> >> > > Subject:&nbsp;Re: [DISCUSSION] PIP-108: Add method to help user
> judge
> >> if
> >> > > consumer queue has message
> >> > >
> >> > >
> >> > >
> >> > > 3. Our solution implements the distributed flow control ability at
> >> client
> >> > > side, so we don't use the listener way.
> >> > > 2. Per customer per consumer in different tenants and namespace, and
> >> the
> >> > > `flow-control` need(Some of our customer's machines can't work on
> high
> >> > > traffic), So `Multi-topic` can't use.
> >> > > 1. We want to use this api to judge if there's messages to receive,
> >> like
> >> > > that pseudo code
> >> > > if (consumer.hasMessage()) {
> >> > >  .submit(() -&gt; {
> >> > >  consumer.pollMessagesAccordingToTheDistributedFlowControl()
> >> > >  })
> >> > > }
> >> > >
> >> > > Matteo Merli <matteo.merli@gmail.com&gt; 于2021年10月26日周二 下午12:15写道:
> >> > >
> >> > > &gt; I'm a bit hesitant about this because I think there are already
> >> at
> >> > > &gt; least 3 different ways to handle similar scenarios.
> >> > > &gt;
> >> > > &gt;&nbsp; 1. Using listener and avoid calling receive directly
> >> > > &gt;&nbsp; 2. Use multi-topic consumer, so there's a single
> `Consumer`
> >> > > instance
> >> > > &gt; exposed
> >> > > &gt;&nbsp; 3. Use `consumer.receive(0, TimeUnit.SECONDS)` to probe
> for
> >> > > message
> >> > > &gt;
> >> > > &gt;
> >> > > &gt; --
> >> > > &gt; Matteo Merli
> >> > > &gt; <matteo.merli@gmail.com&gt;
> >> > > &gt;
> >> > > &gt; On Mon, Oct 25, 2021 at 7:34 PM ZhangJian He <
> shoothzj@gmail.com
> >> > &gt;
> >> > > wrote:
> >> > > &gt; &gt;
> >> > > &gt; &gt; I think it's better to add the method to Consumer
> interface
> >> > > instead of
> >> > > &gt; let
> >> > > &gt; &gt; user casting it to `ConsumerBase`.
> >> > > &gt; &gt; `peek` is most complexly,&nbsp; for the reason, I can use
> >> the
> >> > > `peek` object to
> >> > > &gt; &gt; ack、negative ack, but when to remove from the
> >> `BlockingQueue`?
> >> > > &gt; &gt; IMHO, people use this api are just to judge if has the
> >> message,
> >> > > &gt; otherwise,
> >> > > &gt; &gt; they can just use `receive(0,TimeUnit)
> >> > > &gt; &gt;
> >> > > &gt; &gt; JiangHaiting <jianghaiting@foxmail.com&gt; 于2021年10月26日周二
> >> > > 上午10:19写道:
> >> > > &gt; &gt;
> >> > > &gt; &gt; &gt; Can this method
> >> > > &gt; &gt; &gt;
> >> > >
> "org.apache.pulsar.client.impl.ConsumerBase#getTotalIncomingMessages"
> >> > > &gt; do
> >> > > &gt; &gt; &gt; the trick? Though you have to change the type to
> >> > > ConsumerBase.
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt; And maybe `peek` is more suitable and useful to add
> to
> >> the
> >> > > Consumer
> >> > > &gt; &gt; &gt; interface?
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt;
> >> > > ------------------&amp;nbsp;Original&amp;nbsp;------------------
> >> > > &gt; &gt; &gt; From:
> >> > > &gt; &gt;
> >> > >
> >> >
> >>
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >> > > "dev"
> >> > > &gt; &gt;
> >> > >
> >> >
> >>
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >> > > <
> >> > > &gt; &gt; &gt; shoothzj@gmail.com&amp;gt;;
> >> > > &gt; &gt; &gt; Date:&amp;nbsp;Mon, Oct 25, 2021 07:24 PM
> >> > > &gt; &gt; &gt; To:&amp;nbsp;"dev"<dev@pulsar.apache.org&amp;gt;;
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt; Subject:&amp;nbsp;[DISCUSSION] PIP-108: Add method to
> >> help
> >> > > user judge if
> >> > > &gt; &gt; &gt; consumer queue has message
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt; https://github.com/apache/pulsar/issues/12479
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt; --- Pasted here for quoting convenience ---
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt; ## Motivation
> >> > > &gt; &gt; &gt; Currently, I have an application that manages ten
> >> thousand
> >> > > of
> >> > > &gt; consumers,
> >> > > &gt; &gt; &gt; and a logic to schedule consumers's receive. It would
> >> be
> >> > > helpful to
> >> > > &gt; know if
> >> > > &gt; &gt; &gt; one of the consumers have message to recive.
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt; ## Goal
> >> > > &gt; &gt; &gt; To make `Consumer` can judge if there are unreceiving
> >> > > messages
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt; ## API Changes
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt; Add `hasMessageInReceiverQueue` on the `Consumer`
> >> > interface.
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt; ## Implementation
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt; For `ZeroQueueConsumerImpl` return false, Others,
> judge
> >> > the
> >> > > &gt; &gt; &gt; `receiveQueueSize` greater than zero.
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt; ## Reject Alternatives
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt; No alternatives yet.
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt;
> >> > > &gt; &gt; &gt; ---
> >> > > &gt; &gt; &gt; Thanks,
> >> > > &gt; &gt; &gt; Haiting Jiang (Github: Jason918)
> >> > > &gt;
> >> > >
> >> > >
> >> > >
> >> > >
> >> > > ---
> >> > > Thanks,
> >> > > Haiting Jiang (Github: Jason918)
> >> >
> >>
> >
>