You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/02/08 07:46:51 UTC

[pulsar] branch branch-2.6 updated: [C++] Remove namespace check for MultiTopicsConsumerImpl (#9520)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new 3031550  [C++] Remove namespace check for MultiTopicsConsumerImpl (#9520)
3031550 is described below

commit 30315508fc53350e1588974a9049d57d895ae862
Author: Yunze Xu <xy...@163.com>
AuthorDate: Mon Feb 8 13:01:14 2021 +0800

    [C++] Remove namespace check for MultiTopicsConsumerImpl (#9520)
    
    Fixes #9449
    
    ### Motivation
    
    This is catch-up work for https://github.com/apache/pulsar/pull/5716, which supports subscribing to multiple topics across multiple namespaces.
    
    ### Modifications
    
    - Move the namespace check from `MultiTopicsConsumerImpl` to `PatternMultiTopicsConsumerImpl`, which uses a regex subscription.
    - Fix the existing tests for subscribing to topics across different namespaces.
    
    ### Verifying this change
    
    This change is already covered by existing tests, such as *BasicEndToEndTest.testMultiTopicsConsumerDifferentNamespace*.
    
    (cherry picked from commit 69e9322bef12d485bd2543f9ed9a32ded4597c5d)
---
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc   | 21 --------
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h    |  1 -
 .../lib/PatternMultiTopicsConsumerImpl.cc          |  4 +-
 .../lib/PatternMultiTopicsConsumerImpl.h           |  1 +
 pulsar-client-cpp/python/pulsar_test.py            |  4 +-
 pulsar-client-cpp/tests/BasicEndToEndTest.cc       | 56 ++++++++++++----------
 6 files changed, 38 insertions(+), 49 deletions(-)

diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 85a9868..35b0f57 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -35,7 +35,6 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
       listenerExecutor_(client->getListenerExecutorProvider()->get()),
       messageListener_(conf.getMessageListener()),
       pendingReceives_(),
-      namespaceName_(topicName ? topicName->getNamespaceName() : std::shared_ptr<NamespaceName>()),
       lookupServicePtr_(lookupServicePtr),
       numberTopicPartitions_(std::make_shared<std::atomic<int>>(0)),
       topics_(topics) {
@@ -97,9 +96,6 @@ void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer c
     if (topicsNeedCreate->load() == 0) {
         if (compareAndSetState(Pending, Ready)) {
             LOG_INFO("Successfully Subscribed to Topics");
-            if (!namespaceName_) {
-                namespaceName_ = TopicName::get(topic)->getNamespaceName();
-            }
             multiTopicsConsumerCreatedPromise_.setValue(shared_from_this());
         } else {
             LOG_ERROR("Unable to create Consumer - " << consumerStr_ << " Error - " << result);
@@ -122,13 +118,6 @@ Future<Result, Consumer> MultiTopicsConsumerImpl::subscribeOneTopicAsync(const s
         return topicPromise->getFuture();
     }
 
-    if (namespaceName_ && !(*namespaceName_ == *(topicName->getNamespaceName()))) {
-        LOG_ERROR("TopicName namespace not the same with topicsConsumer. wanted namespace: "
-                  << namespaceName_->toString() << " this topic: " << topic);
-        topicPromise->setFailed(ResultInvalidTopicName);
-        return topicPromise->getFuture();
-    }
-
     if (state_ == Closed || state_ == Closing) {
         LOG_ERROR("MultiTopicsConsumer already closed when subscribe.");
         topicPromise->setFailed(ResultAlreadyClosed);
@@ -719,7 +708,6 @@ void MultiTopicsConsumerImpl::handleGetConsumerStats(Result res, BrokerConsumerS
 
 std::shared_ptr<TopicName> MultiTopicsConsumerImpl::topicNamesValid(const std::vector<std::string>& topics) {
     TopicNamePtr topicNamePtr = std::shared_ptr<TopicName>();
-    NamespaceNamePtr namespaceNamePtr = std::shared_ptr<NamespaceName>();
 
     // all topics name valid, and all topics have same namespace
     for (std::vector<std::string>::const_iterator itr = topics.begin(); itr != topics.end(); itr++) {
@@ -728,15 +716,6 @@ std::shared_ptr<TopicName> MultiTopicsConsumerImpl::topicNamesValid(const std::v
             LOG_ERROR("Topic name invalid when init " << *itr);
             return std::shared_ptr<TopicName>();
         }
-
-        // all contains same namespace part
-        if (!namespaceNamePtr) {
-            namespaceNamePtr = topicNamePtr->getNamespaceName();
-        } else if (!(*namespaceNamePtr == *(topicNamePtr->getNamespaceName()))) {
-            LOG_ERROR("Different namespace name. expected: " << namespaceNamePtr->toString() << " now:"
-                                                             << topicNamePtr->getNamespaceName()->toString());
-            return std::shared_ptr<TopicName>();
-        }
     }
 
     return topicNamePtr;
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index b91cac1..9b2771dd 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -89,7 +89,6 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     const std::string subscriptionName_;
     std::string consumerStr_;
     std::string topic_;
-    NamespaceNamePtr namespaceName_;
     const ConsumerConfiguration conf_;
     typedef std::map<std::string, ConsumerImplPtr> ConsumerMap;
     ConsumerMap consumers_;
diff --git a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
index 9abe612..f5cba85 100644
--- a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
@@ -33,7 +33,9 @@ PatternMultiTopicsConsumerImpl::PatternMultiTopicsConsumerImpl(ClientImplPtr cli
       patternString_(pattern),
       pattern_(std::regex(pattern)),
       autoDiscoveryTimer_(),
-      autoDiscoveryRunning_(false) {}
+      autoDiscoveryRunning_(false) {
+    namespaceName_ = TopicName::get(pattern)->getNamespaceName();
+}
 
 const std::regex PatternMultiTopicsConsumerImpl::getPattern() { return pattern_; }
 
diff --git a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.h
index a37d0da..4ab974c 100644
--- a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.h
@@ -63,6 +63,7 @@ class PatternMultiTopicsConsumerImpl : public MultiTopicsConsumerImpl {
     typedef std::shared_ptr<boost::asio::deadline_timer> TimerPtr;
     TimerPtr autoDiscoveryTimer_;
     bool autoDiscoveryRunning_;
+    NamespaceNamePtr namespaceName_;
 
     void resetAutoDiscoveryTimer();
     void timerGetTopicsOfNamespace(const Result result, const NamespaceTopicsPtr topics);
diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py
index 9f109b8..2ea940e 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -905,12 +905,12 @@ class PulsarTest(TestCase):
         client = Client(self.serviceUrl)
         topic1 = 'persistent://public/default/my-python-topics-consumer-1'
         topic2 = 'persistent://public/default/my-python-topics-consumer-2'
-        topic3 = 'persistent://public/default/my-python-topics-consumer-3'
+        topic3 = 'persistent://public/default-2/my-python-topics-consumer-3' # topic from different namespace
         topics = [topic1, topic2, topic3]
 
         url1 = self.adminUrl + '/admin/v2/persistent/public/default/my-python-topics-consumer-1/partitions'
         url2 = self.adminUrl + '/admin/v2/persistent/public/default/my-python-topics-consumer-2/partitions'
-        url3 = self.adminUrl + '/admin/v2/persistent/public/default/my-python-topics-consumer-3/partitions'
+        url3 = self.adminUrl + '/admin/v2/persistent/public/default-2/my-python-topics-consumer-3/partitions'
 
         doHttpPut(url1, '2')
         doHttpPut(url2, '3')
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index e3ae2f2..6012f97 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -1848,33 +1848,41 @@ TEST(BasicEndToEndTest, testMultiTopicsConsumerDifferentNamespace) {
     topicNames.push_back(topicName2);
     topicNames.push_back(topicName3);
 
-    // call admin api to make topics partitioned
-    std::string url1 =
-        adminUrl + "admin/v2/persistent/public/default/testMultiTopicsConsumerDifferentNamespace1/partitions";
-    std::string url2 =
-        adminUrl +
-        "admin/v2/persistent/public/default-2/testMultiTopicsConsumerDifferentNamespace2/partitions";
-    std::string url3 =
-        adminUrl +
-        "admin/v2/persistent/public/default-3/testMultiTopicsConsumerDifferentNamespace3/partitions";
-
-    int res = makePutRequest(url1, "2");
-    ASSERT_FALSE(res != 204 && res != 409);
-    res = makePutRequest(url2, "3");
-    ASSERT_FALSE(res != 204 && res != 409);
-    res = makePutRequest(url3, "4");
-    ASSERT_FALSE(res != 204 && res != 409);
+    // key: message value integer, value: a pair of (topic, message id)
+    using MessageInfo = std::pair<std::string, MessageId>;
+    std::map<int, MessageInfo> messageIndexToInfo;
+    int index = 0;
+    // Produce some messages for each topic
+    for (const auto &topic : topicNames) {
+        Producer producer;
+        ProducerConfiguration producerConfig;
+        ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfig, producer));
+
+        const auto message = MessageBuilder().setContent(std::to_string(index)).build();
+        MessageId messageId;
+        ASSERT_EQ(ResultOk, producer.send(message, messageId));
+        messageIndexToInfo[index] = std::make_pair(topic, messageId);
+        LOG_INFO("Send " << index << " to " << topic << ", " << messageId);
+
+        ASSERT_EQ(ResultOk, producer.close());
+        index++;
+    }
 
-    // empty topics
     ConsumerConfiguration consConfig;
-    consConfig.setConsumerType(ConsumerShared);
+    consConfig.setSubscriptionInitialPosition(InitialPositionEarliest);
     Consumer consumer;
-    Promise<Result, Consumer> consumerPromise;
-    client.subscribeAsync(topicNames, subName, consConfig, WaitForCallbackValue<Consumer>(consumerPromise));
-    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
-    Result result = consumerFuture.get(consumer);
-    ASSERT_EQ(ResultInvalidTopicName, result);
-    LOG_INFO("subscribe on topics with different names should fail");
+    ASSERT_EQ(ResultOk, client.subscribe(topicNames, subName, consConfig, consumer));
+
+    for (int i = 0; i < index; i++) {
+        Message message;
+        ASSERT_EQ(ResultOk, consumer.receive(message, 3000));
+        ASSERT_EQ(ResultOk, consumer.acknowledge(message));
+        const int index = std::stoi(message.getDataAsString());
+        LOG_INFO("Receive " << index << " from " << message.getTopicName() << "," << message.getMessageId());
+        ASSERT_EQ(messageIndexToInfo.count(index), 1);
+        ASSERT_EQ(messageIndexToInfo[index], std::make_pair(message.getTopicName(), message.getMessageId()));
+    }
+
     consumer.close();
 
     client.shutdown();