You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/11/07 23:36:53 UTC

[GitHub] [pulsar] leizhiyuan created a discussion: sometimes broker send messages to consumer,but consumer can not receive

GitHub user leizhiyuan created a discussion: sometimes broker send messages to consumer,but consumer can not receive

we use pulsar client 2.7.4 and use **batchReceive**() to receive messages. we add a tracer in broker, after we invoke ctx.write(new Message), and flush ,we will write a tracer record (so we can judge we send the message to consumer).

PulsarCommandSenderImpl.java
<img width="1182" alt="image" src="https://user-images.githubusercontent.com/2684384/200438085-76d2b13a-4b29-4619-85f8-4b28906a419b.png">


<img width="656" alt="image" src="https://user-images.githubusercontent.com/2684384/200437712-c306a780-82b2-4b6a-b970-6aca43fe7b60.png">

but sometimes ,we can see there will have an unacked message, but from consumer, once we receive a message, we will print a log, it seems we can not receive this message sometimes.

<img width="839" alt="image" src="https://user-images.githubusercontent.com/2684384/200437843-ba8c44cc-40b8-4bc9-9c88-f307578feec8.png">

do you have any idea for this issue? thanks for any idea.


GitHub link: https://github.com/apache/pulsar/discussions/18377

----
This is an automatically sent email for dev@pulsar.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@pulsar.apache.org


[GitHub] [pulsar] leizhiyuan added a comment to the discussion: sometimes broker send messages to consumer,but consumer can not receive

Posted by GitBox <gi...@apache.org>.
GitHub user leizhiyuan added a comment to the discussion: sometimes broker send messages to consumer,but consumer can not receive

```
   Future<Void> writeAndFlushPromise =
                cnx.getCommandSender().sendMessagesToConsumer(consumerId, topicName, subscription, partitionIdx,
                        entries, batchSizes, batchIndexesAcks, redeliveryTracker, epoch);
        writeAndFlushPromise.addListener(status -> {
            // only increment counters after the messages have been successfully written to the TCP/IP connection
            if (status.isSuccess()) {
                msgOut.recordMultipleEvents(totalMessages, totalBytes);
                msgOutCounter.add(totalMessages);
                bytesOutCounter.add(totalBytes);
                chunkedMessageRate.recordMultipleEvents(totalChunkedMessages, 0);
            }
        });
```

maybe this write fail

GitHub link: https://github.com/apache/pulsar/discussions/18377#discussioncomment-4083271

----
This is an automatically sent email for dev@pulsar.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@pulsar.apache.org