Posted to commits@pulsar.apache.org by xi...@apache.org on 2022/12/05 02:01:03 UTC
[pulsar] branch branch-2.10 updated: [fix][cpp] Fix flaky testReferenceCount (#17645)
This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 380031d913a [fix][cpp] Fix flaky testReferenceCount (#17645)
380031d913a is described below
commit 380031d913aaff6b25a9e78573ac7a464a2be43c
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Sep 15 00:05:38 2022 +0800
[fix][cpp] Fix flaky testReferenceCount (#17645)
Fixes #14848
### Motivation
There have been several fixes to `ClientTest.testReferenceCount`, but it
is still very flaky.
The root cause is that even after a `Reader` went out of scope and was
destructed, another `Reader` object still existed in the event loop
thread. See
https://github.com/apache/pulsar/blob/845daf5cac23a4dda4a209d91c85804a0bcaf28a/pulsar-client-cpp/lib/ReaderImpl.cc#L88
To verify this point, I added some logs and saw:
```
2022-09-14 03:52:28.427 INFO [140046042864960] Reader:39 | Reader ctor 0x7fffd2a7c110
# ...
2022-09-14 03:52:28.444 INFO [140046039774976] Reader:42 | Reader ctor 0x7f5f0273d720 ReaderImpl(0x7f5efc00a9d0, 3)
# ...
2022-09-14 03:52:28.445 INFO [140046042864960] ClientTest:217 | Reference count of the reader: 4
# ...
2022-09-14 03:52:28.445 INFO [140046042864960] ClientImpl:490 | Closing Pulsar client with 1 producers and 2 consumers
2022-09-14 03:52:28.445 INFO [140046039774976] Reader:55 | Reader dtor 0x7f5f0273d720 ReaderImpl(0x7f5efc00a9d0, 3)
```
The first `Reader` object 0x7fffd2a7c110 was constructed in main thread
140046042864960. However, it was destructed before another `Reader` object
0x7f5f0273d720 that was constructed in event loop thread
140046039774976. When the callback passed to `createReaderAsync`
completed the promise, `createReader` returned immediately, while the
`Reader` object in the callback was still in scope and not yet
destructed.
Since `Reader` holds a `shared_ptr<ReaderImpl>` and `ReaderImpl` holds
a `shared_ptr<ConsumerImpl>`, if we check the reference count too
quickly, the reference count of the underlying consumer is still
positive because that `Reader` has not been destructed yet.
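
The race can be reproduced in isolation. The following is a minimal sketch, not code from this PR: `FakeConsumer` and `observeUseCounts` are hypothetical names, and the second promise exists only to hold the callback scope open so that the outcome is deterministic.

```cpp
#include <future>
#include <memory>
#include <thread>
#include <utility>

struct FakeConsumer {};  // hypothetical stand-in for ConsumerImpl

// Returns {use_count while the callback's copy is alive, use_count after the thread exits}.
std::pair<long, long> observeUseCounts() {
    auto consumer = std::make_shared<FakeConsumer>();
    std::weak_ptr<FakeConsumer> weak = consumer;

    std::promise<void> created;  // fulfilled inside the "callback"
    std::promise<void> release;  // keeps the callback scope open for the demo
    auto releaseFuture = release.get_future();

    std::thread eventLoop([&consumer, &created, &releaseFuture] {
        std::shared_ptr<FakeConsumer> copy = consumer;  // plays the Reader held by the callback
        created.set_value();                            // the caller wakes up here
        releaseFuture.wait();                           // `copy` is still in scope
    });

    created.get_future().wait();
    consumer.reset();                // the caller's own handle goes away
    long during = weak.use_count();  // the callback's copy is still counted
    release.set_value();
    eventLoop.join();                // the callback scope has ended by now
    long after = weak.use_count();
    return {during, after};
}
```

Under these assumptions the first count is 1 and the second is 0, which mirrors the test observing a positive count when it checks before the event loop thread has dropped its copy.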
### Modifications
Since that `Reader` object lives in the event loop thread, we cannot
determine precisely when it is destructed, so we have to wait for a
while. This PR adds a `waitUntil` utility function that waits, up to a
given timeout, until a condition is met. The test then waits until the
reference count becomes 0 after the `Reader` object goes out of scope.
Replace `ASSERT_EQ` with `EXPECT_EQ` to let the test continue if it
fails.
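
For comparison, the same bounded polling idea can be written against a fixed deadline. This is a sketch only and is not the `waitUntil` added by this PR: the name `pollUntil`, the `bool` return value, and the use of `std::chrono::steady_clock` are assumptions made for illustration.

```cpp
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical variant: polls `condition` every 10 ms until it holds or
// `timeout` has elapsed. Returns whether the condition held at the end.
template <typename Rep, typename Period>
bool pollUntil(std::chrono::duration<Rep, Period> timeout, const std::function<bool()>& condition) {
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (!condition()) {
        if (std::chrono::steady_clock::now() >= deadline) {
            return condition();  // one last check after the deadline
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
    return true;
}
```

Returning the final state would let a caller assert on the wait itself, but the test in this PR gets the same effect by re-checking the counts with `EXPECT_EQ` after the wait.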
### Verifying this change
Follow the steps here to reproduce:
https://github.com/apache/pulsar/issues/14848#issuecomment-1246375370
With this change, the test never failed, even with `--gtest_repeat=100`.
(cherry picked from commit 4ef8dc5abcb2b7a2e1d10449c59331b974d10854)
---
pulsar-client-cpp/tests/ClientTest.cc | 11 ++++++---
pulsar-client-cpp/tests/WaitUtils.h | 43 +++++++++++++++++++++++++++++++++++
2 files changed, 51 insertions(+), 3 deletions(-)
diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc
index 58c889f074a..317cb542faa 100644
--- a/pulsar-client-cpp/tests/ClientTest.cc
+++ b/pulsar-client-cpp/tests/ClientTest.cc
@@ -20,6 +20,7 @@
#include "HttpHelper.h"
#include "PulsarFriend.h"
+#include "WaitUtils.h"
#include <future>
#include <pulsar/Client.h>
@@ -231,9 +232,13 @@ TEST(ClientTest, testReferenceCount) {
ASSERT_EQ(producers.size(), 1);
ASSERT_EQ(producers[0].use_count(), 0);
ASSERT_EQ(consumers.size(), 2);
- ASSERT_EQ(consumers[0].use_count(), 0);
- ASSERT_EQ(consumers[1].use_count(), 0);
- ASSERT_EQ(readerWeakPtr.use_count(), 0);
+
+ waitUntil(std::chrono::seconds(1), [&consumers, &readerWeakPtr] {
+ return consumers[0].use_count() == 0 && consumers[1].use_count() == 0 && readerWeakPtr.expired();
+ });
+ EXPECT_EQ(consumers[0].use_count(), 0);
+ EXPECT_EQ(consumers[1].use_count(), 0);
+ EXPECT_EQ(readerWeakPtr.use_count(), 0);
client.close();
}
diff --git a/pulsar-client-cpp/tests/WaitUtils.h b/pulsar-client-cpp/tests/WaitUtils.h
new file mode 100644
index 00000000000..abe3efccff4
--- /dev/null
+++ b/pulsar-client-cpp/tests/WaitUtils.h
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <chrono>
+#include <functional>
+#include <thread>
+
+namespace pulsar {
+
+template <typename Rep, typename Period>
+inline void waitUntil(std::chrono::duration<Rep, Period> timeout, std::function<bool()> condition) {
+ auto timeoutMs = std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count();
+ while (timeoutMs > 0) {
+ auto now = std::chrono::high_resolution_clock::now();
+ if (condition()) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::high_resolution_clock::now() - now)
+ .count();
+ timeoutMs -= elapsed;
+ }
+}
+
+} // namespace pulsar