You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/05/24 08:42:17 UTC

[pulsar] branch branch-2.9 updated: [C++] Remove the flaky and meaningless tests (#15271)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new efed983c3e8 [C++] Remove the flaky and meaningless tests (#15271)
efed983c3e8 is described below

commit efed983c3e8f3ab9ee69ac069203bcdb8c8affc2
Author: Yunze Xu <xy...@163.com>
AuthorDate: Fri Apr 22 23:48:42 2022 +0800

    [C++] Remove the flaky and meaningless tests (#15271)
    
    Fixes #13849
    Fixes #14848
    
    ### Motivation
    
    #11570 added `testSendAsyncCloseAsyncConcurrentlyWithLazyProducers` to
    verify that `sendAsync` calls invoked after `closeAsync` has been called
    in another thread complete with `ResultAlreadyClosed`.
    It's flaky because the synchronization between two threads is not
    strict. This test uses `sendStartLatch` for the order of `sendAsync` and
    `closeAsync`:
    
    ```
    sendAsync 0,1,...,9 -> sendStartLatch is done -> closeAsync
    ```
    
    However, it cannot guarantee that the remaining `sendAsync` calls happen
    after `closeAsync` is called. If they all happen before `closeAsync`,
    all `sendAsync` calls will complete with `ResultOk`.
    
    On the other hand, this test is meaningless because it requires strict
    synchronization between two threads so there is no need to run
    `sendAsync` and `closeAsync` in two threads.
    
    The verification of this test is also wrong, see
    https://github.com/apache/pulsar/issues/13849#issuecomment-1079098248.
    When `closeAsync` is called, the previous `sendAsync` calls might not
    have completed yet, so all `sendAsync` calls will complete with
    `ResultAlreadyClosed`, not only those called after `closeAsync`.
    
    In addition, this PR also tries to fix the flaky `testReferenceCount`,
    whose assertion is too strict.
    
    ### Modifications
    
    - Remove `testSendAsyncCloseAsyncConcurrentlyWithLazyProducers`
    - Only check that the reference count is greater than 0 instead of equal to 1
    
    (cherry picked from commit eeea9ca1f6eeef1248b7fe8f36be30be835d2480)
---
 pulsar-client-cpp/tests/ClientTest.cc   |  2 +-
 pulsar-client-cpp/tests/ProducerTest.cc | 83 ---------------------------------
 2 files changed, 1 insertion(+), 84 deletions(-)

diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc
index 1ba0164ad87..364e170f896 100644
--- a/pulsar-client-cpp/tests/ClientTest.cc
+++ b/pulsar-client-cpp/tests/ClientTest.cc
@@ -211,7 +211,7 @@ TEST(ClientTest, testReferenceCount) {
         LOG_INFO("Reference count of the reader's underlying consumer: " << consumers[1].use_count());
 
         readerWeakPtr = PulsarFriend::getReaderImplWeakPtr(reader);
-        ASSERT_EQ(readerWeakPtr.use_count(), 1);
+        ASSERT_TRUE(readerWeakPtr.use_count() > 0);
         LOG_INFO("Reference count of the reader: " << readerWeakPtr.use_count());
     }
 
diff --git a/pulsar-client-cpp/tests/ProducerTest.cc b/pulsar-client-cpp/tests/ProducerTest.cc
index 258811fcdaf..9ddca1f7042 100644
--- a/pulsar-client-cpp/tests/ProducerTest.cc
+++ b/pulsar-client-cpp/tests/ProducerTest.cc
@@ -159,89 +159,6 @@ TEST(ProducerTest, testSendAsyncAfterCloseAsyncWithLazyProducers) {
     ASSERT_EQ(ResultOk, result);
 }
 
-TEST(ProducerTest, testSendAsyncCloseAsyncConcurrentlyWithLazyProducers) {
-    // run sendAsync and closeAsync concurrently and verify that all sendAsync callbacks are called
-    // and that messages sent after closeAsync is invoked receive ResultAlreadyClosed.
-    for (int run = 0; run < 20; run++) {
-        LOG_INFO("Start of run " << run);
-        Client client(serviceUrl);
-        const std::string partitionedTopic =
-            "testProducerIsConnectedPartitioned-" + std::to_string(time(nullptr));
-
-        int res = makePutRequest(
-            adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions", "10");
-        ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
-
-        ProducerConfiguration producerConfiguration;
-        producerConfiguration.setLazyStartPartitionedProducers(true);
-        producerConfiguration.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
-        producerConfiguration.setBatchingEnabled(true);
-        Producer producer;
-        ASSERT_EQ(ResultOk, client.createProducer(partitionedTopic, producerConfiguration, producer));
-
-        int sendCount = 100;
-        std::vector<Promise<Result, MessageId>> promises(sendCount);
-        Promise<bool, Result> promiseClose;
-
-        // only call closeAsync once at least 10 messages have been sent
-        Latch sendStartLatch(10);
-        Latch closeLatch(1);
-        int closedAt = 0;
-
-        std::thread t1([&]() {
-            for (int i = 0; i < sendCount; i++) {
-                sendStartLatch.countdown();
-                Message msg = MessageBuilder().setContent("test").build();
-
-                if (closeLatch.getCount() == 0 && closedAt == 0) {
-                    closedAt = i;
-                    LOG_INFO("closedAt set to " << closedAt)
-                }
-
-                producer.sendAsync(msg, WaitForCallbackValue<MessageId>(promises[i]));
-                std::this_thread::sleep_for(std::chrono::milliseconds(1));
-            }
-        });
-
-        std::thread t2([&]() {
-            sendStartLatch.wait(std::chrono::milliseconds(1000));
-            LOG_INFO("Closing");
-            producer.closeAsync(WaitForCallback(promiseClose));
-            LOG_INFO("Close called");
-            closeLatch.countdown();
-            Result result;
-            promiseClose.getFuture().get(result);
-            ASSERT_EQ(ResultOk, result);
-            LOG_INFO("Closed");
-        });
-
-        t1.join();
-        t2.join();
-
-        // make sure that all messages after the moment when closeAsync was invoked
-        // return AlreadyClosed
-        for (int i = 0; i < sendCount; i++) {
-            LOG_DEBUG("Checking " << i)
-
-            // whether a message was sent successfully or not, it's callback
-            // must have been invoked
-            ASSERT_EQ(true, promises[i].isComplete());
-            MessageId mi;
-            Result res = promises[i].getFuture().get(mi);
-            LOG_DEBUG("Result is " << res);
-
-            // for the messages sent after closeAsync was invoked, they
-            // should all return ResultAlreadyClosed
-            if (i >= closedAt) {
-                ASSERT_EQ(ResultAlreadyClosed, res);
-            }
-        }
-
-        client.close();
-        LOG_INFO("End of run " << run);
-    }
-}
-
 TEST(ProducerTest, testBacklogQuotasExceeded) {
     std::string ns = "public/test-backlog-quotas";
     std::string topic = ns + "/testBacklogQuotasExceeded" + std::to_string(time(nullptr));