Posted to commits@kudu.apache.org by aw...@apache.org on 2019/02/13 19:44:14 UTC

[kudu] branch master updated: KUDU-2543 pt 2: pass around default authz tokens

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new a981864  KUDU-2543 pt 2: pass around default authz tokens
a981864 is described below

commit a98186406b90ea5a53e06e7e98c34f6c5810de52
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Wed Jan 2 18:49:21 2019 -0800

    KUDU-2543 pt 2: pass around default authz tokens
    
    Adds authz token generation to the master's GetTableSchema endpoint,
    with which clients can authorize themselves for specific tables. A
    client will cache these tokens and use them appropriately for RPCs that
    need them (e.g. Writes and Scans), reacquiring them upon learning that
    they have expired.
    
    This is tested in the following ways:
    - unit tests for the new client-side cache for authz tokens
    - parameterized the token expiration test to use varying authn and authz
      token validity intervals, covering cases where authn tokens expire but
      authz tokens do not, and vice versa
    - added tests to ensure the client behaves correctly in various
      non-happy cases
    
    Change-Id: I7971d652d6adc822167cf959bffd5f994a7ca565
    Reviewed-on: http://gerrit.cloudera.org:8080/12122
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Reviewed-by: Hao Hao <ha...@cloudera.com>
---
 src/kudu/client/CMakeLists.txt                     |   1 +
 src/kudu/client/authz_token_cache.cc               | 161 ++++++
 src/kudu/client/authz_token_cache.h                | 139 ++++++
 src/kudu/client/batcher.cc                         |  75 ++-
 src/kudu/client/client-internal.cc                 |  38 +-
 src/kudu/client/client-internal.h                  |  33 ++
 src/kudu/client/client-test.cc                     |  82 +++-
 src/kudu/client/client.h                           |   6 +
 src/kudu/client/scanner-internal.cc                |  40 +-
 src/kudu/client/scanner-internal.h                 |   8 +-
 src/kudu/integration-tests/CMakeLists.txt          |   3 +-
 ..._expire-itest.cc => auth_token_expire-itest.cc} | 148 +++---
 src/kudu/integration-tests/authz_token-itest.cc    | 546 +++++++++++++++++++++
 src/kudu/master/catalog_manager.h                  |   2 +
 src/kudu/master/master-test.cc                     |  36 ++
 src/kudu/master/master.proto                       |   9 +-
 src/kudu/master/master_service.cc                  |  33 ++
 src/kudu/rpc/retriable_rpc.h                       |  70 ++-
 src/kudu/rpc/rpc.h                                 |   5 +-
 src/kudu/util/test_util.h                          |   7 +
 20 files changed, 1347 insertions(+), 95 deletions(-)
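
For orientation before the diff itself: below is a minimal, self-contained C++ sketch of the flow the commit message describes. The master hands out an authz token along with the table schema, the client caches it and attaches it to writes and scans, and it reacquires a token when a tserver rejects the cached one. Every name in the sketch (SignedToken, GetTableSchemaFromMaster, send_rpc) is an illustrative stand-in, not the Kudu API; the real implementation follows in the diff.

#include <iostream>
#include <string>
#include <unordered_map>

struct SignedToken { std::string data; };

enum class RpcResult { OK, INVALID_AUTHZ_TOKEN };

// Stand-in for the master's GetTableSchema endpoint, which (as of this
// commit) also returns an authz token for the requested table.
SignedToken GetTableSchemaFromMaster(const std::string& table_id) {
  return SignedToken{"token-for-" + table_id};
}

int main() {
  std::unordered_map<std::string, SignedToken> token_cache;
  const std::string table_id = "some-table-id";

  // Opening a table caches the authz token returned by GetTableSchema.
  token_cache[table_id] = GetTableSchemaFromMaster(table_id);

  // Writes and scans attach the cached token. If the tserver responds with
  // ERROR_INVALID_AUTHORIZATION_TOKEN (e.g. the token expired), the client
  // reacquires a token from the master and retries the operation.
  auto send_rpc = [](const SignedToken& t) {
    return t.data.empty() ? RpcResult::INVALID_AUTHZ_TOKEN : RpcResult::OK;
  };
  RpcResult r = send_rpc(token_cache[table_id]);
  if (r == RpcResult::INVALID_AUTHZ_TOKEN) {
    token_cache[table_id] = GetTableSchemaFromMaster(table_id);  // reacquire
    r = send_rpc(token_cache[table_id]);                         // retry
  }
  std::cout << (r == RpcResult::OK ? "RPC succeeded" : "RPC failed") << "\n";
  return 0;
}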

diff --git a/src/kudu/client/CMakeLists.txt b/src/kudu/client/CMakeLists.txt
index 3048751..38f8d21 100644
--- a/src/kudu/client/CMakeLists.txt
+++ b/src/kudu/client/CMakeLists.txt
@@ -29,6 +29,7 @@ ADD_EXPORTABLE_LIBRARY(client_proto
   NONLINK_DEPS ${CLIENT_PROTO_TGTS})
 
 set(CLIENT_SRCS
+  authz_token_cache.cc
   batcher.cc
   client.cc
   client_builder-internal.cc
diff --git a/src/kudu/client/authz_token_cache.cc b/src/kudu/client/authz_token_cache.cc
new file mode 100644
index 0000000..1051bac
--- /dev/null
+++ b/src/kudu/client/authz_token_cache.cc
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/client/authz_token_cache.h"
+
+#include <cstdint>
+#include <mutex>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include <boost/bind.hpp> // IWYU pragma: keep
+#include <glog/logging.h>
+
+#include "kudu/client/client-internal.h"
+#include "kudu/client/client.h"
+#include "kudu/client/master_proxy_rpc.h"
+#include "kudu/common/wire_protocol.h"
+#include "kudu/gutil/bind.h"
+#include "kudu/gutil/bind_helpers.h"
+#include "kudu/gutil/callback.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/master/master.pb.h"
+#include "kudu/master/master.proxy.h"
+#include "kudu/rpc/response_callback.h"
+#include "kudu/rpc/rpc.h"
+#include "kudu/security/token.pb.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/status_callback.h"
+
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+using master::MasterFeatures;
+using master::MasterServiceProxy;
+using rpc::BackoffType;
+using security::SignedTokenPB;
+
+namespace client {
+namespace internal {
+
+RetrieveAuthzTokenRpc::RetrieveAuthzTokenRpc(const KuduTable* table,
+                                             MonoTime deadline)
+    : AsyncLeaderMasterRpc(deadline, table->client(), BackoffType::LINEAR, req_, &resp_,
+                           &MasterServiceProxy::GetTableSchemaAsync, "RetrieveAuthzToken",
+                           Bind(&AuthzTokenCache::RetrievedNewAuthzTokenCb,
+                                Unretained(&table->client()->data_->authz_token_cache_),
+                                table->id()),
+                           { MasterFeatures::GENERATE_AUTHZ_TOKEN }),
+      table_(table) {
+  req_.mutable_table()->set_table_id(table_->id());
+}
+
+string RetrieveAuthzTokenRpc::ToString() const {
+  return Substitute("$0 { table: '$1' ($2), attempt: $3 }", AsyncLeaderMasterRpc::ToString(),
+      req_.table().table_name(), req_.table().table_id(), num_attempts());
+}
+
+void RetrieveAuthzTokenRpc::SendRpcCb(const Status& status) {
+  Status new_status = status;
+  // Check for generic master RPC errors.
+  if (RetryOrReconnectIfNecessary(&new_status)) {
+    return;
+  }
+  // Unwrap and return any other application errors that may be returned by the
+  // master service.
+  if (new_status.ok() && resp_.has_error()) {
+    new_status = StatusFromPB(resp_.error().status());
+  }
+  if (new_status.ok()) {
+    // Note: legacy masters are rejected by the required GENERATE_AUTHZ_TOKEN
+    // feature, so the only way to get here without an authz token is if the
+    // master didn't return one, which is a programming error.
+    DCHECK(resp_.has_authz_token());
+    if (PREDICT_TRUE(resp_.has_authz_token())) {
+      client_->data_->authz_token_cache_.Put(table_->id(), resp_.authz_token());
+    }
+  }
+  user_cb_.Run(new_status);
+}
+
+void AuthzTokenCache::Put(const string& table_id, SignedTokenPB authz_token) {
+  VLOG(1) << Substitute("Putting new token for table $0 into the token cache", table_id);
+  std::lock_guard<simple_spinlock> l(token_lock_);
+  EmplaceOrUpdate(&authz_tokens_, table_id, std::move(authz_token));
+}
+
+bool AuthzTokenCache::Fetch(const string& table_id, SignedTokenPB* authz_token) {
+  DCHECK(authz_token);
+  std::lock_guard<simple_spinlock> l(token_lock_);
+  const auto* token = FindOrNull(authz_tokens_, table_id);
+  if (token) {
+    *authz_token = *token;
+    return true;
+  }
+  return false;
+}
+
+void AuthzTokenCache::RetrieveNewAuthzToken(const KuduTable* table,
+                                            StatusCallback callback,
+                                            MonoTime deadline) {
+  DCHECK(table);
+  DCHECK(deadline.Initialized());
+  const string& table_id = table->id();
+  std::unique_lock<simple_spinlock> l(rpc_lock_);
+  // If an RPC for this table already exists, attach the callback.
+  auto* rpc_and_cbs = FindOrNull(authz_rpcs_, table_id);
+  if (rpc_and_cbs) {
+    DCHECK(!rpc_and_cbs->second.empty());
+    VLOG(2) << Substitute("Binding to in-flight RPC to retrieve authz token for $0", table_id);
+    rpc_and_cbs->second.emplace_back(std::move(callback));
+  } else {
+    // Otherwise, send out a new RPC.
+    VLOG(2) << Substitute("Sending new RPC to retrieve authz token for $0", table_id);
+    scoped_refptr<RetrieveAuthzTokenRpc> rpc(new RetrieveAuthzTokenRpc(table, deadline));
+    EmplaceOrDie(&authz_rpcs_, table_id,
+                 RpcAndCallbacks(rpc, { std::move(callback) }));
+    l.unlock();
+    rpc->SendRpc();
+  }
+}
+
+void AuthzTokenCache::RetrievedNewAuthzTokenCb(const string& table_id,
+                                               const Status& status) {
+  VLOG(1) << Substitute("Retrieved new authz token for table $0", table_id);
+  vector<StatusCallback> cbs;
+  {
+    // Erase the RPC from our in-flight map.
+    std::lock_guard<simple_spinlock> l(rpc_lock_);
+    auto rpc_and_cbs = EraseKeyReturnValuePtr(&authz_rpcs_, table_id);
+    cbs = std::move(rpc_and_cbs.second);
+  }
+  DCHECK(!cbs.empty());
+  for (const auto& cb : cbs) {
+    cb.Run(status);
+  }
+}
+
+} // namespace internal
+} // namespace client
+} // namespace kudu
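
RetrieveNewAuthzToken() and RetrievedNewAuthzTokenCb() above implement a request-coalescing ("single-flight") pattern: concurrent callers wanting a token for the same table share one in-flight RPC, and all pending callbacks run when that RPC completes. Below is a self-contained miniature of the pattern, with std::mutex and std::function standing in for Kudu's simple_spinlock and StatusCallback; the class and method names are illustrative.

#include <functional>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

class SingleFlightTokenFetch {
 public:
  using Callback = std::function<void(bool ok)>;

  // Returns true if the caller should start the actual retrieval, false if
  // it merely attached its callback to an already in-flight one. Mirrors
  // the rpc_lock_/authz_rpcs_ logic in RetrieveNewAuthzToken().
  bool RegisterOrAttach(const std::string& table_id, Callback cb) {
    std::lock_guard<std::mutex> l(lock_);
    auto it = pending_.find(table_id);
    if (it != pending_.end()) {
      it->second.push_back(std::move(cb));  // piggy-back on in-flight RPC
      return false;
    }
    pending_[table_id].push_back(std::move(cb));  // first caller sends it
    return true;
  }

  // Runs when the shared retrieval for 'table_id' finishes: erase the entry
  // under the lock, then run the callbacks outside it, as
  // RetrievedNewAuthzTokenCb() does above.
  void Complete(const std::string& table_id, bool ok) {
    std::vector<Callback> cbs;
    {
      std::lock_guard<std::mutex> l(lock_);
      cbs.swap(pending_[table_id]);
      pending_.erase(table_id);
    }
    for (auto& cb : cbs) cb(ok);
  }

 private:
  std::mutex lock_;
  std::unordered_map<std::string, std::vector<Callback>> pending_;
};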
diff --git a/src/kudu/client/authz_token_cache.h b/src/kudu/client/authz_token_cache.h
new file mode 100644
index 0000000..7de8fe3
--- /dev/null
+++ b/src/kudu/client/authz_token_cache.h
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// This module is internal to the client and not a public API.
+#pragma once
+
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "kudu/client/master_proxy_rpc.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/master/master.pb.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/status_callback.h"
+
+namespace kudu {
+
+class MonoTime;
+class Status;
+
+namespace security {
+class SignedTokenPB;
+} // namespace security
+
+namespace client {
+
+class KuduTable;
+
+namespace internal {
+
+// An asynchronous RPC that retrieves a new authz token for a table and puts it
+// in a token cache.
+class RetrieveAuthzTokenRpc : public AsyncLeaderMasterRpc<master::GetTableSchemaRequestPB,
+                                                          master::GetTableSchemaResponsePB>,
+                              public RefCountedThreadSafe<RetrieveAuthzTokenRpc> {
+ public:
+  RetrieveAuthzTokenRpc(const KuduTable* table, MonoTime deadline);
+  std::string ToString() const override;
+
+ protected:
+  // Handles retries, reconnection, and such.
+  void SendRpcCb(const Status& status) override;
+
+ private:
+  // Encapsulates the client and table with which the RPC will operate.
+  const KuduTable* table_;
+
+  // Request for the authz token.
+  master::GetTableSchemaRequestPB req_;
+
+  // Response containing the authz token. This gets populated before calling
+  // SendRpcCb().
+  master::GetTableSchemaResponsePB resp_;
+};
+
+// Cache for authz tokens received from the master. A client will receive an
+// authz token upon opening the table and put it into the cache. A subsequent
+// operation that requires an authz token (e.g. writes, scans) will fetch it
+// from the cache and attach it to the operation request. If the tserver
+// responds with an error indicating that the client needs a new token,
+// 'RetrieveNewAuthzToken' can be used to retrieve one.
+//
+// This class is thread-safe.
+class AuthzTokenCache {
+ public:
+  typedef std::pair<scoped_refptr<RetrieveAuthzTokenRpc>,
+                    std::vector<StatusCallback>> RpcAndCallbacks;
+  // Adds an authz token to the cache for 'table_id', replacing any that
+  // previously existed.
+  void Put(const std::string& table_id,
+           security::SignedTokenPB authz_token);
+
+  // Fetches an authz token from the cache for 'table_id', returning true if
+  // one exists and false otherwise.
+  //
+  // Since clients may not have the same time-keeping guarantees that servers
+  // do, nor do they have private keys with which to validate tokens, no
+  // checking is done to verify the expiration or validity of the returned
+  // token. Such validation is delegated to the tservers, which return errors
+  // that prompt the client to retrieve new tokens as appropriate.
+  bool Fetch(const std::string& table_id, security::SignedTokenPB* authz_token);
+
+  // Runs 'callback' asynchronously after retrieving a new authz token for
+  // 'table's ID and putting it in the cache. This method handles retries,
+  // leader-finding, and concurrent RPCs for the same table.
+  //
+  // Callers should expect 'callback' to be run with Status::OK if a token was
+  // successfully retrieved from the master, and with an error otherwise.
+  void RetrieveNewAuthzToken(const KuduTable* table,
+                             StatusCallback callback,
+                             MonoTime deadline);
+ private:
+  friend class RetrieveAuthzTokenRpc;
+
+  // Callback to run upon receiving a response for a RetrieveAuthzTokenRpc.
+  // This will handle the 'status' and call the pending callbacks for the RPC
+  // as appropriate.
+  void RetrievedNewAuthzTokenCb(const std::string& table_id,
+                                const Status& status);
+
+  // Protects 'authz_tokens_'.
+  simple_spinlock token_lock_;
+
+  // Protects 'authz_rpcs_'.
+  simple_spinlock rpc_lock_;
+
+  // Authorization tokens stored for each table, indexed by the table ID. Note
+  // that these may be expired, and it is up to the users of the cache to
+  // refresh tokens upon learning of their expiration.
+  //
+  // Protected by 'token_lock_'.
+  std::unordered_map<std::string, security::SignedTokenPB> authz_tokens_;
+
+  // Map from a table ID to the in-flight RPC to retrieve an authz token for
+  // it and the callbacks to run upon receiving its response.
+  //
+  // Protected by 'rpc_lock_'.
+  std::unordered_map<std::string, RpcAndCallbacks> authz_rpcs_;
+};
+
+} // namespace internal
+} // namespace client
+} // namespace kudu
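
To make the cache contract concrete, here is a hypothetical usage sketch of the Put()/Fetch() pair declared above. It assumes the Kudu source tree (AuthzTokenCache is internal, not public API), and the function and variable names are illustrative.

#include <string>

#include "kudu/client/authz_token_cache.h"
#include "kudu/security/token.pb.h"

// Illustrative only: stores a freshly received token, then attaches the
// cached token (if any) to an outgoing request.
void ExampleCacheUsage(kudu::client::internal::AuthzTokenCache* cache,
                       const std::string& table_id,
                       const kudu::security::SignedTokenPB& fresh_token) {
  // Replaces any token previously cached for this table.
  cache->Put(table_id, fresh_token);

  // No expiry check happens here; an expired token is only detected when a
  // tserver rejects it, at which point RetrieveNewAuthzToken() refreshes it.
  kudu::security::SignedTokenPB token;
  if (cache->Fetch(table_id, &token)) {
    // e.g. *req.mutable_authz_token() = std::move(token);
  }
}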
diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc
index c213d00..716a1ec 100644
--- a/src/kudu/client/batcher.cc
+++ b/src/kudu/client/batcher.cc
@@ -59,6 +59,7 @@
 #include "kudu/rpc/rpc.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/security/token.pb.h"
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/tserver/tserver_service.proxy.h"
 #include "kudu/util/logging.h"
@@ -91,9 +92,7 @@ using rpc::RequestTracker;
 using rpc::ResponseCallback;
 using rpc::RetriableRpc;
 using rpc::RetriableRpcStatus;
-using rpc::Rpc;
-using rpc::RpcController;
-using rpc::ServerPicker;
+using security::SignedTokenPB;
 using tserver::WriteRequestPB;
 using tserver::WriteResponsePB;
 using tserver::WriteResponsePB_PerRowErrorPB;
@@ -242,7 +241,24 @@ class WriteRpc : public RetriableRpc<RemoteTabletServer, WriteRequestPB, WriteRe
   void Finish(const Status& status) override;
   bool GetNewAuthnTokenAndRetry() override;
 
+  // Asynchronously attempts to retrieve a new authz token from the master, and
+  // runs GotNewAuthzTokenRetryCb on success.
+  bool GetNewAuthzTokenAndRetry() override;
+
+  // Callback to run after going through the steps to get a new authz token.
+  void GotNewAuthzTokenRetryCb(const Status& status) override;
+
  private:
+  // Fetches the appropriate authz token for this request from the client
+  // cache. Note that this doesn't get a new token from the master, but rather,
+  // it updates 'req_' with one from the cache in case the client has recently
+  // received one.
+  //
+  // If an appropriate authz token is not in the cache, e.g. because the client
+  // has been communicating with an older-versioned master that doesn't support
+  // authz tokens, this is a no-op.
+  void FetchCachedAuthzToken();
+
   // Pointer back to the batcher. Processes the write response when it
   // completes, regardless of success or failure.
   scoped_refptr<Batcher> batcher_;
@@ -290,6 +306,8 @@ WriteRpc::WriteRpc(const scoped_refptr<Batcher>& batcher,
   CHECK_OK(SchemaToPB(*schema, req_.mutable_schema(),
                       SCHEMA_PB_WITHOUT_STORAGE_ATTRIBUTES | SCHEMA_PB_WITHOUT_IDS));
 
+  // Pick up the authz token for the table.
+  FetchCachedAuthzToken();
   RowOperationsPB* requested = req_.mutable_row_operations();
 
   // Add the rows
@@ -362,11 +380,18 @@ RetriableRpcStatus WriteRpc::AnalyzeResponse(const Status& rpc_cb_status) {
   // Check for specific RPC errors.
   if (result.status.IsRemoteError()) {
     const ErrorStatusPB* err = mutable_retrier()->controller().error_response();
-    if (err && err->has_code() &&
-        (err->code() == ErrorStatusPB::ERROR_SERVER_TOO_BUSY ||
-         err->code() == ErrorStatusPB::ERROR_UNAVAILABLE)) {
-      result.result = RetriableRpcStatus::SERVICE_UNAVAILABLE;
-      return result;
+    if (err && err->has_code()) {
+      switch (err->code()) {
+        case ErrorStatusPB::ERROR_SERVER_TOO_BUSY:
+        case ErrorStatusPB::ERROR_UNAVAILABLE:
+          result.result = RetriableRpcStatus::SERVICE_UNAVAILABLE;
+          return result;
+        case ErrorStatusPB::ERROR_INVALID_AUTHORIZATION_TOKEN:
+          result.result = RetriableRpcStatus::INVALID_AUTHORIZATION_TOKEN;
+          return result;
+        default:
+          break;
+      }
     }
   }
 
@@ -442,15 +467,47 @@ RetriableRpcStatus WriteRpc::AnalyzeResponse(const Status& rpc_cb_status) {
 }
 
 bool WriteRpc::GetNewAuthnTokenAndRetry() {
+  // Since we know we may retry, clear the existing response.
+  resp_.Clear();
   // To get a new authn token it's necessary to authenticate with the master
   // using credentials other than the existing authn token.
   KuduClient* c = batcher_->client_;
+  VLOG(1) << "Retrieving new authn token from master";
   c->data_->ConnectToClusterAsync(c, retrier().deadline(),
-      Bind(&RetriableRpc::GetNewAuthnTokenAndRetryCb, Unretained(this)),
+      Bind(&WriteRpc::GotNewAuthnTokenRetryCb, Unretained(this)),
       CredentialsPolicy::PRIMARY_CREDENTIALS);
   return true;
 }
 
+bool WriteRpc::GetNewAuthzTokenAndRetry() {
+  // Since we know we may retry, clear the existing response.
+  resp_.Clear();
+  KuduClient* c = batcher_->client_;
+  VLOG(1) << "Retrieving new authz token from master";
+  c->data_->RetrieveAuthzTokenAsync(table(),
+      Bind(&WriteRpc::GotNewAuthzTokenRetryCb, Unretained(this)),
+      retrier().deadline());
+  return true;
+}
+
+void WriteRpc::FetchCachedAuthzToken() {
+  SignedTokenPB signed_token;
+  if (batcher_->client_->data_->FetchCachedAuthzToken(table()->id(), &signed_token)) {
+    *req_.mutable_authz_token() = std::move(signed_token);
+  } else {
+    // Note: this is the expected path if communicating with an older-versioned
+    // master that does not support authz tokens.
+    VLOG(1) << "no authz token for table " << table()->id();
+  }
+}
+
+void WriteRpc::GotNewAuthzTokenRetryCb(const Status& status) {
+  if (status.ok()) {
+    FetchCachedAuthzToken();
+  }
+  RetriableRpc::GotNewAuthzTokenRetryCb(status);
+}
+
 Batcher::Batcher(KuduClient* client,
                  scoped_refptr<ErrorCollector> error_collector,
                  sp::weak_ptr<KuduSession> session,
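
The ordering in GetNewAuthzTokenAndRetry() and GotNewAuthzTokenRetryCb() above is worth spelling out: the stale response is cleared before anything else, and the request picks up the new token from the cache only after retrieval succeeds, just before the base class resends the RPC. A self-contained miniature of that sequence, with all names as illustrative stand-ins:

#include <iostream>
#include <string>
#include <unordered_map>

int main() {
  std::unordered_map<std::string, std::string> cache;  // table ID -> token
  std::string req_token;  // stands in for req_.authz_token()
  std::string resp = "stale error response";  // stands in for resp_

  // Stand-in for KuduClient::Data::RetrieveAuthzTokenAsync(): asks the
  // master for a fresh token and caches it, reporting success.
  auto retrieve_token_from_master = [&](const std::string& table_id) {
    cache[table_id] = "fresh-token";
    return true;  // Status::OK
  };

  // GetNewAuthzTokenAndRetry(): clear the stale response first, since the
  // RPC will be resent and the response protobuf must not carry old state.
  resp.clear();

  // GotNewAuthzTokenRetryCb(Status::OK): re-read the cache into the request
  // before handing control back to the base class to resend the RPC.
  if (retrieve_token_from_master("table-id")) {
    req_token = cache["table-id"];
  }
  std::cout << "retrying with token: " << req_token << "\n";
  return 0;
}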
diff --git a/src/kudu/client/client-internal.cc b/src/kudu/client/client-internal.cc
index fda6642..cde72f3 100644
--- a/src/kudu/client/client-internal.cc
+++ b/src/kudu/client/client-internal.cc
@@ -26,6 +26,7 @@
 #include <mutex>
 #include <ostream>
 #include <string>
+#include <unordered_map>
 #include <utility>
 #include <vector>
 
@@ -33,6 +34,7 @@
 #include <boost/function.hpp>
 #include <glog/logging.h>
 
+#include "kudu/client/authz_token_cache.h"
 #include "kudu/client/master_proxy_rpc.h"
 #include "kudu/client/master_rpc.h"
 #include "kudu/client/meta_cache.h"
@@ -74,6 +76,10 @@ using std::vector;
 
 namespace kudu {
 
+namespace security {
+class SignedTokenPB;
+} // namespace security
+
 using master::AlterTableRequestPB;
 using master::AlterTableResponsePB;
 using master::CreateTableRequestPB;
@@ -91,6 +97,7 @@ using master::MasterServiceProxy;
 using master::TableIdentifierPB;
 using rpc::BackoffType;
 using rpc::CredentialsPolicy;
+using security::SignedTokenPB;
 using strings::Substitute;
 
 namespace client {
@@ -146,8 +153,7 @@ Status RetryFunc(const MonoTime& deadline,
 
 KuduClient::Data::Data()
     : hive_metastore_sasl_enabled_(false),
-      latest_observed_timestamp_(KuduClient::kNoTimestamp) {
-}
+      latest_observed_timestamp_(KuduClient::kNoTimestamp) {}
 
 KuduClient::Data::~Data() {
   // Workaround for KUDU-956: the user may close a KuduClient while a flush
@@ -427,6 +433,15 @@ bool KuduClient::Data::IsTabletServerLocal(const RemoteTabletServer& rts) const
   return false;
 }
 
+void KuduClient::Data::StoreAuthzToken(const string& table_id,
+                                       const SignedTokenPB& token) {
+  authz_token_cache_.Put(table_id, token);
+}
+
+bool KuduClient::Data::FetchCachedAuthzToken(const string& table_id, SignedTokenPB* token) {
+  return authz_token_cache_.Fetch(table_id, token);
+}
+
 Status KuduClient::Data::GetTableSchema(KuduClient* client,
                                         const string& table_name,
                                         const MonoTime& deadline,
@@ -466,6 +481,11 @@ Status KuduClient::Data::GetTableSchema(KuduClient* client,
   if (num_replicas) {
     *num_replicas = resp.num_replicas();
   }
+  // Cache the authz token if the response came with one. It might not have one
+  // if running against an older master that does not support authz tokens.
+  if (resp.has_authz_token()) {
+    StoreAuthzToken(resp.table_id(), resp.authz_token());
+  }
   return Status::OK();
 }
 
@@ -646,6 +666,20 @@ void KuduClient::Data::ConnectToClusterAsync(KuduClient* client,
   }
 }
 
+Status KuduClient::Data::RetrieveAuthzToken(const KuduTable* table,
+                                            const MonoTime& deadline) {
+  Synchronizer sync;
+  RetrieveAuthzTokenAsync(table, sync.AsStatusCallback(), deadline);
+  return sync.Wait();
+}
+
+void KuduClient::Data::RetrieveAuthzTokenAsync(const KuduTable* table,
+                                               const StatusCallback& cb,
+                                               const MonoTime& deadline) {
+  DCHECK(deadline.Initialized());
+  authz_token_cache_.RetrieveNewAuthzToken(table, cb, deadline);
+}
+
 HostPort KuduClient::Data::leader_master_hostport() const {
   std::lock_guard<simple_spinlock> l(leader_master_lock_);
   return leader_master_hostport_;
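
RetrieveAuthzToken() above is a thin synchronous wrapper over the asynchronous version: it hands a Synchronizer's callback to RetrieveAuthzTokenAsync() and blocks until the callback runs. Below is a self-contained approximation using std::promise in place of Kudu's Synchronizer. The reactor-thread caveat noted in client-internal.h later in this diff applies to any wrapper of this shape: calling it from the thread that is supposed to complete the callback would deadlock.

#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <thread>

using AsyncOp = std::function<void(std::function<void(bool)>)>;

// Blocks until the async operation invokes its callback, then returns the
// callback's result. Approximates the Synchronizer pattern used above.
bool RetrieveSync(const AsyncOp& async_op) {
  auto promise = std::make_shared<std::promise<bool>>();
  std::future<bool> result = promise->get_future();
  async_op([promise](bool ok) { promise->set_value(ok); });
  return result.get();
}

int main() {
  // Simulate an async token retrieval that completes on another thread.
  AsyncOp async_op = [](std::function<void(bool)> cb) {
    std::thread([cb = std::move(cb)] { cb(true); }).detach();
  };
  std::cout << (RetrieveSync(async_op) ? "retrieved" : "failed") << "\n";
  return 0;
}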
diff --git a/src/kudu/client/client-internal.h b/src/kudu/client/client-internal.h
index 85cbafc..1f492e2 100644
--- a/src/kudu/client/client-internal.h
+++ b/src/kudu/client/client-internal.h
@@ -23,17 +23,22 @@
 #include <memory>
 #include <set>
 #include <string>
+#include <unordered_map>
 #include <unordered_set>
 #include <utility>
 #include <vector>
 
+#include "kudu/client/authz_token_cache.h"
 #include "kudu/client/client.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/master/master.pb.h"
 #include "kudu/rpc/response_callback.h"
+#include "kudu/rpc/rpc.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/user_credentials.h"
+#include "kudu/security/token.pb.h"
 #include "kudu/util/atomic.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/monotime.h"
@@ -58,6 +63,7 @@ class AlterTableResponsePB;
 class ConnectToMasterResponsePB;
 class CreateTableRequestPB;
 class CreateTableResponsePB;
+class GetTableSchemaResponsePB;
 class MasterServiceProxy;
 class TableIdentifierPB;
 } // namespace master
@@ -72,6 +78,7 @@ namespace client {
 class KuduSchema;
 
 namespace internal {
+class AuthzTokenCache;
 class ConnectToClusterRpc;
 class MetaCache;
 class RemoteTablet;
@@ -188,6 +195,27 @@ class KuduClient::Data {
       const MonoTime& deadline,
       rpc::CredentialsPolicy creds_policy = rpc::CredentialsPolicy::ANY_CREDENTIALS);
 
+  // Asynchronously fetches an authz token from the master for the given table.
+  //
+  // Invokes 'cb' with the appropriate status when finished.
+  void RetrieveAuthzTokenAsync(const KuduTable* table,
+                               const StatusCallback& cb,
+                               const MonoTime& deadline);
+
+  // Synchronous version of RetrieveAuthzTokenAsync.
+  //
+  // NOTE: since this uses a Synchronizer, it must not be invoked from a
+  // reactor thread.
+  Status RetrieveAuthzToken(const KuduTable* table, const MonoTime& deadline);
+
+  // Fetches the cached authz token for the given table ID, returning false if
+  // no such token exists.
+  bool FetchCachedAuthzToken(const std::string& table_id, security::SignedTokenPB* token);
+
+  // Stores the token into the cache, associating it with the given table ID.
+  void StoreAuthzToken(const std::string& table_id,
+                       const security::SignedTokenPB& token);
+
   std::shared_ptr<master::MasterServiceProxy> master_proxy() const;
 
   HostPort leader_master_hostport() const;
@@ -218,6 +246,11 @@ class KuduClient::Data {
   gscoped_ptr<DnsResolver> dns_resolver_;
   scoped_refptr<internal::MetaCache> meta_cache_;
 
+  // Authorization tokens stored for each table, indexed by table ID. Note that
+  // these may be expired, and it is up to the user of a token to refresh it
+  // upon learning of its expiration.
+  internal::AuthzTokenCache authz_token_cache_;
+
   // Set of hostnames and IPs on the local host.
   // This is initialized at client startup.
   std::unordered_set<std::string> local_host_names_;
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index d86a76a..1e3682b 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -38,6 +38,7 @@
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <glog/stl_logging.h>
+#include <google/protobuf/util/message_differencer.h>
 #include <gtest/gtest.h>
 
 #include "kudu/client/callbacks.h"
@@ -84,6 +85,7 @@
 #include "kudu/rpc/service_pool.h"
 #include "kudu/security/tls_context.h"
 #include "kudu/security/token.pb.h"
+#include "kudu/security/token_signer.h"
 #include "kudu/server/rpc_server.h"
 #include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet_metadata.h"
@@ -95,11 +97,13 @@
 #include "kudu/tserver/ts_tablet_manager.h"
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/async_util.h"
+#include "kudu/util/barrier.h"
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/locks.h"  // IWYU pragma: keep
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/sockaddr.h"
+#include "kudu/util/pb_util.h"
 #include "kudu/util/semaphore.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
@@ -138,19 +142,22 @@ DEFINE_int32(test_scan_num_rows, 1000, "Number of rows to insert and scan");
 METRIC_DECLARE_counter(block_manager_total_bytes_read);
 METRIC_DECLARE_counter(rpcs_queue_overflow);
 METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_GetMasterRegistration);
-METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_GetTableLocations);
 METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_GetTabletLocations);
+METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_GetTableLocations);
+METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_GetTableSchema);
 METRIC_DECLARE_histogram(handler_latency_kudu_tserver_TabletServerService_Scan);
 
 using base::subtle::Atomic32;
 using base::subtle::NoBarrier_AtomicIncrement;
 using base::subtle::NoBarrier_Load;
 using base::subtle::NoBarrier_Store;
+using google::protobuf::util::MessageDifferencer;
 using kudu::cluster::InternalMiniCluster;
 using kudu::cluster::InternalMiniClusterOptions;
 using kudu::master::CatalogManager;
 using kudu::master::GetTableLocationsRequestPB;
 using kudu::master::GetTableLocationsResponsePB;
+using kudu::security::SignedTokenPB;
 using kudu::client::sp::shared_ptr;
 using kudu::tablet::TabletReplica;
 using kudu::tserver::MiniTabletServer;
@@ -5796,6 +5803,78 @@ TEST_F(ClientTest, TestBlockScannerHijackingAttempts) {
   }
 }
 
+// Basic functionality test for the client's authz token cache.
+TEST_F(ClientTest, TestCacheAuthzTokens) {
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+  const string& table_id = client_table_->id();
+  KuduClient::Data* data = client_->data_;
+  // The client should have already gotten a token when it opened the table.
+  SignedTokenPB first_token;
+  ASSERT_TRUE(data->FetchCachedAuthzToken(table_id, &first_token));
+
+  // Retrieving a token from the master should overwrite what's in the cache.
+  // Wait some time to ensure the new token will be different from the one
+  // already in the cache (different expiration).
+  SleepFor(MonoDelta::FromSeconds(3));
+  SignedTokenPB new_token;
+  ASSERT_OK(data->RetrieveAuthzToken(client_table_.get(), MonoTime::Now() + kTimeout));
+  ASSERT_TRUE(data->FetchCachedAuthzToken(table_id, &new_token));
+  ASSERT_FALSE(MessageDifferencer::Equals(first_token, new_token));
+
+  // Now store the token directly into the cache of a new client.
+  shared_ptr<KuduClient> new_client;
+  ASSERT_OK(cluster_->CreateClient(nullptr, &new_client));
+  KuduClient::Data* new_data = new_client->data_;
+  SignedTokenPB cached_token;
+  ASSERT_FALSE(new_data->FetchCachedAuthzToken(table_id, &cached_token));
+  new_data->StoreAuthzToken(table_id, first_token);
+
+  // Check that we actually stored the token.
+  ASSERT_TRUE(new_data->FetchCachedAuthzToken(table_id, &cached_token));
+  ASSERT_TRUE(MessageDifferencer::Equals(first_token, cached_token));
+
+  // Storing tokens directly also overwrites existing ones.
+  new_data->StoreAuthzToken(table_id, new_token);
+  ASSERT_TRUE(new_data->FetchCachedAuthzToken(table_id, &cached_token));
+  ASSERT_TRUE(MessageDifferencer::Equals(new_token, cached_token));
+
+  // Sanity check that the operations on this new client didn't affect the
+  // tokens of the old client.
+  ASSERT_TRUE(data->FetchCachedAuthzToken(table_id, &cached_token));
+  ASSERT_TRUE(MessageDifferencer::Equals(new_token, cached_token));
+}
+
+// Test to ensure that we don't send calls to retrieve authz tokens when one is
+// already in-flight for the same table ID.
+TEST_F(ClientTest, TestRetrieveAuthzTokenInParallel) {
+  const int kThreads = 20;
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+  vector<Synchronizer> syncs(kThreads);
+  vector<thread> threads;
+  Barrier barrier(kThreads);
+  for (auto& s : syncs) {
+    threads.emplace_back([&] {
+      barrier.Wait();
+      client_->data_->RetrieveAuthzTokenAsync(client_table_.get(), s.AsStatusCallback(),
+                                              MonoTime::Now() + kTimeout);
+    });
+  }
+  for (int i = 0; i < kThreads; i++) {
+    syncs[i].Wait();
+    threads[i].join();
+  }
+  SignedTokenPB token;
+  ASSERT_TRUE(client_->data_->FetchCachedAuthzToken(client_table_->id(), &token));
+  // The authz token retrievals shouldn't result in one RPC per thread;
+  // rather, concurrent requests for the same table should group together.
+  auto ent = cluster_->mini_master()->master()->metric_entity();
+  int num_reqs = METRIC_handler_latency_kudu_master_MasterService_GetTableSchema
+      .Instantiate(ent)->TotalCount();
+  LOG(INFO) << Substitute("$0 concurrent threads sent $1 RPC(s) to get authz tokens",
+                          kThreads, num_reqs);
+  ASSERT_LT(num_reqs, kThreads);
+}
+
 // Client test that assigns locations to clients and tablet servers.
 // For now, assigns a uniform location to all clients and tablet servers.
 class ClientWithLocationTest : public ClientTest {
@@ -5816,5 +5895,6 @@ TEST_F(ClientTest, TestClientLocationNoLocationMappingCmd) {
 TEST_F(ClientWithLocationTest, TestClientLocation) {
   ASSERT_EQ("/foo", client_->location());
 }
+
 } // namespace client
 } // namespace kudu
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 2e2136e..780967c 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -49,6 +49,7 @@
 
 namespace kudu {
 
+class AuthzTokenTest;
 class ClientStressTest_TestUniqueClientIds_Test;
 class KuduPartialRow;
 class MonoDelta;
@@ -95,6 +96,7 @@ class MetaCache;
 class RemoteTablet;
 class RemoteTabletServer;
 class ReplicaController;
+class RetrieveAuthzTokenRpc;
 class WriteRpc;
 } // namespace internal
 
@@ -622,17 +624,21 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> {
   friend class internal::MetaCache;
   friend class internal::RemoteTablet;
   friend class internal::RemoteTabletServer;
+  friend class internal::RetrieveAuthzTokenRpc;
   friend class internal::WriteRpc;
+  friend class kudu::AuthzTokenTest;
   friend class kudu::SecurityUnknownTskTest;
   friend class tools::LeaderMasterProxy;
 
   FRIEND_TEST(kudu::ClientStressTest, TestUniqueClientIds);
+  FRIEND_TEST(ClientTest, TestCacheAuthzTokens);
   FRIEND_TEST(ClientTest, TestGetSecurityInfoFromMaster);
   FRIEND_TEST(ClientTest, TestGetTabletServerBlacklist);
   FRIEND_TEST(ClientTest, TestMasterDown);
   FRIEND_TEST(ClientTest, TestMasterLookupPermits);
   FRIEND_TEST(ClientTest, TestMetaCacheExpiry);
   FRIEND_TEST(ClientTest, TestNonCoveringRangePartitions);
+  FRIEND_TEST(ClientTest, TestRetrieveAuthzTokenInParallel);
   FRIEND_TEST(ClientTest, TestReplicatedTabletWritesWithLeaderElection);
   FRIEND_TEST(ClientTest, TestScanFaultTolerance);
   FRIEND_TEST(ClientTest, TestScanTimeout);
diff --git a/src/kudu/client/scanner-internal.cc b/src/kudu/client/scanner-internal.cc
index 28a1d1e..57fa87b 100644
--- a/src/kudu/client/scanner-internal.cc
+++ b/src/kudu/client/scanner-internal.cc
@@ -43,6 +43,7 @@
 #include "kudu/rpc/rpc.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/security/token.pb.h"
 #include "kudu/tserver/tserver_service.proxy.h"
 #include "kudu/util/async_util.h"
 #include "kudu/util/bitmap.h"
@@ -62,6 +63,7 @@ namespace kudu {
 using rpc::ComputeExponentialBackoff;
 using rpc::CredentialsPolicy;
 using rpc::RpcController;
+using security::SignedTokenPB;
 using strings::Substitute;
 using tserver::NewScanRequestPB;
 using tserver::TabletServerFeatures;
@@ -122,6 +124,7 @@ Status KuduScanner::Data::HandleError(const ScanRpcStatus& err,
   bool can_retry = true;
   bool backoff = false;
   bool reacquire_authn_token = false;
+  bool reacquire_authz_token = false;
   switch (err.result) {
     case ScanRpcStatus::SERVICE_UNAVAILABLE:
       backoff = true;
@@ -139,10 +142,15 @@ Status KuduScanner::Data::HandleError(const ScanRpcStatus& err,
       }
       break;
     case ScanRpcStatus::RPC_INVALID_AUTHENTICATION_TOKEN:
-      // Usually this happens if doing an RPC call with an expired auth token.
+      // Usually this happens if doing an RPC call with an expired authn token.
       // Retrying with a new authn token should help.
       reacquire_authn_token = true;
       break;
+    case ScanRpcStatus::RPC_INVALID_AUTHORIZATION_TOKEN:
+      // Usually this happens if doing an RPC call with an expired authz token.
+      // Retrying with a new authz token should help.
+      reacquire_authz_token = true;
+      break;
     case ScanRpcStatus::TABLET_NOT_RUNNING:
       blacklist_location = true;
       break;
@@ -183,6 +191,21 @@ Status KuduScanner::Data::HandleError(const ScanRpcStatus& err,
     }
   }
 
+  if (reacquire_authz_token) {
+    KuduClient* c = table_->client();
+    const Status& s = c->data_->RetrieveAuthzToken(table_.get(), deadline);
+    if (s.IsNotSupported()) {
+      return EnrichStatusMessage(s.CloneAndPrepend(
+          "Tried to reacquire authz token but operation not supported"));
+    }
+    if (!s.ok()) {
+      KLOG_EVERY_N_SECS(WARNING, 1)
+          << Substitute("Couldn't get authz token for table $0: ",
+                        table_->name()) << s.ToString();
+      backoff = true;
+    }
+  }
+
   if (backoff) {
     MonoDelta sleep = ComputeExponentialBackoff(scan_attempts_);
     MonoTime now = MonoTime::Now() + sleep;
@@ -247,6 +270,9 @@ ScanRpcStatus KuduScanner::Data::AnalyzeResponse(const Status& rpc_status,
         case rpc::ErrorStatusPB::FATAL_UNAUTHORIZED:
           return ScanRpcStatus{
               ScanRpcStatus::SCAN_NOT_AUTHORIZED, rpc_status};
+        case rpc::ErrorStatusPB::ERROR_INVALID_AUTHORIZATION_TOKEN:
+          return ScanRpcStatus{
+              ScanRpcStatus::RPC_INVALID_AUTHORIZATION_TOKEN, rpc_status};
         default:
           return ScanRpcStatus{ScanRpcStatus::RPC_ERROR, rpc_status};
       }
@@ -326,6 +352,18 @@ ScanRpcStatus KuduScanner::Data::SendScanRpc(const MonoTime& overall_deadline,
   if (configuration().row_format_flags() & KuduScanner::PAD_UNIXTIME_MICROS_TO_16_BYTES) {
     controller_.RequireServerFeature(TabletServerFeatures::PAD_UNIXTIME_MICROS_TO_16_BYTES);
   }
+  if (next_req_.has_new_scan_request()) {
+    // Only new scan requests require authz tokens. Scan continuations rely on
+    // Kudu's prevention of scanner hijacking by different users.
+    SignedTokenPB authz_token;
+    if (table_->client()->data_->FetchCachedAuthzToken(table_->id(), &authz_token)) {
+      *next_req_.mutable_new_scan_request()->mutable_authz_token() = std::move(authz_token);
+    } else {
+      // Note: this is expected if attempting to connect to a cluster that does
+      // not support fine-grained access control.
+      VLOG(1) << "no authz token for table " << table_->id();
+    }
+  }
   ScanRpcStatus scan_status = AnalyzeResponse(
       proxy_->Scan(next_req_,
                    &last_response_,
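
The hunk above attaches an authz token only when the request opens a new scanner; continuations of an existing scan are protected by Kudu's scanner-hijacking prevention instead. A self-contained miniature of that conditional, with stand-in types for the tserver protobufs:

#include <optional>
#include <string>

// Stand-ins for the tserver scan protobufs.
struct ScanRequest {
  bool has_new_scan_request = false;
  std::optional<std::string> authz_token;  // lives inside new_scan_request
};

void MaybeAttachAuthzToken(ScanRequest* req,
                           const std::optional<std::string>& cached_token) {
  if (!req->has_new_scan_request) {
    // Scan continuation: covered by scanner-hijacking prevention instead.
    return;
  }
  if (cached_token) {
    req->authz_token = *cached_token;
  }
  // Otherwise the request goes out without a token; this is expected when
  // the cluster doesn't support fine-grained access control.
}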
diff --git a/src/kudu/client/scanner-internal.h b/src/kudu/client/scanner-internal.h
index 7b233bf..194d504 100644
--- a/src/kudu/client/scanner-internal.h
+++ b/src/kudu/client/scanner-internal.h
@@ -85,10 +85,14 @@ struct ScanRpcStatus {
     // on other hosts.
     RPC_DEADLINE_EXCEEDED,
 
-    // The authentication token supplied by the client is invalid. Most likely,
-    // the token has expired.
+    // The authentication token supplied by the client is invalid. The token
+    // has likely expired.
     RPC_INVALID_AUTHENTICATION_TOKEN,
 
+    // The authorization token supplied by the client is invalid. The token has
+    // likely expired.
+    RPC_INVALID_AUTHORIZATION_TOKEN,
+
     // The requestor was not authorized to make the request.
     SCAN_NOT_AUTHORIZED,
 
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index 2de1a9f..63452cc 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -57,7 +57,8 @@ ADD_KUDU_TEST(all_types-itest
   NUM_SHARDS 8)
 ADD_KUDU_TEST(alter_table-randomized-test NUM_SHARDS 2 PROCESSORS 4)
 ADD_KUDU_TEST(alter_table-test PROCESSORS 3)
-ADD_KUDU_TEST(authn_token_expire-itest)
+ADD_KUDU_TEST(auth_token_expire-itest)
+ADD_KUDU_TEST(authz_token-itest PROCESSORS 2)
 ADD_KUDU_TEST(catalog_manager_tsk-itest PROCESSORS 2)
 ADD_KUDU_TEST(client_failover-itest)
 ADD_KUDU_TEST(client-negotiation-failover-itest)
diff --git a/src/kudu/integration-tests/authn_token_expire-itest.cc b/src/kudu/integration-tests/auth_token_expire-itest.cc
similarity index 79%
rename from src/kudu/integration-tests/authn_token_expire-itest.cc
rename to src/kudu/integration-tests/auth_token_expire-itest.cc
index fdbf5cd..6d8f64a 100644
--- a/src/kudu/integration-tests/authn_token_expire-itest.cc
+++ b/src/kudu/integration-tests/auth_token_expire-itest.cc
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <algorithm>
 #include <cstdint>
 #include <memory>
 #include <ostream>
@@ -98,12 +99,14 @@ Status InsertTestRows(KuduClient* client, KuduTable* table,
 
 } // anonymous namespace
 
-class AuthnTokenExpireITestBase : public KuduTest {
+class AuthTokenExpireITestBase : public KuduTest {
  public:
-  AuthnTokenExpireITestBase(int64_t token_validity_seconds,
-                            int num_masters,
-                            int num_tablet_servers)
-      : token_validity_seconds_(token_validity_seconds),
+  AuthTokenExpireITestBase(int64_t authn_token_validity_seconds,
+                           int64_t authz_token_validity_seconds,
+                           int num_masters,
+                           int num_tablet_servers)
+      : authn_token_validity_seconds_(authn_token_validity_seconds),
+        authz_token_validity_seconds_(authz_token_validity_seconds),
         num_masters_(num_masters),
         num_tablet_servers_(num_tablet_servers),
         schema_(KuduSchema::FromSchema(CreateKeyValueTestSchema())) {
@@ -125,7 +128,8 @@ class AuthnTokenExpireITestBase : public KuduTest {
   }
 
  protected:
-  const int64_t token_validity_seconds_;
+  const int64_t authn_token_validity_seconds_;
+  const int64_t authz_token_validity_seconds_;
   const int num_masters_;
   const int num_tablet_servers_;
   KuduSchema schema_;
@@ -134,12 +138,14 @@ class AuthnTokenExpireITestBase : public KuduTest {
 };
 
 
-class AuthnTokenExpireITest : public AuthnTokenExpireITestBase {
+class AuthTokenExpireITest : public AuthTokenExpireITestBase {
  public:
-  explicit AuthnTokenExpireITest(int64_t token_validity_seconds = 2)
-      : AuthnTokenExpireITestBase(token_validity_seconds,
-                                  /*num_masters=*/ 1,
-                                  /*num_tablet_servers=*/ 3) {
+  explicit AuthTokenExpireITest(int64_t authn_token_validity_seconds = 2,
+                                int64_t authz_token_validity_seconds = 2)
+      : AuthTokenExpireITestBase(authn_token_validity_seconds,
+                                 authz_token_validity_seconds,
+                                 /*num_masters=*/ 1,
+                                 /*num_tablet_servers=*/ 3) {
     // Masters and tservers inject FATAL_INVALID_AUTHENTICATION_TOKEN errors.
     // The client should retry the operation and eventually it should
     // succeed even with the high ratio of injected errors.
@@ -151,20 +157,27 @@ class AuthnTokenExpireITest : public AuthnTokenExpireITestBase {
       // as well (i.e. possible ERROR_UNAVAILABLE errors from tservers) upon
       // new authn token re-acquisition and retried RPCs.
       "--tsk_rotation_seconds=1",
-      Substitute("--authn_token_validity_seconds=$0", token_validity_seconds_),
-      Substitute("--authz_token_validity_seconds=$0", token_validity_seconds_),
+      Substitute("--authn_token_validity_seconds=$0", authn_token_validity_seconds_),
+      Substitute("--authz_token_validity_seconds=$0", authz_token_validity_seconds_),
     };
 
     cluster_opts_.extra_tserver_flags = {
       "--rpc_inject_invalid_authn_token_ratio=0.5",
 
+      // Tservers inject ERROR_INVALID_AUTHORIZATION_TOKEN errors, which will
+      // lead the client to retry the operation after fetching a new authz
+      // token from the master.
+      "--tserver_inject_invalid_authz_token_ratio=0.5",
+
+      "--tserver_enforce_access_control=true",
+
       // Decreasing TS->master heartbeat interval speeds up the test.
       "--heartbeat_interval_ms=10",
     };
   }
 
   void SetUp() override {
-    AuthnTokenExpireITestBase::SetUp();
+    AuthTokenExpireITestBase::SetUp();
     ASSERT_OK(cluster_->Start());
   }
 };
@@ -172,7 +185,7 @@ class AuthnTokenExpireITest : public AuthnTokenExpireITestBase {
 
 // Make sure authn token is re-acquired on certain scenarios upon restarting
 // tablet servers.
-TEST_F(AuthnTokenExpireITest, RestartTabletServers) {
+TEST_F(AuthTokenExpireITest, RestartTabletServers) {
   const string table_name = "authn-token-expire-restart-tablet-servers";
 
   // Create and open one table, keeping it open over the component restarts.
@@ -191,11 +204,11 @@ TEST_F(AuthnTokenExpireITest, RestartTabletServers) {
     server->Shutdown();
     ASSERT_OK(server->Restart());
   }
-  SleepFor(MonoDelta::FromSeconds(token_validity_seconds_ + 1));
+  SleepFor(MonoDelta::FromSeconds(authn_token_validity_seconds_ + 1));
 
   ASSERT_OK(InsertTestRows(client.get(), table.get(),
                            num_tablet_servers_, num_tablet_servers_ * 1));
-  SleepFor(MonoDelta::FromSeconds(token_validity_seconds_ + 1));
+  SleepFor(MonoDelta::FromSeconds(authn_token_validity_seconds_ + 1));
   // Make sure to insert a row into all tablets to make an RPC call to every
   // tablet server hosting the table.
   ASSERT_OK(InsertTestRows(client.get(), table.get(),
@@ -204,7 +217,7 @@ TEST_F(AuthnTokenExpireITest, RestartTabletServers) {
 
 // Make sure authn token is re-acquired on certain scenarios upon restarting
 // both masters and tablet servers.
-TEST_F(AuthnTokenExpireITest, RestartCluster) {
+TEST_F(AuthTokenExpireITest, RestartCluster) {
   const string table_name = "authn-token-expire-restart-cluster";
 
   shared_ptr<KuduClient> client;
@@ -218,21 +231,31 @@ TEST_F(AuthnTokenExpireITest, RestartCluster) {
   // Restart all Kudu server-side components: masters and tablet servers.
   cluster_->Shutdown();
   ASSERT_OK(cluster_->Restart());
-  SleepFor(MonoDelta::FromSeconds(token_validity_seconds_ + 1));
+  SleepFor(MonoDelta::FromSeconds(authn_token_validity_seconds_ + 1));
 
   ASSERT_OK(InsertTestRows(client.get(), table.get(),
                            num_tablet_servers_, num_tablet_servers_ * 1));
-  SleepFor(MonoDelta::FromSeconds(token_validity_seconds_ + 1));
+  SleepFor(MonoDelta::FromSeconds(authn_token_validity_seconds_ + 1));
   // Make sure to insert a row into all tablets to make an RPC call to every
   // tablet server hosting the table.
   ASSERT_OK(InsertTestRows(client.get(), table.get(),
                            num_tablet_servers_, num_tablet_servers_ * 2));
 }
 
-class AuthnTokenExpireDuringWorkloadITest : public AuthnTokenExpireITest {
+struct AuthTokenParams {
+  int64_t authn_validity_secs;
+  int64_t authz_validity_secs;
+};
+
+constexpr AuthTokenParams kEvenValidity = { 5, 5 };
+constexpr AuthTokenParams kLongerAuthn = { 5, 3 };
+constexpr AuthTokenParams kLongerAuthz = { 3, 5 };
+
+class AuthTokenExpireDuringWorkloadITest : public AuthTokenExpireITest,
+                                           public ::testing::WithParamInterface<AuthTokenParams> {
  public:
-  AuthnTokenExpireDuringWorkloadITest()
-      : AuthnTokenExpireITest(5) {
+  AuthTokenExpireDuringWorkloadITest()
+      : AuthTokenExpireITest(GetParam().authn_validity_secs, GetParam().authz_validity_secs) {
     // Close an already established idle connection to the server and open
     // a new one upon making another call to the same server. This is to force
     // authn token verification at every RPC.
@@ -240,16 +263,22 @@ class AuthnTokenExpireDuringWorkloadITest : public AuthnTokenExpireITest {
   }
 
   void SetUp() override {
-    AuthnTokenExpireITestBase::SetUp();
+    AuthTokenExpireITestBase::SetUp();
     // Do not start the cluster as a part of the setup phase. Don't waste time
     // on that because the scenario contains a test which is marked slow and
     // will be skipped if KUDU_ALLOW_SLOW_TESTS environment variable is not set.
   }
+  const int64_t max_token_validity = std::max(GetParam().authn_validity_secs,
+                                              GetParam().authz_validity_secs);
 };
 
-// Run a mixed write/read test workload and check that client retries the
-// FATAL_INVALID_AUTH_TOKEN error, eventually succeeding with every issued RPC.
-TEST_F(AuthnTokenExpireDuringWorkloadITest, InvalidTokenDuringMixedWorkload) {
+INSTANTIATE_TEST_CASE_P(ValidityIntervals, AuthTokenExpireDuringWorkloadITest,
+    ::testing::Values(kEvenValidity, kLongerAuthn, kLongerAuthz));
+
+// Run a mixed write/read test workload and check that client retries upon
+// receiving the appropriate invalid token error, eventually succeeding with
+// every issued RPC.
+TEST_P(AuthTokenExpireDuringWorkloadITest, InvalidTokenDuringMixedWorkload) {
   static const int32_t kTimeoutMs = 10 * 60 * 1000;
 
   if (!AllowSlowTests()) {
@@ -272,7 +301,7 @@ TEST_F(AuthnTokenExpireDuringWorkloadITest, InvalidTokenDuringMixedWorkload) {
 
   w.Setup();
   w.Start();
-  SleepFor(MonoDelta::FromSeconds(8 * token_validity_seconds_));
+  SleepFor(MonoDelta::FromSeconds(8 * max_token_validity));
   w.StopAndJoin();
 
   ClusterVerifier v(cluster_.get());
@@ -284,13 +313,13 @@ TEST_F(AuthnTokenExpireDuringWorkloadITest, InvalidTokenDuringMixedWorkload) {
   NO_FATALS(cluster_->AssertNoCrashes());
 }
 
-// Run write-only and scan-only workloads and check that the client retries
-// the FATAL_INVALID_AUTH_TOKEN error, eventually succeeding with its RPCs.
-// There is also a test for the mixed workload (see above), but we are looking
-// at the implementation as a black box: it's impossible to guarantee that the
-// read paths are not affected by the write paths since the mixed workload uses
-// the same shared client instance for both the read and the write paths.
-TEST_F(AuthnTokenExpireDuringWorkloadITest, InvalidTokenDuringSeparateWorkloads) {
+// Run write-only and scan-only workloads and check that the client retries the
+// appropriate invalid token error, eventually succeeding with its RPCs. There
+// is also a test for the mixed workload (see above), but we are looking at the
+// implementation as a black box: it's impossible to guarantee that the read
+// paths are not affected by the write paths since the mixed workload uses the
+// same shared client instance for both the read and the write paths.
+TEST_P(AuthTokenExpireDuringWorkloadITest, InvalidTokenDuringSeparateWorkloads) {
   const string table_name = "authn-token-expire-separate-workloads";
   static const int32_t kTimeoutMs = 10 * 60 * 1000;
 
@@ -320,7 +349,7 @@ TEST_F(AuthnTokenExpireDuringWorkloadITest, InvalidTokenDuringSeparateWorkloads)
   w.set_write_pattern(TestWorkload::INSERT_SEQUENTIAL_ROWS);
   w.Setup();
   w.Start();
-  SleepFor(MonoDelta::FromSeconds(3 * token_validity_seconds_));
+  SleepFor(MonoDelta::FromSeconds(3 * max_token_validity));
   w.StopAndJoin();
 
   NO_FATALS(cluster_->AssertNoCrashes());
@@ -338,7 +367,7 @@ TEST_F(AuthnTokenExpireDuringWorkloadITest, InvalidTokenDuringSeparateWorkloads)
   r.set_num_write_threads(0);
   r.Setup();
   r.Start();
-  SleepFor(MonoDelta::FromSeconds(3 * token_validity_seconds_));
+  SleepFor(MonoDelta::FromSeconds(3 * max_token_validity));
   r.StopAndJoin();
 
   ClusterVerifier v(cluster_.get());
@@ -352,16 +381,20 @@ TEST_F(AuthnTokenExpireDuringWorkloadITest, InvalidTokenDuringSeparateWorkloads)
 // Scenarios to verify that the client automatically re-acquires an authn
 // token upon receiving ERROR_INVALID_AUTHENTICATION_TOKEN from the servers
 // when it has established a token-based connection to masters.
-class TokenBasedConnectionITest : public AuthnTokenExpireITestBase {
+// Note: this test doesn't rely on authz tokens, but the TSK validity period is
+// determined based on all token validity intervals, so for simplicity, set the
+// authz validity interval to be the same.
+class TokenBasedConnectionITest : public AuthTokenExpireITestBase {
  public:
   TokenBasedConnectionITest()
-      : AuthnTokenExpireITestBase(
-          /*token_validity_seconds=*/ 2,
+      : AuthTokenExpireITestBase(
+          /*authn_token_validity_seconds=*/ 2,
+          /*authz_token_validity_seconds=*/ 2,
           /*num_masters=*/ 1,
           /*num_tablet_servers=*/ 3) {
     cluster_opts_.extra_master_flags = {
-      Substitute("--authn_token_validity_seconds=$0", token_validity_seconds_),
-      Substitute("--authz_token_validity_seconds=$0", token_validity_seconds_),
+      Substitute("--authn_token_validity_seconds=$0", authn_token_validity_seconds_),
+      Substitute("--authz_token_validity_seconds=$0", authz_token_validity_seconds_),
     };
 
     cluster_opts_.extra_tserver_flags = {
@@ -371,7 +404,7 @@ class TokenBasedConnectionITest : public AuthnTokenExpireITestBase {
   }
 
   void SetUp() override {
-    AuthnTokenExpireITestBase::SetUp();
+    AuthTokenExpireITestBase::SetUp();
     ASSERT_OK(cluster_->Start());
   }
 };
@@ -405,7 +438,7 @@ TEST_F(TokenBasedConnectionITest, ReacquireAuthnToken) {
   ASSERT_OK(client->OpenTable(table_name, &table));
 
   // Let the authn token expire.
-  SleepFor(MonoDelta::FromSeconds(token_validity_seconds_ + 1));
+  SleepFor(MonoDelta::FromSeconds(authn_token_validity_seconds_ + 1));
 
   // Here a new authn token should be automatically acquired upon receiving
   // a FATAL_INVALID_AUTHENTICATION_TOKEN error. To get a new token it's necessary
@@ -418,11 +451,12 @@ TEST_F(TokenBasedConnectionITest, ReacquireAuthnToken) {
 // Test for scenarios involving multiple masters where
 // client-to-non-leader-master connections are closed due to inactivity,
 // but the connection to the former leader master is kept open.
-class MultiMasterIdleConnectionsITest : public AuthnTokenExpireITestBase {
+class MultiMasterIdleConnectionsITest : public AuthTokenExpireITestBase {
  public:
   MultiMasterIdleConnectionsITest()
-      : AuthnTokenExpireITestBase(
-          /*token_validity_seconds=*/ 3,
+      : AuthTokenExpireITestBase(
+          /*authn_token_validity_seconds=*/ 3,
+          /*authz_token_validity_seconds=*/ 3,
           /*num_masters=*/ 3,
           /*num_tablet_servers=*/ 3) {
 
@@ -430,8 +464,8 @@ class MultiMasterIdleConnectionsITest : public AuthnTokenExpireITestBase {
       // Custom validity interval for authn tokens. The scenario involves
       // expiration of authn tokens, while the default authn expiration timeout
       // is 7 days. So, let's make the token validity interval really short.
-      Substitute("--authn_token_validity_seconds=$0", token_validity_seconds_),
-      Substitute("--authz_token_validity_seconds=$0", token_validity_seconds_),
+      Substitute("--authn_token_validity_seconds=$0", authn_token_validity_seconds_),
+      Substitute("--authz_token_validity_seconds=$0", authz_token_validity_seconds_),
 
       // The default for leader_failure_max_missed_heartbeat_periods is 3.0,
       // but 2.0 is enough to keep master leadership stable and makes it
@@ -461,12 +495,12 @@ class MultiMasterIdleConnectionsITest : public AuthnTokenExpireITestBase {
   }
 
   void SetUp() override {
-    AuthnTokenExpireITestBase::SetUp();
+    AuthTokenExpireITestBase::SetUp();
     ASSERT_OK(cluster_->Start());
   }
 
  protected:
-  const int master_rpc_keepalive_time_ms_ = 3 * token_validity_seconds_ * 1000 / 2;
+  const int master_rpc_keepalive_time_ms_ = 3 * authn_token_validity_seconds_ * 1000 / 2;
   const int master_raft_hb_interval_ms_ = 250;
   const double master_leader_failure_max_missed_heartbeat_periods_ = 2.0;
 };
@@ -509,7 +543,7 @@ TEST_F(MultiMasterIdleConnectionsITest, ClientReacquiresAuthnToken) {
   //   2) connections to non-leader masters close
 
   const auto time_right_before_token_expiration = time_start +
-      MonoDelta::FromSeconds(token_validity_seconds_);
+      MonoDelta::FromSeconds(authn_token_validity_seconds_);
   while (MonoTime::Now() < time_right_before_token_expiration) {
     // Keep the connection to the leader master open, from time to time making
     // requests that go to the leader master, but not to other masters in the
     // cluster.
@@ -526,11 +560,11 @@ TEST_F(MultiMasterIdleConnectionsITest, ClientReacquiresAuthnToken) {
   }
 
   // Given the relation between the master_rpc_keepalive_time_ms_ and
-  // token_validity_seconds_ parameters, the original authn token should expire
-  // and connections to follower masters should be torn down due to inactivity,
-  // but the connection to the leader master should be kept open after waiting
-  // for additional token expiration interval.
-  SleepFor(MonoDelta::FromSeconds(token_validity_seconds_));
+  // authn_token_validity_seconds_ parameters, the original authn token should
+  // expire and connections to follower masters should be torn down due to
+  // inactivity, but the connection to the leader master should be kept open
+  // after waiting for an additional token expiration interval.
+  SleepFor(MonoDelta::FromSeconds(authn_token_validity_seconds_));
 
   {
     int former_leader_master_idx;
diff --git a/src/kudu/integration-tests/authz_token-itest.cc b/src/kudu/integration-tests/authz_token-itest.cc
new file mode 100644
index 0000000..327df76
--- /dev/null
+++ b/src/kudu/integration-tests/authz_token-itest.cc
@@ -0,0 +1,546 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <string>
+#include <thread>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <google/protobuf/util/message_differencer.h>
+#include <gtest/gtest.h>
+
+#include "kudu/client/client-internal.h"
+#include "kudu/client/client-test-util.h"
+#include "kudu/client/client.h"
+#include "kudu/client/client.pb.h"
+#include "kudu/client/schema.h"
+#include "kudu/client/shared_ptr.h"
+#include "kudu/client/write_op.h"
+#include "kudu/common/partial_row.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/master/catalog_manager.h"
+#include "kudu/master/master.h"
+#include "kudu/master/mini_master.h"
+#include "kudu/mini-cluster/internal_mini_cluster.h"
+#include "kudu/security/crypto.h"
+#include "kudu/security/openssl_util.h"
+#include "kudu/security/token.pb.h"
+#include "kudu/security/token_signer.h"
+#include "kudu/tablet/key_value_test_schema.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using google::protobuf::util::MessageDifferencer;
+using std::string;
+using std::thread;
+using std::unique_ptr;
+using std::vector;
+
+DECLARE_bool(master_support_authz_tokens);
+DECLARE_bool(tserver_enforce_access_control);
+DECLARE_bool(raft_enable_pre_election);
+DECLARE_double(leader_failure_max_missed_heartbeat_periods);
+DECLARE_double(tserver_inject_invalid_authz_token_ratio);
+DECLARE_int32(consensus_inject_latency_ms_in_notifications);
+DECLARE_int32(raft_heartbeat_interval_ms);
+DECLARE_int64(authz_token_validity_seconds);
+
+METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_GetTableSchema);
+
+namespace kudu {
+
+class RWMutex;
+
+using cluster::InternalMiniCluster;
+using cluster::InternalMiniClusterOptions;
+using client::AuthenticationCredentialsPB;
+using client::sp::shared_ptr;
+using client::KuduClient;
+using client::KuduClientBuilder;
+using client::KuduError;
+using client::KuduInsert;
+using client::KuduScanner;
+using client::KuduSchema;
+using client::KuduSession;
+using client::KuduTable;
+using client::KuduTableCreator;
+using security::DataFormat;
+using security::PrivateKey;
+using security::SignedTokenPB;
+using security::TablePrivilegePB;
+using security::TokenSigner;
+using security::TokenSigningPrivateKeyPB;
+using strings::Substitute;
+
+namespace {
+
+// Relatively low timeout used so we don't have to wait too long for an
+// "invalid token" error.
+const int kRpcTimeoutSecs = 3;
+const int kOperationTimeoutSecs = kRpcTimeoutSecs * 3;
+
+// Inserts a single row into the given key-value table for the given key.
+Status InsertKeyToTable(KuduTable* table, KuduSession* session, int key) {
+  RETURN_NOT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+  unique_ptr<KuduInsert> insert(table->NewInsert());
+  KuduPartialRow* row = insert->mutable_row();
+  RETURN_NOT_OK(row->SetInt32(0, key));
+  RETURN_NOT_OK(row->SetInt32(1, key));
+  return session->Apply(insert.release());
+}
+
+vector<Status> GetSessionErrors(KuduSession* session) {
+  vector<KuduError*> errors;
+  session->GetPendingErrors(&errors, nullptr);
+  vector<Status> ret(errors.size());
+  for (int i = 0; i < errors.size(); i++) {
+    ret[i] = errors[i]->status();
+  }
+  ElementDeleter deleter(&errors);
+  return ret;
+}
+
+// Scans values from the given table.
+Status ScanFromTable(KuduTable* table) {
+  KuduScanner scanner(table);
+  scanner.SetTimeoutMillis(kOperationTimeoutSecs * 1000);
+  vector<string> rows;
+  return ScanToStrings(&scanner, &rows);
+}
+
+} // anonymous namespace
+
+class AuthzTokenTest : public KuduTest {
+ public:
+  AuthzTokenTest()
+      : schema_(KuduSchema::FromSchema(CreateKeyValueTestSchema())) {}
+  const char* const kTableName = "test-table";
+  const char* const kUser = "token-user";
+  const char* const kBadUser = "bad-token-user";
+
+  // Helper to get the authz token for 'table_id' from the client's cache.
+  static bool FetchCachedAuthzToken(
+      KuduClient* client, const string& table_id, SignedTokenPB* token) {
+    return client->data_->FetchCachedAuthzToken(table_id, token);
+  }
+  // Helper to store the authz token for 'table_id' to the client's cache.
+  static void StoreAuthzToken(KuduClient* client,
+                              const string& table_id,
+                              const SignedTokenPB& token) {
+    client->data_->StoreAuthzToken(table_id, token);
+  }
+
+  void SetUp() override {
+    KuduTest::SetUp();
+    FLAGS_tserver_enforce_access_control = true;
+    FLAGS_authz_token_validity_seconds = 1;
+
+    // Create a table with a basic schema.
+    cluster_.reset(new InternalMiniCluster(env_, InternalMiniClusterOptions()));
+    ASSERT_OK(cluster_->Start());
+    ASSERT_OK(SetupClientAndTable());
+  }
+
+  // Sets up the client_ and client_table_ members.
+  Status SetupClientAndTable() {
+    RETURN_NOT_OK(CreateClientForUser(kUser, &client_));
+    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+    RETURN_NOT_OK(table_creator->table_name(kTableName)
+                               .schema(&schema_)
+                               .num_replicas(1)
+                               .set_range_partition_columns({ "key" })
+                               .Create());
+    RETURN_NOT_OK(client_->OpenTable(kTableName, &client_table_));
+    table_id_ = client_table_->id();
+    return Status::OK();
+  }
+
+  // Inserts the next appropriate row into the table.
+  Status InsertToTable(KuduTable* table) {
+    shared_ptr<KuduSession> session(table->client()->NewSession());
+    RETURN_NOT_OK(InsertKeyToTable(table, session.get(), next_row_key_++));
+    return session->Flush();
+  }
+
+  // Creates a client for the given user.
+  Status CreateClientForUser(const string& user, shared_ptr<KuduClient>* client) const {
+    // Many tests will expect operations to fail, so let's get there quicker by
+    // setting a low timeout.
+    KuduClientBuilder client_builder;
+    client_builder.default_rpc_timeout(MonoDelta::FromSeconds(kRpcTimeoutSecs));
+    string authn_creds;
+    AuthenticationCredentialsPB authn_pb;
+    authn_pb.set_real_user(user);
+    CHECK(authn_pb.SerializeToString(&authn_creds));
+    client_builder.import_authentication_credentials(std::move(authn_creds));
+    return cluster_->CreateClient(&client_builder, client);
+  }
+
+  // Gets the current number of GetTableSchema requests the master has serviced.
+  // This increments whenever a client opens a table or gets a new authz token.
+  uint64_t NumGetTableSchemaRequests() const {
+    const auto& ent = cluster_->mini_master()->master()->metric_entity();
+    return METRIC_handler_latency_kudu_master_MasterService_GetTableSchema
+        .Instantiate(ent)->TotalCount();
+  }
+
+  // Inserts the next row into the table, expecting an error. Returns the
+  // session error, rather than the usual coarse-grained IOError.
+  Status InsertToTableSessionError(KuduTable* table) {
+    KuduClient* client = table->client();
+    shared_ptr<KuduSession> session = client->NewSession();
+    RETURN_NOT_OK(InsertKeyToTable(table, session.get(), next_row_key_++));
+    Status s = session->Flush();
+    if (!s.IsIOError()) {
+      return s;
+    }
+    vector<Status> errors = GetSessionErrors(session.get());
+    if (errors.size() != 1) {
+      return Status::RuntimeError(Substitute("expected 1 error, got $0",
+                                  errors.size()));
+    }
+    return errors[0];
+  }
+
+ protected:
+  const KuduSchema schema_;
+  unique_ptr<InternalMiniCluster> cluster_;
+
+  // Client authenticated as the default user.
+  shared_ptr<KuduClient> client_;
+
+  // Table created with 'client_'.
+  shared_ptr<KuduTable> client_table_;
+  string table_id_;
+
+  // The next row key to insert.
+  int next_row_key_ = 0;
+};
+
+namespace {
+
+// Functors with which the client can send requests.
+Status InsertRequestor(AuthzTokenTest* test, KuduTable* table) {
+  return test->InsertToTable(table);
+}
+Status ScanRequestor(AuthzTokenTest* /*test*/, KuduTable* table) {
+  return ScanFromTable(table);
+}
+
+} // anonymous namespace
+
+// Tests parameterized on different data operations.
+typedef std::function<Status(AuthzTokenTest*, KuduTable*)> RequestorFunc;
+class ReacquireAuthzTokenTest : public AuthzTokenTest,
+                                public ::testing::WithParamInterface<RequestorFunc> {};
+
+// Test scenarios that lead the client to retrieve a new token.
+TEST_P(ReacquireAuthzTokenTest, TestInvalidAuthzTokens) {
+  auto client_func = GetParam();
+  // First, let's do a sanity check that initial authz tokens allow the user to
+  // perform all actions.
+  SignedTokenPB first_token;
+  ASSERT_TRUE(FetchCachedAuthzToken(client_.get(), table_id_, &first_token));
+  ASSERT_OK(client_func(this, client_table_.get()));
+
+  // The above operations shouldn't have required getting a new token.
+  SignedTokenPB same_token;
+  ASSERT_TRUE(FetchCachedAuthzToken(client_.get(), table_id_, &same_token));
+  ASSERT_TRUE(MessageDifferencer::Equals(first_token, same_token));
+
+  shared_ptr<KuduClient> bad_client;
+  ASSERT_OK(CreateClientForUser(kBadUser, &bad_client));
+  shared_ptr<KuduTable> bad_table;
+  ASSERT_OK(bad_client->OpenTable(kTableName, &bad_table));
+
+  LOG(INFO) << "Trying to use the wrong user's token...";
+  SignedTokenPB bad_token;
+  ASSERT_TRUE(FetchCachedAuthzToken(client_.get(), table_id_, &bad_token));
+  StoreAuthzToken(bad_client.get(), table_id_, bad_token);
+
+  // The bad client should succeed after being told to retrieve a new token
+  // for the correct user. Check that it received a different token.
+  ASSERT_OK(client_func(this, bad_table.get()));
+  SignedTokenPB new_token;
+  ASSERT_TRUE(FetchCachedAuthzToken(bad_client.get(), table_id_, &new_token));
+  ASSERT_FALSE(MessageDifferencer::Equals(bad_token, new_token));
+
+  // Replace the token used by the client with one that is malformed by
+  // messing with the token data. The server should respond such that the
+  // client circles back to the master and gets a new token.
+  LOG(INFO) << "Trying to use a bad signature...";
+  string bad_signature = std::move(*new_token.mutable_signature());
+  // Flip the bits of the signature.
+  for (int i = 0; i < bad_signature.length(); i++) {
+    auto& byte = bad_signature[i];
+    byte = ~byte;
+  }
+  bad_token = std::move(new_token);
+  bad_token.set_token_data(std::move(bad_signature));
+  StoreAuthzToken(bad_client.get(), table_id_, bad_token);
+  ASSERT_OK(client_func(this, bad_table.get()));
+
+  // The client should have received a new token.
+  ASSERT_TRUE(FetchCachedAuthzToken(bad_client.get(), table_id_, &new_token));
+  ASSERT_FALSE(MessageDifferencer::Equals(bad_token, new_token));
+}
+
+TEST_P(ReacquireAuthzTokenTest, TestExpiredAuthzTokens) {
+  // We sleep for a bit to allow the expiration of tokens.
+  SKIP_IF_SLOW_NOT_ALLOWED();
+  auto client_func = GetParam();
+
+  // Ensure that expired authz tokens will lead the client to retry with a new
+  // token upon writing/scanning.
+  uint64_t initial_reqs = NumGetTableSchemaRequests();
+  SleepFor(MonoDelta::FromSeconds(FLAGS_authz_token_validity_seconds + 1));
+  ASSERT_OK(client_func(this, client_table_.get()));
+  ASSERT_GT(NumGetTableSchemaRequests(), initial_reqs);
+}
+
+INSTANTIATE_TEST_CASE_P(RequestorFuncs, ReacquireAuthzTokenTest,
+    ::testing::ValuesIn(vector<RequestorFunc>({ &InsertRequestor, &ScanRequestor })));
+
+// Test to ensure tokens with no privileges will disallow operations.
+TEST_F(AuthzTokenTest, TestUnprivilegedAuthzTokens) {
+  // Replace the token used by the client with one that has no permissions.
+  // Since the token is well-formed but does not have sufficient privileges
+  // to perform the actions, the client going back to the master for a new
+  // token will not help, and the user will see an error.
+  LOG(INFO) << "Trying to use an unprivileged token...";
+  SignedTokenPB unprivileged_token;
+  TablePrivilegePB no_privilege;
+  no_privilege.set_table_id(table_id_);
+  ASSERT_OK(cluster_->mini_master()->master()->token_signer()->GenerateAuthzToken(
+      kUser, std::move(no_privilege), &unprivileged_token));
+  StoreAuthzToken(client_.get(), table_id_, unprivileged_token);
+
+  shared_ptr<KuduSession> bad_session(client_->NewSession());
+  ASSERT_OK(InsertKeyToTable(client_table_.get(), bad_session.get(), next_row_key_++));
+
+  // Write sessions will accumulate a batch of "not authorized" errors,
+  // hiding them behind a coarse-grained IOError.
+  Status s = bad_session->Flush();
+  ASSERT_TRUE(s.IsIOError()) << s.ToString();
+  vector<Status> errors = GetSessionErrors(bad_session.get());
+  for (const auto& e : errors) {
+    ASSERT_TRUE(e.IsRemoteError()) << e.ToString();
+    ASSERT_STR_CONTAINS(e.ToString(), "Not authorized");
+  }
+
+  // Scans will return a remote error with an appropriate message.
+  s = ScanFromTable(client_table_.get());
+  ASSERT_TRUE(s.IsRemoteError()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "Not authorized");
+}
+
+// Test that ensures we retry when we send a token signed by a TSK that hasn't
+// percolated to the tservers. In such cases, the tablet server should respond
+// with an ERROR_UNAVAILABLE error and the request should be retried.
+TEST_F(AuthzTokenTest, TestUnknownTsk) {
+  // Create a TSK with a high enough sequence number that it will be unknown to
+  // the server.
+  TokenSigningPrivateKeyPB tsk;
+  PrivateKey private_key;
+  ASSERT_OK(GeneratePrivateKey(/*num_bits=*/512, &private_key));
+  string private_key_str_der;
+  ASSERT_OK(private_key.ToString(&private_key_str_der, DataFormat::DER));
+  tsk.set_rsa_key_der(private_key_str_der);
+  tsk.set_key_seq_num(100);
+  tsk.set_expire_unix_epoch_seconds(WallTime_Now() + 3600);
+
+  TablePrivilegePB privilege;
+  privilege.set_table_id(table_id_);
+  privilege.set_scan_privilege(true);
+  privilege.set_insert_privilege(true);
+
+  // Create a token signer to use our surprise TSK. The intervals don't matter.
+  TokenSigner signer(100, 100, 100);
+  ASSERT_OK(signer.ImportKeys({ tsk }));
+  SignedTokenPB token;
+  ASSERT_OK(signer.GenerateAuthzToken(kUser, std::move(privilege), &token));
+  StoreAuthzToken(client_.get(), table_id_, token);
+
+  // The operations will see ERROR_UNAVAILABLE and keep retrying, hoping for
+  // the TSK to make its way to the server.
+  Status s = ScanFromTable(client_table_.get());
+  LOG(INFO) << s.ToString();
+  ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+  s = InsertToTableSessionError(client_table_.get());
+  ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+
+  // Only after we import the TSK do operations succeed.
+  ASSERT_OK(cluster_->mini_master()->master()->token_signer()->ImportKeys({ tsk }));
+  ASSERT_OK(ScanFromTable(client_table_.get()));
+  ASSERT_OK(InsertToTable(client_table_.get()));
+}
+
+// Test what happens when the single-master deployment responds with a
+// retriable error when getting a new authz token.
+TEST_F(AuthzTokenTest, TestSingleMasterUnavailable) {
+  // We sleep in this test to ensure our scan has time to retry.
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  // Warm the client's meta cache so it doesn't need to go to the master for
+  // the location of the tserver.
+  ASSERT_OK(InsertToTable(client_table_.get()));
+
+  // Set up the client such that its first operation will require it to go back
+  // to the master (in this case, by giving it a token for the wrong user).
+  shared_ptr<KuduClient> bad_client;
+  ASSERT_OK(CreateClientForUser("bad-token-user", &bad_client));
+  shared_ptr<KuduTable> bad_table;
+  ASSERT_OK(bad_client->OpenTable(kTableName, &bad_table));
+  SignedTokenPB bad_token;
+  ASSERT_TRUE(FetchCachedAuthzToken(bad_client.get(), table_id_, &bad_token));
+  StoreAuthzToken(client_.get(), table_id_, bad_token);
+
+  // Take the leader lock on the master, which will prevent successful attempts
+  // to get a new token, but will allow retries.
+  std::unique_lock<RWMutex> leader_lock(
+      cluster_->mini_master()->master()->catalog_manager()->leader_lock_);
+
+  // After a while, the client operation will time out.
+  Status s = ScanFromTable(client_table_.get());
+  LOG(INFO) << s.ToString();
+  ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+
+  // If we let go of the leader lock while the operation is still in flight,
+  // the operation should succeed. At this point, the client still shouldn't
+  // have an authz token.
+  thread scanner([&] {
+    s = ScanFromTable(client_table_.get());
+  });
+
+  // Wait for a full RPC timeout to make the scan retry once more before
+  // letting go of the leader lock.
+  SleepFor(MonoDelta::FromSeconds(kRpcTimeoutSecs + 1));
+  leader_lock.unlock();
+  scanner.join();
+  ASSERT_OK(s);
+}
+
+// Test with utilities to prevent the master(s) from providing authz tokens.
+// The test is also configured such that the masters will frequently change
+// leadership.
+class BadMultiMasterAuthzTokenTest : public AuthzTokenTest {
+ public:
+  void SetUp() override {
+    KuduTest::SetUp();
+    // We're going to make elections more frequent, so set some non-runtime
+    // flags up front. The values for these and the flags below are chosen
+    // to avoid flakiness, even when running with stress in TSAN mode.
+    FLAGS_leader_failure_max_missed_heartbeat_periods = 1;
+    FLAGS_raft_heartbeat_interval_ms = 200;
+
+    InternalMiniClusterOptions opts;
+    opts.num_masters = 3;
+    cluster_.reset(new InternalMiniCluster(env_, std::move(opts)));
+    ASSERT_OK(cluster_->Start());
+    ASSERT_OK(SetupClientAndTable());
+
+    // Enforce access control, and set the rest of the election flags.
+    FLAGS_tserver_enforce_access_control = true;
+    FLAGS_raft_enable_pre_election = false;
+    FLAGS_consensus_inject_latency_ms_in_notifications = FLAGS_raft_heartbeat_interval_ms / 3;
+  }
+};
+
+// Test what happens when the multi-master deployment undergoes frequent leader
+// changes. Tokens should still be issued and failures to get a token should be
+// retried.
+TEST_F(BadMultiMasterAuthzTokenTest, TestMasterElectionStorms) {
+  // Set up the tablet servers such that they'll force the client to go back to
+  // the master for a new token.
+  FLAGS_tserver_inject_invalid_authz_token_ratio = 1.0;
+
+  // Despite the master leader elections, new tokens should be received.
+  // After a while, operations should time out because the authz tokens are all
+  // invalid. The scanner will enrich the returned status with the last error
+  // received from the tablet server.
+  Status s = ScanFromTable(client_table_.get());
+  ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "Not authorized");
+
+  // Do the same for inserts.
+  shared_ptr<KuduSession> session = client_->NewSession();
+  ASSERT_OK(InsertKeyToTable(client_table_.get(), session.get(), next_row_key_++));
+  s = session->Flush();
+  vector<Status> errors = GetSessionErrors(session.get());
+  for (const auto& e : errors) {
+    ASSERT_TRUE(e.IsTimedOut()) << e.ToString();
+    // TODO(awong): refactor WriteRpc so it spits out the tserver error that
+    // caused it to attempt getting another token.
+    ASSERT_STR_CONTAINS(e.ToString(), "RetrieveAuthzToken timed out");
+  }
+
+  // Now ease up the error injection on the tserver to ensure the tokens we
+  // get are usable.
+  FLAGS_tserver_inject_invalid_authz_token_ratio = 0.5;
+  ASSERT_OK(ScanFromTable(client_table_.get()));
+  ASSERT_OK(InsertToTable(client_table_.get()));
+}
+
+
+// Test in which the master does not support creating authz tokens.
+class LegacyMasterAuthzTokenTest : public AuthzTokenTest {
+ public:
+  void SetUp() override {
+    KuduTest::SetUp();
+    FLAGS_master_support_authz_tokens = false;
+    FLAGS_tserver_enforce_access_control = false;
+    cluster_.reset(new InternalMiniCluster(env_, InternalMiniClusterOptions()));
+    ASSERT_OK(cluster_->Start());
+    ASSERT_OK(SetupClientAndTable());
+  }
+};
+
+// Ensures the client can still communicate with servers that do not support
+// authz tokens.
+TEST_F(LegacyMasterAuthzTokenTest, TestAuthzTokensNotSupported) {
+  // Client should have no problems connecting to an old cluster.
+  ASSERT_OK(InsertToTable(client_table_.get()));
+  ASSERT_OK(ScanFromTable(client_table_.get()));
+
+  // In the unexpected case that the tservers enforce access control but we
+  // have an old master, a scan will fail upon being asked to reacquire an
+  // authz token, learning that doing so is not supported.
+  FLAGS_tserver_enforce_access_control = true;
+  Status s = ScanFromTable(client_table_.get());
+  ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "does not support RetrieveAuthzToken");
+
+  // The same will happen for writes.
+  s = InsertToTableSessionError(client_table_.get());
+  ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "does not support RetrieveAuthzToken");
+}
+
+} // namespace kudu
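
(For reference, the token-minting pattern the TSK tests above rely on boils down
to a few lines. This is a sketch assembled from the calls in the test; the
TokenSigner interval arguments are placeholders since, as the test notes, the
intervals don't matter here.)

    // Mint an authz token for 'user' granting scan and insert on 'table_id',
    // signed by an explicitly imported TSK.
    security::TablePrivilegePB privilege;
    privilege.set_table_id(table_id);
    privilege.set_scan_privilege(true);
    privilege.set_insert_privilege(true);

    security::TokenSigner signer(100, 100, 100);  // placeholder intervals
    RETURN_NOT_OK(signer.ImportKeys({ tsk }));    // 'tsk': a TokenSigningPrivateKeyPB
    security::SignedTokenPB token;
    RETURN_NOT_OK(signer.GenerateAuthzToken(user, std::move(privilege), &token));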
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index 733b73c..c9ee5b7 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -53,6 +53,7 @@
 
 namespace kudu {
 
+class AuthzTokenTest_TestSingleMasterUnavailable_Test;
 class CreateTableStressTest_TestConcurrentCreateTableAndReloadMetadata_Test;
 class MonitoredTask;
 class NodeInstancePB;
@@ -694,6 +695,7 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   // These tests call ElectedAsLeaderCb() directly.
   FRIEND_TEST(MasterTest, TestShutdownDuringTableVisit);
   FRIEND_TEST(MasterTest, TestGetTableLocationsDuringRepeatedTableVisit);
+  FRIEND_TEST(kudu::AuthzTokenTest, TestSingleMasterUnavailable);
 
   // This test calls VisitTablesAndTablets() directly.
   FRIEND_TEST(kudu::CreateTableStressTest, TestConcurrentCreateTableAndReloadMetadata);
diff --git a/src/kudu/master/master-test.cc b/src/kudu/master/master-test.cc
index 54dac9d..24db50d 100644
--- a/src/kudu/master/master-test.cc
+++ b/src/kudu/master/master-test.cc
@@ -102,6 +102,7 @@ using std::vector;
 using strings::Substitute;
 
 DECLARE_bool(catalog_manager_check_ts_count_for_create_table);
+DECLARE_bool(master_support_authz_tokens);
 DECLARE_bool(raft_prepare_replacement_before_eviction);
 DECLARE_double(sys_catalog_fail_during_write);
 DECLARE_int32(diagnostics_log_stack_traces_interval_ms);
@@ -1734,5 +1735,40 @@ TEST_F(MasterTest, TestTableIdentifierWithIdAndName) {
   }
 }
 
+class AuthzTokenMasterTest : public MasterTest,
+                             public ::testing::WithParamInterface<bool> {};
+
+// Some basic verification that we get authz tokens when we expect them.
+TEST_P(AuthzTokenMasterTest, TestGenerateAuthzTokens) {
+  bool supports_authz = GetParam();
+  FLAGS_master_support_authz_tokens = supports_authz;
+  const char* kTableName = "testtb";
+  const Schema kTableSchema({ ColumnSchema("key", INT32) }, 1);
+  const auto send_req = [&] (GetTableSchemaResponsePB* resp) -> Status {
+    RpcController rpc;
+    GetTableSchemaRequestPB req;
+    req.mutable_table()->set_table_name(kTableName);
+    return proxy_->GetTableSchema(req, resp, &rpc);
+  };
+  // Send a request for which there is no table. Whether or not authz tokens are
+  // supported, the response should have an error.
+  {
+    GetTableSchemaResponsePB resp;
+    ASSERT_OK(send_req(&resp));
+    ASSERT_TRUE(resp.has_error());
+    const Status s = StatusFromPB(resp.error().status());
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+  }
+  // Now create the table and check that we only get tokens when we expect to.
+  ASSERT_OK(CreateTable(kTableName, kTableSchema));
+  {
+    GetTableSchemaResponsePB resp;
+    ASSERT_OK(send_req(&resp));
+    ASSERT_EQ(supports_authz, resp.has_authz_token());
+  }
+}
+
+INSTANTIATE_TEST_CASE_P(SupportsAuthzTokens, AuthzTokenMasterTest, ::testing::Bool());
+
 } // namespace master
 } // namespace kudu
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index 583644c..5c38173 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -633,13 +633,18 @@ message GetTableSchemaResponsePB {
 
   // The table name.
   optional string table_name = 7;
+
+  // A token that the client can use to authorize further access to the table.
+  // A token can always be expected with this response, unless the response is
+  // from an older version of Kudu or unless the request resulted in an error.
+  optional security.SignedTokenPB authz_token = 8;
 }
 
 message ConnectToMasterRequestPB {
 }
 
 message ConnectToMasterResponsePB {
-  // Set if there an error.
+  // Set if there is an error.
   optional MasterErrorPB error = 1;
 
   // The current role of the master.
@@ -783,6 +788,8 @@ enum MasterFeatures {
   // EVICT_FIRST (a.k.a. 3-2-3) and the PREPARE_REPLACEMENT_BEFORE_EVICTION
   // (a.k.a. 3-4-3) schemes.
   REPLICA_MANAGEMENT = 4;
+  // The master supports generating and dispensing authz tokens.
+  GENERATE_AUTHZ_TOKEN = 5;
 }
 
 service MasterService {
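
(Because the new field is optional, a client must treat its absence as "old
master" rather than as an error. A sketch of the consuming side, using the
client-internal StoreAuthzToken helper exercised by the tests above; the
surrounding plumbing is hypothetical.)

    GetTableSchemaResponsePB resp;
    // ... GetTableSchema RPC issued against the leader master ...
    if (resp.has_authz_token()) {
      // Masters that predate authz tokens never set this field, so its
      // absence is not an error.
      client->data_->StoreAuthzToken(resp.table_id(), resp.authz_token());
    }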
diff --git a/src/kudu/master/master_service.cc b/src/kudu/master/master_service.cc
index 4a29a61..375b7d0 100644
--- a/src/kudu/master/master_service.cc
+++ b/src/kudu/master/master_service.cc
@@ -86,11 +86,17 @@ TAG_FLAG(master_client_location_assignment_enabled, hidden);
 TAG_FLAG(master_client_location_assignment_enabled, runtime);
 TAG_FLAG(master_client_location_assignment_enabled, unsafe);
 
+DEFINE_bool(master_support_authz_tokens, true,
+            "Whether the master supports generating authz tokens. Used for "
+            "testing version compatibility in the client.");
+TAG_FLAG(master_support_authz_tokens, hidden);
+
 using google::protobuf::Message;
 using kudu::consensus::ReplicaManagementInfoPB;
 using kudu::pb_util::SecureDebugString;
 using kudu::pb_util::SecureShortDebugString;
 using kudu::security::SignedTokenPB;
+using kudu::security::TablePrivilegePB;
 using kudu::server::ServerBase;
 using std::shared_ptr;
 using std::string;
@@ -416,6 +422,31 @@ void MasterServiceImpl::GetTableSchema(const GetTableSchemaRequestPB* req,
 
   Status s = server_->catalog_manager()->GetTableSchema(req, resp);
   CheckRespErrorOrSetUnknown(s, resp);
+  if (resp->has_error()) {
+    // If there was an application error, respond to the RPC.
+    rpc->RespondSuccess();
+    return;
+  }
+
+  // TODO(awong): fill this token in with actual privileges from the
+  // appropriate AuthzProvider. For now, assume the user has all privileges
+  // for the table.
+  if (PREDICT_TRUE(FLAGS_master_support_authz_tokens)) {
+    SignedTokenPB authz_token;
+    TablePrivilegePB table_privilege;
+    table_privilege.set_table_id(resp->table_id());
+    table_privilege.set_scan_privilege(true);
+    table_privilege.set_insert_privilege(true);
+    table_privilege.set_update_privilege(true);
+    table_privilege.set_delete_privilege(true);
+    s = server_->token_signer()->GenerateAuthzToken(rpc->remote_user().username(),
+                                                    std::move(table_privilege), &authz_token);
+    if (!s.ok()) {
+      rpc->RespondFailure(s);
+      return;
+    }
+    *resp->mutable_authz_token() = std::move(authz_token);
+  }
   rpc->RespondSuccess();
 }
 
@@ -579,6 +610,8 @@ bool MasterServiceImpl::SupportsFeature(uint32_t feature) const {
     case MasterFeatures::ADD_DROP_RANGE_PARTITIONS: FALLTHROUGH_INTENDED;
     case MasterFeatures::REPLICA_MANAGEMENT:
       return true;
+    case MasterFeatures::GENERATE_AUTHZ_TOKEN:
+      return FLAGS_master_support_authz_tokens;
     case MasterFeatures::CONNECT_TO_MASTER:
       return FLAGS_master_support_connect_to_master_rpc;
     default:
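
(This feature flag lets a client fail fast against masters that predate authz
tokens instead of proceeding without one. A sketch, assuming the standard
server-feature mechanism on RpcController.)

    rpc::RpcController controller;
    // Ask the RPC layer to reject the call up front if the master does not
    // advertise authz token support.
    controller.RequireServerFeature(MasterFeatures::GENERATE_AUTHZ_TOKEN);
    RETURN_NOT_OK(proxy->GetTableSchema(req, &resp, &controller));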
diff --git a/src/kudu/rpc/retriable_rpc.h b/src/kudu/rpc/retriable_rpc.h
index 3344c9a..9a0b8f5 100644
--- a/src/kudu/rpc/retriable_rpc.h
+++ b/src/kudu/rpc/retriable_rpc.h
@@ -72,11 +72,6 @@ class RetriableRpc : public Rpc {
   // Try() to actually send the request.
   void SendRpc() override;
 
-  // The callback to call upon retrieving (of failing to retrieve) a new authn
-  // token. This is the callback that subclasses should call in their custom
-  // implementation of the GetNewAuthnTokenAndRetry() method.
-  void GetNewAuthnTokenAndRetryCb(const Status& status);
-
  protected:
   // Subclasses implement this method to actually try the RPC.
   // The server has been looked up and is ready to be used.
@@ -91,15 +86,30 @@ class RetriableRpc : public Rpc {
   // After this is called the RPC will no longer be retried.
   virtual void Finish(const Status& status) = 0;
 
-  // Returns 'true' if the RPC is to scheduled for retry with a new authn token,
-  // 'false' otherwise. For RPCs performed in the context of providing token
-  // for authentication it's necessary to implement this method. The default
-  // implementation returns 'false' meaning the calls returning
+  // Returns 'true' if the RPC is scheduled for retry with a new authn
+  // token, 'false' otherwise. RPCs performed in a context that requires
+  // providing a token for authentication must implement this method. The
+  // default implementation returns 'false', meaning the calls returning
   // INVALID_AUTHENTICATION_TOKEN RPC status are not retried.
   virtual bool GetNewAuthnTokenAndRetry() {
     return false;
   }
 
+  // Similar to GetNewAuthnTokenAndRetry(), but for authz tokens. The
+  // default implementation returns 'false', meaning the calls returning
+  // INVALID_AUTHORIZATION_TOKEN RPC status are not retried.
+  virtual bool GetNewAuthzTokenAndRetry() {
+    return false;
+  }
+
+  // The callback to call upon retrieving (or failing to retrieve) a new authn
+  // token. This is the callback that subclasses should call in their custom
+  // implementation of the GetNewAuthnTokenAndRetry() method.
+  virtual void GotNewAuthnTokenRetryCb(const Status& status);
+
+  // Like GotNewAuthnTokenRetryCb() but for authz tokens.
+  virtual void GotNewAuthzTokenRetryCb(const Status& status);
+
   // Request body.
   RequestPB req_;
 
@@ -109,6 +119,10 @@ class RetriableRpc : public Rpc {
  private:
   friend class CalculatorServiceRpc;
 
+  // The callback to call upon retrieving (or failing to retrieve) a new token.
+  // 'token_type' is used for logging.
+  void GotNewTokenRetryCb(const Status& status, const char* token_type);
+
   // Decides whether to retry the RPC based on the result of AnalyzeResponse(),
   // and retries if that is the case.
   // Returns true if the RPC was retried or false otherwise.
@@ -151,20 +165,36 @@ void RetriableRpc<Server, RequestPB, ResponsePB>::SendRpc()  {
 }
 
 template <class Server, class RequestPB, class ResponsePB>
-void RetriableRpc<Server, RequestPB, ResponsePB>::GetNewAuthnTokenAndRetryCb(
-    const Status& status) {
+void RetriableRpc<Server, RequestPB, ResponsePB>::GotNewTokenRetryCb(
+    const Status& status, const char* token_type) {
   if (status.ok()) {
-    // Perform the RPC call with the newly fetched authn token.
+    // Perform the RPC call with the newly fetched token.
     mutable_retrier()->mutable_controller()->Reset();
     SendRpc();
+  } else if (status.IsNotSupported()) {
+    // If the token retrieval isn't supported by the cluster, don't retry.
+    FinishInternal();
+    Finish(status);
   } else {
     // Back to the retry sequence, hoping for better conditions after some time.
-    VLOG(1) << "Failed to get new authn token: " << status.ToString();
+    VLOG(1) << strings::Substitute("Failed to get new $0 token: $1", token_type, status.ToString());
     mutable_retrier()->DelayedRetry(this, status);
   }
 }
 
 template <class Server, class RequestPB, class ResponsePB>
+void RetriableRpc<Server, RequestPB, ResponsePB>::GotNewAuthnTokenRetryCb(
+    const Status& status) {
+  GotNewTokenRetryCb(status, "authn");
+}
+
+template <class Server, class RequestPB, class ResponsePB>
+void RetriableRpc<Server, RequestPB, ResponsePB>::GotNewAuthzTokenRetryCb(
+    const Status& status) {
+  GotNewTokenRetryCb(status, "authz");
+}
+
+template <class Server, class RequestPB, class ResponsePB>
 bool RetriableRpc<Server, RequestPB, ResponsePB>::RetryIfNeeded(
     const RetriableRpcStatus& result, Server* server) {
   // Handle the cases where we retry.
@@ -207,13 +237,13 @@ bool RetriableRpc<Server, RequestPB, ResponsePB>::RetryIfNeeded(
     case RetriableRpcStatus::INVALID_AUTHENTICATION_TOKEN: {
       // This is a special case for retry: first it's necessary to get a new
       // authn token and then retry the operation with the new token.
-      if (GetNewAuthnTokenAndRetry()) {
-        // The RPC will be retried.
-        resp_.Clear();
-        return true;
-      }
-      // Do not retry.
-      return false;
+      return GetNewAuthnTokenAndRetry();
+    }
+
+    case RetriableRpcStatus::INVALID_AUTHORIZATION_TOKEN: {
+      // TODO(awong): It'd be nice if the response status that led us to
+      // retrieve a token and retry were propagated to the user.
+      return GetNewAuthzTokenAndRetry();
     }
 
     case RetriableRpcStatus::NON_RETRIABLE_ERROR:
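
(To make the new hook concrete, a subclass that supports authz token
reacquisition would override it roughly as follows. This is a sketch only: the
asynchronous client-side retrieval call is an assumption, named after the
"RetrieveAuthzToken" strings seen in the tests above.)

    bool GetNewAuthzTokenAndRetry() override {
      // Kick off an asynchronous fetch of a fresh authz token from the
      // master; when it completes, resume via the inherited callback.
      client_->RetrieveAuthzTokenAsync(
          table_, [this](const Status& s) { this->GotNewAuthzTokenRetryCb(s); });
      return true;  // tells RetryIfNeeded() the RPC is scheduled for retry
    }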
diff --git a/src/kudu/rpc/rpc.h b/src/kudu/rpc/rpc.h
index 7a44092..541efa8 100644
--- a/src/kudu/rpc/rpc.h
+++ b/src/kudu/rpc/rpc.h
@@ -69,9 +69,12 @@ struct RetriableRpcStatus {
     RESOURCE_NOT_FOUND,
 
     // The authentication token supplied with the operation was found invalid
-    // by the server. Most likely, the token has expired. If so, get a new token
+    // by the server. The token has likely expired. If so, get a new token
     // using client credentials and retry the operation with it.
     INVALID_AUTHENTICATION_TOKEN,
+
+    // Similar to INVALID_AUTHENTICATION_TOKEN, but for authorization tokens.
+    INVALID_AUTHORIZATION_TOKEN,
   };
 
   Result result;
diff --git a/src/kudu/util/test_util.h b/src/kudu/util/test_util.h
index d74d7d4..d320e3e 100644
--- a/src/kudu/util/test_util.h
+++ b/src/kudu/util/test_util.h
@@ -32,6 +32,13 @@
 #include "kudu/gutil/port.h"
 #include "kudu/util/monotime.h"
 
+#define SKIP_IF_SLOW_NOT_ALLOWED() do { \
+  if (!AllowSlowTests()) { \
+    LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run"; \
+    return; \
+  } \
+} while (0)
+
 #define ASSERT_EVENTUALLY(expr) do { \
   AssertEventually(expr); \
   NO_PENDING_FATALS(); \
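
(Usage of the new macro follows the pattern already visible in
authz_token-itest.cc: place it at the top of any test body that sleeps. The
test name below is illustrative.)

    TEST_F(AuthzTokenTest, TestThatSleepsForExpiration) {
      SKIP_IF_SLOW_NOT_ALLOWED();  // returns early unless KUDU_ALLOW_SLOW_TESTS=1
      SleepFor(MonoDelta::FromSeconds(FLAGS_authz_token_validity_seconds + 1));
      // ... assertions against the newly acquired token ...
    }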