You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/29 09:53:11 UTC

[pulsar] 05/10: Cpp oauth2 auth client (#7467)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7905aca413f3f16d3e4ffece6a51af4162b23f56
Author: Jia Zhai <zh...@apache.org>
AuthorDate: Wed Jul 8 21:25:19 2020 -0500

    Cpp oauth2 auth client (#7467)
    
    ### Motivation
    
    #7420 provides an Oauth2 auth client for java. This PR tries to support it in cpp client
    
    ### Modifications
    
    - add implementation
    - add related tests.
    
    (cherry picked from commit 2d0ccebab1dce4c657821fb78a08f4a0bfca2454)
---
 pulsar-client-cpp/include/pulsar/Authentication.h  | 107 +++++++-
 .../include/pulsar/ClientConfiguration.h           |   2 +-
 pulsar-client-cpp/lib/Authentication.cc            |   7 +
 pulsar-client-cpp/lib/ClientConfiguration.cc       |   2 +-
 pulsar-client-cpp/lib/auth/AuthAthenz.cc           |   2 +-
 pulsar-client-cpp/lib/auth/AuthOauth2.cc           | 271 +++++++++++++++++++++
 pulsar-client-cpp/lib/auth/AuthOauth2.h            |  74 ++++++
 pulsar-client-cpp/lib/auth/AuthTls.cc              |   2 +-
 pulsar-client-cpp/lib/auth/AuthToken.cc            |   2 +-
 pulsar-client-cpp/tests/AuthPluginTest.cc          |  45 ++++
 10 files changed, 505 insertions(+), 9 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/Authentication.h b/pulsar-client-cpp/include/pulsar/Authentication.h
index efa8d9f..57a3e70 100644
--- a/pulsar-client-cpp/include/pulsar/Authentication.h
+++ b/pulsar-client-cpp/include/pulsar/Authentication.h
@@ -56,7 +56,7 @@ class PULSAR_PUBLIC Authentication {
    public:
     virtual ~Authentication();
     virtual const std::string getAuthMethodName() const = 0;
-    virtual Result getAuthData(AuthenticationDataPtr& authDataContent) const {
+    virtual Result getAuthData(AuthenticationDataPtr& authDataContent) {
         authDataContent = authData_;
         return ResultOk;
     }
@@ -107,7 +107,7 @@ class PULSAR_PUBLIC AuthTls : public Authentication {
     static AuthenticationPtr create(const std::string& authParamsString);
     static AuthenticationPtr create(const std::string& certificatePath, const std::string& privateKeyPath);
     const std::string getAuthMethodName() const;
-    Result getAuthData(AuthenticationDataPtr& authDataTls) const;
+    Result getAuthData(AuthenticationDataPtr& authDataTls);
 
    private:
     AuthenticationDataPtr authDataTls_;
@@ -144,7 +144,7 @@ class PULSAR_PUBLIC AuthToken : public Authentication {
     static AuthenticationPtr create(const TokenSupplier& tokenSupplier);
 
     const std::string getAuthMethodName() const;
-    Result getAuthData(AuthenticationDataPtr& authDataToken) const;
+    Result getAuthData(AuthenticationDataPtr& authDataToken);
 
    private:
     AuthenticationDataPtr authDataToken_;
@@ -160,12 +160,111 @@ class PULSAR_PUBLIC AuthAthenz : public Authentication {
     static AuthenticationPtr create(ParamMap& params);
     static AuthenticationPtr create(const std::string& authParamsString);
     const std::string getAuthMethodName() const;
-    Result getAuthData(AuthenticationDataPtr& authDataAthenz) const;
+    Result getAuthData(AuthenticationDataPtr& authDataAthenz);
 
    private:
     AuthenticationDataPtr authDataAthenz_;
 };
 
+// OAuth 2.0 token and associated information.
+// currently mainly works for access token
+class Oauth2TokenResult {
+   public:
+    enum
+    {
+        undefined_expiration = -1
+    };
+
+    Oauth2TokenResult();
+    ~Oauth2TokenResult();
+
+    Oauth2TokenResult& setAccessToken(const std::string& accessToken);
+    Oauth2TokenResult& setIdToken(const std::string& idToken);
+    Oauth2TokenResult& setRefreshToken(const std::string& refreshToken);
+    Oauth2TokenResult& setExpiresIn(const int64_t expiresIn);
+
+    const std::string& getAccessToken() const;
+    const std::string& getIdToken() const;
+    const std::string& getRefreshToken() const;
+    int64_t getExpiresIn() const;
+
+   private:
+    // map to json "access_token"
+    std::string accessToken_;
+    // map to json "id_token"
+    std::string idToken_;
+    // map to json "refresh_token"
+    std::string refreshToken_;
+    // map to json "expires_in"
+    int64_t expiresIn_;
+};
+
+typedef std::shared_ptr<Oauth2TokenResult> Oauth2TokenResultPtr;
+
+class Oauth2Flow {
+   public:
+    virtual ~Oauth2Flow();
+
+    /**
+     * Initializes the authorization flow.
+     */
+    virtual void initialize() = 0;
+
+    /**
+     * Acquires an access token from the OAuth 2.0 authorization server.
+     * @return a token result including an access token.
+     */
+    virtual Oauth2TokenResultPtr authenticate() = 0;
+
+    /**
+     * Closes the authorization flow.
+     */
+    virtual void close() = 0;
+
+   protected:
+    Oauth2Flow();
+};
+
+typedef std::shared_ptr<Oauth2Flow> FlowPtr;
+
+class CachedToken {
+   public:
+    ~CachedToken();
+    virtual bool isExpired() = 0;
+    virtual AuthenticationDataPtr getAuthData() = 0;
+
+   protected:
+    CachedToken();
+};
+
+typedef std::shared_ptr<CachedToken> CachedTokenPtr;
+
+/**
+ * Oauth2 based implementation of Pulsar client authentication.
+ * Passed in parameter would be like:
+ * ```
+ *   "type": "client_credentials",
+ *   "issuer_url": "https://accounts.google.com",
+ *   "client_id": "d9ZyX97q1ef8Cr81WHVC4hFQ64vSlDK3",
+ *   "client_secret": "on1uJ...k6F6R",
+ *   "audience": "https://broker.example.com"
+ *  ```
+ */
+class PULSAR_PUBLIC AuthOauth2 : public Authentication {
+   public:
+    AuthOauth2(ParamMap& params);
+    ~AuthOauth2();
+
+    static AuthenticationPtr create(ParamMap& params);
+    static AuthenticationPtr create(const std::string& authParamsString);
+    const std::string getAuthMethodName() const;
+    Result getAuthData(AuthenticationDataPtr& authDataOauth2);
+
+   private:
+    FlowPtr flowPtr_;
+    CachedTokenPtr cachedTokenPtr_;
+};
+
 }  // namespace pulsar
 
 #endif /* PULSAR_AUTHENTICATION_H_ */
diff --git a/pulsar-client-cpp/include/pulsar/ClientConfiguration.h b/pulsar-client-cpp/include/pulsar/ClientConfiguration.h
index e65aecc..9bb63d4 100644
--- a/pulsar-client-cpp/include/pulsar/ClientConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ClientConfiguration.h
@@ -43,7 +43,7 @@ class PULSAR_PUBLIC ClientConfiguration {
     /**
      * @return the authentication data
      */
-    const Authentication& getAuth() const;
+    Authentication& getAuth() const;
 
     /**
      * Set timeout on client operations (subscribe, create producer, close, unsubscribe)
diff --git a/pulsar-client-cpp/lib/Authentication.cc b/pulsar-client-cpp/lib/Authentication.cc
index f552b53..105d1c3 100644
--- a/pulsar-client-cpp/lib/Authentication.cc
+++ b/pulsar-client-cpp/lib/Authentication.cc
@@ -22,6 +22,7 @@
 #include "auth/AuthTls.h"
 #include "auth/AuthAthenz.h"
 #include "auth/AuthToken.h"
+#include "auth/AuthOauth2.h"
 #include <lib/LogUtils.h>
 
 #include <string>
@@ -125,6 +126,9 @@ AuthenticationPtr tryCreateBuiltinAuth(const std::string& pluginName, ParamMap&
     } else if (boost::iequals(pluginName, ATHENZ_PLUGIN_NAME) ||
                boost::iequals(pluginName, ATHENZ_JAVA_PLUGIN_NAME)) {
         return AuthAthenz::create(paramMap);
+    } else if (boost::iequals(pluginName, OAUTH2_TOKEN_PLUGIN_NAME) ||
+               boost::iequals(pluginName, OAUTH2_TOKEN_JAVA_PLUGIN_NAME)) {
+        return AuthOauth2::create(paramMap);
     } else {
         return AuthenticationPtr();
     }
@@ -139,6 +143,9 @@ AuthenticationPtr tryCreateBuiltinAuth(const std::string& pluginName, const std:
     } else if (boost::iequals(pluginName, ATHENZ_PLUGIN_NAME) ||
                boost::iequals(pluginName, ATHENZ_JAVA_PLUGIN_NAME)) {
         return AuthAthenz::create(authParamsString);
+    } else if (boost::iequals(pluginName, OAUTH2_TOKEN_PLUGIN_NAME) ||
+               boost::iequals(pluginName, OAUTH2_TOKEN_JAVA_PLUGIN_NAME)) {
+        return AuthOauth2::create(authParamsString);
     } else {
         return AuthenticationPtr();
     }
diff --git a/pulsar-client-cpp/lib/ClientConfiguration.cc b/pulsar-client-cpp/lib/ClientConfiguration.cc
index 1733dde..ad210f6 100644
--- a/pulsar-client-cpp/lib/ClientConfiguration.cc
+++ b/pulsar-client-cpp/lib/ClientConfiguration.cc
@@ -36,7 +36,7 @@ ClientConfiguration& ClientConfiguration::setAuth(const AuthenticationPtr& authe
     return *this;
 }
 
-const Authentication& ClientConfiguration::getAuth() const { return *impl_->authenticationPtr; }
+Authentication& ClientConfiguration::getAuth() const { return *impl_->authenticationPtr; }
 
 const AuthenticationPtr& ClientConfiguration::getAuthPtr() const { return impl_->authenticationPtr; }
 
diff --git a/pulsar-client-cpp/lib/auth/AuthAthenz.cc b/pulsar-client-cpp/lib/auth/AuthAthenz.cc
index 1e6c8fe..3141fb3 100644
--- a/pulsar-client-cpp/lib/auth/AuthAthenz.cc
+++ b/pulsar-client-cpp/lib/auth/AuthAthenz.cc
@@ -91,7 +91,7 @@ AuthenticationPtr AuthAthenz::create(ParamMap& params) {
 
 const std::string AuthAthenz::getAuthMethodName() const { return "athenz"; }
 
-Result AuthAthenz::getAuthData(AuthenticationDataPtr& authDataContent) const {
+Result AuthAthenz::getAuthData(AuthenticationDataPtr& authDataContent) {
     authDataContent = authDataAthenz_;
     return ResultOk;
 }
diff --git a/pulsar-client-cpp/lib/auth/AuthOauth2.cc b/pulsar-client-cpp/lib/auth/AuthOauth2.cc
new file mode 100644
index 0000000..3104c4d
--- /dev/null
+++ b/pulsar-client-cpp/lib/auth/AuthOauth2.cc
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <lib/auth/AuthOauth2.h>
+
+#include <curl/curl.h>
+#include <sstream>
+#include <boost/property_tree/json_parser.hpp>
+#include <boost/property_tree/ptree.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+
+#include <lib/LogUtils.h>
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+// AuthDataOauth2
+
+AuthDataOauth2::AuthDataOauth2(const std::string& accessToken) { accessToken_ = accessToken; }
+
+AuthDataOauth2::~AuthDataOauth2() {}
+
+bool AuthDataOauth2::hasDataForHttp() { return true; }
+
+std::string AuthDataOauth2::getHttpHeaders() { return "Authorization: Bearer " + accessToken_; }
+
+bool AuthDataOauth2::hasDataFromCommand() { return true; }
+
+std::string AuthDataOauth2::getCommandData() { return accessToken_; }
+
+// Oauth2TokenResult
+
+Oauth2TokenResult::Oauth2TokenResult() { expiresIn_ = undefined_expiration; }
+
+Oauth2TokenResult::~Oauth2TokenResult() {}
+
+Oauth2TokenResult& Oauth2TokenResult::setAccessToken(const std::string& accessToken) {
+    accessToken_ = accessToken;
+    return *this;
+}
+
+Oauth2TokenResult& Oauth2TokenResult::setIdToken(const std::string& idToken) {
+    idToken_ = idToken;
+    return *this;
+}
+
+Oauth2TokenResult& Oauth2TokenResult::setRefreshToken(const std::string& refreshToken) {
+    refreshToken_ = refreshToken;
+    return *this;
+}
+
+Oauth2TokenResult& Oauth2TokenResult::setExpiresIn(const int64_t expiresIn) {
+    expiresIn_ = expiresIn;
+    return *this;
+}
+
+const std::string& Oauth2TokenResult::getAccessToken() const { return accessToken_; }
+
+const std::string& Oauth2TokenResult::getIdToken() const { return idToken_; }
+
+const std::string& Oauth2TokenResult::getRefreshToken() const { return refreshToken_; }
+
+int64_t Oauth2TokenResult::getExpiresIn() const { return expiresIn_; }
+
+// CachedToken
+
+CachedToken::CachedToken() {}
+
+CachedToken::~CachedToken() {}
+
+// Oauth2CachedToken
+
+static int64_t currentTimeMillis() {
+    using namespace boost::posix_time;
+    using boost::posix_time::milliseconds;
+    using boost::posix_time::seconds;
+    static ptime time_t_epoch(boost::gregorian::date(1970, 1, 1));
+
+    time_duration diff = microsec_clock::universal_time() - time_t_epoch;
+    return diff.total_milliseconds();
+}
+
+Oauth2CachedToken::Oauth2CachedToken(Oauth2TokenResultPtr token) {
+    latest_ = token;
+
+    int64_t expiredIn = token->getExpiresIn();
+    if (expiredIn > 0) {
+        expiresAt_ = expiredIn + currentTimeMillis();
+    } else {
+        throw "ExpiresIn in Oauth2TokenResult invalid value: " + expiredIn;
+    }
+    authData_ = AuthenticationDataPtr(new AuthDataOauth2(token->getAccessToken()));
+}
+
+AuthenticationDataPtr Oauth2CachedToken::getAuthData() { return authData_; }
+
+Oauth2CachedToken::~Oauth2CachedToken() {}
+
+bool Oauth2CachedToken::isExpired() { return expiresAt_ < currentTimeMillis(); }
+
+// OauthFlow
+
+Oauth2Flow::Oauth2Flow() {}
+Oauth2Flow::~Oauth2Flow() {}
+
+// ClientCredentialFlow
+
+ClientCredentialFlow::ClientCredentialFlow(const std::string& issuerUrl, const std::string& clientId,
+                                           const std::string& clientSecret, const std::string& audience) {
+    issuerUrl_ = issuerUrl;
+    clientId_ = clientId;
+    clientSecret_ = clientSecret;
+    audience_ = audience;
+}
+
+void ClientCredentialFlow::initialize() {}
+void ClientCredentialFlow::close() {}
+
+static size_t curlWriteCallback(void* contents, size_t size, size_t nmemb, void* responseDataPtr) {
+    ((std::string*)responseDataPtr)->append((char*)contents, size * nmemb);
+    return size * nmemb;
+}
+
+Oauth2TokenResultPtr ClientCredentialFlow::authenticate() {
+    Oauth2TokenResultPtr resultPtr = Oauth2TokenResultPtr(new Oauth2TokenResult());
+
+    CURL* handle = curl_easy_init();
+    CURLcode res;
+    std::string responseData;
+
+    // set header: json, request type: post
+    struct curl_slist* list = NULL;
+    list = curl_slist_append(list, "Content-Type: application/json");
+    curl_easy_setopt(handle, CURLOPT_HTTPHEADER, list);
+    curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, "POST");
+
+    // set URL: issuerUrl
+    curl_easy_setopt(handle, CURLOPT_URL, issuerUrl_.c_str());
+
+    // Write callback
+    curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlWriteCallback);
+    curl_easy_setopt(handle, CURLOPT_WRITEDATA, &responseData);
+
+    // New connection is made for each call
+    curl_easy_setopt(handle, CURLOPT_FRESH_CONNECT, 1L);
+    curl_easy_setopt(handle, CURLOPT_FORBID_REUSE, 1L);
+
+    curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1L);
+    curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 0L);
+    curl_easy_setopt(handle, CURLOPT_SSL_VERIFYHOST, 0L);
+
+    // fill in the request data
+    boost::property_tree::ptree pt;
+    pt.put("grant_type", "client_credentials");
+    pt.put("client_id", clientId_);
+    pt.put("client_secret", clientSecret_);
+    pt.put("audience", audience_);
+
+    std::stringstream ss;
+    boost::property_tree::json_parser::write_json(ss, pt);
+    std::string ssString = ss.str();
+
+    curl_easy_setopt(handle, CURLOPT_POSTFIELDS, ssString.c_str());
+
+    // Make get call to server
+    res = curl_easy_perform(handle);
+
+    LOG_DEBUG("issuerUrl_ " << issuerUrl_ << " clientid: " << clientId_ << " client_secret " << clientSecret_
+                            << " audience " << audience_ << " ssstring " << ssString);
+
+    switch (res) {
+        case CURLE_OK:
+            long response_code;
+            curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &response_code);
+            LOG_DEBUG("Response received for issuerurl " << issuerUrl_ << " code " << response_code);
+            if (response_code == 200) {
+                boost::property_tree::ptree root;
+                std::stringstream stream;
+                stream << responseData;
+                try {
+                    boost::property_tree::read_json(stream, root);
+                } catch (boost::property_tree::json_parser_error& e) {
+                    LOG_ERROR("Failed to parse json of Oauth2 response: "
+                              << e.what() << "\nInput Json = " << responseData << " passedin: " << ssString);
+                    break;
+                }
+
+                resultPtr->setAccessToken(root.get<std::string>("access_token"));
+                resultPtr->setExpiresIn(root.get<uint32_t>("expires_in"));
+
+                LOG_DEBUG("access_token: " << resultPtr->getAccessToken()
+                                           << " expires_in: " << resultPtr->getExpiresIn());
+            } else {
+                LOG_ERROR("Response failed for issuerurl " << issuerUrl_ << ". response Code "
+                                                           << response_code << " passedin: " << ssString);
+            }
+            break;
+        default:
+            LOG_ERROR("Response failed for issuerurl " << issuerUrl_ << ". Error Code " << res
+                                                       << " passedin: " << ssString);
+            break;
+    }
+    // Free header list
+    curl_slist_free_all(list);
+    curl_easy_cleanup(handle);
+
+    return resultPtr;
+}
+
+// AuthOauth2
+
+AuthOauth2::AuthOauth2(ParamMap& params) {
+    flowPtr_ = FlowPtr(new ClientCredentialFlow(params["issuer_url"], params["client_id"],
+                                                params["client_secret"], params["audience"]));
+}
+
+AuthOauth2::~AuthOauth2() {}
+
+ParamMap parseJsonAuthParamsString(const std::string& authParamsString) {
+    ParamMap params;
+    if (!authParamsString.empty()) {
+        boost::property_tree::ptree root;
+        std::stringstream stream;
+        stream << authParamsString;
+        try {
+            boost::property_tree::read_json(stream, root);
+            for (const auto& item : root) {
+                params[item.first] = item.second.get_value<std::string>();
+            }
+        } catch (boost::property_tree::json_parser_error& e) {
+            LOG_ERROR("Invalid String Error: " << e.what());
+        }
+    }
+    return params;
+}
+
+AuthenticationPtr AuthOauth2::create(const std::string& authParamsString) {
+    ParamMap params = parseJsonAuthParamsString(authParamsString);
+
+    return create(params);
+}
+
+AuthenticationPtr AuthOauth2::create(ParamMap& params) { return AuthenticationPtr(new AuthOauth2(params)); }
+
+const std::string AuthOauth2::getAuthMethodName() const { return "token"; }
+
+Result AuthOauth2::getAuthData(AuthenticationDataPtr& authDataContent) {
+    if (cachedTokenPtr_ == nullptr || cachedTokenPtr_->isExpired()) {
+        cachedTokenPtr_ = CachedTokenPtr(new Oauth2CachedToken(flowPtr_->authenticate()));
+    }
+
+    authDataContent = cachedTokenPtr_->getAuthData();
+    return ResultOk;
+}
+
+}  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/auth/AuthOauth2.h b/pulsar-client-cpp/lib/auth/AuthOauth2.h
new file mode 100644
index 0000000..0090976
--- /dev/null
+++ b/pulsar-client-cpp/lib/auth/AuthOauth2.h
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <pulsar/Authentication.h>
+#include <string>
+#include <boost/function.hpp>
+
+namespace pulsar {
+
+const std::string OAUTH2_TOKEN_PLUGIN_NAME = "oauth2token";
+const std::string OAUTH2_TOKEN_JAVA_PLUGIN_NAME =
+    "org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2";
+
+class ClientCredentialFlow : public Oauth2Flow {
+   public:
+    ClientCredentialFlow(const std::string& issuerUrl, const std::string& clientId,
+                         const std::string& clientSecret, const std::string& audience);
+    void initialize();
+    Oauth2TokenResultPtr authenticate();
+    void close();
+
+   private:
+    std::string issuerUrl_;
+    std::string clientId_;
+    std::string clientSecret_;
+    std::string audience_;
+};
+
+class Oauth2CachedToken : public CachedToken {
+   public:
+    Oauth2CachedToken(Oauth2TokenResultPtr token);
+    ~Oauth2CachedToken();
+    bool isExpired();
+    AuthenticationDataPtr getAuthData();
+
+   private:
+    int64_t expiresAt_;
+    Oauth2TokenResultPtr latest_;
+    AuthenticationDataPtr authData_;
+};
+
+class AuthDataOauth2 : public AuthenticationDataProvider {
+   public:
+    AuthDataOauth2(const std::string& accessToken);
+    ~AuthDataOauth2();
+
+    bool hasDataForHttp();
+    std::string getHttpHeaders();
+    bool hasDataFromCommand();
+    std::string getCommandData();
+
+   private:
+    std::string accessToken_;
+};
+
+}  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/auth/AuthTls.cc b/pulsar-client-cpp/lib/auth/AuthTls.cc
index fcf6571..fdf7f21 100644
--- a/pulsar-client-cpp/lib/auth/AuthTls.cc
+++ b/pulsar-client-cpp/lib/auth/AuthTls.cc
@@ -53,7 +53,7 @@ AuthenticationPtr AuthTls::create(const std::string& certificatePath, const std:
 
 const std::string AuthTls::getAuthMethodName() const { return "tls"; }
 
-Result AuthTls::getAuthData(AuthenticationDataPtr& authDataContent) const {
+Result AuthTls::getAuthData(AuthenticationDataPtr& authDataContent) {
     authDataContent = authDataTls_;
     return ResultOk;
 }
diff --git a/pulsar-client-cpp/lib/auth/AuthToken.cc b/pulsar-client-cpp/lib/auth/AuthToken.cc
index e377139..429f409 100644
--- a/pulsar-client-cpp/lib/auth/AuthToken.cc
+++ b/pulsar-client-cpp/lib/auth/AuthToken.cc
@@ -109,7 +109,7 @@ AuthenticationPtr AuthToken::create(const TokenSupplier &tokenSupplier) {
 
 const std::string AuthToken::getAuthMethodName() const { return "token"; }
 
-Result AuthToken::getAuthData(AuthenticationDataPtr &authDataContent) const {
+Result AuthToken::getAuthData(AuthenticationDataPtr &authDataContent) {
     authDataContent = authDataToken_;
     return ResultOk;
 }
diff --git a/pulsar-client-cpp/tests/AuthPluginTest.cc b/pulsar-client-cpp/tests/AuthPluginTest.cc
index a447a3a..183c880 100644
--- a/pulsar-client-cpp/tests/AuthPluginTest.cc
+++ b/pulsar-client-cpp/tests/AuthPluginTest.cc
@@ -336,3 +336,48 @@ TEST(AuthPluginTest, testAuthFactoryAthenz) {
         }
     }
 }
+
+TEST(AuthPluginTest, testOauth2) {
+    // test success get token from oauth2 server.
+    pulsar::AuthenticationDataPtr data;
+    std::string params = R"({
+        "type": "client_credentials",
+        "issuer_url": "https://dev-kt-aa9ne.us.auth0.com/oauth/token",
+        "client_id": "Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x",
+        "client_secret": "rT7ps7WY8uhdVuBTKWZkttwLdQotmdEliaM5rLfmgNibvqziZ-g07ZH52N_poGAb",
+        "audience": "https://dev-kt-aa9ne.us.auth0.com/api/v2/"})";
+
+    int expectedTokenLength = 3379;
+    LOG_INFO("PARAMS: " << params);
+    pulsar::AuthenticationPtr auth = pulsar::AuthOauth2::create(params);
+    ASSERT_EQ(auth->getAuthMethodName(), "token");
+    ASSERT_EQ(auth->getAuthData(data), pulsar::ResultOk);
+    ASSERT_EQ(data->hasDataForHttp(), true);
+    ASSERT_EQ(data->hasDataFromCommand(), true);
+    ASSERT_EQ(data->getCommandData().length(), expectedTokenLength);
+}
+
+TEST(AuthPluginTest, testOauth2WrongSecret) {
+    try {
+        pulsar::AuthenticationDataPtr data;
+
+        std::string params = R"({
+        "type": "client_credentials",
+        "issuer_url": "https://dev-kt-aa9ne.us.auth0.com/oauth/token",
+        "client_id": "Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x",
+        "client_secret": "rT7ps7WY8uhdVuBTKWZkttwLdQotmdEliaM5rLfmgNibvqziZ",
+        "audience": "https://dev-kt-aa9ne.us.auth0.com/api/v2/"})";
+
+        int expectedTokenLength = 3379;
+        LOG_INFO("PARAMS: " << params);
+        pulsar::AuthenticationPtr auth = pulsar::AuthOauth2::create(params);
+        ASSERT_EQ(auth->getAuthMethodName(), "token");
+
+        auth->getAuthData(data);
+
+        FAIL() << "Expected fail for wrong secret when to get token from server";
+
+    } catch (...) {
+        // expected
+    }
+}