You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/05/28 18:01:51 UTC
[pulsar] branch master updated: [C++] PIP-55: Refresh
Authentication Credentials (#7070)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9b50aac [C++] PIP-55: Refresh Authentication Credentials (#7070)
9b50aac is described below
commit 9b50aac5586d325bb1af4229778486e23283dc75
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu May 28 11:01:31 2020 -0700
[C++] PIP-55: Refresh Authentication Credentials (#7070)
---
pulsar-client-cpp/lib/ClientConnection.cc | 18 ++++++++++++++++++
pulsar-client-cpp/lib/ClientConnection.h | 1 +
pulsar-client-cpp/lib/Commands.cc | 20 ++++++++++++++++++++
pulsar-client-cpp/lib/Commands.h | 2 ++
4 files changed, 41 insertions(+)
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index e40f2f8..fe43805 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -414,6 +414,15 @@ void ClientConnection::handleSentPulsarConnect(const boost::system::error_code&
readNextCommand();
}
+/*
+ * Completion callback for the asynchronous write of an auth response.
+ *
+ * @param err    result of the write; a non-zero value means the send failed
+ * @param buffer the buffer that was written (not inspected here)
+ *
+ * On failure a warning is logged and close() is called; on success nothing
+ * further is done.
+ */
+void ClientConnection::handleSentAuthResponse(const boost::system::error_code& err,
+                                              const SharedBuffer& buffer) {
+    if (!err) {
+        // Write succeeded: nothing else to do.
+        return;
+    }
+
+    LOG_WARN(cnxString_ << "Failed to send auth response: " << err.message());
+    close();
+}
+
/*
* Async method to establish TCP connection with broker
*
@@ -1026,6 +1035,15 @@ void ClientConnection::handleIncomingCommand() {
break;
}
+        case BaseCommand::AUTH_CHALLENGE: {
+            LOG_DEBUG(cnxString_ << "Received auth challenge from broker");
+
+            // Reply to the challenge with an auth response built from authentication_.
+            // The buffer is also bound into the completion handler (presumably so it
+            // stays alive for the duration of the asynchronous write).
+            SharedBuffer buffer = Commands::newAuthResponse(authentication_);
+            asyncWrite(buffer.const_asio_buffer(),
+                       std::bind(&ClientConnection::handleSentAuthResponse, shared_from_this(),
+                                 std::placeholders::_1, buffer));
+            // Terminate the case explicitly so control does not fall through into the
+            // ACTIVE_CONSUMER_CHANGE handling that follows.
+            break;
+        }
+
case BaseCommand::ACTIVE_CONSUMER_CHANGE: {
LOG_DEBUG(cnxString_ << "Received notification about active consumer changes");
// ignore this message for now.
diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h
index e350eaf..a18280e 100644
--- a/pulsar-client-cpp/lib/ClientConnection.h
+++ b/pulsar-client-cpp/lib/ClientConnection.h
@@ -181,6 +181,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
void handleHandshake(const boost::system::error_code& err);
void handleSentPulsarConnect(const boost::system::error_code& err, const SharedBuffer& buffer);
+ void handleSentAuthResponse(const boost::system::error_code& err, const SharedBuffer& buffer);
void readNextCommand();
diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index 7d1f53e..63173cb 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -215,6 +215,9 @@ SharedBuffer Commands::newConnect(const AuthenticationPtr& authentication, const
connect->set_client_version(_PULSAR_VERSION_);
connect->set_auth_method_name(authentication->getAuthMethodName());
connect->set_protocol_version(ProtocolVersion_MAX);
+
+ FeatureFlags* flags = connect->mutable_feature_flags();
+ flags->set_supports_auth_refresh(true);
if (connectingThroughProxy) {
Url logicalAddressUrl;
Url::parse(logicalAddress, logicalAddressUrl);
@@ -228,6 +231,23 @@ SharedBuffer Commands::newConnect(const AuthenticationPtr& authentication, const
return writeMessageWithSize(cmd);
}
+/*
+ * Build a serialized AUTH_RESPONSE command.
+ *
+ * The response carries the client version and the authentication method name.
+ * Auth data is attached only when the authentication object returns ResultOk
+ * from getAuthData() and the returned data reports hasDataFromCommand().
+ *
+ * @param authentication the authentication provider to query
+ * @return the command as produced by writeMessageWithSize()
+ */
+SharedBuffer Commands::newAuthResponse(const AuthenticationPtr& authentication) {
+    BaseCommand baseCommand;
+    baseCommand.set_type(BaseCommand::AUTH_RESPONSE);
+
+    CommandAuthResponse* response = baseCommand.mutable_authresponse();
+    response->set_client_version(_PULSAR_VERSION_);
+
+    AuthData* payload = response->mutable_response();
+    payload->set_auth_method_name(authentication->getAuthMethodName());
+
+    // Only touch the returned credentials when the provider reported success.
+    AuthenticationDataPtr credentials;
+    const bool fetched = (authentication->getAuthData(credentials) == ResultOk);
+    if (fetched && credentials->hasDataFromCommand()) {
+        payload->set_auth_data(credentials->getCommandData());
+    }
+
+    return writeMessageWithSize(baseCommand);
+}
+
SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string& subscription,
uint64_t consumerId, uint64_t requestId, CommandSubscribe_SubType subType,
const std::string& consumerName, SubscriptionMode subscriptionMode,
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index 411fec7..0d1410a 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -71,6 +71,8 @@ class Commands {
static SharedBuffer newConnect(const AuthenticationPtr& authentication, const std::string& logicalAddress,
bool connectingThroughProxy);
+ static SharedBuffer newAuthResponse(const AuthenticationPtr& authentication);
+
static SharedBuffer newPartitionMetadataRequest(const std::string& topic, uint64_t requestId);
static SharedBuffer newLookup(const std::string& topic, const bool authoritative, uint64_t requestId);