You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/04/02 05:21:11 UTC

[pulsar] branch branch-2.10 updated: [C++] Fix flaky tests about reference count (#14854)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 6898fc8  [C++] Fix flaky tests about reference count (#14854)
6898fc8 is described below

commit 6898fc843123a1bd50d21f5b5b10c1218e8330a9
Author: Yunze Xu <xy...@163.com>
AuthorDate: Fri Apr 1 19:09:14 2022 +0800

    [C++] Fix flaky tests about reference count (#14854)
    
    Fixes #14848
    Fixes #14719
    
    ### Motivation
    
    #7793 introduced a `testReferenceLeak` to avoid a cyclic reference of
    the reader. However, it adds an unused field `readerImplWeakPtr_` only
    for tests. The access to this field is not thread safe: the write
    operation happens in `handleConsumerCreated` while the read operation
    can happen anywhere via the getter. So there is a small chance that
    `readerPtr` in `testReferenceLeak` doesn't point to the right object.
    
    In addition, we should only guarantee that the reference count becomes
    0 after the producer, consumer or reader goes out of its scope. #14797
    adds a `ClientTest.testReferenceCount` but it's also flaky. The
    flakiness is caused by the shared pointer of `ProducerImpl` being
    published to another thread via `shared_from_this()` while the test has
    a strong expectation that the reference count is exactly 1.
    
    ### Modifications
    
    - Remove `readerImplWeakPtr_` from `ReaderImpl` and get the weak pointer
      from `Reader` directly by adding a method to `PulsarFriend`.
    - Add the check of reader's reference count to `testReferenceCount` and
      remove the redundant `testReferenceLeak`.
    - Instead of asserting the reference count of producer/consumer/reader
      is 1, just assert that it's greater than 0.
    
    (cherry picked from commit f84ff571df95f99efa4596e65324def1084fc11b)
---
 pulsar-client-cpp/lib/ReaderImpl.cc    |  5 ----
 pulsar-client-cpp/lib/ReaderImpl.h     |  5 +---
 pulsar-client-cpp/tests/ClientTest.cc  | 26 +++++++++++++++--
 pulsar-client-cpp/tests/PulsarFriend.h |  7 +++++
 pulsar-client-cpp/tests/ReaderTest.cc  | 51 +++-------------------------------
 pulsar-client-cpp/tests/ReaderTest.h   | 32 ---------------------
 6 files changed, 35 insertions(+), 91 deletions(-)

diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc b/pulsar-client-cpp/lib/ReaderImpl.cc
index 0a7b321..9401c12 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.cc
+++ b/pulsar-client-cpp/lib/ReaderImpl.cc
@@ -90,11 +90,8 @@ const std::string& ReaderImpl::getTopic() const { return consumer_->getTopic();
 void ReaderImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumer) {
     auto self = shared_from_this();
     readerCreatedCallback_(result, Reader(self));
-    readerImplWeakPtr_ = self;
 }
 
-ConsumerImplPtr ReaderImpl::getConsumer() { return consumer_; }
-
 Result ReaderImpl::readNext(Message& msg) {
     Result res = consumer_->receive(msg);
     acknowledgeIfNecessary(res, msg);
@@ -144,8 +141,6 @@ void ReaderImpl::getLastMessageIdAsync(GetLastMessageIdCallback callback) {
     });
 }
 
-ReaderImplWeakPtr ReaderImpl::getReaderImplWeakPtr() { return readerImplWeakPtr_; }
-
 bool ReaderImpl::isConnected() const { return consumer_->isConnected(); }
 
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ReaderImpl.h b/pulsar-client-cpp/lib/ReaderImpl.h
index a546ae8..6de6c02 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.h
+++ b/pulsar-client-cpp/lib/ReaderImpl.h
@@ -53,7 +53,7 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl>
 
     Future<Result, ReaderImplWeakPtr> getReaderCreatedFuture();
 
-    ConsumerImplPtr getConsumer();
+    ConsumerImplBaseWeakPtr getConsumer() const noexcept { return consumer_; }
 
     void hasMessageAvailableAsync(HasMessageAvailableCallback callback);
 
@@ -62,8 +62,6 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl>
 
     void getLastMessageIdAsync(GetLastMessageIdCallback callback);
 
-    ReaderImplWeakPtr getReaderImplWeakPtr();
-
     bool isConnected() const;
 
    private:
@@ -79,7 +77,6 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl>
     ConsumerImplPtr consumer_;
     ReaderCallback readerCreatedCallback_;
     ReaderListener readerListener_;
-    ReaderImplWeakPtr readerImplWeakPtr_;
 };
 }  // namespace pulsar
 
diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc
index eb94ba6..3281416 100644
--- a/pulsar-client-cpp/tests/ClientTest.cc
+++ b/pulsar-client-cpp/tests/ClientTest.cc
@@ -24,6 +24,9 @@
 #include <future>
 #include <pulsar/Client.h>
 #include "../lib/checksum/ChecksumProvider.h"
+#include "lib/LogUtils.h"
+
+DECLARE_LOG_OBJECT()
 
 using namespace pulsar;
 
@@ -197,22 +200,39 @@ TEST(ClientTest, testReferenceCount) {
 
     auto &producers = PulsarFriend::getProducers(client);
     auto &consumers = PulsarFriend::getConsumers(client);
+    ReaderImplWeakPtr readerWeakPtr;
 
     {
         Producer producer;
         ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
         ASSERT_EQ(producers.size(), 1);
-        ASSERT_EQ(producers[0].use_count(), 1);
+        ASSERT_TRUE(producers[0].use_count() > 0);
+        LOG_INFO("Reference count of the producer: " << producers[0].use_count());
 
         Consumer consumer;
         ASSERT_EQ(ResultOk, client.subscribe(topic, "my-sub", consumer));
         ASSERT_EQ(consumers.size(), 1);
-        ASSERT_EQ(consumers[0].use_count(), 1);
+        ASSERT_TRUE(consumers[0].use_count() > 0);
+        LOG_INFO("Reference count of the consumer: " << consumers[0].use_count());
+
+        ReaderConfiguration readerConf;
+        Reader reader;
+        ASSERT_EQ(ResultOk,
+                  client.createReader(topic + "-reader", MessageId::earliest(), readerConf, reader));
+        ASSERT_EQ(consumers.size(), 2);
+        ASSERT_TRUE(consumers[1].use_count() > 0);
+        LOG_INFO("Reference count of the reader's underlying consumer: " << consumers[1].use_count());
+
+        readerWeakPtr = PulsarFriend::getReaderImplWeakPtr(reader);
+        ASSERT_EQ(readerWeakPtr.use_count(), 1);
+        LOG_INFO("Reference count of the reader: " << readerWeakPtr.use_count());
     }
 
     ASSERT_EQ(producers.size(), 1);
     ASSERT_EQ(producers[0].use_count(), 0);
-    ASSERT_EQ(consumers.size(), 1);
+    ASSERT_EQ(consumers.size(), 2);
     ASSERT_EQ(consumers[0].use_count(), 0);
+    ASSERT_EQ(consumers[1].use_count(), 0);
+    ASSERT_EQ(readerWeakPtr.use_count(), 0);
     client.close();
 }
diff --git a/pulsar-client-cpp/tests/PulsarFriend.h b/pulsar-client-cpp/tests/PulsarFriend.h
index b04899c..a12f058 100644
--- a/pulsar-client-cpp/tests/PulsarFriend.h
+++ b/pulsar-client-cpp/tests/PulsarFriend.h
@@ -25,6 +25,7 @@
 #include "lib/ConsumerImpl.h"
 #include "lib/PartitionedConsumerImpl.h"
 #include "lib/MultiTopicsConsumerImpl.h"
+#include "lib/ReaderImpl.h"
 
 using std::string;
 
@@ -79,6 +80,12 @@ class PulsarFriend {
         return std::static_pointer_cast<ConsumerImpl>(consumer.impl_);
     }
 
+    static ConsumerImplPtr getConsumer(Reader reader) {
+        return std::static_pointer_cast<ConsumerImpl>(reader.impl_->getConsumer().lock());
+    }
+
+    static ReaderImplWeakPtr getReaderImplWeakPtr(Reader reader) { return reader.impl_; }
+
     static decltype(ConsumerImpl::chunkedMessageCache_) & getChunkedMessageCache(Consumer consumer) {
         auto consumerImpl = getConsumerImplPtr(consumer);
         ConsumerImpl::Lock lock(consumerImpl->chunkProcessMutex_);
diff --git a/pulsar-client-cpp/tests/ReaderTest.cc b/pulsar-client-cpp/tests/ReaderTest.cc
index 8cd535c..bf15692 100644
--- a/pulsar-client-cpp/tests/ReaderTest.cc
+++ b/pulsar-client-cpp/tests/ReaderTest.cc
@@ -18,8 +18,8 @@
  */
 #include <pulsar/Client.h>
 #include <pulsar/Reader.h>
-#include "ReaderTest.h"
 #include "HttpHelper.h"
+#include "PulsarFriend.h"
 
 #include <gtest/gtest.h>
 
@@ -28,6 +28,7 @@
 
 #include <lib/Latch.h>
 #include <lib/LogUtils.h>
+#include <lib/ReaderImpl.h>
 DECLARE_LOG_OBJECT()
 
 using namespace pulsar;
@@ -423,50 +424,6 @@ TEST(ReaderTest, testReaderReachEndOfTopicMessageWithoutBatches) {
     client.close();
 }
 
-TEST(ReaderTest, testReferenceLeak) {
-    Client client(serviceUrl);
-
-    std::string topicName = "persistent://public/default/testReferenceLeak";
-
-    Producer producer;
-    ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
-
-    for (int i = 0; i < 10; i++) {
-        std::string content = "my-message-" + std::to_string(i);
-        Message msg = MessageBuilder().setContent(content).build();
-        ASSERT_EQ(ResultOk, producer.send(msg));
-    }
-
-    ReaderConfiguration readerConf;
-    Reader reader;
-    ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
-
-    ConsumerImplBaseWeakPtr consumerPtr = ReaderTest::getConsumer(reader);
-    ReaderImplWeakPtr readerPtr = ReaderTest::getReaderImplWeakPtr(reader);
-
-    LOG_INFO("1 consumer use count " << consumerPtr.use_count());
-    LOG_INFO("1 reader use count " << readerPtr.use_count());
-
-    for (int i = 0; i < 10; i++) {
-        Message msg;
-        ASSERT_EQ(ResultOk, reader.readNext(msg));
-
-        std::string content = msg.getDataAsString();
-        std::string expected = "my-message-" + std::to_string(i);
-        ASSERT_EQ(expected, content);
-    }
-
-    producer.close();
-    reader.close();
-    // will be released after exit this method.
-    ASSERT_EQ(1, consumerPtr.use_count());
-    ASSERT_EQ(1, readerPtr.use_count());
-    client.close();
-    // will be released after exit this method.
-    ASSERT_EQ(1, consumerPtr.use_count());
-    ASSERT_EQ(1, readerPtr.use_count());
-}
-
 TEST(ReaderTest, testPartitionIndex) {
     Client client(serviceUrl);
 
@@ -519,7 +476,7 @@ TEST(ReaderTest, testSubscriptionNameSetting) {
     Reader reader;
     ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
 
-    ASSERT_EQ(subName, ReaderTest::getConsumer(reader)->getSubscriptionName());
+    ASSERT_EQ(subName, PulsarFriend::getConsumer(reader)->getSubscriptionName());
 
     reader.close();
     client.close();
@@ -537,7 +494,7 @@ TEST(ReaderTest, testSetSubscriptionNameAndPrefix) {
     Reader reader;
     ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
 
-    ASSERT_EQ(subName, ReaderTest::getConsumer(reader)->getSubscriptionName());
+    ASSERT_EQ(subName, PulsarFriend::getConsumer(reader)->getSubscriptionName());
 
     reader.close();
     client.close();
diff --git a/pulsar-client-cpp/tests/ReaderTest.h b/pulsar-client-cpp/tests/ReaderTest.h
deleted file mode 100644
index fd0387f..0000000
--- a/pulsar-client-cpp/tests/ReaderTest.h
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#include "lib/ReaderImpl.h"
-#include <string>
-
-using std::string;
-
-namespace pulsar {
-class ReaderTest {
-   public:
-    static ConsumerImplPtr getConsumer(const Reader& reader) { return reader.impl_->getConsumer(); }
-    static ReaderImplWeakPtr getReaderImplWeakPtr(const Reader& reader) {
-        return reader.impl_->getReaderImplWeakPtr();
-    }
-};
-}  // namespace pulsar