You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/05/28 18:01:51 UTC

[pulsar] branch master updated: [C++] PIP-55: Refresh Authentication Credentials (#7070)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b50aac  [C++] PIP-55: Refresh Authentication Credentials (#7070)
9b50aac is described below

commit 9b50aac5586d325bb1af4229778486e23283dc75
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu May 28 11:01:31 2020 -0700

    [C++] PIP-55: Refresh Authentication Credentials (#7070)
---
 pulsar-client-cpp/lib/ClientConnection.cc | 18 ++++++++++++++++++
 pulsar-client-cpp/lib/ClientConnection.h  |  1 +
 pulsar-client-cpp/lib/Commands.cc         | 20 ++++++++++++++++++++
 pulsar-client-cpp/lib/Commands.h          |  2 ++
 4 files changed, 41 insertions(+)

diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index e40f2f8..fe43805 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -414,6 +414,15 @@ void ClientConnection::handleSentPulsarConnect(const boost::system::error_code&
     readNextCommand();
 }
 
+// Completion handler for the async write of an auth response; `buffer` is not read in the body.
+void ClientConnection::handleSentAuthResponse(const boost::system::error_code& err,
+                                              const SharedBuffer& buffer) {
+    if (!err) return;  // the write reported no error, nothing further to do here
+    // The write failed: log a warning with the error text and close() this connection.
+    LOG_WARN(cnxString_ << "Failed to send auth response: " << err.message());
+    close();
+}
+
 /*
  * Async method to establish TCP connection with broker
  *
@@ -1026,6 +1035,15 @@ void ClientConnection::handleIncomingCommand() {
                     break;
                 }
 
+                case BaseCommand::AUTH_CHALLENGE: {  // reply with a newly built auth response
+                    LOG_DEBUG(cnxString_ << "Received auth challenge from broker");
+                    SharedBuffer buffer = Commands::newAuthResponse(authentication_);
+                    asyncWrite(buffer.const_asio_buffer(),
+                               std::bind(&ClientConnection::handleSentAuthResponse, shared_from_this(),
+                                         std::placeholders::_1, buffer));
+                    break;  // without this, control falls through into ACTIVE_CONSUMER_CHANGE
+                }
+
                 case BaseCommand::ACTIVE_CONSUMER_CHANGE: {
                     LOG_DEBUG(cnxString_ << "Received notification about active consumer changes");
                     // ignore this message for now.
diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h
index e350eaf..a18280e 100644
--- a/pulsar-client-cpp/lib/ClientConnection.h
+++ b/pulsar-client-cpp/lib/ClientConnection.h
@@ -181,6 +181,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
     void handleHandshake(const boost::system::error_code& err);
 
     void handleSentPulsarConnect(const boost::system::error_code& err, const SharedBuffer& buffer);
+    void handleSentAuthResponse(const boost::system::error_code& err, const SharedBuffer& buffer);
 
     void readNextCommand();
 
diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index 7d1f53e..63173cb 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -215,6 +215,9 @@ SharedBuffer Commands::newConnect(const AuthenticationPtr& authentication, const
     connect->set_client_version(_PULSAR_VERSION_);
     connect->set_auth_method_name(authentication->getAuthMethodName());
     connect->set_protocol_version(ProtocolVersion_MAX);
+
+    FeatureFlags* flags = connect->mutable_feature_flags();
+    flags->set_supports_auth_refresh(true);
     if (connectingThroughProxy) {
         Url logicalAddressUrl;
         Url::parse(logicalAddress, logicalAddressUrl);
@@ -228,6 +231,23 @@ SharedBuffer Commands::newConnect(const AuthenticationPtr& authentication, const
     return writeMessageWithSize(cmd);
 }
 
+// Builds the AUTH_RESPONSE command (client version, auth method name, optional command auth data).
+SharedBuffer Commands::newAuthResponse(const AuthenticationPtr& authentication) {
+    BaseCommand command;
+    command.set_type(BaseCommand::AUTH_RESPONSE);
+    CommandAuthResponse* response = command.mutable_authresponse();
+    response->set_client_version(_PULSAR_VERSION_);
+    AuthData* payload = response->mutable_response();
+    payload->set_auth_method_name(authentication->getAuthMethodName());
+
+    AuthenticationDataPtr credentials;
+    const bool fetched = authentication->getAuthData(credentials) == ResultOk;
+    if (fetched && credentials->hasDataFromCommand()) {  // otherwise auth_data is left unset
+        payload->set_auth_data(credentials->getCommandData());
+    }
+    return writeMessageWithSize(command);
+}
+
 SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string& subscription,
                                     uint64_t consumerId, uint64_t requestId, CommandSubscribe_SubType subType,
                                     const std::string& consumerName, SubscriptionMode subscriptionMode,
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index 411fec7..0d1410a 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -71,6 +71,8 @@ class Commands {
     static SharedBuffer newConnect(const AuthenticationPtr& authentication, const std::string& logicalAddress,
                                    bool connectingThroughProxy);
 
+    static SharedBuffer newAuthResponse(const AuthenticationPtr& authentication);
+
     static SharedBuffer newPartitionMetadataRequest(const std::string& topic, uint64_t requestId);
 
     static SharedBuffer newLookup(const std::string& topic, const bool authoritative, uint64_t requestId);