You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2019/02/11 18:43:11 UTC

[kudu] 01/04: KUDU-2543 pt 1: basic checks for authz tokens

This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit ccb92b18988240ca8192cf622f6fc617a00d0646
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Fri Dec 21 21:45:27 2018 -0800

    KUDU-2543 pt 1: basic checks for authz tokens
    
    In preparation for passing around authorization tokens, the tservers are
    now fitted with minimal token-verifying logic that protects the write
    and the various scan-like endpoints (i.e. scans, checksum scans, and
    split-key requests), optionally enforcing that the client has provided a
    valid authz token. I put the negotiation authn token verification logic
    into its own function for reuse in the tserver layer.
    
    It's worth noting that scan-like requests that have a concept of a "new"
    request vs a "continue" request (i.e. scans, checksum scans) will only
    need verification on "new" requests. "Continue" requests are handled in
    that a scanner cannot be hijacked by a user who didn't create it.
    
    A test is added to test various scenarios at the tserver level.
    
    Change-Id: I99555e0ab2d09d4abcbc12b1100658a9a17590f4
    Reviewed-on: http://gerrit.cloudera.org:8080/11751
    Tested-by: Andrew Wong <aw...@cloudera.com>
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Reviewed-by: Hao Hao <ha...@cloudera.com>
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 src/kudu/rpc/CMakeLists.txt                        |   1 +
 src/kudu/rpc/rpc_header.proto                      |  11 +-
 src/kudu/rpc/rpc_verification_util.cc              |  67 +++++
 src/kudu/rpc/rpc_verification_util.h               |  41 +++
 src/kudu/rpc/server_negotiation.cc                 |  34 +--
 src/kudu/security/token_verifier.cc                |  14 +-
 src/kudu/tserver/CMakeLists.txt                    |   1 +
 src/kudu/tserver/tablet_server-test-base.h         |   2 +-
 .../tserver/tablet_server_authorization-test.cc    | 308 +++++++++++++++++++++
 src/kudu/tserver/tablet_service.cc                 | 146 ++++++++++
 src/kudu/tserver/tserver.proto                     |  10 +
 src/kudu/tserver/tserver_service.proto             |   2 +
 12 files changed, 600 insertions(+), 37 deletions(-)

diff --git a/src/kudu/rpc/CMakeLists.txt b/src/kudu/rpc/CMakeLists.txt
index b747337..b258b0e 100644
--- a/src/kudu/rpc/CMakeLists.txt
+++ b/src/kudu/rpc/CMakeLists.txt
@@ -62,6 +62,7 @@ set(KRPC_SRCS
     rpc_context.cc
     rpc_controller.cc
     rpc_sidecar.cc
+    rpc_verification_util.cc
     rpcz_store.cc
     sasl_common.cc
     sasl_helper.cc
diff --git a/src/kudu/rpc/rpc_header.proto b/src/kudu/rpc/rpc_header.proto
index 1d55b6a..03bce1e 100644
--- a/src/kudu/rpc/rpc_header.proto
+++ b/src/kudu/rpc/rpc_header.proto
@@ -313,6 +313,12 @@ message ErrorStatusPB {
     // time. The client may try again later.
     ERROR_UNAVAILABLE = 7;
 
+    // The authorization token is invalid or expired. Unlike
+    // FATAL_INVALID_AUTHENTICATION_TOKEN, receipt of this code does not mean
+    // that a reconnection attempt should be made; just that the client should
+    // obtain a new authz token.
+    ERROR_INVALID_AUTHORIZATION_TOKEN = 17;
+
     // FATAL_* errors indicate that the client should shut down the connection.
     //------------------------------------------------------------
     // The RPC server is already shutting down.
@@ -326,8 +332,9 @@ message ErrorStatusPB {
     // Auth failed.
     FATAL_UNAUTHORIZED = 15;
 
-    // The authentication token is invalid or expired;
-    // the client should obtain a new one.
+    // The authentication token is invalid or expired. The connection
+    // negotiation failed, and the client should obtain a new authn token and
+    // try to reconnect.
     FATAL_INVALID_AUTHENTICATION_TOKEN = 16;
   }
 
diff --git a/src/kudu/rpc/rpc_verification_util.cc b/src/kudu/rpc/rpc_verification_util.cc
new file mode 100644
index 0000000..c0bf8b1
--- /dev/null
+++ b/src/kudu/rpc/rpc_verification_util.cc
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/rpc_verification_util.h"
+
+#include <ostream>
+
+#include <glog/logging.h>
+
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/security/token_verifier.h"
+
+namespace kudu {
+
+using security::VerificationResult;
+
+namespace rpc {
+
+Status ParseVerificationResult(const VerificationResult& result,
+                               ErrorStatusPB::RpcErrorCodePB retry_error,
+                               ErrorStatusPB::RpcErrorCodePB* error) {
+  DCHECK(error);
+  switch (result) {
+    case VerificationResult::VALID: return Status::OK();
+
+    case VerificationResult::INVALID_TOKEN:
+    case VerificationResult::INVALID_SIGNATURE:
+    case VerificationResult::EXPIRED_TOKEN:
+    case VerificationResult::EXPIRED_SIGNING_KEY: {
+      // These errors indicate the client should get a new token and try again.
+      *error = retry_error;
+      break;
+    }
+    case VerificationResult::UNKNOWN_SIGNING_KEY: {
+      // The server doesn't recognize the signing key. This indicates that the
+      // server has not been updated with the most recent TSKs, so tell the
+      // client to try again later.
+      *error = ErrorStatusPB::ERROR_UNAVAILABLE;
+      break;
+    }
+    case VerificationResult::INCOMPATIBLE_FEATURE: {
+      // These error types aren't recoverable by having the client get a new token.
+      *error = ErrorStatusPB::FATAL_UNAUTHORIZED;
+      break;
+    }
+    default:
+      LOG(FATAL) << "Unknown verification result: " << static_cast<int>(result);
+  }
+  return Status::NotAuthorized(VerificationResultToString(result));
+}
+
+} // namespace rpc
+} // namespace kudu
diff --git a/src/kudu/rpc/rpc_verification_util.h b/src/kudu/rpc/rpc_verification_util.h
new file mode 100644
index 0000000..0c15b9c
--- /dev/null
+++ b/src/kudu/rpc/rpc_verification_util.h
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+namespace security {
+enum class VerificationResult;
+} // namespace security
+
+namespace rpc {
+
+// Utility function to convert the result of a security token verification to
+// an appropriate RPC error code. Returns OK if 'result' is VALID, and
+// otherwise returns non-OK and sets 'error' appropriately.
+// 'retry_error' is the error code to be returned to denote that verification
+// should be retried after retrieving a new token.
+Status ParseVerificationResult(const security::VerificationResult& result,
+                               ErrorStatusPB::RpcErrorCodePB retry_error,
+                               ErrorStatusPB::RpcErrorCodePB* error);
+
+} // namespace rpc
+} // namespace kudu
diff --git a/src/kudu/rpc/server_negotiation.cc b/src/kudu/rpc/server_negotiation.cc
index ac641b1..7c3d66a 100644
--- a/src/kudu/rpc/server_negotiation.cc
+++ b/src/kudu/rpc/server_negotiation.cc
@@ -40,6 +40,7 @@
 #include "kudu/rpc/blocking_ops.h"
 #include "kudu/rpc/constants.h"
 #include "kudu/rpc/messenger.h"
+#include "kudu/rpc/rpc_verification_util.h"
 #include "kudu/rpc/serialization.h"
 #include "kudu/security/cert.h"
 #include "kudu/security/crypto.h"
@@ -657,33 +658,12 @@ Status ServerNegotiation::AuthenticateByToken(faststring* recv_buf) {
   // so it knows how to intelligently retry.
   security::TokenPB token;
   auto verification_result = token_verifier_->VerifyTokenSignature(pb.authn_token(), &token);
-  switch (verification_result) {
-    case security::VerificationResult::VALID: break;
-
-    case security::VerificationResult::INVALID_TOKEN:
-    case security::VerificationResult::INVALID_SIGNATURE:
-    case security::VerificationResult::EXPIRED_TOKEN:
-    case security::VerificationResult::EXPIRED_SIGNING_KEY: {
-      // These errors indicate the client should get a new token and try again.
-      Status s = Status::NotAuthorized(VerificationResultToString(verification_result));
-      RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_INVALID_AUTHENTICATION_TOKEN, s));
-      return s;
-    }
-
-    case security::VerificationResult::UNKNOWN_SIGNING_KEY: {
-      // The server doesn't recognize the signing key. This indicates that the
-      // server has not been updated with the most recent TSKs, so tell the
-      // client to try again later.
-      Status s = Status::NotAuthorized(VerificationResultToString(verification_result));
-      RETURN_NOT_OK(SendError(ErrorStatusPB::ERROR_UNAVAILABLE, s));
-      return s;
-    }
-    case security::VerificationResult::INCOMPATIBLE_FEATURE: {
-      Status s = Status::NotAuthorized(VerificationResultToString(verification_result));
-      // These error types aren't recoverable by having the client get a new token.
-      RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
-      return s;
-    }
+  ErrorStatusPB::RpcErrorCodePB error;
+  Status s = ParseVerificationResult(verification_result,
+      ErrorStatusPB::FATAL_INVALID_AUTHENTICATION_TOKEN, &error);
+  if (!s.ok()) {
+    RETURN_NOT_OK(SendError(error, s));
+    return s;
   }
 
   if (!token.has_authn()) {
diff --git a/src/kudu/security/token_verifier.cc b/src/kudu/security/token_verifier.cc
index 1ae20db..e3aef4c 100644
--- a/src/kudu/security/token_verifier.cc
+++ b/src/kudu/security/token_verifier.cc
@@ -123,7 +123,7 @@ VerificationResult TokenVerifier::VerifyTokenSignature(const SignedTokenPB& sign
 
   for (auto flag : token->incompatible_features()) {
     if (!TokenPB::Feature_IsValid(flag)) {
-      KLOG_EVERY_N_SECS(WARNING, 60) << "received authentication token with unknown feature; "
+      KLOG_EVERY_N_SECS(WARNING, 60) << "received token with unknown feature; "
                                         "server needs to be updated";
       return VerificationResult::INCOMPATIBLE_FEATURE;
     }
@@ -151,17 +151,17 @@ const char* VerificationResultToString(VerificationResult r) {
     case security::VerificationResult::VALID:
       return "valid";
     case security::VerificationResult::INVALID_TOKEN:
-      return "invalid authentication token";
+      return "invalid token";
     case security::VerificationResult::INVALID_SIGNATURE:
-      return "invalid authentication token signature";
+      return "invalid token signature";
     case security::VerificationResult::EXPIRED_TOKEN:
-      return "authentication token expired";
+      return "token expired";
     case security::VerificationResult::EXPIRED_SIGNING_KEY:
-      return "authentication token signing key expired";
+      return "token signing key expired";
     case security::VerificationResult::UNKNOWN_SIGNING_KEY:
-      return "authentication token signed with unknown key";
+      return "token signed with unknown key";
     case security::VerificationResult::INCOMPATIBLE_FEATURE:
-      return "authentication token uses incompatible feature";
+      return "token uses incompatible feature";
     default:
       LOG(FATAL) << "unexpected VerificationResult value: "
                  << static_cast<int>(r);
diff --git a/src/kudu/tserver/CMakeLists.txt b/src/kudu/tserver/CMakeLists.txt
index 7304d72..1bc6398 100644
--- a/src/kudu/tserver/CMakeLists.txt
+++ b/src/kudu/tserver/CMakeLists.txt
@@ -177,5 +177,6 @@ ADD_KUDU_TEST(tablet_copy_source_session-test)
 ADD_KUDU_TEST(tablet_copy_service-test)
 ADD_KUDU_TEST(tablet_server-test PROCESSORS 3)
 ADD_KUDU_TEST(tablet_server-stress-test RUN_SERIAL true)
+ADD_KUDU_TEST(tablet_server_authorization-test)
 ADD_KUDU_TEST(scanners-test)
 ADD_KUDU_TEST(ts_tablet_manager-test)
diff --git a/src/kudu/tserver/tablet_server-test-base.h b/src/kudu/tserver/tablet_server-test-base.h
index b80dfac..e325496 100644
--- a/src/kudu/tserver/tablet_server-test-base.h
+++ b/src/kudu/tserver/tablet_server-test-base.h
@@ -124,10 +124,10 @@ class TabletServerTestBase : public KuduTest {
   // given read mode.
   Status FillNewScanRequest(ReadMode read_mode, NewScanRequestPB* scan) const;
 
- protected:
   static const char* kTableId;
   static const char* kTabletId;
 
+ protected:
   const Schema schema_;
   Schema key_schema_;
   std::unique_ptr<RowBuilder> rb_;
diff --git a/src/kudu/tserver/tablet_server_authorization-test.cc b/src/kudu/tserver/tablet_server_authorization-test.cc
new file mode 100644
index 0000000..43594eb
--- /dev/null
+++ b/src/kudu/tserver/tablet_server_authorization-test.cc
@@ -0,0 +1,308 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <functional>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/common/wire_protocol-test-util.h"
+#include "kudu/common/wire_protocol.h"
+#include "kudu/common/wire_protocol.pb.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/user_credentials.h"
+#include "kudu/security/crypto.h"
+#include "kudu/security/openssl_util.h"
+#include "kudu/security/token.pb.h"
+#include "kudu/security/token_signer.h"
+#include "kudu/security/token_verifier.h"
+#include "kudu/tserver/mini_tablet_server.h"
+#include "kudu/tserver/tablet_server-test-base.h"
+#include "kudu/tserver/tablet_server.h"
+#include "kudu/tserver/tserver.pb.h"
+#include "kudu/tserver/tserver_service.pb.h"
+#include "kudu/tserver/tserver_service.proxy.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+
+using std::shared_ptr;
+using std::string;
+using std::vector;
+
+DECLARE_bool(tserver_enforce_access_control);
+DECLARE_double(tserver_inject_invalid_authz_token_ratio);
+
+namespace kudu {
+
+class Schema;
+
+using pb_util::SecureShortDebugString;
+using rpc::ErrorStatusPB;
+using rpc::RpcController;
+using security::PrivateKey;
+using security::SignedTokenPB;
+using security::TablePrivilegePB;
+using security::TokenSigner;
+using security::TokenSigningPrivateKeyPB;
+using security::TokenSigningPublicKeyPB;
+using security::TokenVerifier;
+
+namespace tserver {
+
+namespace {
+
+// Verifies the expected response for an invalid/malformed token.
+void CheckInvalidAuthzToken(const Status& s, const RpcController& rpc) {
+  ASSERT_TRUE(s.IsRemoteError()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "Not authorized");
+  ASSERT_TRUE(rpc.error_response()) << "Expected an error response";
+  ASSERT_TRUE(rpc.error_response()->code() == ErrorStatusPB::ERROR_INVALID_AUTHORIZATION_TOKEN)
+      << SecureShortDebugString(*rpc.error_response());
+}
+
+// Gets a private key for the given sequence number.
+TokenSigningPrivateKeyPB GetTokenSigningPrivateKey(int seq_num) {
+  TokenSigningPrivateKeyPB tsk;
+  PrivateKey private_key;
+  CHECK_OK(GeneratePrivateKey(/*num_bits=*/512, &private_key));
+  string private_key_str_der;
+  CHECK_OK(private_key.ToString(&private_key_str_der, security::DataFormat::DER));
+  tsk.set_rsa_key_der(private_key_str_der);
+  tsk.set_key_seq_num(seq_num);
+  tsk.set_expire_unix_epoch_seconds(WallTime_Now() + 3600);
+  return tsk;
+}
+
+// Test-param argument to instantiate various tserver requests and send the
+// appropriate proxy calls.
+typedef std::function<Status(const Schema&, const SignedTokenPB*, TabletServerServiceProxy*,
+                             RpcController*)> RequestorFunc;
+
+Status WriteGenerator(const Schema& schema, const SignedTokenPB* token,
+                      TabletServerServiceProxy* proxy, RpcController* rpc) {
+  WriteRequestPB req;
+  req.set_tablet_id(TabletServerTestBase::kTabletId);
+  RETURN_NOT_OK(SchemaToPB(schema, req.mutable_schema()));
+  AddTestRowToPB(RowOperationsPB::INSERT, schema, 1234, 5678, "hello world",
+                 req.mutable_row_operations());
+  if (token) {
+    *req.mutable_authz_token() = *token;
+  }
+  WriteResponsePB resp;
+  LOG(INFO) << "Sending write request";
+  return proxy->Write(req, &resp, rpc);
+}
+
+Status ScanGenerator(const Schema& schema, const SignedTokenPB* token,
+                     TabletServerServiceProxy* proxy, RpcController* rpc) {
+  ScanRequestPB req;
+  req.set_call_seq_id(0);
+  NewScanRequestPB* scan = req.mutable_new_scan_request();
+  scan->set_tablet_id(TabletServerTestBase::kTabletId);
+  RETURN_NOT_OK(SchemaToColumnPBs(schema, scan->mutable_projected_columns()));
+  if (token) {
+    *scan->mutable_authz_token() = *token;
+  }
+  ScanResponsePB resp;
+  LOG(INFO) << "Sending scan request";
+  return proxy->Scan(req, &resp, rpc);
+}
+
+Status SplitKeyRangeGenerator(const Schema& /*schema*/, const SignedTokenPB* token,
+                              TabletServerServiceProxy* proxy, RpcController* rpc) {
+  SplitKeyRangeRequestPB req;
+  req.set_tablet_id(TabletServerTestBase::kTabletId);
+  if (token) {
+    *req.mutable_authz_token() = *token;
+  }
+  SplitKeyRangeResponsePB resp;
+  LOG(INFO) << "Sending split-key-range request";
+  return proxy->SplitKeyRange(req, &resp, rpc);
+}
+
+Status ChecksumGenerator(const Schema& schema, const SignedTokenPB* token,
+                         TabletServerServiceProxy* proxy, RpcController* rpc) {
+  ChecksumRequestPB req;
+  NewScanRequestPB* scan = req.mutable_new_request();
+  scan->set_tablet_id(TabletServerTestBase::kTabletId);
+  RETURN_NOT_OK(SchemaToColumnPBs(schema, scan->mutable_projected_columns()));
+  if (token) {
+    *scan->mutable_authz_token() = *token;
+  }
+  ChecksumResponsePB resp;
+  LOG(INFO) << "Sending checksum scan request";
+  return proxy->Checksum(req, &resp, rpc);
+}
+
+} // anonymous namespace
+
+class AuthzTabletServerTest : public TabletServerTestBase,
+                              public testing::WithParamInterface<RequestorFunc> {
+ public:
+  void SetUp() override {
+    NO_FATALS(TabletServerTestBase::SetUp());
+    NO_FATALS(StartTabletServer(/*num_data_dirs=*/1));
+  }
+};
+
+TEST_P(AuthzTabletServerTest, TestInvalidAuthzTokens) {
+  FLAGS_tserver_enforce_access_control = true;
+  rpc::UserCredentials user;
+  const string kUser = "dan";
+  user.set_real_user(kUser);
+  proxy_->set_user_credentials(user);
+
+  TokenSigningPrivateKeyPB tsk = GetTokenSigningPrivateKey(1);
+  shared_ptr<TokenVerifier> verifier(new TokenVerifier());
+  // We're going to manually tamper with the tokens to make them invalid, so
+  // pass in arbitrary expiration values.
+  TokenSigner signer(3600, 3600, 3600, verifier);
+  ASSERT_OK(signer.ImportKeys({ tsk }));
+  vector<TokenSigningPublicKeyPB> public_keys = verifier->ExportKeys();
+  ASSERT_OK(mini_server_->server()->mutable_token_verifier()->ImportKeys(public_keys));
+
+  // Set up a privilege that permits everything. Even with these privileges,
+  // invalid authz tokens will prevent access.
+  TablePrivilegePB privilege;
+  privilege.set_table_id(kTableId);
+  privilege.set_scan_privilege(true);
+  privilege.set_insert_privilege(true);
+  privilege.set_update_privilege(true);
+  privilege.set_delete_privilege(true);
+
+  // Test various "invalid token" scenarios.
+  typedef std::function<SignedTokenPB(void)> TokenCreator;
+  vector<TokenCreator> token_creators;
+  token_creators.emplace_back([&] {
+    LOG(INFO) << "Generating token with a bad signature";
+    SignedTokenPB token;
+    CHECK_OK(signer.GenerateAuthzToken(kUser, privilege, &token));
+    string bad_signature = token.signature();
+    // Flip the bits in the signature.
+    for (int i = 0; i < bad_signature.length(); i++) {
+      char* byte = &bad_signature[i];
+      *byte = ~*byte;
+    }
+    token.set_token_data(std::move(bad_signature));
+    return token;
+  });
+  token_creators.emplace_back([&] {
+    LOG(INFO) << "Generating token with no signature";
+    SignedTokenPB token;
+    CHECK_OK(signer.GenerateAuthzToken(kUser, privilege, &token));
+    token.clear_signature();
+    return token;
+  });
+  token_creators.emplace_back([&] {
+    LOG(INFO) << "Generating token for a different user";
+    SignedTokenPB token;
+    CHECK_OK(signer.GenerateAuthzToken("bad-dan", privilege, &token));
+    return token;
+  });
+  token_creators.emplace_back([&] {
+    LOG(INFO) << "Generating authn token instead of authz token";
+    SignedTokenPB token;
+    CHECK_OK(signer.GenerateAuthnToken(kUser, &token));
+    return token;
+  });
+  token_creators.emplace_back([&] {
+    LOG(INFO) << "Generating expired authz token";
+    TokenSigningPrivateKeyPB tsk = GetTokenSigningPrivateKey(2);
+    shared_ptr<TokenVerifier> verifier(new TokenVerifier());
+    TokenSigner expired_signer(3600, /*authz_token_validity_seconds=*/1, 3600, verifier);
+    CHECK_OK(expired_signer.ImportKeys({ tsk }));
+    vector<TokenSigningPublicKeyPB> expired_public_keys = verifier->ExportKeys();
+    CHECK_OK(mini_server_->server()->mutable_token_verifier()->ImportKeys(public_keys));
+
+    SignedTokenPB token;
+    CHECK_OK(expired_signer.GenerateAuthzToken(kUser, privilege, &token));
+    // Wait for the token to expire.
+    SleepFor(MonoDelta::FromSeconds(3));
+    return token;
+  });
+
+  const auto& send_req = GetParam();
+  // Run all of the above "invalid token" scenarios against the above
+  // requests.
+  for (const auto& token_creator : token_creators) {
+    RpcController rpc;
+    const SignedTokenPB token = token_creator();
+    Status s = send_req(schema_, &token, proxy_.get(), &rpc);
+    NO_FATALS(CheckInvalidAuthzToken(s, rpc));
+  }
+
+  // Send a request with no token. This is also considered an "invalid token".
+  {
+    LOG(INFO) << "Generating request with no authz token";
+    RpcController rpc;
+    Status s = send_req(schema_, nullptr, proxy_.get(), &rpc);
+    NO_FATALS(CheckInvalidAuthzToken(s, rpc));
+  }
+  // Now test a valid token that has no privileges. This is flat-out
+  // disallowed and "fatal".
+  {
+    LOG(INFO) << "Generating request with no privileges";
+    SignedTokenPB token;
+    TablePrivilegePB empty;
+    ASSERT_OK(signer.GenerateAuthzToken(kUser, empty, &token));
+    RpcController rpc;
+    Status s = send_req(schema_, &token, proxy_.get(), &rpc);
+    ASSERT_TRUE(s.IsRemoteError()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "Not authorized");
+    ASSERT_TRUE(rpc.error_response());
+    ASSERT_TRUE(rpc.error_response()->code() == ErrorStatusPB::FATAL_UNAUTHORIZED)
+        << SecureShortDebugString(*rpc.error_response());
+  }
+  // Create a healthy token but inject an error.
+  {
+    LOG(INFO) << "Generating healthy request but injecting error";
+    google::FlagSaver saver;
+    FLAGS_tserver_inject_invalid_authz_token_ratio = 1.0;
+    SignedTokenPB token;
+    ASSERT_OK(signer.GenerateAuthzToken(kUser, privilege, &token));
+    RpcController rpc;
+    Status s = send_req(schema_, &token, proxy_.get(), &rpc);
+    NO_FATALS(CheckInvalidAuthzToken(s, rpc));
+  }
+  // Create a healthy token.
+  {
+    LOG(INFO) << "Generating healthy request";
+    SignedTokenPB token;
+    ASSERT_OK(signer.GenerateAuthzToken(kUser, privilege, &token));
+    RpcController rpc;
+    ASSERT_OK(send_req(schema_, &token, proxy_.get(), &rpc));
+    ASSERT_FALSE(rpc.error_response());
+  }
+}
+
+INSTANTIATE_TEST_CASE_P(RequestorFuncs, AuthzTabletServerTest,
+    ::testing::Values(&WriteGenerator, &ScanGenerator,
+                      &SplitKeyRangeGenerator, &ChecksumGenerator));
+
+} // namespace tserver
+} // namespace kudu
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 9834f8b..6f1261f 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -66,6 +66,9 @@
 #include "kudu/rpc/rpc_context.h"
 #include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/rpc/rpc_sidecar.h"
+#include "kudu/rpc/rpc_verification_util.h"
+#include "kudu/security/token.pb.h"
+#include "kudu/security/token_verifier.h"
 #include "kudu/server/server_base.h"
 #include "kudu/tablet/compaction.h"
 #include "kudu/tablet/metadata.pb.h"
@@ -89,6 +92,7 @@
 #include "kudu/util/crc.h"
 #include "kudu/util/debug/trace_event.h"
 #include "kudu/util/faststring.h"
+#include "kudu/util/fault_injection.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/memory/arena.h"
@@ -140,6 +144,16 @@ DEFINE_bool(scanner_inject_service_unavailable_on_continue_scan, false,
            "any Scan continuation RPC call. Used for tests.");
 TAG_FLAG(scanner_inject_service_unavailable_on_continue_scan, unsafe);
 
+DEFINE_bool(tserver_enforce_access_control, false,
+            "If set, the server will apply fine-grained access control rules "
+            "to client RPCs.");
+TAG_FLAG(tserver_enforce_access_control, experimental);
+TAG_FLAG(tserver_enforce_access_control, runtime);
+
+DEFINE_double(tserver_inject_invalid_authz_token_ratio, 0.0,
+              "Fraction of the time that authz token validation will fail. Used for tests.");
+TAG_FLAG(tserver_inject_invalid_authz_token_ratio, hidden);
+
 DECLARE_bool(raft_prepare_replacement_before_eviction);
 DECLARE_int32(memory_limit_warn_threshold_percentage);
 DECLARE_int32(tablet_history_max_age_sec);
@@ -167,10 +181,15 @@ using kudu::consensus::UnsafeChangeConfigRequestPB;
 using kudu::consensus::UnsafeChangeConfigResponsePB;
 using kudu::consensus::VoteRequestPB;
 using kudu::consensus::VoteResponsePB;
+using kudu::fault_injection::MaybeTrue;
 using kudu::pb_util::SecureDebugString;
 using kudu::pb_util::SecureShortDebugString;
+using kudu::rpc::ParseVerificationResult;
+using kudu::rpc::ErrorStatusPB;
 using kudu::rpc::RpcContext;
 using kudu::rpc::RpcSidecar;
+using kudu::security::TokenVerifier;
+using kudu::security::TokenPB;
 using kudu::server::ServerBase;
 using kudu::tablet::AlterSchemaTransactionState;
 using kudu::tablet::TABLET_DATA_COPYING;
@@ -366,6 +385,83 @@ static StdStatusCallback BindHandleResponse(
 
 typedef ListTabletsResponsePB::StatusAndSchemaPB StatusAndSchemaPB;
 
+// If the privilege has neither full scan privileges nor column-level scan
+// privileges, the user is definitely not authorized to perform a scan.
+bool MayHaveScanPrivileges(const security::TablePrivilegePB& privilege) {
+  if (privilege.scan_privilege()) {
+    return true;
+  }
+  if (privilege.column_privileges_size() > 0) {
+    for (const auto& col_id_and_privilege : privilege.column_privileges()) {
+      if (col_id_and_privilege.second.scan_privilege()) {
+        return true;
+      }
+    }
+  }
+  return false;
+}
+
+// Verifies the authorization token's correctness. Returns false and sends an
+// appropriate response if the request's authz token is invalid.
+template <class AuthorizableRequest>
+static bool VerifyAuthzTokenOrRespond(const TokenVerifier& token_verifier,
+                                      const AuthorizableRequest& req,
+                                      rpc::RpcContext* context,
+                                      TokenPB* token) {
+  DCHECK(token);
+  if (!req.has_authz_token()) {
+    context->RespondRpcFailure(rpc::ErrorStatusPB::ERROR_INVALID_AUTHORIZATION_TOKEN,
+        Status::NotAuthorized("no authorization token presented"));
+    return false;
+  }
+  TokenPB token_pb;
+  const auto result = token_verifier.VerifyTokenSignature(req.authz_token(), &token_pb);
+  ErrorStatusPB::RpcErrorCodePB error;
+  Status s = ParseVerificationResult(result,
+      ErrorStatusPB::ERROR_INVALID_AUTHORIZATION_TOKEN, &error);
+  if (!s.ok()) {
+    context->RespondRpcFailure(error, s.CloneAndPrepend("authz token verification failure"));
+    return false;
+  }
+  if (!token_pb.has_authz() ||
+      !token_pb.authz().has_table_privilege() ||
+      token_pb.authz().username() != context->remote_user().username()) {
+    context->RespondRpcFailure(ErrorStatusPB::ERROR_INVALID_AUTHORIZATION_TOKEN,
+        Status::NotAuthorized("invalid authorization token presented"));
+    return false;
+  }
+  if (MaybeTrue(FLAGS_tserver_inject_invalid_authz_token_ratio)) {
+    context->RespondRpcFailure(ErrorStatusPB::ERROR_INVALID_AUTHORIZATION_TOKEN,
+        Status::NotAuthorized("INJECTED FAILURE"));
+    return false;
+  }
+  *token = std::move(token_pb);
+  return true;
+}
+
+// Verifies the given scan-like request (e.g. Scan, Checksum, SplitKeyRange)
+// 'req', checking for any scan privileges. Returns false if the request's
+// authz token is invalid or does not have any scan privileges, in which case,
+// 'context' will be used to respond with an error. Otherwise, returns true,
+// and the privileges in 'token' should be used to further verify the request.
+template <class AuthorizableScanRequest>
+static bool VerifyHasAnyScanPrivileges(const TokenVerifier& token_verifier,
+                                       const AuthorizableScanRequest& req,
+                                       const char* not_authorized_str,
+                                       rpc::RpcContext* context,
+                                       TokenPB* token) {
+  if (!VerifyAuthzTokenOrRespond(token_verifier, req, context, token)) {
+    return false;
+  }
+  const auto& privilege = token->authz().table_privilege();
+  if (!MayHaveScanPrivileges(privilege)) {
+    context->RespondRpcFailure(rpc::ErrorStatusPB::FATAL_UNAUTHORIZED,
+        Status::NotAuthorized(not_authorized_str));
+    return false;
+  }
+  return true;
+}
+
 static void SetupErrorAndRespond(TabletServerErrorPB* error,
                                  const Status& s,
                                  TabletServerErrorPB::Code code,
@@ -843,6 +939,22 @@ void TabletServiceImpl::Write(const WriteRequestPB* req,
                "tablet_id", req->tablet_id());
   DVLOG(3) << "Received Write RPC: " << SecureDebugString(*req);
 
+  if (FLAGS_tserver_enforce_access_control) {
+    TokenPB token;
+    if (!VerifyAuthzTokenOrRespond(server_->token_verifier(), *req, context, &token)) {
+      return;
+    }
+    const auto& privilege = token.authz().table_privilege();
+    if (!privilege.insert_privilege() &&
+        !privilege.update_privilege() &&
+        !privilege.delete_privilege()) {
+      context->RespondRpcFailure(rpc::ErrorStatusPB::FATAL_UNAUTHORIZED,
+          Status::NotAuthorized("not authorized to write"));
+      return;
+    }
+    // TODO(awong): check the privileges required for the contents of the write
+    // request by parsing out the op types in the request.
+  }
   scoped_refptr<TabletReplica> replica;
   if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), req->tablet_id(), resp,
                                            context, &replica)) {
@@ -1341,6 +1453,20 @@ void TabletServiceImpl::Scan(const ScanRequestPB* req,
     return;
   }
 
+  // If this is a new scan request, we must enforce the appropriate privileges.
+  string authorized_table_id;
+  unordered_set<int> authorized_column_ids;
+  TokenPB token;
+  if (FLAGS_tserver_enforce_access_control && req->has_new_scan_request()) {
+    if (!VerifyHasAnyScanPrivileges(server_->token_verifier(), req->new_scan_request(),
+                                    "not authorized to scan", context, &token)) {
+      return;
+    }
+    // TODO(awong): check the privileges required for the contents of the scan
+    // request by pulling out the columns and checking against individual
+    // column privileges.
+  }
+
   size_t batch_size_bytes = GetMaxBatchSizeBytesHint(req);
   unique_ptr<faststring> rows_data(new faststring(batch_size_bytes * 11 / 10));
   unique_ptr<faststring> indirect_data(new faststring(batch_size_bytes * 11 / 10));
@@ -1440,6 +1566,16 @@ void TabletServiceImpl::SplitKeyRange(const SplitKeyRangeRequestPB* req,
   TRACE_EVENT1("tserver", "TabletServiceImpl::SplitKeyRange",
                "tablet_id", req->tablet_id());
   DVLOG(3) << "Received SplitKeyRange RPC: " << SecureDebugString(*req);
+  TokenPB token;
+  if (FLAGS_tserver_enforce_access_control) {
+    if (!VerifyHasAnyScanPrivileges(server_->token_verifier(), *req,
+                                    "not authorized to split key range", context, &token)) {
+      return;
+    }
+    // TODO(awong): check the privileges required for the contents of the
+    // split-key request by pulling out the columns and checking against
+    // individual column privileges.
+  }
 
   scoped_refptr<TabletReplica> replica;
   if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), req->tablet_id(), resp,
@@ -1565,6 +1701,16 @@ void TabletServiceImpl::Checksum(const ChecksumRequestPB* req,
   bool has_more = false;
   TabletServerErrorPB::Code error_code = TabletServerErrorPB::UNKNOWN_ERROR;
   if (req->has_new_request()) {
+    if (FLAGS_tserver_enforce_access_control) {
+      TokenPB token;
+      if (!VerifyHasAnyScanPrivileges(server_->token_verifier(), req->new_request(),
+                                      "not authorized to checksum", context, &token)) {
+        return;
+      }
+      // TODO(awong): check the privileges required for the contents of the
+      // checksum request by pulling out the columns and checking against
+      // individual column privileges.
+    }
     scan_req.mutable_new_scan_request()->CopyFrom(req->new_request());
     const NewScanRequestPB& new_req = req->new_request();
     scoped_refptr<TabletReplica> replica;
diff --git a/src/kudu/tserver/tserver.proto b/src/kudu/tserver/tserver.proto
index afe1b71..3964fdb 100644
--- a/src/kudu/tserver/tserver.proto
+++ b/src/kudu/tserver/tserver.proto
@@ -21,6 +21,7 @@ option java_package = "org.apache.kudu.tserver";
 
 import "kudu/common/common.proto";
 import "kudu/common/wire_protocol.proto";
+import "kudu/security/token.proto";
 import "kudu/tablet/tablet.proto";
 import "kudu/util/pb_util.proto";
 
@@ -136,6 +137,9 @@ message WriteRequestPB {
   // TODO crypto sign this and propagate the signature along with
   // the timestamp.
   optional fixed64 propagated_timestamp = 5;
+
+  // An authorization token with which to authorize this request.
+  optional security.SignedTokenPB authz_token = 6;
 }
 
 message WriteResponsePB {
@@ -284,6 +288,9 @@ message NewScanRequestPB {
   // The default value corresponds to RowFormatFlags::NO_FLAGS, which can't be set
   // as the actual default since the types differ.
   optional uint64 row_format_flags = 14 [default = 0];
+
+  // An authorization token with which to authorize this request.
+  optional security.SignedTokenPB authz_token = 15;
 }
 
 // A scan request. Initially, it should specify a scan. Later on, you
@@ -407,6 +414,9 @@ message SplitKeyRangeRequestPB {
   // should only include these columns. This can be used if a query will
   // only scan a certain subset of the columns.
   repeated ColumnSchemaPB columns = 5;
+
+  // An authorization token with which to authorize this request.
+  optional security.SignedTokenPB authz_token = 6;
 }
 
 message SplitKeyRangeResponsePB {
diff --git a/src/kudu/tserver/tserver_service.proto b/src/kudu/tserver/tserver_service.proto
index 78b99a3..0f8c25b 100644
--- a/src/kudu/tserver/tserver_service.proto
+++ b/src/kudu/tserver/tserver_service.proto
@@ -59,6 +59,8 @@ service TabletServerService {
 
 message ChecksumRequestPB {
   // Only one of 'new_request' or 'continue_request' should be specified.
+  // NOTE: if 'new_request' is specified, it should also include an appropriate
+  // authorization token.
   optional NewScanRequestPB new_request = 1;
   optional ContinueChecksumRequestPB continue_request = 2;