You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2022/12/08 18:50:40 UTC

[kudu] branch master updated: rpc: plumb JWTs into the RPC layer

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new ccecc51f0 rpc: plumb JWTs into the RPC layer
ccecc51f0 is described below

commit ccecc51f07d9d4698e6104c9cb75939753c14061
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Thu Apr 21 16:58:06 2022 -0700

    rpc: plumb JWTs into the RPC layer
    
    This patch plumbs JWT verification into our C++ RPC negotiation.
    It is limited in the sense that JWTs can be sent over encrypted
    channels where the authenticity of the server is not verified yet
    -- this should be addressed before using in production.
    
    Co-authored-by: Zoltan Chovan <zc...@cloudera.com>
    
    Change-Id: I252f1e597d9df4408379c3b695f266dbd7f48dcc
    Reviewed-on: http://gerrit.cloudera.org:8080/18469
    Reviewed-by: Wenzhe Zhou <wz...@cloudera.com>
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <al...@apache.org>
---
 src/kudu/client/client.proto       |  3 ++
 src/kudu/rpc/client_negotiation.cc | 28 ++++++++++++++++++
 src/kudu/rpc/client_negotiation.h  |  5 ++++
 src/kudu/rpc/messenger.cc          |  4 ++-
 src/kudu/rpc/messenger.h           | 15 ++++++++++
 src/kudu/rpc/negotiation-test.cc   | 16 ++++++----
 src/kudu/rpc/negotiation.cc        |  6 +++-
 src/kudu/rpc/negotiation.h         |  1 +
 src/kudu/rpc/rpc_header.proto      | 14 +++++++++
 src/kudu/rpc/server_negotiation.cc | 60 +++++++++++++++++++++++++++++++++++++-
 src/kudu/rpc/server_negotiation.h  |  5 ++++
 src/kudu/security/token.proto      |  5 ++++
 src/kudu/util/jwt-util.cc          | 11 +++++++
 src/kudu/util/jwt-util.h           | 22 ++++++++++++++
 src/kudu/util/jwt.h                | 50 +++++++++++++++++++++++++++++++
 15 files changed, 236 insertions(+), 9 deletions(-)

diff --git a/src/kudu/client/client.proto b/src/kudu/client/client.proto
index b56465d9e..05d5deaf8 100644
--- a/src/kudu/client/client.proto
+++ b/src/kudu/client/client.proto
@@ -211,4 +211,7 @@ message AuthenticationCredentialsPB {
 
   // Trusted root CA certificates.
   repeated bytes ca_cert_ders = 2;
+
+  // A JWT to be verified by the master.
+  optional security.JwtRawPB jwt = 4;
 }
diff --git a/src/kudu/rpc/client_negotiation.cc b/src/kudu/rpc/client_negotiation.cc
index 09c5357f6..8fecbecba 100644
--- a/src/kudu/rpc/client_negotiation.cc
+++ b/src/kudu/rpc/client_negotiation.cc
@@ -118,6 +118,7 @@ static Status StatusFromRpcError(const ErrorStatusPB& error) {
 ClientNegotiation::ClientNegotiation(unique_ptr<Socket> socket,
                                      const security::TlsContext* tls_context,
                                      std::optional<security::SignedTokenPB> authn_token,
+                                     std::optional<security::JwtRawPB> jwt,
                                      RpcEncryption encryption,
                                      bool encrypt_loopback,
                                      std::string sasl_proto_name)
@@ -129,6 +130,7 @@ ClientNegotiation::ClientNegotiation(unique_ptr<Socket> socket,
       tls_negotiated_(false),
       encrypt_loopback_(encrypt_loopback),
       authn_token_(std::move(authn_token)),
+      jwt_(std::move(jwt)),
       psecret_(nullptr, std::free),
       negotiated_authn_(AuthenticationType::INVALID),
       negotiated_mech_(SaslMechanism::INVALID),
@@ -224,6 +226,9 @@ Status ClientNegotiation::Negotiate(unique_ptr<ErrorStatusPB>* rpc_error) {
     case AuthenticationType::TOKEN:
       RETURN_NOT_OK(AuthenticateByToken(&recv_buf, rpc_error));
       break;
+    case AuthenticationType::JWT:
+      RETURN_NOT_OK(AuthenticateByJwt(&recv_buf, rpc_error));
+      break;
     case AuthenticationType::CERTIFICATE:
       // The TLS handshake has already authenticated the server.
       break;
@@ -349,6 +354,11 @@ Status ClientNegotiation::SendNegotiate() {
     msg.add_authn_types()->mutable_token();
   }
 
+  if (jwt_) {
+    // TODO(zchovan): make sure that we are using a trusted certificate
+    msg.add_authn_types()->mutable_jwt();
+  }
+
   if (PREDICT_FALSE(msg.authn_types().empty())) {
     return Status::NotAuthorized("client is not configured with an authentication type");
   }
@@ -400,6 +410,9 @@ Status ClientNegotiation::HandleNegotiate(const NegotiatePB& response) {
         }
         negotiated_authn_ = AuthenticationType::TOKEN;
         return Status::OK();
+      case AuthenticationTypePB::kJwt:
+        negotiated_authn_ = AuthenticationType::JWT;
+        return Status::OK();
       case AuthenticationTypePB::kCertificate:
         if (!tls_context_->has_signed_cert()) {
           return Status::RuntimeError(
@@ -541,6 +554,21 @@ Status ClientNegotiation::AuthenticateBySasl(faststring* recv_buf,
   return HandleSaslSuccess(success);
 }
 
+Status ClientNegotiation::AuthenticateByJwt(faststring* recv_buf,
+                                            unique_ptr<ErrorStatusPB>* rpc_error) {
+  NegotiatePB pb;
+  pb.set_step(NegotiatePB::JWT_EXCHANGE);
+  *pb.mutable_jwt_raw() = std::move(*jwt_);
+  RETURN_NOT_OK(SendNegotiatePB(pb));
+  pb.Clear();
+  RETURN_NOT_OK(RecvNegotiatePB(&pb, recv_buf, rpc_error));
+  if (pb.step() != NegotiatePB::JWT_EXCHANGE) {
+    return Status::NotAuthorized("expected JWT_EXCHANGE step",
+                                 NegotiatePB::NegotiateStep_Name(pb.step()));
+  }
+  return Status::OK();
+}
+
 Status ClientNegotiation::AuthenticateByToken(faststring* recv_buf,
                                               unique_ptr<ErrorStatusPB>* rpc_error) {
   // Sanity check that TLS has been negotiated. Sending the token on an
diff --git a/src/kudu/rpc/client_negotiation.h b/src/kudu/rpc/client_negotiation.h
index 4481bb59f..2fd2ffb68 100644
--- a/src/kudu/rpc/client_negotiation.h
+++ b/src/kudu/rpc/client_negotiation.h
@@ -65,6 +65,7 @@ class ClientNegotiation {
   ClientNegotiation(std::unique_ptr<Socket> socket,
                     const security::TlsContext* tls_context,
                     std::optional<security::SignedTokenPB> authn_token,
+                    std::optional<security::JwtRawPB> jwt,
                     security::RpcEncryption encryption,
                     bool encrypt_loopback,
                     std::string sasl_proto_name);
@@ -186,6 +187,9 @@ class ClientNegotiation {
   Status AuthenticateByToken(faststring* recv_buf,
                              std::unique_ptr<ErrorStatusPB> *rpc_error) WARN_UNUSED_RESULT;
 
+  Status AuthenticateByJwt(faststring* recv_buf,
+                           std::unique_ptr<ErrorStatusPB>* rpc_error) WARN_UNUSED_RESULT;
+
   // Send an SASL_INITIATE message to the server.
   // Returns:
   //  Status::OK if the SASL_SUCCESS message is expected next.
@@ -234,6 +238,7 @@ class ClientNegotiation {
 
   // TSK state.
   std::optional<security::SignedTokenPB> authn_token_;
+  std::optional<security::JwtRawPB> jwt_;
 
   // Authentication state.
   std::string plain_auth_user_;
diff --git a/src/kudu/rpc/messenger.cc b/src/kudu/rpc/messenger.cc
index af8d72412..c2585f181 100644
--- a/src/kudu/rpc/messenger.cc
+++ b/src/kudu/rpc/messenger.cc
@@ -22,6 +22,7 @@
 #include <mutex>
 #include <ostream>
 #include <string>
+#include <type_traits>
 #include <utility>
 
 #include <glog/logging.h>
@@ -33,7 +34,7 @@
 #include "kudu/rpc/acceptor_pool.h"
 #include "kudu/rpc/connection_id.h"
 #include "kudu/rpc/inbound_call.h"
-#include "kudu/rpc/outbound_call.h"
+#include "kudu/rpc/outbound_call.h" // IWYU pragma: keep
 #include "kudu/rpc/reactor.h"
 #include "kudu/rpc/remote_method.h"
 #include "kudu/rpc/rpc_header.pb.h"
@@ -319,6 +320,7 @@ Messenger::Messenger(const MessengerBuilder &bld)
                                             bld.rpc_tls_min_protocol_,
                                             bld.rpc_tls_excluded_protocols_)),
       token_verifier_(new security::TokenVerifier),
+      jwt_verifier_(nullptr),
       rpcz_store_(new RpczStore),
       metric_entity_(bld.metric_entity_),
       rpc_negotiation_timeout_ms_(bld.rpc_negotiation_timeout_ms_),
diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h
index d3ac4e149..9fb77fb0e 100644
--- a/src/kudu/rpc/messenger.h
+++ b/src/kudu/rpc/messenger.h
@@ -42,6 +42,7 @@
 
 namespace kudu {
 
+class JwtVerifier;
 class Socket;
 class ThreadPool;
 
@@ -376,6 +377,9 @@ class Messenger {
     return token_verifier_;
   }
 
+  const JwtVerifier* jwt_verifier() const { return jwt_verifier_.get(); }
+  JwtVerifier* mutable_jwt_verifier() { return jwt_verifier_.get(); }
+
   std::optional<security::SignedTokenPB> authn_token() const {
     std::lock_guard<simple_spinlock> l(authn_token_lock_);
     return authn_token_;
@@ -385,6 +389,15 @@ class Messenger {
     authn_token_ = token;
   }
 
+  std::optional<security::JwtRawPB> jwt() const {
+    std::lock_guard<simple_spinlock> l(authn_token_lock_);
+    return jwt_;
+  }
+  void set_jwt(const security::JwtRawPB& token) {
+    std::lock_guard<simple_spinlock> l(authn_token_lock_);
+    jwt_ = token;
+  }
+
   security::RpcAuthentication authentication() const { return authentication_; }
   security::RpcEncryption encryption() const { return encryption_; }
   bool loopback_encryption() const { return loopback_encryption_; }
@@ -502,10 +515,12 @@ class Messenger {
 
   // A TokenVerifier, which can verify client provided authentication tokens.
   std::shared_ptr<security::TokenVerifier> token_verifier_;
+  std::shared_ptr<JwtVerifier> jwt_verifier_;
 
   // An optional token, which can be used to authenticate to a server.
   mutable simple_spinlock authn_token_lock_;
   std::optional<security::SignedTokenPB> authn_token_;
+  std::optional<security::JwtRawPB> jwt_;
 
   std::unique_ptr<RpczStore> rpcz_store_;
 
diff --git a/src/kudu/rpc/negotiation-test.cc b/src/kudu/rpc/negotiation-test.cc
index 44d6baac4..58d4b9cbb 100644
--- a/src/kudu/rpc/negotiation-test.cc
+++ b/src/kudu/rpc/negotiation-test.cc
@@ -254,12 +254,14 @@ TEST_P(TestNegotiation, TestNegotiation) {
   ClientNegotiation client_negotiation(std::move(client_socket),
                                        &client_tls_context,
                                        authn_token,
+                                       std::nullopt,
                                        desc.client.encryption,
                                        desc.rpc_encrypt_loopback,
                                        "kudu");
   ServerNegotiation server_negotiation(std::move(server_socket),
                                        &server_tls_context,
                                        &token_verifier,
+                                       nullptr,
                                        desc.server.encryption,
                                        desc.rpc_encrypt_loopback,
                                        "kudu");
@@ -396,6 +398,8 @@ TEST_P(TestNegotiation, TestNegotiation) {
       case AuthenticationType::TOKEN:
         EXPECT_EQ("client-token", remote_user.username());
         break;
+      case AuthenticationType::JWT:
+        break;
       case AuthenticationType::INVALID: LOG(FATAL) << "invalid authentication negotiated";
     }
   }
@@ -1039,7 +1043,7 @@ static void RunGSSAPINegotiationServer(unique_ptr<Socket> socket,
   CHECK_OK(tls_context.Init());
   TokenVerifier token_verifier;
   ServerNegotiation server_negotiation(std::move(socket), &tls_context,
-                                       &token_verifier, RpcEncryption::OPTIONAL,
+                                       &token_verifier, nullptr, RpcEncryption::OPTIONAL,
                                        /* encrypt_loopback */ false, "kudu");
   server_negotiation.set_server_fqdn("127.0.0.1");
   CHECK_OK(server_negotiation.EnableGSSAPI());
@@ -1053,7 +1057,7 @@ static void RunGSSAPINegotiationClient(unique_ptr<Socket> conn,
   TlsContext tls_context;
   CHECK_OK(tls_context.Init());
   ClientNegotiation client_negotiation(std::move(conn), &tls_context,
-                                       nullopt, RpcEncryption::OPTIONAL,
+                                       nullopt, nullopt, RpcEncryption::OPTIONAL,
                                        /* encrypt_loopback */ false, "kudu");
   client_negotiation.set_server_fqdn("127.0.0.1");
   CHECK_OK(client_negotiation.EnableGSSAPI());
@@ -1217,7 +1221,7 @@ static void RunTimeoutExpectingServer(unique_ptr<Socket> socket) {
   CHECK_OK(tls_context.Init());
   TokenVerifier token_verifier;
   ServerNegotiation server_negotiation(std::move(socket), &tls_context,
-                                       &token_verifier, RpcEncryption::OPTIONAL,
+                                       &token_verifier, nullptr, RpcEncryption::OPTIONAL,
                                        /* encrypt_loopback */ false, "kudu");
   CHECK_OK(server_negotiation.EnablePlain());
   Status s = server_negotiation.Negotiate();
@@ -1229,7 +1233,7 @@ static void RunTimeoutNegotiationClient(unique_ptr<Socket> sock) {
   TlsContext tls_context;
   CHECK_OK(tls_context.Init());
   ClientNegotiation client_negotiation(std::move(sock), &tls_context,
-                                       nullopt, RpcEncryption::OPTIONAL,
+                                       nullopt, nullopt, RpcEncryption::OPTIONAL,
                                        /* encrypt_loopback */ false, "kudu");
   CHECK_OK(client_negotiation.EnablePlain("test", "test"));
   MonoTime deadline = MonoTime::Now() - MonoDelta::FromMilliseconds(100L);
@@ -1251,7 +1255,7 @@ static void RunTimeoutNegotiationServer(unique_ptr<Socket> socket) {
   CHECK_OK(tls_context.Init());
   TokenVerifier token_verifier;
   ServerNegotiation server_negotiation(std::move(socket), &tls_context,
-                                       &token_verifier, RpcEncryption::OPTIONAL,
+                                       &token_verifier, nullptr, RpcEncryption::OPTIONAL,
                                        /* encrypt_loopback */ false, "kudu");
   CHECK_OK(server_negotiation.EnablePlain());
   MonoTime deadline = MonoTime::Now() - MonoDelta::FromMilliseconds(100L);
@@ -1265,7 +1269,7 @@ static void RunTimeoutExpectingClient(unique_ptr<Socket> socket) {
   TlsContext tls_context;
   CHECK_OK(tls_context.Init());
   ClientNegotiation client_negotiation(std::move(socket), &tls_context,
-                                       nullopt, RpcEncryption::OPTIONAL,
+                                       nullopt, nullopt, RpcEncryption::OPTIONAL,
                                        /* encrypt_loopback */ false, "kudu");
   CHECK_OK(client_negotiation.EnablePlain("test", "test"));
   Status s = client_negotiation.Negotiate();
diff --git a/src/kudu/rpc/negotiation.cc b/src/kudu/rpc/negotiation.cc
index 8e65545a8..5dc1a7f23 100644
--- a/src/kudu/rpc/negotiation.cc
+++ b/src/kudu/rpc/negotiation.cc
@@ -17,11 +17,11 @@
 
 #include "kudu/rpc/negotiation.h"
 
+#include <ctime>
 #include <poll.h>
 #include <sys/socket.h>
 
 #include <cerrno>
-#include <ctime>
 #include <memory>
 #include <optional>
 #include <ostream>
@@ -43,6 +43,7 @@
 #include "kudu/rpc/server_negotiation.h"
 #include "kudu/rpc/user_credentials.h"
 #include "kudu/security/tls_context.h"
+#include "kudu/security/token.pb.h"
 #include "kudu/util/errno.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
@@ -86,6 +87,7 @@ const char* AuthenticationTypeToString(AuthenticationType t) {
     case AuthenticationType::SASL: return "SASL"; break;
     case AuthenticationType::TOKEN: return "TOKEN"; break;
     case AuthenticationType::CERTIFICATE: return "CERTIFICATE"; break;
+    case AuthenticationType::JWT: return "JWT"; break;
   }
   return "<cannot reach here>";
 }
@@ -176,6 +178,7 @@ static Status DoClientNegotiation(Connection* conn,
   ClientNegotiation client_negotiation(conn->release_socket(),
                                        &messenger->tls_context(),
                                        authn_token,
+                                       messenger->jwt(),
                                        encryption,
                                        encrypt_loopback,
                                        messenger->sasl_proto_name());
@@ -259,6 +262,7 @@ static Status DoServerNegotiation(Connection* conn,
   ServerNegotiation server_negotiation(conn->release_socket(),
                                        &messenger->tls_context(),
                                        &messenger->token_verifier(),
+                                       messenger->jwt_verifier(),
                                        encryption,
                                        encrypt_loopback,
                                        messenger->sasl_proto_name());
diff --git a/src/kudu/rpc/negotiation.h b/src/kudu/rpc/negotiation.h
index 8f48e7145..d1574b450 100644
--- a/src/kudu/rpc/negotiation.h
+++ b/src/kudu/rpc/negotiation.h
@@ -35,6 +35,7 @@ enum class AuthenticationType {
   SASL,
   TOKEN,
   CERTIFICATE,
+  JWT,
 };
 const char* AuthenticationTypeToString(AuthenticationType t);
 
diff --git a/src/kudu/rpc/rpc_header.proto b/src/kudu/rpc/rpc_header.proto
index 03bce1e4a..f6dff26eb 100644
--- a/src/kudu/rpc/rpc_header.proto
+++ b/src/kudu/rpc/rpc_header.proto
@@ -100,6 +100,7 @@ message AuthenticationTypePB {
   message Sasl {};
   message Token {};
   message Certificate {};
+  message Jwt {};
 
   oneof type {
     // The server and client mutually authenticate via SASL.
@@ -116,6 +117,11 @@ message AuthenticationTypePB {
     //
     // Certificate authentication requires the connection to be TLS encrypted.
     Certificate certificate = 3;
+
+    // The server authenticates the client via a JSON web token.
+    //
+    // Requires the connection to be TLS encrypted.
+    Jwt jwt = 4;
   }
 }
 
@@ -130,6 +136,7 @@ message NegotiatePB {
     SASL_RESPONSE  = 4;
     TLS_HANDSHAKE  = 5;
     TOKEN_EXCHANGE = 6;
+    JWT_EXCHANGE   = 7;
   }
 
   message SaslMechanism {
@@ -185,6 +192,10 @@ message NegotiatePB {
 
   // During the TOKEN_EXCHANGE step, contains the client's signed authentication token.
   optional security.SignedTokenPB authn_token = 8;
+
+  // During the JWT_EXCHANGE step, contains the client's JWT used for
+  // authentication.
+  optional security.JwtRawPB jwt_raw = 10;
 }
 
 message RemoteMethodPB {
@@ -336,6 +347,9 @@ message ErrorStatusPB {
     // negotiation failed, and the client should obtain a new authn token and
     // try to reconnect.
     FATAL_INVALID_AUTHENTICATION_TOKEN = 16;
+
+    // The JWT is invalid or expired.
+    FATAL_INVALID_JWT = 18;
   }
 
   required string message = 1;
diff --git a/src/kudu/rpc/server_negotiation.cc b/src/kudu/rpc/server_negotiation.cc
index 332a30534..17beb8848 100644
--- a/src/kudu/rpc/server_negotiation.cc
+++ b/src/kudu/rpc/server_negotiation.cc
@@ -53,6 +53,7 @@
 #include "kudu/util/faststring.h"
 #include "kudu/util/fault_injection.h"
 #include "kudu/util/flag_tags.h"
+#include "kudu/util/jwt.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
@@ -156,6 +157,7 @@ static int ServerNegotiationPlainAuthCb(sasl_conn_t* conn,
 ServerNegotiation::ServerNegotiation(unique_ptr<Socket> socket,
                                      const security::TlsContext* tls_context,
                                      const security::TokenVerifier* token_verifier,
+                                     const JwtVerifier* jwt_verifier,
                                      RpcEncryption encryption,
                                      bool encrypt_loopback,
                                      std::string sasl_proto_name)
@@ -167,6 +169,7 @@ ServerNegotiation::ServerNegotiation(unique_ptr<Socket> socket,
       tls_negotiated_(false),
       encrypt_loopback_(encrypt_loopback),
       token_verifier_(token_verifier),
+      jwt_verifier_(jwt_verifier),
       negotiated_authn_(AuthenticationType::INVALID),
       negotiated_mech_(SaslMechanism::INVALID),
       sasl_proto_name_(std::move(sasl_proto_name)),
@@ -281,6 +284,9 @@ Status ServerNegotiation::Negotiate() {
     case AuthenticationType::TOKEN:
       RETURN_NOT_OK(AuthenticateByToken(&recv_buf));
       break;
+    case AuthenticationType::JWT:
+      RETURN_NOT_OK(AuthenticateByJwt(&recv_buf));
+      break;
     case AuthenticationType::CERTIFICATE:
       RETURN_NOT_OK(AuthenticateByCertificate());
       break;
@@ -307,7 +313,7 @@ Status ServerNegotiation::PreflightCheckGSSAPI(const std::string& sasl_proto_nam
   // We aren't going to actually send/receive any messages, but
   // this makes it easier to reuse the initialization code.
   ServerNegotiation server(
-      nullptr, nullptr, nullptr, RpcEncryption::OPTIONAL, false, sasl_proto_name);
+      nullptr, nullptr, nullptr, nullptr, RpcEncryption::OPTIONAL, false, sasl_proto_name);
   Status s = server.EnableGSSAPI();
   if (!s.ok()) {
     return Status::RuntimeError(s.message());
@@ -494,6 +500,11 @@ Status ServerNegotiation::HandleNegotiate(const NegotiatePB& request) {
             authn_types.insert(AuthenticationType::CERTIFICATE);
           }
           break;
+        case AuthenticationTypePB::kJwt:
+          if (jwt_verifier_) {
+            authn_types.insert(AuthenticationType::JWT);
+          }
+          break;
         case AuthenticationTypePB::TYPE_NOT_SET: {
           Sockaddr addr;
           RETURN_NOT_OK(socket_->GetPeerAddress(&addr));
@@ -529,6 +540,9 @@ Status ServerNegotiation::HandleNegotiate(const NegotiatePB& request) {
     // TODO(KUDU-1924): consider adding the TSK sequence number to the authentication
     // message.
     negotiated_authn_ = AuthenticationType::TOKEN;
+  } else if (ContainsKey(authn_types, AuthenticationType::JWT) &&
+             encryption_ != RpcEncryption::DISABLED) {
+    negotiated_authn_ = AuthenticationType::JWT;
   } else {
     // Otherwise we always can fallback to SASL.
     DCHECK(ContainsKey(authn_types, AuthenticationType::SASL));
@@ -578,6 +592,9 @@ Status ServerNegotiation::HandleNegotiate(const NegotiatePB& request) {
       }
       break;
     }
+    case AuthenticationType::JWT:
+      response.add_authn_types()->mutable_jwt();
+      break;
     case AuthenticationType::INVALID: LOG(FATAL) << "unreachable";
   }
 
@@ -685,6 +702,47 @@ Status ServerNegotiation::AuthenticateBySasl(faststring* recv_buf) {
   return Status::OK();
 }
 
+Status ServerNegotiation::AuthenticateByJwt(faststring* recv_buf) {
+  // Sanity check that TLS has been negotiated. Receiving the token on an
+  // unencrypted channel is a big no-no.
+  DCHECK(tls_negotiated_);
+  if (!tls_negotiated_) {
+    return Status::IllegalState("Attempting to authenticate using JWT on an unencrypted channel");
+  }
+
+  if (!jwt_verifier_) {
+    return Status::IllegalState("No JWT verifier available");
+  }
+
+  // Receive the token from the client.
+  NegotiatePB pb;
+  RETURN_NOT_OK(RecvNegotiatePB(&pb, recv_buf));
+
+  if (PREDICT_FALSE(pb.step() != NegotiatePB::JWT_EXCHANGE)) {
+    auto s = Status::NotAuthorized("expected JWT_EXCHANGE step",
+                                   NegotiatePB::NegotiateStep_Name(pb.step()));
+    RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+    return s;
+  }
+  if (!pb.has_jwt_raw()) {
+    Status s = Status::NotAuthorized("JWT_EXCHANGE message must include a JWT");
+    RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_INVALID_JWT, s));
+    return s;
+  }
+  string subject;
+  auto result = jwt_verifier_->VerifyToken(pb.jwt_raw().jwt_data(), &subject);
+  if (result.ok()) {
+    authenticated_user_.SetAuthenticatedByToken(std::move(subject));
+  } else {
+    auto s = Status::NotAuthorized(result.message());
+    RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_INVALID_JWT, s));
+    return s;
+  }
+  pb.Clear();
+  pb.set_step(NegotiatePB::JWT_EXCHANGE);
+  return SendNegotiatePB(pb);
+}
+
 Status ServerNegotiation::AuthenticateByToken(faststring* recv_buf) {
   // Sanity check that TLS has been negotiated. Receiving the token on an
   // unencrypted channel is a big no-no.
diff --git a/src/kudu/rpc/server_negotiation.h b/src/kudu/rpc/server_negotiation.h
index 1f7319c15..9339fb53b 100644
--- a/src/kudu/rpc/server_negotiation.h
+++ b/src/kudu/rpc/server_negotiation.h
@@ -42,6 +42,7 @@
 
 namespace kudu {
 
+class JwtVerifier;
 class Sockaddr;
 class faststring;
 
@@ -65,6 +66,7 @@ class ServerNegotiation {
   ServerNegotiation(std::unique_ptr<Socket> socket,
                     const security::TlsContext* tls_context,
                     const security::TokenVerifier* token_verifier,
+                    const JwtVerifier* jwt_verifier,
                     security::RpcEncryption encryption,
                     bool encrypt_loopback,
                     std::string sasl_proto_name);
@@ -192,6 +194,8 @@ class ServerNegotiation {
   // 'recv_buf' allows a receive buffer to be reused.
   Status AuthenticateByToken(faststring* recv_buf) WARN_UNUSED_RESULT;
 
+  Status AuthenticateByJwt(faststring* recv_buf) WARN_UNUSED_RESULT;
+
   // Authenticate the client using the client's TLS certificate. Populates the
   // 'authenticated_user_' field with the certificate's subject.
   Status AuthenticateByCertificate() WARN_UNUSED_RESULT;
@@ -234,6 +238,7 @@ class ServerNegotiation {
 
   // TSK state.
   const security::TokenVerifier* token_verifier_;
+  const JwtVerifier* jwt_verifier_;
 
   // The set of features supported by the client and server. Filled in during negotiation.
   std::set<RpcFeatureFlag> client_features_;
diff --git a/src/kudu/security/token.proto b/src/kudu/security/token.proto
index ea63ce29c..52f014524 100644
--- a/src/kudu/security/token.proto
+++ b/src/kudu/security/token.proto
@@ -90,6 +90,11 @@ message TokenPB {
   }
 };
 
+// JSON Web Token: a wrapper to pass around a JWT as is.
+message JwtRawPB {
+  optional bytes jwt_data = 1;
+}
+
 message SignedTokenPB {
   // The actual token data. This is a serialized TokenPB protobuf. However, we use a
   // 'bytes' field, since protobuf doesn't guarantee that if two implementations serialize
diff --git a/src/kudu/util/jwt-util.cc b/src/kudu/util/jwt-util.cc
index bd928f37d..19c149095 100644
--- a/src/kudu/util/jwt-util.cc
+++ b/src/kudu/util/jwt-util.cc
@@ -925,4 +925,15 @@ Status JWTHelper::GetCustomClaimUsername(const JWTDecodedToken* decoded_token,
   return Status::OK();
 }
 
+Status KeyBasedJwtVerifier::Init() {
+  return jwt_->Init(jwks_uri_, is_local_file_);
+}
+
+Status KeyBasedJwtVerifier::VerifyToken(const string& bytes_raw, string* subject) const {
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  RETURN_NOT_OK(JWTHelper::Decode(bytes_raw, decoded_token));
+  *subject = decoded_token->decoded_jwt_.get_subject();
+  return Status::OK();
+}
+
 } // namespace kudu
diff --git a/src/kudu/util/jwt-util.h b/src/kudu/util/jwt-util.h
index 76b7f211a..a4f96839a 100644
--- a/src/kudu/util/jwt-util.h
+++ b/src/kudu/util/jwt-util.h
@@ -20,8 +20,11 @@
 
 #include <memory>
 #include <string>
+#include <utility>
 
+#include "kudu/gutil/macros.h"
 #include "kudu/util/jwt-util-internal.h"
+#include "kudu/util/jwt.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
@@ -86,4 +89,23 @@ class JWTHelper {
   std::unique_ptr<JWKSMgr> jwks_mgr_;
 };
 
+// JwtVerifier implementation that uses JWKS to verify tokens.
+class KeyBasedJwtVerifier : public JwtVerifier {
+ public:
+  KeyBasedJwtVerifier(std::string jwks_uri, bool is_local_file)
+      : jwt_(JWTHelper::GetInstance()),
+        jwks_uri_(std::move(jwks_uri)),
+        is_local_file_(is_local_file) {
+  }
+  ~KeyBasedJwtVerifier() override = default;
+  Status Init();
+  Status VerifyToken(const std::string& bytes_raw, std::string* subject) const override;
+ private:
+  JWTHelper* jwt_;
+  std::string jwks_uri_;
+  bool is_local_file_;
+
+  DISALLOW_COPY_AND_ASSIGN(KeyBasedJwtVerifier);
+};
+
 } // namespace kudu
diff --git a/src/kudu/util/jwt.h b/src/kudu/util/jwt.h
new file mode 100644
index 000000000..43bb9d58b
--- /dev/null
+++ b/src/kudu/util/jwt.h
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <string>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/status.h"
+
+// Minimal header to avoid having to link heavy-weight modules into libraries.
+
+namespace kudu {
+
+class JwtVerifier {
+ public:
+  virtual ~JwtVerifier() {}
+  // Verifies a JWT, which is passed as bytes_raw, then extracts the subject from the verified
+  // token and returns it by pointer in subject. The returned pointer is owned by the caller.
+  virtual Status VerifyToken(const std::string& bytes_raw, std::string* subject) const = 0;
+};
+
+// Minimal implementation of a JWT verifier to be used when a more full-fledged
+// implementation is not available.
+class SimpleJwtVerifier : public JwtVerifier {
+ public:
+  SimpleJwtVerifier() = default;
+  ~SimpleJwtVerifier() override = default;
+  Status VerifyToken(const std::string&  /*bytes_raw*/,
+                     std::string*  /*subject*/) const override {
+    return Status::NotAuthorized("JWT verification not configured");
+  }
+ private:
+  DISALLOW_COPY_AND_ASSIGN(SimpleJwtVerifier);
+};
+
+} // namespace kudu