You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/08/05 06:41:48 UTC
[pulsar] branch master updated: Merge Request for #4808: TYPO in
C++ client producer method for processing failure case,
and add corresponding unit test case. (#4873)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b90b4ea Merge Request for #4808: TYPO in C++ client producer method for processing failure case, and add corresponding unit test case. (#4873)
b90b4ea is described below
commit b90b4ea1d1397e3707e6f51151492feff5a75ba6
Author: Easyfan Zheng <zh...@gmail.com>
AuthorDate: Mon Aug 5 14:41:42 2019 +0800
Merge Request for #4808: TYPO in C++ client producer method for processing failure case, and add corresponding unit test case. (#4873)
This is definitely a typo: ProducerImpl::failPendingMessages(Result result) should fail the pending messages with the result passed in by the caller, not with the hard-coded ResultTimeout.
Contribution Checklist
#4808 : TYPO in C++ client producer method for processing failure case
Add a C++ client producer unit test case for failed messages.
UT passed:
BatchMessageTest
---
pulsar-client-cpp/include/pulsar/Producer.h | 3 +++
pulsar-client-cpp/lib/Producer.cc | 7 ++++++
pulsar-client-cpp/lib/ProducerImpl.cc | 2 +-
pulsar-client-cpp/lib/ProducerImpl.h | 4 ++++
pulsar-client-cpp/tests/BatchMessageTest.cc | 34 +++++++++++++++++++++++++++++
pulsar-client-cpp/tests/PulsarFriend.h | 4 ++++
6 files changed, 53 insertions(+), 1 deletion(-)
diff --git a/pulsar-client-cpp/include/pulsar/Producer.h b/pulsar-client-cpp/include/pulsar/Producer.h
index c3f25ef..ae55093 100644
--- a/pulsar-client-cpp/include/pulsar/Producer.h
+++ b/pulsar-client-cpp/include/pulsar/Producer.h
@@ -147,6 +147,9 @@ class PULSAR_PUBLIC Producer {
friend class PulsarWrapper;
ProducerImplBasePtr impl_;
+
+ // For unit test case BatchMessageTest::producerFailureResult only
+ void producerFailMessages(Result result);
};
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/Producer.cc b/pulsar-client-cpp/lib/Producer.cc
index 1659a21..e02f1d8 100644
--- a/pulsar-client-cpp/lib/Producer.cc
+++ b/pulsar-client-cpp/lib/Producer.cc
@@ -97,4 +97,11 @@ void Producer::flushAsync(FlushCallback callback) {
impl_->flushAsync(callback);
}
+
+void Producer::producerFailMessages(Result result) {
+ if (impl_) {
+ ProducerImpl* producerImpl = static_cast<ProducerImpl*>(impl_.get());
+ producerImpl->failPendingMessages(result);
+ }
+}
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index bf9e3ac..666df3b 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -256,7 +256,7 @@ void ProducerImpl::failPendingMessages(Result result) {
}
// this function can handle null pointer
- BatchMessageContainer::batchMessageCallBack(ResultTimeout, messageContainerListPtr, NULL);
+ BatchMessageContainer::batchMessageCallBack(result, messageContainerListPtr, NULL);
}
void ProducerImpl::resendMessages(ClientConnectionPtr cnx) {
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h
index d7ac603..cb2a8a6 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -43,6 +43,8 @@ typedef std::shared_ptr<MessageCrypto> MessageCryptoPtr;
class PulsarFriend;
+class Producer;
+
struct OpSendMsg {
Message msg_;
SendCallback sendCallback_;
@@ -110,6 +112,8 @@ class ProducerImpl : public HandlerBase,
friend class PulsarFriend;
+ friend class Producer;
+
friend class BatchMessageContainer;
virtual void connectionOpened(const ClientConnectionPtr& connection);
diff --git a/pulsar-client-cpp/tests/BatchMessageTest.cc b/pulsar-client-cpp/tests/BatchMessageTest.cc
index c1bfe60..62f68cf 100644
--- a/pulsar-client-cpp/tests/BatchMessageTest.cc
+++ b/pulsar-client-cpp/tests/BatchMessageTest.cc
@@ -30,6 +30,8 @@
#include <thread>
#include "LogUtils.h"
#include "PulsarFriend.h"
+#include <unistd.h>
+#include <functional>
#include "ConsumerTest.h"
#include "HttpHelper.h"
DECLARE_LOG_OBJECT();
@@ -55,6 +57,8 @@ static void sendCallBack(Result r, const Message& msg) {
LOG_DEBUG("Received publish acknowledgement for " << msg.getDataAsString());
}
+static void sendFailCallBack(Result r, Result expect_result) { EXPECT_EQ(r, expect_result); }
+
static int globalPublishCountSuccess = 0;
static int globalPublishCountQueueFull = 0;
@@ -914,3 +918,33 @@ TEST(BatchMessageTest, testPartitionedTopics) {
// Number of messages consumed
ASSERT_EQ(i, numOfMessages - globalPublishCountQueueFull);
}
+
+TEST(BatchMessageTest, producerFailureResult) {
+ std::string testName = std::to_string(epochTime) + "testCumulativeAck";
+
+ ClientConfiguration clientConfig;
+ clientConfig.setStatsIntervalInSeconds(100);
+
+ Client client(lookupUrl, clientConfig);
+ std::string topicName = "persistent://public/default/" + testName;
+ std::string subName = "subscription-name";
+ Producer producer;
+
+ int batchSize = 100;
+ int numOfMessages = 10000;
+ ProducerConfiguration conf;
+
+ conf.setCompressionType(CompressionZLib);
+ conf.setBatchingMaxMessages(batchSize);
+ conf.setBatchingEnabled(true);
+ conf.setBatchingMaxPublishDelayMs(50000);
+ conf.setBlockIfQueueFull(false);
+ conf.setMaxPendingMessages(10);
+
+ Result res = Result::ResultBrokerMetadataError;
+
+ client.createProducer(topicName, conf, producer);
+ Message msg = MessageBuilder().setContent("test").build();
+ producer.sendAsync(msg, std::bind(&sendFailCallBack, std::placeholders::_1, res));
+ PulsarFriend::producerFailMessages(producer, res);
+}
diff --git a/pulsar-client-cpp/tests/PulsarFriend.h b/pulsar-client-cpp/tests/PulsarFriend.h
index 95c49f8..a50bd67 100644
--- a/pulsar-client-cpp/tests/PulsarFriend.h
+++ b/pulsar-client-cpp/tests/PulsarFriend.h
@@ -56,6 +56,10 @@ class PulsarFriend {
return *producerImpl;
}
+ static void producerFailMessages(Producer producer, Result result) {
+ producer.producerFailMessages(result);
+ }
+
static ConsumerImpl& getConsumerImpl(Consumer consumer) {
ConsumerImpl* consumerImpl = static_cast<ConsumerImpl*>(consumer.impl_.get());
return *consumerImpl;