You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/11/22 23:08:50 UTC
[pulsar] branch master updated: [Issue 5676][C++ client] Expose redelivery count (#5677)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a00791d [Issue 5676][C++ client] Expose redelivery count (#5677)
a00791d is described below
commit a00791d5d2d0cb236164c2270e05f94a4305fb1b
Author: Fernando Rejon Barrera <39...@users.noreply.github.com>
AuthorDate: Sat Nov 23 00:08:36 2019 +0100
[Issue 5676][C++ client] Expose redelivery count (#5677)
Fixes #5676
### Motivation
Expose the redelivery count in the C++ client.
### Modifications
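Exposed the redelivery count reported by the broker through `Message` and `MessageImpl`, and through the C API as `pulsar_message_get_redelivery_count`. The count is set on each message when it is received, including every individual message unpacked from a batch. Enabled `subscriptionRedeliveryTrackerEnabled` in the test broker configurations and added an end-to-end test (`testRedeliveryCount`).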
---
pulsar-client-cpp/include/pulsar/Message.h | 5 +++
pulsar-client-cpp/include/pulsar/c/message.h | 2 ++
pulsar-client-cpp/lib/ConsumerImpl.cc | 6 ++--
pulsar-client-cpp/lib/ConsumerImpl.h | 3 +-
pulsar-client-cpp/lib/Message.cc | 7 ++++
pulsar-client-cpp/lib/MessageImpl.cc | 6 +++-
pulsar-client-cpp/lib/MessageImpl.h | 4 +++
pulsar-client-cpp/lib/c/c_Message.cc | 4 +++
pulsar-client-cpp/test-conf/standalone-ssl.conf | 2 ++
pulsar-client-cpp/test-conf/standalone.conf | 2 ++
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 47 +++++++++++++++++++++++++
11 files changed, 84 insertions(+), 4 deletions(-)
diff --git a/pulsar-client-cpp/include/pulsar/Message.h b/pulsar-client-cpp/include/pulsar/Message.h
index 1e76d49..7c36cea 100644
--- a/pulsar-client-cpp/include/pulsar/Message.h
+++ b/pulsar-client-cpp/include/pulsar/Message.h
@@ -141,6 +141,11 @@ class PULSAR_PUBLIC Message {
*/
const std::string& getTopicName() const;
+ /**
+ * Get the redelivery count for this message
+ */
+ const int getRedeliveryCount() const;
+
bool operator==(const Message& msg) const;
private:
diff --git a/pulsar-client-cpp/include/pulsar/c/message.h b/pulsar-client-cpp/include/pulsar/c/message.h
index a2d6ea5..dcb9152 100644
--- a/pulsar-client-cpp/include/pulsar/c/message.h
+++ b/pulsar-client-cpp/include/pulsar/c/message.h
@@ -198,6 +198,8 @@ PULSAR_PUBLIC uint64_t pulsar_message_get_event_timestamp(pulsar_message_t *mess
PULSAR_PUBLIC const char *pulsar_message_get_topic_name(pulsar_message_t *message);
+PULSAR_PUBLIC int pulsar_message_get_redelivery_count(pulsar_message_t *message);
+
#ifdef __cplusplus
}
#endif
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 94eda12..9a7e506 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -283,6 +283,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
Message m(msg, metadata, payload, partitionIndex_);
m.impl_->cnx_ = cnx.get();
m.impl_->setTopicName(topic_);
+ m.impl_->setRedeliveryCount(msg.redelivery_count());
LOG_DEBUG(getName() << " metadata.num_messages_in_batch() = " << metadata.num_messages_in_batch());
LOG_DEBUG(getName() << " metadata.has_num_messages_in_batch() = "
@@ -291,7 +292,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
unsigned int numOfMessageReceived = 1;
if (metadata.has_num_messages_in_batch()) {
Lock lock(mutex_);
- numOfMessageReceived = receiveIndividualMessagesFromBatch(cnx, m);
+ numOfMessageReceived = receiveIndividualMessagesFromBatch(cnx, m, msg.redelivery_count());
} else {
Lock lock(pendingReceiveMutex_);
// if asyncReceive is waiting then notify callback without adding to incomingMessages queue
@@ -358,7 +359,7 @@ void ConsumerImpl::notifyPendingReceivedCallback(Result result, Message& msg,
// Zero Queue size is not supported with Batch Messages
uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx,
- Message& batchedMessage) {
+ Message& batchedMessage, int redeliveryCount) {
unsigned int batchSize = batchedMessage.impl_->metadata.num_messages_in_batch();
batchAcknowledgementTracker_.receivedMessage(batchedMessage);
LOG_DEBUG("Received Batch messages of size - " << batchSize
@@ -369,6 +370,7 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
for (int i = 0; i < batchSize; i++) {
// This is a cheap copy since message contains only one shared pointer (impl_)
Message msg = Commands::deSerializeSingleMessageInBatch(batchedMessage, i);
+ msg.impl_->setRedeliveryCount(redeliveryCount);
if (startMessageId_.is_present()) {
const MessageId& msgId = msg.getMessageId();
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h
index b3ba218..8a25b49 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -136,7 +136,8 @@ class ConsumerImpl : public ConsumerImplBase,
proto::CommandAck::ValidationError validationError);
void increaseAvailablePermits(const ClientConnectionPtr& currentCnx, int numberOfPermits = 1);
void drainIncomingMessageQueue(size_t count);
- uint32_t receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx, Message& batchedMessage);
+ uint32_t receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx, Message& batchedMessage,
+ int redeliveryCount);
void brokerConsumerStatsListener(Result, BrokerConsumerStatsImpl, BrokerConsumerStatsCallback);
bool decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
diff --git a/pulsar-client-cpp/lib/Message.cc b/pulsar-client-cpp/lib/Message.cc
index 1f4e6f7..4d1af95 100644
--- a/pulsar-client-cpp/lib/Message.cc
+++ b/pulsar-client-cpp/lib/Message.cc
@@ -131,6 +131,13 @@ const std::string& Message::getTopicName() const {
return impl_->getTopicName();
}
+const int Message::getRedeliveryCount() const {
+ if (!impl_) {
+ return 0;
+ }
+ return impl_->getRedeliveryCount();
+}
+
uint64_t Message::getPublishTimestamp() const { return impl_ ? impl_->getPublishTimestamp() : 0ull; }
uint64_t Message::getEventTimestamp() const { return impl_ ? impl_->getEventTimestamp() : 0ull; }
diff --git a/pulsar-client-cpp/lib/MessageImpl.cc b/pulsar-client-cpp/lib/MessageImpl.cc
index 60ce86b..b41e7ae 100644
--- a/pulsar-client-cpp/lib/MessageImpl.cc
+++ b/pulsar-client-cpp/lib/MessageImpl.cc
@@ -20,7 +20,7 @@
namespace pulsar {
-MessageImpl::MessageImpl() : metadata(), payload(), messageId(), cnx_(0), topicName_() {}
+MessageImpl::MessageImpl() : metadata(), payload(), messageId(), cnx_(0), topicName_(), redeliveryCount_() {}
const Message::StringMap& MessageImpl::properties() {
if (properties_.size() == 0) {
@@ -92,4 +92,8 @@ void MessageImpl::setTopicName(const std::string& topicName) {
const std::string& MessageImpl::getTopicName() { return *topicName_; }
+int MessageImpl::getRedeliveryCount() { return redeliveryCount_; }
+
+void MessageImpl::setRedeliveryCount(int count) { redeliveryCount_ = count; }
+
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/MessageImpl.h b/pulsar-client-cpp/lib/MessageImpl.h
index 9fd4a4d..ff2ac97 100644
--- a/pulsar-client-cpp/lib/MessageImpl.h
+++ b/pulsar-client-cpp/lib/MessageImpl.h
@@ -42,6 +42,7 @@ class MessageImpl {
MessageId messageId;
ClientConnection* cnx_;
const std::string* topicName_;
+ int redeliveryCount_;
const std::string& getPartitionKey() const;
bool hasPartitionKey() const;
@@ -62,6 +63,9 @@ class MessageImpl {
*/
void setTopicName(const std::string& topicName);
+ int getRedeliveryCount();
+ void setRedeliveryCount(int count);
+
friend class PulsarWrapper;
friend class MessageBuilder;
diff --git a/pulsar-client-cpp/lib/c/c_Message.cc b/pulsar-client-cpp/lib/c/c_Message.cc
index 29eab96..4fe4c39 100644
--- a/pulsar-client-cpp/lib/c/c_Message.cc
+++ b/pulsar-client-cpp/lib/c/c_Message.cc
@@ -122,3 +122,7 @@ pulsar_string_map_t *pulsar_message_get_properties(pulsar_message_t *message) {
const char *pulsar_message_get_topic_name(pulsar_message_t *message) {
return message->message.getTopicName().c_str();
}
+
+int pulsar_message_get_redelivery_count(pulsar_message_t *message) {
+ return message->message.getRedeliveryCount();
+}
diff --git a/pulsar-client-cpp/test-conf/standalone-ssl.conf b/pulsar-client-cpp/test-conf/standalone-ssl.conf
index 6ab4406..393cde2 100644
--- a/pulsar-client-cpp/test-conf/standalone-ssl.conf
+++ b/pulsar-client-cpp/test-conf/standalone-ssl.conf
@@ -83,6 +83,8 @@ statusFilePath=/usr/local/apache/htdocs
# Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction
maxUnackedMessagesPerConsumer=50000
+subscriptionRedeliveryTrackerEnabled=true
+
### --- Authentication --- ###
# Enable TLS
diff --git a/pulsar-client-cpp/test-conf/standalone.conf b/pulsar-client-cpp/test-conf/standalone.conf
index 2de6a37..fee4146 100644
--- a/pulsar-client-cpp/test-conf/standalone.conf
+++ b/pulsar-client-cpp/test-conf/standalone.conf
@@ -78,6 +78,8 @@ statusFilePath=/usr/local/apache/htdocs
# Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction
maxUnackedMessagesPerConsumer=50000
+subscriptionRedeliveryTrackerEnabled=true
+
### --- Authentication --- ###
# Enable authentication
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 8f5bb2d..54077ea 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -260,6 +260,53 @@ TEST(BasicEndToEndTest, testProduceConsume) {
ASSERT_EQ(ResultOk, client.close());
}
+TEST(BasicEndToEndTest, testRedeliveryCount) {
+ ClientConfiguration config;
+ Client client(lookupUrl, config);
+ std::string topicName = "persistent://public/default/test-redelivery-count";
+ std::string subName = "my-sub-name";
+
+ Producer producer;
+ Promise<Result, Producer> producerPromise;
+ client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise));
+ Future<Result, Producer> producerFuture = producerPromise.getFuture();
+ Result result = producerFuture.get(producer);
+ ASSERT_EQ(ResultOk, result);
+
+ Consumer consumer;
+ Promise<Result, Consumer> consumerPromise;
+ ConsumerConfiguration consumerConf;
+ consumerConf.setNegativeAckRedeliveryDelayMs(500);
+ consumerConf.setConsumerType(ConsumerShared);
+ client.subscribeAsync(topicName, subName, consumerConf, WaitForCallbackValue<Consumer>(consumerPromise));
+ Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+ result = consumerFuture.get(consumer);
+ ASSERT_EQ(ResultOk, result);
+ std::string temp = producer.getTopic();
+ ASSERT_EQ(temp, topicName);
+ temp = consumer.getTopic();
+ ASSERT_EQ(temp, topicName);
+ ASSERT_EQ(consumer.getSubscriptionName(), subName);
+
+ std::string content = "msg-content";
+ Message msg = MessageBuilder().setContent(content).build();
+ producer.send(msg);
+
+ int redeliveryCount = 0;
+ Message msgReceived;
+ for (int i = 0; i < 4; i++) {
+ consumer.receive(msgReceived);
+ LOG_INFO("Received message " << msgReceived.getDataAsString());
+ consumer.negativeAcknowledge(msgReceived);
+ redeliveryCount = msgReceived.getRedeliveryCount();
+ }
+
+ ASSERT_EQ(3, redeliveryCount);
+ consumer.acknowledge(msgReceived);
+ consumer.close();
+ producer.close();
+}
+
TEST(BasicEndToEndTest, testLookupThrottling) {
std::string topicName = "testLookupThrottling";
ClientConfiguration config;