You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/05/31 01:16:59 UTC

[pulsar] branch master updated: Fix flake in C++ negative acknowledgement tests (#7099)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ae324b1  Fix flake in C++ negative acknowledgement tests (#7099)
ae324b1 is described below

commit ae324b1abdd11244df93e220e4fb0733115208aa
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sat May 30 18:16:43 2020 -0700

    Fix flake in C++ negative acknowledgement tests (#7099)
    
    Negative acknowledgement runs in the background on a consumer and
    triggers redelivery of messages. The tests verify a that messages do
    indeed get redelivered, and which messages they are, for the base
    case, batching and partitioned consumer.
    
    There's a fundamental dependency on timing in the base case. If 100ms
    pass between consumer creation and receiving the last message in first
    receive loop, redelivery will be triggered and the order of messages,
    as asserted by the test will fail.
    
    This first case can be fixed by moving the negative ack to run after
    all messages have been received. However, this can also then fail for
    the batch case.
    
    If the negative ack tracker kicks off during the loop to negatively
    ack the messages, then the redelivery will happen twice (and possibly
    more times depending on how many time it manages to run).
    
    For this reason, if we want the test to be deterministic, we need to
    disable the tracker from kicking off redelivery while we send mark the
    messages as negatively acked.
    
    Co-authored-by: Ivan Kelly <ik...@splunk.com>
---
 pulsar-client-cpp/lib/ConsumerImpl.cc            |  4 ++++
 pulsar-client-cpp/lib/ConsumerImpl.h             |  5 +++++
 pulsar-client-cpp/lib/ConsumerImplBase.h         |  5 +++++
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc |  7 +++++++
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h  |  3 +++
 pulsar-client-cpp/lib/NegativeAcksTracker.cc     | 16 +++++++++++++--
 pulsar-client-cpp/lib/NegativeAcksTracker.h      |  3 +++
 pulsar-client-cpp/lib/PartitionedConsumerImpl.cc |  7 +++++++
 pulsar-client-cpp/lib/PartitionedConsumerImpl.h  |  1 +
 pulsar-client-cpp/tests/BasicEndToEndTest.cc     | 26 ++++++++++++++++++++----
 pulsar-client-cpp/tests/PulsarFriend.h           |  4 ++++
 11 files changed, 75 insertions(+), 6 deletions(-)

diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 6e6b105..a7fd7d7 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -1184,4 +1184,8 @@ void ConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback
     }
 }
 
+void ConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabled) {
+    negativeAcksTracker_.setEnabledForTesting(enabled);
+}
+
 } /* namespace pulsar */
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h
index 6d81fd0..ae31cb8 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -150,6 +150,7 @@ class ConsumerImpl : public ConsumerImplBase,
     void statsCallback(Result, ResultCallback, proto::CommandAck_AckType);
     void notifyPendingReceivedCallback(Result result, Message& message, const ReceiveCallback& callback);
     void failPendingReceiveCallback();
+    virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled);
 
     Optional<MessageId> clearReceiveQueue();
 
@@ -197,6 +198,10 @@ class ConsumerImpl : public ConsumerImplBase,
     }
 
     friend class PulsarFriend;
+
+    // these two declared friend to access setNegativeAcknowledgeEnabledForTesting
+    friend class MultiTopicsConsumerImpl;
+    friend class PartitionedConsumerImpl;
 };
 
 } /* namespace pulsar */
diff --git a/pulsar-client-cpp/lib/ConsumerImplBase.h b/pulsar-client-cpp/lib/ConsumerImplBase.h
index fc15066..1747092 100644
--- a/pulsar-client-cpp/lib/ConsumerImplBase.h
+++ b/pulsar-client-cpp/lib/ConsumerImplBase.h
@@ -55,6 +55,11 @@ class ConsumerImplBase {
     virtual void seekAsync(const MessageId& msgId, ResultCallback callback) = 0;
     virtual void seekAsync(uint64_t timestamp, ResultCallback callback) = 0;
     virtual void negativeAcknowledge(const MessageId& msgId) = 0;
+
+   private:
+    virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled) = 0;
+
+    friend class PulsarFriend;
 };
 }  // namespace pulsar
 #endif  // PULSAR_CONSUMER_IMPL_BASE_HEADER
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 0b9033b..e0cc2c2 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -746,3 +746,10 @@ void MultiTopicsConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback c
 void MultiTopicsConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
     callback(ResultOperationNotSupported);
 }
+
+void MultiTopicsConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabled) {
+    Lock lock(mutex_);
+    for (auto&& c : consumers_) {
+        c.second->setNegativeAcknowledgeEnabledForTesting(enabled);
+    }
+}
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index fa271fe..b91cac1 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -134,6 +134,9 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     void handleOneTopicUnsubscribedAsync(Result result, std::shared_ptr<std::atomic<int>> consumerUnsubed,
                                          int numberPartitions, TopicNamePtr topicNamePtr,
                                          std::string& topicPartitionName, ResultCallback callback);
+
+   private:
+    virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled);
 };
 
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/NegativeAcksTracker.cc b/pulsar-client-cpp/lib/NegativeAcksTracker.cc
index 202c5e4..8e501dc 100644
--- a/pulsar-client-cpp/lib/NegativeAcksTracker.cc
+++ b/pulsar-client-cpp/lib/NegativeAcksTracker.cc
@@ -31,7 +31,10 @@ namespace pulsar {
 
 NegativeAcksTracker::NegativeAcksTracker(ClientImplPtr client, ConsumerImpl &consumer,
                                          const ConsumerConfiguration &conf)
-    : consumer_(consumer), timerInterval_(0), executor_(client->getIOExecutorProvider()->get()) {
+    : consumer_(consumer),
+      timerInterval_(0),
+      executor_(client->getIOExecutorProvider()->get()),
+      enabledForTesting_(true) {
     static const long MIN_NACK_DELAY_MILLIS = 100;
 
     nackDelay_ =
@@ -56,7 +59,7 @@ void NegativeAcksTracker::handleTimer(const boost::system::error_code &ec) {
     std::lock_guard<std::mutex> lock(mutex_);
     timer_ = nullptr;
 
-    if (nackedMessages_.empty()) {
+    if (nackedMessages_.empty() || !enabledForTesting_) {
         return;
     }
 
@@ -103,4 +106,13 @@ void NegativeAcksTracker::close() {
     }
 }
 
+void NegativeAcksTracker::setEnabledForTesting(bool enabled) {
+    std::lock_guard<std::mutex> lock(mutex_);
+    enabledForTesting_ = enabled;
+
+    if (enabledForTesting_ && !timer_) {
+        scheduleTimer();
+    }
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/NegativeAcksTracker.h b/pulsar-client-cpp/lib/NegativeAcksTracker.h
index b4ddf7c..1476275 100644
--- a/pulsar-client-cpp/lib/NegativeAcksTracker.h
+++ b/pulsar-client-cpp/lib/NegativeAcksTracker.h
@@ -41,6 +41,8 @@ class NegativeAcksTracker {
 
     void close();
 
+    void setEnabledForTesting(bool enabled);
+
    private:
     void scheduleTimer();
     void handleTimer(const boost::system::error_code &ec);
@@ -55,6 +57,7 @@ class NegativeAcksTracker {
 
     ExecutorServicePtr executor_;
     DeadlineTimerPtr timer_;
+    bool enabledForTesting_;  // to be able to test deterministically
 };
 
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index 89fe1b9..2a0ec4e 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -592,4 +592,11 @@ void PartitionedConsumerImpl::handleGetPartitions(Result result,
     runPartitionUpdateTask();
 }
 
+void PartitionedConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabled) {
+    Lock lock(mutex_);
+    for (auto&& c : consumers_) {
+        c->setNegativeAcknowledgeEnabledForTesting(enabled);
+    }
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
index 02de7bc..7624d62 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
@@ -116,6 +116,7 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
     void internalListener(Consumer consumer);
     void receiveMessages();
     void failPendingReceiveCallback();
+    virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled);
     Promise<Result, ConsumerImplBaseWeakPtr> partitionedConsumerCreatedPromise_;
     UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
     std::queue<ReceiveCallback> pendingReceives_;
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 8c117d7..f285721 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -2934,6 +2934,16 @@ TEST(BasicEndToEndTest, testPartitionedReceiveAsyncFailedConsumer) {
     client.shutdown();
 }
 
+static void expectTimeoutOnRecv(Consumer &consumer) {
+    Message msg;
+    Result res = consumer.receive(msg, 100);
+    if (res != ResultTimeout) {
+        LOG_ERROR("Received a msg when not expecting to id(" << msg.getMessageId() << ") "
+                                                             << msg.getDataAsString());
+    }
+    ASSERT_EQ(ResultTimeout, res);
+}
+
 void testNegativeAcks(const std::string &topic, bool batchingEnabled) {
     Client client(lookupUrl);
     Consumer consumer;
@@ -2955,14 +2965,24 @@ void testNegativeAcks(const std::string &topic, bool batchingEnabled) {
 
     producer.flush();
 
+    std::vector<MessageId> toNeg;
     for (int i = 0; i < 10; i++) {
         Message msg;
         consumer.receive(msg);
 
         LOG_INFO("Received message " << msg.getDataAsString());
         ASSERT_EQ(msg.getDataAsString(), "test-" + std::to_string(i));
-        consumer.negativeAcknowledge(msg);
+        toNeg.push_back(msg.getMessageId());
     }
+    // No more messages expected
+    expectTimeoutOnRecv(consumer);
+
+    PulsarFriend::setNegativeAckEnabled(consumer, false);
+    // negatively acknowledge all at once
+    for (auto &&msgId : toNeg) {
+        consumer.negativeAcknowledge(msgId);
+    }
+    PulsarFriend::setNegativeAckEnabled(consumer, true);
 
     for (int i = 0; i < 10; i++) {
         Message msg;
@@ -2975,9 +2995,7 @@ void testNegativeAcks(const std::string &topic, bool batchingEnabled) {
     }
 
     // No more messages expected
-    Message msg;
-    Result res = consumer.receive(msg, 100);
-    ASSERT_EQ(ResultTimeout, res);
+    expectTimeoutOnRecv(consumer);
 
     client.shutdown();
 }
diff --git a/pulsar-client-cpp/tests/PulsarFriend.h b/pulsar-client-cpp/tests/PulsarFriend.h
index ea38881..2cd1e7f 100644
--- a/pulsar-client-cpp/tests/PulsarFriend.h
+++ b/pulsar-client-cpp/tests/PulsarFriend.h
@@ -81,6 +81,10 @@ class PulsarFriend {
         return *consumerImpl;
     }
 
+    static void setNegativeAckEnabled(Consumer consumer, bool enabled) {
+        consumer.impl_->setNegativeAcknowledgeEnabledForTesting(enabled);
+    }
+
     static ClientConnectionWeakPtr getClientConnection(HandlerBase& handler) { return handler.connection_; }
 
     static boost::posix_time::ptime& getFirstBackoffTime(Backoff& backoff) {