You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/08/05 06:41:48 UTC

[pulsar] branch master updated: Merge Request for #4808: TYPO in C++ client producer method for processing failure case, and add corresponding unit test case. (#4873)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b90b4ea  Merge Request for #4808: TYPO in C++ client producer method for processing failure case, and add corresponding unit test case. (#4873)
b90b4ea is described below

commit b90b4ea1d1397e3707e6f51151492feff5a75ba6
Author: Easyfan Zheng <zh...@gmail.com>
AuthorDate: Mon Aug 5 14:41:42 2019 +0800

    Merge Request for #4808: TYPO in C++ client producer method for processing failure case, and add corresponding unit test case. (#4873)
    
    This is definitely a typo: the method handles failed messages and should report the GIVEN `result` argument passed in by the caller, not a fixed, hard-coded result (ResultTimeout).
    
    Contribution Checklist
    - #4808: typo in the C++ client producer method that processes the failure case.
    - Add a C++ client producer unit test case for failed messages.
    
    Unit tests (UT) passed:
    
    BatchMessageTest
---
 pulsar-client-cpp/include/pulsar/Producer.h |  3 +++
 pulsar-client-cpp/lib/Producer.cc           |  7 ++++++
 pulsar-client-cpp/lib/ProducerImpl.cc       |  2 +-
 pulsar-client-cpp/lib/ProducerImpl.h        |  4 ++++
 pulsar-client-cpp/tests/BatchMessageTest.cc | 34 +++++++++++++++++++++++++++++
 pulsar-client-cpp/tests/PulsarFriend.h      |  4 ++++
 6 files changed, 53 insertions(+), 1 deletion(-)

diff --git a/pulsar-client-cpp/include/pulsar/Producer.h b/pulsar-client-cpp/include/pulsar/Producer.h
index c3f25ef..ae55093 100644
--- a/pulsar-client-cpp/include/pulsar/Producer.h
+++ b/pulsar-client-cpp/include/pulsar/Producer.h
@@ -147,6 +147,9 @@ class PULSAR_PUBLIC Producer {
     friend class PulsarWrapper;
 
     ProducerImplBasePtr impl_;
+
+    // For unit test case BatchMessageTest::producerFailureResult only
+    void producerFailMessages(Result result);
 };
 }  // namespace pulsar
 
diff --git a/pulsar-client-cpp/lib/Producer.cc b/pulsar-client-cpp/lib/Producer.cc
index 1659a21..e02f1d8 100644
--- a/pulsar-client-cpp/lib/Producer.cc
+++ b/pulsar-client-cpp/lib/Producer.cc
@@ -97,4 +97,11 @@ void Producer::flushAsync(FlushCallback callback) {
 
     impl_->flushAsync(callback);
 }
+
+void Producer::producerFailMessages(Result result) {
+    if (impl_) {
+        ProducerImpl* producerImpl = static_cast<ProducerImpl*>(impl_.get());
+        producerImpl->failPendingMessages(result);
+    }
+}
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index bf9e3ac..666df3b 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -256,7 +256,7 @@ void ProducerImpl::failPendingMessages(Result result) {
     }
 
     // this function can handle null pointer
-    BatchMessageContainer::batchMessageCallBack(ResultTimeout, messageContainerListPtr, NULL);
+    BatchMessageContainer::batchMessageCallBack(result, messageContainerListPtr, NULL);
 }
 
 void ProducerImpl::resendMessages(ClientConnectionPtr cnx) {
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h
index d7ac603..cb2a8a6 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -43,6 +43,8 @@ typedef std::shared_ptr<MessageCrypto> MessageCryptoPtr;
 
 class PulsarFriend;
 
+class Producer;
+
 struct OpSendMsg {
     Message msg_;
     SendCallback sendCallback_;
@@ -110,6 +112,8 @@ class ProducerImpl : public HandlerBase,
 
     friend class PulsarFriend;
 
+    friend class Producer;
+
     friend class BatchMessageContainer;
 
     virtual void connectionOpened(const ClientConnectionPtr& connection);
diff --git a/pulsar-client-cpp/tests/BatchMessageTest.cc b/pulsar-client-cpp/tests/BatchMessageTest.cc
index c1bfe60..62f68cf 100644
--- a/pulsar-client-cpp/tests/BatchMessageTest.cc
+++ b/pulsar-client-cpp/tests/BatchMessageTest.cc
@@ -30,6 +30,8 @@
 #include <thread>
 #include "LogUtils.h"
 #include "PulsarFriend.h"
+#include <unistd.h>
+#include <functional>
 #include "ConsumerTest.h"
 #include "HttpHelper.h"
 DECLARE_LOG_OBJECT();
@@ -55,6 +57,8 @@ static void sendCallBack(Result r, const Message& msg) {
     LOG_DEBUG("Received publish acknowledgement for " << msg.getDataAsString());
 }
 
+static void sendFailCallBack(Result r, Result expect_result) { EXPECT_EQ(r, expect_result); }
+
 static int globalPublishCountSuccess = 0;
 static int globalPublishCountQueueFull = 0;
 
@@ -914,3 +918,33 @@ TEST(BatchMessageTest, testPartitionedTopics) {
     // Number of messages consumed
     ASSERT_EQ(i, numOfMessages - globalPublishCountQueueFull);
 }
+
+TEST(BatchMessageTest, producerFailureResult) {
+    std::string testName = std::to_string(epochTime) + "testCumulativeAck";
+
+    ClientConfiguration clientConfig;
+    clientConfig.setStatsIntervalInSeconds(100);
+
+    Client client(lookupUrl, clientConfig);
+    std::string topicName = "persistent://public/default/" + testName;
+    std::string subName = "subscription-name";
+    Producer producer;
+
+    int batchSize = 100;
+    int numOfMessages = 10000;
+    ProducerConfiguration conf;
+
+    conf.setCompressionType(CompressionZLib);
+    conf.setBatchingMaxMessages(batchSize);
+    conf.setBatchingEnabled(true);
+    conf.setBatchingMaxPublishDelayMs(50000);
+    conf.setBlockIfQueueFull(false);
+    conf.setMaxPendingMessages(10);
+
+    Result res = Result::ResultBrokerMetadataError;
+
+    client.createProducer(topicName, conf, producer);
+    Message msg = MessageBuilder().setContent("test").build();
+    producer.sendAsync(msg, std::bind(&sendFailCallBack, std::placeholders::_1, res));
+    PulsarFriend::producerFailMessages(producer, res);
+}
diff --git a/pulsar-client-cpp/tests/PulsarFriend.h b/pulsar-client-cpp/tests/PulsarFriend.h
index 95c49f8..a50bd67 100644
--- a/pulsar-client-cpp/tests/PulsarFriend.h
+++ b/pulsar-client-cpp/tests/PulsarFriend.h
@@ -56,6 +56,10 @@ class PulsarFriend {
         return *producerImpl;
     }
 
+    static void producerFailMessages(Producer producer, Result result) {
+        producer.producerFailMessages(result);
+    }
+
     static ConsumerImpl& getConsumerImpl(Consumer consumer) {
         ConsumerImpl* consumerImpl = static_cast<ConsumerImpl*>(consumer.impl_.get());
         return *consumerImpl;