You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/21 12:31:36 UTC

[pulsar] branch branch-2.7 updated (c1e9f9e -> 2f9f639)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from c1e9f9e  Fix single-quotes added to user-conf (#8780)
     new 3ad1730  [C++] Fix race condition in BlockingQueue (#8765)
     new 2f9f639  [Issue 8787][C++] Add reader internal subscription name setter. (#8823)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../include/pulsar/ReaderConfiguration.h           |  7 +++
 pulsar-client-cpp/lib/BlockingQueue.h              | 12 ++---
 pulsar-client-cpp/lib/ReaderConfiguration.cc       |  9 ++++
 pulsar-client-cpp/lib/ReaderConfigurationImpl.h    |  1 +
 pulsar-client-cpp/lib/ReaderImpl.cc                | 11 +++--
 pulsar-client-cpp/tests/BlockingQueueTest.cc       | 39 +++++++++++++++
 pulsar-client-cpp/tests/ReaderTest.cc              | 57 ++++++++++++++++++++++
 7 files changed, 127 insertions(+), 9 deletions(-)


[pulsar] 01/02: [C++] Fix race condition in BlockingQueue (#8765)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3ad17308ff5a924f0a05df7c46cc4d3fcbad511a
Author: TT <er...@users.noreply.github.com>
AuthorDate: Tue Dec 8 17:42:57 2020 +0800

    [C++] Fix race condition in BlockingQueue (#8765)
    
    ### Motivation
    
    BlockingQueue has race condition that can cause threads waiting forever in multithreading environment. ProducerImpl uses BlockingQueue as pendingMessagesQueue_ and can be blocked forever at it. This PR fixes race condition in BlockingQueue.
    
    #### Race condition details
    https://github.com/apache/pulsar/blob/91e2f832178d9ffd5d78161145d895910296c2d9/pulsar-client-cpp/lib/BlockingQueue.h#L172-L185
    Use BlockingQueue::Pop as example, its procedure is:
    1. lock
    2. check wasFull and then change queue state
    3. unlock
    4. if wasFull, notify one thread waiting at queueFullCondition
    
    Race condition sequence:
    1. queue is full and there are multiple threads waiting on queueFullCondition
    2. thread A call Pop, lock, wasFull is true, unlock -> queue has one free space
    3. thread B call Pop, lock, wasFull is false, unlock -> queue has two free spaces
    4. thread A notify one thread waiting at queueFullCondition
    5. queue is no loger full again
    6. result: except one thread is notified by A, other threads waiting on queueFullCondition are waiting forever
    
    ### Modifications
    
    * Use notify_all instead of notify_one to notify threads waiting on condition variables
      Reason: Currently only notify threads when queue is full or empty. After unlock, other threads may change queue state, so thread to notify condition can not determine how queue state changed and should use notify_all in case of more then one  change occured.
    
    ### Verifying this change
    
      - Add a test case BlockingQueueTest.testPushPopRace to test concurrent push and pop
    
    (cherry picked from commit 18b38766fbcf3d5944824e828566edca310ee9d8)
---
 pulsar-client-cpp/lib/BlockingQueue.h        | 12 ++++-----
 pulsar-client-cpp/tests/BlockingQueueTest.cc | 39 ++++++++++++++++++++++++++++
 2 files changed, 45 insertions(+), 6 deletions(-)

diff --git a/pulsar-client-cpp/lib/BlockingQueue.h b/pulsar-client-cpp/lib/BlockingQueue.h
index 5e466bd..2814c4f 100644
--- a/pulsar-client-cpp/lib/BlockingQueue.h
+++ b/pulsar-client-cpp/lib/BlockingQueue.h
@@ -126,7 +126,7 @@ class BlockingQueue {
         lock.unlock();
         if (wasEmpty) {
             // Notify that an element is pushed
-            queueEmptyCondition.notify_one();
+            queueEmptyCondition.notify_all();
         }
     }
 
@@ -145,7 +145,7 @@ class BlockingQueue {
 
         if (wasEmpty) {
             // Notify that an element is pushed
-            queueEmptyCondition.notify_one();
+            queueEmptyCondition.notify_all();
         }
     }
 
@@ -163,7 +163,7 @@ class BlockingQueue {
 
         if (wasEmpty) {
             // Notify that an element is pushed
-            queueEmptyCondition.notify_one();
+            queueEmptyCondition.notify_all();
         }
 
         return true;
@@ -180,7 +180,7 @@ class BlockingQueue {
 
         if (wasFull) {
             // Notify that an element is popped
-            queueFullCondition.notify_one();
+            queueFullCondition.notify_all();
         }
     }
 
@@ -196,7 +196,7 @@ class BlockingQueue {
 
         if (wasFull) {
             // Notify that an element is popped
-            queueFullCondition.notify_one();
+            queueFullCondition.notify_all();
         }
     }
 
@@ -274,7 +274,7 @@ class BlockingQueue {
 
         if (wasFull) {
             // Notify that one spot is now available
-            queueFullCondition.notify_one();
+            queueFullCondition.notify_all();
         }
     }
 
diff --git a/pulsar-client-cpp/tests/BlockingQueueTest.cc b/pulsar-client-cpp/tests/BlockingQueueTest.cc
index 4047e5e..42644e9 100644
--- a/pulsar-client-cpp/tests/BlockingQueueTest.cc
+++ b/pulsar-client-cpp/tests/BlockingQueueTest.cc
@@ -19,6 +19,8 @@
 #include <gtest/gtest.h>
 #include <lib/BlockingQueue.h>
 
+#include <future>
+#include <iostream>
 #include <thread>
 
 class ProducerWorker {
@@ -215,3 +217,40 @@ TEST(BlockingQueueTest, testReservedSpot) {
         ASSERT_EQ(0, queue.size());
     }
 }
+
+TEST(BlockingQueueTest, testPushPopRace) {
+    auto test_logic = []() {
+        size_t size = 5;
+        BlockingQueue<int> queue(size);
+
+        std::vector<std::unique_ptr<ProducerWorker>> producers;
+        for (int i = 0; i < 5; ++i) {
+            producers.emplace_back(new ProducerWorker{queue});
+            producers.back()->produce(1000);
+        }
+
+        // wait for queue full
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+
+        std::vector<std::unique_ptr<ConsumerWorker>> consumers;
+        for (int i = 0; i < 5; ++i) {
+            consumers.emplace_back(new ConsumerWorker{queue});
+            consumers.back()->consume(1000);
+        }
+
+        auto future = std::async(std::launch::async, [&]() {
+            for (auto& p : producers) p->join();
+            for (auto& c : consumers) c->join();
+        });
+        auto ret = future.wait_for(std::chrono::seconds(5));
+        if (ret == std::future_status::ready) {
+            std::cerr << "Exiting";
+            exit(0);
+        } else {
+            std::cerr << "Threads are not exited in time";
+            exit(1);
+        }
+    };
+
+    ASSERT_EXIT(test_logic(), ::testing::ExitedWithCode(0), "Exiting");
+}


[pulsar] 02/02: [Issue 8787][C++] Add reader internal subscription name setter. (#8823)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2f9f639daed910758b3015ee9883b156f7065a4a
Author: Zike Yang <Ro...@outlook.com>
AuthorDate: Wed Dec 9 01:38:47 2020 +0800

    [Issue 8787][C++] Add reader internal subscription name setter. (#8823)
    
    Master Issue: #8787
    
    ### Motivation
    
    Currently, the reader subscription name can only be generated internally randomly in the C++ client.
    Java client part is at #8801
    
    ### Modifications
    
    Add a setter for the reader's internal subscription name.
    
    ### Verifying this change
    
    This change is already covered by existing tests, such as *testSubscriptionNameSetting*, *testSetSubscriptionNameAndPrefix* and *testMultiSameSubscriptionNameReaderShouldFail*.
    
    (cherry picked from commit 408f9e61c07b436b425f9ef65ccbada022a4e7fd)
---
 .../include/pulsar/ReaderConfiguration.h           |  7 +++
 pulsar-client-cpp/lib/ReaderConfiguration.cc       |  9 ++++
 pulsar-client-cpp/lib/ReaderConfigurationImpl.h    |  1 +
 pulsar-client-cpp/lib/ReaderImpl.cc                | 11 +++--
 pulsar-client-cpp/tests/ReaderTest.cc              | 57 ++++++++++++++++++++++
 5 files changed, 82 insertions(+), 3 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h b/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
index bf52c4f..25b0ba3 100644
--- a/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
@@ -105,6 +105,13 @@ class PULSAR_PUBLIC ReaderConfiguration {
     void setReadCompacted(bool compacted);
     bool isReadCompacted() const;
 
+    /**
+     * Set the internal subscription name.
+     * @param internal subscriptionName
+     */
+    void setInternalSubscriptionName(std::string internalSubscriptionName);
+    const std::string& getInternalSubscriptionName() const;
+
    private:
     std::shared_ptr<ReaderConfigurationImpl> impl_;
 };
diff --git a/pulsar-client-cpp/lib/ReaderConfiguration.cc b/pulsar-client-cpp/lib/ReaderConfiguration.cc
index fec8eed..eb18d11 100644
--- a/pulsar-client-cpp/lib/ReaderConfiguration.cc
+++ b/pulsar-client-cpp/lib/ReaderConfiguration.cc
@@ -67,4 +67,13 @@ void ReaderConfiguration::setSubscriptionRolePrefix(const std::string& subscript
 bool ReaderConfiguration::isReadCompacted() const { return impl_->readCompacted; }
 
 void ReaderConfiguration::setReadCompacted(bool compacted) { impl_->readCompacted = compacted; }
+
+void ReaderConfiguration::setInternalSubscriptionName(std::string internalSubscriptionName) {
+    impl_->internalSubscriptionName = internalSubscriptionName;
+}
+
+const std::string& ReaderConfiguration::getInternalSubscriptionName() const {
+    return impl_->internalSubscriptionName;
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ReaderConfigurationImpl.h b/pulsar-client-cpp/lib/ReaderConfigurationImpl.h
index c74b217..3b5cc8a 100644
--- a/pulsar-client-cpp/lib/ReaderConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ReaderConfigurationImpl.h
@@ -30,6 +30,7 @@ struct ReaderConfigurationImpl {
     std::string readerName;
     std::string subscriptionRolePrefix;
     bool readCompacted;
+    std::string internalSubscriptionName;
     ReaderConfigurationImpl()
         : schemaInfo(),
           hasReaderListener(false),
diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc b/pulsar-client-cpp/lib/ReaderImpl.cc
index 1e08977..2ea3430 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.cc
+++ b/pulsar-client-cpp/lib/ReaderImpl.cc
@@ -47,9 +47,14 @@ void ReaderImpl::start(const MessageId& startMessageId) {
                                                   std::placeholders::_1, std::placeholders::_2));
     }
 
-    std::string subscription = "reader-" + generateRandomName();
-    if (!readerConf_.getSubscriptionRolePrefix().empty()) {
-        subscription = readerConf_.getSubscriptionRolePrefix() + "-" + subscription;
+    std::string subscription;
+    if (!readerConf_.getInternalSubscriptionName().empty()) {
+        subscription = readerConf_.getInternalSubscriptionName();
+    } else {
+        subscription = "reader-" + generateRandomName();
+        if (!readerConf_.getSubscriptionRolePrefix().empty()) {
+            subscription = readerConf_.getSubscriptionRolePrefix() + "-" + subscription;
+        }
     }
 
     consumer_ = std::make_shared<ConsumerImpl>(
diff --git a/pulsar-client-cpp/tests/ReaderTest.cc b/pulsar-client-cpp/tests/ReaderTest.cc
index db4475b..a485652 100644
--- a/pulsar-client-cpp/tests/ReaderTest.cc
+++ b/pulsar-client-cpp/tests/ReaderTest.cc
@@ -506,3 +506,60 @@ TEST(ReaderTest, testPartitionIndex) {
 
     client.close();
 }
+
+TEST(ReaderTest, testSubscriptionNameSetting) {
+    Client client(serviceUrl);
+
+    std::string topicName = "persistent://public/default/test-subscription-name-setting";
+    std::string subName = "test-sub";
+
+    ReaderConfiguration readerConf;
+    readerConf.setInternalSubscriptionName(subName);
+    Reader reader;
+    ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
+
+    ASSERT_EQ(subName, ReaderTest::getConsumer(reader)->getSubscriptionName());
+
+    reader.close();
+    client.close();
+}
+
+TEST(ReaderTest, testSetSubscriptionNameAndPrefix) {
+    Client client(serviceUrl);
+
+    std::string topicName = "persistent://public/default/testSetSubscriptionNameAndPrefix";
+    std::string subName = "test-sub";
+
+    ReaderConfiguration readerConf;
+    readerConf.setInternalSubscriptionName(subName);
+    readerConf.setSubscriptionRolePrefix("my-prefix");
+    Reader reader;
+    ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
+
+    ASSERT_EQ(subName, ReaderTest::getConsumer(reader)->getSubscriptionName());
+
+    reader.close();
+    client.close();
+}
+
+TEST(ReaderTest, testMultiSameSubscriptionNameReaderShouldFail) {
+    Client client(serviceUrl);
+
+    std::string topicName = "persistent://public/default/testMultiSameSubscriptionNameReaderShouldFail";
+    std::string subscriptionName = "test-sub";
+
+    ReaderConfiguration readerConf1;
+    readerConf1.setInternalSubscriptionName(subscriptionName);
+    Reader reader1;
+    ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf1, reader1));
+
+    ReaderConfiguration readerConf2;
+    readerConf2.setInternalSubscriptionName(subscriptionName);
+    Reader reader2;
+    ASSERT_EQ(ResultConsumerBusy,
+              client.createReader(topicName, MessageId::earliest(), readerConf2, reader2));
+
+    reader1.close();
+    reader2.close();
+    client.close();
+}