You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pulsar.apache.org by PengHui Li <pe...@apache.org> on 2022/08/11 05:28:25 UTC

[DISCUSS] Allow the producer to connect to the topic with producer_request_hold backlog policy

Hi all,

Pulsar has a backlog quota policy `producer_request_hold` which will hold
the message
publish request. It is very useful for some data sync scenarios. It looks
like the producer
waiting for the consumer to process the data.

But the new producer is not allowed to connect to the topic after reaching
the max backlog
limitation with producer_request_hold policy. The producer will get
`ProducerBlockedQuotaExceededError` error. This caused inconvenience to
users, they need
to have some retry to logic to try to create the producer again until the
consumer acked more
messages

IMO, we should allow the producer to connect the topic. To keep the
behavior consistent with
the producer is already connected.

WDYT?

Best Regards,
Penghui

Re: [DISCUSS] Allow the producer to connect to the topic with producer_request_hold backlog policy

Posted by PengHui Li <pe...@apache.org>.
Hi Lin,

For the producer_request_hold policy, users should set the infinite publish
timeout.
It is not like moving the create producer timeout to publish message
timeout.

The key point that we want to improve is to provide consistent behavior to
users.
For example, we have a producer that has been blocked due to
producer_request_hold.
But after the producer restart(intentionally or unintentionally), the
application will fail to create
the producer again due to producer_request_hold.

What could we do without improving this part?
To catch the timeout exception, write a retry logic to try to create the
producer again and again.

> requires changing significantly how client protocols are handled and I
don't think it's worth it.

It's uncertain, we should define the behavior first and then find
a low-cost solution.
IMO, this is a poor user experience. It looks like users need to handle so
many things to
use the producer_request_hold policy. If it is not handled properly, it may
cause more serious effects.
e.g. Retry to create the producer very frequently which introduces more
overhead to brokers.

Thanks,
Penghui

On Mon, Aug 29, 2022 at 9:22 AM Lin Zhao <li...@streamnative.io> wrote:

> It's best to keep the behavior of producer_request_hold as it is, instead
> of changing it. The current behavior is, the publisher holds at most
> operationTimeoutMs (client side configuration) after which throwing the
> exception. This behavior is consistent between ongoing clients and clients
> with producers created at a time that the quota exceeds.
>
> 1. With an ongoing client, the `send()` is held in memory until the
> timeout exceeds, after which a timeout exception is thrown.
> 2. With a new client/producer, the producer `create()` method is blocked
> until the configured operationTimeout. And an exception is thrown.
>
> The two scenarios behave similarly. And changing the second call to be
> successful while subsequent `send()` calls timeout requires changing
> significantly how client protocols are handled and I don't think it's worth
> it.
>
> Thoughts?
>
> On Sun, Aug 21, 2022 at 10:58 PM PengHui Li <pe...@apache.org> wrote:
>
>>
>>
>> ---------- Forwarded message ---------
>> From: Michael Marshall <mm...@apache.org>
>> Date: Tue, Aug 16, 2022 at 2:07 PM
>> Subject: Re: [DISCUSS] Allow the producer to connect to the topic with
>> producer_request_hold backlog policy
>> To: Dev <de...@pulsar.apache.org>
>>
>>
>> I agree that metrics on these "blocked" producers are helpful to end
>> users.
>>
>> > Another concern is that if the topic is unloaded to another broker, we
>> have to notify the client to reconnect, I'm not sure if
>> `waitingExclusiveProducers` does that or if I missing some logic.
>>
>> If I remember correctly, the unload disconnects a producer and then
>> the client looks up the topic and rediscovers its quota exceeded
>> state.
>>
>> > let the producer know why the producer does not ready.
>>
>> What is the role of a unique reason? When a topic's quota is exceeded,
>> the problem is with the consumers (or the retention policy) and there
>> is nothing a producer can do other than applying backpressure.
>> Although, the producer may be the first one to find out, unless we
>> expose sufficient broker metrics for users to alert on, and in that
>> case, a helpful error message could speed up diagnosing the problem.
>> That being said, it'd be easy to add an enum to the protobuf and
>> expand the `CommandProducerSuccess` command.
>>
>> > I think based on the current logic, we can support the producer_hold
>> policy by `producer_ready=false` first. and then draft a PIP to change the
>> protocol to improve the logic.
>>
>> I agree that we could implement the change and defer expanding the
>> protocol. For now, it's probably enough to infer the reason for the
>> topic not being ready. If the producer is not an exclusive producer,
>> it can only be blocked by the quota.
>>
>> Thanks,
>> Michael
>>
>> On Sun, Aug 14, 2022 at 11:20 PM Qiang Zhao <ma...@apache.org>
>> wrote:
>> >
>> > Great idea, Michael
>> >
>> > > 1. A new producer connecting to a topic that has exceeded its quota.
>> > > This case is trivial because the broker tells the producer it is
>> > > connected but it cannot send any messages (i.e. producer_ready=false),
>> > > and the client holds the producer's future until the broker sends
>> > > producer_ready=true.
>> >
>> > Agree with this method of dealing with new producers connecting to a
>> topic that exceeds the quota, but maybe we need to add some metrics to
>> these producers so that users can observe these producers that are
>> connected successfully but not ready.
>> >
>> > Another concern is that if the topic is unloaded to another broker, we
>> have to notify the client to reconnect, I'm not sure if
>> `waitingExclusiveProducers` does that or if I missing some logic.
>> >
>> > >  but it could be worth
>> > > extending the client so that an application could reactively discover
>> > > that the topic's quota is exceeded using a listener. Additionally, we
>> > > could disable the send timeouts when the producer enters this "hold"
>> > > state so that messages.
>> >
>> > It makes sense to me but looks like we will change the protocol to let
>> the producer know why the producer does not ready. maybe need to add the
>> type field in the `CommandProducerSuccess` command.
>> >
>> > I think based on the current logic, we can support the producer_hold
>> policy by `producer_ready=false` first. and then draft a PIP to change the
>> protocol to improve the logic.
>> >
>> > Best,
>> > Mattison
>> >
>> >
>> > On 2022/08/14 05:29:35 Michael Marshall wrote:
>> > > Great points, Penghui.
>> > >
>> > > > To optimize, I think we can only let the producer connect to the
>> broker
>> > > > and the broker should tell the producer the backlog is exceeded.
>> > >
>> > > In looking at the `CommandProducerSuccess` protobuf message, we
>> > > already have the `producer_ready` boolean field. It was added for the
>> > > exclusive producer case, and it seems to match this logic exactly,
>> > > though the client wouldn't know "why" the producer was not ready. I
>> > > think this field meets our needs because the producer just needs to
>> > > know that it is connected and should not send messages yet.
>> > >
>> > > > If we have too many producers, try to reconnect to the broker again
>> and
>> > > > again. It is also a non-negligible cost.
>> > >
>> > > This is a really important point. The current protocol implementation
>> > > leads to unnecessary network traffic, and it will be worse for
>> > > topics with many producers. Note that the lookup part of the protocol
>> > > introduces additional load on all of the brokers serving these
>> > > requests.
>> > >
>> > > > Looks like we need to fix the client-side to make sure users will
>> not get
>> > > > ProducerBlockedQuotaExceededError when creating the producer with
>> > > > producer_hold_request backlog policy. I have tested it locally, the
>> behavior
>> > > > can be confirmed
>> > >
>> > > Thanks for confirming this case. I think it would make sense to update
>> > > the behavior on the producer_requests_hold feature so that the future
>> > > is incomplete until the producer is ready to produce, just like the
>> > > exclusive producer implementation.
>> > >
>> > > Ultimately, there are two cases this feature needs to handle.
>> > >
>> > > 1. A new producer connecting to a topic that has exceeded its quota.
>> > > This case is trivial because the broker tells the producer it is
>> > > connected but it cannot send any messages (i.e. producer_ready=false),
>> > > and the client holds the producer's future until the broker sends
>> > > producer_ready=true.
>> > >
>> > > 2. An existing producer gets disconnected due to an exceeded quota. In
>> > > this case, it'd be easy enough for the producer to stop sending
>> > > messages, but because the client application already has a reference
>> > > to this producer, the application will be able to submit messages
>> > > until the client's buffer is full, at which point the send is blocked
>> > > or gets an exception. I think that would work, but it could be worth
>> > > extending the client so that an application could reactively discover
>> > > that the topic's quota is exceeded using a listener. Additionally, we
>> > > could disable the send timeouts when the producer enters this "hold"
>> > > state so that messages.
>> > >
>> > > In case number 2, it probably makes sense to extend the protocol so
>> > > that the broker sends a protocol message indicating the producer
>> > > should stop sending messages. This would be more elegant than
>> > > disconnecting the producer and making it look up the topic again.
>> > >
>> > > Thanks,
>> > > Michael
>> > >
>> > >
>> > > On Fri, Aug 12, 2022 at 5:50 AM PengHui Li <pe...@apache.org>
>> wrote:
>> > > >
>> > > > > The producer fails
>> > > > the pending messages when the policy is producer_exception and the
>> > > > producer does nothing when the policy is producer_request_hold
>> > > >
>> > > > Eventually, it will fail [0] the user's create producer request
>> because of
>> > > > the operation timeout [1].
>> > > >
>> > > > > The primary advantage for this solution is that the broker does
>> not
>> > > > need to hold a producer's messages in memory for some undefined
>> time.
>> > > >
>> > > > Yes, I agree. And changing the protocol will also affect other
>> clients.
>> > > >
>> > > > To optimize, I think we can only let the producer connect to the
>> broker
>> > > > and the broker should tell the producer the backlog is exceeded.
>> > > > The producer can only send one message to test. Only push out more
>> messages
>> > > > after the first message gets the response. Just a rough idea, not
>> for now.
>> > > > If we have too many producers, try to reconnect to the broker again
>> and
>> > > > again.
>> > > > It is also a non-negligible cost.
>> > > >
>> > > > Looks like we need to fix the client-side to make sure users will
>> not get
>> > > > ProducerBlockedQuotaExceededError when creating the producer with
>> > > > producer_hold_request backlog policy. I have tested it locally, the
>> behavior
>> > > > can be confirmed
>> > > >
>> > > > ```
>> > > >
>> org.apache.pulsar.client.api.PulsarClientException$ProducerBlockedQuotaExceededError:
>> > > >
>> {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$TopicBacklogQuotaExceededException:
>> > > > Cannot create producer on topic with backlog quota
>> > > > exceeded","reqId":1841236212888635356, "remote":"localhost/
>> 127.0.0.1:64805",
>> > > > "local":"/127.0.0.1:64809"}
>> > > >
>> > > > at
>> > > >
>> org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1052)
>> > > > at
>> > > >
>> org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:88)
>> > > > at
>> > > >
>> org.apache.pulsar.broker.service.BacklogQuotaManagerTest.createProducer(BacklogQuotaManagerTest.java:664)
>> > > > at
>> > > >
>> org.apache.pulsar.broker.service.BacklogQuotaManagerTest.testProducerException(BacklogQuotaManagerTest.java:1091)
>> > > > at
>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>> > > > Method)
>> > > > at
>> > > >
>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>> > > > at
>> > > >
>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> > > > at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>> > > > at
>> > > >
>> org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:132)
>> > > > at
>> org.testng.internal.TestInvoker.invokeMethod(TestInvoker.java:599)
>> > > > at
>> org.testng.internal.TestInvoker.invokeTestMethod(TestInvoker.java:174)
>> > > > at
>> org.testng.internal.MethodRunner.runInSequence(MethodRunner.java:46)
>> > > > at
>> > > >
>> org.testng.internal.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:822)
>> > > > at
>> org.testng.internal.TestInvoker.invokeTestMethods(TestInvoker.java:147)
>> > > > at
>> > > >
>> org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:146)
>> > > > at
>> org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:128)
>> > > > at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
>> > > > at org.testng.TestRunner.privateRun(TestRunner.java:764)
>> > > > at org.testng.TestRunner.run(TestRunner.java:585)
>> > > > at org.testng.SuiteRunner.runTest(SuiteRunner.java:384)
>> > > > at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:378)
>> > > > at org.testng.SuiteRunner.privateRun(SuiteRunner.java:337)
>> > > > at org.testng.SuiteRunner.run(SuiteRunner.java:286)
>> > > > at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:53)
>> > > > at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:96)
>> > > > at org.testng.TestNG.runSuitesSequentially(TestNG.java:1218)
>> > > > at org.testng.TestNG.runSuitesLocally(TestNG.java:1140)
>> > > > at org.testng.TestNG.runSuites(TestNG.java:1069)
>> > > > at org.testng.TestNG.run(TestNG.java:1037)
>> > > > at
>> com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
>> > > > at
>> > > >
>> com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:109)
>> > > > ```
>> > > >
>> > > > Best,
>> > > > Penghui
>> > > >
>> > > > [0]
>> > > >
>> https://github.com/apache/pulsar/blob/955dcd10ce28b996811e194c9ad852b06ab30aee/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1778-L1786
>> > > > [1]
>> > > >
>> https://github.com/apache/pulsar/blob/955dcd10ce28b996811e194c9ad852b06ab30aee/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1773
>> > > >
>> > > >
>> > > > On Fri, Aug 12, 2022 at 1:36 PM Michael Marshall <
>> mmarshall@apache.org>
>> > > > wrote:
>> > > >
>> > > > > > IMO, we should allow the producer to connect the topic.
>> > > > >
>> > > > > I actually think the current producer_request_hold feature works
>> based
>> > > > > on disconnecting a producer and only letting it connect when the
>> topic
>> > > > > no longer exceeds its quota.
>> > > > >
>> > > > > > It looks like we do not support the `producer_request_hold`
>> semantics.
>> > > > > We just easily use the same behaviour like `producer_exception`.
>> Maybe it's
>> > > > > a missing feature.
>> > > > >
>> > > > > I agree that the only references to the
>> > > > > RetentionPolicy.producer_request_hold enum have to do with
>> disallowing
>> > > > > producer creation or with disconnecting the producer when the
>> backlog
>> > > > > is exceeded [0].
>> > > > >
>> > > > > However, I think the feature does work if we look closer. The
>> > > > > implementation is in the client (at least it is in the Java
>> client).
>> > > > > First, note that the only functional difference between
>> > > > > producer_exception and producer_request_hold comes here [1] where
>> two
>> > > > > different exceptions are sent to the producer. Then, see that the
>> > > > > producer handles these exceptions differently [2]. The producer
>> fails
>> > > > > the pending messages when the policy is producer_exception and the
>> > > > > producer does nothing when the policy is producer_request_hold.
>> In the
>> > > > > second case, the producer will attempt to reconnect to the broker
>> and
>> > > > > will resend the messages that have been "held".
>> > > > >
>> > > > > It seems relevant to point out that the backlog quota's state is
>> only
>> > > > > changed on a 60 second interval by default (see
>> > > > > backlogQuotaCheckIntervalInSeconds) and the default send timeout
>> is 30
>> > > > > seconds. Therefore, many sends will likely timeout on the client
>> side
>> > > > > before the broker updates the topic's state to "writable" and
>> lets the
>> > > > > producer reconnect. To use this feature meaningfully, it might
>> make
>> > > > > sense to increase the send timeout.
>> > > > >
>> > > > > The primary advantage for this solution is that the broker does
>> not
>> > > > > need to hold a producer's messages in memory for some undefined
>> time.
>> > > > >
>> > > > > I just checked, and we do not have this behavior documented in the
>> > > > > pulsar binary protocol spec [3]. We should update the spec to
>> indicate
>> > > > > how this feature is supposed to work, assuming we keep it this
>> way.
>> > > > >
>> > > > > Thanks,
>> > > > > Michael
>> > > > >
>> > > > > [0]
>> > > > >
>> https://github.com/apache/pulsar/blob/d24d82780fd27a98c6cdbee28d756ee7d419495f/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java#L104-L107
>> > > > > [1]
>> > > > >
>> https://github.com/apache/pulsar/blob/4c6989c4da6c0b18c9b0196630e03daf437cea68/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1383-L1391
>> > > > > [2]
>> > > > >
>> https://github.com/apache/pulsar/blob/955dcd10ce28b996811e194c9ad852b06ab30aee/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1735-L1753
>> > > > > [3]
>> https://pulsar.apache.org/docs/next/developing-binary-protocol/
>> > > > >
>> > > > >
>> > > > > On Thu, Aug 11, 2022 at 6:52 AM Qiang Zhao <
>> mattisonchao@apache.org>
>> > > > > wrote:
>> > > > > >
>> > > > > > Hi, Penghui
>> > > > > >
>> > > > > > I support your opinion.
>> > > > > >
>> > > > > > It looks like we do not support the `producer_request_hold`
>> semantics.
>> > > > > We just easily use the same behaviour like `producer_exception`.
>> Maybe it's
>> > > > > a missing feature.
>> > > > > >
>> > > > > > Best,
>> > > > > > Mattison
>> > > > > >
>> > > > > > On 2022/08/11 05:28:25 PengHui Li wrote:
>> > > > > > > Hi all,
>> > > > > > >
>> > > > > > > Pulsar has a backlog quota policy `producer_request_hold`
>> which will
>> > > > > hold
>> > > > > > > the message
>> > > > > > > publish request. It is very useful for some data sync
>> scenarios. It
>> > > > > looks
>> > > > > > > like the producer
>> > > > > > > waiting for the consumer to process the data.
>> > > > > > >
>> > > > > > > But the new producer is not allowed to connect to the topic
>> after
>> > > > > reaching
>> > > > > > > the max backlog
>> > > > > > > limitation with producer_request_hold policy. The producer
>> will get
>> > > > > > > `ProducerBlockedQuotaExceededError` error. This caused
>> inconvenience to
>> > > > > > > users, they need
>> > > > > > > to have some retry to logic to try to create the producer
>> again until
>> > > > > the
>> > > > > > > consumer acked more
>> > > > > > > messages
>> > > > > > >
>> > > > > > > IMO, we should allow the producer to connect the topic. To
>> keep the
>> > > > > > > behavior consistent with
>> > > > > > > the producer is already connected.
>> > > > > > >
>> > > > > > > WDYT?
>> > > > > > >
>> > > > > > > Best Regards,
>> > > > > > > Penghui
>> > > > > > >
>> > > > >
>> > >
>>
>

Re: [DISCUSS] Allow the producer to connect to the topic with producer_request_hold backlog policy

Posted by Lin Zhao <li...@streamnative.io.INVALID>.
It's best to keep the behavior of producer_request_hold as it is, instead
of changing it. The current behavior is, the publisher holds at most
operationTimeoutMs (client side configuration) after which throwing the
exception. This behavior is consistent between ongoing clients and clients
with producers created at a time that the quota exceeds.

1. With an ongoing client, the `send()` is held in memory until the timeout
exceeds, after which a timeout exception is thrown.
2. With a new client/producer, the producer `create()` method is blocked
until the configured operationTimeout. And an exception is thrown.

The two scenarios behave similarly. And changing the second call to be
successful while subsequent `send()` calls timeout requires changing
significantly how client protocols are handled and I don't think it's worth
it.

Thoughts?

On Sun, Aug 21, 2022 at 10:58 PM PengHui Li <pe...@apache.org> wrote:

>
>
> ---------- Forwarded message ---------
> From: Michael Marshall <mm...@apache.org>
> Date: Tue, Aug 16, 2022 at 2:07 PM
> Subject: Re: [DISCUSS] Allow the producer to connect to the topic with
> producer_request_hold backlog policy
> To: Dev <de...@pulsar.apache.org>
>
>
> I agree that metrics on these "blocked" producers are helpful to end users.
>
> > Another concern is that if the topic is unloaded to another broker, we
> have to notify the client to reconnect, I'm not sure if
> `waitingExclusiveProducers` does that or if I missing some logic.
>
> If I remember correctly, the unload disconnects a producer and then
> the client looks up the topic and rediscovers its quota exceeded
> state.
>
> > let the producer know why the producer does not ready.
>
> What is the role of a unique reason? When a topic's quota is exceeded,
> the problem is with the consumers (or the retention policy) and there
> is nothing a producer can do other than applying backpressure.
> Although, the producer may be the first one to find out, unless we
> expose sufficient broker metrics for users to alert on, and in that
> case, a helpful error message could speed up diagnosing the problem.
> That being said, it'd be easy to add an enum to the protobuf and
> expand the `CommandProducerSuccess` command.
>
> > I think based on the current logic, we can support the producer_hold
> policy by `producer_ready=false` first. and then draft a PIP to change the
> protocol to improve the logic.
>
> I agree that we could implement the change and defer expanding the
> protocol. For now, it's probably enough to infer the reason for the
> topic not being ready. If the producer is not an exclusive producer,
> it can only be blocked by the quota.
>
> Thanks,
> Michael
>
> On Sun, Aug 14, 2022 at 11:20 PM Qiang Zhao <ma...@apache.org>
> wrote:
> >
> > Great idea, Michael
> >
> > > 1. A new producer connecting to a topic that has exceeded its quota.
> > > This case is trivial because the broker tells the producer it is
> > > connected but it cannot send any messages (i.e. producer_ready=false),
> > > and the client holds the producer's future until the broker sends
> > > producer_ready=true.
> >
> > Agree with this method of dealing with new producers connecting to a
> topic that exceeds the quota, but maybe we need to add some metrics to
> these producers so that users can observe these producers that are
> connected successfully but not ready.
> >
> > Another concern is that if the topic is unloaded to another broker, we
> have to notify the client to reconnect, I'm not sure if
> `waitingExclusiveProducers` does that or if I missing some logic.
> >
> > >  but it could be worth
> > > extending the client so that an application could reactively discover
> > > that the topic's quota is exceeded using a listener. Additionally, we
> > > could disable the send timeouts when the producer enters this "hold"
> > > state so that messages.
> >
> > It makes sense to me but looks like we will change the protocol to let
> the producer know why the producer does not ready. maybe need to add the
> type field in the `CommandProducerSuccess` command.
> >
> > I think based on the current logic, we can support the producer_hold
> policy by `producer_ready=false` first. and then draft a PIP to change the
> protocol to improve the logic.
> >
> > Best,
> > Mattison
> >
> >
> > On 2022/08/14 05:29:35 Michael Marshall wrote:
> > > Great points, Penghui.
> > >
> > > > To optimize, I think we can only let the producer connect to the
> broker
> > > > and the broker should tell the producer the backlog is exceeded.
> > >
> > > In looking at the `CommandProducerSuccess` protobuf message, we
> > > already have the `producer_ready` boolean field. It was added for the
> > > exclusive producer case, and it seems to match this logic exactly,
> > > though the client wouldn't know "why" the producer was not ready. I
> > > think this field meets our needs because the producer just needs to
> > > know that it is connected and should not send messages yet.
> > >
> > > > If we have too many producers, try to reconnect to the broker again
> and
> > > > again. It is also a non-negligible cost.
> > >
> > > This is a really important point. The current protocol implementation
> > > leads to unnecessary network traffic, and it will be worse for
> > > topics with many producers. Note that the lookup part of the protocol
> > > introduces additional load on all of the brokers serving these
> > > requests.
> > >
> > > > Looks like we need to fix the client-side to make sure users will
> not get
> > > > ProducerBlockedQuotaExceededError when creating the producer with
> > > > producer_hold_request backlog policy. I have tested it locally, the
> behavior
> > > > can be confirmed
> > >
> > > Thanks for confirming this case. I think it would make sense to update
> > > the behavior on the producer_requests_hold feature so that the future
> > > is incomplete until the producer is ready to produce, just like the
> > > exclusive producer implementation.
> > >
> > > Ultimately, there are two cases this feature needs to handle.
> > >
> > > 1. A new producer connecting to a topic that has exceeded its quota.
> > > This case is trivial because the broker tells the producer it is
> > > connected but it cannot send any messages (i.e. producer_ready=false),
> > > and the client holds the producer's future until the broker sends
> > > producer_ready=true.
> > >
> > > 2. An existing producer gets disconnected due to an exceeded quota. In
> > > this case, it'd be easy enough for the producer to stop sending
> > > messages, but because the client application already has a reference
> > > to this producer, the application will be able to submit messages
> > > until the client's buffer is full, at which point the send is blocked
> > > or gets an exception. I think that would work, but it could be worth
> > > extending the client so that an application could reactively discover
> > > that the topic's quota is exceeded using a listener. Additionally, we
> > > could disable the send timeouts when the producer enters this "hold"
> > > state so that messages.
> > >
> > > In case number 2, it probably makes sense to extend the protocol so
> > > that the broker sends a protocol message indicating the producer
> > > should stop sending messages. This would be more elegant than
> > > disconnecting the producer and making it look up the topic again.
> > >
> > > Thanks,
> > > Michael
> > >
> > >
> > > On Fri, Aug 12, 2022 at 5:50 AM PengHui Li <pe...@apache.org> wrote:
> > > >
> > > > > The producer fails
> > > > the pending messages when the policy is producer_exception and the
> > > > producer does nothing when the policy is producer_request_hold
> > > >
> > > > Eventually, it will fail [0] the user's create producer request
> because of
> > > > the operation timeout [1].
> > > >
> > > > > The primary advantage for this solution is that the broker does not
> > > > need to hold a producer's messages in memory for some undefined time.
> > > >
> > > > Yes, I agree. And changing the protocol will also affect other
> clients.
> > > >
> > > > To optimize, I think we can only let the producer connect to the
> broker
> > > > and the broker should tell the producer the backlog is exceeded.
> > > > The producer can only send one message to test. Only push out more
> messages
> > > > after the first message gets the response. Just a rough idea, not
> for now.
> > > > If we have too many producers, try to reconnect to the broker again
> and
> > > > again.
> > > > It is also a non-negligible cost.
> > > >
> > > > Looks like we need to fix the client-side to make sure users will
> not get
> > > > ProducerBlockedQuotaExceededError when creating the producer with
> > > > producer_hold_request backlog policy. I have tested it locally, the
> behavior
> > > > can be confirmed
> > > >
> > > > ```
> > > >
> org.apache.pulsar.client.api.PulsarClientException$ProducerBlockedQuotaExceededError:
> > > >
> {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$TopicBacklogQuotaExceededException:
> > > > Cannot create producer on topic with backlog quota
> > > > exceeded","reqId":1841236212888635356, "remote":"localhost/
> 127.0.0.1:64805",
> > > > "local":"/127.0.0.1:64809"}
> > > >
> > > > at
> > > >
> org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1052)
> > > > at
> > > >
> org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:88)
> > > > at
> > > >
> org.apache.pulsar.broker.service.BacklogQuotaManagerTest.createProducer(BacklogQuotaManagerTest.java:664)
> > > > at
> > > >
> org.apache.pulsar.broker.service.BacklogQuotaManagerTest.testProducerException(BacklogQuotaManagerTest.java:1091)
> > > > at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> > > > Method)
> > > > at
> > > >
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
> > > > at
> > > >
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > > at java.base/java.lang.reflect.Method.invoke(Method.java:568)
> > > > at
> > > >
> org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:132)
> > > > at org.testng.internal.TestInvoker.invokeMethod(TestInvoker.java:599)
> > > > at
> org.testng.internal.TestInvoker.invokeTestMethod(TestInvoker.java:174)
> > > > at
> org.testng.internal.MethodRunner.runInSequence(MethodRunner.java:46)
> > > > at
> > > >
> org.testng.internal.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:822)
> > > > at
> org.testng.internal.TestInvoker.invokeTestMethods(TestInvoker.java:147)
> > > > at
> > > >
> org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:146)
> > > > at
> org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:128)
> > > > at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
> > > > at org.testng.TestRunner.privateRun(TestRunner.java:764)
> > > > at org.testng.TestRunner.run(TestRunner.java:585)
> > > > at org.testng.SuiteRunner.runTest(SuiteRunner.java:384)
> > > > at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:378)
> > > > at org.testng.SuiteRunner.privateRun(SuiteRunner.java:337)
> > > > at org.testng.SuiteRunner.run(SuiteRunner.java:286)
> > > > at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:53)
> > > > at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:96)
> > > > at org.testng.TestNG.runSuitesSequentially(TestNG.java:1218)
> > > > at org.testng.TestNG.runSuitesLocally(TestNG.java:1140)
> > > > at org.testng.TestNG.runSuites(TestNG.java:1069)
> > > > at org.testng.TestNG.run(TestNG.java:1037)
> > > > at
> com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
> > > > at
> > > >
> com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:109)
> > > > ```
> > > >
> > > > Best,
> > > > Penghui
> > > >
> > > > [0]
> > > >
> https://github.com/apache/pulsar/blob/955dcd10ce28b996811e194c9ad852b06ab30aee/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1778-L1786
> > > > [1]
> > > >
> https://github.com/apache/pulsar/blob/955dcd10ce28b996811e194c9ad852b06ab30aee/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1773
> > > >
> > > >
> > > > On Fri, Aug 12, 2022 at 1:36 PM Michael Marshall <
> mmarshall@apache.org>
> > > > wrote:
> > > >
> > > > > > IMO, we should allow the producer to connect the topic.
> > > > >
> > > > > I actually think the current producer_request_hold feature works
> based
> > > > > on disconnecting a producer and only letting it connect when the
> topic
> > > > > no longer exceeds its quota.
> > > > >
> > > > > > It looks like we do not support the `producer_request_hold`
> semantics.
> > > > > We just easily use the same behaviour like `producer_exception`.
> Maybe it's
> > > > > a missing feature.
> > > > >
> > > > > I agree that the only references to the
> > > > > RetentionPolicy.producer_request_hold enum have to do with
> disallowing
> > > > > producer creation or with disconnecting the producer when the
> backlog
> > > > > is exceeded [0].
> > > > >
> > > > > However, I think the feature does work if we look closer. The
> > > > > implementation is in the client (at least it is in the Java
> client).
> > > > > First, note that the only functional difference between
> > > > > producer_exception and producer_request_hold comes here [1] where
> two
> > > > > different exceptions are sent to the producer. Then, see that the
> > > > > producer handles these exceptions differently [2]. The producer
> fails
> > > > > the pending messages when the policy is producer_exception and the
> > > > > producer does nothing when the policy is producer_request_hold. In
> the
> > > > > second case, the producer will attempt to reconnect to the broker
> and
> > > > > will resend the messages that have been "held".
> > > > >
> > > > > It seems relevant to point out that the backlog quota's state is
> only
> > > > > changed on a 60 second interval by default (see
> > > > > backlogQuotaCheckIntervalInSeconds) and the default send timeout
> is 30
> > > > > seconds. Therefore, many sends will likely timeout on the client
> side
> > > > > before the broker updates the topic's state to "writable" and lets
> the
> > > > > producer reconnect. To use this feature meaningfully, it might make
> > > > > sense to increase the send timeout.
> > > > >
> > > > > The primary advantage for this solution is that the broker does not
> > > > > need to hold a producer's messages in memory for some undefined
> time.
> > > > >
> > > > > I just checked, and we do not have this behavior documented in the
> > > > > pulsar binary protocol spec [3]. We should update the spec to
> indicate
> > > > > how this feature is supposed to work, assuming we keep it this way.
> > > > >
> > > > > Thanks,
> > > > > Michael
> > > > >
> > > > > [0]
> > > > >
> https://github.com/apache/pulsar/blob/d24d82780fd27a98c6cdbee28d756ee7d419495f/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java#L104-L107
> > > > > [1]
> > > > >
> https://github.com/apache/pulsar/blob/4c6989c4da6c0b18c9b0196630e03daf437cea68/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1383-L1391
> > > > > [2]
> > > > >
> https://github.com/apache/pulsar/blob/955dcd10ce28b996811e194c9ad852b06ab30aee/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1735-L1753
> > > > > [3]
> https://pulsar.apache.org/docs/next/developing-binary-protocol/
> > > > >
> > > > >
> > > > > On Thu, Aug 11, 2022 at 6:52 AM Qiang Zhao <
> mattisonchao@apache.org>
> > > > > wrote:
> > > > > >
> > > > > > Hi, Penghui
> > > > > >
> > > > > > I support your opinion.
> > > > > >
> > > > > > It looks like we do not support the `producer_request_hold`
> semantics.
> > > > > We just easily use the same behaviour like `producer_exception`.
> Maybe it's
> > > > > a missing feature.
> > > > > >
> > > > > > Best,
> > > > > > Mattison
> > > > > >
> > > > > > On 2022/08/11 05:28:25 PengHui Li wrote:
> > > > > > > Hi all,
> > > > > > >
> > > > > > > Pulsar has a backlog quota policy `producer_request_hold`
> which will
> > > > > hold
> > > > > > > the message
> > > > > > > publish request. It is very useful for some data sync
> scenarios. It
> > > > > looks
> > > > > > > like the producer
> > > > > > > waiting for the consumer to process the data.
> > > > > > >
> > > > > > > But the new producer is not allowed to connect to the topic
> after
> > > > > reaching
> > > > > > > the max backlog
> > > > > > > limitation with producer_request_hold policy. The producer
> will get
> > > > > > > `ProducerBlockedQuotaExceededError` error. This caused
> inconvenience to
> > > > > > > users, they need
> > > > > > > to have some retry to logic to try to create the producer
> again until
> > > > > the
> > > > > > > consumer acked more
> > > > > > > messages
> > > > > > >
> > > > > > > IMO, we should allow the producer to connect the topic. To
> keep the
> > > > > > > behavior consistent with
> > > > > > > the producer is already connected.
> > > > > > >
> > > > > > > WDYT?
> > > > > > >
> > > > > > > Best Regards,
> > > > > > > Penghui
> > > > > > >
> > > > >
> > >
>

Re: [DISCUSS] Allow the producer to connect to the topic with producer_request_hold backlog policy

Posted by Michael Marshall <mm...@apache.org>.
I agree that metrics on these "blocked" producers are helpful to end users.

> Another concern is that if the topic is unloaded to another broker, we have to notify the client to reconnect, I'm not sure if `waitingExclusiveProducers` does that or if I missing some logic.

If I remember correctly, the unload disconnects a producer and then
the client looks up the topic and rediscovers its quota exceeded
state.

> let the producer know why the producer does not ready.

What is the role of a unique reason? When a topic's quota is exceeded,
the problem is with the consumers (or the retention policy) and there
is nothing a producer can do other than applying backpressure.
Although, the producer may be the first one to find out, unless we
expose sufficient broker metrics for users to alert on, and in that
case, a helpful error message could speed up diagnosing the problem.
That being said, it'd be easy to add an enum to the protobuf and
expand the `CommandProducerSuccess` command.

> I think based on the current logic, we can support the producer_hold policy by `producer_ready=false` first. and then draft a PIP to change the protocol to improve the logic.

I agree that we could implement the change and defer expanding the
protocol. For now, it's probably enough to infer the reason for the
topic not being ready. If the producer is not an exclusive producer,
it can only be blocked by the quota.

Thanks,
Michael

On Sun, Aug 14, 2022 at 11:20 PM Qiang Zhao <ma...@apache.org> wrote:
>
> Great idea, Michael
>
> > 1. A new producer connecting to a topic that has exceeded its quota.
> > This case is trivial because the broker tells the producer it is
> > connected but it cannot send any messages (i.e. producer_ready=false),
> > and the client holds the producer's future until the broker sends
> > producer_ready=true.
>
> Agree with this method of dealing with new producers connecting to a topic that exceeds the quota, but maybe we need to add some metrics to these producers so that users can observe these producers that are connected successfully but not ready.
>
> Another concern is that if the topic is unloaded to another broker, we have to notify the client to reconnect, I'm not sure if `waitingExclusiveProducers` does that or if I missing some logic.
>
> >  but it could be worth
> > extending the client so that an application could reactively discover
> > that the topic's quota is exceeded using a listener. Additionally, we
> > could disable the send timeouts when the producer enters this "hold"
> > state so that messages.
>
> It makes sense to me but looks like we will change the protocol to let the producer know why the producer does not ready. maybe need to add the type field in the `CommandProducerSuccess` command.
>
> I think based on the current logic, we can support the producer_hold policy by `producer_ready=false` first. and then draft a PIP to change the protocol to improve the logic.
>
> Best,
> Mattison
>
>
> On 2022/08/14 05:29:35 Michael Marshall wrote:
> > Great points, Penghui.
> >
> > > To optimize, I think we can only let the producer connect to the broker
> > > and the broker should tell the producer the backlog is exceeded.
> >
> > In looking at the `CommandProducerSuccess` protobuf message, we
> > already have the `producer_ready` boolean field. It was added for the
> > exclusive producer case, and it seems to match this logic exactly,
> > though the client wouldn't know "why" the producer was not ready. I
> > think this field meets our needs because the producer just needs to
> > know that it is connected and should not send messages yet.
> >
> > > If we have too many producers, try to reconnect to the broker again and
> > > again. It is also a non-negligible cost.
> >
> > This is a really important point. The current protocol implementation
> > leads to unnecessary network traffic, and it will be worse for
> > topics with many producers. Note that the lookup part of the protocol
> > introduces additional load on all of the brokers serving these
> > requests.
> >
> > > Looks like we need to fix the client-side to make sure users will not get
> > > ProducerBlockedQuotaExceededError when creating the producer with
> > > producer_hold_request backlog policy. I have tested it locally, the behavior
> > > can be confirmed
> >
> > Thanks for confirming this case. I think it would make sense to update
> > the behavior on the producer_requests_hold feature so that the future
> > is incomplete until the producer is ready to produce, just like the
> > exclusive producer implementation.
> >
> > Ultimately, there are two cases this feature needs to handle.
> >
> > 1. A new producer connecting to a topic that has exceeded its quota.
> > This case is trivial because the broker tells the producer it is
> > connected but it cannot send any messages (i.e. producer_ready=false),
> > and the client holds the producer's future until the broker sends
> > producer_ready=true.
> >
> > 2. An existing producer gets disconnected due to an exceeded quota. In
> > this case, it'd be easy enough for the producer to stop sending
> > messages, but because the client application already has a reference
> > to this producer, the application will be able to submit messages
> > until the client's buffer is full, at which point the send is blocked
> > or gets an exception. I think that would work, but it could be worth
> > extending the client so that an application could reactively discover
> > that the topic's quota is exceeded using a listener. Additionally, we
> > could disable the send timeouts when the producer enters this "hold"
> > state so that messages.
> >
> > In case number 2, it probably makes sense to extend the protocol so
> > that the broker sends a protocol message indicating the producer
> > should stop sending messages. This would be more elegant than
> > disconnecting the producer and making it look up the topic again.
> >
> > Thanks,
> > Michael
> >
> >
> > On Fri, Aug 12, 2022 at 5:50 AM PengHui Li <pe...@apache.org> wrote:
> > >
> > > > The producer fails
> > > the pending messages when the policy is producer_exception and the
> > > producer does nothing when the policy is producer_request_hold
> > >
> > > Eventually, it will fail [0] the user's create producer request because of
> > > the operation timeout [1].
> > >
> > > > The primary advantage for this solution is that the broker does not
> > > need to hold a producer's messages in memory for some undefined time.
> > >
> > > Yes, I agree. And changing the protocol will also affect other clients.
> > >
> > > To optimize, I think we can only let the producer connect to the broker
> > > and the broker should tell the producer the backlog is exceeded.
> > > The producer can only send one message to test. Only push out more messages
> > > after the first message gets the response. Just a rough idea, not for now.
> > > If we have too many producers, try to reconnect to the broker again and
> > > again.
> > > It is also a non-negligible cost.
> > >
> > > Looks like we need to fix the client-side to make sure users will not get
> > > ProducerBlockedQuotaExceededError when creating the producer with
> > > producer_hold_request backlog policy. I have tested it locally, the behavior
> > > can be confirmed
> > >
> > > ```
> > > org.apache.pulsar.client.api.PulsarClientException$ProducerBlockedQuotaExceededError:
> > > {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$TopicBacklogQuotaExceededException:
> > > Cannot create producer on topic with backlog quota
> > > exceeded","reqId":1841236212888635356, "remote":"localhost/127.0.0.1:64805",
> > > "local":"/127.0.0.1:64809"}
> > >
> > > at
> > > org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1052)
> > > at
> > > org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:88)
> > > at
> > > org.apache.pulsar.broker.service.BacklogQuotaManagerTest.createProducer(BacklogQuotaManagerTest.java:664)
> > > at
> > > org.apache.pulsar.broker.service.BacklogQuotaManagerTest.testProducerException(BacklogQuotaManagerTest.java:1091)
> > > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> > > Method)
> > > at
> > > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
> > > at
> > > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > at java.base/java.lang.reflect.Method.invoke(Method.java:568)
> > > at
> > > org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:132)
> > > at org.testng.internal.TestInvoker.invokeMethod(TestInvoker.java:599)
> > > at org.testng.internal.TestInvoker.invokeTestMethod(TestInvoker.java:174)
> > > at org.testng.internal.MethodRunner.runInSequence(MethodRunner.java:46)
> > > at
> > > org.testng.internal.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:822)
> > > at org.testng.internal.TestInvoker.invokeTestMethods(TestInvoker.java:147)
> > > at
> > > org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:146)
> > > at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:128)
> > > at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
> > > at org.testng.TestRunner.privateRun(TestRunner.java:764)
> > > at org.testng.TestRunner.run(TestRunner.java:585)
> > > at org.testng.SuiteRunner.runTest(SuiteRunner.java:384)
> > > at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:378)
> > > at org.testng.SuiteRunner.privateRun(SuiteRunner.java:337)
> > > at org.testng.SuiteRunner.run(SuiteRunner.java:286)
> > > at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:53)
> > > at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:96)
> > > at org.testng.TestNG.runSuitesSequentially(TestNG.java:1218)
> > > at org.testng.TestNG.runSuitesLocally(TestNG.java:1140)
> > > at org.testng.TestNG.runSuites(TestNG.java:1069)
> > > at org.testng.TestNG.run(TestNG.java:1037)
> > > at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
> > > at
> > > com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:109)
> > > ```
> > >
> > > Best,
> > > Penghui
> > >
> > > [0]
> > > https://github.com/apache/pulsar/blob/955dcd10ce28b996811e194c9ad852b06ab30aee/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1778-L1786
> > > [1]
> > > https://github.com/apache/pulsar/blob/955dcd10ce28b996811e194c9ad852b06ab30aee/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1773
> > >
> > >
> > > On Fri, Aug 12, 2022 at 1:36 PM Michael Marshall <mm...@apache.org>
> > > wrote:
> > >
> > > > > IMO, we should allow the producer to connect the topic.
> > > >
> > > > I actually think the current producer_request_hold feature works based
> > > > on disconnecting a producer and only letting it connect when the topic
> > > > no longer exceeds its quota.
> > > >
> > > > > It looks like we do not support the `producer_request_hold` semantics.
> > > > We just easily use the same behaviour like `producer_exception`. Maybe it's
> > > > a missing feature.
> > > >
> > > > I agree that the only references to the
> > > > RetentionPolicy.producer_request_hold enum have to do with disallowing
> > > > producer creation or with disconnecting the producer when the backlog
> > > > is exceeded [0].
> > > >
> > > > However, I think the feature does work if we look closer. The
> > > > implementation is in the client (at least it is in the Java client).
> > > > First, note that the only functional difference between
> > > > producer_exception and producer_request_hold comes here [1] where two
> > > > different exceptions are sent to the producer. Then, see that the
> > > > producer handles these exceptions differently [2]. The producer fails
> > > > the pending messages when the policy is producer_exception and the
> > > > producer does nothing when the policy is producer_request_hold. In the
> > > > second case, the producer will attempt to reconnect to the broker and
> > > > will resend the messages that have been "held".
> > > >
> > > > It seems relevant to point out that the backlog quota's state is only
> > > > changed on a 60 second interval by default (see
> > > > backlogQuotaCheckIntervalInSeconds) and the default send timeout is 30
> > > > seconds. Therefore, many sends will likely timeout on the client side
> > > > before the broker updates the topic's state to "writable" and lets the
> > > > producer reconnect. To use this feature meaningfully, it might make
> > > > sense to increase the send timeout.
> > > >
> > > > The primary advantage for this solution is that the broker does not
> > > > need to hold a producer's messages in memory for some undefined time.
> > > >
> > > > I just checked, and we do not have this behavior documented in the
> > > > pulsar binary protocol spec [3]. We should update the spec to indicate
> > > > how this feature is supposed to work, assuming we keep it this way.
> > > >
> > > > Thanks,
> > > > Michael
> > > >
> > > > [0]
> > > > https://github.com/apache/pulsar/blob/d24d82780fd27a98c6cdbee28d756ee7d419495f/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java#L104-L107
> > > > [1]
> > > > https://github.com/apache/pulsar/blob/4c6989c4da6c0b18c9b0196630e03daf437cea68/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1383-L1391
> > > > [2]
> > > > https://github.com/apache/pulsar/blob/955dcd10ce28b996811e194c9ad852b06ab30aee/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1735-L1753
> > > > [3] https://pulsar.apache.org/docs/next/developing-binary-protocol/
> > > >
> > > >
> > > > On Thu, Aug 11, 2022 at 6:52 AM Qiang Zhao <ma...@apache.org>
> > > > wrote:
> > > > >
> > > > > Hi, Penghui
> > > > >
> > > > > I support your opinion.
> > > > >
> > > > > It looks like we do not support the `producer_request_hold` semantics.
> > > > We just easily use the same behaviour like `producer_exception`. Maybe it's
> > > > a missing feature.
> > > > >
> > > > > Best,
> > > > > Mattison
> > > > >
> > > > > On 2022/08/11 05:28:25 PengHui Li wrote:
> > > > > > Hi all,
> > > > > >
> > > > > > Pulsar has a backlog quota policy `producer_request_hold` which will
> > > > hold
> > > > > > the message
> > > > > > publish request. It is very useful for some data sync scenarios. It
> > > > looks
> > > > > > like the producer
> > > > > > waiting for the consumer to process the data.
> > > > > >
> > > > > > But the new producer is not allowed to connect to the topic after
> > > > reaching
> > > > > > the max backlog
> > > > > > limitation with producer_request_hold policy. The producer will get
> > > > > > `ProducerBlockedQuotaExceededError` error. This caused inconvenience to
> > > > > > users, they need
> > > > > > to have some retry to logic to try to create the producer again until
> > > > the
> > > > > > consumer acked more
> > > > > > messages
> > > > > >
> > > > > > IMO, we should allow the producer to connect the topic. To keep the
> > > > > > behavior consistent with
> > > > > > the producer is already connected.
> > > > > >
> > > > > > WDYT?
> > > > > >
> > > > > > Best Regards,
> > > > > > Penghui
> > > > > >
> > > >
> >

Re: [DISCUSS] Allow the producer to connect to the topic with producer_request_hold backlog policy

Posted by Qiang Zhao <ma...@apache.org>.
Great idea, Michael

> 1. A new producer connecting to a topic that has exceeded its quota.
> This case is trivial because the broker tells the producer it is
> connected but it cannot send any messages (i.e. producer_ready=false),
> and the client holds the producer's future until the broker sends
> producer_ready=true.

Agree with this method of dealing with new producers connecting to a topic that exceeds the quota, but maybe we need to add some metrics to these producers so that users can observe these producers that are connected successfully but not ready. 

Another concern is that if the topic is unloaded to another broker, we have to notify the client to reconnect, I'm not sure if `waitingExclusiveProducers` does that or if I missing some logic.

>  but it could be worth
> extending the client so that an application could reactively discover
> that the topic's quota is exceeded using a listener. Additionally, we
> could disable the send timeouts when the producer enters this "hold"
> state so that messages.

It makes sense to me but looks like we will change the protocol to let the producer know why the producer does not ready. maybe need to add the type field in the `CommandProducerSuccess` command.

I think based on the current logic, we can support the producer_hold policy by `producer_ready=false` first. and then draft a PIP to change the protocol to improve the logic. 

Best,
Mattison


On 2022/08/14 05:29:35 Michael Marshall wrote:
> Great points, Penghui.
> 
> > To optimize, I think we can only let the producer connect to the broker
> > and the broker should tell the producer the backlog is exceeded.
> 
> In looking at the `CommandProducerSuccess` protobuf message, we
> already have the `producer_ready` boolean field. It was added for the
> exclusive producer case, and it seems to match this logic exactly,
> though the client wouldn't know "why" the producer was not ready. I
> think this field meets our needs because the producer just needs to
> know that it is connected and should not send messages yet.
> 
> > If we have too many producers, try to reconnect to the broker again and
> > again. It is also a non-negligible cost.
> 
> This is a really important point. The current protocol implementation
> leads to unnecessary network traffic, and it will be worse for
> topics with many producers. Note that the lookup part of the protocol
> introduces additional load on all of the brokers serving these
> requests.
> 
> > Looks like we need to fix the client-side to make sure users will not get
> > ProducerBlockedQuotaExceededError when creating the producer with
> > producer_hold_request backlog policy. I have tested it locally, the behavior
> > can be confirmed
> 
> Thanks for confirming this case. I think it would make sense to update
> the behavior on the producer_requests_hold feature so that the future
> is incomplete until the producer is ready to produce, just like the
> exclusive producer implementation.
> 
> Ultimately, there are two cases this feature needs to handle.
> 
> 1. A new producer connecting to a topic that has exceeded its quota.
> This case is trivial because the broker tells the producer it is
> connected but it cannot send any messages (i.e. producer_ready=false),
> and the client holds the producer's future until the broker sends
> producer_ready=true.
> 
> 2. An existing producer gets disconnected due to an exceeded quota. In
> this case, it'd be easy enough for the producer to stop sending
> messages, but because the client application already has a reference
> to this producer, the application will be able to submit messages
> until the client's buffer is full, at which point the send is blocked
> or gets an exception. I think that would work, but it could be worth
> extending the client so that an application could reactively discover
> that the topic's quota is exceeded using a listener. Additionally, we
> could disable the send timeouts when the producer enters this "hold"
> state so that messages.
> 
> In case number 2, it probably makes sense to extend the protocol so
> that the broker sends a protocol message indicating the producer
> should stop sending messages. This would be more elegant than
> disconnecting the producer and making it look up the topic again.
> 
> Thanks,
> Michael
> 
> 
> On Fri, Aug 12, 2022 at 5:50 AM PengHui Li <pe...@apache.org> wrote:
> >
> > > The producer fails
> > the pending messages when the policy is producer_exception and the
> > producer does nothing when the policy is producer_request_hold
> >
> > Eventually, it will fail [0] the user's create producer request because of
> > the operation timeout [1].
> >
> > > The primary advantage for this solution is that the broker does not
> > need to hold a producer's messages in memory for some undefined time.
> >
> > Yes, I agree. And changing the protocol will also affect other clients.
> >
> > To optimize, I think we can only let the producer connect to the broker
> > and the broker should tell the producer the backlog is exceeded.
> > The producer can only send one message to test. Only push out more messages
> > after the first message gets the response. Just a rough idea, not for now.
> > If we have too many producers, try to reconnect to the broker again and
> > again.
> > It is also a non-negligible cost.
> >
> > Looks like we need to fix the client-side to make sure users will not get
> > ProducerBlockedQuotaExceededError when creating the producer with
> > producer_hold_request backlog policy. I have tested it locally, the behavior
> > can be confirmed
> >
> > ```
> > org.apache.pulsar.client.api.PulsarClientException$ProducerBlockedQuotaExceededError:
> > {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$TopicBacklogQuotaExceededException:
> > Cannot create producer on topic with backlog quota
> > exceeded","reqId":1841236212888635356, "remote":"localhost/127.0.0.1:64805",
> > "local":"/127.0.0.1:64809"}
> >
> > at
> > org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1052)
> > at
> > org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:88)
> > at
> > org.apache.pulsar.broker.service.BacklogQuotaManagerTest.createProducer(BacklogQuotaManagerTest.java:664)
> > at
> > org.apache.pulsar.broker.service.BacklogQuotaManagerTest.testProducerException(BacklogQuotaManagerTest.java:1091)
> > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> > Method)
> > at
> > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
> > at
> > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at java.base/java.lang.reflect.Method.invoke(Method.java:568)
> > at
> > org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:132)
> > at org.testng.internal.TestInvoker.invokeMethod(TestInvoker.java:599)
> > at org.testng.internal.TestInvoker.invokeTestMethod(TestInvoker.java:174)
> > at org.testng.internal.MethodRunner.runInSequence(MethodRunner.java:46)
> > at
> > org.testng.internal.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:822)
> > at org.testng.internal.TestInvoker.invokeTestMethods(TestInvoker.java:147)
> > at
> > org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:146)
> > at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:128)
> > at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
> > at org.testng.TestRunner.privateRun(TestRunner.java:764)
> > at org.testng.TestRunner.run(TestRunner.java:585)
> > at org.testng.SuiteRunner.runTest(SuiteRunner.java:384)
> > at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:378)
> > at org.testng.SuiteRunner.privateRun(SuiteRunner.java:337)
> > at org.testng.SuiteRunner.run(SuiteRunner.java:286)
> > at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:53)
> > at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:96)
> > at org.testng.TestNG.runSuitesSequentially(TestNG.java:1218)
> > at org.testng.TestNG.runSuitesLocally(TestNG.java:1140)
> > at org.testng.TestNG.runSuites(TestNG.java:1069)
> > at org.testng.TestNG.run(TestNG.java:1037)
> > at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
> > at
> > com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:109)
> > ```
> >
> > Best,
> > Penghui
> >
> > [0]
> > https://github.com/apache/pulsar/blob/955dcd10ce28b996811e194c9ad852b06ab30aee/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1778-L1786
> > [1]
> > https://github.com/apache/pulsar/blob/955dcd10ce28b996811e194c9ad852b06ab30aee/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1773
> >
> >
> > On Fri, Aug 12, 2022 at 1:36 PM Michael Marshall <mm...@apache.org>
> > wrote:
> >
> > > > IMO, we should allow the producer to connect the topic.
> > >
> > > I actually think the current producer_request_hold feature works based
> > > on disconnecting a producer and only letting it connect when the topic
> > > no longer exceeds its quota.
> > >
> > > > It looks like we do not support the `producer_request_hold` semantics.
> > > We just easily use the same behaviour like `producer_exception`. Maybe it's
> > > a missing feature.
> > >
> > > I agree that the only references to the
> > > RetentionPolicy.producer_request_hold enum have to do with disallowing
> > > producer creation or with disconnecting the producer when the backlog
> > > is exceeded [0].
> > >
> > > However, I think the feature does work if we look closer. The
> > > implementation is in the client (at least it is in the Java client).
> > > First, note that the only functional difference between
> > > producer_exception and producer_request_hold comes here [1] where two
> > > different exceptions are sent to the producer. Then, see that the
> > > producer handles these exceptions differently [2]. The producer fails
> > > the pending messages when the policy is producer_exception and the
> > > producer does nothing when the policy is producer_request_hold. In the
> > > second case, the producer will attempt to reconnect to the broker and
> > > will resend the messages that have been "held".
> > >
> > > It seems relevant to point out that the backlog quota's state is only
> > > changed on a 60 second interval by default (see
> > > backlogQuotaCheckIntervalInSeconds) and the default send timeout is 30
> > > seconds. Therefore, many sends will likely timeout on the client side
> > > before the broker updates the topic's state to "writable" and lets the
> > > producer reconnect. To use this feature meaningfully, it might make
> > > sense to increase the send timeout.
> > >
> > > The primary advantage for this solution is that the broker does not
> > > need to hold a producer's messages in memory for some undefined time.
> > >
> > > I just checked, and we do not have this behavior documented in the
> > > pulsar binary protocol spec [3]. We should update the spec to indicate
> > > how this feature is supposed to work, assuming we keep it this way.
> > >
> > > Thanks,
> > > Michael
> > >
> > > [0]
> > > https://github.com/apache/pulsar/blob/d24d82780fd27a98c6cdbee28d756ee7d419495f/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java#L104-L107
> > > [1]
> > > https://github.com/apache/pulsar/blob/4c6989c4da6c0b18c9b0196630e03daf437cea68/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1383-L1391
> > > [2]
> > > https://github.com/apache/pulsar/blob/955dcd10ce28b996811e194c9ad852b06ab30aee/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1735-L1753
> > > [3] https://pulsar.apache.org/docs/next/developing-binary-protocol/
> > >
> > >
> > > On Thu, Aug 11, 2022 at 6:52 AM Qiang Zhao <ma...@apache.org>
> > > wrote:
> > > >
> > > > Hi, Penghui
> > > >
> > > > I support your opinion.
> > > >
> > > > It looks like we do not support the `producer_request_hold` semantics.
> > > We just easily use the same behaviour like `producer_exception`. Maybe it's
> > > a missing feature.
> > > >
> > > > Best,
> > > > Mattison
> > > >
> > > > On 2022/08/11 05:28:25 PengHui Li wrote:
> > > > > Hi all,
> > > > >
> > > > > Pulsar has a backlog quota policy `producer_request_hold` which will
> > > hold
> > > > > the message
> > > > > publish request. It is very useful for some data sync scenarios. It
> > > looks
> > > > > like the producer
> > > > > waiting for the consumer to process the data.
> > > > >
> > > > > But the new producer is not allowed to connect to the topic after
> > > reaching
> > > > > the max backlog
> > > > > limitation with producer_request_hold policy. The producer will get
> > > > > `ProducerBlockedQuotaExceededError` error. This caused inconvenience to
> > > > > users, they need
> > > > > to have some retry to logic to try to create the producer again until
> > > the
> > > > > consumer acked more
> > > > > messages
> > > > >
> > > > > IMO, we should allow the producer to connect the topic. To keep the
> > > > > behavior consistent with
> > > > > the producer is already connected.
> > > > >
> > > > > WDYT?
> > > > >
> > > > > Best Regards,
> > > > > Penghui
> > > > >
> > >
> 

Re: [DISCUSS] Allow the producer to connect to the topic with producer_request_hold backlog policy

Posted by Michael Marshall <mm...@apache.org>.
Great points, Penghui.

> To optimize, I think we can only let the producer connect to the broker
> and the broker should tell the producer the backlog is exceeded.

In looking at the `CommandProducerSuccess` protobuf message, we
already have the `producer_ready` boolean field. It was added for the
exclusive producer case, and it seems to match this logic exactly,
though the client wouldn't know "why" the producer was not ready. I
think this field meets our needs because the producer just needs to
know that it is connected and should not send messages yet.

> If we have too many producers, try to reconnect to the broker again and
> again. It is also a non-negligible cost.

This is a really important point. The current protocol implementation
leads to unnecessary network traffic, and it will be worse for
topics with many producers. Note that the lookup part of the protocol
introduces additional load on all of the brokers serving these
requests.

> Looks like we need to fix the client-side to make sure users will not get
> ProducerBlockedQuotaExceededError when creating the producer with
> producer_hold_request backlog policy. I have tested it locally, the behavior
> can be confirmed

Thanks for confirming this case. I think it would make sense to update
the behavior on the producer_requests_hold feature so that the future
is incomplete until the producer is ready to produce, just like the
exclusive producer implementation.

Ultimately, there are two cases this feature needs to handle.

1. A new producer connecting to a topic that has exceeded its quota.
This case is trivial because the broker tells the producer it is
connected but it cannot send any messages (i.e. producer_ready=false),
and the client holds the producer's future until the broker sends
producer_ready=true.

2. An existing producer gets disconnected due to an exceeded quota. In
this case, it'd be easy enough for the producer to stop sending
messages, but because the client application already has a reference
to this producer, the application will be able to submit messages
until the client's buffer is full, at which point the send is blocked
or gets an exception. I think that would work, but it could be worth
extending the client so that an application could reactively discover
that the topic's quota is exceeded using a listener. Additionally, we
could disable the send timeouts when the producer enters this "hold"
state so that messages.

In case number 2, it probably makes sense to extend the protocol so
that the broker sends a protocol message indicating the producer
should stop sending messages. This would be more elegant than
disconnecting the producer and making it look up the topic again.

Thanks,
Michael


On Fri, Aug 12, 2022 at 5:50 AM PengHui Li <pe...@apache.org> wrote:
>
> > The producer fails
> the pending messages when the policy is producer_exception and the
> producer does nothing when the policy is producer_request_hold
>
> Eventually, it will fail [0] the user's create producer request because of
> the operation timeout [1].
>
> > The primary advantage for this solution is that the broker does not
> need to hold a producer's messages in memory for some undefined time.
>
> Yes, I agree. And changing the protocol will also affect other clients.
>
> To optimize, I think we can only let the producer connect to the broker
> and the broker should tell the producer the backlog is exceeded.
> The producer can only send one message to test. Only push out more messages
> after the first message gets the response. Just a rough idea, not for now.
> If we have too many producers, try to reconnect to the broker again and
> again.
> It is also a non-negligible cost.
>
> Looks like we need to fix the client-side to make sure users will not get
> ProducerBlockedQuotaExceededError when creating the producer with
> producer_hold_request backlog policy. I have tested it locally, the behavior
> can be confirmed
>
> ```
> org.apache.pulsar.client.api.PulsarClientException$ProducerBlockedQuotaExceededError:
> {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$TopicBacklogQuotaExceededException:
> Cannot create producer on topic with backlog quota
> exceeded","reqId":1841236212888635356, "remote":"localhost/127.0.0.1:64805",
> "local":"/127.0.0.1:64809"}
>
> at
> org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1052)
> at
> org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:88)
> at
> org.apache.pulsar.broker.service.BacklogQuotaManagerTest.createProducer(BacklogQuotaManagerTest.java:664)
> at
> org.apache.pulsar.broker.service.BacklogQuotaManagerTest.testProducerException(BacklogQuotaManagerTest.java:1091)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:568)
> at
> org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:132)
> at org.testng.internal.TestInvoker.invokeMethod(TestInvoker.java:599)
> at org.testng.internal.TestInvoker.invokeTestMethod(TestInvoker.java:174)
> at org.testng.internal.MethodRunner.runInSequence(MethodRunner.java:46)
> at
> org.testng.internal.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:822)
> at org.testng.internal.TestInvoker.invokeTestMethods(TestInvoker.java:147)
> at
> org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:146)
> at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:128)
> at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
> at org.testng.TestRunner.privateRun(TestRunner.java:764)
> at org.testng.TestRunner.run(TestRunner.java:585)
> at org.testng.SuiteRunner.runTest(SuiteRunner.java:384)
> at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:378)
> at org.testng.SuiteRunner.privateRun(SuiteRunner.java:337)
> at org.testng.SuiteRunner.run(SuiteRunner.java:286)
> at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:53)
> at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:96)
> at org.testng.TestNG.runSuitesSequentially(TestNG.java:1218)
> at org.testng.TestNG.runSuitesLocally(TestNG.java:1140)
> at org.testng.TestNG.runSuites(TestNG.java:1069)
> at org.testng.TestNG.run(TestNG.java:1037)
> at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
> at
> com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:109)
> ```
>
> Best,
> Penghui
>
> [0]
> https://github.com/apache/pulsar/blob/955dcd10ce28b996811e194c9ad852b06ab30aee/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1778-L1786
> [1]
> https://github.com/apache/pulsar/blob/955dcd10ce28b996811e194c9ad852b06ab30aee/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1773
>
>
> On Fri, Aug 12, 2022 at 1:36 PM Michael Marshall <mm...@apache.org>
> wrote:
>
> > > IMO, we should allow the producer to connect the topic.
> >
> > I actually think the current producer_request_hold feature works based
> > on disconnecting a producer and only letting it connect when the topic
> > no longer exceeds its quota.
> >
> > > It looks like we do not support the `producer_request_hold` semantics.
> > We just easily use the same behaviour like `producer_exception`. Maybe it's
> > a missing feature.
> >
> > I agree that the only references to the
> > RetentionPolicy.producer_request_hold enum have to do with disallowing
> > producer creation or with disconnecting the producer when the backlog
> > is exceeded [0].
> >
> > However, I think the feature does work if we look closer. The
> > implementation is in the client (at least it is in the Java client).
> > First, note that the only functional difference between
> > producer_exception and producer_request_hold comes here [1] where two
> > different exceptions are sent to the producer. Then, see that the
> > producer handles these exceptions differently [2]. The producer fails
> > the pending messages when the policy is producer_exception and the
> > producer does nothing when the policy is producer_request_hold. In the
> > second case, the producer will attempt to reconnect to the broker and
> > will resend the messages that have been "held".
> >
> > It seems relevant to point out that the backlog quota's state is only
> > changed on a 60 second interval by default (see
> > backlogQuotaCheckIntervalInSeconds) and the default send timeout is 30
> > seconds. Therefore, many sends will likely timeout on the client side
> > before the broker updates the topic's state to "writable" and lets the
> > producer reconnect. To use this feature meaningfully, it might make
> > sense to increase the send timeout.
> >
> > The primary advantage for this solution is that the broker does not
> > need to hold a producer's messages in memory for some undefined time.
> >
> > I just checked, and we do not have this behavior documented in the
> > pulsar binary protocol spec [3]. We should update the spec to indicate
> > how this feature is supposed to work, assuming we keep it this way.
> >
> > Thanks,
> > Michael
> >
> > [0]
> > https://github.com/apache/pulsar/blob/d24d82780fd27a98c6cdbee28d756ee7d419495f/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java#L104-L107
> > [1]
> > https://github.com/apache/pulsar/blob/4c6989c4da6c0b18c9b0196630e03daf437cea68/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1383-L1391
> > [2]
> > https://github.com/apache/pulsar/blob/955dcd10ce28b996811e194c9ad852b06ab30aee/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1735-L1753
> > [3] https://pulsar.apache.org/docs/next/developing-binary-protocol/
> >
> >
> > On Thu, Aug 11, 2022 at 6:52 AM Qiang Zhao <ma...@apache.org>
> > wrote:
> > >
> > > Hi, Penghui
> > >
> > > I support your opinion.
> > >
> > > It looks like we do not support the `producer_request_hold` semantics.
> > We just easily use the same behaviour like `producer_exception`. Maybe it's
> > a missing feature.
> > >
> > > Best,
> > > Mattison
> > >
> > > On 2022/08/11 05:28:25 PengHui Li wrote:
> > > > Hi all,
> > > >
> > > > Pulsar has a backlog quota policy `producer_request_hold` which will
> > hold
> > > > the message
> > > > publish request. It is very useful for some data sync scenarios. It
> > looks
> > > > like the producer
> > > > waiting for the consumer to process the data.
> > > >
> > > > But the new producer is not allowed to connect to the topic after
> > reaching
> > > > the max backlog
> > > > limitation with producer_request_hold policy. The producer will get
> > > > `ProducerBlockedQuotaExceededError` error. This caused inconvenience to
> > > > users, they need
> > > > to have some retry to logic to try to create the producer again until
> > the
> > > > consumer acked more
> > > > messages
> > > >
> > > > IMO, we should allow the producer to connect the topic. To keep the
> > > > behavior consistent with
> > > > the producer is already connected.
> > > >
> > > > WDYT?
> > > >
> > > > Best Regards,
> > > > Penghui
> > > >
> >

Re: [DISCUSS] Allow the producer to connect to the topic with producer_request_hold backlog policy

Posted by PengHui Li <pe...@apache.org>.
> The producer fails
the pending messages when the policy is producer_exception and the
producer does nothing when the policy is producer_request_hold

Eventually, it will fail [0] the user's create producer request because of
the operation timeout [1].

> The primary advantage for this solution is that the broker does not
need to hold a producer's messages in memory for some undefined time.

Yes, I agree. And changing the protocol will also affect other clients.

To optimize, I think we can only let the producer connect to the broker
and the broker should tell the producer the backlog is exceeded.
The producer can only send one message to test. Only push out more messages
after the first message gets the response. Just a rough idea, not for now.
If we have too many producers, try to reconnect to the broker again and
again.
It is also a non-negligible cost.

Looks like we need to fix the client-side to make sure users will not get
ProducerBlockedQuotaExceededError when creating the producer with
producer_hold_request backlog policy. I have tested it locally, the behavior
can be confirmed

```
org.apache.pulsar.client.api.PulsarClientException$ProducerBlockedQuotaExceededError:
{"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$TopicBacklogQuotaExceededException:
Cannot create producer on topic with backlog quota
exceeded","reqId":1841236212888635356, "remote":"localhost/127.0.0.1:64805",
"local":"/127.0.0.1:64809"}

at
org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1052)
at
org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:88)
at
org.apache.pulsar.broker.service.BacklogQuotaManagerTest.createProducer(BacklogQuotaManagerTest.java:664)
at
org.apache.pulsar.broker.service.BacklogQuotaManagerTest.testProducerException(BacklogQuotaManagerTest.java:1091)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at
org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:132)
at org.testng.internal.TestInvoker.invokeMethod(TestInvoker.java:599)
at org.testng.internal.TestInvoker.invokeTestMethod(TestInvoker.java:174)
at org.testng.internal.MethodRunner.runInSequence(MethodRunner.java:46)
at
org.testng.internal.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:822)
at org.testng.internal.TestInvoker.invokeTestMethods(TestInvoker.java:147)
at
org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:146)
at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:128)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at org.testng.TestRunner.privateRun(TestRunner.java:764)
at org.testng.TestRunner.run(TestRunner.java:585)
at org.testng.SuiteRunner.runTest(SuiteRunner.java:384)
at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:378)
at org.testng.SuiteRunner.privateRun(SuiteRunner.java:337)
at org.testng.SuiteRunner.run(SuiteRunner.java:286)
at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:53)
at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:96)
at org.testng.TestNG.runSuitesSequentially(TestNG.java:1218)
at org.testng.TestNG.runSuitesLocally(TestNG.java:1140)
at org.testng.TestNG.runSuites(TestNG.java:1069)
at org.testng.TestNG.run(TestNG.java:1037)
at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
at
com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:109)
```

Best,
Penghui

[0]
https://github.com/apache/pulsar/blob/955dcd10ce28b996811e194c9ad852b06ab30aee/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1778-L1786
[1]
https://github.com/apache/pulsar/blob/955dcd10ce28b996811e194c9ad852b06ab30aee/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1773


On Fri, Aug 12, 2022 at 1:36 PM Michael Marshall <mm...@apache.org>
wrote:

> > IMO, we should allow the producer to connect the topic.
>
> I actually think the current producer_request_hold feature works based
> on disconnecting a producer and only letting it connect when the topic
> no longer exceeds its quota.
>
> > It looks like we do not support the `producer_request_hold` semantics.
> We just easily use the same behaviour like `producer_exception`. Maybe it's
> a missing feature.
>
> I agree that the only references to the
> RetentionPolicy.producer_request_hold enum have to do with disallowing
> producer creation or with disconnecting the producer when the backlog
> is exceeded [0].
>
> However, I think the feature does work if we look closer. The
> implementation is in the client (at least it is in the Java client).
> First, note that the only functional difference between
> producer_exception and producer_request_hold comes here [1] where two
> different exceptions are sent to the producer. Then, see that the
> producer handles these exceptions differently [2]. The producer fails
> the pending messages when the policy is producer_exception and the
> producer does nothing when the policy is producer_request_hold. In the
> second case, the producer will attempt to reconnect to the broker and
> will resend the messages that have been "held".
>
> It seems relevant to point out that the backlog quota's state is only
> changed on a 60 second interval by default (see
> backlogQuotaCheckIntervalInSeconds) and the default send timeout is 30
> seconds. Therefore, many sends will likely timeout on the client side
> before the broker updates the topic's state to "writable" and lets the
> producer reconnect. To use this feature meaningfully, it might make
> sense to increase the send timeout.
>
> The primary advantage for this solution is that the broker does not
> need to hold a producer's messages in memory for some undefined time.
>
> I just checked, and we do not have this behavior documented in the
> pulsar binary protocol spec [3]. We should update the spec to indicate
> how this feature is supposed to work, assuming we keep it this way.
>
> Thanks,
> Michael
>
> [0]
> https://github.com/apache/pulsar/blob/d24d82780fd27a98c6cdbee28d756ee7d419495f/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java#L104-L107
> [1]
> https://github.com/apache/pulsar/blob/4c6989c4da6c0b18c9b0196630e03daf437cea68/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1383-L1391
> [2]
> https://github.com/apache/pulsar/blob/955dcd10ce28b996811e194c9ad852b06ab30aee/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1735-L1753
> [3] https://pulsar.apache.org/docs/next/developing-binary-protocol/
>
>
> On Thu, Aug 11, 2022 at 6:52 AM Qiang Zhao <ma...@apache.org>
> wrote:
> >
> > Hi, Penghui
> >
> > I support your opinion.
> >
> > It looks like we do not support the `producer_request_hold` semantics.
> We just easily use the same behaviour like `producer_exception`. Maybe it's
> a missing feature.
> >
> > Best,
> > Mattison
> >
> > On 2022/08/11 05:28:25 PengHui Li wrote:
> > > Hi all,
> > >
> > > Pulsar has a backlog quota policy `producer_request_hold` which will
> hold
> > > the message
> > > publish request. It is very useful for some data sync scenarios. It
> looks
> > > like the producer
> > > waiting for the consumer to process the data.
> > >
> > > But the new producer is not allowed to connect to the topic after
> reaching
> > > the max backlog
> > > limitation with producer_request_hold policy. The producer will get
> > > `ProducerBlockedQuotaExceededError` error. This caused inconvenience to
> > > users, they need
> > > to have some retry to logic to try to create the producer again until
> the
> > > consumer acked more
> > > messages
> > >
> > > IMO, we should allow the producer to connect the topic. To keep the
> > > behavior consistent with
> > > the producer is already connected.
> > >
> > > WDYT?
> > >
> > > Best Regards,
> > > Penghui
> > >
>

Re: [DISCUSS] Allow the producer to connect to the topic with producer_request_hold backlog policy

Posted by Michael Marshall <mm...@apache.org>.
> IMO, we should allow the producer to connect the topic.

I actually think the current producer_request_hold feature works based
on disconnecting a producer and only letting it connect when the topic
no longer exceeds its quota.

> It looks like we do not support the `producer_request_hold` semantics. We just easily use the same behaviour like `producer_exception`. Maybe it's a missing feature.

I agree that the only references to the
RetentionPolicy.producer_request_hold enum have to do with disallowing
producer creation or with disconnecting the producer when the backlog
is exceeded [0].

However, I think the feature does work if we look closer. The
implementation is in the client (at least it is in the Java client).
First, note that the only functional difference between
producer_exception and producer_request_hold comes here [1] where two
different exceptions are sent to the producer. Then, see that the
producer handles these exceptions differently [2]. The producer fails
the pending messages when the policy is producer_exception and the
producer does nothing when the policy is producer_request_hold. In the
second case, the producer will attempt to reconnect to the broker and
will resend the messages that have been "held".

It seems relevant to point out that the backlog quota's state is only
changed on a 60 second interval by default (see
backlogQuotaCheckIntervalInSeconds) and the default send timeout is 30
seconds. Therefore, many sends will likely timeout on the client side
before the broker updates the topic's state to "writable" and lets the
producer reconnect. To use this feature meaningfully, it might make
sense to increase the send timeout.

The primary advantage for this solution is that the broker does not
need to hold a producer's messages in memory for some undefined time.

I just checked, and we do not have this behavior documented in the
pulsar binary protocol spec [3]. We should update the spec to indicate
how this feature is supposed to work, assuming we keep it this way.

Thanks,
Michael

[0] https://github.com/apache/pulsar/blob/d24d82780fd27a98c6cdbee28d756ee7d419495f/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java#L104-L107
[1] https://github.com/apache/pulsar/blob/4c6989c4da6c0b18c9b0196630e03daf437cea68/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1383-L1391
[2] https://github.com/apache/pulsar/blob/955dcd10ce28b996811e194c9ad852b06ab30aee/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1735-L1753
[3] https://pulsar.apache.org/docs/next/developing-binary-protocol/


On Thu, Aug 11, 2022 at 6:52 AM Qiang Zhao <ma...@apache.org> wrote:
>
> Hi, Penghui
>
> I support your opinion.
>
> It looks like we do not support the `producer_request_hold` semantics. We just easily use the same behaviour like `producer_exception`. Maybe it's a missing feature.
>
> Best,
> Mattison
>
> On 2022/08/11 05:28:25 PengHui Li wrote:
> > Hi all,
> >
> > Pulsar has a backlog quota policy `producer_request_hold` which will hold
> > the message
> > publish request. It is very useful for some data sync scenarios. It looks
> > like the producer
> > waiting for the consumer to process the data.
> >
> > But the new producer is not allowed to connect to the topic after reaching
> > the max backlog
> > limitation with producer_request_hold policy. The producer will get
> > `ProducerBlockedQuotaExceededError` error. This caused inconvenience to
> > users, they need
> > to have some retry to logic to try to create the producer again until the
> > consumer acked more
> > messages
> >
> > IMO, we should allow the producer to connect the topic. To keep the
> > behavior consistent with
> > the producer is already connected.
> >
> > WDYT?
> >
> > Best Regards,
> > Penghui
> >

Re: [DISCUSS] Allow the producer to connect to the topic with producer_request_hold backlog policy

Posted by Qiang Zhao <ma...@apache.org>.
Hi, Penghui

I support your opinion. 

It looks like we do not support the `producer_request_hold` semantics. We just easily use the same behaviour like `producer_exception`. Maybe it's a missing feature.

Best,
Mattison

On 2022/08/11 05:28:25 PengHui Li wrote:
> Hi all,
> 
> Pulsar has a backlog quota policy `producer_request_hold` which will hold
> the message
> publish request. It is very useful for some data sync scenarios. It looks
> like the producer
> waiting for the consumer to process the data.
> 
> But the new producer is not allowed to connect to the topic after reaching
> the max backlog
> limitation with producer_request_hold policy. The producer will get
> `ProducerBlockedQuotaExceededError` error. This caused inconvenience to
> users, they need
> to have some retry to logic to try to create the producer again until the
> consumer acked more
> messages
> 
> IMO, we should allow the producer to connect the topic. To keep the
> behavior consistent with
> the producer is already connected.
> 
> WDYT?
> 
> Best Regards,
> Penghui
>