You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2022/12/08 18:50:40 UTC
[kudu] branch master updated: rpc: plumb JWTs into the RPC layer
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new ccecc51f0 rpc: plumb JWTs into the RPC layer
ccecc51f0 is described below
commit ccecc51f07d9d4698e6104c9cb75939753c14061
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Thu Apr 21 16:58:06 2022 -0700
rpc: plumb JWTs into the RPC layer
This patch plumbs JWT verification into our C++ RPC negotiation.
It is limited in the sense that JWTs can be sent over encrypted
channels where the authenticity of the server is not verified yet
-- this should be addressed before using in production.
Co-authored-by: Zoltan Chovan <zc...@cloudera.com>
Change-Id: I252f1e597d9df4408379c3b695f266dbd7f48dcc
Reviewed-on: http://gerrit.cloudera.org:8080/18469
Reviewed-by: Wenzhe Zhou <wz...@cloudera.com>
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <al...@apache.org>
---
src/kudu/client/client.proto | 3 ++
src/kudu/rpc/client_negotiation.cc | 28 ++++++++++++++++++
src/kudu/rpc/client_negotiation.h | 5 ++++
src/kudu/rpc/messenger.cc | 4 ++-
src/kudu/rpc/messenger.h | 15 ++++++++++
src/kudu/rpc/negotiation-test.cc | 16 ++++++----
src/kudu/rpc/negotiation.cc | 6 +++-
src/kudu/rpc/negotiation.h | 1 +
src/kudu/rpc/rpc_header.proto | 14 +++++++++
src/kudu/rpc/server_negotiation.cc | 60 +++++++++++++++++++++++++++++++++++++-
src/kudu/rpc/server_negotiation.h | 5 ++++
src/kudu/security/token.proto | 5 ++++
src/kudu/util/jwt-util.cc | 11 +++++++
src/kudu/util/jwt-util.h | 22 ++++++++++++++
src/kudu/util/jwt.h | 50 +++++++++++++++++++++++++++++++
15 files changed, 236 insertions(+), 9 deletions(-)
diff --git a/src/kudu/client/client.proto b/src/kudu/client/client.proto
index b56465d9e..05d5deaf8 100644
--- a/src/kudu/client/client.proto
+++ b/src/kudu/client/client.proto
@@ -211,4 +211,7 @@ message AuthenticationCredentialsPB {
// Trusted root CA certificates.
repeated bytes ca_cert_ders = 2;
+
+ // A JWT to be verified by the master.
+ optional security.JwtRawPB jwt = 4;
}
diff --git a/src/kudu/rpc/client_negotiation.cc b/src/kudu/rpc/client_negotiation.cc
index 09c5357f6..8fecbecba 100644
--- a/src/kudu/rpc/client_negotiation.cc
+++ b/src/kudu/rpc/client_negotiation.cc
@@ -118,6 +118,7 @@ static Status StatusFromRpcError(const ErrorStatusPB& error) {
ClientNegotiation::ClientNegotiation(unique_ptr<Socket> socket,
const security::TlsContext* tls_context,
std::optional<security::SignedTokenPB> authn_token,
+ std::optional<security::JwtRawPB> jwt,
RpcEncryption encryption,
bool encrypt_loopback,
std::string sasl_proto_name)
@@ -129,6 +130,7 @@ ClientNegotiation::ClientNegotiation(unique_ptr<Socket> socket,
tls_negotiated_(false),
encrypt_loopback_(encrypt_loopback),
authn_token_(std::move(authn_token)),
+ jwt_(std::move(jwt)),
psecret_(nullptr, std::free),
negotiated_authn_(AuthenticationType::INVALID),
negotiated_mech_(SaslMechanism::INVALID),
@@ -224,6 +226,9 @@ Status ClientNegotiation::Negotiate(unique_ptr<ErrorStatusPB>* rpc_error) {
case AuthenticationType::TOKEN:
RETURN_NOT_OK(AuthenticateByToken(&recv_buf, rpc_error));
break;
+ case AuthenticationType::JWT:
+ RETURN_NOT_OK(AuthenticateByJwt(&recv_buf, rpc_error));
+ break;
case AuthenticationType::CERTIFICATE:
// The TLS handshake has already authenticated the server.
break;
@@ -349,6 +354,11 @@ Status ClientNegotiation::SendNegotiate() {
msg.add_authn_types()->mutable_token();
}
+ if (jwt_) {
+ // TODO(zchovan): make sure that we are using a trusted certificate
+ msg.add_authn_types()->mutable_jwt();
+ }
+
if (PREDICT_FALSE(msg.authn_types().empty())) {
return Status::NotAuthorized("client is not configured with an authentication type");
}
@@ -400,6 +410,9 @@ Status ClientNegotiation::HandleNegotiate(const NegotiatePB& response) {
}
negotiated_authn_ = AuthenticationType::TOKEN;
return Status::OK();
+ case AuthenticationTypePB::kJwt:
+ negotiated_authn_ = AuthenticationType::JWT;
+ return Status::OK();
case AuthenticationTypePB::kCertificate:
if (!tls_context_->has_signed_cert()) {
return Status::RuntimeError(
@@ -541,6 +554,21 @@ Status ClientNegotiation::AuthenticateBySasl(faststring* recv_buf,
return HandleSaslSuccess(success);
}
+Status ClientNegotiation::AuthenticateByJwt(faststring* recv_buf,
+ unique_ptr<ErrorStatusPB>* rpc_error) {
+ NegotiatePB pb;
+ pb.set_step(NegotiatePB::JWT_EXCHANGE);
+ *pb.mutable_jwt_raw() = std::move(*jwt_);
+ RETURN_NOT_OK(SendNegotiatePB(pb));
+ pb.Clear();
+ RETURN_NOT_OK(RecvNegotiatePB(&pb, recv_buf, rpc_error));
+ if (pb.step() != NegotiatePB::JWT_EXCHANGE) {
+ return Status::NotAuthorized("expected JWT_EXCHANGE step",
+ NegotiatePB::NegotiateStep_Name(pb.step()));
+ }
+ return Status::OK();
+}
+
Status ClientNegotiation::AuthenticateByToken(faststring* recv_buf,
unique_ptr<ErrorStatusPB>* rpc_error) {
// Sanity check that TLS has been negotiated. Sending the token on an
diff --git a/src/kudu/rpc/client_negotiation.h b/src/kudu/rpc/client_negotiation.h
index 4481bb59f..2fd2ffb68 100644
--- a/src/kudu/rpc/client_negotiation.h
+++ b/src/kudu/rpc/client_negotiation.h
@@ -65,6 +65,7 @@ class ClientNegotiation {
ClientNegotiation(std::unique_ptr<Socket> socket,
const security::TlsContext* tls_context,
std::optional<security::SignedTokenPB> authn_token,
+ std::optional<security::JwtRawPB> jwt,
security::RpcEncryption encryption,
bool encrypt_loopback,
std::string sasl_proto_name);
@@ -186,6 +187,9 @@ class ClientNegotiation {
Status AuthenticateByToken(faststring* recv_buf,
std::unique_ptr<ErrorStatusPB> *rpc_error) WARN_UNUSED_RESULT;
+ Status AuthenticateByJwt(faststring* recv_buf,
+ std::unique_ptr<ErrorStatusPB>* rpc_error) WARN_UNUSED_RESULT;
+
// Send an SASL_INITIATE message to the server.
// Returns:
// Status::OK if the SASL_SUCCESS message is expected next.
@@ -234,6 +238,7 @@ class ClientNegotiation {
// TSK state.
std::optional<security::SignedTokenPB> authn_token_;
+ std::optional<security::JwtRawPB> jwt_;
// Authentication state.
std::string plain_auth_user_;
diff --git a/src/kudu/rpc/messenger.cc b/src/kudu/rpc/messenger.cc
index af8d72412..c2585f181 100644
--- a/src/kudu/rpc/messenger.cc
+++ b/src/kudu/rpc/messenger.cc
@@ -22,6 +22,7 @@
#include <mutex>
#include <ostream>
#include <string>
+#include <type_traits>
#include <utility>
#include <glog/logging.h>
@@ -33,7 +34,7 @@
#include "kudu/rpc/acceptor_pool.h"
#include "kudu/rpc/connection_id.h"
#include "kudu/rpc/inbound_call.h"
-#include "kudu/rpc/outbound_call.h"
+#include "kudu/rpc/outbound_call.h" // IWYU pragma: keep
#include "kudu/rpc/reactor.h"
#include "kudu/rpc/remote_method.h"
#include "kudu/rpc/rpc_header.pb.h"
@@ -319,6 +320,7 @@ Messenger::Messenger(const MessengerBuilder &bld)
bld.rpc_tls_min_protocol_,
bld.rpc_tls_excluded_protocols_)),
token_verifier_(new security::TokenVerifier),
+ jwt_verifier_(nullptr),
rpcz_store_(new RpczStore),
metric_entity_(bld.metric_entity_),
rpc_negotiation_timeout_ms_(bld.rpc_negotiation_timeout_ms_),
diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h
index d3ac4e149..9fb77fb0e 100644
--- a/src/kudu/rpc/messenger.h
+++ b/src/kudu/rpc/messenger.h
@@ -42,6 +42,7 @@
namespace kudu {
+class JwtVerifier;
class Socket;
class ThreadPool;
@@ -376,6 +377,9 @@ class Messenger {
return token_verifier_;
}
+ const JwtVerifier* jwt_verifier() const { return jwt_verifier_.get(); }
+ JwtVerifier* mutable_jwt_verifier() { return jwt_verifier_.get(); }
+
std::optional<security::SignedTokenPB> authn_token() const {
std::lock_guard<simple_spinlock> l(authn_token_lock_);
return authn_token_;
@@ -385,6 +389,15 @@ class Messenger {
authn_token_ = token;
}
+ std::optional<security::JwtRawPB> jwt() const {
+ std::lock_guard<simple_spinlock> l(authn_token_lock_);
+ return jwt_;
+ }
+ void set_jwt(const security::JwtRawPB& token) {
+ std::lock_guard<simple_spinlock> l(authn_token_lock_);
+ jwt_ = token;
+ }
+
security::RpcAuthentication authentication() const { return authentication_; }
security::RpcEncryption encryption() const { return encryption_; }
bool loopback_encryption() const { return loopback_encryption_; }
@@ -502,10 +515,12 @@ class Messenger {
// A TokenVerifier, which can verify client provided authentication tokens.
std::shared_ptr<security::TokenVerifier> token_verifier_;
+ std::shared_ptr<JwtVerifier> jwt_verifier_;
// An optional token, which can be used to authenticate to a server.
mutable simple_spinlock authn_token_lock_;
std::optional<security::SignedTokenPB> authn_token_;
+ std::optional<security::JwtRawPB> jwt_;
std::unique_ptr<RpczStore> rpcz_store_;
diff --git a/src/kudu/rpc/negotiation-test.cc b/src/kudu/rpc/negotiation-test.cc
index 44d6baac4..58d4b9cbb 100644
--- a/src/kudu/rpc/negotiation-test.cc
+++ b/src/kudu/rpc/negotiation-test.cc
@@ -254,12 +254,14 @@ TEST_P(TestNegotiation, TestNegotiation) {
ClientNegotiation client_negotiation(std::move(client_socket),
&client_tls_context,
authn_token,
+ std::nullopt,
desc.client.encryption,
desc.rpc_encrypt_loopback,
"kudu");
ServerNegotiation server_negotiation(std::move(server_socket),
&server_tls_context,
&token_verifier,
+ nullptr,
desc.server.encryption,
desc.rpc_encrypt_loopback,
"kudu");
@@ -396,6 +398,8 @@ TEST_P(TestNegotiation, TestNegotiation) {
case AuthenticationType::TOKEN:
EXPECT_EQ("client-token", remote_user.username());
break;
+ case AuthenticationType::JWT:
+ break;
case AuthenticationType::INVALID: LOG(FATAL) << "invalid authentication negotiated";
}
}
@@ -1039,7 +1043,7 @@ static void RunGSSAPINegotiationServer(unique_ptr<Socket> socket,
CHECK_OK(tls_context.Init());
TokenVerifier token_verifier;
ServerNegotiation server_negotiation(std::move(socket), &tls_context,
- &token_verifier, RpcEncryption::OPTIONAL,
+ &token_verifier, nullptr, RpcEncryption::OPTIONAL,
/* encrypt_loopback */ false, "kudu");
server_negotiation.set_server_fqdn("127.0.0.1");
CHECK_OK(server_negotiation.EnableGSSAPI());
@@ -1053,7 +1057,7 @@ static void RunGSSAPINegotiationClient(unique_ptr<Socket> conn,
TlsContext tls_context;
CHECK_OK(tls_context.Init());
ClientNegotiation client_negotiation(std::move(conn), &tls_context,
- nullopt, RpcEncryption::OPTIONAL,
+ nullopt, nullopt, RpcEncryption::OPTIONAL,
/* encrypt_loopback */ false, "kudu");
client_negotiation.set_server_fqdn("127.0.0.1");
CHECK_OK(client_negotiation.EnableGSSAPI());
@@ -1217,7 +1221,7 @@ static void RunTimeoutExpectingServer(unique_ptr<Socket> socket) {
CHECK_OK(tls_context.Init());
TokenVerifier token_verifier;
ServerNegotiation server_negotiation(std::move(socket), &tls_context,
- &token_verifier, RpcEncryption::OPTIONAL,
+ &token_verifier, nullptr, RpcEncryption::OPTIONAL,
/* encrypt_loopback */ false, "kudu");
CHECK_OK(server_negotiation.EnablePlain());
Status s = server_negotiation.Negotiate();
@@ -1229,7 +1233,7 @@ static void RunTimeoutNegotiationClient(unique_ptr<Socket> sock) {
TlsContext tls_context;
CHECK_OK(tls_context.Init());
ClientNegotiation client_negotiation(std::move(sock), &tls_context,
- nullopt, RpcEncryption::OPTIONAL,
+ nullopt, nullopt, RpcEncryption::OPTIONAL,
/* encrypt_loopback */ false, "kudu");
CHECK_OK(client_negotiation.EnablePlain("test", "test"));
MonoTime deadline = MonoTime::Now() - MonoDelta::FromMilliseconds(100L);
@@ -1251,7 +1255,7 @@ static void RunTimeoutNegotiationServer(unique_ptr<Socket> socket) {
CHECK_OK(tls_context.Init());
TokenVerifier token_verifier;
ServerNegotiation server_negotiation(std::move(socket), &tls_context,
- &token_verifier, RpcEncryption::OPTIONAL,
+ &token_verifier, nullptr, RpcEncryption::OPTIONAL,
/* encrypt_loopback */ false, "kudu");
CHECK_OK(server_negotiation.EnablePlain());
MonoTime deadline = MonoTime::Now() - MonoDelta::FromMilliseconds(100L);
@@ -1265,7 +1269,7 @@ static void RunTimeoutExpectingClient(unique_ptr<Socket> socket) {
TlsContext tls_context;
CHECK_OK(tls_context.Init());
ClientNegotiation client_negotiation(std::move(socket), &tls_context,
- nullopt, RpcEncryption::OPTIONAL,
+ nullopt, nullopt, RpcEncryption::OPTIONAL,
/* encrypt_loopback */ false, "kudu");
CHECK_OK(client_negotiation.EnablePlain("test", "test"));
Status s = client_negotiation.Negotiate();
diff --git a/src/kudu/rpc/negotiation.cc b/src/kudu/rpc/negotiation.cc
index 8e65545a8..5dc1a7f23 100644
--- a/src/kudu/rpc/negotiation.cc
+++ b/src/kudu/rpc/negotiation.cc
@@ -17,11 +17,11 @@
#include "kudu/rpc/negotiation.h"
+#include <ctime>
#include <poll.h>
#include <sys/socket.h>
#include <cerrno>
-#include <ctime>
#include <memory>
#include <optional>
#include <ostream>
@@ -43,6 +43,7 @@
#include "kudu/rpc/server_negotiation.h"
#include "kudu/rpc/user_credentials.h"
#include "kudu/security/tls_context.h"
+#include "kudu/security/token.pb.h"
#include "kudu/util/errno.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/logging.h"
@@ -86,6 +87,7 @@ const char* AuthenticationTypeToString(AuthenticationType t) {
case AuthenticationType::SASL: return "SASL"; break;
case AuthenticationType::TOKEN: return "TOKEN"; break;
case AuthenticationType::CERTIFICATE: return "CERTIFICATE"; break;
+ case AuthenticationType::JWT: return "JWT"; break;
}
return "<cannot reach here>";
}
@@ -176,6 +178,7 @@ static Status DoClientNegotiation(Connection* conn,
ClientNegotiation client_negotiation(conn->release_socket(),
&messenger->tls_context(),
authn_token,
+ messenger->jwt(),
encryption,
encrypt_loopback,
messenger->sasl_proto_name());
@@ -259,6 +262,7 @@ static Status DoServerNegotiation(Connection* conn,
ServerNegotiation server_negotiation(conn->release_socket(),
&messenger->tls_context(),
&messenger->token_verifier(),
+ messenger->jwt_verifier(),
encryption,
encrypt_loopback,
messenger->sasl_proto_name());
diff --git a/src/kudu/rpc/negotiation.h b/src/kudu/rpc/negotiation.h
index 8f48e7145..d1574b450 100644
--- a/src/kudu/rpc/negotiation.h
+++ b/src/kudu/rpc/negotiation.h
@@ -35,6 +35,7 @@ enum class AuthenticationType {
SASL,
TOKEN,
CERTIFICATE,
+ JWT,
};
const char* AuthenticationTypeToString(AuthenticationType t);
diff --git a/src/kudu/rpc/rpc_header.proto b/src/kudu/rpc/rpc_header.proto
index 03bce1e4a..f6dff26eb 100644
--- a/src/kudu/rpc/rpc_header.proto
+++ b/src/kudu/rpc/rpc_header.proto
@@ -100,6 +100,7 @@ message AuthenticationTypePB {
message Sasl {};
message Token {};
message Certificate {};
+ message Jwt {};
oneof type {
// The server and client mutually authenticate via SASL.
@@ -116,6 +117,11 @@ message AuthenticationTypePB {
//
// Certificate authentication requires the connection to be TLS encrypted.
Certificate certificate = 3;
+
+ // The server authenticates the client via a JSON web token.
+ //
+ // Requires the connection to be TLS encrypted.
+ Jwt jwt = 4;
}
}
@@ -130,6 +136,7 @@ message NegotiatePB {
SASL_RESPONSE = 4;
TLS_HANDSHAKE = 5;
TOKEN_EXCHANGE = 6;
+ JWT_EXCHANGE = 7;
}
message SaslMechanism {
@@ -185,6 +192,10 @@ message NegotiatePB {
// During the TOKEN_EXCHANGE step, contains the client's signed authentication token.
optional security.SignedTokenPB authn_token = 8;
+
+ // During the JWT_EXCHANGE step, contains the client's JWT used for
+ // authentication.
+ optional security.JwtRawPB jwt_raw = 10;
}
message RemoteMethodPB {
@@ -336,6 +347,9 @@ message ErrorStatusPB {
// negotiation failed, and the client should obtain a new authn token and
// try to reconnect.
FATAL_INVALID_AUTHENTICATION_TOKEN = 16;
+
+ // The JWT is invalid or expired.
+ FATAL_INVALID_JWT = 18;
}
required string message = 1;
diff --git a/src/kudu/rpc/server_negotiation.cc b/src/kudu/rpc/server_negotiation.cc
index 332a30534..17beb8848 100644
--- a/src/kudu/rpc/server_negotiation.cc
+++ b/src/kudu/rpc/server_negotiation.cc
@@ -53,6 +53,7 @@
#include "kudu/util/faststring.h"
#include "kudu/util/fault_injection.h"
#include "kudu/util/flag_tags.h"
+#include "kudu/util/jwt.h"
#include "kudu/util/logging.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
@@ -156,6 +157,7 @@ static int ServerNegotiationPlainAuthCb(sasl_conn_t* conn,
ServerNegotiation::ServerNegotiation(unique_ptr<Socket> socket,
const security::TlsContext* tls_context,
const security::TokenVerifier* token_verifier,
+ const JwtVerifier* jwt_verifier,
RpcEncryption encryption,
bool encrypt_loopback,
std::string sasl_proto_name)
@@ -167,6 +169,7 @@ ServerNegotiation::ServerNegotiation(unique_ptr<Socket> socket,
tls_negotiated_(false),
encrypt_loopback_(encrypt_loopback),
token_verifier_(token_verifier),
+ jwt_verifier_(jwt_verifier),
negotiated_authn_(AuthenticationType::INVALID),
negotiated_mech_(SaslMechanism::INVALID),
sasl_proto_name_(std::move(sasl_proto_name)),
@@ -281,6 +284,9 @@ Status ServerNegotiation::Negotiate() {
case AuthenticationType::TOKEN:
RETURN_NOT_OK(AuthenticateByToken(&recv_buf));
break;
+ case AuthenticationType::JWT:
+ RETURN_NOT_OK(AuthenticateByJwt(&recv_buf));
+ break;
case AuthenticationType::CERTIFICATE:
RETURN_NOT_OK(AuthenticateByCertificate());
break;
@@ -307,7 +313,7 @@ Status ServerNegotiation::PreflightCheckGSSAPI(const std::string& sasl_proto_nam
// We aren't going to actually send/receive any messages, but
// this makes it easier to reuse the initialization code.
ServerNegotiation server(
- nullptr, nullptr, nullptr, RpcEncryption::OPTIONAL, false, sasl_proto_name);
+ nullptr, nullptr, nullptr, nullptr, RpcEncryption::OPTIONAL, false, sasl_proto_name);
Status s = server.EnableGSSAPI();
if (!s.ok()) {
return Status::RuntimeError(s.message());
@@ -494,6 +500,11 @@ Status ServerNegotiation::HandleNegotiate(const NegotiatePB& request) {
authn_types.insert(AuthenticationType::CERTIFICATE);
}
break;
+ case AuthenticationTypePB::kJwt:
+ if (jwt_verifier_) {
+ authn_types.insert(AuthenticationType::JWT);
+ }
+ break;
case AuthenticationTypePB::TYPE_NOT_SET: {
Sockaddr addr;
RETURN_NOT_OK(socket_->GetPeerAddress(&addr));
@@ -529,6 +540,9 @@ Status ServerNegotiation::HandleNegotiate(const NegotiatePB& request) {
// TODO(KUDU-1924): consider adding the TSK sequence number to the authentication
// message.
negotiated_authn_ = AuthenticationType::TOKEN;
+ } else if (ContainsKey(authn_types, AuthenticationType::JWT) &&
+ encryption_ != RpcEncryption::DISABLED) {
+ negotiated_authn_ = AuthenticationType::JWT;
} else {
// Otherwise we always can fallback to SASL.
DCHECK(ContainsKey(authn_types, AuthenticationType::SASL));
@@ -578,6 +592,9 @@ Status ServerNegotiation::HandleNegotiate(const NegotiatePB& request) {
}
break;
}
+ case AuthenticationType::JWT:
+ response.add_authn_types()->mutable_jwt();
+ break;
case AuthenticationType::INVALID: LOG(FATAL) << "unreachable";
}
@@ -685,6 +702,47 @@ Status ServerNegotiation::AuthenticateBySasl(faststring* recv_buf) {
return Status::OK();
}
+Status ServerNegotiation::AuthenticateByJwt(faststring* recv_buf) {
+ // Sanity check that TLS has been negotiated. Receiving the token on an
+ // unencrypted channel is a big no-no.
+ DCHECK(tls_negotiated_);
+ if (!tls_negotiated_) {
+ return Status::IllegalState("Attempting to authenticate using JWT on an unencrypted channel");
+ }
+
+ if (!jwt_verifier_) {
+ return Status::IllegalState("No JWT verifier available");
+ }
+
+ // Receive the token from the client.
+ NegotiatePB pb;
+ RETURN_NOT_OK(RecvNegotiatePB(&pb, recv_buf));
+
+ if (PREDICT_FALSE(pb.step() != NegotiatePB::JWT_EXCHANGE)) {
+ auto s = Status::NotAuthorized("expected JWT_EXCHANGE step",
+ NegotiatePB::NegotiateStep_Name(pb.step()));
+ RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+ return s;
+ }
+ if (!pb.has_jwt_raw()) {
+ Status s = Status::NotAuthorized("JWT_EXCHANGE message must include a JWT");
+ RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_INVALID_JWT, s));
+ return s;
+ }
+ string subject;
+ auto result = jwt_verifier_->VerifyToken(pb.jwt_raw().jwt_data(), &subject);
+ if (result.ok()) {
+ authenticated_user_.SetAuthenticatedByToken(std::move(subject));
+ } else {
+ auto s = Status::NotAuthorized(result.message());
+ RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_INVALID_JWT, s));
+ return s;
+ }
+ pb.Clear();
+ pb.set_step(NegotiatePB::JWT_EXCHANGE);
+ return SendNegotiatePB(pb);
+}
+
Status ServerNegotiation::AuthenticateByToken(faststring* recv_buf) {
// Sanity check that TLS has been negotiated. Receiving the token on an
// unencrypted channel is a big no-no.
diff --git a/src/kudu/rpc/server_negotiation.h b/src/kudu/rpc/server_negotiation.h
index 1f7319c15..9339fb53b 100644
--- a/src/kudu/rpc/server_negotiation.h
+++ b/src/kudu/rpc/server_negotiation.h
@@ -42,6 +42,7 @@
namespace kudu {
+class JwtVerifier;
class Sockaddr;
class faststring;
@@ -65,6 +66,7 @@ class ServerNegotiation {
ServerNegotiation(std::unique_ptr<Socket> socket,
const security::TlsContext* tls_context,
const security::TokenVerifier* token_verifier,
+ const JwtVerifier* jwt_verifier,
security::RpcEncryption encryption,
bool encrypt_loopback,
std::string sasl_proto_name);
@@ -192,6 +194,8 @@ class ServerNegotiation {
// 'recv_buf' allows a receive buffer to be reused.
Status AuthenticateByToken(faststring* recv_buf) WARN_UNUSED_RESULT;
+ Status AuthenticateByJwt(faststring* recv_buf) WARN_UNUSED_RESULT;
+
// Authenticate the client using the client's TLS certificate. Populates the
// 'authenticated_user_' field with the certificate's subject.
Status AuthenticateByCertificate() WARN_UNUSED_RESULT;
@@ -234,6 +238,7 @@ class ServerNegotiation {
// TSK state.
const security::TokenVerifier* token_verifier_;
+ const JwtVerifier* jwt_verifier_;
// The set of features supported by the client and server. Filled in during negotiation.
std::set<RpcFeatureFlag> client_features_;
diff --git a/src/kudu/security/token.proto b/src/kudu/security/token.proto
index ea63ce29c..52f014524 100644
--- a/src/kudu/security/token.proto
+++ b/src/kudu/security/token.proto
@@ -90,6 +90,11 @@ message TokenPB {
}
};
+// JSON Web Token: a wrapper to pass around a JWT as is.
+message JwtRawPB {
+ optional bytes jwt_data = 1;
+}
+
message SignedTokenPB {
// The actual token data. This is a serialized TokenPB protobuf. However, we use a
// 'bytes' field, since protobuf doesn't guarantee that if two implementations serialize
diff --git a/src/kudu/util/jwt-util.cc b/src/kudu/util/jwt-util.cc
index bd928f37d..19c149095 100644
--- a/src/kudu/util/jwt-util.cc
+++ b/src/kudu/util/jwt-util.cc
@@ -925,4 +925,15 @@ Status JWTHelper::GetCustomClaimUsername(const JWTDecodedToken* decoded_token,
return Status::OK();
}
+Status KeyBasedJwtVerifier::Init() {
+ return jwt_->Init(jwks_uri_, is_local_file_);
+}
+
+Status KeyBasedJwtVerifier::VerifyToken(const string& bytes_raw, string* subject) const {
+ JWTHelper::UniqueJWTDecodedToken decoded_token;
+ RETURN_NOT_OK(JWTHelper::Decode(bytes_raw, decoded_token));
+ *subject = decoded_token->decoded_jwt_.get_subject();
+ return Status::OK();
+}
+
} // namespace kudu
diff --git a/src/kudu/util/jwt-util.h b/src/kudu/util/jwt-util.h
index 76b7f211a..a4f96839a 100644
--- a/src/kudu/util/jwt-util.h
+++ b/src/kudu/util/jwt-util.h
@@ -20,8 +20,11 @@
#include <memory>
#include <string>
+#include <utility>
+#include "kudu/gutil/macros.h"
#include "kudu/util/jwt-util-internal.h"
+#include "kudu/util/jwt.h"
#include "kudu/util/status.h"
namespace kudu {
@@ -86,4 +89,23 @@ class JWTHelper {
std::unique_ptr<JWKSMgr> jwks_mgr_;
};
+// JwtVerifier implementation that uses JWKS to verify tokens.
+class KeyBasedJwtVerifier : public JwtVerifier {
+ public:
+ KeyBasedJwtVerifier(std::string jwks_uri, bool is_local_file)
+ : jwt_(JWTHelper::GetInstance()),
+ jwks_uri_(std::move(jwks_uri)),
+ is_local_file_(is_local_file) {
+ }
+ ~KeyBasedJwtVerifier() override = default;
+ Status Init();
+ Status VerifyToken(const std::string& bytes_raw, std::string* subject) const override;
+ private:
+ JWTHelper* jwt_;
+ std::string jwks_uri_;
+ bool is_local_file_;
+
+ DISALLOW_COPY_AND_ASSIGN(KeyBasedJwtVerifier);
+};
+
} // namespace kudu
diff --git a/src/kudu/util/jwt.h b/src/kudu/util/jwt.h
new file mode 100644
index 000000000..43bb9d58b
--- /dev/null
+++ b/src/kudu/util/jwt.h
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <string>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/status.h"
+
+// Minimal header to avoid having to link heavy-weight modules into libraries.
+
+namespace kudu {
+
+class JwtVerifier {
+ public:
+ virtual ~JwtVerifier() {}
+ // Verifies a JWT, which is passed as bytes_raw, then extracts the subject from the verified
+ // token and returns it by pointer in subject. The returned pointer is owned by the caller.
+ virtual Status VerifyToken(const std::string& bytes_raw, std::string* subject) const = 0;
+};
+
+// Minimal implementation of a JWT verifier to be used when a more full-fledged
+// implementation is not available.
+class SimpleJwtVerifier : public JwtVerifier {
+ public:
+ SimpleJwtVerifier() = default;
+ ~SimpleJwtVerifier() override = default;
+ Status VerifyToken(const std::string& /*bytes_raw*/,
+ std::string* /*subject*/) const override {
+ return Status::NotAuthorized("JWT verification not configured");
+ }
+ private:
+ DISALLOW_COPY_AND_ASSIGN(SimpleJwtVerifier);
+};
+
+} // namespace kudu