You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/04/08 14:37:59 UTC

[pulsar] branch master updated: [client-cpp] add subscription properties to consumer for cpp (#15020)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fc1cf25d056 [client-cpp] add subscription properties to consumer for cpp (#15020)
fc1cf25d056 is described below

commit fc1cf25d05676f1f302732e0d2fff7c728724e76
Author: La4nh <33...@users.noreply.github.com>
AuthorDate: Fri Apr 8 22:37:47 2022 +0800

    [client-cpp] add subscription properties to consumer for cpp (#15020)
    
    # Motivation
    Pulsar already supports entry filters, but the pulsar-client-cpp ConsumerImpl does not provide a subscriptionProperties option. This PR adds it.
    
    # Modifications
    Enable subscriptionProperties on consumer for cpp client.
---
 pulsar-client-cpp/.gitignore                             |  1 +
 pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h | 15 +++++++++++++++
 pulsar-client-cpp/lib/Commands.cc                        |  8 ++++++++
 pulsar-client-cpp/lib/Commands.h                         |  1 +
 pulsar-client-cpp/lib/ConsumerConfiguration.cc           | 12 ++++++++++++
 pulsar-client-cpp/lib/ConsumerConfigurationImpl.h        |  1 +
 pulsar-client-cpp/lib/ConsumerImpl.cc                    |  6 +++---
 pulsar-client-cpp/tests/ConsumerConfigurationTest.cc     |  4 ++++
 8 files changed, 45 insertions(+), 3 deletions(-)

diff --git a/pulsar-client-cpp/.gitignore b/pulsar-client-cpp/.gitignore
index f2a623dfad7..0d8d323e7ff 100644
--- a/pulsar-client-cpp/.gitignore
+++ b/pulsar-client-cpp/.gitignore
@@ -42,6 +42,7 @@ lib*.so*
 /examples/SampleConsumerListener
 /examples/SampleConsumerListenerCApi
 /examples/SampleReaderCApi
+/examples/SampleFileLogger
 /tests/main
 /perf/perfProducer
 /perf/perfConsumer
diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index 70c90cff3d7..0898b95736a 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -424,6 +424,21 @@ class PULSAR_PUBLIC ConsumerConfiguration {
      */
     ConsumerConfiguration& setProperties(const std::map<std::string, std::string>& properties);
 
+    /**
+     * Get all the subscription properties attached to this subscription.
+     */
+    std::map<std::string, std::string>& getSubscriptionProperties() const;
+
+    /**
+     * Set the subscription properties for this subscription.
+     * Notice: SubscriptionProperties are immutable, and consumers under the same subscription will fail to
+     * create a subscription if they use different properties.
+     *
+     * @param subscriptionProperties all the subscription properties in the provided map
+     */
+    ConsumerConfiguration& setSubscriptionProperties(
+        const std::map<std::string, std::string>& subscriptionProperties);
+
     /**
      * Set the Priority Level for consumer (0 is the default value and means the highest priority).
      *
diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index a860ab75404..33134a6c04b 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -268,6 +268,7 @@ SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string&
                                     const std::string& consumerName, SubscriptionMode subscriptionMode,
                                     Optional<MessageId> startMessageId, bool readCompacted,
                                     const std::map<std::string, std::string>& metadata,
+                                    const std::map<std::string, std::string>& subscriptionProperties,
                                     const SchemaInfo& schemaInfo,
                                     CommandSubscribe_InitialPosition subscriptionInitialPosition,
                                     bool replicateSubscriptionState, KeySharedPolicy keySharedPolicy,
@@ -308,6 +309,13 @@ SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string&
         subscribe->mutable_metadata()->AddAllocated(keyValue);
     }
 
+    for (const auto& subscriptionProperty : subscriptionProperties) {
+        proto::KeyValue* keyValue = proto::KeyValue().New();
+        keyValue->set_key(subscriptionProperty.first);
+        keyValue->set_value(subscriptionProperty.second);
+        subscribe->mutable_subscription_properties()->AddAllocated(keyValue);
+    }
+
     if (subType == CommandSubscribe_SubType_Key_Shared) {
         KeySharedMeta& ksm = *subscribe->mutable_keysharedmeta();
         switch (keySharedPolicy.getKeySharedMode()) {
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index bab2211f7fa..87207e4df22 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -88,6 +88,7 @@ class Commands {
                                      proto::CommandSubscribe_SubType subType, const std::string& consumerName,
                                      SubscriptionMode subscriptionMode, Optional<MessageId> startMessageId,
                                      bool readCompacted, const std::map<std::string, std::string>& metadata,
+                                     const std::map<std::string, std::string>& subscriptionProperties,
                                      const SchemaInfo& schemaInfo,
                                      proto::CommandSubscribe_InitialPosition subscriptionInitialPosition,
                                      bool replicateSubscriptionState, KeySharedPolicy keySharedPolicy,
diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
index b755063d419..2b58835cdbe 100644
--- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
@@ -214,6 +214,18 @@ ConsumerConfiguration& ConsumerConfiguration::setProperties(
     return *this;
 }
 
+std::map<std::string, std::string>& ConsumerConfiguration::getSubscriptionProperties() const {
+    return impl_->subscriptionProperties;  // non-const reference to the stored map, not a copy
+}
+
+ConsumerConfiguration& ConsumerConfiguration::setSubscriptionProperties(
+    const std::map<std::string, std::string>& subscriptionProperties) {
+    auto& stored = impl_->subscriptionProperties;
+    // emplace() leaves an already-present key untouched, so existing entries win over incoming ones.
+    for (const auto& entry : subscriptionProperties) stored.emplace(entry.first, entry.second);
+    return *this;
+}
+
 ConsumerConfiguration& ConsumerConfiguration::setPriorityLevel(int priorityLevel) {
     if (priorityLevel < 0) {
         throw std::invalid_argument("Consumer Config Exception: PriorityLevel should be nonnegative number.");
diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
index 1848f2dafad..1c13f729b55 100644
--- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
@@ -48,6 +48,7 @@ struct ConsumerConfigurationImpl {
     int patternAutoDiscoveryPeriod{60};
     bool replicateSubscriptionStateEnabled{false};
     std::map<std::string, std::string> properties;
+    std::map<std::string, std::string> subscriptionProperties;
     int priorityLevel{0};
     KeySharedPolicy keySharedPolicy;
     size_t maxPendingChunkedMessage{10};
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index b5b5ceb046f..d3ba47f3a80 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -189,9 +189,9 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
     uint64_t requestId = client->newRequestId();
     SharedBuffer cmd = Commands::newSubscribe(
         topic_, subscription_, consumerId_, requestId, getSubType(), consumerName_, subscriptionMode_,
-        startMessageId, readCompacted_, config_.getProperties(), config_.getSchema(), getInitialPosition(),
-        config_.isReplicateSubscriptionStateEnabled(), config_.getKeySharedPolicy(),
-        config_.getPriorityLevel());
+        startMessageId, readCompacted_, config_.getProperties(), config_.getSubscriptionProperties(),
+        config_.getSchema(), getInitialPosition(), config_.isReplicateSubscriptionStateEnabled(),
+        config_.getKeySharedPolicy(), config_.getPriorityLevel());
     cnx->sendRequestWithId(cmd, requestId)
         .addListener(
             std::bind(&ConsumerImpl::handleCreateConsumer, shared_from_this(), cnx, std::placeholders::_1));
diff --git a/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc b/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
index fc67e867e3d..24f541b57ba 100644
--- a/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
@@ -139,6 +139,10 @@ TEST(ConsumerConfigurationTest, testCustomConfig) {
     ASSERT_EQ(conf.getProperties()["k1"], "v1");
     ASSERT_EQ(conf.hasProperty("k1"), true);
 
+    std::map<std::string, std::string> subscriptionProperties = {{"k1", "v1"}};
+    conf.setSubscriptionProperties(subscriptionProperties);
+    ASSERT_EQ(conf.getSubscriptionProperties()["k1"], "v1");
+
     conf.setPriorityLevel(1);
     ASSERT_EQ(conf.getPriorityLevel(), 1);