You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pulsar.apache.org by Yunze Xu <yz...@streamnative.io.INVALID> on 2021/10/01 03:26:22 UTC

Re: Correct semantics of producer close

You're right that before a CommandCloseProducer request was completed, the
pending messages should be persisted before this close request was completed
in normal cases. It’s guaranteed by broker side.

Then there’s no inconsistency between the implementation and JavaDocs now.

The key point is whether should we flush the messages in batch container. I prefer
keeping the current semantics. But I found the messages in batch container
never failed. We need to fix the problem. For example, here’s my unit test:

```java
    @Test
    public void test() throws Exception {
        final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic("topic")
                .batchingMaxMessages(10000)
                .batchingMaxBytes(10000000)
                .batchingMaxPublishDelay(100, TimeUnit.SECONDS)
                .sendTimeout(1, TimeUnit.SECONDS)
                .create();
        final CountDownLatch latch = new CountDownLatch(10);
        final Map<Integer, Throwable> throwableMap = new ConcurrentHashMap<>();
        for (int i = 0; i < 10; i++) {
            final Integer index = i;
            producer.sendAsync("msg-" + i).whenComplete((id, e) -> {
                if (e != null) {
                    throwableMap.put(index, e);
                }
                latch.countDown();
            });
        }
        producer.close();
        latch.await();
        throwableMap.forEach((i, e) -> {
            log.info("Message {} failed with {}", i, e);
        });
    }
```

The test would block forever.

> 2021年10月1日 上午4:22,Michael Marshall <mi...@gmail.com> 写道:
> 
> Following up here. I am pretty sure part of this conversation has been
> based on a misunderstanding of the code. From what I can tell, the
> behavior for `Producer#closeAsync` in the client (mostly) aligns with
> the current Javadocs.
> 
>> The existing implementation of producer close is:
>> 1. Cancel all timers, including send and batch container (`batchMessageContainer`).
>> 2. Complete all pending messages (`pendingMessages`) with `AlreadyCloseException`.
> 
> I agree with 1, but I think 2 is only partially correct. The client
> will only exceptionally complete pending messages if the connection is
> null or not ready, or after the broker responds to the
> `CLOSE_PRODUCER` command or a timeout passes. This behavior seems
> right to me.
> 
> Here is the relevant code:
> https://github.com/apache/pulsar/blob/9d309145f342bc416b8b4663125e1216903a3e83/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L875-L909
> 
> The only remaining question: does close imply flush? If not, we'll
> need update the logic to fail the messages contained in the
> `batchMessageContainer` during close. Otherwise, we'll update the
> logic to call flush before sending the `CLOSE_PRODUCER` command and
> everything should work as documented. In both cases, we should update
> the Javadocs to make the behavior clearer.
> 
> Thanks,
> Michael
> 
> 
> 
> On Thu, Sep 30, 2021 at 11:55 AM Michael Marshall <mi...@gmail.com> wrote:
>> 
>> I have two questions:
>> 
>> 1. Does close imply immediate shutdown?
>> 2. Does close imply flush?
>> 
>> There is not yet consensus on 1, and 2 is only relevant if 1's answer is "no".
>> 
>> Thus far, the conversation has centered on the `Producer#close`
>> method. I'd like to broaden the discussion to include some other
>> methods from the `PulsarClient` interface: `shutdown` and `close`.
>> 
>> The Javadoc for `PulsarClient#shutdown` describes the "shutdown
>> immediately" behavior. It says:
>> 
>>> Release all the resources and close all the producer, consumer and
>>> reader instances without waiting for ongoing operations to complete.
>> 
>> The Javadoc for `PulsarClient#close` describes waiting for
>> pending/in-flight messages to complete before returning. It says:
>> 
>>> This operation will trigger a graceful close of all producer, consumer
>>> and reader instances that this client has currently active. That implies
>>> that close will block and wait until all pending producer send requests
>>> are persisted.
>> 
>> One question that follows from the above: why does the `Producer` not
>> have a `shutdown` method? I think this is because the "immediate
>> shutdown" behavior is not necessary for a single producer. When
>> immediate shutdown semantics are required, the `PulsarClient#shutdown`
>> method is sufficient because it is used when shutting down the whole
>> application. (If this is not correct, perhaps we should add a
>> `shutdown` method to the producer?)
>> 
>> Since immediate shutdown semantics are already available via our
>> client API, I posit that the answer to question 1 is no, `close` does
>> not imply immediate shutdown. At the very least, `close` in the Pulsar
>> Client has not historically implied immediate shutdown.
>> 
>> Additionally, it is relevant to point out that the `Producer#close`
>> method is already sending a `CLOSE_PRODUCER` command and waiting on a
>> response back from the broker. The broker's producer close method has
>> the following Javadoc:
>> 
>>> Close the producer immediately if: a. the connection is dropped
>>> b. it's a graceful close and no pending publish acks are left
>>> else wait for pending publish acks
>> 
>> Since we're already waiting on the broker to respond to the producer's
>> `CLOSE_PRODUCER` request, I see no reason to fail pending/in-flight
>> messages immediately, especially because we should get a response back
>> for those messages before getting the `SUCCESS` response from the
>> broker since the responses will come on the same TCP connection. We
>> could even simplify the close logic so that when the `CLOSE_PRODUCER`
>> request completes (either successfully or because of a failure), we
>> fail all remaining pending message futures.
>> 
>> Ultimately, we need to decide whether to update the implementation to
>> match the existing Javadocs, or to update the Javadocs to indicate
>> that `close` means an immediate shutdown, which includes failing all
>> outstanding message futures immediately. My vote is to make the
>> implementation align with the Javadocs.
>> 
>> Regarding question 2, I prefer that `close` implies flush because it
>> is only a single (batched) message being flushed. If we do flush this
>> message, we'll need to make sure that the message is sent before
>> the `CLOSE_PRODUCER` command is sent.
>> 
>> Thanks,
>> Michael
>> 
>> 
>> On Wed, Sep 29, 2021 at 7:04 AM Enrico Olivelli <eo...@gmail.com> wrote:
>>> 
>>> I agree that we must ensure that every pending callback must be completed
>>> eventually (timeout or another error is not a problem),
>>> because we cannot let the client application hang forever.
>>> I believe that the application can perform a flush() explicitly and also
>>> wait for every callback to be executed if that is the requirement.
>>> 
>>> Usually you call close() when:
>>> 1. you have a serious problem: you already know that there is a hard error,
>>> and you want to close the Producer or the Application and possibly start a
>>> new one to recover
>>> 2. you are shutting down your application or component: you have control
>>> over the callbacks, so you can wait for them to complete
>>> 
>>> So case 2. can be covered by the application. We have to support case 1:
>>> fail fast and close (no need for flush()) .
>>> 
>>> In my experience trying to implement "graceful stops" adds only complexity
>>> and false hopes to the users.
>>> 
>>> Enrico
>>> 
>>> 
>>> 
>>> Il giorno mer 29 set 2021 alle ore 13:58 Nirvana <15...@qq.com.invalid>
>>> ha scritto:
>>> 
>>>> I agree to try to ensure ”at most once“ when closing。
>>>> 
>>>> 
>>>> &gt; That would still get controlled by send timeout, after that, the send
>>>> will fail and close should proceed.
>>>> This sounds more in line with “at most once”。
>>>> 
>>>> 
>>>> ------------------&nbsp;原始邮件&nbsp;------------------
>>>> 发件人:
>>>>                                                  "dev"
>>>>                                                                <
>>>> matteo.merli@gmail.com&gt;;
>>>> 发送时间:&nbsp;2021年9月29日(星期三) 下午3:55
>>>> 收件人:&nbsp;"Dev"<dev@pulsar.apache.org&gt;;
>>>> 
>>>> 主题:&nbsp;Re: Correct semantics of producer close
>>>> 
>>>> 
>>>> 
>>>> &gt; but equally they might be
>>>> &gt; surprised when closeAsync doesn't complete because the pending
>>>> messages
>>>> &gt; can't be cleared
>>>> 
>>>> That would still get controlled by send timeout, after that, the send
>>>> will fail and close should proceed.
>>>> 
>>>> --
>>>> Matteo Merli
>>>> <matteo.merli@gmail.com&gt;
>>>> 
>>>> On Wed, Sep 29, 2021 at 12:52 AM Jack Vanlightly
>>>> <jvanlightly@splunk.com.invalid&gt; wrote:
>>>> &gt;
>>>> &gt; I can see both sides of the argument regarding whether to flush
>>>> pending
>>>> &gt; messages or not. But I think what is definitely in the contract is
>>>> not to
>>>> &gt; discard any callbacks causing user code to block forever. No matter
>>>> what,
>>>> &gt; we must always call the callbacks.
>>>> &gt;
>>>> &gt; Personally, I am in favour of a close operation not flushing pending
>>>> &gt; messages (and I define pending here as any message that has a
>>>> callback).
>>>> &gt; The reason is that if we wait for all pending messages to be sent
>>>> then we
>>>> &gt; now face a number of edge cases that could cause the close operation
>>>> to
>>>> &gt; take a very long time to complete. What if the user code really just
>>>> needs
>>>> &gt; to close the producer right now? If we amend the documentation to
>>>> make it
>>>> &gt; clear that close does not flush pending messages then the user is now
>>>> able
>>>> &gt; to explicitly craft the behaviour they need. If they want all messages
>>>> &gt; flushed first then chaing flushAsync-&gt;closeAsync else just
>>>> closeAsync.
>>>> &gt;
>>>> &gt; Unfortunately I think user expectation, regardless of the current
>>>> javadoc,
>>>> &gt; is that close would flush everything and in an ideal world it would.
>>>> We
>>>> &gt; have the Principle of Least Surprise but we also have Safe By Default.
>>>> &gt; Users might be surprised that when calling closeAsync, a load of their
>>>> &gt; pending messages get ConnectionAlreadyClosed, but equally they might
>>>> be
>>>> &gt; surprised when closeAsync doesn't complete because the pending
>>>> messages
>>>> &gt; can't be cleared. Failing pending messages is the safer option. User
>>>> code
>>>> &gt; must handle failure responses and cannot claim data loss with a
>>>> &gt; non-positive response. But if they can't close a producer, that could
>>>> &gt; result in a wider impact on their system, not to mention more issues
>>>> &gt; created in GitHub.
>>>> &gt;
>>>> &gt; Jack
>>>> &gt;
>>>> &gt; On Wed, Sep 29, 2021 at 7:05 AM Joe F <joefrancisk@gmail.com&gt;
>>>> wrote:
>>>> &gt;
>>>> &gt; &gt; [ External sender. Exercise caution. ]
>>>> &gt; &gt;
>>>> &gt; &gt; &gt;I don't think that implementing `closeAsync` with graceful
>>>> shutdown
>>>> &gt; &gt; logic implies a guarantee of message publishing. Rather, it
>>>> guarantees
>>>> &gt; &gt; that failures will be the result of a real exception or a
>>>> timeout.
>>>> &gt; &gt;
>>>> &gt; &gt; I think that's beside the point.&nbsp;&nbsp;&nbsp;&nbsp; There
>>>> is no definition of "real"
>>>> &gt; &gt; exceptions.&nbsp;&nbsp; At that point the app is publishing on a
>>>> best effort basis,
>>>> &gt; &gt; and there are no guarantees anywhere in client or server.
>>>> &gt; &gt;
>>>> &gt; &gt; There is no concept&nbsp; of&nbsp; "maybe published". OR
>>>> &gt; &gt; "published-if-no_real_errors".&nbsp; What does that even
>>>> mean?&nbsp; That is only a
>>>> &gt; &gt; can of worms which is going to add to developer confusion and
>>>> lead to
>>>> &gt; &gt; Pulsar users finding in the worst possible way that something
>>>> got lost
>>>> &gt; &gt; because it never got published.&nbsp; It's a poor experience
>>>> when you find it.
>>>> &gt; &gt; I have a real life experience where a user used async APIs (in a
>>>> lambda),
>>>> &gt; &gt; which hummed along fine.&nbsp; One day much later, the cloud had
>>>> a hitch, and
>>>> &gt; &gt; they discovered a message was&nbsp; not delivered.
>>>> &gt; &gt;
>>>> &gt; &gt; I am more concerned about developers discovering at the worst
>>>> possible time
>>>> &gt; &gt; that&nbsp; ""published-if-no_real_errors"&nbsp; is a concept.
>>>> &gt; &gt;
>>>> &gt; &gt; My suggestion is to make this simple for developers.
>>>> &gt; &gt;
>>>> &gt; &gt; ----The sync/async nature of the close() [ or any other API, for
>>>> that
>>>> &gt; &gt; matter ]&nbsp; is completely orthogonal to the API semantics,
>>>> and is just a
>>>> &gt; &gt; programmatic choice to deal with&nbsp; how resources are managed
>>>> within the
>>>> &gt; &gt; program. That's not material here.---
>>>> &gt; &gt;
>>>> &gt; &gt; A close() is an action that is shutting down the producer right
>>>> now, not
>>>> &gt; &gt; even waiting for any acks of inflight messages. A willingness to
>>>> lose
>>>> &gt; &gt; pending/inflight messages is explicit in that call.&nbsp; The
>>>> producer will&nbsp; not
>>>> &gt; &gt; be around to deal with errors or to retry failed messages once
>>>> close() is
>>>> &gt; &gt; invoked.
>>>> &gt; &gt;
>>>> &gt; &gt; On the contrary, if the client does not want to deal with
>>>> message loss,
>>>> &gt; &gt; then flush(), stick around to gather the acks, deal with errors
>>>> and retries
>>>> &gt; &gt; etc and then do close() . Then close() will be just a resource
>>>> management
>>>> &gt; &gt; action on the client.
>>>> &gt; &gt;
>>>> &gt; &gt; So update the documentation to reflect that. ---&gt; if close()
>>>> is called on a
>>>> &gt; &gt; producer with messages pending acks, those messages are left
>>>> indoubt. Avoid
>>>> &gt; &gt; all mention of flushes, best effort etc.&nbsp; Users must buy
>>>> into&nbsp; uncertainty,
>>>> &gt; &gt; without any qualifications.
>>>> &gt; &gt;
>>>> &gt; &gt; I would at all costs avoid using the term "graceful"
>>>> anywhere.&nbsp; That word
>>>> &gt; &gt; has specific semantics associated with it in the systems/storage
>>>> domain ,
>>>> &gt; &gt; and what is being proposed here is nothing like that.
>>>> &gt; &gt;
>>>> &gt; &gt; -j
>>>> &gt; &gt;
>>>> &gt; &gt;
>>>> &gt; &gt; On Tue, Sep 28, 2021 at 7:05 PM Yunze Xu
>>>> <yzxu@streamnative.io.invalid&gt;
>>>> &gt; &gt; wrote:
>>>> &gt; &gt;
>>>> &gt; &gt; &gt; It’s a good point that
>>>> `ProducerImpl#failPendingBatchMessages` treats
>>>> &gt; &gt; &gt; messages in batch container also as pending messages.
>>>> &gt; &gt; &gt;
>>>> &gt; &gt; &gt; I agree with your definition of "graceful close”. It’s more
>>>> like a “at
>>>> &gt; &gt; &gt; most once”
>>>> &gt; &gt; &gt; semantics, like the original JavaDoc said
>>>> &gt; &gt; &gt;
>>>> &gt; &gt; &gt; &gt; pending writes will not be retried
>>>> &gt; &gt; &gt;
>>>> &gt; &gt; &gt; Thanks,
>>>> &gt; &gt; &gt; Yunze
>>>> &gt; &gt; &gt;
>>>> &gt; &gt; &gt; &gt; 2021年9月29日 上午5:24,Michael Marshall <
>>>> mikemarsh17@gmail.com&gt; 写道:
>>>> &gt; &gt; &gt; &gt;
>>>> &gt; &gt; &gt; &gt; Thanks for bringing this thread to the mailing list,
>>>> Yunze.
>>>> &gt; &gt; &gt; &gt;
>>>> &gt; &gt; &gt; &gt; I think the right change is to update the `closeAsync`
>>>> method to first
>>>> &gt; &gt; &gt; &gt; flush `batchMessageContainer` and to then
>>>> asynchronously wait for the
>>>> &gt; &gt; &gt; &gt; `pendingMessages` queue to drain. We could add a new
>>>> timeout or rely
>>>> &gt; &gt; &gt; &gt; on the already implemented `sendTimeout` config to put
>>>> an upper time
>>>> &gt; &gt; &gt; &gt; limit on `closeAsync`. My reasoning as well as
>>>> responses to Joe and
>>>> &gt; &gt; &gt; &gt; Yunze follow:
>>>> &gt; &gt; &gt; &gt;
>>>> &gt; &gt; &gt; &gt;&gt; we need to define the behavior for how to process
>>>> `pendingMessages`
>>>> &gt; &gt; &gt; &gt;&gt; and `batchMessageContainer` when producer call
>>>> `closeAsync`.
>>>> &gt; &gt; &gt; &gt;
>>>> &gt; &gt; &gt; &gt; Yes, this is exactly the clarification required, and I
>>>> agree that the
>>>> &gt; &gt; &gt; &gt; Javadoc is ambiguous and that the implementation
>>>> doesn't align with
>>>> &gt; &gt; &gt; &gt; the Javadoc.
>>>> &gt; &gt; &gt; &gt;
>>>> &gt; &gt; &gt; &gt; If we view the Javadoc as binding, then the
>>>> fundamental question is
>>>> &gt; &gt; &gt; &gt; what messages are "pending". The `pendingMessages`
>>>> seem pretty easy to
>>>> &gt; &gt; &gt; &gt; classify as "pending" given that they are already in
>>>> flight on the
>>>> &gt; &gt; &gt; &gt; network.
>>>> &gt; &gt; &gt; &gt;
>>>> &gt; &gt; &gt; &gt; I also consider `batchMessageContainer` to be
>>>> "pending" because a
>>>> &gt; &gt; &gt; &gt; client application already has callbacks for the
>>>> messages in this
>>>> &gt; &gt; &gt; &gt; container. These callbacks are expected to complete
>>>> when the batch
>>>> &gt; &gt; &gt; &gt; message delivery completes. Since the client
>>>> application already has a
>>>> &gt; &gt; &gt; &gt; reference to a callback, it isn't a problem that the
>>>> producer
>>>> &gt; &gt; &gt; &gt; implementation initiates the flush logic. (Note that
>>>> the current
>>>> &gt; &gt; &gt; &gt; design fails the `pendingMessages` but does not fail
>>>> the
>>>> &gt; &gt; &gt; &gt; `batchMessageContainer` when `closeAsync` is called,
>>>> so the callbacks
>>>> &gt; &gt; &gt; &gt; for that container are currently left incomplete
>>>> forever if the client
>>>> &gt; &gt; &gt; &gt; is closed with an unsent batch. We will need to
>>>> address this design in
>>>> &gt; &gt; &gt; &gt; the work that comes from this discussion.)
>>>> &gt; &gt; &gt; &gt;
>>>> &gt; &gt; &gt; &gt; Further, the `ProducerImpl#failPendingMessages` method
>>>> includes logic
>>>> &gt; &gt; &gt; &gt; to call `ProducerImpl#failPendingBatchMessages`, which
>>>> implies that
>>>> &gt; &gt; &gt; &gt; these batched, but not sent, messages have been
>>>> historically
>>>> &gt; &gt; &gt; &gt; considered "pending".
>>>> &gt; &gt; &gt; &gt;
>>>> &gt; &gt; &gt; &gt; If we view the Javadoc as non-binding, I think my
>>>> guiding influence
>>>> &gt; &gt; &gt; &gt; for the new design would be that the `closeAsync`
>>>> method should result
>>>> &gt; &gt; &gt; &gt; in a "graceful" shutdown of the client.
>>>> &gt; &gt; &gt; &gt;
>>>> &gt; &gt; &gt; &gt;&gt; What exactly does "graceful" convey here?
>>>> &gt; &gt; &gt; &gt;
>>>> &gt; &gt; &gt; &gt; This is a great question, and will likely drive the
>>>> design here. I
>>>> &gt; &gt; &gt; &gt; view graceful to mean that the producer attempts to
>>>> avoid artificial
>>>> &gt; &gt; &gt; &gt; failures. That means trying to drain the queue instead
>>>> of
>>>> &gt; &gt; &gt; &gt; automatically failing all of the queue's callbacks.
>>>> The tradeoff is
>>>> &gt; &gt; &gt; &gt; that closing the producer takes longer. This reasoning
>>>> would justify
>>>> &gt; &gt; &gt; &gt; my claim that we should first flush the
>>>> `batchMessageContainer`
>>>> &gt; &gt; &gt; &gt; instead of failing the batch without any effort at
>>>> delivery, as that
>>>> &gt; &gt; &gt; &gt; would be artificial.
>>>> &gt; &gt; &gt; &gt;
>>>> &gt; &gt; &gt; &gt;&gt; There is no guarantee that either case will ensure
>>>> the message
>>>> &gt; &gt; &gt; &gt;&gt; is published.
>>>> &gt; &gt; &gt; &gt;
>>>> &gt; &gt; &gt; &gt; I don't think that implementing `closeAsync` with
>>>> graceful shutdown
>>>> &gt; &gt; &gt; &gt; logic implies a guarantee of message publishing.
>>>> Rather, it guarantees
>>>> &gt; &gt; &gt; &gt; that failures will be the result of a real exception
>>>> or a timeout.
>>>> &gt; &gt; &gt; &gt; Since calling `closeAsync` prevents additional
>>>> messages from
>>>> &gt; &gt; &gt; &gt; delivering, users leveraging this functionality might
>>>> be operating
>>>> &gt; &gt; &gt; &gt; with "at most once" delivery semantics where they'd
>>>> prefer to deliver
>>>> &gt; &gt; &gt; &gt; the messages if possible, but they aren't going to
>>>> delay application
>>>> &gt; &gt; &gt; &gt; shutdown indefinitely to deliver its last messages. If
>>>> users need
>>>> &gt; &gt; &gt; &gt; stronger guarantees about whether their messages are
>>>> delivered, they
>>>> &gt; &gt; &gt; &gt; are probably already using the flush methods to ensure
>>>> that the
>>>> &gt; &gt; &gt; &gt; producer's queues are empty before calling
>>>> `closeAsync`.
>>>> &gt; &gt; &gt; &gt;
>>>> &gt; &gt; &gt; &gt; I also agree that in all of these cases, we're
>>>> assuming that users are
>>>> &gt; &gt; &gt; &gt; capturing references to the async callbacks and then
>>>> making business
>>>> &gt; &gt; &gt; &gt; logic decisions based on the results of those
>>>> callbacks.
>>>> &gt; &gt; &gt; &gt;
>>>> &gt; &gt; &gt; &gt; Thanks,
>>>> &gt; &gt; &gt; &gt; Michael
>>>> &gt; &gt; &gt; &gt;
>>>> &gt; &gt; &gt; &gt; On Tue, Sep 28, 2021 at 4:58 AM Yunze Xu
>>>> <yzxu@streamnative.io.invalid
>>>> &gt; &gt; &gt;
>>>> &gt; &gt; &gt; wrote:
>>>> &gt; &gt; &gt; &gt;&gt;
>>>> &gt; &gt; &gt; &gt;&gt; I can’t agree more, just like what I’ve said in PR
>>>> 12195:
>>>> &gt; &gt; &gt; &gt;&gt;
>>>> &gt; &gt; &gt; &gt;&gt;&gt; At any case, when you choose `sendAsync`, you
>>>> should always make use
>>>> &gt; &gt; &gt; of the returned future to confirm the result of all
>>>> messages. In Kafka,
>>>> &gt; &gt; &gt; it's the send callback.
>>>> &gt; &gt; &gt; &gt;&gt;
>>>> &gt; &gt; &gt; &gt;&gt; But I found many users are confused about the
>>>> current behavior,
>>>> &gt; &gt; &gt; especially
>>>> &gt; &gt; &gt; &gt;&gt; those are used to Kafka’s close semantics. They
>>>> might expect a simple
>>>> &gt; &gt; &gt; try
>>>> &gt; &gt; &gt; &gt;&gt; to flush existing messages, which works at a
>>>> simple test environment,
>>>> &gt; &gt; &gt; even
>>>> &gt; &gt; &gt; &gt;&gt; there's no guarantee for exception cases.
>>>> &gt; &gt; &gt; &gt;&gt;
>>>> &gt; &gt; &gt; &gt;&gt;
>>>> &gt; &gt; &gt; &gt;&gt;
>>>> &gt; &gt; &gt; &gt;&gt;&gt; 2021年9月28日 下午4:37,Joe F <joefrancisk@gmail.com&gt;
>>>> 写道:
>>>> &gt; &gt; &gt; &gt;&gt;&gt;
>>>> &gt; &gt; &gt; &gt;&gt;&gt; Clients should not depend on any of this
>>>> behaviour, since the broker
>>>> &gt; &gt; &gt; is at
>>>> &gt; &gt; &gt; &gt;&gt;&gt; the other end of an unreliable&nbsp; network
>>>> connection. The
>>>> &gt; &gt; &gt; &gt;&gt;&gt; semantic differences are kind of meaningless
>>>> from a usability point,
>>>> &gt; &gt; &gt; since
>>>> &gt; &gt; &gt; &gt;&gt;&gt; flushing on close =/= published.&nbsp; What
>>>> exactly does "graceful" convey
>>>> &gt; &gt; &gt; &gt;&gt;&gt; here?&nbsp; Flush the&nbsp; buffer on the
>>>> client end and hope it makes it to
>>>> &gt; &gt; the
>>>> &gt; &gt; &gt; &gt;&gt;&gt; server.
>>>> &gt; &gt; &gt; &gt;&gt;&gt;
>>>> &gt; &gt; &gt; &gt;&gt;&gt; Is there a&nbsp; difference whether you
>>>> flush(or process) pending messages
>>>> &gt; &gt; &gt; or
>>>> &gt; &gt; &gt; &gt;&gt;&gt; not? There is no guarantee that either case
>>>> will ensure the message
>>>> &gt; &gt; is
>>>> &gt; &gt; &gt; &gt;&gt;&gt; published.
>>>> &gt; &gt; &gt; &gt;&gt;&gt;
>>>> &gt; &gt; &gt; &gt;&gt;&gt; The only way to ensure that messages are
>>>> published is to wait for the
>>>> &gt; &gt; &gt; ack.
>>>> &gt; &gt; &gt; &gt;&gt;&gt; The correct model should be to wait for return
>>>> on the blocking API,
>>>> &gt; &gt; or
>>>> &gt; &gt; &gt; wait
>>>> &gt; &gt; &gt; &gt;&gt;&gt; for future completion of the async API, then
>>>> handle any publish
>>>> &gt; &gt; errors
>>>> &gt; &gt; &gt; and
>>>> &gt; &gt; &gt; &gt;&gt;&gt; then only close the producer.
>>>> &gt; &gt; &gt; &gt;&gt;&gt;
>>>> &gt; &gt; &gt; &gt;&gt;&gt;
>>>> &gt; &gt; &gt; &gt;&gt;&gt; On Mon, Sep 27, 2021 at 8:50 PM Yunze Xu
>>>> &gt; &gt; <yzxu@streamnative.io.invalid
>>>> &gt; &gt; &gt; &gt;
>>>> &gt; &gt; &gt; &gt;&gt;&gt; wrote:
>>>> &gt; &gt; &gt; &gt;&gt;&gt;
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; Hi all,
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; Recently I found a PR (
>>>> https://github.com/apache/pulsar/pull/12195
>>>> &gt <https://github.com/apache/pulsar/pull/12195&gt>; &gt; <
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
>>>> https://github.com/apache/pulsar/pull/12195&gt;) that
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; modifies the existing semantics of
>>>> producer close. There're already
>>>> &gt; &gt; &gt; some
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; communications in this PR, but I think
>>>> it's better to start a
>>>> &gt; &gt; &gt; discussion
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; here
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; to let more know.
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; The existing implementation of producer
>>>> close is:
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; 1. Cancel all timers, including send and
>>>> batch container
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; (`batchMessageContainer`).
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; 2. Complete all pending messages
>>>> (`pendingMessages`) with
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; `AlreadyCloseException`.
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; See `ProducerImpl#closeAsync` for details.
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; But the JavaDoc of `Producer#closeAsync`
>>>> is:
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;&gt; No more writes will be accepted from
>>>> this producer. Waits until all
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; pending write request are persisted.
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; Anyway, the document and implementation
>>>> are inconsistent. But
>>>> &gt; &gt; &gt; specifically,
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; we need to define the behavior for how to
>>>> process `pendingMessages`
>>>> &gt; &gt; &gt; and
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; `batchMessageContainer` when producer call
>>>> `closeAsync`.
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; 1. batchMessageContainer: contains the
>>>> buffered single messages
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; (`Message<T&gt;`).
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; 2. pendingMessages: all inflight messages
>>>> (`OpSendMsg`) in network.
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; IMO, from the JavaDoc, only
>>>> `pendingMessages` should be processed
>>>> &gt; &gt; and
>>>> &gt; &gt; &gt; the
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; messages in `batchMessageContainer` should
>>>> be discarded.
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; Since other clients might have already
>>>> implemented the similar
>>>> &gt; &gt; &gt; semantics of
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; Java clients. If we changed the semantics
>>>> now, the behaviors among
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; different
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; clients might be inconsistent.
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; Should we add a configuration to support
>>>> graceful close to follow
>>>> &gt; &gt; the
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; docs? Or
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; just change the current behavior?
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; Thanks,
>>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; Yunze
>>>> &gt; &gt; &gt; &gt;&gt;
>>>> &gt; &gt; &gt;
>>>> &gt; &gt; &gt;
>>>> &gt; &gt;


Re: Correct semantics of producer close

Posted by Joe F <jo...@gmail.com>.
I am with Enrrico on this, and for more or less the same reasons.

 My vote is to update the doc to say that on  close(), there are no
guarantees on pending messages.

There is no reasonable basis for any user  to claim that messages are not
getting flushed, because there are no guarantees that the flush  will
result in successful publishing.  User expectations and reality are
different, even with today's implementation.

if someone really requires a flush within close(), that's already
possible by doing a flush  before the close.

The best thing to do is to update the close()  documentation. (1) Remove
mention of flushing from the javadoc, and (2) add that close does not
guarantee that outstanding messages are published successfully.






On Thu, Sep 30, 2021 at 9:52 PM Michael Marshall <mi...@gmail.com>
wrote:

> Thanks for your analysis, Yunze. I identified above that the messages
> in the batch container were not getting completed correctly, so I put
> together a PR to fix the problematic behavior. This PR will be valid
> regardless of our decision to add flush logic to the close method.
>
> Here is the PR: https://github.com/apache/pulsar/pull/12259.
>
> Thanks,
> Michael
>
>
> On Thu, Sep 30, 2021 at 10:27 PM Yunze Xu <yz...@streamnative.io.invalid>
> wrote:
> >
> > You're right that before a CommandCloseProducer request was completed,
> the
> > pending messages should be persisted before this close request was
> completed
> > in normal cases. It’s guaranteed by broker side.
> >
> > Then there’s no inconsistency between the implementation and JavaDocs
> now.
> >
> > The key point is whether should we flush the messages in batch
> container. I prefer
> > keeping the current semantics. But I found the messages in batch
> container
> > never failed. We need to fix the problem. For example, here’s my unit
> test:
> >
> > ```java
> >     @Test
> >     public void test() throws Exception {
> >         final Producer<String> producer =
> pulsarClient.newProducer(Schema.STRING)
> >                 .topic("topic")
> >                 .batchingMaxMessages(10000)
> >                 .batchingMaxBytes(10000000)
> >                 .batchingMaxPublishDelay(100, TimeUnit.SECONDS)
> >                 .sendTimeout(1, TimeUnit.SECONDS)
> >                 .create();
> >         final CountDownLatch latch = new CountDownLatch(10);
> >         final Map<Integer, Throwable> throwableMap = new
> ConcurrentHashMap<>();
> >         for (int i = 0; i < 10; i++) {
> >             final Integer index = i;
> >             producer.sendAsync("msg-" + i).whenComplete((id, e) -> {
> >                 if (e != null) {
> >                     throwableMap.put(index, e);
> >                 }
> >                 latch.countDown();
> >             });
> >         }
> >         producer.close();
> >         latch.await();
> >         throwableMap.forEach((i, e) -> {
> >             log.info("Message {} failed with {}", i, e);
> >         });
> >     }
> > ```
> >
> > The test would block forever.
> >
> > > 2021年10月1日 上午4:22,Michael Marshall <mi...@gmail.com> 写道:
> > >
> > > Following up here. I am pretty sure part of this conversation has been
> > > based on a misunderstanding of the code. From what I can tell, the
> > > behavior for `Producer#closeAsync` in the client (mostly) aligns with
> > > the current Javadocs.
> > >
> > >> The existing implementation of producer close is:
> > >> 1. Cancel all timers, including send and batch container
> (`batchMessageContainer`).
> > >> 2. Complete all pending messages (`pendingMessages`) with
> `AlreadyCloseException`.
> > >
> > > I agree with 1, but I think 2 is only partially correct. The client
> > > will only exceptionally complete pending messages if the connection is
> > > null or not ready, or after the broker responds to the
> > > `CLOSE_PRODUCER` command or a timeout passes. This behavior seems
> > > right to me.
> > >
> > > Here is the relevant code:
> > >
> https://github.com/apache/pulsar/blob/9d309145f342bc416b8b4663125e1216903a3e83/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L875-L909
> > >
> > > The only remaining question: does close imply flush? If not, we'll
> > > need update the logic to fail the messages contained in the
> > > `batchMessageContainer` during close. Otherwise, we'll update the
> > > logic to call flush before sending the `CLOSE_PRODUCER` command and
> > > everything should work as documented. In both cases, we should update
> > > the Javadocs to make the behavior clearer.
> > >
> > > Thanks,
> > > Michael
> > >
> > >
> > >
> > > On Thu, Sep 30, 2021 at 11:55 AM Michael Marshall <
> mikemarsh17@gmail.com> wrote:
> > >>
> > >> I have two questions:
> > >>
> > >> 1. Does close imply immediate shutdown?
> > >> 2. Does close imply flush?
> > >>
> > >> There is not yet consensus on 1, and 2 is only relevant if 1's answer
> is "no".
> > >>
> > >> Thus far, the conversation has centered on the `Producer#close`
> > >> method. I'd like to broaden the discussion to include some other
> > >> methods from the `PulsarClient` interface: `shutdown` and `close`.
> > >>
> > >> The Javadoc for `PulsarClient#shutdown` describes the "shutdown
> > >> immediately" behavior. It says:
> > >>
> > >>> Release all the resources and close all the producer, consumer and
> > >>> reader instances without waiting for ongoing operations to complete.
> > >>
> > >> The Javadoc for `PulsarClient#close` describes waiting for
> > >> pending/in-flight messages to complete before returning. It says:
> > >>
> > >>> This operation will trigger a graceful close of all producer,
> consumer
> > >>> and reader instances that this client has currently active. That
> implies
> > >>> that close will block and wait until all pending producer send
> requests
> > >>> are persisted.
> > >>
> > >> One question that follows from the above: why does the `Producer` not
> > >> have a `shutdown` method? I think this is because the "immediate
> > >> shutdown" behavior is not necessary for a single producer. When
> > >> immediate shutdown semantics are required, the `PulsarClient#shutdown`
> > >> method is sufficient because it is used when shutting down the whole
> > >> application. (If this is not correct, perhaps we should add a
> > >> `shutdown` method to the producer?)
> > >>
> > >> Since immediate shutdown semantics are already available via our
> > >> client API, I posit that the answer to question 1 is no, `close` does
> > >> not imply immediate shutdown. At the very least, `close` in the Pulsar
> > >> Client has not historically implied immediate shutdown.
> > >>
> > >> Additionally, it is relevant to point out that the `Producer#close`
> > >> method is already sending a `CLOSE_PRODUCER` command and waiting on a
> > >> response back from the broker. The broker's producer close method has
> > >> the following Javadoc:
> > >>
> > >>> Close the producer immediately if: a. the connection is dropped
> > >>> b. it's a graceful close and no pending publish acks are left
> > >>> else wait for pending publish acks
> > >>
> > >> Since we're already waiting on the broker to respond to the producer's
> > >> `CLOSE_PRODUCER` request, I see no reason to fail pending/in-flight
> > >> messages immediately, especially because we should get a response back
> > >> for those messages before getting the `SUCCESS` response from the
> > >> broker since the responses will come on the same TCP connection. We
> > >> could even simplify the close logic so that when the `CLOSE_PRODUCER`
> > >> request completes (either successfully or because of a failure), we
> > >> fail all remaining pending message futures.
> > >>
> > >> Ultimately, we need to decide whether to update the implementation to
> > >> match the existing Javadocs, or to update the Javadocs to indicate
> > >> that `close` means an immediate shutdown, which includes failing all
> > >> outstanding message futures immediately. My vote is to make the
> > >> implementation align with the Javadocs.
> > >>
> > >> Regarding question 2, I prefer that `close` implies flush because it
> > >> is only a single (batched) message being flushed. If we do flush this
> > >> message, we'll need to make sure that the message is sent before
> > >> the `CLOSE_PRODUCER` command is sent.
> > >>
> > >> Thanks,
> > >> Michael
> > >>
> > >>
> > >> On Wed, Sep 29, 2021 at 7:04 AM Enrico Olivelli <eo...@gmail.com>
> wrote:
> > >>>
> > >>> I agree that we must ensure that every pending callback must be
> completed
> > >>> eventually (timeout or another error is not a problem),
> > >>> because we cannot let the client application hang forever.
> > >>> I believe that the application can perform a flush() explicitly and
> also
> > >>> wait for every callback to be executed if that is the requirement.
> > >>>
> > >>> Usually you call close() when:
> > >>> 1. you have a serious problem: you already know that there is a hard
> error,
> > >>> and you want to close the Producer or the Application and possibly
> start a
> > >>> new one to recover
> > >>> 2. you are shutting down your application or component: you have
> control
> > >>> over the callbacks, so you can wait for them to complete
> > >>>
> > >>> So case 2. can be covered by the application. We have to support
> case 1:
> > >>> fail fast and close (no need for flush()) .
> > >>>
> > >>> In my experience trying to implement "graceful stops" adds only
> complexity
> > >>> and false hopes to the users.
> > >>>
> > >>> Enrico
> > >>>
> > >>>
> > >>>
> > >>> Il giorno mer 29 set 2021 alle ore 13:58 Nirvana
> <15...@qq.com.invalid>
> > >>> ha scritto:
> > >>>
> > >>>> I agree to try to ensure ”at most once“ when closing。
> > >>>>
> > >>>>
> > >>>> &gt; That would still get controlled by send timeout, after that,
> the send
> > >>>> will fail and close should proceed.
> > >>>> This sounds more in line with “at most once”。
> > >>>>
> > >>>>
> > >>>> ------------------&nbsp;原始邮件&nbsp;------------------
> > >>>> 发件人:
> > >>>>                                                  "dev"
> > >>>>                                                                <
> > >>>> matteo.merli@gmail.com&gt;;
> > >>>> 发送时间:&nbsp;2021年9月29日(星期三) 下午3:55
> > >>>> 收件人:&nbsp;"Dev"<dev@pulsar.apache.org&gt;;
> > >>>>
> > >>>> 主题:&nbsp;Re: Correct semantics of producer close
> > >>>>
> > >>>>
> > >>>>
> > >>>> &gt; but equally they might be
> > >>>> &gt; surprised when closeAsync doesn't complete because the pending
> > >>>> messages
> > >>>> &gt; can't be cleared
> > >>>>
> > >>>> That would still get controlled by send timeout, after that, the
> send
> > >>>> will fail and close should proceed.
> > >>>>
> > >>>> --
> > >>>> Matteo Merli
> > >>>> <matteo.merli@gmail.com&gt;
> > >>>>
> > >>>> On Wed, Sep 29, 2021 at 12:52 AM Jack Vanlightly
> > >>>> <jvanlightly@splunk.com.invalid&gt; wrote:
> > >>>> &gt;
> > >>>> &gt; I can see both sides of the argument regarding whether to flush
> > >>>> pending
> > >>>> &gt; messages or not. But I think what is definitely in the
> contract is
> > >>>> not to
> > >>>> &gt; discard any callbacks causing user code to block forever. No
> matter
> > >>>> what,
> > >>>> &gt; we must always call the callbacks.
> > >>>> &gt;
> > >>>> &gt; Personally, I am in favour of a close operation not flushing
> pending
> > >>>> &gt; messages (and I define pending here as any message that has a
> > >>>> callback).
> > >>>> &gt; The reason is that if we wait for all pending messages to be
> sent
> > >>>> then we
> > >>>> &gt; now face a number of edge cases that could cause the close
> operation
> > >>>> to
> > >>>> &gt; take a very long time to complete. What if the user code
> really just
> > >>>> needs
> > >>>> &gt; to close the producer right now? If we amend the documentation
> to
> > >>>> make it
> > >>>> &gt; clear that close does not flush pending messages then the user
> is now
> > >>>> able
> > >>>> &gt; to explicitly craft the behaviour they need. If they want all
> messages
> > >>>> &gt; flushed first then chaing flushAsync-&gt;closeAsync else just
> > >>>> closeAsync.
> > >>>> &gt;
> > >>>> &gt; Unfortunately I think user expectation, regardless of the
> current
> > >>>> javadoc,
> > >>>> &gt; is that close would flush everything and in an ideal world it
> would.
> > >>>> We
> > >>>> &gt; have the Principle of Least Surprise but we also have Safe By
> Default.
> > >>>> &gt; Users might be surprised that when calling closeAsync, a load
> of their
> > >>>> &gt; pending messages get ConnectionAlreadyClosed, but equally they
> might
> > >>>> be
> > >>>> &gt; surprised when closeAsync doesn't complete because the pending
> > >>>> messages
> > >>>> &gt; can't be cleared. Failing pending messages is the safer
> option. User
> > >>>> code
> > >>>> &gt; must handle failure responses and cannot claim data loss with a
> > >>>> &gt; non-positive response. But if they can't close a producer,
> that could
> > >>>> &gt; result in a wider impact on their system, not to mention more
> issues
> > >>>> &gt; created in GitHub.
> > >>>> &gt;
> > >>>> &gt; Jack
> > >>>> &gt;
> > >>>> &gt; On Wed, Sep 29, 2021 at 7:05 AM Joe F <joefrancisk@gmail.com
> &gt;
> > >>>> wrote:
> > >>>> &gt;
> > >>>> &gt; &gt; [ External sender. Exercise caution. ]
> > >>>> &gt; &gt;
> > >>>> &gt; &gt; &gt;I don't think that implementing `closeAsync` with
> graceful
> > >>>> shutdown
> > >>>> &gt; &gt; logic implies a guarantee of message publishing. Rather,
> it
> > >>>> guarantees
> > >>>> &gt; &gt; that failures will be the result of a real exception or a
> > >>>> timeout.
> > >>>> &gt; &gt;
> > >>>> &gt; &gt; I think that's beside the point.&nbsp;&nbsp;&nbsp;&nbsp;
> There
> > >>>> is no definition of "real"
> > >>>> &gt; &gt; exceptions.&nbsp;&nbsp; At that point the app is
> publishing on a
> > >>>> best effort basis,
> > >>>> &gt; &gt; and there are no guarantees anywhere in client or server.
> > >>>> &gt; &gt;
> > >>>> &gt; &gt; There is no concept&nbsp; of&nbsp; "maybe published". OR
> > >>>> &gt; &gt; "published-if-no_real_errors".&nbsp; What does that even
> > >>>> mean?&nbsp; That is only a
> > >>>> &gt; &gt; can of worms which is going to add to developer confusion
> and
> > >>>> lead to
> > >>>> &gt; &gt; Pulsar users finding in the worst possible way that
> something
> > >>>> got lost
> > >>>> &gt; &gt; because it never got published.&nbsp; It's a poor
> experience
> > >>>> when you find it.
> > >>>> &gt; &gt; I have a real life experience where a user used async
> APIs (in a
> > >>>> lambda),
> > >>>> &gt; &gt; which hummed along fine.&nbsp; One day much later, the
> cloud had
> > >>>> a hitch, and
> > >>>> &gt; &gt; they discovered a message was&nbsp; not delivered.
> > >>>> &gt; &gt;
> > >>>> &gt; &gt; I am more concerned about developers discovering at the
> worst
> > >>>> possible time
> > >>>> &gt; &gt; that&nbsp; ""published-if-no_real_errors"&nbsp; is a
> concept.
> > >>>> &gt; &gt;
> > >>>> &gt; &gt; My suggestion is to make this simple for developers.
> > >>>> &gt; &gt;
> > >>>> &gt; &gt; ----The sync/async nature of the close() [ or any other
> API, for
> > >>>> that
> > >>>> &gt; &gt; matter ]&nbsp; is completely orthogonal to the API
> semantics,
> > >>>> and is just a
> > >>>> &gt; &gt; programmatic choice to deal with&nbsp; how resources are
> managed
> > >>>> within the
> > >>>> &gt; &gt; program. That's not material here.---
> > >>>> &gt; &gt;
> > >>>> &gt; &gt; A close() is an action that is shutting down the producer
> right
> > >>>> now, not
> > >>>> &gt; &gt; even waiting for any acks of inflight messages. A
> willingness to
> > >>>> lose
> > >>>> &gt; &gt; pending/inflight messages is explicit in that call.&nbsp;
> The
> > >>>> producer will&nbsp; not
> > >>>> &gt; &gt; be around to deal with errors or to retry failed messages
> once
> > >>>> close() is
> > >>>> &gt; &gt; invoked.
> > >>>> &gt; &gt;
> > >>>> &gt; &gt; On the contrary, if the client does not want to deal with
> > >>>> message loss,
> > >>>> &gt; &gt; then flush(), stick around to gather the acks, deal with
> errors
> > >>>> and retries
> > >>>> &gt; &gt; etc and then do close() . Then close() will be just a
> resource
> > >>>> management
> > >>>> &gt; &gt; action on the client.
> > >>>> &gt; &gt;
> > >>>> &gt; &gt; So update the documentation to reflect that. ---&gt; if
> close()
> > >>>> is called on a
> > >>>> &gt; &gt; producer with messages pending acks, those messages are
> left
> > >>>> indoubt. Avoid
> > >>>> &gt; &gt; all mention of flushes, best effort etc.&nbsp; Users must
> buy
> > >>>> into&nbsp; uncertainty,
> > >>>> &gt; &gt; without any qualifications.
> > >>>> &gt; &gt;
> > >>>> &gt; &gt; I would at all costs avoid using the term "graceful"
> > >>>> anywhere.&nbsp; That word
> > >>>> &gt; &gt; has specific semantics associated with it in the
> systems/storage
> > >>>> domain ,
> > >>>> &gt; &gt; and what is being proposed here is nothing like that.
> > >>>> &gt; &gt;
> > >>>> &gt; &gt; -j
> > >>>> &gt; &gt;
> > >>>> &gt; &gt;
> > >>>> &gt; &gt; On Tue, Sep 28, 2021 at 7:05 PM Yunze Xu
> > >>>> <yzxu@streamnative.io.invalid&gt;
> > >>>> &gt; &gt; wrote:
> > >>>> &gt; &gt;
> > >>>> &gt; &gt; &gt; It’s a good point that
> > >>>> `ProducerImpl#failPendingBatchMessages` treats
> > >>>> &gt; &gt; &gt; messages in batch container also as pending messages.
> > >>>> &gt; &gt; &gt;
> > >>>> &gt; &gt; &gt; I agree with your definition of "graceful close”.
> It’s more
> > >>>> like a “at
> > >>>> &gt; &gt; &gt; most once”
> > >>>> &gt; &gt; &gt; semantics, like the original JavaDoc said
> > >>>> &gt; &gt; &gt;
> > >>>> &gt; &gt; &gt; &gt; pending writes will not be retried
> > >>>> &gt; &gt; &gt;
> > >>>> &gt; &gt; &gt; Thanks,
> > >>>> &gt; &gt; &gt; Yunze
> > >>>> &gt; &gt; &gt;
> > >>>> &gt; &gt; &gt; &gt; 2021年9月29日 上午5:24,Michael Marshall <
> > >>>> mikemarsh17@gmail.com&gt; 写道:
> > >>>> &gt; &gt; &gt; &gt;
> > >>>> &gt; &gt; &gt; &gt; Thanks for bringing this thread to the mailing
> list,
> > >>>> Yunze.
> > >>>> &gt; &gt; &gt; &gt;
> > >>>> &gt; &gt; &gt; &gt; I think the right change is to update the
> `closeAsync`
> > >>>> method to first
> > >>>> &gt; &gt; &gt; &gt; flush `batchMessageContainer` and to then
> > >>>> asynchronously wait for the
> > >>>> &gt; &gt; &gt; &gt; `pendingMessages` queue to drain. We could add
> a new
> > >>>> timeout or rely
> > >>>> &gt; &gt; &gt; &gt; on the already implemented `sendTimeout` config
> to put
> > >>>> an upper time
> > >>>> &gt; &gt; &gt; &gt; limit on `closeAsync`. My reasoning as well as
> > >>>> responses to Joe and
> > >>>> &gt; &gt; &gt; &gt; Yunze follow:
> > >>>> &gt; &gt; &gt; &gt;
> > >>>> &gt; &gt; &gt; &gt;&gt; we need to define the behavior for how to
> process
> > >>>> `pendingMessages`
> > >>>> &gt; &gt; &gt; &gt;&gt; and `batchMessageContainer` when producer
> call
> > >>>> `closeAsync`.
> > >>>> &gt; &gt; &gt; &gt;
> > >>>> &gt; &gt; &gt; &gt; Yes, this is exactly the clarification
> required, and I
> > >>>> agree that the
> > >>>> &gt; &gt; &gt; &gt; Javadoc is ambiguous and that the implementation
> > >>>> doesn't align with
> > >>>> &gt; &gt; &gt; &gt; the Javadoc.
> > >>>> &gt; &gt; &gt; &gt;
> > >>>> &gt; &gt; &gt; &gt; If we view the Javadoc as binding, then the
> > >>>> fundamental question is
> > >>>> &gt; &gt; &gt; &gt; what messages are "pending". The
> `pendingMessages`
> > >>>> seem pretty easy to
> > >>>> &gt; &gt; &gt; &gt; classify as "pending" given that they are
> already in
> > >>>> flight on the
> > >>>> &gt; &gt; &gt; &gt; network.
> > >>>> &gt; &gt; &gt; &gt;
> > >>>> &gt; &gt; &gt; &gt; I also consider `batchMessageContainer` to be
> > >>>> "pending" because a
> > >>>> &gt; &gt; &gt; &gt; client application already has callbacks for the
> > >>>> messages in this
> > >>>> &gt; &gt; &gt; &gt; container. These callbacks are expected to
> complete
> > >>>> when the batch
> > >>>> &gt; &gt; &gt; &gt; message delivery completes. Since the client
> > >>>> application already has a
> > >>>> &gt; &gt; &gt; &gt; reference to a callback, it isn't a problem
> that the
> > >>>> producer
> > >>>> &gt; &gt; &gt; &gt; implementation initiates the flush logic. (Note
> that
> > >>>> the current
> > >>>> &gt; &gt; &gt; &gt; design fails the `pendingMessages` but does not
> fail
> > >>>> the
> > >>>> &gt; &gt; &gt; &gt; `batchMessageContainer` when `closeAsync` is
> called,
> > >>>> so the callbacks
> > >>>> &gt; &gt; &gt; &gt; for that container are currently left incomplete
> > >>>> forever if the client
> > >>>> &gt; &gt; &gt; &gt; is closed with an unsent batch. We will need to
> > >>>> address this design in
> > >>>> &gt; &gt; &gt; &gt; the work that comes from this discussion.)
> > >>>> &gt; &gt; &gt; &gt;
> > >>>> &gt; &gt; &gt; &gt; Further, the `ProducerImpl#failPendingMessages`
> method
> > >>>> includes logic
> > >>>> &gt; &gt; &gt; &gt; to call
> `ProducerImpl#failPendingBatchMessages`, which
> > >>>> implies that
> > >>>> &gt; &gt; &gt; &gt; these batched, but not sent, messages have been
> > >>>> historically
> > >>>> &gt; &gt; &gt; &gt; considered "pending".
> > >>>> &gt; &gt; &gt; &gt;
> > >>>> &gt; &gt; &gt; &gt; If we view the Javadoc as non-binding, I think
> my
> > >>>> guiding influence
> > >>>> &gt; &gt; &gt; &gt; for the new design would be that the
> `closeAsync`
> > >>>> method should result
> > >>>> &gt; &gt; &gt; &gt; in a "graceful" shutdown of the client.
> > >>>> &gt; &gt; &gt; &gt;
> > >>>> &gt; &gt; &gt; &gt;&gt; What exactly does "graceful" convey here?
> > >>>> &gt; &gt; &gt; &gt;
> > >>>> &gt; &gt; &gt; &gt; This is a great question, and will likely drive
> the
> > >>>> design here. I
> > >>>> &gt; &gt; &gt; &gt; view graceful to mean that the producer
> attempts to
> > >>>> avoid artificial
> > >>>> &gt; &gt; &gt; &gt; failures. That means trying to drain the queue
> instead
> > >>>> of
> > >>>> &gt; &gt; &gt; &gt; automatically failing all of the queue's
> callbacks.
> > >>>> The tradeoff is
> > >>>> &gt; &gt; &gt; &gt; that closing the producer takes longer. This
> reasoning
> > >>>> would justify
> > >>>> &gt; &gt; &gt; &gt; my claim that we should first flush the
> > >>>> `batchMessageContainer`
> > >>>> &gt; &gt; &gt; &gt; instead of failing the batch without any effort
> at
> > >>>> delivery, as that
> > >>>> &gt; &gt; &gt; &gt; would be artificial.
> > >>>> &gt; &gt; &gt; &gt;
> > >>>> &gt; &gt; &gt; &gt;&gt; There is no guarantee that either case will
> ensure
> > >>>> the message
> > >>>> &gt; &gt; &gt; &gt;&gt; is published.
> > >>>> &gt; &gt; &gt; &gt;
> > >>>> &gt; &gt; &gt; &gt; I don't think that implementing `closeAsync`
> with
> > >>>> graceful shutdown
> > >>>> &gt; &gt; &gt; &gt; logic implies a guarantee of message publishing.
> > >>>> Rather, it guarantees
> > >>>> &gt; &gt; &gt; &gt; that failures will be the result of a real
> exception
> > >>>> or a timeout.
> > >>>> &gt; &gt; &gt; &gt; Since calling `closeAsync` prevents additional
> > >>>> messages from
> > >>>> &gt; &gt; &gt; &gt; delivering, users leveraging this functionality
> might
> > >>>> be operating
> > >>>> &gt; &gt; &gt; &gt; with "at most once" delivery semantics where
> they'd
> > >>>> prefer to deliver
> > >>>> &gt; &gt; &gt; &gt; the messages if possible, but they aren't going
> to
> > >>>> delay application
> > >>>> &gt; &gt; &gt; &gt; shutdown indefinitely to deliver its last
> messages. If
> > >>>> users need
> > >>>> &gt; &gt; &gt; &gt; stronger guarantees about whether their
> messages are
> > >>>> delivered, they
> > >>>> &gt; &gt; &gt; &gt; are probably already using the flush methods to
> ensure
> > >>>> that the
> > >>>> &gt; &gt; &gt; &gt; producer's queues are empty before calling
> > >>>> `closeAsync`.
> > >>>> &gt; &gt; &gt; &gt;
> > >>>> &gt; &gt; &gt; &gt; I also agree that in all of these cases, we're
> > >>>> assuming that users are
> > >>>> &gt; &gt; &gt; &gt; capturing references to the async callbacks and
> then
> > >>>> making business
> > >>>> &gt; &gt; &gt; &gt; logic decisions based on the results of those
> > >>>> callbacks.
> > >>>> &gt; &gt; &gt; &gt;
> > >>>> &gt; &gt; &gt; &gt; Thanks,
> > >>>> &gt; &gt; &gt; &gt; Michael
> > >>>> &gt; &gt; &gt; &gt;
> > >>>> &gt; &gt; &gt; &gt; On Tue, Sep 28, 2021 at 4:58 AM Yunze Xu
> > >>>> <yzxu@streamnative.io.invalid
> > >>>> &gt; &gt; &gt;
> > >>>> &gt; &gt; &gt; wrote:
> > >>>> &gt; &gt; &gt; &gt;&gt;
> > >>>> &gt; &gt; &gt; &gt;&gt; I can’t agree more, just like what I’ve
> said in PR
> > >>>> 12195:
> > >>>> &gt; &gt; &gt; &gt;&gt;
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt; At any case, when you choose
> `sendAsync`, you
> > >>>> should always make use
> > >>>> &gt; &gt; &gt; of the returned future to confirm the result of all
> > >>>> messages. In Kafka,
> > >>>> &gt; &gt; &gt; it's the send callback.
> > >>>> &gt; &gt; &gt; &gt;&gt;
> > >>>> &gt; &gt; &gt; &gt;&gt; But I found many users are confused about
> the
> > >>>> current behavior,
> > >>>> &gt; &gt; &gt; especially
> > >>>> &gt; &gt; &gt; &gt;&gt; those are used to Kafka’s close semantics.
> They
> > >>>> might expect a simple
> > >>>> &gt; &gt; &gt; try
> > >>>> &gt; &gt; &gt; &gt;&gt; to flush existing messages, which works at a
> > >>>> simple test environment,
> > >>>> &gt; &gt; &gt; even
> > >>>> &gt; &gt; &gt; &gt;&gt; there's no guarantee for exception cases.
> > >>>> &gt; &gt; &gt; &gt;&gt;
> > >>>> &gt; &gt; &gt; &gt;&gt;
> > >>>> &gt; &gt; &gt; &gt;&gt;
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt; 2021年9月28日 下午4:37,Joe F <
> joefrancisk@gmail.com&gt;
> > >>>> 写道:
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt; Clients should not depend on any of this
> > >>>> behaviour, since the broker
> > >>>> &gt; &gt; &gt; is at
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt; the other end of an unreliable&nbsp;
> network
> > >>>> connection. The
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt; semantic differences are kind of
> meaningless
> > >>>> from a usability point,
> > >>>> &gt; &gt; &gt; since
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt; flushing on close =/= published.&nbsp;
> What
> > >>>> exactly does "graceful" convey
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt; here?&nbsp; Flush the&nbsp; buffer on
> the
> > >>>> client end and hope it makes it to
> > >>>> &gt; &gt; the
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt; server.
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt; Is there a&nbsp; difference whether you
> > >>>> flush(or process) pending messages
> > >>>> &gt; &gt; &gt; or
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt; not? There is no guarantee that either
> case
> > >>>> will ensure the message
> > >>>> &gt; &gt; is
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt; published.
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt; The only way to ensure that messages are
> > >>>> published is to wait for the
> > >>>> &gt; &gt; &gt; ack.
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt; The correct model should be to wait for
> return
> > >>>> on the blocking API,
> > >>>> &gt; &gt; or
> > >>>> &gt; &gt; &gt; wait
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt; for future completion of the async API,
> then
> > >>>> handle any publish
> > >>>> &gt; &gt; errors
> > >>>> &gt; &gt; &gt; and
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt; then only close the producer.
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt; On Mon, Sep 27, 2021 at 8:50 PM Yunze Xu
> > >>>> &gt; &gt; <yzxu@streamnative.io.invalid
> > >>>> &gt; &gt; &gt; &gt;
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt; wrote:
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; Hi all,
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; Recently I found a PR (
> > >>>> https://github.com/apache/pulsar/pull/12195
> > >>>> &gt <https://github.com/apache/pulsar/pull/12195&gt>; &gt; <
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> > >>>> https://github.com/apache/pulsar/pull/12195&gt;) that
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; modifies the existing semantics of
> > >>>> producer close. There're already
> > >>>> &gt; &gt; &gt; some
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; communications in this PR, but I
> think
> > >>>> it's better to start a
> > >>>> &gt; &gt; &gt; discussion
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; here
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; to let more know.
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; The existing implementation of
> producer
> > >>>> close is:
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; 1. Cancel all timers, including
> send and
> > >>>> batch container
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; (`batchMessageContainer`).
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; 2. Complete all pending messages
> > >>>> (`pendingMessages`) with
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; `AlreadyCloseException`.
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; See `ProducerImpl#closeAsync` for
> details.
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; But the JavaDoc of
> `Producer#closeAsync`
> > >>>> is:
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;&gt; No more writes will be accepted
> from
> > >>>> this producer. Waits until all
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; pending write request are persisted.
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; Anyway, the document and
> implementation
> > >>>> are inconsistent. But
> > >>>> &gt; &gt; &gt; specifically,
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; we need to define the behavior for
> how to
> > >>>> process `pendingMessages`
> > >>>> &gt; &gt; &gt; and
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; `batchMessageContainer` when
> producer call
> > >>>> `closeAsync`.
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; 1. batchMessageContainer: contains
> the
> > >>>> buffered single messages
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; (`Message<T&gt;`).
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; 2. pendingMessages: all inflight
> messages
> > >>>> (`OpSendMsg`) in network.
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; IMO, from the JavaDoc, only
> > >>>> `pendingMessages` should be processed
> > >>>> &gt; &gt; and
> > >>>> &gt; &gt; &gt; the
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; messages in `batchMessageContainer`
> should
> > >>>> be discarded.
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; Since other clients might have
> already
> > >>>> implemented the similar
> > >>>> &gt; &gt; &gt; semantics of
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; Java clients. If we changed the
> semantics
> > >>>> now, the behaviors among
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; different
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; clients might be inconsistent.
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; Should we add a configuration to
> support
> > >>>> graceful close to follow
> > >>>> &gt; &gt; the
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; docs? Or
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; just change the current behavior?
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; Thanks,
> > >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; Yunze
> > >>>> &gt; &gt; &gt; &gt;&gt;
> > >>>> &gt; &gt; &gt;
> > >>>> &gt; &gt; &gt;
> > >>>> &gt; &gt;
> >
>

Re: Correct semantics of producer close

Posted by Michael Marshall <mi...@gmail.com>.
Thanks for your analysis, Yunze. I identified above that the messages
in the batch container were not getting completed correctly, so I put
together a PR to fix the problematic behavior. This PR will be valid
regardless of our decision to add flush logic to the close method.

Here is the PR: https://github.com/apache/pulsar/pull/12259.

Thanks,
Michael


On Thu, Sep 30, 2021 at 10:27 PM Yunze Xu <yz...@streamnative.io.invalid> wrote:
>
> You're right that before a CommandCloseProducer request was completed, the
> pending messages should be persisted before this close request was completed
> in normal cases. It’s guaranteed by broker side.
>
> Then there’s no inconsistency between the implementation and JavaDocs now.
>
> The key point is whether should we flush the messages in batch container. I prefer
> keeping the current semantics. But I found the messages in batch container
> never failed. We need to fix the problem. For example, here’s my unit test:
>
> ```java
>     @Test
>     public void test() throws Exception {
>         final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
>                 .topic("topic")
>                 .batchingMaxMessages(10000)
>                 .batchingMaxBytes(10000000)
>                 .batchingMaxPublishDelay(100, TimeUnit.SECONDS)
>                 .sendTimeout(1, TimeUnit.SECONDS)
>                 .create();
>         final CountDownLatch latch = new CountDownLatch(10);
>         final Map<Integer, Throwable> throwableMap = new ConcurrentHashMap<>();
>         for (int i = 0; i < 10; i++) {
>             final Integer index = i;
>             producer.sendAsync("msg-" + i).whenComplete((id, e) -> {
>                 if (e != null) {
>                     throwableMap.put(index, e);
>                 }
>                 latch.countDown();
>             });
>         }
>         producer.close();
>         latch.await();
>         throwableMap.forEach((i, e) -> {
>             log.info("Message {} failed with {}", i, e);
>         });
>     }
> ```
>
> The test would block forever.
>
> > 2021年10月1日 上午4:22,Michael Marshall <mi...@gmail.com> 写道:
> >
> > Following up here. I am pretty sure part of this conversation has been
> > based on a misunderstanding of the code. From what I can tell, the
> > behavior for `Producer#closeAsync` in the client (mostly) aligns with
> > the current Javadocs.
> >
> >> The existing implementation of producer close is:
> >> 1. Cancel all timers, including send and batch container (`batchMessageContainer`).
> >> 2. Complete all pending messages (`pendingMessages`) with `AlreadyCloseException`.
> >
> > I agree with 1, but I think 2 is only partially correct. The client
> > will only exceptionally complete pending messages if the connection is
> > null or not ready, or after the broker responds to the
> > `CLOSE_PRODUCER` command or a timeout passes. This behavior seems
> > right to me.
> >
> > Here is the relevant code:
> > https://github.com/apache/pulsar/blob/9d309145f342bc416b8b4663125e1216903a3e83/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L875-L909
> >
> > The only remaining question: does close imply flush? If not, we'll
> > need update the logic to fail the messages contained in the
> > `batchMessageContainer` during close. Otherwise, we'll update the
> > logic to call flush before sending the `CLOSE_PRODUCER` command and
> > everything should work as documented. In both cases, we should update
> > the Javadocs to make the behavior clearer.
> >
> > Thanks,
> > Michael
> >
> >
> >
> > On Thu, Sep 30, 2021 at 11:55 AM Michael Marshall <mi...@gmail.com> wrote:
> >>
> >> I have two questions:
> >>
> >> 1. Does close imply immediate shutdown?
> >> 2. Does close imply flush?
> >>
> >> There is not yet consensus on 1, and 2 is only relevant if 1's answer is "no".
> >>
> >> Thus far, the conversation has centered on the `Producer#close`
> >> method. I'd like to broaden the discussion to include some other
> >> methods from the `PulsarClient` interface: `shutdown` and `close`.
> >>
> >> The Javadoc for `PulsarClient#shutdown` describes the "shutdown
> >> immediately" behavior. It says:
> >>
> >>> Release all the resources and close all the producer, consumer and
> >>> reader instances without waiting for ongoing operations to complete.
> >>
> >> The Javadoc for `PulsarClient#close` describes waiting for
> >> pending/in-flight messages to complete before returning. It says:
> >>
> >>> This operation will trigger a graceful close of all producer, consumer
> >>> and reader instances that this client has currently active. That implies
> >>> that close will block and wait until all pending producer send requests
> >>> are persisted.
> >>
> >> One question that follows from the above: why does the `Producer` not
> >> have a `shutdown` method? I think this is because the "immediate
> >> shutdown" behavior is not necessary for a single producer. When
> >> immediate shutdown semantics are required, the `PulsarClient#shutdown`
> >> method is sufficient because it is used when shutting down the whole
> >> application. (If this is not correct, perhaps we should add a
> >> `shutdown` method to the producer?)
> >>
> >> Since immediate shutdown semantics are already available via our
> >> client API, I posit that the answer to question 1 is no, `close` does
> >> not imply immediate shutdown. At the very least, `close` in the Pulsar
> >> Client has not historically implied immediate shutdown.
> >>
> >> Additionally, it is relevant to point out that the `Producer#close`
> >> method is already sending a `CLOSE_PRODUCER` command and waiting on a
> >> response back from the broker. The broker's producer close method has
> >> the following Javadoc:
> >>
> >>> Close the producer immediately if: a. the connection is dropped
> >>> b. it's a graceful close and no pending publish acks are left
> >>> else wait for pending publish acks
> >>
> >> Since we're already waiting on the broker to respond to the producer's
> >> `CLOSE_PRODUCER` request, I see no reason to fail pending/in-flight
> >> messages immediately, especially because we should get a response back
> >> for those messages before getting the `SUCCESS` response from the
> >> broker since the responses will come on the same TCP connection. We
> >> could even simplify the close logic so that when the `CLOSE_PRODUCER`
> >> request completes (either successfully or because of a failure), we
> >> fail all remaining pending message futures.
> >>
> >> Ultimately, we need to decide whether to update the implementation to
> >> match the existing Javadocs, or to update the Javadocs to indicate
> >> that `close` means an immediate shutdown, which includes failing all
> >> outstanding message futures immediately. My vote is to make the
> >> implementation align with the Javadocs.
> >>
> >> Regarding question 2, I prefer that `close` implies flush because it
> >> is only a single (batched) message being flushed. If we do flush this
> >> message, we'll need to make sure that the message is sent before
> >> the `CLOSE_PRODUCER` command is sent.
> >>
> >> Thanks,
> >> Michael
> >>
> >>
> >> On Wed, Sep 29, 2021 at 7:04 AM Enrico Olivelli <eo...@gmail.com> wrote:
> >>>
> >>> I agree that we must ensure that every pending callback must be completed
> >>> eventually (timeout or another error is not a problem),
> >>> because we cannot let the client application hang forever.
> >>> I believe that the application can perform a flush() explicitly and also
> >>> wait for every callback to be executed if that is the requirement.
> >>>
> >>> Usually you call close() when:
> >>> 1. you have a serious problem: you already know that there is a hard error,
> >>> and you want to close the Producer or the Application and possibly start a
> >>> new one to recover
> >>> 2. you are shutting down your application or component: you have control
> >>> over the callbacks, so you can wait for them to complete
> >>>
> >>> So case 2. can be covered by the application. We have to support case 1:
> >>> fail fast and close (no need for flush()) .
> >>>
> >>> In my experience trying to implement "graceful stops" adds only complexity
> >>> and false hopes to the users.
> >>>
> >>> Enrico
> >>>
> >>>
> >>>
> >>> Il giorno mer 29 set 2021 alle ore 13:58 Nirvana <15...@qq.com.invalid>
> >>> ha scritto:
> >>>
> >>>> I agree to try to ensure ”at most once“ when closing。
> >>>>
> >>>>
> >>>> &gt; That would still get controlled by send timeout, after that, the send
> >>>> will fail and close should proceed.
> >>>> This sounds more in line with “at most once”。
> >>>>
> >>>>
> >>>> ------------------&nbsp;原始邮件&nbsp;------------------
> >>>> 发件人:
> >>>>                                                  "dev"
> >>>>                                                                <
> >>>> matteo.merli@gmail.com&gt;;
> >>>> 发送时间:&nbsp;2021年9月29日(星期三) 下午3:55
> >>>> 收件人:&nbsp;"Dev"<dev@pulsar.apache.org&gt;;
> >>>>
> >>>> 主题:&nbsp;Re: Correct semantics of producer close
> >>>>
> >>>>
> >>>>
> >>>> &gt; but equally they might be
> >>>> &gt; surprised when closeAsync doesn't complete because the pending
> >>>> messages
> >>>> &gt; can't be cleared
> >>>>
> >>>> That would still get controlled by send timeout, after that, the send
> >>>> will fail and close should proceed.
> >>>>
> >>>> --
> >>>> Matteo Merli
> >>>> <matteo.merli@gmail.com&gt;
> >>>>
> >>>> On Wed, Sep 29, 2021 at 12:52 AM Jack Vanlightly
> >>>> <jvanlightly@splunk.com.invalid&gt; wrote:
> >>>> &gt;
> >>>> &gt; I can see both sides of the argument regarding whether to flush
> >>>> pending
> >>>> &gt; messages or not. But I think what is definitely in the contract is
> >>>> not to
> >>>> &gt; discard any callbacks causing user code to block forever. No matter
> >>>> what,
> >>>> &gt; we must always call the callbacks.
> >>>> &gt;
> >>>> &gt; Personally, I am in favour of a close operation not flushing pending
> >>>> &gt; messages (and I define pending here as any message that has a
> >>>> callback).
> >>>> &gt; The reason is that if we wait for all pending messages to be sent
> >>>> then we
> >>>> &gt; now face a number of edge cases that could cause the close operation
> >>>> to
> >>>> &gt; take a very long time to complete. What if the user code really just
> >>>> needs
> >>>> &gt; to close the producer right now? If we amend the documentation to
> >>>> make it
> >>>> &gt; clear that close does not flush pending messages then the user is now
> >>>> able
> >>>> &gt; to explicitly craft the behaviour they need. If they want all messages
> >>>> &gt; flushed first then chaing flushAsync-&gt;closeAsync else just
> >>>> closeAsync.
> >>>> &gt;
> >>>> &gt; Unfortunately I think user expectation, regardless of the current
> >>>> javadoc,
> >>>> &gt; is that close would flush everything and in an ideal world it would.
> >>>> We
> >>>> &gt; have the Principle of Least Surprise but we also have Safe By Default.
> >>>> &gt; Users might be surprised that when calling closeAsync, a load of their
> >>>> &gt; pending messages get ConnectionAlreadyClosed, but equally they might
> >>>> be
> >>>> &gt; surprised when closeAsync doesn't complete because the pending
> >>>> messages
> >>>> &gt; can't be cleared. Failing pending messages is the safer option. User
> >>>> code
> >>>> &gt; must handle failure responses and cannot claim data loss with a
> >>>> &gt; non-positive response. But if they can't close a producer, that could
> >>>> &gt; result in a wider impact on their system, not to mention more issues
> >>>> &gt; created in GitHub.
> >>>> &gt;
> >>>> &gt; Jack
> >>>> &gt;
> >>>> &gt; On Wed, Sep 29, 2021 at 7:05 AM Joe F <joefrancisk@gmail.com&gt;
> >>>> wrote:
> >>>> &gt;
> >>>> &gt; &gt; [ External sender. Exercise caution. ]
> >>>> &gt; &gt;
> >>>> &gt; &gt; &gt;I don't think that implementing `closeAsync` with graceful
> >>>> shutdown
> >>>> &gt; &gt; logic implies a guarantee of message publishing. Rather, it
> >>>> guarantees
> >>>> &gt; &gt; that failures will be the result of a real exception or a
> >>>> timeout.
> >>>> &gt; &gt;
> >>>> &gt; &gt; I think that's beside the point.&nbsp;&nbsp;&nbsp;&nbsp; There
> >>>> is no definition of "real"
> >>>> &gt; &gt; exceptions.&nbsp;&nbsp; At that point the app is publishing on a
> >>>> best effort basis,
> >>>> &gt; &gt; and there are no guarantees anywhere in client or server.
> >>>> &gt; &gt;
> >>>> &gt; &gt; There is no concept&nbsp; of&nbsp; "maybe published". OR
> >>>> &gt; &gt; "published-if-no_real_errors".&nbsp; What does that even
> >>>> mean?&nbsp; That is only a
> >>>> &gt; &gt; can of worms which is going to add to developer confusion and
> >>>> lead to
> >>>> &gt; &gt; Pulsar users finding in the worst possible way that something
> >>>> got lost
> >>>> &gt; &gt; because it never got published.&nbsp; It's a poor experience
> >>>> when you find it.
> >>>> &gt; &gt; I have a real life experience where a user used async APIs (in a
> >>>> lambda),
> >>>> &gt; &gt; which hummed along fine.&nbsp; One day much later, the cloud had
> >>>> a hitch, and
> >>>> &gt; &gt; they discovered a message was&nbsp; not delivered.
> >>>> &gt; &gt;
> >>>> &gt; &gt; I am more concerned about developers discovering at the worst
> >>>> possible time
> >>>> &gt; &gt; that&nbsp; ""published-if-no_real_errors"&nbsp; is a concept.
> >>>> &gt; &gt;
> >>>> &gt; &gt; My suggestion is to make this simple for developers.
> >>>> &gt; &gt;
> >>>> &gt; &gt; ----The sync/async nature of the close() [ or any other API, for
> >>>> that
> >>>> &gt; &gt; matter ]&nbsp; is completely orthogonal to the API semantics,
> >>>> and is just a
> >>>> &gt; &gt; programmatic choice to deal with&nbsp; how resources are managed
> >>>> within the
> >>>> &gt; &gt; program. That's not material here.---
> >>>> &gt; &gt;
> >>>> &gt; &gt; A close() is an action that is shutting down the producer right
> >>>> now, not
> >>>> &gt; &gt; even waiting for any acks of inflight messages. A willingness to
> >>>> lose
> >>>> &gt; &gt; pending/inflight messages is explicit in that call.&nbsp; The
> >>>> producer will&nbsp; not
> >>>> &gt; &gt; be around to deal with errors or to retry failed messages once
> >>>> close() is
> >>>> &gt; &gt; invoked.
> >>>> &gt; &gt;
> >>>> &gt; &gt; On the contrary, if the client does not want to deal with
> >>>> message loss,
> >>>> &gt; &gt; then flush(), stick around to gather the acks, deal with errors
> >>>> and retries
> >>>> &gt; &gt; etc and then do close() . Then close() will be just a resource
> >>>> management
> >>>> &gt; &gt; action on the client.
> >>>> &gt; &gt;
> >>>> &gt; &gt; So update the documentation to reflect that. ---&gt; if close()
> >>>> is called on a
> >>>> &gt; &gt; producer with messages pending acks, those messages are left
> >>>> indoubt. Avoid
> >>>> &gt; &gt; all mention of flushes, best effort etc.&nbsp; Users must buy
> >>>> into&nbsp; uncertainty,
> >>>> &gt; &gt; without any qualifications.
> >>>> &gt; &gt;
> >>>> &gt; &gt; I would at all costs avoid using the term "graceful"
> >>>> anywhere.&nbsp; That word
> >>>> &gt; &gt; has specific semantics associated with it in the systems/storage
> >>>> domain ,
> >>>> &gt; &gt; and what is being proposed here is nothing like that.
> >>>> &gt; &gt;
> >>>> &gt; &gt; -j
> >>>> &gt; &gt;
> >>>> &gt; &gt;
> >>>> &gt; &gt; On Tue, Sep 28, 2021 at 7:05 PM Yunze Xu
> >>>> <yzxu@streamnative.io.invalid&gt;
> >>>> &gt; &gt; wrote:
> >>>> &gt; &gt;
> >>>> &gt; &gt; &gt; It’s a good point that
> >>>> `ProducerImpl#failPendingBatchMessages` treats
> >>>> &gt; &gt; &gt; messages in batch container also as pending messages.
> >>>> &gt; &gt; &gt;
> >>>> &gt; &gt; &gt; I agree with your definition of "graceful close”. It’s more
> >>>> like a “at
> >>>> &gt; &gt; &gt; most once”
> >>>> &gt; &gt; &gt; semantics, like the original JavaDoc said
> >>>> &gt; &gt; &gt;
> >>>> &gt; &gt; &gt; &gt; pending writes will not be retried
> >>>> &gt; &gt; &gt;
> >>>> &gt; &gt; &gt; Thanks,
> >>>> &gt; &gt; &gt; Yunze
> >>>> &gt; &gt; &gt;
> >>>> &gt; &gt; &gt; &gt; 2021年9月29日 上午5:24,Michael Marshall <
> >>>> mikemarsh17@gmail.com&gt; 写道:
> >>>> &gt; &gt; &gt; &gt;
> >>>> &gt; &gt; &gt; &gt; Thanks for bringing this thread to the mailing list,
> >>>> Yunze.
> >>>> &gt; &gt; &gt; &gt;
> >>>> &gt; &gt; &gt; &gt; I think the right change is to update the `closeAsync`
> >>>> method to first
> >>>> &gt; &gt; &gt; &gt; flush `batchMessageContainer` and to then
> >>>> asynchronously wait for the
> >>>> &gt; &gt; &gt; &gt; `pendingMessages` queue to drain. We could add a new
> >>>> timeout or rely
> >>>> &gt; &gt; &gt; &gt; on the already implemented `sendTimeout` config to put
> >>>> an upper time
> >>>> &gt; &gt; &gt; &gt; limit on `closeAsync`. My reasoning as well as
> >>>> responses to Joe and
> >>>> &gt; &gt; &gt; &gt; Yunze follow:
> >>>> &gt; &gt; &gt; &gt;
> >>>> &gt; &gt; &gt; &gt;&gt; we need to define the behavior for how to process
> >>>> `pendingMessages`
> >>>> &gt; &gt; &gt; &gt;&gt; and `batchMessageContainer` when producer call
> >>>> `closeAsync`.
> >>>> &gt; &gt; &gt; &gt;
> >>>> &gt; &gt; &gt; &gt; Yes, this is exactly the clarification required, and I
> >>>> agree that the
> >>>> &gt; &gt; &gt; &gt; Javadoc is ambiguous and that the implementation
> >>>> doesn't align with
> >>>> &gt; &gt; &gt; &gt; the Javadoc.
> >>>> &gt; &gt; &gt; &gt;
> >>>> &gt; &gt; &gt; &gt; If we view the Javadoc as binding, then the
> >>>> fundamental question is
> >>>> &gt; &gt; &gt; &gt; what messages are "pending". The `pendingMessages`
> >>>> seem pretty easy to
> >>>> &gt; &gt; &gt; &gt; classify as "pending" given that they are already in
> >>>> flight on the
> >>>> &gt; &gt; &gt; &gt; network.
> >>>> &gt; &gt; &gt; &gt;
> >>>> &gt; &gt; &gt; &gt; I also consider `batchMessageContainer` to be
> >>>> "pending" because a
> >>>> &gt; &gt; &gt; &gt; client application already has callbacks for the
> >>>> messages in this
> >>>> &gt; &gt; &gt; &gt; container. These callbacks are expected to complete
> >>>> when the batch
> >>>> &gt; &gt; &gt; &gt; message delivery completes. Since the client
> >>>> application already has a
> >>>> &gt; &gt; &gt; &gt; reference to a callback, it isn't a problem that the
> >>>> producer
> >>>> &gt; &gt; &gt; &gt; implementation initiates the flush logic. (Note that
> >>>> the current
> >>>> &gt; &gt; &gt; &gt; design fails the `pendingMessages` but does not fail
> >>>> the
> >>>> &gt; &gt; &gt; &gt; `batchMessageContainer` when `closeAsync` is called,
> >>>> so the callbacks
> >>>> &gt; &gt; &gt; &gt; for that container are currently left incomplete
> >>>> forever if the client
> >>>> &gt; &gt; &gt; &gt; is closed with an unsent batch. We will need to
> >>>> address this design in
> >>>> &gt; &gt; &gt; &gt; the work that comes from this discussion.)
> >>>> &gt; &gt; &gt; &gt;
> >>>> &gt; &gt; &gt; &gt; Further, the `ProducerImpl#failPendingMessages` method
> >>>> includes logic
> >>>> &gt; &gt; &gt; &gt; to call `ProducerImpl#failPendingBatchMessages`, which
> >>>> implies that
> >>>> &gt; &gt; &gt; &gt; these batched, but not sent, messages have been
> >>>> historically
> >>>> &gt; &gt; &gt; &gt; considered "pending".
> >>>> &gt; &gt; &gt; &gt;
> >>>> &gt; &gt; &gt; &gt; If we view the Javadoc as non-binding, I think my
> >>>> guiding influence
> >>>> &gt; &gt; &gt; &gt; for the new design would be that the `closeAsync`
> >>>> method should result
> >>>> &gt; &gt; &gt; &gt; in a "graceful" shutdown of the client.
> >>>> &gt; &gt; &gt; &gt;
> >>>> &gt; &gt; &gt; &gt;&gt; What exactly does "graceful" convey here?
> >>>> &gt; &gt; &gt; &gt;
> >>>> &gt; &gt; &gt; &gt; This is a great question, and will likely drive the
> >>>> design here. I
> >>>> &gt; &gt; &gt; &gt; view graceful to mean that the producer attempts to
> >>>> avoid artificial
> >>>> &gt; &gt; &gt; &gt; failures. That means trying to drain the queue instead
> >>>> of
> >>>> &gt; &gt; &gt; &gt; automatically failing all of the queue's callbacks.
> >>>> The tradeoff is
> >>>> &gt; &gt; &gt; &gt; that closing the producer takes longer. This reasoning
> >>>> would justify
> >>>> &gt; &gt; &gt; &gt; my claim that we should first flush the
> >>>> `batchMessageContainer`
> >>>> &gt; &gt; &gt; &gt; instead of failing the batch without any effort at
> >>>> delivery, as that
> >>>> &gt; &gt; &gt; &gt; would be artificial.
> >>>> &gt; &gt; &gt; &gt;
> >>>> &gt; &gt; &gt; &gt;&gt; There is no guarantee that either case will ensure
> >>>> the message
> >>>> &gt; &gt; &gt; &gt;&gt; is published.
> >>>> &gt; &gt; &gt; &gt;
> >>>> &gt; &gt; &gt; &gt; I don't think that implementing `closeAsync` with
> >>>> graceful shutdown
> >>>> &gt; &gt; &gt; &gt; logic implies a guarantee of message publishing.
> >>>> Rather, it guarantees
> >>>> &gt; &gt; &gt; &gt; that failures will be the result of a real exception
> >>>> or a timeout.
> >>>> &gt; &gt; &gt; &gt; Since calling `closeAsync` prevents additional
> >>>> messages from
> >>>> &gt; &gt; &gt; &gt; delivering, users leveraging this functionality might
> >>>> be operating
> >>>> &gt; &gt; &gt; &gt; with "at most once" delivery semantics where they'd
> >>>> prefer to deliver
> >>>> &gt; &gt; &gt; &gt; the messages if possible, but they aren't going to
> >>>> delay application
> >>>> &gt; &gt; &gt; &gt; shutdown indefinitely to deliver its last messages. If
> >>>> users need
> >>>> &gt; &gt; &gt; &gt; stronger guarantees about whether their messages are
> >>>> delivered, they
> >>>> &gt; &gt; &gt; &gt; are probably already using the flush methods to ensure
> >>>> that the
> >>>> &gt; &gt; &gt; &gt; producer's queues are empty before calling
> >>>> `closeAsync`.
> >>>> &gt; &gt; &gt; &gt;
> >>>> &gt; &gt; &gt; &gt; I also agree that in all of these cases, we're
> >>>> assuming that users are
> >>>> &gt; &gt; &gt; &gt; capturing references to the async callbacks and then
> >>>> making business
> >>>> &gt; &gt; &gt; &gt; logic decisions based on the results of those
> >>>> callbacks.
> >>>> &gt; &gt; &gt; &gt;
> >>>> &gt; &gt; &gt; &gt; Thanks,
> >>>> &gt; &gt; &gt; &gt; Michael
> >>>> &gt; &gt; &gt; &gt;
> >>>> &gt; &gt; &gt; &gt; On Tue, Sep 28, 2021 at 4:58 AM Yunze Xu
> >>>> <yzxu@streamnative.io.invalid
> >>>> &gt; &gt; &gt;
> >>>> &gt; &gt; &gt; wrote:
> >>>> &gt; &gt; &gt; &gt;&gt;
> >>>> &gt; &gt; &gt; &gt;&gt; I can’t agree more, just like what I’ve said in PR
> >>>> 12195:
> >>>> &gt; &gt; &gt; &gt;&gt;
> >>>> &gt; &gt; &gt; &gt;&gt;&gt; At any case, when you choose `sendAsync`, you
> >>>> should always make use
> >>>> &gt; &gt; &gt; of the returned future to confirm the result of all
> >>>> messages. In Kafka,
> >>>> &gt; &gt; &gt; it's the send callback.
> >>>> &gt; &gt; &gt; &gt;&gt;
> >>>> &gt; &gt; &gt; &gt;&gt; But I found many users are confused about the
> >>>> current behavior,
> >>>> &gt; &gt; &gt; especially
> >>>> &gt; &gt; &gt; &gt;&gt; those are used to Kafka’s close semantics. They
> >>>> might expect a simple
> >>>> &gt; &gt; &gt; try
> >>>> &gt; &gt; &gt; &gt;&gt; to flush existing messages, which works at a
> >>>> simple test environment,
> >>>> &gt; &gt; &gt; even
> >>>> &gt; &gt; &gt; &gt;&gt; there's no guarantee for exception cases.
> >>>> &gt; &gt; &gt; &gt;&gt;
> >>>> &gt; &gt; &gt; &gt;&gt;
> >>>> &gt; &gt; &gt; &gt;&gt;
> >>>> &gt; &gt; &gt; &gt;&gt;&gt; 2021年9月28日 下午4:37,Joe F <joefrancisk@gmail.com&gt;
> >>>> 写道:
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;
> >>>> &gt; &gt; &gt; &gt;&gt;&gt; Clients should not depend on any of this
> >>>> behaviour, since the broker
> >>>> &gt; &gt; &gt; is at
> >>>> &gt; &gt; &gt; &gt;&gt;&gt; the other end of an unreliable&nbsp; network
> >>>> connection. The
> >>>> &gt; &gt; &gt; &gt;&gt;&gt; semantic differences are kind of meaningless
> >>>> from a usability point,
> >>>> &gt; &gt; &gt; since
> >>>> &gt; &gt; &gt; &gt;&gt;&gt; flushing on close =/= published.&nbsp; What
> >>>> exactly does "graceful" convey
> >>>> &gt; &gt; &gt; &gt;&gt;&gt; here?&nbsp; Flush the&nbsp; buffer on the
> >>>> client end and hope it makes it to
> >>>> &gt; &gt; the
> >>>> &gt; &gt; &gt; &gt;&gt;&gt; server.
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;
> >>>> &gt; &gt; &gt; &gt;&gt;&gt; Is there a&nbsp; difference whether you
> >>>> flush(or process) pending messages
> >>>> &gt; &gt; &gt; or
> >>>> &gt; &gt; &gt; &gt;&gt;&gt; not? There is no guarantee that either case
> >>>> will ensure the message
> >>>> &gt; &gt; is
> >>>> &gt; &gt; &gt; &gt;&gt;&gt; published.
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;
> >>>> &gt; &gt; &gt; &gt;&gt;&gt; The only way to ensure that messages are
> >>>> published is to wait for the
> >>>> &gt; &gt; &gt; ack.
> >>>> &gt; &gt; &gt; &gt;&gt;&gt; The correct model should be to wait for return
> >>>> on the blocking API,
> >>>> &gt; &gt; or
> >>>> &gt; &gt; &gt; wait
> >>>> &gt; &gt; &gt; &gt;&gt;&gt; for future completion of the async API, then
> >>>> handle any publish
> >>>> &gt; &gt; errors
> >>>> &gt; &gt; &gt; and
> >>>> &gt; &gt; &gt; &gt;&gt;&gt; then only close the producer.
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;
> >>>> &gt; &gt; &gt; &gt;&gt;&gt; On Mon, Sep 27, 2021 at 8:50 PM Yunze Xu
> >>>> &gt; &gt; <yzxu@streamnative.io.invalid
> >>>> &gt; &gt; &gt; &gt;
> >>>> &gt; &gt; &gt; &gt;&gt;&gt; wrote:
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; Hi all,
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; Recently I found a PR (
> >>>> https://github.com/apache/pulsar/pull/12195
> >>>> &gt <https://github.com/apache/pulsar/pull/12195&gt>; &gt; <
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> >>>> https://github.com/apache/pulsar/pull/12195&gt;) that
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; modifies the existing semantics of
> >>>> producer close. There're already
> >>>> &gt; &gt; &gt; some
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; communications in this PR, but I think
> >>>> it's better to start a
> >>>> &gt; &gt; &gt; discussion
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; here
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; to let more know.
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; The existing implementation of producer
> >>>> close is:
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; 1. Cancel all timers, including send and
> >>>> batch container
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; (`batchMessageContainer`).
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; 2. Complete all pending messages
> >>>> (`pendingMessages`) with
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; `AlreadyCloseException`.
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; See `ProducerImpl#closeAsync` for details.
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; But the JavaDoc of `Producer#closeAsync`
> >>>> is:
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;&gt; No more writes will be accepted from
> >>>> this producer. Waits until all
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; pending write request are persisted.
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; Anyway, the document and implementation
> >>>> are inconsistent. But
> >>>> &gt; &gt; &gt; specifically,
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; we need to define the behavior for how to
> >>>> process `pendingMessages`
> >>>> &gt; &gt; &gt; and
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; `batchMessageContainer` when producer call
> >>>> `closeAsync`.
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; 1. batchMessageContainer: contains the
> >>>> buffered single messages
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; (`Message<T&gt;`).
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; 2. pendingMessages: all inflight messages
> >>>> (`OpSendMsg`) in network.
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; IMO, from the JavaDoc, only
> >>>> `pendingMessages` should be processed
> >>>> &gt; &gt; and
> >>>> &gt; &gt; &gt; the
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; messages in `batchMessageContainer` should
> >>>> be discarded.
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; Since other clients might have already
> >>>> implemented the similar
> >>>> &gt; &gt; &gt; semantics of
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; Java clients. If we changed the semantics
> >>>> now, the behaviors among
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; different
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; clients might be inconsistent.
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; Should we add a configuration to support
> >>>> graceful close to follow
> >>>> &gt; &gt; the
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; docs? Or
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; just change the current behavior?
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; Thanks,
> >>>> &gt; &gt; &gt; &gt;&gt;&gt;&gt; Yunze
> >>>> &gt; &gt; &gt; &gt;&gt;
> >>>> &gt; &gt; &gt;
> >>>> &gt; &gt; &gt;
> >>>> &gt; &gt;
>