You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/11/21 06:41:47 UTC
[pulsar] branch master updated: Fix consumer of python queue size
is zero (#5706)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
View the commit online:
https://github.com/apache/pulsar/commit/fa029700d3edbc398356f6b74cad1120649d2494
The following commit(s) were added to refs/heads/master by this push:
new fa02970 Fix consumer of python queue size is zero (#5706)
fa02970 is described below
commit fa029700d3edbc398356f6b74cad1120649d2494
Author: tuteng <gu...@apache.org>
AuthorDate: Thu Nov 21 14:41:39 2019 +0800
Fix consumer of python queue size is zero (#5706)
Fixes https://github.com/apache/pulsar/issues/5634
Master Issue: https://github.com/apache/pulsar/issues/5634
### Motivation
In java clients, when we call the `receive`, we will block it until a message arrives. in python clients, when we call the `receive` function, we add a delay parameter of 100ms. when the queue size is 0, the `receive` will have a strict check on the queue size, causing the following exception to be thrown
```
Traceback (most recent call last):
File "tst.py", line 10, in <module>
msg = consumer.receive()
File "/python3.7/site-packages/pulsar/__init__.py", line 930, in receive
msg = self._consumer.receive()
Exception: Pulsar error: InvalidConfiguration
```
### Modifications
* Removing timeout parameter in synchronous `receive`
* Add test for queue size is 0
### Verifying this change
Add Test
---
pulsar-client-cpp/python/pulsar_test.py | 17 +++++++++++++++++
pulsar-client-cpp/python/src/consumer.cc | 4 +---
2 files changed, 18 insertions(+), 3 deletions(-)
diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py
index f31a7e1..3f8bc08 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -174,6 +174,23 @@ class PulsarTest(TestCase):
consumer.unsubscribe()
client.close()
+ def test_consumer_queue_size_is_zero(self):
+ client = Client(self.serviceUrl)
+ consumer = client.subscribe('my-python-topic-consumer-init-queue-size-is-zero',
+ 'my-sub',
+ consumer_type=ConsumerType.Shared,
+ receiver_queue_size=0,
+ initial_position=InitialPosition.Earliest)
+ producer = client.create_producer('my-python-topic-consumer-init-queue-size-is-zero')
+ producer.send(b'hello')
+ time.sleep(0.1)
+ msg = consumer.receive()
+ self.assertTrue(msg)
+ self.assertEqual(msg.data(), b'hello')
+
+ consumer.unsubscribe()
+ client.close()
+
def test_message_properties(self):
client = Client(self.serviceUrl)
topic = 'my-python-test-message-properties'
diff --git a/pulsar-client-cpp/python/src/consumer.cc b/pulsar-client-cpp/python/src/consumer.cc
index 930b159..815282d 100644
--- a/pulsar-client-cpp/python/src/consumer.cc
+++ b/pulsar-client-cpp/python/src/consumer.cc
@@ -33,9 +33,7 @@ Message Consumer_receive(Consumer& consumer) {
while (true) {
Py_BEGIN_ALLOW_THREADS
- // Use 100ms timeout to periodically check whether the
- // interpreter was interrupted
- res = consumer.receive(msg, 100);
+ res = consumer.receive(msg);
Py_END_ALLOW_THREADS
if (res != ResultTimeout) {