You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/03/19 15:31:13 UTC

[pulsar] branch master updated: [Issue #3712][python-client] exposing InitialPosition management in the ConsumerConfiguration. (#3714)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 614a98d  [Issue #3712][python-client] exposing InitialPosition management in the ConsumerConfiguration. (#3714)
614a98d is described below

commit 614a98d01a3d3e3f8cad76b758f6d3364f5c62ce
Author: Le Labourier Marc <ma...@gmail.com>
AuthorDate: Tue Mar 19 16:31:08 2019 +0100

    [Issue #3712][python-client] exposing InitialPosition management in the ConsumerConfiguration. (#3714)
    
    ### Motivation
    PR #3567 introduced the SubscriptionInitialPosition option in ConsumerConfiguration in the CPP client but did not exposed any methods to be able to do it with the Python client.
    
    This PR aims to expose the code introduced in the previous PR to allows Python user to choose the initial position of the consumer in python.
    
    ### Modifications
    
    - Implemented a boost object to expose InitialPosition Enum.
    - Added to boost ConsumerConfiguration object the getter/setter of the InitialPosition attribute.
    - Added a initial_position parameter to Client.subscribe in order to modify the ConsumerConfiguration instance created.
    
    ### Verifying this change
    
    This change is a trivial rework / code cleanup without any test coverage.
---
 pulsar-client-cpp/python/pulsar/__init__.py | 11 ++++++++--
 pulsar-client-cpp/python/pulsar_test.py     | 34 ++++++++++++++++++++++++++++-
 pulsar-client-cpp/python/src/config.cc      |  2 ++
 pulsar-client-cpp/python/src/enums.cc       |  4 ++++
 4 files changed, 48 insertions(+), 3 deletions(-)

diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py
index 8d972aa..54db963 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -101,7 +101,7 @@ To install the Python bindings:
 
 import _pulsar
 
-from _pulsar import Result, CompressionType, ConsumerType, PartitionsRoutingMode  # noqa: F401
+from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode  # noqa: F401
 
 from pulsar.functions.function import Function
 from pulsar.functions.context import Context
@@ -476,7 +476,8 @@ class Client:
                   negative_ack_redelivery_delay_ms=60000,
                   is_read_compacted=False,
                   properties=None,
-                  pattern_auto_discovery_period=60
+                  pattern_auto_discovery_period=60,
+                  initial_position=InitialPosition.Latest
                   ):
         """
         Subscribe to the given topic and subscription combination.
@@ -543,6 +544,10 @@ class Client:
           can be used for identify a consumer at broker side.
         * `pattern_auto_discovery_period`:
           Periods of seconds for consumer to auto discover match topics.
+        * `initial_position`:
+          Set the initial position of a consumer  when subscribing to the topic.
+          It could be either: `InitialPosition.Earliest` or `InitialPosition.Latest`.
+          Default: `Latest`.
         """
         _check_type(str, subscription_name, 'subscription_name')
         _check_type(ConsumerType, consumer_type, 'consumer_type')
@@ -557,6 +562,7 @@ class Client:
         _check_type(int, pattern_auto_discovery_period, 'pattern_auto_discovery_period')
         _check_type(bool, is_read_compacted, 'is_read_compacted')
         _check_type_or_none(dict, properties, 'properties')
+        _check_type(InitialPosition, initial_position, 'initial_position')
 
         conf = _pulsar.ConsumerConfiguration()
         conf.consumer_type(consumer_type)
@@ -575,6 +581,7 @@ class Client:
         if properties:
             for k, v in properties.items():
                 conf.property(k, v)
+        conf.subscription_initial_position(initial_position)
 
         conf.schema(schema.schema_info())
 
diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py
index 18d1a23..6e4a14b 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -24,7 +24,7 @@ import time
 import os
 from pulsar import Client, MessageId, \
             CompressionType, ConsumerType, PartitionsRoutingMode, \
-            AuthenticationTLS, Authentication, AuthenticationToken
+            AuthenticationTLS, Authentication, AuthenticationToken, InitialPosition
 
 from _pulsar import ProducerConfiguration, ConsumerConfiguration
 
@@ -140,6 +140,38 @@ class PulsarTest(TestCase):
         consumer.unsubscribe()
         client.close()
 
+    def test_consumer_initial_position(self):
+        client = Client(self.serviceUrl)
+        producer = client.create_producer('my-python-topic-producer-consumer')
+
+        # Sending 5 messages before consumer creation.
+        # These should be received with initial_position set to Earliest but not with Latest.
+        for i in range(5):
+            producer.send(b'hello-%d' % i)
+
+        consumer = client.subscribe('my-python-topic-producer-consumer',
+                                    'my-sub',
+                                    consumer_type=ConsumerType.Shared,
+                                    initial_position=InitialPosition.Earliest)
+
+        # Sending 5 other messages that should be received regardless of the initial_position.
+        for i in range(5, 10):
+            producer.send(b'hello-%d' % i)
+
+        for i in range(10):
+            msg = consumer.receive(1000)
+            self.assertTrue(msg)
+            self.assertEqual(msg.data(), b'hello-%d' % i)
+
+        try:
+            msg = consumer.receive(100)
+            self.assertTrue(False)  # Should not reach this point
+        except:
+            pass  # Exception is expected
+
+        consumer.unsubscribe()
+        client.close()
+
     def test_message_properties(self):
         client = Client(self.serviceUrl)
         topic = 'my-python-test-message-properties'
diff --git a/pulsar-client-cpp/python/src/config.cc b/pulsar-client-cpp/python/src/config.cc
index 2c985de..4b1454b 100644
--- a/pulsar-client-cpp/python/src/config.cc
+++ b/pulsar-client-cpp/python/src/config.cc
@@ -151,6 +151,8 @@ void export_config() {
             .def("read_compacted", &ConsumerConfiguration::isReadCompacted)
             .def("read_compacted", &ConsumerConfiguration::setReadCompacted)
             .def("property", &ConsumerConfiguration::setProperty, return_self<>())
+            .def("subscription_initial_position", &ConsumerConfiguration::getSubscriptionInitialPosition)
+            .def("subscription_initial_position", &ConsumerConfiguration::setSubscriptionInitialPosition)
             ;
 
     class_<ReaderConfiguration>("ReaderConfiguration")
diff --git a/pulsar-client-cpp/python/src/enums.cc b/pulsar-client-cpp/python/src/enums.cc
index 3c0a6b9..24410ca 100644
--- a/pulsar-client-cpp/python/src/enums.cc
+++ b/pulsar-client-cpp/python/src/enums.cc
@@ -94,4 +94,8 @@ void export_enums() {
             .value("KEY_VALUE", KEY_VALUE)
             ;
 
+    enum_<InitialPosition>("InitialPosition", "Supported initial position")
+            .value("Latest", InitialPositionLatest)
+            .value("Earliest", InitialPositionEarliest)
+            ;
 }