You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/11/21 06:41:47 UTC

[pulsar] branch master updated: Fix consumer of python queue size is zero (#5706)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


View the commit online:
https://github.com/apache/pulsar/commit/fa029700d3edbc398356f6b74cad1120649d2494

The following commit(s) were added to refs/heads/master by this push:
     new fa02970  Fix consumer of python queue size is zero (#5706)
fa02970 is described below

commit fa029700d3edbc398356f6b74cad1120649d2494
Author: tuteng <gu...@apache.org>
AuthorDate: Thu Nov 21 14:41:39 2019 +0800

    Fix consumer of python queue size is zero (#5706)
    
    Fixes https://github.com/apache/pulsar/issues/5634
    
    
    Master Issue: https://github.com/apache/pulsar/issues/5634
    
    ### Motivation
    
    In java clients, when we call the `receive`, we will block it until a message arrives. in python clients, when we call the `receive` function, we add a delay parameter of 100ms. when the queue size is 0, the `receive` will have a strict check on the queue size, causing the following exception to be thrown
    
    ```
    Traceback (most recent call last):
      File "tst.py", line 10, in <module>
        msg = consumer.receive()
      File "/python3.7/site-packages/pulsar/__init__.py", line 930, in receive
        msg = self._consumer.receive()
    Exception: Pulsar error: InvalidConfiguration
    ```
    ### Modifications
    
    * Removing timeout parameter in synchronous `receive`
    * Add test for queue size is 0
    
    ### Verifying this change
    
    Add Test
---
 pulsar-client-cpp/python/pulsar_test.py  | 17 +++++++++++++++++
 pulsar-client-cpp/python/src/consumer.cc |  4 +---
 2 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py
index f31a7e1..3f8bc08 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -174,6 +174,23 @@ class PulsarTest(TestCase):
         consumer.unsubscribe()
         client.close()
 
+    def test_consumer_queue_size_is_zero(self):
+        client = Client(self.serviceUrl)
+        consumer = client.subscribe('my-python-topic-consumer-init-queue-size-is-zero',
+                                    'my-sub',
+                                    consumer_type=ConsumerType.Shared,
+                                    receiver_queue_size=0,
+                                    initial_position=InitialPosition.Earliest)
+        producer = client.create_producer('my-python-topic-consumer-init-queue-size-is-zero')
+        producer.send(b'hello')
+        time.sleep(0.1)
+        msg = consumer.receive()
+        self.assertTrue(msg)
+        self.assertEqual(msg.data(), b'hello')
+
+        consumer.unsubscribe()
+        client.close()
+
     def test_message_properties(self):
         client = Client(self.serviceUrl)
         topic = 'my-python-test-message-properties'
diff --git a/pulsar-client-cpp/python/src/consumer.cc b/pulsar-client-cpp/python/src/consumer.cc
index 930b159..815282d 100644
--- a/pulsar-client-cpp/python/src/consumer.cc
+++ b/pulsar-client-cpp/python/src/consumer.cc
@@ -33,9 +33,7 @@ Message Consumer_receive(Consumer& consumer) {
 
     while (true) {
         Py_BEGIN_ALLOW_THREADS
-        // Use 100ms timeout to periodically check whether the
-        // interpreter was interrupted
-        res = consumer.receive(msg, 100);
+        res = consumer.receive(msg);
         Py_END_ALLOW_THREADS
 
         if (res != ResultTimeout) {