You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/11/22 23:08:50 UTC

[pulsar] branch master updated: [Issue 5676][C++ client] Expose redelivery count (#5677)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a00791d  [Issue 5676][C++ client] Expose redelivery count (#5677)
a00791d is described below

commit a00791d5d2d0cb236164c2270e05f94a4305fb1b
Author: Fernando Rejon Barrera <39...@users.noreply.github.com>
AuthorDate: Sat Nov 23 00:08:36 2019 +0100

    [Issue 5676][C++ client] Expose redelivery count (#5677)
    
    Fixes #5676
    
    ### Motivation
    
    
    Expose the redelivery count in the C++ client.
    
    ### Modifications
    
    Exposed the redelivery count from the broker in `Message` and `MessageImpl`. Set the counter when receiving messages. Added test.
---
 pulsar-client-cpp/include/pulsar/Message.h      |  5 +++
 pulsar-client-cpp/include/pulsar/c/message.h    |  2 ++
 pulsar-client-cpp/lib/ConsumerImpl.cc           |  6 ++--
 pulsar-client-cpp/lib/ConsumerImpl.h            |  3 +-
 pulsar-client-cpp/lib/Message.cc                |  7 ++++
 pulsar-client-cpp/lib/MessageImpl.cc            |  6 +++-
 pulsar-client-cpp/lib/MessageImpl.h             |  4 +++
 pulsar-client-cpp/lib/c/c_Message.cc            |  4 +++
 pulsar-client-cpp/test-conf/standalone-ssl.conf |  2 ++
 pulsar-client-cpp/test-conf/standalone.conf     |  2 ++
 pulsar-client-cpp/tests/BasicEndToEndTest.cc    | 47 +++++++++++++++++++++++++
 11 files changed, 84 insertions(+), 4 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/Message.h b/pulsar-client-cpp/include/pulsar/Message.h
index 1e76d49..7c36cea 100644
--- a/pulsar-client-cpp/include/pulsar/Message.h
+++ b/pulsar-client-cpp/include/pulsar/Message.h
@@ -141,6 +141,11 @@ class PULSAR_PUBLIC Message {
      */
     const std::string& getTopicName() const;
 
+    /**
+     * Get the redelivery count for this message
+     */
+    const int getRedeliveryCount() const;
+
     bool operator==(const Message& msg) const;
 
    private:
diff --git a/pulsar-client-cpp/include/pulsar/c/message.h b/pulsar-client-cpp/include/pulsar/c/message.h
index a2d6ea5..dcb9152 100644
--- a/pulsar-client-cpp/include/pulsar/c/message.h
+++ b/pulsar-client-cpp/include/pulsar/c/message.h
@@ -198,6 +198,8 @@ PULSAR_PUBLIC uint64_t pulsar_message_get_event_timestamp(pulsar_message_t *mess
 
 PULSAR_PUBLIC const char *pulsar_message_get_topic_name(pulsar_message_t *message);
 
+PULSAR_PUBLIC int pulsar_message_get_redelivery_count(pulsar_message_t *message);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 94eda12..9a7e506 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -283,6 +283,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
     Message m(msg, metadata, payload, partitionIndex_);
     m.impl_->cnx_ = cnx.get();
     m.impl_->setTopicName(topic_);
+    m.impl_->setRedeliveryCount(msg.redelivery_count());
 
     LOG_DEBUG(getName() << " metadata.num_messages_in_batch() = " << metadata.num_messages_in_batch());
     LOG_DEBUG(getName() << " metadata.has_num_messages_in_batch() = "
@@ -291,7 +292,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
     unsigned int numOfMessageReceived = 1;
     if (metadata.has_num_messages_in_batch()) {
         Lock lock(mutex_);
-        numOfMessageReceived = receiveIndividualMessagesFromBatch(cnx, m);
+        numOfMessageReceived = receiveIndividualMessagesFromBatch(cnx, m, msg.redelivery_count());
     } else {
         Lock lock(pendingReceiveMutex_);
         // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
@@ -358,7 +359,7 @@ void ConsumerImpl::notifyPendingReceivedCallback(Result result, Message& msg,
 
 // Zero Queue size is not supported with Batch Messages
 uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx,
-                                                          Message& batchedMessage) {
+                                                          Message& batchedMessage, int redeliveryCount) {
     unsigned int batchSize = batchedMessage.impl_->metadata.num_messages_in_batch();
     batchAcknowledgementTracker_.receivedMessage(batchedMessage);
     LOG_DEBUG("Received Batch messages of size - " << batchSize
@@ -369,6 +370,7 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
     for (int i = 0; i < batchSize; i++) {
         // This is a cheap copy since message contains only one shared pointer (impl_)
         Message msg = Commands::deSerializeSingleMessageInBatch(batchedMessage, i);
+        msg.impl_->setRedeliveryCount(redeliveryCount);
 
         if (startMessageId_.is_present()) {
             const MessageId& msgId = msg.getMessageId();
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h
index b3ba218..8a25b49 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -136,7 +136,8 @@ class ConsumerImpl : public ConsumerImplBase,
                                  proto::CommandAck::ValidationError validationError);
     void increaseAvailablePermits(const ClientConnectionPtr& currentCnx, int numberOfPermits = 1);
     void drainIncomingMessageQueue(size_t count);
-    uint32_t receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx, Message& batchedMessage);
+    uint32_t receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx, Message& batchedMessage,
+                                                int redeliveryCount);
     void brokerConsumerStatsListener(Result, BrokerConsumerStatsImpl, BrokerConsumerStatsCallback);
 
     bool decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
diff --git a/pulsar-client-cpp/lib/Message.cc b/pulsar-client-cpp/lib/Message.cc
index 1f4e6f7..4d1af95 100644
--- a/pulsar-client-cpp/lib/Message.cc
+++ b/pulsar-client-cpp/lib/Message.cc
@@ -131,6 +131,13 @@ const std::string& Message::getTopicName() const {
     return impl_->getTopicName();
 }
 
+const int Message::getRedeliveryCount() const {
+    if (!impl_) {
+        return 0;
+    }
+    return impl_->getRedeliveryCount();
+}
+
 uint64_t Message::getPublishTimestamp() const { return impl_ ? impl_->getPublishTimestamp() : 0ull; }
 
 uint64_t Message::getEventTimestamp() const { return impl_ ? impl_->getEventTimestamp() : 0ull; }
diff --git a/pulsar-client-cpp/lib/MessageImpl.cc b/pulsar-client-cpp/lib/MessageImpl.cc
index 60ce86b..b41e7ae 100644
--- a/pulsar-client-cpp/lib/MessageImpl.cc
+++ b/pulsar-client-cpp/lib/MessageImpl.cc
@@ -20,7 +20,7 @@
 
 namespace pulsar {
 
-MessageImpl::MessageImpl() : metadata(), payload(), messageId(), cnx_(0), topicName_() {}
+MessageImpl::MessageImpl() : metadata(), payload(), messageId(), cnx_(0), topicName_(), redeliveryCount_() {}
 
 const Message::StringMap& MessageImpl::properties() {
     if (properties_.size() == 0) {
@@ -92,4 +92,8 @@ void MessageImpl::setTopicName(const std::string& topicName) {
 
 const std::string& MessageImpl::getTopicName() { return *topicName_; }
 
+int MessageImpl::getRedeliveryCount() { return redeliveryCount_; }
+
+void MessageImpl::setRedeliveryCount(int count) { redeliveryCount_ = count; }
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/MessageImpl.h b/pulsar-client-cpp/lib/MessageImpl.h
index 9fd4a4d..ff2ac97 100644
--- a/pulsar-client-cpp/lib/MessageImpl.h
+++ b/pulsar-client-cpp/lib/MessageImpl.h
@@ -42,6 +42,7 @@ class MessageImpl {
     MessageId messageId;
     ClientConnection* cnx_;
     const std::string* topicName_;
+    int redeliveryCount_;
 
     const std::string& getPartitionKey() const;
     bool hasPartitionKey() const;
@@ -62,6 +63,9 @@ class MessageImpl {
      */
     void setTopicName(const std::string& topicName);
 
+    int getRedeliveryCount();
+    void setRedeliveryCount(int count);
+
     friend class PulsarWrapper;
     friend class MessageBuilder;
 
diff --git a/pulsar-client-cpp/lib/c/c_Message.cc b/pulsar-client-cpp/lib/c/c_Message.cc
index 29eab96..4fe4c39 100644
--- a/pulsar-client-cpp/lib/c/c_Message.cc
+++ b/pulsar-client-cpp/lib/c/c_Message.cc
@@ -122,3 +122,7 @@ pulsar_string_map_t *pulsar_message_get_properties(pulsar_message_t *message) {
 const char *pulsar_message_get_topic_name(pulsar_message_t *message) {
     return message->message.getTopicName().c_str();
 }
+
+int pulsar_message_get_redelivery_count(pulsar_message_t *message) {
+    return message->message.getRedeliveryCount();
+}
diff --git a/pulsar-client-cpp/test-conf/standalone-ssl.conf b/pulsar-client-cpp/test-conf/standalone-ssl.conf
index 6ab4406..393cde2 100644
--- a/pulsar-client-cpp/test-conf/standalone-ssl.conf
+++ b/pulsar-client-cpp/test-conf/standalone-ssl.conf
@@ -83,6 +83,8 @@ statusFilePath=/usr/local/apache/htdocs
 # Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction
 maxUnackedMessagesPerConsumer=50000
 
+subscriptionRedeliveryTrackerEnabled=true
+
 ### --- Authentication --- ###
 
 # Enable TLS
diff --git a/pulsar-client-cpp/test-conf/standalone.conf b/pulsar-client-cpp/test-conf/standalone.conf
index 2de6a37..fee4146 100644
--- a/pulsar-client-cpp/test-conf/standalone.conf
+++ b/pulsar-client-cpp/test-conf/standalone.conf
@@ -78,6 +78,8 @@ statusFilePath=/usr/local/apache/htdocs
 # Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction
 maxUnackedMessagesPerConsumer=50000
 
+subscriptionRedeliveryTrackerEnabled=true
+
 ### --- Authentication --- ###
 
 # Enable authentication
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 8f5bb2d..54077ea 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -260,6 +260,53 @@ TEST(BasicEndToEndTest, testProduceConsume) {
     ASSERT_EQ(ResultOk, client.close());
 }
 
+TEST(BasicEndToEndTest, testRedeliveryCount) {
+    ClientConfiguration config;
+    Client client(lookupUrl, config);
+    std::string topicName = "persistent://public/default/test-redelivery-count";
+    std::string subName = "my-sub-name";
+
+    Producer producer;
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    Promise<Result, Consumer> consumerPromise;
+    ConsumerConfiguration consumerConf;
+    consumerConf.setNegativeAckRedeliveryDelayMs(500);
+    consumerConf.setConsumerType(ConsumerShared);
+    client.subscribeAsync(topicName, subName, consumerConf, WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+    std::string temp = producer.getTopic();
+    ASSERT_EQ(temp, topicName);
+    temp = consumer.getTopic();
+    ASSERT_EQ(temp, topicName);
+    ASSERT_EQ(consumer.getSubscriptionName(), subName);
+
+    std::string content = "msg-content";
+    Message msg = MessageBuilder().setContent(content).build();
+    producer.send(msg);
+
+    int redeliveryCount = 0;
+    Message msgReceived;
+    for (int i = 0; i < 4; i++) {
+        consumer.receive(msgReceived);
+        LOG_INFO("Received message " << msgReceived.getDataAsString());
+        consumer.negativeAcknowledge(msgReceived);
+        redeliveryCount = msgReceived.getRedeliveryCount();
+    }
+
+    ASSERT_EQ(3, redeliveryCount);
+    consumer.acknowledge(msgReceived);
+    consumer.close();
+    producer.close();
+}
+
 TEST(BasicEndToEndTest, testLookupThrottling) {
     std::string topicName = "testLookupThrottling";
     ClientConfiguration config;