Posted to dev@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/11/07 23:36:53 UTC
[GitHub] [pulsar] leizhiyuan created a discussion: sometimes broker send messages to consumer,but consumer can not receive
GitHub user leizhiyuan created a discussion: sometimes broker send messages to consumer,but consumer can not receive
We use Pulsar client 2.7.4 and call **batchReceive**() to receive messages. We added a tracer in the broker: after we invoke ctx.write(new Message) and flush, we write a trace record, so we can tell that the broker sent the message to the consumer.
PulsarCommandSenderImpl.java
<img width="1182" alt="image" src="https://user-images.githubusercontent.com/2684384/200438085-76d2b13a-4b29-4619-85f8-4b28906a419b.png">
<img width="656" alt="image" src="https://user-images.githubusercontent.com/2684384/200437712-c306a780-82b2-4b6a-b970-6aca43fe7b60.png">
But sometimes we see an unacked message on the broker. On the consumer side we print a log line every time we receive a message, and no log line appears for this message, so it seems the consumer sometimes does not receive it.
<img width="839" alt="image" src="https://user-images.githubusercontent.com/2684384/200437843-ba8c44cc-40b8-4bc9-9c88-f307578feec8.png">
Do you have any idea what could cause this issue? Thanks for any suggestions.
GitHub link: https://github.com/apache/pulsar/discussions/18377
----
This is an automatically sent email for dev@pulsar.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@pulsar.apache.org
[GitHub] [pulsar] leizhiyuan added a comment to the discussion: sometimes broker send messages to consumer,but consumer can not receive
Posted by GitBox <gi...@apache.org>.
GitHub user leizhiyuan added a comment to the discussion: sometimes broker send messages to consumer,but consumer can not receive
```
Future<Void> writeAndFlushPromise =
        cnx.getCommandSender().sendMessagesToConsumer(consumerId, topicName, subscription, partitionIdx,
                entries, batchSizes, batchIndexesAcks, redeliveryTracker, epoch);
writeAndFlushPromise.addListener(status -> {
    // only increment counters after the messages have been successfully written to the TCP/IP connection
    if (status.isSuccess()) {
        msgOut.recordMultipleEvents(totalMessages, totalBytes);
        msgOutCounter.add(totalMessages);
        bytesOutCounter.add(totalBytes);
        chunkedMessageRate.recordMultipleEvents(totalChunkedMessages, 0);
    }
});
```
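Maybe this write fails. The listener only handles the success case, so a failed write would not be recorded anywhere in this code.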
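One way to check this would be to handle the failure branch as well. The following is only a minimal sketch of that idea, not Pulsar code: it uses `java.util.concurrent.CompletableFuture` as a stand-in for the Netty `Future` so that it runs with the JDK alone, and the names `WriteOutcomeSketch`, `track`, `writeFailures` and the log text are hypothetical.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

class WriteOutcomeSketch {
    // Hypothetical counters: one mirrors the broker's msgOutCounter,
    // the other is an assumed new counter for failed writes.
    static final AtomicLong msgOutCounter = new AtomicLong();
    static final AtomicLong writeFailures = new AtomicLong();

    // Attaches a callback that handles both outcomes of the write,
    // instead of only the success case.
    static void track(CompletableFuture<Void> writeAndFlush, long consumerId, int totalMessages) {
        writeAndFlush.whenComplete((ignored, cause) -> {
            if (cause == null) {
                // Write reached the connection: count the messages as sent.
                msgOutCounter.addAndGet(totalMessages);
            } else {
                // Write failed: record it so the lost dispatch is visible.
                writeFailures.incrementAndGet();
                System.err.printf("write to consumer %d failed: %s%n", consumerId, cause);
            }
        });
    }

    public static void main(String[] args) {
        // Simulate one successful write of 5 messages.
        CompletableFuture<Void> ok = new CompletableFuture<>();
        track(ok, 1L, 5);
        ok.complete(null);

        // Simulate one failed write of 3 messages.
        CompletableFuture<Void> failed = new CompletableFuture<>();
        track(failed, 1L, 3);
        failed.completeExceptionally(new IOException("connection reset"));

        System.out.println("sent=" + msgOutCounter.get() + " failedWrites=" + writeFailures.get());
    }
}
```

In the broker itself, the equivalent change would presumably be an `else` branch in the existing listener that logs `status.cause()`. If that log never fires while the consumer still misses the message, the write path is probably not the cause.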
GitHub link: https://github.com/apache/pulsar/discussions/18377#discussioncomment-4083271