Posted to commits@pulsar.apache.org by xi...@apache.org on 2022/12/05 02:01:03 UTC

[pulsar] branch branch-2.10 updated: [fix][cpp] Fix flaky testReferenceCount (#17645)

This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 380031d913a [fix][cpp] Fix flaky testReferenceCount (#17645)
380031d913a is described below

commit 380031d913aaff6b25a9e78573ac7a464a2be43c
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Sep 15 00:05:38 2022 +0800

    [fix][cpp] Fix flaky testReferenceCount (#17645)
    
    Fixes #14848
    
    ### Motivation
    
    There have been several fixes for `ClientTest.testReferenceCount`, but
    it is still very flaky.
    
    The root cause is that even after a `Reader` went out of scope and was
    destructed, another `Reader` object still existed in the event loop
    thread. See
    https://github.com/apache/pulsar/blob/845daf5cac23a4dda4a209d91c85804a0bcaf28a/pulsar-client-cpp/lib/ReaderImpl.cc#L88
    
    To verify this point, I added some logs and saw:
    
    ```
    2022-09-14 03:52:28.427 INFO  [140046042864960] Reader:39 | Reader ctor 0x7fffd2a7c110
    # ...
    2022-09-14 03:52:28.444 INFO  [140046039774976] Reader:42 | Reader ctor 0x7f5f0273d720 ReaderImpl(0x7f5efc00a9d0, 3)
    # ...
    2022-09-14 03:52:28.445 INFO  [140046042864960] ClientTest:217 | Reference count of the reader: 4
    # ...
    2022-09-14 03:52:28.445 INFO  [140046042864960] ClientImpl:490 | Closing Pulsar client with 1 producers and 2 consumers
    2022-09-14 03:52:28.445 INFO  [140046039774976] Reader:55 | Reader dtor 0x7f5f0273d720 ReaderImpl(0x7f5efc00a9d0, 3)
    ```
    
    The first `Reader` object 0x7fffd2a7c110 was constructed in the main
    thread 140046042864960. However, it was destructed before another
    `Reader` object 0x7f5f0273d720, which was constructed in the event loop
    thread 140046039774976. When the callback passed to `createReaderAsync`
    completes the promise, `createReader` returns immediately, while the
    `Reader` object in the callback is still in scope and has not been
    destructed yet.
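
    The ordering can be illustrated with the standard library alone. The
    sketch below is a simplification, not the Pulsar client code:
    `FakeReader` and `FakeReaderImpl` are hypothetical stand-ins for
    `Reader` and `ReaderImpl`, and a `std::thread` plays the event loop. A
    second promise (`release`) is added only to make the timing
    deterministic. The caller's copy is destructed while the callback's
    copy is still alive, so the weak reference has not expired yet.

    ```cpp
    #include <cassert>
    #include <future>
    #include <memory>
    #include <thread>

    // Hypothetical stand-ins: FakeReader owns a shared_ptr, as Reader owns ReaderImpl.
    struct FakeReaderImpl {};
    struct FakeReader {
        std::shared_ptr<FakeReaderImpl> impl;
    };

    struct Observation {
        long useCountAfterCallerScope;  // strong refs left once the caller's copy is gone
        bool expiredAfterJoin;          // true once the callback's copy is gone as well
    };

    Observation observeLingeringCopy() {
        std::weak_ptr<FakeReaderImpl> implWeak;
        std::promise<FakeReader> created;
        std::future<FakeReader> createdFuture = created.get_future();
        std::promise<void> release;  // only here to make the timing deterministic
        std::future<void> releaseFuture = release.get_future();

        // Plays the role of the createReaderAsync callback on the event loop thread.
        std::thread eventLoop([&created, &releaseFuture] {
            FakeReader readerInCallback{std::make_shared<FakeReaderImpl>()};
            created.set_value(readerInCallback);  // the caller is unblocked here...
            releaseFuture.wait();                 // ...while readerInCallback is still in scope
        });

        {
            FakeReader reader = createdFuture.get();  // "createReader" returns
            implWeak = reader.impl;
        }  // the caller's copy is destructed here

        Observation result{};
        result.useCountAfterCallerScope = implWeak.use_count();  // held by readerInCallback
        release.set_value();
        eventLoop.join();  // readerInCallback has been destructed by now
        result.expiredAfterJoin = implWeak.expired();
        return result;
    }
    ```

    In the real client there is no such `release` signal, so the
    callback's copy is destructed at some unspecified moment. That moment
    is what a test checking the count right after `createReader` races
    against.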
    
    Since `Reader` holds a `shared_ptr<ReaderImpl>` and `ReaderImpl` holds
    a `shared_ptr<ConsumerImpl>`, if we check the reference count too
    quickly, the reference count of the underlying consumer is still
    positive because the `Reader` in the callback has not been destructed yet.
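
    The following is a single-threaded sketch of that ownership chain. It
    uses hypothetical stub types (`ReaderStub`, `ReaderImplStub`,
    `ConsumerImplStub`) rather than the real classes. Copying a reader only
    shares the same implementation object, so the consumer's count is 1
    rather than one per copy. The consumer stays alive until the last
    reader copy is gone.

    ```cpp
    #include <cassert>
    #include <memory>

    // Hypothetical stubs mirroring the Reader -> ReaderImpl -> ConsumerImpl ownership.
    struct ConsumerImplStub {};
    struct ReaderImplStub {
        std::shared_ptr<ConsumerImplStub> consumer = std::make_shared<ConsumerImplStub>();
    };
    struct ReaderStub {
        std::shared_ptr<ReaderImplStub> impl = std::make_shared<ReaderImplStub>();
    };

    struct ConsumerCounts {
        long whileCopyAlive;      // a lingering copy still owns the ReaderImplStub
        long afterAllCopiesGone;  // every ReaderStub copy has been destructed
    };

    ConsumerCounts consumerCountsThroughReaderCopies() {
        std::weak_ptr<ConsumerImplStub> consumerWeak;
        ConsumerCounts counts{};
        {
            ReaderStub lingering;  // e.g. the copy held by the event loop callback
            consumerWeak = lingering.impl->consumer;
            {
                ReaderStub userReader = lingering;  // shares the same ReaderImplStub
            }  // the user's copy goes out of scope first
            counts.whileCopyAlive = consumerWeak.use_count();
        }
        counts.afterAllCopiesGone = consumerWeak.use_count();
        return counts;
    }
    ```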
    
    ### Modifications
    
    Since we cannot determine the precise point at which that `Reader`
    object is destructed, because it lives in the event loop thread, we
    have to wait for a while. This PR adds a `waitUntil` utility function
    that waits for at most a given timeout until a condition is met. The
    test then uses it to wait until the reference counts become 0 after the
    `Reader` object goes out of scope.
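
    For illustration only, a polling wait can also be written around a
    `std::chrono::steady_clock` deadline that reports whether the condition
    was met. `waitUntilSketch` and `waitForRelease` are hypothetical names.
    This is a variant of the idea, not the `waitUntil` added in
    `WaitUtils.h`, which subtracts elapsed milliseconds measured with
    `high_resolution_clock` from a budget and returns `void`.

    ```cpp
    #include <cassert>
    #include <chrono>
    #include <functional>
    #include <memory>
    #include <thread>

    // Hypothetical deadline-based polling wait; not the waitUntil from WaitUtils.h.
    // Returns true if `condition` held before `timeout` elapsed, false otherwise.
    template <typename Rep, typename Period>
    bool waitUntilSketch(std::chrono::duration<Rep, Period> timeout,
                         const std::function<bool()>& condition) {
        // steady_clock is monotonic, so wall-clock adjustments cannot alter the wait.
        const auto deadline = std::chrono::steady_clock::now() + timeout;
        while (!condition()) {
            if (std::chrono::steady_clock::now() >= deadline) {
                return false;
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
        return true;
    }

    // Usage: a background thread standing in for the event loop drops the last
    // owner after about 50 ms, and the caller polls the weak_ptr instead of
    // checking it immediately.
    bool waitForRelease() {
        auto owner = std::make_shared<int>(42);
        std::weak_ptr<int> weak = owner;
        std::thread eventLoop([owner = std::move(owner)]() mutable {
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
            owner.reset();  // the last strong reference goes away here
        });
        bool released = waitUntilSketch(std::chrono::seconds(1), [&weak] { return weak.expired(); });
        eventLoop.join();
        return released;
    }
    ```

    Returning `bool` would let a caller assert on the timeout directly. The
    PR instead keeps `waitUntil` as `void` and checks the reference counts
    with `EXPECT_EQ` afterwards.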
    
    Replace `ASSERT_EQ` with `EXPECT_EQ` for the final reference count
    checks so that the test continues if one of them fails.
    
    ### Verifying this change
    
    Follow the steps here to reproduce:
    https://github.com/apache/pulsar/issues/14848#issuecomment-1246375370
    
    With this change, the test never failed, even with `--gtest_repeat=100`.
    
    (cherry picked from commit 4ef8dc5abcb2b7a2e1d10449c59331b974d10854)
---
 pulsar-client-cpp/tests/ClientTest.cc | 11 ++++++---
 pulsar-client-cpp/tests/WaitUtils.h   | 43 +++++++++++++++++++++++++++++++++++
 2 files changed, 51 insertions(+), 3 deletions(-)

diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc
index 58c889f074a..317cb542faa 100644
--- a/pulsar-client-cpp/tests/ClientTest.cc
+++ b/pulsar-client-cpp/tests/ClientTest.cc
@@ -20,6 +20,7 @@
 
 #include "HttpHelper.h"
 #include "PulsarFriend.h"
+#include "WaitUtils.h"
 
 #include <future>
 #include <pulsar/Client.h>
@@ -231,9 +232,13 @@ TEST(ClientTest, testReferenceCount) {
     ASSERT_EQ(producers.size(), 1);
     ASSERT_EQ(producers[0].use_count(), 0);
     ASSERT_EQ(consumers.size(), 2);
-    ASSERT_EQ(consumers[0].use_count(), 0);
-    ASSERT_EQ(consumers[1].use_count(), 0);
-    ASSERT_EQ(readerWeakPtr.use_count(), 0);
+
+    waitUntil(std::chrono::seconds(1), [&consumers, &readerWeakPtr] {
+        return consumers[0].use_count() == 0 && consumers[1].use_count() == 0 && readerWeakPtr.expired();
+    });
+    EXPECT_EQ(consumers[0].use_count(), 0);
+    EXPECT_EQ(consumers[1].use_count(), 0);
+    EXPECT_EQ(readerWeakPtr.use_count(), 0);
     client.close();
 }
 
diff --git a/pulsar-client-cpp/tests/WaitUtils.h b/pulsar-client-cpp/tests/WaitUtils.h
new file mode 100644
index 00000000000..abe3efccff4
--- /dev/null
+++ b/pulsar-client-cpp/tests/WaitUtils.h
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <chrono>
+#include <functional>
+#include <thread>
+
+namespace pulsar {
+
+template <typename Rep, typename Period>
+inline void waitUntil(std::chrono::duration<Rep, Period> timeout, std::function<bool()> condition) {
+    auto timeoutMs = std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count();
+    while (timeoutMs > 0) {
+        auto now = std::chrono::high_resolution_clock::now();
+        if (condition()) {
+            break;
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+        auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
+                           std::chrono::high_resolution_clock::now() - now)
+                           .count();
+        timeoutMs -= elapsed;
+    }
+}
+
+}  // namespace pulsar