You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2019/02/11 18:43:11 UTC
[kudu] 01/04: KUDU-2543 pt 1: basic checks for authz tokens
This is an automated email from the ASF dual-hosted git repository.
granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit ccb92b18988240ca8192cf622f6fc617a00d0646
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Fri Dec 21 21:45:27 2018 -0800
KUDU-2543 pt 1: basic checks for authz tokens
In preparation for passing around authorization tokens, the tservers are
now fitted with minimal token-verifying logic that protects the write
and the various scan-like endpoints (i.e. scans, checksum scans, and
split-key requests), optionally enforcing that the client has provided a
valid authz token. I put the negotiation authn token verification logic
into its own function for reuse in the tserver layer.
It's worth noting that scan-like requests that have a concept of a "new"
request vs a "continue" request (i.e. scans, checksum scans) will only
need verification on "new" requests. "Continue" requests are handled in
that a scanner cannot be hijacked by a user who didn't create it.
A test is added to test various scenarios at the tserver level.
Change-Id: I99555e0ab2d09d4abcbc12b1100658a9a17590f4
Reviewed-on: http://gerrit.cloudera.org:8080/11751
Tested-by: Andrew Wong <aw...@cloudera.com>
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Reviewed-by: Hao Hao <ha...@cloudera.com>
Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
src/kudu/rpc/CMakeLists.txt | 1 +
src/kudu/rpc/rpc_header.proto | 11 +-
src/kudu/rpc/rpc_verification_util.cc | 67 +++++
src/kudu/rpc/rpc_verification_util.h | 41 +++
src/kudu/rpc/server_negotiation.cc | 34 +--
src/kudu/security/token_verifier.cc | 14 +-
src/kudu/tserver/CMakeLists.txt | 1 +
src/kudu/tserver/tablet_server-test-base.h | 2 +-
.../tserver/tablet_server_authorization-test.cc | 308 +++++++++++++++++++++
src/kudu/tserver/tablet_service.cc | 146 ++++++++++
src/kudu/tserver/tserver.proto | 10 +
src/kudu/tserver/tserver_service.proto | 2 +
12 files changed, 600 insertions(+), 37 deletions(-)
diff --git a/src/kudu/rpc/CMakeLists.txt b/src/kudu/rpc/CMakeLists.txt
index b747337..b258b0e 100644
--- a/src/kudu/rpc/CMakeLists.txt
+++ b/src/kudu/rpc/CMakeLists.txt
@@ -62,6 +62,7 @@ set(KRPC_SRCS
rpc_context.cc
rpc_controller.cc
rpc_sidecar.cc
+ rpc_verification_util.cc
rpcz_store.cc
sasl_common.cc
sasl_helper.cc
diff --git a/src/kudu/rpc/rpc_header.proto b/src/kudu/rpc/rpc_header.proto
index 1d55b6a..03bce1e 100644
--- a/src/kudu/rpc/rpc_header.proto
+++ b/src/kudu/rpc/rpc_header.proto
@@ -313,6 +313,12 @@ message ErrorStatusPB {
// time. The client may try again later.
ERROR_UNAVAILABLE = 7;
+ // The authorization token is invalid or expired. Unlike
+ // FATAL_INVALID_AUTHENTICATION_TOKEN, receipt of this code does not mean
+ // that a reconnection attempt should be made; just that the client should
+ // obtain a new authz token.
+ ERROR_INVALID_AUTHORIZATION_TOKEN = 17;
+
// FATAL_* errors indicate that the client should shut down the connection.
//------------------------------------------------------------
// The RPC server is already shutting down.
@@ -326,8 +332,9 @@ message ErrorStatusPB {
// Auth failed.
FATAL_UNAUTHORIZED = 15;
- // The authentication token is invalid or expired;
- // the client should obtain a new one.
+ // The authentication token is invalid or expired. The connection
+ // negotiation failed, and the client should obtain a new authn token and
+ // try to reconnect.
FATAL_INVALID_AUTHENTICATION_TOKEN = 16;
}
diff --git a/src/kudu/rpc/rpc_verification_util.cc b/src/kudu/rpc/rpc_verification_util.cc
new file mode 100644
index 0000000..c0bf8b1
--- /dev/null
+++ b/src/kudu/rpc/rpc_verification_util.cc
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/rpc_verification_util.h"
+
+#include <ostream>
+
+#include <glog/logging.h>
+
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/security/token_verifier.h"
+
+namespace kudu {
+
+using security::VerificationResult;
+
+namespace rpc {
+
+Status ParseVerificationResult(const VerificationResult& result,
+ ErrorStatusPB::RpcErrorCodePB retry_error,
+ ErrorStatusPB::RpcErrorCodePB* error) {
+ DCHECK(error);
+ switch (result) {
+ case VerificationResult::VALID: return Status::OK();
+
+ case VerificationResult::INVALID_TOKEN:
+ case VerificationResult::INVALID_SIGNATURE:
+ case VerificationResult::EXPIRED_TOKEN:
+ case VerificationResult::EXPIRED_SIGNING_KEY: {
+ // These errors indicate the client should get a new token and try again.
+ *error = retry_error;
+ break;
+ }
+ case VerificationResult::UNKNOWN_SIGNING_KEY: {
+ // The server doesn't recognize the signing key. This indicates that the
+ // server has not been updated with the most recent TSKs, so tell the
+ // client to try again later.
+ *error = ErrorStatusPB::ERROR_UNAVAILABLE;
+ break;
+ }
+ case VerificationResult::INCOMPATIBLE_FEATURE: {
+ // These error types aren't recoverable by having the client get a new token.
+ *error = ErrorStatusPB::FATAL_UNAUTHORIZED;
+ break;
+ }
+ default:
+ LOG(FATAL) << "Unknown verification result: " << static_cast<int>(result);
+ }
+ return Status::NotAuthorized(VerificationResultToString(result));
+}
+
+} // namespace rpc
+} // namespace kudu
diff --git a/src/kudu/rpc/rpc_verification_util.h b/src/kudu/rpc/rpc_verification_util.h
new file mode 100644
index 0000000..0c15b9c
--- /dev/null
+++ b/src/kudu/rpc/rpc_verification_util.h
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+namespace security {
+enum class VerificationResult;
+} // namespace security
+
+namespace rpc {
+
+// Utility function to convert the result of a security token verification to
+// an appropriate RPC error code. Returns OK if 'result' is VALID, and
+// otherwise returns non-OK and sets 'error' appropriately.
+// 'retry_error' is the error code to be returned to denote that verification
+// should be retried after retrieving a new token.
+Status ParseVerificationResult(const security::VerificationResult& result,
+ ErrorStatusPB::RpcErrorCodePB retry_error,
+ ErrorStatusPB::RpcErrorCodePB* error);
+
+} // namespace rpc
+} // namespace kudu
diff --git a/src/kudu/rpc/server_negotiation.cc b/src/kudu/rpc/server_negotiation.cc
index ac641b1..7c3d66a 100644
--- a/src/kudu/rpc/server_negotiation.cc
+++ b/src/kudu/rpc/server_negotiation.cc
@@ -40,6 +40,7 @@
#include "kudu/rpc/blocking_ops.h"
#include "kudu/rpc/constants.h"
#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/rpc_verification_util.h"
#include "kudu/rpc/serialization.h"
#include "kudu/security/cert.h"
#include "kudu/security/crypto.h"
@@ -657,33 +658,12 @@ Status ServerNegotiation::AuthenticateByToken(faststring* recv_buf) {
// so it knows how to intelligently retry.
security::TokenPB token;
auto verification_result = token_verifier_->VerifyTokenSignature(pb.authn_token(), &token);
- switch (verification_result) {
- case security::VerificationResult::VALID: break;
-
- case security::VerificationResult::INVALID_TOKEN:
- case security::VerificationResult::INVALID_SIGNATURE:
- case security::VerificationResult::EXPIRED_TOKEN:
- case security::VerificationResult::EXPIRED_SIGNING_KEY: {
- // These errors indicate the client should get a new token and try again.
- Status s = Status::NotAuthorized(VerificationResultToString(verification_result));
- RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_INVALID_AUTHENTICATION_TOKEN, s));
- return s;
- }
-
- case security::VerificationResult::UNKNOWN_SIGNING_KEY: {
- // The server doesn't recognize the signing key. This indicates that the
- // server has not been updated with the most recent TSKs, so tell the
- // client to try again later.
- Status s = Status::NotAuthorized(VerificationResultToString(verification_result));
- RETURN_NOT_OK(SendError(ErrorStatusPB::ERROR_UNAVAILABLE, s));
- return s;
- }
- case security::VerificationResult::INCOMPATIBLE_FEATURE: {
- Status s = Status::NotAuthorized(VerificationResultToString(verification_result));
- // These error types aren't recoverable by having the client get a new token.
- RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
- return s;
- }
+ ErrorStatusPB::RpcErrorCodePB error;
+ Status s = ParseVerificationResult(verification_result,
+ ErrorStatusPB::FATAL_INVALID_AUTHENTICATION_TOKEN, &error);
+ if (!s.ok()) {
+ RETURN_NOT_OK(SendError(error, s));
+ return s;
}
if (!token.has_authn()) {
diff --git a/src/kudu/security/token_verifier.cc b/src/kudu/security/token_verifier.cc
index 1ae20db..e3aef4c 100644
--- a/src/kudu/security/token_verifier.cc
+++ b/src/kudu/security/token_verifier.cc
@@ -123,7 +123,7 @@ VerificationResult TokenVerifier::VerifyTokenSignature(const SignedTokenPB& sign
for (auto flag : token->incompatible_features()) {
if (!TokenPB::Feature_IsValid(flag)) {
- KLOG_EVERY_N_SECS(WARNING, 60) << "received authentication token with unknown feature; "
+ KLOG_EVERY_N_SECS(WARNING, 60) << "received token with unknown feature; "
"server needs to be updated";
return VerificationResult::INCOMPATIBLE_FEATURE;
}
@@ -151,17 +151,17 @@ const char* VerificationResultToString(VerificationResult r) {
case security::VerificationResult::VALID:
return "valid";
case security::VerificationResult::INVALID_TOKEN:
- return "invalid authentication token";
+ return "invalid token";
case security::VerificationResult::INVALID_SIGNATURE:
- return "invalid authentication token signature";
+ return "invalid token signature";
case security::VerificationResult::EXPIRED_TOKEN:
- return "authentication token expired";
+ return "token expired";
case security::VerificationResult::EXPIRED_SIGNING_KEY:
- return "authentication token signing key expired";
+ return "token signing key expired";
case security::VerificationResult::UNKNOWN_SIGNING_KEY:
- return "authentication token signed with unknown key";
+ return "token signed with unknown key";
case security::VerificationResult::INCOMPATIBLE_FEATURE:
- return "authentication token uses incompatible feature";
+ return "token uses incompatible feature";
default:
LOG(FATAL) << "unexpected VerificationResult value: "
<< static_cast<int>(r);
diff --git a/src/kudu/tserver/CMakeLists.txt b/src/kudu/tserver/CMakeLists.txt
index 7304d72..1bc6398 100644
--- a/src/kudu/tserver/CMakeLists.txt
+++ b/src/kudu/tserver/CMakeLists.txt
@@ -177,5 +177,6 @@ ADD_KUDU_TEST(tablet_copy_source_session-test)
ADD_KUDU_TEST(tablet_copy_service-test)
ADD_KUDU_TEST(tablet_server-test PROCESSORS 3)
ADD_KUDU_TEST(tablet_server-stress-test RUN_SERIAL true)
+ADD_KUDU_TEST(tablet_server_authorization-test)
ADD_KUDU_TEST(scanners-test)
ADD_KUDU_TEST(ts_tablet_manager-test)
diff --git a/src/kudu/tserver/tablet_server-test-base.h b/src/kudu/tserver/tablet_server-test-base.h
index b80dfac..e325496 100644
--- a/src/kudu/tserver/tablet_server-test-base.h
+++ b/src/kudu/tserver/tablet_server-test-base.h
@@ -124,10 +124,10 @@ class TabletServerTestBase : public KuduTest {
// given read mode.
Status FillNewScanRequest(ReadMode read_mode, NewScanRequestPB* scan) const;
- protected:
static const char* kTableId;
static const char* kTabletId;
+ protected:
const Schema schema_;
Schema key_schema_;
std::unique_ptr<RowBuilder> rb_;
diff --git a/src/kudu/tserver/tablet_server_authorization-test.cc b/src/kudu/tserver/tablet_server_authorization-test.cc
new file mode 100644
index 0000000..43594eb
--- /dev/null
+++ b/src/kudu/tserver/tablet_server_authorization-test.cc
@@ -0,0 +1,308 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <functional>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/common/wire_protocol-test-util.h"
+#include "kudu/common/wire_protocol.h"
+#include "kudu/common/wire_protocol.pb.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/user_credentials.h"
+#include "kudu/security/crypto.h"
+#include "kudu/security/openssl_util.h"
+#include "kudu/security/token.pb.h"
+#include "kudu/security/token_signer.h"
+#include "kudu/security/token_verifier.h"
+#include "kudu/tserver/mini_tablet_server.h"
+#include "kudu/tserver/tablet_server-test-base.h"
+#include "kudu/tserver/tablet_server.h"
+#include "kudu/tserver/tserver.pb.h"
+#include "kudu/tserver/tserver_service.pb.h"
+#include "kudu/tserver/tserver_service.proxy.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+
+using std::shared_ptr;
+using std::string;
+using std::vector;
+
+DECLARE_bool(tserver_enforce_access_control);
+DECLARE_double(tserver_inject_invalid_authz_token_ratio);
+
+namespace kudu {
+
+class Schema;
+
+using pb_util::SecureShortDebugString;
+using rpc::ErrorStatusPB;
+using rpc::RpcController;
+using security::PrivateKey;
+using security::SignedTokenPB;
+using security::TablePrivilegePB;
+using security::TokenSigner;
+using security::TokenSigningPrivateKeyPB;
+using security::TokenSigningPublicKeyPB;
+using security::TokenVerifier;
+
+namespace tserver {
+
+namespace {
+
+// Verifies the expected response for an invalid/malformed token.
+void CheckInvalidAuthzToken(const Status& s, const RpcController& rpc) {
+ ASSERT_TRUE(s.IsRemoteError()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "Not authorized");
+ ASSERT_TRUE(rpc.error_response()) << "Expected an error response";
+ ASSERT_TRUE(rpc.error_response()->code() == ErrorStatusPB::ERROR_INVALID_AUTHORIZATION_TOKEN)
+ << SecureShortDebugString(*rpc.error_response());
+}
+
+// Gets a private key for the given sequence number.
+TokenSigningPrivateKeyPB GetTokenSigningPrivateKey(int seq_num) {
+ TokenSigningPrivateKeyPB tsk;
+ PrivateKey private_key;
+ CHECK_OK(GeneratePrivateKey(/*num_bits=*/512, &private_key));
+ string private_key_str_der;
+ CHECK_OK(private_key.ToString(&private_key_str_der, security::DataFormat::DER));
+ tsk.set_rsa_key_der(private_key_str_der);
+ tsk.set_key_seq_num(seq_num);
+ tsk.set_expire_unix_epoch_seconds(WallTime_Now() + 3600);
+ return tsk;
+}
+
+// Test-param argument to instantiate various tserver requests and send the
+// appropriate proxy calls.
+typedef std::function<Status(const Schema&, const SignedTokenPB*, TabletServerServiceProxy*,
+ RpcController*)> RequestorFunc;
+
+Status WriteGenerator(const Schema& schema, const SignedTokenPB* token,
+ TabletServerServiceProxy* proxy, RpcController* rpc) {
+ WriteRequestPB req;
+ req.set_tablet_id(TabletServerTestBase::kTabletId);
+ RETURN_NOT_OK(SchemaToPB(schema, req.mutable_schema()));
+ AddTestRowToPB(RowOperationsPB::INSERT, schema, 1234, 5678, "hello world",
+ req.mutable_row_operations());
+ if (token) {
+ *req.mutable_authz_token() = *token;
+ }
+ WriteResponsePB resp;
+ LOG(INFO) << "Sending write request";
+ return proxy->Write(req, &resp, rpc);
+}
+
+Status ScanGenerator(const Schema& schema, const SignedTokenPB* token,
+ TabletServerServiceProxy* proxy, RpcController* rpc) {
+ ScanRequestPB req;
+ req.set_call_seq_id(0);
+ NewScanRequestPB* scan = req.mutable_new_scan_request();
+ scan->set_tablet_id(TabletServerTestBase::kTabletId);
+ RETURN_NOT_OK(SchemaToColumnPBs(schema, scan->mutable_projected_columns()));
+ if (token) {
+ *scan->mutable_authz_token() = *token;
+ }
+ ScanResponsePB resp;
+ LOG(INFO) << "Sending scan request";
+ return proxy->Scan(req, &resp, rpc);
+}
+
+Status SplitKeyRangeGenerator(const Schema& /*schema*/, const SignedTokenPB* token,
+ TabletServerServiceProxy* proxy, RpcController* rpc) {
+ SplitKeyRangeRequestPB req;
+ req.set_tablet_id(TabletServerTestBase::kTabletId);
+ if (token) {
+ *req.mutable_authz_token() = *token;
+ }
+ SplitKeyRangeResponsePB resp;
+ LOG(INFO) << "Sending split-key-range request";
+ return proxy->SplitKeyRange(req, &resp, rpc);
+}
+
+Status ChecksumGenerator(const Schema& schema, const SignedTokenPB* token,
+ TabletServerServiceProxy* proxy, RpcController* rpc) {
+ ChecksumRequestPB req;
+ NewScanRequestPB* scan = req.mutable_new_request();
+ scan->set_tablet_id(TabletServerTestBase::kTabletId);
+ RETURN_NOT_OK(SchemaToColumnPBs(schema, scan->mutable_projected_columns()));
+ if (token) {
+ *scan->mutable_authz_token() = *token;
+ }
+ ChecksumResponsePB resp;
+ LOG(INFO) << "Sending checksum scan request";
+ return proxy->Checksum(req, &resp, rpc);
+}
+
+} // anonymous namespace
+
+class AuthzTabletServerTest : public TabletServerTestBase,
+ public testing::WithParamInterface<RequestorFunc> {
+ public:
+ void SetUp() override {
+ NO_FATALS(TabletServerTestBase::SetUp());
+ NO_FATALS(StartTabletServer(/*num_data_dirs=*/1));
+ }
+};
+
+TEST_P(AuthzTabletServerTest, TestInvalidAuthzTokens) {
+ FLAGS_tserver_enforce_access_control = true;
+ rpc::UserCredentials user;
+ const string kUser = "dan";
+ user.set_real_user(kUser);
+ proxy_->set_user_credentials(user);
+
+ TokenSigningPrivateKeyPB tsk = GetTokenSigningPrivateKey(1);
+ shared_ptr<TokenVerifier> verifier(new TokenVerifier());
+ // We're going to manually tamper with the tokens to make them invalid, so
+ // pass in arbitrary expiration values.
+ TokenSigner signer(3600, 3600, 3600, verifier);
+ ASSERT_OK(signer.ImportKeys({ tsk }));
+ vector<TokenSigningPublicKeyPB> public_keys = verifier->ExportKeys();
+ ASSERT_OK(mini_server_->server()->mutable_token_verifier()->ImportKeys(public_keys));
+
+ // Set up a privilege that permits everything. Even with these privileges,
+ // invalid authz tokens will prevent access.
+ TablePrivilegePB privilege;
+ privilege.set_table_id(kTableId);
+ privilege.set_scan_privilege(true);
+ privilege.set_insert_privilege(true);
+ privilege.set_update_privilege(true);
+ privilege.set_delete_privilege(true);
+
+ // Test various "invalid token" scenarios.
+ typedef std::function<SignedTokenPB(void)> TokenCreator;
+ vector<TokenCreator> token_creators;
+ token_creators.emplace_back([&] {
+ LOG(INFO) << "Generating token with a bad signature";
+ SignedTokenPB token;
+ CHECK_OK(signer.GenerateAuthzToken(kUser, privilege, &token));
+ string bad_signature = token.signature();
+ // Flip the bits in the signature.
+ for (int i = 0; i < bad_signature.length(); i++) {
+ char* byte = &bad_signature[i];
+ *byte = ~*byte;
+ }
+ token.set_token_data(std::move(bad_signature));
+ return token;
+ });
+ token_creators.emplace_back([&] {
+ LOG(INFO) << "Generating token with no signature";
+ SignedTokenPB token;
+ CHECK_OK(signer.GenerateAuthzToken(kUser, privilege, &token));
+ token.clear_signature();
+ return token;
+ });
+ token_creators.emplace_back([&] {
+ LOG(INFO) << "Generating token for a different user";
+ SignedTokenPB token;
+ CHECK_OK(signer.GenerateAuthzToken("bad-dan", privilege, &token));
+ return token;
+ });
+ token_creators.emplace_back([&] {
+ LOG(INFO) << "Generating authn token instead of authz token";
+ SignedTokenPB token;
+ CHECK_OK(signer.GenerateAuthnToken(kUser, &token));
+ return token;
+ });
+ token_creators.emplace_back([&] {
+ LOG(INFO) << "Generating expired authz token";
+ TokenSigningPrivateKeyPB tsk = GetTokenSigningPrivateKey(2);
+ shared_ptr<TokenVerifier> verifier(new TokenVerifier());
+ TokenSigner expired_signer(3600, /*authz_token_validity_seconds=*/1, 3600, verifier);
+ CHECK_OK(expired_signer.ImportKeys({ tsk }));
+ vector<TokenSigningPublicKeyPB> expired_public_keys = verifier->ExportKeys();
+ CHECK_OK(mini_server_->server()->mutable_token_verifier()->ImportKeys(public_keys));
+
+ SignedTokenPB token;
+ CHECK_OK(expired_signer.GenerateAuthzToken(kUser, privilege, &token));
+ // Wait for the token to expire.
+ SleepFor(MonoDelta::FromSeconds(3));
+ return token;
+ });
+
+ const auto& send_req = GetParam();
+ // Run all of the above "invalid token" scenarios against the above
+ // requests.
+ for (const auto& token_creator : token_creators) {
+ RpcController rpc;
+ const SignedTokenPB token = token_creator();
+ Status s = send_req(schema_, &token, proxy_.get(), &rpc);
+ NO_FATALS(CheckInvalidAuthzToken(s, rpc));
+ }
+
+ // Send a request with no token. This is also considered an "invalid token".
+ {
+ LOG(INFO) << "Generating request with no authz token";
+ RpcController rpc;
+ Status s = send_req(schema_, nullptr, proxy_.get(), &rpc);
+ NO_FATALS(CheckInvalidAuthzToken(s, rpc));
+ }
+ // Now test a valid token that has no privileges. This is flat-out
+ // disallowed and "fatal".
+ {
+ LOG(INFO) << "Generating request with no privileges";
+ SignedTokenPB token;
+ TablePrivilegePB empty;
+ ASSERT_OK(signer.GenerateAuthzToken(kUser, empty, &token));
+ RpcController rpc;
+ Status s = send_req(schema_, &token, proxy_.get(), &rpc);
+ ASSERT_TRUE(s.IsRemoteError()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "Not authorized");
+ ASSERT_TRUE(rpc.error_response());
+ ASSERT_TRUE(rpc.error_response()->code() == ErrorStatusPB::FATAL_UNAUTHORIZED)
+ << SecureShortDebugString(*rpc.error_response());
+ }
+ // Create a healthy token but inject an error.
+ {
+ LOG(INFO) << "Generating healthy request but injecting error";
+ google::FlagSaver saver;
+ FLAGS_tserver_inject_invalid_authz_token_ratio = 1.0;
+ SignedTokenPB token;
+ ASSERT_OK(signer.GenerateAuthzToken(kUser, privilege, &token));
+ RpcController rpc;
+ Status s = send_req(schema_, &token, proxy_.get(), &rpc);
+ NO_FATALS(CheckInvalidAuthzToken(s, rpc));
+ }
+ // Create a healthy token.
+ {
+ LOG(INFO) << "Generating healthy request";
+ SignedTokenPB token;
+ ASSERT_OK(signer.GenerateAuthzToken(kUser, privilege, &token));
+ RpcController rpc;
+ ASSERT_OK(send_req(schema_, &token, proxy_.get(), &rpc));
+ ASSERT_FALSE(rpc.error_response());
+ }
+}
+
+INSTANTIATE_TEST_CASE_P(RequestorFuncs, AuthzTabletServerTest,
+ ::testing::Values(&WriteGenerator, &ScanGenerator,
+ &SplitKeyRangeGenerator, &ChecksumGenerator));
+
+} // namespace tserver
+} // namespace kudu
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 9834f8b..6f1261f 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -66,6 +66,9 @@
#include "kudu/rpc/rpc_context.h"
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/rpc/rpc_sidecar.h"
+#include "kudu/rpc/rpc_verification_util.h"
+#include "kudu/security/token.pb.h"
+#include "kudu/security/token_verifier.h"
#include "kudu/server/server_base.h"
#include "kudu/tablet/compaction.h"
#include "kudu/tablet/metadata.pb.h"
@@ -89,6 +92,7 @@
#include "kudu/util/crc.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/faststring.h"
+#include "kudu/util/fault_injection.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/logging.h"
#include "kudu/util/memory/arena.h"
@@ -140,6 +144,16 @@ DEFINE_bool(scanner_inject_service_unavailable_on_continue_scan, false,
"any Scan continuation RPC call. Used for tests.");
TAG_FLAG(scanner_inject_service_unavailable_on_continue_scan, unsafe);
+DEFINE_bool(tserver_enforce_access_control, false,
+ "If set, the server will apply fine-grained access control rules "
+ "to client RPCs.");
+TAG_FLAG(tserver_enforce_access_control, experimental);
+TAG_FLAG(tserver_enforce_access_control, runtime);
+
+DEFINE_double(tserver_inject_invalid_authz_token_ratio, 0.0,
+ "Fraction of the time that authz token validation will fail. Used for tests.");
+TAG_FLAG(tserver_inject_invalid_authz_token_ratio, hidden);
+
DECLARE_bool(raft_prepare_replacement_before_eviction);
DECLARE_int32(memory_limit_warn_threshold_percentage);
DECLARE_int32(tablet_history_max_age_sec);
@@ -167,10 +181,15 @@ using kudu::consensus::UnsafeChangeConfigRequestPB;
using kudu::consensus::UnsafeChangeConfigResponsePB;
using kudu::consensus::VoteRequestPB;
using kudu::consensus::VoteResponsePB;
+using kudu::fault_injection::MaybeTrue;
using kudu::pb_util::SecureDebugString;
using kudu::pb_util::SecureShortDebugString;
+using kudu::rpc::ParseVerificationResult;
+using kudu::rpc::ErrorStatusPB;
using kudu::rpc::RpcContext;
using kudu::rpc::RpcSidecar;
+using kudu::security::TokenVerifier;
+using kudu::security::TokenPB;
using kudu::server::ServerBase;
using kudu::tablet::AlterSchemaTransactionState;
using kudu::tablet::TABLET_DATA_COPYING;
@@ -366,6 +385,83 @@ static StdStatusCallback BindHandleResponse(
typedef ListTabletsResponsePB::StatusAndSchemaPB StatusAndSchemaPB;
+// If the privilege has neither full scan privileges nor column-level scan
+// privileges, the user is definitely not authorized to perform a scan.
+bool MayHaveScanPrivileges(const security::TablePrivilegePB& privilege) {
+ if (privilege.scan_privilege()) {
+ return true;
+ }
+ if (privilege.column_privileges_size() > 0) {
+ for (const auto& col_id_and_privilege : privilege.column_privileges()) {
+ if (col_id_and_privilege.second.scan_privilege()) {
+ return true;
+ }
+ }
+ }
+ return false;
+}
+
+// Verifies the authorization token's correctness. Returns false and sends an
+// appropriate response if the request's authz token is invalid.
+template <class AuthorizableRequest>
+static bool VerifyAuthzTokenOrRespond(const TokenVerifier& token_verifier,
+ const AuthorizableRequest& req,
+ rpc::RpcContext* context,
+ TokenPB* token) {
+ DCHECK(token);
+ if (!req.has_authz_token()) {
+ context->RespondRpcFailure(rpc::ErrorStatusPB::ERROR_INVALID_AUTHORIZATION_TOKEN,
+ Status::NotAuthorized("no authorization token presented"));
+ return false;
+ }
+ TokenPB token_pb;
+ const auto result = token_verifier.VerifyTokenSignature(req.authz_token(), &token_pb);
+ ErrorStatusPB::RpcErrorCodePB error;
+ Status s = ParseVerificationResult(result,
+ ErrorStatusPB::ERROR_INVALID_AUTHORIZATION_TOKEN, &error);
+ if (!s.ok()) {
+ context->RespondRpcFailure(error, s.CloneAndPrepend("authz token verification failure"));
+ return false;
+ }
+ if (!token_pb.has_authz() ||
+ !token_pb.authz().has_table_privilege() ||
+ token_pb.authz().username() != context->remote_user().username()) {
+ context->RespondRpcFailure(ErrorStatusPB::ERROR_INVALID_AUTHORIZATION_TOKEN,
+ Status::NotAuthorized("invalid authorization token presented"));
+ return false;
+ }
+ if (MaybeTrue(FLAGS_tserver_inject_invalid_authz_token_ratio)) {
+ context->RespondRpcFailure(ErrorStatusPB::ERROR_INVALID_AUTHORIZATION_TOKEN,
+ Status::NotAuthorized("INJECTED FAILURE"));
+ return false;
+ }
+ *token = std::move(token_pb);
+ return true;
+}
+
+// Verifies the given scan-like request (e.g. Scan, Checksum, SplitKeyRange)
+// 'req', checking for any scan privileges. Returns false if the request's
+// authz token is invalid or does not have any scan privileges, in which case,
+// 'context' will be used to respond with an error. Otherwise, returns true,
+// and the privileges in 'token' should be used to further verify the request.
+template <class AuthorizableScanRequest>
+static bool VerifyHasAnyScanPrivileges(const TokenVerifier& token_verifier,
+ const AuthorizableScanRequest& req,
+ const char* not_authorized_str,
+ rpc::RpcContext* context,
+ TokenPB* token) {
+ if (!VerifyAuthzTokenOrRespond(token_verifier, req, context, token)) {
+ return false;
+ }
+ const auto& privilege = token->authz().table_privilege();
+ if (!MayHaveScanPrivileges(privilege)) {
+ context->RespondRpcFailure(rpc::ErrorStatusPB::FATAL_UNAUTHORIZED,
+ Status::NotAuthorized(not_authorized_str));
+ return false;
+ }
+ return true;
+}
+
static void SetupErrorAndRespond(TabletServerErrorPB* error,
const Status& s,
TabletServerErrorPB::Code code,
@@ -843,6 +939,22 @@ void TabletServiceImpl::Write(const WriteRequestPB* req,
"tablet_id", req->tablet_id());
DVLOG(3) << "Received Write RPC: " << SecureDebugString(*req);
+ if (FLAGS_tserver_enforce_access_control) {
+ TokenPB token;
+ if (!VerifyAuthzTokenOrRespond(server_->token_verifier(), *req, context, &token)) {
+ return;
+ }
+ const auto& privilege = token.authz().table_privilege();
+ if (!privilege.insert_privilege() &&
+ !privilege.update_privilege() &&
+ !privilege.delete_privilege()) {
+ context->RespondRpcFailure(rpc::ErrorStatusPB::FATAL_UNAUTHORIZED,
+ Status::NotAuthorized("not authorized to write"));
+ return;
+ }
+ // TODO(awong): check the privileges required for the contents of the write
+ // request by parsing out the op types in the request.
+ }
scoped_refptr<TabletReplica> replica;
if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), req->tablet_id(), resp,
context, &replica)) {
@@ -1341,6 +1453,20 @@ void TabletServiceImpl::Scan(const ScanRequestPB* req,
return;
}
+ // If this is a new scan request, we must enforce the appropriate privileges.
+ string authorized_table_id;
+ unordered_set<int> authorized_column_ids;
+ TokenPB token;
+ if (FLAGS_tserver_enforce_access_control && req->has_new_scan_request()) {
+ if (!VerifyHasAnyScanPrivileges(server_->token_verifier(), req->new_scan_request(),
+ "not authorized to scan", context, &token)) {
+ return;
+ }
+ // TODO(awong): check the privileges required for the contents of the scan
+ // request by pulling out the columns and checking against individual
+ // column privileges.
+ }
+
size_t batch_size_bytes = GetMaxBatchSizeBytesHint(req);
unique_ptr<faststring> rows_data(new faststring(batch_size_bytes * 11 / 10));
unique_ptr<faststring> indirect_data(new faststring(batch_size_bytes * 11 / 10));
@@ -1440,6 +1566,16 @@ void TabletServiceImpl::SplitKeyRange(const SplitKeyRangeRequestPB* req,
TRACE_EVENT1("tserver", "TabletServiceImpl::SplitKeyRange",
"tablet_id", req->tablet_id());
DVLOG(3) << "Received SplitKeyRange RPC: " << SecureDebugString(*req);
+ TokenPB token;
+ if (FLAGS_tserver_enforce_access_control) {
+ if (!VerifyHasAnyScanPrivileges(server_->token_verifier(), *req,
+ "not authorized to split key range", context, &token)) {
+ return;
+ }
+ // TODO(awong): check the privileges required for the contents of the
+ // split-key request by pulling out the columns and checking against
+ // individual column privileges.
+ }
scoped_refptr<TabletReplica> replica;
if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), req->tablet_id(), resp,
@@ -1565,6 +1701,16 @@ void TabletServiceImpl::Checksum(const ChecksumRequestPB* req,
bool has_more = false;
TabletServerErrorPB::Code error_code = TabletServerErrorPB::UNKNOWN_ERROR;
if (req->has_new_request()) {
+ if (FLAGS_tserver_enforce_access_control) {
+ TokenPB token;
+ if (!VerifyHasAnyScanPrivileges(server_->token_verifier(), req->new_request(),
+ "not authorized to checksum", context, &token)) {
+ return;
+ }
+ // TODO(awong): check the privileges required for the contents of the
+ // checksum request by pulling out the columns and checking against
+ // individual column privileges.
+ }
scan_req.mutable_new_scan_request()->CopyFrom(req->new_request());
const NewScanRequestPB& new_req = req->new_request();
scoped_refptr<TabletReplica> replica;
diff --git a/src/kudu/tserver/tserver.proto b/src/kudu/tserver/tserver.proto
index afe1b71..3964fdb 100644
--- a/src/kudu/tserver/tserver.proto
+++ b/src/kudu/tserver/tserver.proto
@@ -21,6 +21,7 @@ option java_package = "org.apache.kudu.tserver";
import "kudu/common/common.proto";
import "kudu/common/wire_protocol.proto";
+import "kudu/security/token.proto";
import "kudu/tablet/tablet.proto";
import "kudu/util/pb_util.proto";
@@ -136,6 +137,9 @@ message WriteRequestPB {
// TODO crypto sign this and propagate the signature along with
// the timestamp.
optional fixed64 propagated_timestamp = 5;
+
+ // An authorization token with which to authorize this request.
+ optional security.SignedTokenPB authz_token = 6;
}
message WriteResponsePB {
@@ -284,6 +288,9 @@ message NewScanRequestPB {
// The default value corresponds to RowFormatFlags::NO_FLAGS, which can't be set
// as the actual default since the types differ.
optional uint64 row_format_flags = 14 [default = 0];
+
+ // An authorization token with which to authorize this request.
+ optional security.SignedTokenPB authz_token = 15;
}
// A scan request. Initially, it should specify a scan. Later on, you
@@ -407,6 +414,9 @@ message SplitKeyRangeRequestPB {
// should only include these columns. This can be used if a query will
// only scan a certain subset of the columns.
repeated ColumnSchemaPB columns = 5;
+
+ // An authorization token with which to authorize this request.
+ optional security.SignedTokenPB authz_token = 6;
}
message SplitKeyRangeResponsePB {
diff --git a/src/kudu/tserver/tserver_service.proto b/src/kudu/tserver/tserver_service.proto
index 78b99a3..0f8c25b 100644
--- a/src/kudu/tserver/tserver_service.proto
+++ b/src/kudu/tserver/tserver_service.proto
@@ -59,6 +59,8 @@ service TabletServerService {
message ChecksumRequestPB {
// Only one of 'new_request' or 'continue_request' should be specified.
+ // NOTE: if 'new_request' is specified, it should also include an appropriate
+ // authorization token.
optional NewScanRequestPB new_request = 1;
optional ContinueChecksumRequestPB continue_request = 2;