You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/02/08 07:46:51 UTC
[pulsar] branch branch-2.6 updated: [C++] Remove namespace check
for MultiTopicsConsumerImpl (#9520)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.6 by this push:
new 3031550 [C++] Remove namespace check for MultiTopicsConsumerImpl (#9520)
3031550 is described below
commit 30315508fc53350e1588974a9049d57d895ae862
Author: Yunze Xu <xy...@163.com>
AuthorDate: Mon Feb 8 13:01:14 2021 +0800
[C++] Remove namespace check for MultiTopicsConsumerImpl (#9520)
Fixes #9449
### Motivation
This is a catchup work for https://github.com/apache/pulsar/pull/5716 that supports multiple topic subscriptions across multiple namespaces.
### Modifications
- Move the check for namespace in `MultiTopicsConsumerImpl` to `PatternMultiTopicsConsumerImpl` that uses a regex subscription.
- Fix the existed tests for subscription on topics across different namespaces.
### Verifying this change
This change is already covered by existing tests, such as *BasicEndToEndTest.testMultiTopicsConsumerDifferentNamespace*.
(cherry picked from commit 69e9322bef12d485bd2543f9ed9a32ded4597c5d)
---
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 21 --------
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h | 1 -
.../lib/PatternMultiTopicsConsumerImpl.cc | 4 +-
.../lib/PatternMultiTopicsConsumerImpl.h | 1 +
pulsar-client-cpp/python/pulsar_test.py | 4 +-
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 56 ++++++++++++----------
6 files changed, 38 insertions(+), 49 deletions(-)
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 85a9868..35b0f57 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -35,7 +35,6 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
listenerExecutor_(client->getListenerExecutorProvider()->get()),
messageListener_(conf.getMessageListener()),
pendingReceives_(),
- namespaceName_(topicName ? topicName->getNamespaceName() : std::shared_ptr<NamespaceName>()),
lookupServicePtr_(lookupServicePtr),
numberTopicPartitions_(std::make_shared<std::atomic<int>>(0)),
topics_(topics) {
@@ -97,9 +96,6 @@ void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer c
if (topicsNeedCreate->load() == 0) {
if (compareAndSetState(Pending, Ready)) {
LOG_INFO("Successfully Subscribed to Topics");
- if (!namespaceName_) {
- namespaceName_ = TopicName::get(topic)->getNamespaceName();
- }
multiTopicsConsumerCreatedPromise_.setValue(shared_from_this());
} else {
LOG_ERROR("Unable to create Consumer - " << consumerStr_ << " Error - " << result);
@@ -122,13 +118,6 @@ Future<Result, Consumer> MultiTopicsConsumerImpl::subscribeOneTopicAsync(const s
return topicPromise->getFuture();
}
- if (namespaceName_ && !(*namespaceName_ == *(topicName->getNamespaceName()))) {
- LOG_ERROR("TopicName namespace not the same with topicsConsumer. wanted namespace: "
- << namespaceName_->toString() << " this topic: " << topic);
- topicPromise->setFailed(ResultInvalidTopicName);
- return topicPromise->getFuture();
- }
-
if (state_ == Closed || state_ == Closing) {
LOG_ERROR("MultiTopicsConsumer already closed when subscribe.");
topicPromise->setFailed(ResultAlreadyClosed);
@@ -719,7 +708,6 @@ void MultiTopicsConsumerImpl::handleGetConsumerStats(Result res, BrokerConsumerS
std::shared_ptr<TopicName> MultiTopicsConsumerImpl::topicNamesValid(const std::vector<std::string>& topics) {
TopicNamePtr topicNamePtr = std::shared_ptr<TopicName>();
- NamespaceNamePtr namespaceNamePtr = std::shared_ptr<NamespaceName>();
// all topics name valid, and all topics have same namespace
for (std::vector<std::string>::const_iterator itr = topics.begin(); itr != topics.end(); itr++) {
@@ -728,15 +716,6 @@ std::shared_ptr<TopicName> MultiTopicsConsumerImpl::topicNamesValid(const std::v
LOG_ERROR("Topic name invalid when init " << *itr);
return std::shared_ptr<TopicName>();
}
-
- // all contains same namespace part
- if (!namespaceNamePtr) {
- namespaceNamePtr = topicNamePtr->getNamespaceName();
- } else if (!(*namespaceNamePtr == *(topicNamePtr->getNamespaceName()))) {
- LOG_ERROR("Different namespace name. expected: " << namespaceNamePtr->toString() << " now:"
- << topicNamePtr->getNamespaceName()->toString());
- return std::shared_ptr<TopicName>();
- }
}
return topicNamePtr;
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index b91cac1..9b2771dd 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -89,7 +89,6 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
const std::string subscriptionName_;
std::string consumerStr_;
std::string topic_;
- NamespaceNamePtr namespaceName_;
const ConsumerConfiguration conf_;
typedef std::map<std::string, ConsumerImplPtr> ConsumerMap;
ConsumerMap consumers_;
diff --git a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
index 9abe612..f5cba85 100644
--- a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
@@ -33,7 +33,9 @@ PatternMultiTopicsConsumerImpl::PatternMultiTopicsConsumerImpl(ClientImplPtr cli
patternString_(pattern),
pattern_(std::regex(pattern)),
autoDiscoveryTimer_(),
- autoDiscoveryRunning_(false) {}
+ autoDiscoveryRunning_(false) {
+ namespaceName_ = TopicName::get(pattern)->getNamespaceName();
+}
const std::regex PatternMultiTopicsConsumerImpl::getPattern() { return pattern_; }
diff --git a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.h
index a37d0da..4ab974c 100644
--- a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.h
@@ -63,6 +63,7 @@ class PatternMultiTopicsConsumerImpl : public MultiTopicsConsumerImpl {
typedef std::shared_ptr<boost::asio::deadline_timer> TimerPtr;
TimerPtr autoDiscoveryTimer_;
bool autoDiscoveryRunning_;
+ NamespaceNamePtr namespaceName_;
void resetAutoDiscoveryTimer();
void timerGetTopicsOfNamespace(const Result result, const NamespaceTopicsPtr topics);
diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py
index 9f109b8..2ea940e 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -905,12 +905,12 @@ class PulsarTest(TestCase):
client = Client(self.serviceUrl)
topic1 = 'persistent://public/default/my-python-topics-consumer-1'
topic2 = 'persistent://public/default/my-python-topics-consumer-2'
- topic3 = 'persistent://public/default/my-python-topics-consumer-3'
+ topic3 = 'persistent://public/default-2/my-python-topics-consumer-3' # topic from different namespace
topics = [topic1, topic2, topic3]
url1 = self.adminUrl + '/admin/v2/persistent/public/default/my-python-topics-consumer-1/partitions'
url2 = self.adminUrl + '/admin/v2/persistent/public/default/my-python-topics-consumer-2/partitions'
- url3 = self.adminUrl + '/admin/v2/persistent/public/default/my-python-topics-consumer-3/partitions'
+ url3 = self.adminUrl + '/admin/v2/persistent/public/default-2/my-python-topics-consumer-3/partitions'
doHttpPut(url1, '2')
doHttpPut(url2, '3')
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index e3ae2f2..6012f97 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -1848,33 +1848,41 @@ TEST(BasicEndToEndTest, testMultiTopicsConsumerDifferentNamespace) {
topicNames.push_back(topicName2);
topicNames.push_back(topicName3);
- // call admin api to make topics partitioned
- std::string url1 =
- adminUrl + "admin/v2/persistent/public/default/testMultiTopicsConsumerDifferentNamespace1/partitions";
- std::string url2 =
- adminUrl +
- "admin/v2/persistent/public/default-2/testMultiTopicsConsumerDifferentNamespace2/partitions";
- std::string url3 =
- adminUrl +
- "admin/v2/persistent/public/default-3/testMultiTopicsConsumerDifferentNamespace3/partitions";
-
- int res = makePutRequest(url1, "2");
- ASSERT_FALSE(res != 204 && res != 409);
- res = makePutRequest(url2, "3");
- ASSERT_FALSE(res != 204 && res != 409);
- res = makePutRequest(url3, "4");
- ASSERT_FALSE(res != 204 && res != 409);
+ // key: message value integer, value: a pair of (topic, message id)
+ using MessageInfo = std::pair<std::string, MessageId>;
+ std::map<int, MessageInfo> messageIndexToInfo;
+ int index = 0;
+ // Produce some messages for each topic
+ for (const auto &topic : topicNames) {
+ Producer producer;
+ ProducerConfiguration producerConfig;
+ ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfig, producer));
+
+ const auto message = MessageBuilder().setContent(std::to_string(index)).build();
+ MessageId messageId;
+ ASSERT_EQ(ResultOk, producer.send(message, messageId));
+ messageIndexToInfo[index] = std::make_pair(topic, messageId);
+ LOG_INFO("Send " << index << " to " << topic << ", " << messageId);
+
+ ASSERT_EQ(ResultOk, producer.close());
+ index++;
+ }
- // empty topics
ConsumerConfiguration consConfig;
- consConfig.setConsumerType(ConsumerShared);
+ consConfig.setSubscriptionInitialPosition(InitialPositionEarliest);
Consumer consumer;
- Promise<Result, Consumer> consumerPromise;
- client.subscribeAsync(topicNames, subName, consConfig, WaitForCallbackValue<Consumer>(consumerPromise));
- Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
- Result result = consumerFuture.get(consumer);
- ASSERT_EQ(ResultInvalidTopicName, result);
- LOG_INFO("subscribe on topics with different names should fail");
+ ASSERT_EQ(ResultOk, client.subscribe(topicNames, subName, consConfig, consumer));
+
+ for (int i = 0; i < index; i++) {
+ Message message;
+ ASSERT_EQ(ResultOk, consumer.receive(message, 3000));
+ ASSERT_EQ(ResultOk, consumer.acknowledge(message));
+ const int index = std::stoi(message.getDataAsString());
+ LOG_INFO("Receive " << index << " from " << message.getTopicName() << "," << message.getMessageId());
+ ASSERT_EQ(messageIndexToInfo.count(index), 1);
+ ASSERT_EQ(messageIndexToInfo[index], std::make_pair(message.getTopicName(), message.getMessageId()));
+ }
+
consumer.close();
client.shutdown();