You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/09/19 17:15:01 UTC

[GitHub] merlimat closed pull request #2580: [Python] Consolidated duplicated subscribe_*() methods into a single one

merlimat closed pull request #2580: [Python] Consolidated duplicated subscribe_*() methods into a single one
URL: https://github.com/apache/incubator-pulsar/pull/2580
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py
index 806c7e2032..6849ecc245 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -104,6 +104,9 @@ def send_callback(res, msg):
 from pulsar.functions.context import Context
 from pulsar.functions.serde import SerDe, IdentitySerDe, PickleSerDe
 
+import re
+_retype = type(re.compile('x'))
+
 class MessageId:
     """
     Represents a message id
@@ -412,114 +415,19 @@ def subscribe(self, topic, subscription_name,
                   unacked_messages_timeout_ms=None,
                   broker_consumer_stats_cache_time_ms=30000,
                   is_read_compacted=False,
-                  properties=None
+                  properties=None,
+                  pattern_auto_discovery_period=60
                   ):
         """
         Subscribe to the given topic and subscription combination.
 
         **Args**
 
-        * `topic`: The name of the topic.
-        * `subscription`: The name of the subscription.
-
-        **Options**
-
-        * `consumer_type`:
-          Select the subscription type to be used when subscribing to the topic.
-        * `message_listener`:
-          Sets a message listener for the consumer. When the listener is set,
-          the application will receive messages through it. Calls to
-          `consumer.receive()` will not be allowed. The listener function needs
-          to accept (consumer, message), for example:
-
-                #!python
-                def my_listener(consumer, message):
-                    # process message
-                    consumer.acknowledge(message)
-
-        * `receiver_queue_size`:
-          Sets the size of the consumer receive queue. The consumer receive
-          queue controls how many messages can be accumulated by the consumer
-          before the application calls `receive()`. Using a higher value could
-          potentially increase the consumer throughput at the expense of higher
-          memory utilization. Setting the consumer queue size to zero decreases
-          the throughput of the consumer by disabling pre-fetching of messages.
-          This approach improves the message distribution on shared subscription
-          by pushing messages only to those consumers that are ready to process
-          them. Neither receive with timeout nor partitioned topics can be used
-          if the consumer queue size is zero. The `receive()` function call
-          should not be interrupted when the consumer queue size is zero. The
-          default value is 1000 messages and should work well for most use
-          cases.
-        * `max_total_receiver_queue_size_across_partitions`
-          Set the max total receiver queue size across partitions.
-          This setting will be used to reduce the receiver queue size for individual partitions
-        * `consumer_name`:
-          Sets the consumer name.
-        * `unacked_messages_timeout_ms`:
-          Sets the timeout in milliseconds for unacknowledged messages. The
-          timeout needs to be greater than 10 seconds. An exception is thrown if
-          the given value is less than 10 seconds. If a successful
-          acknowledgement is not sent within the timeout, all the unacknowledged
-          messages are redelivered.
-        * `broker_consumer_stats_cache_time_ms`:
-          Sets the time duration for which the broker-side consumer stats will
-          be cached in the client.
-        * `properties`:
-          Sets the properties for the consumer. The properties associated with a consumer
-          can be used for identify a consumer at broker side.
-        """
-        _check_type(str, topic, 'topic')
-        _check_type(str, subscription_name, 'subscription_name')
-        _check_type(ConsumerType, consumer_type, 'consumer_type')
-        _check_type(int, receiver_queue_size, 'receiver_queue_size')
-        _check_type(int, max_total_receiver_queue_size_across_partitions,
-                    'max_total_receiver_queue_size_across_partitions')
-        _check_type_or_none(str, consumer_name, 'consumer_name')
-        _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms')
-        _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms')
-        _check_type(bool, is_read_compacted, 'is_read_compacted')
-        _check_type_or_none(dict, properties, 'properties')
-
-        conf = _pulsar.ConsumerConfiguration()
-        conf.consumer_type(consumer_type)
-        conf.read_compacted(is_read_compacted)
-        if message_listener:
-            conf.message_listener(message_listener)
-        conf.receiver_queue_size(receiver_queue_size)
-        conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
-        if consumer_name:
-            conf.consumer_name(consumer_name)
-        if unacked_messages_timeout_ms:
-            conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)
-        conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
-        if properties:
-            for k, v in properties.items():
-                conf.property(k, v)
-
-        c = Consumer()
-        c._consumer = self._client.subscribe(topic, subscription_name, conf)
-        c._client = self
-        self._consumers.append(c)
-        return c
-
-    def subscribe_topics(self, topics, subscription_name,
-                         consumer_type=ConsumerType.Exclusive,
-                         message_listener=None,
-                         receiver_queue_size=1000,
-                         max_total_receiver_queue_size_across_partitions=50000,
-                         consumer_name=None,
-                         unacked_messages_timeout_ms=None,
-                         broker_consumer_stats_cache_time_ms=30000,
-                         is_read_compacted=False,
-                         properties=None
-                         ):
-        """
-        Subscribe to the given topics and subscription combination.
-
-        **Args**
-
-        * `topics`: The list name of the topics.
+        * `topic`: The name of the topic, list of topics or regex pattern.
+                  This method will accept these forms:
+                    - `topic='my-topic'`
+                    - `topic=['topic-1', 'topic-2', 'topic-3']`
+                    - `topic=re.compile('topic-.*')`
         * `subscription`: The name of the subscription.
 
         **Options**
@@ -568,111 +476,9 @@ def my_listener(consumer, message):
         * `properties`:
           Sets the properties for the consumer. The properties associated with a consumer
           can be used for identify a consumer at broker side.
-        """
-        _check_type(list, topics, 'topics')
-        _check_type(str, subscription_name, 'subscription_name')
-        _check_type(ConsumerType, consumer_type, 'consumer_type')
-        _check_type(int, receiver_queue_size, 'receiver_queue_size')
-        _check_type(int, max_total_receiver_queue_size_across_partitions,
-                    'max_total_receiver_queue_size_across_partitions')
-        _check_type_or_none(str, consumer_name, 'consumer_name')
-        _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms')
-        _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms')
-        _check_type(bool, is_read_compacted, 'is_read_compacted')
-        _check_type_or_none(dict, properties, 'properties')
-
-        conf = _pulsar.ConsumerConfiguration()
-        conf.consumer_type(consumer_type)
-        conf.read_compacted(is_read_compacted)
-        if message_listener:
-            conf.message_listener(message_listener)
-        conf.receiver_queue_size(receiver_queue_size)
-        conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
-        if consumer_name:
-            conf.consumer_name(consumer_name)
-        if unacked_messages_timeout_ms:
-            conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)
-        conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
-        if properties:
-            for k, v in properties.items():
-                conf.property(k, v)
-
-        c = Consumer()
-        c._consumer = self._client.subscribe_topics(topics, subscription_name, conf)
-        c._client = self
-        self._consumers.append(c)
-        return c
-
-    def subscribe_pattern(self, topics_pattern, subscription_name,
-                          consumer_type=ConsumerType.Exclusive,
-                          message_listener=None,
-                          receiver_queue_size=1000,
-                          max_total_receiver_queue_size_across_partitions=50000,
-                          consumer_name=None,
-                          unacked_messages_timeout_ms=None,
-                          broker_consumer_stats_cache_time_ms=30000,
-                          is_read_compacted=False,
-                          pattern_auto_discovery_period=60,
-                          properties=None
-                          ):
-        """
-        Subscribe to multiple topics, which match given regexPattern, under the same namespace.
-
-        **Args**
-
-        * `topics_pattern`: The regex pattern to match topics.
-        * `subscription`: The name of the subscription.
-
-        **Options**
-
-        * `consumer_type`:
-          Select the subscription type to be used when subscribing to the topic.
-        * `message_listener`:
-          Sets a message listener for the consumer. When the listener is set,
-          the application will receive messages through it. Calls to
-          `consumer.receive()` will not be allowed. The listener function needs
-          to accept (consumer, message), for example:
-
-                #!python
-                def my_listener(consumer, message):
-                    # process message
-                    consumer.acknowledge(message)
-
-        * `receiver_queue_size`:
-          Sets the size of the consumer receive queue. The consumer receive
-          queue controls how many messages can be accumulated by the consumer
-          before the application calls `receive()`. Using a higher value could
-          potentially increase the consumer throughput at the expense of higher
-          memory utilization. Setting the consumer queue size to zero decreases
-          the throughput of the consumer by disabling pre-fetching of messages.
-          This approach improves the message distribution on shared subscription
-          by pushing messages only to those consumers that are ready to process
-          them. Neither receive with timeout nor partitioned topics can be used
-          if the consumer queue size is zero. The `receive()` function call
-          should not be interrupted when the consumer queue size is zero. The
-          default value is 1000 messages and should work well for most use
-          cases.
-        * `max_total_receiver_queue_size_across_partitions`
-          Set the max total receiver queue size across partitions.
-          This setting will be used to reduce the receiver queue size for individual partitions
-        * `consumer_name`:
-          Sets the consumer name.
-        * `unacked_messages_timeout_ms`:
-          Sets the timeout in milliseconds for unacknowledged messages. The
-          timeout needs to be greater than 10 seconds. An exception is thrown if
-          the given value is less than 10 seconds. If a successful
-          acknowledgement is not sent within the timeout, all the unacknowledged
-          messages are redelivered.
-        * `broker_consumer_stats_cache_time_ms`:
-          Sets the time duration for which the broker-side consumer stats will
-          be cached in the client.
         * `pattern_auto_discovery_period`:
           Periods of seconds for consumer to auto discover match topics.
-        * `properties`:
-          Sets the properties for the consumer. The properties associated with a consumer
-          can be used for identify a consumer at broker side.
         """
-        _check_type(str, topics_pattern, 'topics_pattern')
         _check_type(str, subscription_name, 'subscription_name')
         _check_type(ConsumerType, consumer_type, 'consumer_type')
         _check_type(int, receiver_queue_size, 'receiver_queue_size')
@@ -682,7 +488,6 @@ def my_listener(consumer, message):
         _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms')
         _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms')
         _check_type(bool, is_read_compacted, 'is_read_compacted')
-        _check_type(int, pattern_auto_discovery_period, 'pattern_auto_discovery_period')
         _check_type_or_none(dict, properties, 'properties')
 
         conf = _pulsar.ConsumerConfiguration()
@@ -697,13 +502,23 @@ def my_listener(consumer, message):
         if unacked_messages_timeout_ms:
             conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)
         conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
-        conf.pattern_auto_discovery_period(pattern_auto_discovery_period)
         if properties:
             for k, v in properties.items():
                 conf.property(k, v)
 
         c = Consumer()
-        c._consumer = self._client.subscribe_pattern(topics_pattern, subscription_name, conf)
+        if isinstance(topic, str):
+            # Single topic
+            c._consumer = self._client.subscribe(topic, subscription_name, conf)
+        elif isinstance(topic, list):
+            # List of topics
+            c._consumer = self._client.subscribe_topics(topic, subscription_name, conf)
+        elif isinstance(topic, _retype):
+            # Regex pattern
+            c._consumer = self._client.subscribe_pattern(topic.pattern, subscription_name, conf)
+        else:
+            raise ValueError("Argument 'topic' is expected to be of a type between (str, list, re.pattern)")
+
         c._client = self
         self._consumers.append(c)
         return c
diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py
index 7fd60e661a..d5163bcddf 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -669,11 +669,11 @@ def test_topics_consumer(self):
         producer2 = client.create_producer(topic2)
         producer3 = client.create_producer(topic3)
 
-        consumer = client.subscribe_topics(topics,
-                                           'my-topics-consumer-sub',
-                                           consumer_type=ConsumerType.Shared,
-                                           receiver_queue_size=10
-                                           )
+        consumer = client.subscribe(topics,
+                                    'my-topics-consumer-sub',
+                                    consumer_type=ConsumerType.Shared,
+                                    receiver_queue_size=10
+                                    )
 
         for i in range(100):
             producer1.send('hello-1-%d' % i)
@@ -699,18 +699,11 @@ def test_topics_consumer(self):
         client.close()
 
     def test_topics_pattern_consumer(self):
+        import re
         client = Client(self.serviceUrl)
 
         topics_pattern = 'persistent://sample/standalone/ns/my-python-pattern-consumer.*'
 
-
-        consumer = client.subscribe_pattern(topics_pattern,
-                                            'my-pattern-consumer-sub',
-                                            consumer_type=ConsumerType.Shared,
-                                            receiver_queue_size=10,
-                                            pattern_auto_discovery_period=1
-                                            )
-
         topic1 = 'persistent://sample/standalone/ns/my-python-pattern-consumer-1'
         topic2 = 'persistent://sample/standalone/ns/my-python-pattern-consumer-2'
         topic3 = 'persistent://sample/standalone/ns/my-python-pattern-consumer-3'
@@ -727,6 +720,13 @@ def test_topics_pattern_consumer(self):
         producer2 = client.create_producer(topic2)
         producer3 = client.create_producer(topic3)
 
+        consumer = client.subscribe(re.compile(topics_pattern),
+                                    'my-pattern-consumer-sub',
+                                    consumer_type = ConsumerType.Shared,
+                                    receiver_queue_size = 10,
+                                    pattern_auto_discovery_period = 1
+                                   )
+
         # wait enough time to trigger auto discovery
         time.sleep(2)
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services