You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2023/02/03 04:00:25 UTC

[kudu] branch master updated: jwt: determine discovery endpoint from token

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new b1bc36fe9 jwt: determine discovery endpoint from token
b1bc36fe9 is described below

commit b1bc36fe9f235737ef2d2b50bbc093f09a034ae0
Author: Zoltan Chovan <zc...@cloudera.com>
AuthorDate: Fri Jan 20 18:29:24 2023 +0100

    jwt: determine discovery endpoint from token
    
    This patch introduces a JWT verifier that manages multiple JWKSs,
    depending on an account ID included in a given JWT's payload.
    
    The current implementation is somewhat crude, simply instantiating new
    JWTHelpers per account ID, thereby creating new threads per account.
    Follow-on work will be required to ensure scalability (though it's
    unclear how many account IDs to expect in a typical deployment).
    
    Co-authored-by: Andrew Wong <aw...@apache.org>
    
    Change-Id: I970bcc6d894c0206160196418d549b71c35ac973
    Reviewed-on: http://gerrit.cloudera.org:8080/18472
    Tested-by: Kudu Jenkins
    Reviewed-by: Attila Bukor <ab...@apache.org>
    Reviewed-by: Wenzhe Zhou <wz...@cloudera.com>
    Reviewed-by: Alexey Serbin <al...@apache.org>
---
 src/kudu/rpc/negotiation.cc        |   4 +-
 src/kudu/rpc/server_negotiation.cc |   2 +-
 src/kudu/rpc/server_negotiation.h  |   4 +-
 src/kudu/server/server_base.cc     |   6 +
 src/kudu/util/jwt-util-test.cc     | 226 ++++++++++++++++++++++++++++++++++++-
 src/kudu/util/jwt-util.cc          | 102 ++++++++++++++++-
 src/kudu/util/jwt-util.h           |  38 ++++++-
 7 files changed, 371 insertions(+), 11 deletions(-)

diff --git a/src/kudu/rpc/negotiation.cc b/src/kudu/rpc/negotiation.cc
index 5dc1a7f23..8f5a0cf5d 100644
--- a/src/kudu/rpc/negotiation.cc
+++ b/src/kudu/rpc/negotiation.cc
@@ -243,7 +243,7 @@ static Status DoServerNegotiation(Connection* conn,
                                   RpcEncryption encryption,
                                   bool encrypt_loopback,
                                   const MonoTime& deadline) {
-  const auto* messenger = conn->reactor_thread()->reactor()->messenger();
+  auto* messenger = conn->reactor_thread()->reactor()->messenger();
   if (authentication == RpcAuthentication::REQUIRED &&
       messenger->keytab_file().empty() &&
       !messenger->tls_context().is_external_cert()) {
@@ -262,7 +262,7 @@ static Status DoServerNegotiation(Connection* conn,
   ServerNegotiation server_negotiation(conn->release_socket(),
                                        &messenger->tls_context(),
                                        &messenger->token_verifier(),
-                                       messenger->jwt_verifier(),
+                                       messenger->mutable_jwt_verifier(),
                                        encryption,
                                        encrypt_loopback,
                                        messenger->sasl_proto_name());
diff --git a/src/kudu/rpc/server_negotiation.cc b/src/kudu/rpc/server_negotiation.cc
index 17beb8848..f0eb9fef5 100644
--- a/src/kudu/rpc/server_negotiation.cc
+++ b/src/kudu/rpc/server_negotiation.cc
@@ -157,7 +157,7 @@ static int ServerNegotiationPlainAuthCb(sasl_conn_t* conn,
 ServerNegotiation::ServerNegotiation(unique_ptr<Socket> socket,
                                      const security::TlsContext* tls_context,
                                      const security::TokenVerifier* token_verifier,
-                                     const JwtVerifier* jwt_verifier,
+                                     JwtVerifier* jwt_verifier,
                                      RpcEncryption encryption,
                                      bool encrypt_loopback,
                                      std::string sasl_proto_name)
diff --git a/src/kudu/rpc/server_negotiation.h b/src/kudu/rpc/server_negotiation.h
index 9339fb53b..86b11e12c 100644
--- a/src/kudu/rpc/server_negotiation.h
+++ b/src/kudu/rpc/server_negotiation.h
@@ -66,7 +66,7 @@ class ServerNegotiation {
   ServerNegotiation(std::unique_ptr<Socket> socket,
                     const security::TlsContext* tls_context,
                     const security::TokenVerifier* token_verifier,
-                    const JwtVerifier* jwt_verifier,
+                    JwtVerifier* jwt_verifier,
                     security::RpcEncryption encryption,
                     bool encrypt_loopback,
                     std::string sasl_proto_name);
@@ -238,7 +238,7 @@ class ServerNegotiation {
 
   // TSK state.
   const security::TokenVerifier* token_verifier_;
-  const JwtVerifier* jwt_verifier_;
+  JwtVerifier* jwt_verifier_;
 
   // The set of features supported by the client and server. Filled in during negotiation.
   std::set<RpcFeatureFlag> client_features_;
diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc
index a83207a1a..14db0f359 100644
--- a/src/kudu/server/server_base.cc
+++ b/src/kudu/server/server_base.cc
@@ -272,6 +272,12 @@ DEFINE_string(jwks_url, "",
     "URL of the JSON Web Key Set (JWKS) for JWT verification.");
 TAG_FLAG(jwks_url, experimental);
 
+DEFINE_string(jwks_discovery_endpoint_base, "",
+              "Base URL of the Discovery Endpoint that points to a JSON Web Key Set "
+              "(JWKS) for JWT verification. Additional query parameters, like 'accountId', "
+              "are taken from received JWTs to get the appropriate Discovery Endpoint.");
+TAG_FLAG(jwks_discovery_endpoint_base, experimental);
+
 DECLARE_bool(use_hybrid_clock);
 DECLARE_int32(dns_resolver_max_threads_num);
 DECLARE_uint32(dns_resolver_cache_capacity_mb);
diff --git a/src/kudu/util/jwt-util-test.cc b/src/kudu/util/jwt-util-test.cc
index ff3b9fddb..9ccc23cf0 100644
--- a/src/kudu/util/jwt-util-test.cc
+++ b/src/kudu/util/jwt-util-test.cc
@@ -24,21 +24,28 @@
 #include <iostream>
 #include <memory>
 #include <string>
+#include <thread>
 #include <type_traits>
+#include <unordered_map>
+#include <utility>
 #include <vector>
 
+#include <glog/logging.h>
 #include <gtest/gtest.h>
 #include <jwt-cpp/jwt.h>
 #include <jwt-cpp/traits/kazuho-picojson/defaults.h>
 #include <jwt-cpp/traits/kazuho-picojson/traits.h>
 
+#include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/server/webserver.h"
 #include "kudu/server/webserver_options.h"
+#include "kudu/util/countdown_latch.h"
 #include "kudu/util/env.h"
 #include "kudu/util/jwt-util-internal.h"
 #include "kudu/util/jwt_test_certs.h"
 #include "kudu/util/net/sockaddr.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
@@ -48,6 +55,7 @@ namespace kudu {
 
 using std::string;
 using std::unique_ptr;
+using std::unordered_map;
 using std::vector;
 using strings::Substitute;
 
@@ -828,8 +836,10 @@ TEST(JwtUtilTest, VerifyJwtFailExpiredToken) {
 
 namespace {
 
-void JWKSHandler(const Webserver::WebRequest& /*req*/,
-                 Webserver::PrerenderedWebResponse* resp) {
+// Returns a simple JWKS to be used by tokens signed by 'rsa_pub_key_pem' and
+// 'rsa_priv_key_pem'.
+void SimpleJWKSHandler(const Webserver::WebRequest& /*req*/,
+                       Webserver::PrerenderedWebResponse* resp) {
   resp->output <<
       Substitute(kJwksRsaFileFormat, kKid1, "RS256",
           kRsaPubKeyJwkN, kRsaPubKeyJwkE, kKid2, "RS256", kRsaInvalidPubKeyJwkN,
@@ -839,12 +849,14 @@ void JWKSHandler(const Webserver::WebRequest& /*req*/,
 
 class JWKSMockServer {
  public:
+  // Registers a path handler for a single JWKS to be used by tokens signed by
+  // 'rsa_pub_key_pem' and 'rsa_priv_key_pem'.
   Status Start() {
     WebserverOptions opts;
     opts.port = 0;
     opts.bind_interface = "127.0.0.1";
     webserver_.reset(new Webserver(opts));
-    webserver_->RegisterPrerenderedPathHandler("/jwks", "JWKS", JWKSHandler,
+    webserver_->RegisterPrerenderedPathHandler("/jwks", "JWKS", SimpleJWKSHandler,
                                                /*is_styled*/false, /*is_on_nav_bar*/false);
     RETURN_NOT_OK(webserver_->Start());
     vector<Sockaddr> addrs;
@@ -853,9 +865,39 @@ class JWKSMockServer {
     return Status::OK();
   }
 
+  // Starts the server with a "/jwks/<account_id>" path handler for every entry
+  // in 'account_id_to_resp'; each handler returns the corresponding response body.
+  Status StartWithAccounts(const unordered_map<string, string>& account_id_to_resp) {
+    DCHECK(!webserver_);
+    WebserverOptions opts;
+    opts.port = 0;
+    opts.bind_interface = "127.0.0.1";
+    webserver_.reset(new Webserver(opts));
+    for (const auto& ar : account_id_to_resp) {
+      const auto& account_id = ar.first;
+      const auto& jwks = ar.second;
+      webserver_->RegisterPrerenderedPathHandler(Substitute("/jwks/$0", account_id), account_id,
+          [account_id, jwks] (const Webserver::WebRequest& /*req*/,
+                              Webserver::PrerenderedWebResponse* resp) {
+            resp->output << jwks;
+            resp->status_code = HttpStatusCode::Ok;
+          },
+          /*is_styled*/false, /*is_on_nav_bar*/false);
+    }
+    RETURN_NOT_OK(webserver_->Start());
+    vector<Sockaddr> addrs;
+    RETURN_NOT_OK(webserver_->GetBoundAddresses(&addrs));
+    url_ = Substitute("http://$0/jwks", addrs[0].ToString());
+    return Status::OK();
+  }
+
   const string& url() const {
     return url_;
   }
+
+  string url_for_account(const string& account_id) const {
+    return Substitute("$0/$1", url_, account_id);
+  }
  private:
   unique_ptr<Webserver> webserver_;
   string url_;
@@ -884,9 +926,187 @@ TEST(JwtUtilTest, VerifyJWKSUrl) {
       "klbfyYn_ghqjL7ihY2ECaZzZ0Utw",
       encoded_token);
   KeyBasedJwtVerifier jwt_verifier(jwks_server.url(), /*is_local_file*/false);
+  ASSERT_OK(jwt_verifier.Init());
   string subject;
   ASSERT_OK(jwt_verifier.VerifyToken(encoded_token, &subject));
   ASSERT_EQ("kudu", subject);
 }
 
+namespace {
+
+// $0: account_id
+// $1: jwks_uri
+const char kDiscoveryFormat[] = R"({
+    "issuer": "auth0/$0",
+    "token_endpoint": "dummy.endpoint.com",
+    "response_types_supported": [
+        "id_token"
+    ],
+    "claims_supported": [
+        "sub",
+        "aud",
+        "iss",
+        "exp"
+    ],
+    "subject_types_supported": [
+        "public"
+    ],
+    "id_token_signing_alg_values_supported": [
+        "RS256"
+    ],
+    "jwks_uri": "$1"
+})";
+
+void JWKSDiscoveryHandler(const Webserver::WebRequest& req,
+                          Webserver::PrerenderedWebResponse* resp,
+                          const JWKSMockServer& jwks_server) {
+  const auto* account_id = FindOrNull(req.parsed_args, "accountid");
+  if (!account_id) {
+    resp->output << "expected 'accountId' query";
+    resp->status_code = HttpStatusCode::BadRequest;
+    return;
+  }
+  resp->output << Substitute(kDiscoveryFormat, *account_id,
+                             jwks_server.url_for_account(*account_id));
+  resp->status_code = HttpStatusCode::Ok;
+}
+
+const char kValidAccount[] = "new-phone";
+const char kInvalidAccount[] = "who-is-this";
+const char kMissingAccount[] = "no-where";
+
+// Test-only OpenID Connect discovery endpoint. Replies with a discovery document
+// whose 'jwks_uri' points at the per-account JWKS served by 'jwks_server_'.
+class JWKSDiscoveryEndpointMockServer {
+ public:
+  Status Start() {
+    unordered_map<string, string> account_id_to_resp({
+        {
+          // Create an account that has valid keys.
+          kValidAccount,
+          Substitute(kJwksRsaFileFormat, kKid1, "RS256", kRsaPubKeyJwkN, kRsaPubKeyJwkE,
+              kKid2, "RS256", kRsaInvalidPubKeyJwkN, kRsaPubKeyJwkE),
+        },
+        {
+          // The keys associated with this account are invalid.
+          kInvalidAccount,
+          Substitute(kJwksRsaFileFormat, kKid1, "RS256", kRsaInvalidPubKeyJwkN, kRsaPubKeyJwkE,
+              kKid2, "RS256", kRsaInvalidPubKeyJwkN, kRsaPubKeyJwkE),
+        },
+    });
+    RETURN_NOT_OK(jwks_server_.StartWithAccounts(account_id_to_resp));
+
+    WebserverOptions opts;
+    opts.port = 0;
+    webserver_.reset(new Webserver(opts));
+    webserver_->RegisterPrerenderedPathHandler(
+        "/.well-known/openid-configuration", "openid-configuration",
+        // Pass the 'accountId' query arguments to return a response that
+        // points to the JWKS endpoint for the account.
+        [this] (const Webserver::WebRequest& req, Webserver::PrerenderedWebResponse* resp) {
+          JWKSDiscoveryHandler(req, resp, jwks_server_);
+        },
+        /*is_styled*/false, /*is_on_nav_bar*/false);
+    RETURN_NOT_OK(webserver_->Start());
+    vector<Sockaddr> addrs;
+    RETURN_NOT_OK(webserver_->GetBoundAddresses(&addrs));
+    RETURN_NOT_OK(addr_.ParseString("127.0.0.1", addrs[0].port()));
+    url_ = Substitute("http://$0/.well-known/openid-configuration", addr_.ToString());
+    return Status::OK();
+  }
+
+  const string& url() const {
+    return url_;
+  }
+ private:
+  JWKSMockServer jwks_server_;  // Declared first: must outlive 'webserver_', whose handler uses it.
+  unique_ptr<Webserver> webserver_;
+  string url_;
+  Sockaddr addr_;
+};
+
+} // anonymous namespace
+
+TEST(JwtUtilTest, VerifyJWKSDiscoveryEndpoint) {
+  JWKSDiscoveryEndpointMockServer discovery_endpoint;
+  ASSERT_OK(discovery_endpoint.Start());
+  PerAccountKeyBasedJwtVerifier jwt_verifier(discovery_endpoint.url());
+  {
+    auto valid_user_token =
+        jwt::create()
+            .set_issuer(Substitute("auth0/$0", kValidAccount))
+            .set_type("JWT")
+            .set_algorithm("RS256")
+            .set_key_id(kKid1)
+            .set_subject(kValidAccount)
+            .sign(jwt::algorithm::rs256(kRsaPubKeyPem, kRsaPrivKeyPem, "", ""));
+    string subject;
+    ASSERT_OK(jwt_verifier.VerifyToken(valid_user_token, &subject));
+    ASSERT_EQ(kValidAccount, subject);
+  }
+  {
+    auto invalid_user_token =
+        jwt::create()
+            .set_issuer(Substitute("auth0/$0", kInvalidAccount))
+            .set_type("JWT")
+            .set_algorithm("RS256")
+            .set_key_id(kKid1)
+            .set_subject(kInvalidAccount)
+            .sign(jwt::algorithm::rs256(kRsaPubKeyPem, kRsaPrivKeyPem, "", ""));
+    string subject;
+    Status s = jwt_verifier.VerifyToken(invalid_user_token, &subject);
+    ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+  }
+  {
+    auto missing_user_token =
+        jwt::create()
+            .set_issuer(Substitute("auth0/$0", kMissingAccount))
+            .set_type("JWT")
+            .set_algorithm("RS256")
+            .set_key_id(kKid1)
+            .set_subject(kMissingAccount)
+            .sign(jwt::algorithm::rs256(kRsaPubKeyPem, kRsaPrivKeyPem, "", ""));
+    string subject;
+    Status s = jwt_verifier.VerifyToken(missing_user_token, &subject);
+    ASSERT_TRUE(s.IsRemoteError()) << s.ToString();
+  }
+}
+
+TEST(JwtUtilTest, VerifyJWKSDiscoveryEndpointMultipleClients) {
+  JWKSDiscoveryEndpointMockServer discovery_endpoint;
+  ASSERT_OK(discovery_endpoint.Start());
+  PerAccountKeyBasedJwtVerifier jwt_verifier(discovery_endpoint.url());
+  {
+    auto valid_user_token =
+        jwt::create()
+            .set_issuer(Substitute("auth0/$0", kValidAccount))
+            .set_type("JWT")
+            .set_algorithm("RS256")
+            .set_key_id(kKid1)
+            .set_subject(kValidAccount)
+            .sign(jwt::algorithm::rs256(kRsaPubKeyPem, kRsaPrivKeyPem, "", ""));
+
+    int constexpr n = 8;
+    std::vector<std::thread> threads;
+    threads.reserve(n);
+    CountDownLatch latch(n);
+
+    for (int i = 0; i < n; i++) {
+      threads.emplace_back([&](){
+        string subject;
+        CHECK_OK(jwt_verifier.VerifyToken(valid_user_token, &subject));
+        CHECK_EQ(kValidAccount, subject);
+        latch.CountDown();
+      });
+    }
+
+    latch.Wait();
+    SCOPED_CLEANUP({
+      for (auto& t : threads) {
+        t.join();
+      }
+    });
+  }
+}
+
 } // namespace kudu
diff --git a/src/kudu/util/jwt-util.cc b/src/kudu/util/jwt-util.cc
index 19c149095..78280ae1e 100644
--- a/src/kudu/util/jwt-util.cc
+++ b/src/kudu/util/jwt-util.cc
@@ -26,7 +26,6 @@
 #include <openssl/pem.h>
 #include <openssl/ssl.h>
 #include <openssl/x509.h>
-
 #include <sys/stat.h>
 
 #include <cerrno>
@@ -43,6 +42,7 @@
 #include <typeinfo>
 #include <unordered_map>
 #include <utility>
+#include <vector>
 
 #include <gflags/gflags.h>
 #include <glog/logging.h>
@@ -55,8 +55,10 @@
 #include <rapidjson/rapidjson.h>
 
 #include "kudu/gutil/map-util.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/escaping.h"
+#include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/curl_util.h"
 #include "kudu/util/faststring.h"
@@ -765,6 +767,9 @@ void JWKSMgr::SetJWKSSnapshot(const JWKSSnapshotPtr& new_jwks) {
 // JWTHelper member functions.
 //
 
+JWTHelper::~JWTHelper() {
+}
+
 struct JWTHelper::JWTDecodedToken {
   explicit JWTDecodedToken(DecodedJWT  decoded_jwt) : decoded_jwt_(std::move(decoded_jwt)) {}
   DecodedJWT decoded_jwt_;
@@ -932,6 +937,101 @@ Status KeyBasedJwtVerifier::Init() {
 Status KeyBasedJwtVerifier::VerifyToken(const string& bytes_raw, string* subject) const {
   JWTHelper::UniqueJWTDecodedToken decoded_token;
   RETURN_NOT_OK(JWTHelper::Decode(bytes_raw, decoded_token));
+  RETURN_NOT_OK(jwt_->Verify(decoded_token.get()));
+  if (!decoded_token->decoded_jwt_.has_subject()) {
+    return Status::InvalidArgument("token does not include subject");
+  }
+  *subject = decoded_token->decoded_jwt_.get_subject();
+  return Status::OK();
+}
+
+Status PerAccountKeyBasedJwtVerifier::JWTHelperForToken(const JWTHelper::JWTDecodedToken& token,
+                                                        JWTHelper** helper) const {
+  if (!token.decoded_jwt_.has_issuer()) {
+    return Status::InvalidArgument("Expected token to have 'issuer' field");
+  }
+
+  // Parse the account ID (last '/'-separated piece) from the unverified JWT's 'iss'
+  // field, rejecting an empty one. If we already have a JWTHelper for it, use it.
+  const auto& issuer = token.decoded_jwt_.get_issuer();
+  std::vector<string> issuer_pieces = strings::Split(issuer, "/");
+  if (PREDICT_FALSE(issuer_pieces.empty() || issuer_pieces.back().empty())) {
+    return Status::InvalidArgument("Expected 'issuer' field to end with an account ID");
+  }
+  const auto& account_id = issuer_pieces.back();
+
+  {
+    const std::lock_guard<simple_spinlock> l(jwt_by_account_id_map_lock_);
+    const auto* unique_helper = FindOrNull(jwt_by_account_id_, account_id);
+    if (unique_helper) {
+      *helper = unique_helper->get();
+      return Status::OK();
+    }
+  }
+
+  // Otherwise, use the Discovery Endpoint to determine what 'jwks_uri' to use.
+  // NOTE(review): 'account_id' goes into the query string unescaped, and the
+  // fetch uses set_verify_peer(false) -- confirm both are intended.
+  kudu::EasyCurl curl;
+  kudu::faststring dst;
+  const auto discovery_endpoint = Substitute("$0?accountId=$1", discovery_base_, account_id);
+  curl.set_timeout(
+      kudu::MonoDelta::FromSeconds(static_cast<int64_t>(FLAGS_jwks_pulling_timeout_s)));
+  curl.set_verify_peer(false);
+  RETURN_NOT_OK_PREPEND(curl.FetchURL(discovery_endpoint, &dst),
+      Substitute("Error downloading contents of Discovery Endpoint from '$0'", discovery_endpoint));
+  if (dst.size() == 0) {
+    return Status::RuntimeError("Discovery Endpoint returned an empty document");
+  }
+
+  dst.push_back('\0');  // NUL-terminate the buffer before handing it to Parse().
+  Document endpoint_doc;
+  endpoint_doc.Parse(reinterpret_cast<char*>(dst.data()));
+#define RETURN_INVALID_IF(stmt, msg)     \
+  if (PREDICT_FALSE(stmt)) {             \
+    return Status::InvalidArgument(msg); \
+  }
+  RETURN_INVALID_IF(endpoint_doc.HasParseError(), GetParseError_En(endpoint_doc.GetParseError()));
+  RETURN_INVALID_IF(!endpoint_doc.IsObject(), "root element must be a JSON Object");
+  auto jwks_uri_member = endpoint_doc.FindMember("jwks_uri");
+  RETURN_INVALID_IF(jwks_uri_member == endpoint_doc.MemberEnd(), "jwks_uri is required");
+  RETURN_INVALID_IF(!jwks_uri_member->value.IsString(), "jwks_uri must be a string");
+  const string jwks_uri(jwks_uri_member->value.GetString());
+#undef RETURN_INVALID_IF
+
+  // TODO(zchovan): this implementation expects there to be a small number of
+  // accounts, as it creates a JWKS refresh thread for each account. Group the
+  // refreshes into a single thread or threadpool.
+  auto new_helper = std::make_shared<JWTHelper>();
+  RETURN_NOT_OK_PREPEND(new_helper->Init(jwks_uri, /*is_local_file*/ false),
+                        "Error initializing JWT helper");
+
+  {
+    const std::lock_guard<simple_spinlock> l(jwt_by_account_id_map_lock_);
+    LookupOrEmplace(&jwt_by_account_id_, account_id, std::move(new_helper));
+    *helper = FindPointeeOrNull(jwt_by_account_id_, account_id);
+  }
+
+  return Status::OK();
+}
+
+Status PerAccountKeyBasedJwtVerifier::Init() {
+  for (auto& [account_id, verifier] : jwt_by_account_id_) {  // Return the first failure.
+    RETURN_NOT_OK(verifier->Init(Substitute("$0?accountId=$1", discovery_base_, account_id),
+                                 /*is_local_file*/false));
+  }
+  return Status::OK();
+}
+
+Status PerAccountKeyBasedJwtVerifier::VerifyToken(const string& bytes_raw, string* subject) const {
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  RETURN_NOT_OK(JWTHelper::Decode(bytes_raw, decoded_token));
+  JWTHelper* jwt;
+  RETURN_NOT_OK(JWTHelperForToken(*decoded_token, &jwt));
+  RETURN_NOT_OK(jwt->Verify(decoded_token.get()));
+  if (!decoded_token->decoded_jwt_.has_subject()) {
+    return Status::InvalidArgument("token does not include subject");
+  }
   *subject = decoded_token->decoded_jwt_.get_subject();
   return Status::OK();
 }
diff --git a/src/kudu/util/jwt-util.h b/src/kudu/util/jwt-util.h
index a761372ea..c98ce2b58 100644
--- a/src/kudu/util/jwt-util.h
+++ b/src/kudu/util/jwt-util.h
@@ -20,14 +20,17 @@
 
 #include <memory>
 #include <string>
+#include <unordered_map>
 #include <utility>
 
 #include "kudu/gutil/macros.h"
-#include "kudu/util/jwt-util-internal.h"
 #include "kudu/util/jwt.h"
+#include "kudu/util/locks.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
+class JWKSMgr;
+class JWKSSnapshot;
 
 // JSON Web Token (JWT) is an Internet proposed standard for creating data with optional
 // signature and/or optional encryption whose payload holds JSON that asserts some
@@ -52,6 +55,10 @@ class JWTHelper {
   // facilitate automatic reference counting.
   typedef std::unique_ptr<JWTDecodedToken, TokenDeleter> UniqueJWTDecodedToken;
 
+  // Define our destructor elsewhere so JWKSMgr's destructor is called from the
+  // correct compilation unit.
+  ~JWTHelper();
+
   // Return the single instance.
   static JWTHelper* GetInstance() { return jwt_helper_; }
 
@@ -85,7 +92,7 @@ class JWTHelper {
   bool initialized_ = false;
 
   // JWKS Manager for Json Web Token (JWT) verification.
-  // Only one instance per daemon.
+  // Only one instance per helper.
   std::unique_ptr<JWKSMgr> jwks_mgr_;
 };
 
@@ -108,4 +115,31 @@ class KeyBasedJwtVerifier : public JwtVerifier {
   DISALLOW_COPY_AND_ASSIGN(KeyBasedJwtVerifier);
 };
 
+class PerAccountKeyBasedJwtVerifier : public JwtVerifier {
+ public:
+  explicit PerAccountKeyBasedJwtVerifier(std::string  jwks_uri)
+      : discovery_base_(std::move(jwks_uri)) {}
+
+  ~PerAccountKeyBasedJwtVerifier() override = default;
+  // Calls Init() on every JWTHelper currently stored in 'jwt_by_account_id_'.
+  Status Init() override;
+  // Verifies 'bytes_raw' with the JWTHelper for its account; sets 'subject' on success.
+  Status VerifyToken(const std::string& bytes_raw, std::string* subject) const override;
+
+ private:
+  // Gets the JWTHelper for the account ID found in the token's issuer, creating it via
+  // the Discovery Endpoint on first use. Returns an error if the token has no issuer.
+  Status JWTHelperForToken(const JWTHelper::JWTDecodedToken& token, JWTHelper** helper) const;
+  // Discovery Endpoint base URL (the constructor argument); "?accountId=<id>" is appended.
+  std::string discovery_base_;
+  // Marked as mutable so that PerAccountKeyBasedJwtVerifier::JWTHelperForToken is able to emplace
+  // new JWTHelpers in it.
+  mutable std::unordered_map<std::string, std::shared_ptr<JWTHelper>> jwt_by_account_id_;
+
+  // Protects 'jwt_by_account_id_'.
+  mutable simple_spinlock jwt_by_account_id_map_lock_;
+
+  DISALLOW_COPY_AND_ASSIGN(PerAccountKeyBasedJwtVerifier);
+};
+
 } // namespace kudu