Posted to dev@pulsar.apache.org by Apache Pulsar Slack <ap...@gmail.com> on 2020/05/01 09:11:03 UTC

Slack digest for #dev - 2020-05-01

2020-04-30 10:47:28 UTC - Avimas: Hi Guys,
I started to work with the Java client 2.5.1 batchReceive method and I think we have a few issues in this area; I'd like to get your feedback on this.

1. The batch size is not limited to the minimum of the maxNumberOfMessages and maxSizeOfMessages from the BatchReceivePolicy.
I think this issue can be easily fixed by changing the canAdd function in MessagesImpl from:

protected boolean canAdd(Message<T> message) {
    if (this.maxNumberOfMessages <= 0 && this.maxSizeOfMessages <= 0L) {
        return true;
    } else {
        return (this.maxNumberOfMessages > 0 && this.currentNumberOfMessages + 1 <= this.maxNumberOfMessages)
                || (this.maxSizeOfMessages > 0L && this.currentSizeOfMessages + (long) message.getData().length <= this.maxSizeOfMessages);
    }
}

to (changing the condition in the else branch to && instead of ||):

protected boolean canAdd(Message<T> message) {
    if (this.maxNumberOfMessages <= 0 && this.maxSizeOfMessages <= 0L) {
        return true;
    } else {
        return (this.maxNumberOfMessages > 0 && this.currentNumberOfMessages + 1 <= this.maxNumberOfMessages)
                && (this.maxSizeOfMessages > 0L && this.currentSizeOfMessages + (long) message.getData().length <= this.maxSizeOfMessages);
    }
}
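
One caveat with the plain && form: if only one of the two limits is configured, the combined condition is always false, so the real change probably needs to check each configured limit on its own. Roughly something like this (a sketch against the same MessagesImpl fields, just as an illustration, not an exact patch):

protected boolean canAdd(Message<T> message) {
    // Reject the message if a configured count limit would be exceeded.
    if (this.maxNumberOfMessages > 0
            && this.currentNumberOfMessages + 1 > this.maxNumberOfMessages) {
        return false;
    }
    // Reject the message if a configured size limit would be exceeded.
    if (this.maxSizeOfMessages > 0L
            && this.currentSizeOfMessages + (long) message.getData().length > this.maxSizeOfMessages) {
        return false;
    }
    // No configured limit would be exceeded (or no limits are configured at all).
    return true;
}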

2. When the batch size is higher than the receiver queue size of the consumer (I used a batch size of 3000 and a receiver queue size of 500), I noticed the following issues:

	a. In a multi-topic (pattern) consumer with a timeout set in the batch policy, the client stops receiving messages: only one batch is fetched, and I think the consumer gets paused and is never resumed.

	b. In a single-topic consumer, I get three batches of 3000, which I think is very bad considering the receiver queue size is 500, and after that the client stops receiving any traffic.


Do you think I should open issues on this?
Any limitation I am not aware of?
----
2020-04-30 12:58:33 UTC - Penghui Li: Hi @Avimas, thanks for your feedback. You can open an issue for each of these two problems; other users may have hit them as well, and this way they can find them in the GitHub issues. For the first problem, the change looks good; a pull request is welcome. I will take a look at the second issue later.
----
2020-04-30 14:28:36 UTC - Avimas: Great, thanks! I will create a pull request for the first problem and open issues for the others.
----
2020-04-30 14:57:08 UTC - Penghui Li: @Avimas, I ran a test for the second issue (single-topic consumer) but can’t reproduce it. I’m working on the master branch:
```    @Test
    public void test() throws PulsarClientException {
        final String topic = "persistent://my-property/my-ns/test";

        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).blockIfQueueFull(true).create();
        new Thread(() -> {
            while (true) {
                producer.sendAsync("".getBytes());
            }
        }).start();

        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topic)
                .receiverQueueSize(100)
                .subscriptionName("test")
                .batchReceivePolicy(BatchReceivePolicy.builder().maxNumMessages(5000).timeout(1, TimeUnit.SECONDS).build())
                .subscribe();

        while (true) {
            Messages<byte[]> msg = consumer.batchReceive();
            System.out.println(msg.size());
        }

    }```
----
2020-04-30 14:57:08 UTC - Penghui Li: ```5000
5000
2993
977
5000
5000
5000
5000
1790
5000
5000
5000
5000
5000
5000
1136
5000
5000
5000
5000
5000
5000
5000
5000
5000
5000```
----
2020-04-30 14:57:37 UTC - Penghui Li: I’m not sure if I missed something.
----
2020-04-30 15:09:26 UTC - Avimas: Try removing the timeout from the batch receive policy.
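For example, the same consumer as in your test, just with the .timeout(...) call dropped from the policy:

```Consumer<byte[]> consumer = pulsarClient.newConsumer()
        .topic(topic)
        .receiverQueueSize(100)
        .subscriptionName("test")
        // Same policy as in the test above, with the .timeout(1, TimeUnit.SECONDS) call removed.
        .batchReceivePolicy(BatchReceivePolicy.builder().maxNumMessages(5000).build())
        .subscribe();```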
----
2020-04-30 15:10:40 UTC - Avimas: Notice that the batch size is greater than the receiver queue size, which is not so good, I assume.
----
2020-04-30 15:22:32 UTC - Avimas: @Penghui Li this is my producer configuration
----
2020-04-30 15:22:49 UTC - Avimas: ```clientProviderForTests.getPulsarClient()
        .newProducer()
        .blockIfQueueFull(true)
        .maxPendingMessages(2000)
        .enableBatching(true)
        .batchingMaxMessages(2000)
        .sendTimeout(30, TimeUnit.SECONDS)
        .topic("public/default/test").create();```

----
2020-04-30 15:24:06 UTC - Avimas: and this is my consumer
----
2020-04-30 15:24:14 UTC - Avimas: ```consumer = clientProviderForTests.getPulsarClient().newConsumer()
        //.subscriptionType(SubscriptionType.Shared)
        .subscriptionName("disconnections3")
        .topic("public/default/test")
        .receiverQueueSize(500)
        .batchReceivePolicy(BatchReceivePolicy.builder().maxNumBytes(1024*1024).maxNumMessages(3000).build())
        .subscribe();```

----
2020-04-30 15:24:53 UTC - Avimas: Let me know if the issue reproduces for you.
----
2020-04-30 15:57:47 UTC - Avimas: @Penghui Li just verified that the issue reproduces in my environment when the timeout is omitted.
----
2020-04-30 18:25:34 UTC - Greg Methvin: I also noticed something similar to (2), even without batching, when `receiverQueueSize` is less than `batchingMaxMessages`. If I had 10 consumers, I might only have one or two that were actually processing messages, and the rest were “stuck”. I learned the hard way to always set `receiverQueueSize > batchingMaxMessages`.
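For reference, a rough consumer-side sketch of that workaround, reusing the config from earlier in the thread with the receiver queue sized above the batch receive policy's maxNumMessages (the numbers are only illustrative):

```Consumer<byte[]> consumer = pulsarClient.newConsumer()
        .topic("public/default/test")
        .subscriptionName("disconnections3")
        // Receiver queue deliberately sized larger than any batch limit.
        .receiverQueueSize(4000)
        .batchReceivePolicy(BatchReceivePolicy.builder().maxNumMessages(3000).build())
        .subscribe();```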
----
2020-04-30 18:49:24 UTC - Avimas: Yes, it is a good solution, but I think it hides a bigger issue.
----
2020-05-01 01:16:39 UTC - Penghui Li: @Avimas Oh, I see. The problem is related to the flow permit control when you set `batchingMaxMessages` greater than `receiverQueueSize`. I have created an issue <https://github.com/apache/pulsar/issues/6854> and will fix it later.
----
2020-05-01 04:16:51 UTC - Frank Xu: @Frank Xu has joined the channel
----
2020-05-01 05:23:58 UTC - Avimas: Thanks @Penghui Li 
----