You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/05/28 12:11:25 UTC

[pulsar] branch master updated: [cpp client]support key shared for cpp client (#4366)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ab6b1b  [cpp client]support key shared for cpp client (#4366)
8ab6b1b is described below

commit 8ab6b1bf34e0e5653e68a53e47c2942a47675c24
Author: 冉小龙 <ra...@gmail.com>
AuthorDate: Tue May 28 20:11:19 2019 +0800

    [cpp client]support key shared for cpp client (#4366)
    
    support key shared for cpp client
---
 pulsar-client-cpp/include/pulsar/ConsumerType.h     |  8 +++++++-
 pulsar-client-cpp/include/pulsar/Message.h          | 15 +++++++++++++++
 pulsar-client-cpp/include/pulsar/MessageBuilder.h   |  8 +++++++-
 .../include/pulsar/c/consumer_configuration.h       |  8 +++++++-
 pulsar-client-cpp/include/pulsar/c/message.h        | 15 ++++++++++++++-
 pulsar-client-cpp/lib/BrokerConsumerStatsImpl.cc    |  2 ++
 pulsar-client-cpp/lib/ConsumerImpl.cc               |  3 +++
 pulsar-client-cpp/lib/Message.cc                    | 14 ++++++++++++++
 pulsar-client-cpp/lib/MessageBuilder.cc             |  6 ++++++
 pulsar-client-cpp/lib/MessageImpl.cc                |  6 ++++++
 pulsar-client-cpp/lib/MessageImpl.h                 |  4 ++++
 pulsar-client-cpp/lib/c/c_Message.cc                | 10 ++++++++++
 .../tests/ConsumerConfigurationTest.cc              | 21 +++++++++++++++++++++
 13 files changed, 116 insertions(+), 4 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/ConsumerType.h b/pulsar-client-cpp/include/pulsar/ConsumerType.h
index 6621123..8068a5c 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerType.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerType.h
@@ -36,7 +36,13 @@ enum ConsumerType
     /** Only one consumer is active on the subscription; Subscription can have N consumers
      *  connected one of which will get promoted to master if the current master becomes inactive
      */
-    ConsumerFailover
+    ConsumerFailover,
+
+    /**
+     * Multiple consumer will be able to use the same subscription and all messages with the same key
+     * will be dispatched to only one consumer
+     */
+    ConsumerKeyShared
 };
 }
 
diff --git a/pulsar-client-cpp/include/pulsar/Message.h b/pulsar-client-cpp/include/pulsar/Message.h
index 87e29a2..4341a7d 100644
--- a/pulsar-client-cpp/include/pulsar/Message.h
+++ b/pulsar-client-cpp/include/pulsar/Message.h
@@ -111,6 +111,21 @@ class PULSAR_PUBLIC Message {
     bool hasPartitionKey() const;
 
     /**
+     * Get the ordering key of the message
+     *
+     * @return the ordering key of the message
+     */
+    const std::string& getOrderingKey() const;
+
+    /**
+     * Check whether the message has a ordering key
+     *
+     * @return true if the ordering key was set while creating the message
+     *         false if the ordering key was not set while creating the message
+     */
+    bool hasOrderingKey() const;
+
+    /**
      * Get the UTC based timestamp in milliseconds referring to when the message was published by the client
      * producer
      */
diff --git a/pulsar-client-cpp/include/pulsar/MessageBuilder.h b/pulsar-client-cpp/include/pulsar/MessageBuilder.h
index 20d5f0d..39a5ece 100644
--- a/pulsar-client-cpp/include/pulsar/MessageBuilder.h
+++ b/pulsar-client-cpp/include/pulsar/MessageBuilder.h
@@ -62,13 +62,19 @@ class PULSAR_PUBLIC MessageBuilder {
      */
     MessageBuilder& setProperties(const StringMap& properties);
 
-    /*
+    /**
      * set partition key for the message routing
      * @param hash of this key is used to determine message's topic partition
      */
     MessageBuilder& setPartitionKey(const std::string& partitionKey);
 
     /**
+     * set ordering key for the message routing
+     * @param the ordering key for the message
+     */
+    MessageBuilder& setOrderingKey(const std::string& orderingKey);
+
+    /**
      * Set the event timestamp for the message.
      */
     MessageBuilder& setEventTimestamp(uint64_t eventTimestamp);
diff --git a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
index 07b8ee8..737b768 100644
--- a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
@@ -43,7 +43,13 @@ typedef enum {
     /** Only one consumer is active on the subscription; Subscription can have N consumers
      *  connected one of which will get promoted to master if the current master becomes inactive
      */
-    pulsar_ConsumerFailover
+    pulsar_ConsumerFailover,
+
+    /**
+     * Multiple consumer will be able to use the same subscription and all messages with the same key
+     * will be dispatched to only one consumer
+     */
+    pulsar_ConsumerKeyShared
 } pulsar_consumer_type;
 
 typedef enum {
diff --git a/pulsar-client-cpp/include/pulsar/c/message.h b/pulsar-client-cpp/include/pulsar/c/message.h
index ce86382..d689403 100644
--- a/pulsar-client-cpp/include/pulsar/c/message.h
+++ b/pulsar-client-cpp/include/pulsar/c/message.h
@@ -49,13 +49,19 @@ PULSAR_PUBLIC void pulsar_message_set_allocated_content(pulsar_message_t *messag
 PULSAR_PUBLIC void pulsar_message_set_property(pulsar_message_t *message, const char *name,
                                                const char *value);
 
-/*
+/**
  * set partition key for the message routing
  * @param hash of this key is used to determine message's topic partition
  */
 PULSAR_PUBLIC void pulsar_message_set_partition_key(pulsar_message_t *message, const char *partitionKey);
 
 /**
+ * Sets the ordering key of the message for message dispatch in Key_Shared mode.
+ * @param the ordering key for the message
+ */
+PULSAR_PUBLIC void pulsar_message_set_ordering_key(pulsar_message_t *message, const char *orderingKey);
+
+/**
  * Set the event timestamp for the message.
  */
 PULSAR_PUBLIC void pulsar_message_set_event_timestamp(pulsar_message_t *message, uint64_t eventTimestamp);
@@ -158,6 +164,13 @@ PULSAR_PUBLIC const char *pulsar_message_get_partitionKey(pulsar_message_t *mess
 PULSAR_PUBLIC int pulsar_message_has_partition_key(pulsar_message_t *message);
 
 /**
+ * Get the ordering key of the message for message dispatch in Key_Shared mode.
+ * Partition key Will be used if ordering key not specified
+ */
+PULSAR_PUBLIC const char *pulsar_message_get_orderingKey(pulsar_message_t *message);
+PULSAR_PUBLIC int pulsar_message_has_ordering_key(pulsar_message_t *message);
+
+/**
  * Get the UTC based timestamp in milliseconds referring to when the message was published by the client
  * producer
  */
diff --git a/pulsar-client-cpp/lib/BrokerConsumerStatsImpl.cc b/pulsar-client-cpp/lib/BrokerConsumerStatsImpl.cc
index 4032f5f..220415a 100644
--- a/pulsar-client-cpp/lib/BrokerConsumerStatsImpl.cc
+++ b/pulsar-client-cpp/lib/BrokerConsumerStatsImpl.cc
@@ -95,6 +95,8 @@ ConsumerType BrokerConsumerStatsImpl::convertStringToConsumerType(const std::str
         return ConsumerFailover;
     } else if (str == "ConsumerShared" || str == "Shared") {
         return ConsumerShared;
+    } else if (str == "ConsumerKeyShared" || str == "KeyShared") {
+        return ConsumerKeyShared;
     } else {
         return ConsumerExclusive;
     }
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 2cc5a3b..31df0eb 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -719,6 +719,9 @@ inline proto::CommandSubscribe_SubType ConsumerImpl::getSubType() {
 
         case ConsumerFailover:
             return proto::CommandSubscribe::Failover;
+
+        case ConsumerKeyShared:
+            return proto::CommandSubscribe_SubType_Key_Shared;
     }
 }
 
diff --git a/pulsar-client-cpp/lib/Message.cc b/pulsar-client-cpp/lib/Message.cc
index 510650b..355c483 100644
--- a/pulsar-client-cpp/lib/Message.cc
+++ b/pulsar-client-cpp/lib/Message.cc
@@ -110,6 +110,20 @@ const std::string& Message::getPartitionKey() const {
     return impl_->getPartitionKey();
 }
 
+bool Message::hasOrderingKey() const {
+    if (impl_) {
+        return impl_->hasOrderingKey();
+    }
+    return false;
+}
+
+const std::string& Message::getOrderingKey() const {
+    if (!impl_) {
+        return emptyString;
+    }
+    return impl_->getOrderingKey();
+}
+
 const std::string& Message::getTopicName() const {
     if (!impl_) {
         return emptyString;
diff --git a/pulsar-client-cpp/lib/MessageBuilder.cc b/pulsar-client-cpp/lib/MessageBuilder.cc
index b1e58c1..e11c425 100644
--- a/pulsar-client-cpp/lib/MessageBuilder.cc
+++ b/pulsar-client-cpp/lib/MessageBuilder.cc
@@ -94,6 +94,12 @@ MessageBuilder& MessageBuilder::setPartitionKey(const std::string& partitionKey)
     return *this;
 }
 
+MessageBuilder& MessageBuilder::setOrderingKey(const std::string& orderingKey) {
+    checkMetadata();
+    impl_->metadata.set_ordering_key(orderingKey);
+    return *this;
+}
+
 MessageBuilder& MessageBuilder::setEventTimestamp(uint64_t eventTimestamp) {
     checkMetadata();
     impl_->metadata.set_event_time(eventTimestamp);
diff --git a/pulsar-client-cpp/lib/MessageImpl.cc b/pulsar-client-cpp/lib/MessageImpl.cc
index 9b59eff..60ce86b 100644
--- a/pulsar-client-cpp/lib/MessageImpl.cc
+++ b/pulsar-client-cpp/lib/MessageImpl.cc
@@ -37,6 +37,10 @@ const std::string& MessageImpl::getPartitionKey() const { return metadata.partit
 
 bool MessageImpl::hasPartitionKey() const { return metadata.has_partition_key(); }
 
+const std::string& MessageImpl::getOrderingKey() const { return metadata.ordering_key(); }
+
+bool MessageImpl::hasOrderingKey() const { return metadata.has_ordering_key(); }
+
 uint64_t MessageImpl::getPublishTimestamp() const {
     if (metadata.has_publish_time()) {
         return metadata.publish_time();
@@ -77,6 +81,8 @@ void MessageImpl::setPartitionKey(const std::string& partitionKey) {
     metadata.set_partition_key(partitionKey);
 }
 
+void MessageImpl::setOrderingKey(const std::string& orderingKey) { metadata.set_ordering_key(orderingKey); }
+
 void MessageImpl::setEventTimestamp(uint64_t eventTimestamp) { metadata.set_event_time(eventTimestamp); }
 
 void MessageImpl::setTopicName(const std::string& topicName) {
diff --git a/pulsar-client-cpp/lib/MessageImpl.h b/pulsar-client-cpp/lib/MessageImpl.h
index 140d86d..9fd4a4d 100644
--- a/pulsar-client-cpp/lib/MessageImpl.h
+++ b/pulsar-client-cpp/lib/MessageImpl.h
@@ -46,6 +46,9 @@ class MessageImpl {
     const std::string& getPartitionKey() const;
     bool hasPartitionKey() const;
 
+    const std::string& getOrderingKey() const;
+    bool hasOrderingKey() const;
+
     uint64_t getPublishTimestamp() const;
     uint64_t getEventTimestamp() const;
 
@@ -67,6 +70,7 @@ class MessageImpl {
     void setProperty(const std::string& name, const std::string& value);
     void disableReplication(bool flag);
     void setPartitionKey(const std::string& partitionKey);
+    void setOrderingKey(const std::string& orderingKey);
     void setEventTimestamp(uint64_t eventTimestamp);
     Message::StringMap properties_;
 };
diff --git a/pulsar-client-cpp/lib/c/c_Message.cc b/pulsar-client-cpp/lib/c/c_Message.cc
index 3047fa0..a020d30 100644
--- a/pulsar-client-cpp/lib/c/c_Message.cc
+++ b/pulsar-client-cpp/lib/c/c_Message.cc
@@ -40,6 +40,10 @@ void pulsar_message_set_partition_key(pulsar_message_t *message, const char *par
     message->builder.setPartitionKey(partitionKey);
 }
 
+void pulsar_message_set_ordering_key(pulsar_message_t *message, const char *orderingKey) {
+    message->builder.setOrderingKey(orderingKey);
+}
+
 void pulsar_message_set_event_timestamp(pulsar_message_t *message, uint64_t eventTimestamp) {
     message->builder.setEventTimestamp(eventTimestamp);
 }
@@ -87,6 +91,12 @@ const char *pulsar_message_get_partitionKey(pulsar_message_t *message) {
 
 int pulsar_message_has_partition_key(pulsar_message_t *message) { return message->message.hasPartitionKey(); }
 
+const char *pulsar_message_get_orderingKey(pulsar_message_t *message) {
+    return message->message.getOrderingKey().c_str();
+}
+
+int pulsar_message_has_ordering_key(pulsar_message_t *message) { return message->message.hasOrderingKey(); }
+
 uint64_t pulsar_message_get_publish_timestamp(pulsar_message_t *message) {
     return message->message.getPublishTimestamp();
 }
diff --git a/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc b/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
index 0a7411d..76d2b34 100644
--- a/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
@@ -69,6 +69,27 @@ TEST(ConsumerConfigurationTest, testReadCompactPersistentFailover) {
     consumer.close();
 }
 
+TEST(ConsumerConfigurationTest, testSubscribePersistentKeyShared) {
+    std::string lookupUrl = "pulsar://localhost:6650";
+    std::string topicName = "persist-key-shared-topic";
+    std::string subName = "test-persist-key-shared";
+
+    Result result;
+
+    ConsumerConfiguration config;
+    // now, key-shared not support read compact
+    config.setReadCompacted(false);
+    config.setConsumerType(ConsumerKeyShared);
+
+    ClientConfiguration clientConfig;
+    Client client(lookupUrl, clientConfig);
+
+    Consumer consumer;
+    result = client.subscribe(topicName, subName, config, consumer);
+    ASSERT_EQ(ResultOk, result);
+    consumer.close();
+}
+
 TEST(ConsumerConfigurationTest, testReadCompactPersistentShared) {
     std::string lookupUrl = "pulsar://localhost:6650";
     std::string topicName = "persist-topic";