You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2021/01/06 06:21:01 UTC
[pulsar] branch master updated: add listenerName support to
client-cpp (#9119)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7d7feb3 add listenerName support to client-cpp (#9119)
7d7feb3 is described below
commit 7d7feb36068e2764c6d691e551a4caa53fefb33d
Author: z2665 <z2...@hotmail.com>
AuthorDate: Wed Jan 6 14:20:37 2021 +0800
add listenerName support to client-cpp (#9119)
### Motivation
Since Pulsar 2.6.1, the Java client has supported a listenerName setting for accessing a Pulsar server configured with advertisedListeners,
but client-cpp does not support it yet.
### Modifications
1. add setListenerName/getListenerName to ClientConfiguration
2. add ListenerName to TopicLookup in BinaryProtoLookupService
---
pulsar-client-cpp/include/pulsar/ClientConfiguration.h | 3 +++
pulsar-client-cpp/lib/BinaryProtoLookupService.cc | 17 +++++++++++++----
pulsar-client-cpp/lib/BinaryProtoLookupService.h | 7 ++++++-
pulsar-client-cpp/lib/ClientConfiguration.cc | 7 +++++++
pulsar-client-cpp/lib/ClientConfigurationImpl.h | 1 +
pulsar-client-cpp/lib/ClientConnection.cc | 5 +++--
pulsar-client-cpp/lib/ClientConnection.h | 4 ++--
pulsar-client-cpp/lib/ClientImpl.cc | 3 ++-
pulsar-client-cpp/lib/Commands.cc | 4 +++-
pulsar-client-cpp/lib/Commands.h | 3 ++-
10 files changed, 42 insertions(+), 12 deletions(-)
diff --git a/pulsar-client-cpp/include/pulsar/ClientConfiguration.h b/pulsar-client-cpp/include/pulsar/ClientConfiguration.h
index 9bb63d4..9a5ecb7 100644
--- a/pulsar-client-cpp/include/pulsar/ClientConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ClientConfiguration.h
@@ -141,6 +141,9 @@ class PULSAR_PUBLIC ClientConfiguration {
ClientConfiguration& setValidateHostName(bool validateHostName);
bool isValidateHostName() const;
+ ClientConfiguration& setListenerName(const std::string& listenerName);
+ const std::string& getListenerName() const;
+
/*
* Initialize stats interval in seconds. Stats are printed and reset after every 'statsIntervalInSeconds'.
* Set to 0 in order to disable stats collection.
diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
index 160efc9..069e0e2 100644
--- a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
+++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
@@ -36,6 +36,14 @@ namespace pulsar {
BinaryProtoLookupService::BinaryProtoLookupService(ConnectionPool& cnxPool, const std::string& lookupUrl)
: cnxPool_(cnxPool), serviceUrl_(lookupUrl), mutex_(), requestIdGenerator_(0) {}
+BinaryProtoLookupService::BinaryProtoLookupService(ConnectionPool& cnxPool, const std::string& lookupUrl,
+ const std::string& listenerName)
+ : cnxPool_(cnxPool),
+ serviceUrl_(lookupUrl),
+ listenerName_(listenerName),
+ mutex_(),
+ requestIdGenerator_(0) {}
+
/*
* @param topicName topic name to get broker for
*
@@ -53,7 +61,7 @@ Future<Result, LookupDataResultPtr> BinaryProtoLookupService::lookupAsync(const
LookupDataResultPromisePtr promise = std::make_shared<LookupDataResultPromise>();
Future<Result, ClientConnectionWeakPtr> future = cnxPool_.getConnectionAsync(serviceUrl_, serviceUrl_);
future.addListener(std::bind(&BinaryProtoLookupService::sendTopicLookupRequest, this, lookupName, false,
- std::placeholders::_1, std::placeholders::_2, promise));
+ listenerName_, std::placeholders::_1, std::placeholders::_2, promise));
return promise->getFuture();
}
@@ -76,7 +84,8 @@ Future<Result, LookupDataResultPtr> BinaryProtoLookupService::getPartitionMetada
}
void BinaryProtoLookupService::sendTopicLookupRequest(const std::string& topicName, bool authoritative,
- Result result, const ClientConnectionWeakPtr& clientCnx,
+ const std::string& listenerName, Result result,
+ const ClientConnectionWeakPtr& clientCnx,
LookupDataResultPromisePtr promise) {
if (result != ResultOk) {
promise->setFailed(ResultConnectError);
@@ -85,7 +94,7 @@ void BinaryProtoLookupService::sendTopicLookupRequest(const std::string& topicNa
LookupDataResultPromisePtr lookupPromise = std::make_shared<LookupDataResultPromise>();
ClientConnectionPtr conn = clientCnx.lock();
uint64_t requestId = newRequestId();
- conn->newTopicLookup(topicName, authoritative, requestId, lookupPromise);
+ conn->newTopicLookup(topicName, authoritative, listenerName, requestId, lookupPromise);
lookupPromise->getFuture().addListener(std::bind(&BinaryProtoLookupService::handleLookup, this, topicName,
std::placeholders::_1, std::placeholders::_2, clientCnx,
promise));
@@ -105,7 +114,7 @@ void BinaryProtoLookupService::handleLookup(const std::string& topicName, Result
Future<Result, ClientConnectionWeakPtr> future =
cnxPool_.getConnectionAsync(logicalAddress, physicalAddress);
future.addListener(std::bind(&BinaryProtoLookupService::sendTopicLookupRequest, this, topicName,
- data->isAuthoritative(), std::placeholders::_1,
+ data->isAuthoritative(), listenerName_, std::placeholders::_1,
std::placeholders::_2, promise));
} else {
LOG_DEBUG("Lookup response for " << topicName << ", lookup-broker-url " << data->getBrokerUrl());
diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.h b/pulsar-client-cpp/lib/BinaryProtoLookupService.h
index 4a6fba2..c4cf0d0 100644
--- a/pulsar-client-cpp/lib/BinaryProtoLookupService.h
+++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.h
@@ -37,6 +37,9 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
*/
BinaryProtoLookupService(ConnectionPool& cnxPool, const std::string& serviceUrl);
+ BinaryProtoLookupService(ConnectionPool& cnxPool, const std::string& serviceUrl,
+ const std::string& listenerName);
+
Future<Result, LookupDataResultPtr> lookupAsync(const std::string& topicName);
Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName);
@@ -49,8 +52,10 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
std::string serviceUrl_;
ConnectionPool& cnxPool_;
+ std::string listenerName_;
- void sendTopicLookupRequest(const std::string& topicName, bool authoritative, Result result,
+ void sendTopicLookupRequest(const std::string& topicName, bool authoritative,
+ const std::string& listenerName, Result result,
const ClientConnectionWeakPtr& clientCnx, LookupDataResultPromisePtr promise);
void handleLookup(const std::string& topicName, Result result, LookupDataResultPtr data,
diff --git a/pulsar-client-cpp/lib/ClientConfiguration.cc b/pulsar-client-cpp/lib/ClientConfiguration.cc
index ad210f6..a70c22e 100644
--- a/pulsar-client-cpp/lib/ClientConfiguration.cc
+++ b/pulsar-client-cpp/lib/ClientConfiguration.cc
@@ -126,4 +126,11 @@ ClientConfiguration& ClientConfiguration::setPartititionsUpdateInterval(unsigned
unsigned int ClientConfiguration::getPartitionsUpdateInterval() const {
return impl_->partitionsUpdateInterval;
}
+
+ClientConfiguration& ClientConfiguration::setListenerName(const std::string& listenerName) {
+ impl_->listenerName = listenerName;
+ return *this;
+}
+
+const std::string& ClientConfiguration::getListenerName() const { return impl_->listenerName; }
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ClientConfigurationImpl.h b/pulsar-client-cpp/lib/ClientConfigurationImpl.h
index 11cd8c9..4dc4c23 100644
--- a/pulsar-client-cpp/lib/ClientConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ClientConfigurationImpl.h
@@ -37,6 +37,7 @@ struct ClientConfigurationImpl {
std::unique_ptr<LoggerFactory> loggerFactory;
bool validateHostName;
unsigned int partitionsUpdateInterval;
+ std::string listenerName;
ClientConfigurationImpl()
: authenticationPtr(AuthFactory::Disabled()),
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index d2fb659..b4d1bd1 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -1186,8 +1186,9 @@ Future<Result, BrokerConsumerStatsImpl> ClientConnection::newConsumerStats(uint6
}
void ClientConnection::newTopicLookup(const std::string& topicName, bool authoritative,
- const uint64_t requestId, LookupDataResultPromisePtr promise) {
- newLookup(Commands::newLookup(topicName, authoritative, requestId), requestId, promise);
+ const std::string& listenerName, const uint64_t requestId,
+ LookupDataResultPromisePtr promise) {
+ newLookup(Commands::newLookup(topicName, authoritative, requestId, listenerName), requestId, promise);
}
void ClientConnection::newPartitionedMetadataLookup(const std::string& topicName, const uint64_t requestId,
diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h
index 99d9d45..71db1ad 100644
--- a/pulsar-client-cpp/lib/ClientConnection.h
+++ b/pulsar-client-cpp/lib/ClientConnection.h
@@ -120,8 +120,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
Future<Result, ClientConnectionWeakPtr> getCloseFuture();
- void newTopicLookup(const std::string& topicName, bool authoritative, const uint64_t requestId,
- LookupDataResultPromisePtr promise);
+ void newTopicLookup(const std::string& topicName, bool authoritative, const std::string& listenerName,
+ const uint64_t requestId, LookupDataResultPromisePtr promise);
void newPartitionedMetadataLookup(const std::string& topicName, const uint64_t requestId,
LookupDataResultPromisePtr promise);
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index 83af939..af144ab 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -116,7 +116,8 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration&
std::cref(clientConfiguration_.getAuthPtr()));
} else {
LOG_DEBUG("Using Binary Lookup");
- lookupServicePtr_ = std::make_shared<BinaryProtoLookupService>(std::ref(pool_), std::ref(serviceUrl));
+ lookupServicePtr_ = std::make_shared<BinaryProtoLookupService>(
+ std::ref(pool_), std::ref(serviceUrl), std::cref(clientConfiguration_.getListenerName()));
}
}
diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index 4df47c8..009c19e 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -108,7 +108,8 @@ SharedBuffer Commands::newPartitionMetadataRequest(const std::string& topic, uin
return buffer;
}
-SharedBuffer Commands::newLookup(const std::string& topic, const bool authoritative, uint64_t requestId) {
+SharedBuffer Commands::newLookup(const std::string& topic, const bool authoritative, uint64_t requestId,
+ const std::string& listenerName) {
static BaseCommand cmd;
static std::mutex mutex;
std::lock_guard<std::mutex> lock(mutex);
@@ -117,6 +118,7 @@ SharedBuffer Commands::newLookup(const std::string& topic, const bool authoritat
lookup->set_topic(topic);
lookup->set_authoritative(authoritative);
lookup->set_request_id(requestId);
+ lookup->set_advertised_listener_name(listenerName);
const SharedBuffer buffer = writeMessageWithSize(cmd);
cmd.clear_lookuptopic();
return buffer;
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index 18f0049..abcb44b 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -76,7 +76,8 @@ class Commands {
static SharedBuffer newPartitionMetadataRequest(const std::string& topic, uint64_t requestId);
- static SharedBuffer newLookup(const std::string& topic, const bool authoritative, uint64_t requestId);
+ static SharedBuffer newLookup(const std::string& topic, const bool authoritative, uint64_t requestId,
+ const std::string& listenerName);
static PairSharedBuffer newSend(SharedBuffer& headers, proto::BaseCommand& cmd, uint64_t producerId,
uint64_t sequenceId, ChecksumType checksumType, const Message& msg);