You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/05/24 08:42:17 UTC
[pulsar] branch branch-2.9 updated: [C++] Remove the flaky and meaningless tests (#15271)
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new efed983c3e8 [C++] Remove the flaky and meaningless tests (#15271)
efed983c3e8 is described below
commit efed983c3e8f3ab9ee69ac069203bcdb8c8affc2
Author: Yunze Xu <xy...@163.com>
AuthorDate: Fri Apr 22 23:48:42 2022 +0800
[C++] Remove the flaky and meaningless tests (#15271)
Fixes #13849
Fixes #14848
### Motivation
#11570 added `testSendAsyncCloseAsyncConcurrentlyWithLazyProducers` to
verify that `sendAsync` calls invoked after `closeAsync` has been called
in another thread complete with `ResultAlreadyClosed`.
It's flaky because the synchronization between two threads is not
strict. This test uses `sendStartLatch` for the order of `sendAsync` and
`closeAsync`:
```
sendAsync 0,1,...,9 -> sendStartLatch is done -> closeAsync
```
However, it cannot guarantee that the remaining `sendAsync` calls happen
after `closeAsync` is called. If they all happen before it, every
`sendAsync` call will complete with `ResultOk`.
On the other hand, this test is meaningless: what it verifies requires
strict ordering between the two threads, so there is no benefit in running
`sendAsync` and `closeAsync` in separate threads.
The verification of this test is also wrong, see
https://github.com/apache/pulsar/issues/13849#issuecomment-1079098248.
When `closeAsync` is called, the previous `sendAsync` calls might not
have completed yet, so all `sendAsync` calls may complete with
`ResultAlreadyClosed`, not only those made after `closeAsync`.
In addition, this PR tries to fix the flaky `testReferenceCount`,
whose assertion on the reference count is too strict.
### Modifications
- Remove `testSendAsyncCloseAsyncConcurrentlyWithLazyProducers`
- Only check the reference count is greater than 0 instead of equal to 1
(cherry picked from commit eeea9ca1f6eeef1248b7fe8f36be30be835d2480)
---
pulsar-client-cpp/tests/ClientTest.cc | 2 +-
pulsar-client-cpp/tests/ProducerTest.cc | 83 ---------------------------------
2 files changed, 1 insertion(+), 84 deletions(-)
diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc
index 1ba0164ad87..364e170f896 100644
--- a/pulsar-client-cpp/tests/ClientTest.cc
+++ b/pulsar-client-cpp/tests/ClientTest.cc
@@ -211,7 +211,7 @@ TEST(ClientTest, testReferenceCount) {
LOG_INFO("Reference count of the reader's underlying consumer: " << consumers[1].use_count());
readerWeakPtr = PulsarFriend::getReaderImplWeakPtr(reader);
- ASSERT_EQ(readerWeakPtr.use_count(), 1);
+ ASSERT_TRUE(readerWeakPtr.use_count() > 0);
LOG_INFO("Reference count of the reader: " << readerWeakPtr.use_count());
}
diff --git a/pulsar-client-cpp/tests/ProducerTest.cc b/pulsar-client-cpp/tests/ProducerTest.cc
index 258811fcdaf..9ddca1f7042 100644
--- a/pulsar-client-cpp/tests/ProducerTest.cc
+++ b/pulsar-client-cpp/tests/ProducerTest.cc
@@ -159,89 +159,6 @@ TEST(ProducerTest, testSendAsyncAfterCloseAsyncWithLazyProducers) {
ASSERT_EQ(ResultOk, result);
}
-TEST(ProducerTest, testSendAsyncCloseAsyncConcurrentlyWithLazyProducers) {
- // run sendAsync and closeAsync concurrently and verify that all sendAsync callbacks are called
- // and that messages sent after closeAsync is invoked receive ResultAlreadyClosed.
- for (int run = 0; run < 20; run++) {
- LOG_INFO("Start of run " << run);
- Client client(serviceUrl);
- const std::string partitionedTopic =
- "testProducerIsConnectedPartitioned-" + std::to_string(time(nullptr));
-
- int res = makePutRequest(
- adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions", "10");
- ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
-
- ProducerConfiguration producerConfiguration;
- producerConfiguration.setLazyStartPartitionedProducers(true);
- producerConfiguration.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
- producerConfiguration.setBatchingEnabled(true);
- Producer producer;
- ASSERT_EQ(ResultOk, client.createProducer(partitionedTopic, producerConfiguration, producer));
-
- int sendCount = 100;
- std::vector<Promise<Result, MessageId>> promises(sendCount);
- Promise<bool, Result> promiseClose;
-
- // only call closeAsync once at least 10 messages have been sent
- Latch sendStartLatch(10);
- Latch closeLatch(1);
- int closedAt = 0;
-
- std::thread t1([&]() {
- for (int i = 0; i < sendCount; i++) {
- sendStartLatch.countdown();
- Message msg = MessageBuilder().setContent("test").build();
-
- if (closeLatch.getCount() == 0 && closedAt == 0) {
- closedAt = i;
- LOG_INFO("closedAt set to " << closedAt)
- }
-
- producer.sendAsync(msg, WaitForCallbackValue<MessageId>(promises[i]));
- std::this_thread::sleep_for(std::chrono::milliseconds(1));
- }
- });
-
- std::thread t2([&]() {
- sendStartLatch.wait(std::chrono::milliseconds(1000));
- LOG_INFO("Closing");
- producer.closeAsync(WaitForCallback(promiseClose));
- LOG_INFO("Close called");
- closeLatch.countdown();
- Result result;
- promiseClose.getFuture().get(result);
- ASSERT_EQ(ResultOk, result);
- LOG_INFO("Closed");
- });
-
- t1.join();
- t2.join();
-
- // make sure that all messages after the moment when closeAsync was invoked
- // return AlreadyClosed
- for (int i = 0; i < sendCount; i++) {
- LOG_DEBUG("Checking " << i)
-
- // whether a message was sent successfully or not, it's callback
- // must have been invoked
- ASSERT_EQ(true, promises[i].isComplete());
- MessageId mi;
- Result res = promises[i].getFuture().get(mi);
- LOG_DEBUG("Result is " << res);
-
- // for the messages sent after closeAsync was invoked, they
- // should all return ResultAlreadyClosed
- if (i >= closedAt) {
- ASSERT_EQ(ResultAlreadyClosed, res);
- }
- }
-
- client.close();
- LOG_INFO("End of run " << run);
- }
-}
-
TEST(ProducerTest, testBacklogQuotasExceeded) {
std::string ns = "public/test-backlog-quotas";
std::string topic = ns + "/testBacklogQuotasExceeded" + std::to_string(time(nullptr));