You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2019/07/01 20:20:29 UTC

[impala] branch master updated (c353cf7 -> b8e2090)

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from c353cf7  IMPALA-8713: fix stack overflow in unhex()
     new 5d8c99c  IMPALA-8689: test_hive_impala_interop failing with "Timeout >7200s"
     new b8e2090  Update kudu/security from 9ebcb77aa911aae76c48e717af24e643cb81908d

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/kudu/security/CMakeLists.txt                |   2 +
 be/src/kudu/security/gssapi.cc                     | 164 +++++++++++
 be/src/kudu/security/gssapi.h                      |  59 ++++
 be/src/kudu/security/init.cc                       |  12 +
 be/src/kudu/security/openssl_util.cc               |  41 ++-
 be/src/kudu/security/test/mini_kdc.cc              |  34 ++-
 be/src/kudu/security/test/mini_kdc.h               |   8 +
 be/src/kudu/security/tls_socket-test.cc            |  71 +++--
 be/src/kudu/security/tls_socket.cc                 |  12 +-
 be/src/kudu/security/token-test.cc                 | 322 +++++++++++++++++----
 be/src/kudu/security/token.proto                   |  31 ++
 be/src/kudu/security/token_signer.cc               |  36 ++-
 be/src/kudu/security/token_signer.h                |  89 ++++--
 be/src/kudu/security/token_verifier.cc             |  14 +-
 be/src/kudu/util/test_util.cc                      |  92 ++++--
 be/src/kudu/util/test_util.h                       |  31 +-
 .../test_hive_parquet_codec_interop.py             |   6 +-
 17 files changed, 851 insertions(+), 173 deletions(-)
 create mode 100644 be/src/kudu/security/gssapi.cc
 create mode 100644 be/src/kudu/security/gssapi.h


[impala] 01/02: IMPALA-8689: test_hive_impala_interop failing with "Timeout >7200s"

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 5d8c99ce74c45a7d04f11e1f252b346d654f02bf
Author: Abhishek Rawat <ar...@cloudera.com>
AuthorDate: Thu Jun 27 13:57:11 2019 -0700

    IMPALA-8689: test_hive_impala_interop failing with "Timeout >7200s"
    
    The newly added Hive<->Impala interop test fails due to unexpected
    wrong results when reading TimeStamp column value written by Hive.
    The short term measure is to remove TimeStamp column from the interop
    tests. The original issue will be fixed by IMPALA-8721.
    
    Testing: Ran the testcase N number of times on both upstream and
    downstream code base.
    
    Change-Id: I148c79a31f9aada1b75614390434462d1e483f28
    Reviewed-on: http://gerrit.cloudera.org:8080/13755
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 tests/custom_cluster/test_hive_parquet_codec_interop.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/tests/custom_cluster/test_hive_parquet_codec_interop.py b/tests/custom_cluster/test_hive_parquet_codec_interop.py
index 04b559a..7901c84 100644
--- a/tests/custom_cluster/test_hive_parquet_codec_interop.py
+++ b/tests/custom_cluster/test_hive_parquet_codec_interop.py
@@ -49,9 +49,11 @@ class TestParquetInterop(CustomClusterTestSuite):
   def test_hive_impala_interop(self, vector, unique_database, cluster_properties):
     # Setup source table.
     source_table = "{0}.{1}".format(unique_database, "t1_source")
+    # TODO: Once IMPALA-8721 is fixed add coverage for TimeStamp data type.
     self.execute_query_expect_success(self.client,
-        "create table {0} as select * from functional_parquet.alltypes"
-        .format(source_table))
+        "create table {0} as select id, bool_col, tinyint_col, smallint_col, int_col, "
+        "bigint_col, float_col, double_col, date_string_col, string_col, year, month "
+        "from functional_parquet.alltypes".format(source_table))
     self.execute_query_expect_success(self.client,
         "insert into {0}(id) values (7777), (8888), (9999), (11111), (22222), (33333)"
         .format(source_table))


[impala] 02/02: Update kudu/security from 9ebcb77aa911aae76c48e717af24e643cb81908d

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b8e20905a80f6541efdb2161333fb1552a976135
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Fri Jun 28 11:16:45 2019 -0700

    Update kudu/security from 9ebcb77aa911aae76c48e717af24e643cb81908d
    
    This updates the kudu security code to the latest version, which
    includes support for GSSAPI calls, necessary for SPNEGO.
    
    This is a straight rsync of the kudu/util source code except for some
    minor CMakeLists changes that were carried over from the old version.
    
    Change-Id: Ie7c91193fd49f8ca1234b23cf61fc90c1fdbe2e0
    Reviewed-on: http://gerrit.cloudera.org:8080/13767
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/kudu/security/CMakeLists.txt     |   2 +
 be/src/kudu/security/gssapi.cc          | 164 ++++++++++++++++
 be/src/kudu/security/gssapi.h           |  59 ++++++
 be/src/kudu/security/init.cc            |  12 ++
 be/src/kudu/security/openssl_util.cc    |  41 +++-
 be/src/kudu/security/test/mini_kdc.cc   |  34 +++-
 be/src/kudu/security/test/mini_kdc.h    |   8 +
 be/src/kudu/security/tls_socket-test.cc |  71 ++++---
 be/src/kudu/security/tls_socket.cc      |  12 +-
 be/src/kudu/security/token-test.cc      | 322 ++++++++++++++++++++++++++------
 be/src/kudu/security/token.proto        |  31 +++
 be/src/kudu/security/token_signer.cc    |  36 +++-
 be/src/kudu/security/token_signer.h     |  89 ++++++---
 be/src/kudu/security/token_verifier.cc  |  14 +-
 be/src/kudu/util/test_util.cc           |  92 ++++++---
 be/src/kudu/util/test_util.h            |  31 ++-
 16 files changed, 847 insertions(+), 171 deletions(-)

diff --git a/be/src/kudu/security/CMakeLists.txt b/be/src/kudu/security/CMakeLists.txt
index 63d6341..9533523 100644
--- a/be/src/kudu/security/CMakeLists.txt
+++ b/be/src/kudu/security/CMakeLists.txt
@@ -68,6 +68,7 @@ set(SECURITY_SRCS
   cert.cc
   crypto.cc
   kerberos_util.cc
+  gssapi.cc
   init.cc
   openssl_util.cc
   ${PORTED_X509_CHECK_HOST_CC}
@@ -86,6 +87,7 @@ set(SECURITY_LIBS
   kudu_util
   token_proto
 
+  gssapi_krb5
   krb5
   openssl_crypto
   openssl_ssl)
diff --git a/be/src/kudu/security/gssapi.cc b/be/src/kudu/security/gssapi.cc
new file mode 100644
index 0000000..6797ec3
--- /dev/null
+++ b/be/src/kudu/security/gssapi.cc
@@ -0,0 +1,164 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/security/gssapi.h"
+
+#include <cstring>
+#include <string>
+#include <utility>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/strings/escaping.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+
+using std::string;
+
+namespace kudu {
+namespace gssapi {
+
+namespace {
+
+string ErrorToString(OM_uint32 code, OM_uint32 type) {
+  string description;
+  OM_uint32 message_context = 0;
+
+  do {
+    if (!description.empty()) {
+      description.append(": ");
+    }
+    OM_uint32 minor = 0;
+    gss_buffer_desc buf;
+    gss_display_status(&minor, code, type, GSS_C_NULL_OID, &message_context, &buf);
+    const char* err = static_cast<const char*>(buf.value);
+    // The error message buffer returned has an explicit length, but in some cases
+    // this length appears to include the null terminator character, and in others
+    // it doesn't. So, we trim off any null terminators if necessary.
+    int err_len = strnlen(err, buf.length);
+    description.append(err, err_len);
+
+    // NOTE: call gss_release_buffer explicitly here instead of ReleaseBufferOrWarn
+    // to avoid the potential for infinite recursion back into ErrorToString.
+    gss_release_buffer(&minor, &buf);
+  } while (message_context != 0);
+
+  return description;
+}
+
+// Wrap a call to F(...), which is typically one of the gss_* functions from
+// the gssapi library. These functions all return a major status code and
+// also provide a minor status code as their first out-param.
+template <class F, class... Args>
+Status WrapGssCall(F func, Args&&... args) {
+  OM_uint32 minor = 0;
+  OM_uint32 major = func(&minor, std::forward<Args>(args)...);
+  return MajorMinorToStatus(major, minor);
+}
+
+void ReleaseNameOrWarn(gss_name_t name) {
+  if (name == GSS_C_NO_NAME) return;
+  WARN_NOT_OK(WrapGssCall(gss_release_name, &name), "Unable to release GSS name");
+}
+
+void ReleaseBufferOrWarn(gss_buffer_t buf) {
+  if (buf == GSS_C_NO_BUFFER) return;
+  WARN_NOT_OK(WrapGssCall(gss_release_buffer, buf), "Unable to release GSS buffer");
+}
+
+void ReleaseContextOrWarn(gss_ctx_id_t ctx) {
+  if (ctx == GSS_C_NO_CONTEXT) return;
+  WARN_NOT_OK(WrapGssCall(gss_delete_sec_context, &ctx, GSS_C_NO_BUFFER),
+              "Unable to release GSS context");
+}
+
+} // anonymous namespace
+
+
+Status MajorMinorToStatus(OM_uint32 major, OM_uint32 minor) {
+  if (GSS_ERROR(major)) {
+    string maj_str = ErrorToString(major, GSS_C_GSS_CODE);
+    string min_str = minor != 0 ? ErrorToString(minor, GSS_C_MECH_CODE) : "";
+    return Status::NotAuthorized(maj_str, min_str);
+  }
+  if (major == GSS_S_CONTINUE_NEEDED) {
+    return Status::Incomplete("");
+  }
+
+  return Status::OK();
+}
+
+
+Status SpnegoStep(const string& in_token_b64,
+                  string* out_token_b64,
+                  bool* complete,
+                  string* authenticated_principal) {
+  string token;
+  if (!strings::Base64Unescape(in_token_b64, &token)) {
+    return Status::InvalidArgument("invalid base64 encoding for token");
+  }
+
+  // Work around MIT krb5 bug [1] fixed in krb5 1.16 and 1.15.3:
+  //
+  // gssint_get_mech_type_oid() was missing some length verification that could
+  // cause reads past the end of the input token. So, we extend the actual
+  // allocation of the input token an extra 256 bytes of padding.
+  //
+  // Without this fix, our ASAN builds fail with an out-of-bounds read.
+  //
+  // [1] http://krbdev.mit.edu/rt/Ticket/History.html?id=8620
+  size_t real_token_size = token.size();
+  token.resize(real_token_size + 256);
+
+  gss_buffer_desc input_token {real_token_size, const_cast<char*>(token.data())};
+
+  gss_ctx_id_t ctx = GSS_C_NO_CONTEXT;
+  gss_name_t client_name = GSS_C_NO_NAME;
+  SCOPED_CLEANUP({ ReleaseNameOrWarn(client_name); });
+
+  gss_buffer_desc out_token {0, nullptr};
+  SCOPED_CLEANUP({ ReleaseBufferOrWarn(&out_token); });
+  Status s = WrapGssCall(gss_accept_sec_context, &ctx, GSS_C_NO_CREDENTIAL,
+                         &input_token, GSS_C_NO_CHANNEL_BINDINGS,
+                         &client_name, /*mech_type=*/ nullptr,
+                         &out_token, /*ret_flags=*/nullptr,
+                         /*time_rec=*/nullptr, /*delegated_cred_handle=*/nullptr);
+  SCOPED_CLEANUP({ ReleaseContextOrWarn(ctx); });
+  if (!s.ok() && !s.IsIncomplete()) {
+    return s;
+  }
+  *complete = s.ok();
+
+  if (*complete) {
+    gss_buffer_desc name_buf {0, nullptr};
+    SCOPED_CLEANUP({ ReleaseBufferOrWarn(&name_buf); });
+    RETURN_NOT_OK_PREPEND(WrapGssCall(gss_display_name, client_name, &name_buf, nullptr),
+                          "Unable to extract authenticated principal name");
+    authenticated_principal->assign(reinterpret_cast<char*>(name_buf.value),
+                                    name_buf.length);
+  }
+
+  string out_token_str(reinterpret_cast<char*>(out_token.value),
+                       out_token.length);
+  strings::Base64Escape(out_token_str, out_token_b64);
+
+  return Status::OK();
+}
+
+
+} // namespace gssapi
+} // namespace kudu
diff --git a/be/src/kudu/security/gssapi.h b/be/src/kudu/security/gssapi.h
new file mode 100644
index 0000000..036b3d4
--- /dev/null
+++ b/be/src/kudu/security/gssapi.h
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <string>
+
+#include <gssapi/gssapi.h>
+
+namespace kudu {
+
+class Status;
+
+namespace gssapi {
+
+// Convert the given major/minor GSSAPI error codes into a Status.
+Status MajorMinorToStatus(OM_uint32 major, OM_uint32 minor);
+
+// Run a step of SPNEGO authentication.
+//
+// 'in_token_b64' is the base64-encoded token provided by the client, which may be empty
+// if the client did not provide any such token (e.g. if the HTTP 'Authorization' header
+// was not present).
+//
+// 'out_token_b64' is the base64-encoded output token to send back to the client
+// during this round of negotiation.
+//
+// If any error occurs (e.g. an invalid token is provided), a bad Status is returned.
+//
+// An OK status indicates that the negotiation is proceeding successfully, or has
+// completed, whereas a non-OK status indicates an error or an unsuccessful
+// authentication (in which case the out-parameters will not be modified).
+//
+// In the case of an OK status, '*complete' indicates whether any further rounds are
+// required. On completion of negotiation, 'authenticated_principal' will be set to the
+// full principal name of the remote user.
+//
+// NOTE: per the SPNEGO protocol, the final "complete" negotiation stage may
+// include a token.
+Status SpnegoStep(const std::string& in_token_b64,
+                  std::string* out_token_b64,
+                  bool* complete,
+                  std::string* authenticated_principal);
+
+} // namespace gssapi
+} // namespace kudu
diff --git a/be/src/kudu/security/init.cc b/be/src/kudu/security/init.cc
index 64ee1a5..200411f 100644
--- a/be/src/kudu/security/init.cc
+++ b/be/src/kudu/security/init.cc
@@ -48,6 +48,13 @@
 #include "kudu/util/status.h"
 #include "kudu/util/thread.h"
 
+#if defined(__APPLE__)
+// Almost all functions in the krb5 API are marked as deprecated in favor
+// of GSS.framework in macOS.
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
+#endif // #if defined(__APPLE__)
+
 #ifndef __APPLE__
 static constexpr bool kDefaultSystemAuthToLocal = true;
 #else
@@ -56,6 +63,7 @@ static constexpr bool kDefaultSystemAuthToLocal = true;
 // implementation.
 static constexpr bool kDefaultSystemAuthToLocal = false;
 #endif
+
 DEFINE_bool(use_system_auth_to_local, kDefaultSystemAuthToLocal,
             "When enabled, use the system krb5 library to map Kerberos principal "
             "names to local (short) usernames. If not enabled, the first component "
@@ -475,3 +483,7 @@ Status InitKerberosForServer(const std::string& raw_principal, const std::string
 
 } // namespace security
 } // namespace kudu
+
+#if defined(__APPLE__)
+#pragma GCC diagnostic pop
+#endif // #if defined(__APPLE__)
diff --git a/be/src/kudu/security/openssl_util.cc b/be/src/kudu/security/openssl_util.cc
index a32140f..e96fcb3 100644
--- a/be/src/kudu/security/openssl_util.cc
+++ b/be/src/kudu/security/openssl_util.cc
@@ -82,25 +82,49 @@ void LockingCB(int mode, int type, const char* /*file*/, int /*line*/) {
 #endif
 
 Status CheckOpenSSLInitialized() {
+#if OPENSSL_VERSION_NUMBER < 0x10100000L
+  // Starting with OpenSSL 1.1.0, the old thread API became obsolete
+  // (see changelist 2e52e7df5 in the OpenSSL upstream repo), and
+  // CRYPTO_get_locking_callback() always returns nullptr. Also, the library
+  // always initializes its internals for multi-threaded usage.
+  // Another point is that starting with version 1.1.0, SSL_CTX_new()
+  // initializes the OpenSSL library under the hood, so SSL_CTX_new() would
+  // not return nullptr unless there was an error during the initialization
+  // of the library. That makes this code in CheckOpenSSLInitialized() obsolete
+  // starting with OpenSSL version 1.1.0.
+  //
+  // Starting with OpenSSL 1.1.0, there isn't a straightforward way to
+  // determine whether the library has already been initialized if using just
+  // the API (well, there is CRYPTO_secure_malloc_initialized() but that's just
+  // for the crypto library and it's implementation-dependent). But from the
+  // other side, the whole idea that this method should check whether the
+  // library has already been initialized is not relevant anymore: even if it's
+  // not yet initialized, the first call to SSL_CTX_new() (from, say,
+  // TlsContext::Init()) will initialize the library under the hood, so the
+  // library will be ready for multi-thread usage by Kudu.
   if (!CRYPTO_get_locking_callback()) {
     return Status::RuntimeError("Locking callback not initialized");
   }
   auto ctx = ssl_make_unique(SSL_CTX_new(SSLv23_method()));
   if (!ctx) {
     ERR_clear_error();
-    return Status::RuntimeError("SSL library appears uninitialized (cannot create SSL_CTX)");
+    return Status::RuntimeError(
+        "SSL library appears uninitialized (cannot create SSL_CTX)");
   }
+#endif
   return Status::OK();
 }
 
 void DoInitializeOpenSSL() {
-#if OPENSSL_VERSION_NUMBER > 0x10100000L
+#if OPENSSL_VERSION_NUMBER >= 0x10100000L
   // The OPENSSL_init_ssl manpage [1] says "As of version 1.1.0 OpenSSL will
   // automatically allocate all resources it needs so no explicit initialisation
   // is required." However, eliding library initialization leads to a memory
-  // leak in some versions of OpenSSL 1.1 when the first OpenSSL is
-  // ERR_peek_error [2]. In Kudu this is often the
-  // case due to prolific application of SCOPED_OPENSSL_NO_PENDING_ERRORS.
+  // leak in some versions of OpenSSL 1.1 when the first OpenSSL call is
+  // ERR_peek_error (see [2] for details; the issue was addressed in OpenSSL
+  // 1.1.0i (OPENSSL_VERSION_NUMBER 0x1010009f)). In Kudu this is often the
+  // case due to prolific application of the SCOPED_OPENSSL_NO_PENDING_ERRORS
+  // macro.
   //
   // Rather than determine whether this particular OpenSSL instance is
   // leak-free, we'll initialize the library explicitly.
@@ -130,8 +154,11 @@ void DoInitializeOpenSSL() {
   auto ctx = ssl_make_unique(SSL_CTX_new(SSLv23_method()));
   if (ctx) {
     LOG(WARNING) << "It appears that OpenSSL has been previously initialized by "
-                 << "code outside of Kudu. Please use kudu::client::DisableOpenSSLInitialization() "
-                 << "to avoid potential crashes due to conflicting initialization.";
+                    "code outside of Kudu. Please first properly initialize "
+                    "OpenSSL for multi-threaded usage (setting thread callback "
+                    "functions for OpenSSL of versions earlier than 1.1.0) and "
+                    "then call kudu::client::DisableOpenSSLInitialization() "
+                    "to avoid potential crashes due to conflicting initialization.";
     // Continue anyway; all of the below is idempotent, except for the locking callback,
     // which we check before overriding. They aren't thread-safe, however -- that's why
     // we try to get embedding applications to do the right thing here rather than risk a
diff --git a/be/src/kudu/security/test/mini_kdc.cc b/be/src/kudu/security/test/mini_kdc.cc
index 4f987c5..1662770 100644
--- a/be/src/kudu/security/test/mini_kdc.cc
+++ b/be/src/kudu/security/test/mini_kdc.cc
@@ -25,6 +25,7 @@
 #include <string>
 #include <utility>
 
+#include <boost/optional/optional.hpp>
 #include <glog/logging.h>
 
 #include "kudu/gutil/strings/strip.h"
@@ -37,6 +38,7 @@
 #include "kudu/util/subprocess.h"
 #include "kudu/util/test_util.h"
 
+using boost::none;
 using std::map;
 using std::string;
 using std::unique_ptr;
@@ -61,9 +63,7 @@ MiniKdc::MiniKdc(MiniKdcOptions options)
     options_.realm = "KRBTEST.COM";
   }
   if (options_.data_root.empty()) {
-    // We hardcode "/tmp" here since the original function which initializes a random test
-    // directory (GetTestDataDirectory()), depends on gmock.
-    options_.data_root = JoinPathSegments("/tmp", "krb5kdc");
+    options_.data_root = JoinPathSegments(GetTestDataDirectory(), "krb5kdc");
   }
   if (options_.ticket_lifetime.empty()) {
     options_.ticket_lifetime = "24h";
@@ -152,8 +152,10 @@ Status MiniKdc::Start() {
   RETURN_NOT_OK(kdc_process_->Start());
 
   const bool need_config_update = (options_.port == 0);
-  // Wait for KDC to start listening on its ports and commencing operation.
-  RETURN_NOT_OK(WaitForUdpBind(kdc_process_->pid(), &options_.port, MonoDelta::FromSeconds(1)));
+  // Wait for the KDC to start listening on its ports (with a wildcard
+  // binding) and commence operation.
+  RETURN_NOT_OK(WaitForUdpBind(kdc_process_->pid(), &options_.port,
+                               /*addr=*/none, MonoDelta::FromSeconds(1)));
 
   if (need_config_update) {
     // If we asked for an ephemeral port, grab the actual ports and
@@ -274,6 +276,28 @@ Status MiniKdc::CreateServiceKeytab(const string& spn,
   return Status::OK();
 }
 
+Status MiniKdc::RandomizePrincipalKey(const string& spn) {
+  SCOPED_LOG_SLOW_EXECUTION(WARNING, 100, Substitute("randomizing key for $0", spn));
+  string kadmin;
+  RETURN_NOT_OK(GetBinaryPath("kadmin.local", &kadmin));
+  RETURN_NOT_OK(Subprocess::Call(MakeArgv({
+          kadmin, "-q", Substitute("change_password -randkey $0", spn)})));
+  return Status::OK();
+}
+
+Status MiniKdc::CreateKeytabForExistingPrincipal(const string& spn) {
+  SCOPED_LOG_SLOW_EXECUTION(WARNING, 100, Substitute("creating keytab for $0", spn));
+  string kt_path = spn;
+  StripString(&kt_path, "/", '_');
+  kt_path = JoinPathSegments(options_.data_root, kt_path) + ".keytab";
+
+  string kadmin;
+  RETURN_NOT_OK(GetBinaryPath("kadmin.local", &kadmin));
+  RETURN_NOT_OK(Subprocess::Call(MakeArgv({
+          kadmin, "-q", Substitute("xst -norandkey -k $0 $1", kt_path, spn)})));
+  return Status::OK();
+}
+
 Status MiniKdc::Kinit(const string& username) {
   SCOPED_LOG_SLOW_EXECUTION(WARNING, 100, Substitute("kinit for $0", username));
   string kinit;
diff --git a/be/src/kudu/security/test/mini_kdc.h b/be/src/kudu/security/test/mini_kdc.h
index e282cc4..94990a2 100644
--- a/be/src/kudu/security/test/mini_kdc.h
+++ b/be/src/kudu/security/test/mini_kdc.h
@@ -92,6 +92,14 @@ class MiniKdc {
   // will be reset and a new keytab will be generated.
   Status CreateServiceKeytab(const std::string& spn, std::string* path);
 
+  // Randomize the key for the given SPN. This invalidates any previously-produced
+  // keytabs.
+  Status RandomizePrincipalKey(const std::string& spn);
+
+  // Creates a keytab for an existing principal.
+  // 'spn' is the desired service principal name (e.g. "kudu/foo.example.com").
+  Status CreateKeytabForExistingPrincipal(const std::string& spn);
+
   // Kinit a user to the mini KDC.
   Status Kinit(const std::string& username) WARN_UNUSED_RESULT;
 
diff --git a/be/src/kudu/security/tls_socket-test.cc b/be/src/kudu/security/tls_socket-test.cc
index 001a206..b88cdf4 100644
--- a/be/src/kudu/security/tls_socket-test.cc
+++ b/be/src/kudu/security/tls_socket-test.cc
@@ -58,7 +58,7 @@ using std::vector;
 namespace kudu {
 namespace security {
 
-const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
+const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
 
 // Size is big enough to not fit into output socket buffer of default size
 // (controlled by setsockopt() with SO_SNDBUF).
@@ -155,18 +155,18 @@ class EchoServer {
         CHECK_OK(sock->SetRecvTimeout(kTimeout));
         unique_ptr<uint8_t[]> buf(new uint8_t[kEchoChunkSize]);
         // An "echo" loop for kEchoChunkSize byte buffers.
-        while (!stop_) {
+        while (!is_stopped_) {
           size_t n;
           Status s = sock->BlockingRecv(buf.get(), kEchoChunkSize, &n, MonoTime::Now() + kTimeout);
           if (!s.ok()) {
-            CHECK(stop_) << "unexpected error reading: " << s.ToString();
+            CHECK(is_stopped_) << "unexpected error reading: " << s.ToString();
           }
 
           LOG(INFO) << "server echoing " << n << " bytes";
           size_t written;
           s = sock->BlockingWrite(buf.get(), n, &written, MonoTime::Now() + kTimeout);
           if (!s.ok()) {
-            CHECK(stop_) << "unexpected error writing: " << s.ToString();
+            CHECK(is_stopped_) << "unexpected error writing: " << s.ToString();
           }
           if (slow_read_) {
             SleepFor(MonoDelta::FromMilliseconds(10));
@@ -185,12 +185,12 @@ class EchoServer {
     return listen_addr_;
   }
 
-  bool stopped() const {
-    return stop_;
+  bool is_stopped() const {
+    return is_stopped_;
   }
 
   void Stop() {
-    stop_ = true;
+    is_stopped_ = true;
   }
   void Join() {
     thread_.join();
@@ -208,38 +208,49 @@ class EchoServer {
   thread thread_;
   pthread_t pthread_;
   CountDownLatch pthread_sync_;
-  std::atomic<bool> stop_ { false };
+  std::atomic<bool> is_stopped_ { false };
 
   bool slow_read_ = false;
 };
 
 void handler(int /* signal */) {}
 
+// Test that errors returned when reading from a TLS socket are reasonable and
+// contain the address of the remote.
 TEST_F(TlsSocketTest, TestRecvFailure) {
-    EchoServer server;
-    server.Start();
-    unique_ptr<Socket> client_sock;
-    NO_FATALS(ConnectClient(server.listen_addr(), &client_sock));
-    unique_ptr<uint8_t[]> buf(new uint8_t[kEchoChunkSize]);
-
-    SleepFor(MonoDelta::FromMilliseconds(100));
-    server.Stop();
-
-    size_t nwritten;
-    ASSERT_OK(client_sock->BlockingWrite(buf.get(), kEchoChunkSize, &nwritten,
-        MonoTime::Now() + kTimeout));
-    size_t nread;
+  EchoServer server;
+  server.Start();
+  unique_ptr<Socket> client_sock;
+  NO_FATALS(ConnectClient(server.listen_addr(), &client_sock));
+  unique_ptr<uint8_t[]> buf(new uint8_t[kEchoChunkSize]);
 
-    ASSERT_OK(client_sock->BlockingRecv(buf.get(), kEchoChunkSize, &nread,
-        MonoTime::Now() + kTimeout));
+  // The client writes to the server. This blocks on the server receiving,
+  // so once this completes the server is in its "echo loop".
+  size_t nwritten_unused;
+  ASSERT_OK(client_sock->BlockingWrite(buf.get(),
+                                       kEchoChunkSize,
+                                       &nwritten_unused,
+                                       MonoTime::Now() + kTimeout));
 
-    Status s = client_sock->BlockingRecv(buf.get(), kEchoChunkSize, &nread,
-        MonoTime::Now() + kTimeout);
+  // Stop the server. The client can still Recv once from the server because
+  // the server is trying to echo what was sent by the client.
+  server.Stop();
 
-    ASSERT_TRUE(!s.ok());
-    ASSERT_TRUE(s.IsNetworkError());
-    ASSERT_STR_MATCHES(s.message().ToString(), "BlockingRecv error: failed to read from "
-                                               "TLS socket \\(remote: 127.0.0.1:[0-9]+\\): ");
+  // The first Recv succeeds. This unblocks the server and causes it to exit.
+  size_t nread_unused;
+  ASSERT_OK(client_sock->BlockingRecv(buf.get(),
+                                      kEchoChunkSize,
+                                      &nread_unused,
+                                      MonoTime::Now() + kTimeout));
+
+  // The server has closed the connection, so the second Recv will fail.
+  Status s = client_sock->BlockingRecv(buf.get(),
+                                       kEchoChunkSize,
+                                       &nread_unused,
+                                       MonoTime::Now() + kTimeout);
+  ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
+  ASSERT_STR_MATCHES(s.message().ToString(), "BlockingRecv error: failed to read from "
+                                             "TLS socket \\(remote: 127.0.0.1:[0-9]+\\): ");
 }
 
 // Test for failures to handle EINTR during TLS connection
@@ -257,7 +268,7 @@ TEST_F(TlsSocketTest, TestTlsSocketInterrupted) {
 
   // Start a thread to send signals to the server thread.
   thread killer([&]() {
-      while (!server.stopped()) {
+      while (!server.is_stopped()) {
         PCHECK(pthread_kill(server.pthread(), SIGUSR2) == 0);
         SleepFor(MonoDelta::FromMicroseconds(rand() % 10));
       }
diff --git a/be/src/kudu/security/tls_socket.cc b/be/src/kudu/security/tls_socket.cc
index 355f04b..a586315 100644
--- a/be/src/kudu/security/tls_socket.cc
+++ b/be/src/kudu/security/tls_socket.cc
@@ -33,6 +33,9 @@
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/net/socket.h"
 
+using std::string;
+using strings::Substitute;
+
 namespace kudu {
 namespace security {
 
@@ -114,9 +117,10 @@ Status TlsSocket::Recv(uint8_t *buf, int32_t amt, int32_t *nread) {
   int save_errno = errno;
   if (bytes_read <= 0) {
     Sockaddr remote;
-    Socket::GetPeerAddress(&remote);
-    std::string kErrString = strings::Substitute("failed to read from TLS socket (remote: $0)",
-                                                 remote.ToString());
+    Status s = GetPeerAddress(&remote);
+    const string remote_str = s.ok() ? remote.ToString() : "unknown";
+    string kErrString = Substitute("failed to read from TLS socket (remote: $0)",
+                                   remote_str);
 
     if (bytes_read == 0 && SSL_get_shutdown(ssl_.get()) == SSL_RECEIVED_SHUTDOWN) {
       return Status::NetworkError(kErrString, ErrnoToString(ESHUTDOWN), ESHUTDOWN);
@@ -124,7 +128,7 @@ Status TlsSocket::Recv(uint8_t *buf, int32_t amt, int32_t *nread) {
     auto error_code = SSL_get_error(ssl_.get(), bytes_read);
     if (error_code == SSL_ERROR_WANT_READ) {
       if (save_errno != 0) {
-        return Status::NetworkError("SSL_read error from " + remote.ToString(),
+        return Status::NetworkError("SSL_read error from " + remote_str,
                                     ErrnoToString(save_errno), save_errno);
       }
       // Nothing available to read yet.
diff --git a/be/src/kudu/security/token-test.cc b/be/src/kudu/security/token-test.cc
index 7172a51..a4d7804 100644
--- a/be/src/kudu/security/token-test.cc
+++ b/be/src/kudu/security/token-test.cc
@@ -18,14 +18,16 @@
 #include <cstdint>
 #include <deque>
 #include <memory>
+#include <ostream>
 #include <string>
+#include <thread>
 #include <utility>
 #include <vector>
 
-#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
+#include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/walltime.h"
 #include "kudu/security/crypto.h"
 #include "kudu/security/openssl_util.h"
@@ -33,23 +35,68 @@
 #include "kudu/security/token_signer.h"
 #include "kudu/security/token_signing_key.h"
 #include "kudu/security/token_verifier.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/logging.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/pb_util.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
-DECLARE_int32(tsk_num_rsa_bits);
-
-using std::string;
+using kudu::pb_util::SecureDebugString;
 using std::make_shared;
+using std::string;
+using std::thread;
 using std::unique_ptr;
 using std::vector;
+using strings::Substitute;
 
 namespace kudu {
 namespace security {
 
 namespace {
 
+// Dummy variables to use when their values don't matter much.
+const int kNumBits = 512;
+const int64_t kTokenValiditySeconds = 10;
+const char kUser[] = "user";
+
+// Repeatedly signs tokens and attempts to rotate TSKs until the active TSK's
+// sequence number passes `seq_num`, returning the last token signed by the TSK
+// at `seq_num`. This token is roughly the last possible token signed in the
+// TSK's activity interval.
+// The TokenGenerator 'generate_token' is a lambda that fills in a
+// SignedTokenPB and returns a Status.
+template <class TokenGenerator>
+Status SignUntilRotatePast(TokenSigner* signer, TokenGenerator generate_token,
+                           const string& token_type, int64_t seq_num,
+                           SignedTokenPB* last_signed_by_tsk) {
+  SignedTokenPB last_signed;
+  RETURN_NOT_OK_PREPEND(generate_token(&last_signed),
+      Substitute("Failed to generate first $0 token", token_type));
+  DCHECK_EQ(seq_num, last_signed.signing_key_seq_num())
+      << Substitute("Unexpected starting seq_num for $0 token", token_type);
+
+  auto cur_seq_num = seq_num;
+  while (cur_seq_num == seq_num) {
+    SleepFor(MonoDelta::FromMilliseconds(50));
+    KLOG_EVERY_N_SECS(INFO, 1) <<
+        Substitute("Generating $0 token for activity interval $1", token_type, seq_num);
+    RETURN_NOT_OK_PREPEND(signer->TryRotateKey(), "Failed to attempt to rotate key");
+    SignedTokenPB signed_token;
+    RETURN_NOT_OK_PREPEND(generate_token(&signed_token),
+        Substitute("Failed to generate $0 token", token_type));
+    // We want to return the last token signed by the `seq_num` TSK, so only
+    // update it when appropriate.
+    cur_seq_num = signed_token.signing_key_seq_num();
+    if (cur_seq_num == seq_num) {
+      last_signed = std::move(signed_token);
+    }
+  }
+  *last_signed_by_tsk = std::move(last_signed);
+  return Status::OK();
+}
+
 SignedTokenPB MakeUnsignedToken(int64_t expiration) {
   SignedTokenPB ret;
   TokenPB token;
@@ -70,7 +117,7 @@ SignedTokenPB MakeIncompatibleToken() {
 // Generate public key as a string in DER format for tests.
 Status GeneratePublicKeyStrDer(string* ret) {
   PrivateKey private_key;
-  RETURN_NOT_OK(GeneratePrivateKey(512, &private_key));
+  RETURN_NOT_OK(GeneratePrivateKey(kNumBits, &private_key));
   PublicKey public_key;
   RETURN_NOT_OK(private_key.GetPublicKey(&public_key));
   string public_key_str_der;
@@ -85,7 +132,7 @@ Status GenerateTokenSigningKey(int64_t seq_num,
                                unique_ptr<TokenSigningPrivateKey>* tsk) {
   {
     unique_ptr<PrivateKey> private_key(new PrivateKey);
-    RETURN_NOT_OK(GeneratePrivateKey(512, private_key.get()));
+    RETURN_NOT_OK(GeneratePrivateKey(kNumBits, private_key.get()));
     tsk->reset(new TokenSigningPrivateKey(
         seq_num, expire_time_seconds, std::move(private_key)));
   }
@@ -99,14 +146,14 @@ void CheckAndAddNextKey(int iter_num,
   ASSERT_NE(nullptr, key_seq_num);
   int64_t seq_num;
   {
-    std::unique_ptr<TokenSigningPrivateKey> key;
+    unique_ptr<TokenSigningPrivateKey> key;
     ASSERT_OK(signer->CheckNeedKey(&key));
     ASSERT_NE(nullptr, key.get());
     seq_num = key->key_seq_num();
   }
 
   for (int i = 0; i < iter_num; ++i) {
-    std::unique_ptr<TokenSigningPrivateKey> key;
+    unique_ptr<TokenSigningPrivateKey> key;
     ASSERT_OK(signer->CheckNeedKey(&key));
     ASSERT_NE(nullptr, key.get());
     ASSERT_EQ(seq_num, key->key_seq_num());
@@ -124,7 +171,7 @@ class TokenTest : public KuduTest {
 };
 
 TEST_F(TokenTest, TestInit) {
-  TokenSigner signer(10, 10);
+  TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, 10);
   const TokenVerifier& verifier(signer.verifier());
 
   SignedTokenPB token = MakeUnsignedToken(WallTime_Now());
@@ -133,7 +180,7 @@ TEST_F(TokenTest, TestInit) {
 
   static const int64_t kKeySeqNum = 100;
   PrivateKey private_key;
-  ASSERT_OK(GeneratePrivateKey(512, &private_key));
+  ASSERT_OK(GeneratePrivateKey(kNumBits, &private_key));
   string private_key_str_der;
   ASSERT_OK(private_key.ToString(&private_key_str_der, DataFormat::DER));
   TokenSigningPrivateKeyPB pb;
@@ -163,9 +210,10 @@ TEST_F(TokenTest, TestInit) {
 TEST_F(TokenTest, TestTokenSignerNonSparseSequenceNumbers) {
   static const int kIterNum = 3;
   static const int64_t kAuthnTokenValiditySeconds = 1;
+  static const int64_t kAuthzTokenValiditySeconds = 1;
   static const int64_t kKeyRotationSeconds = 1;
 
-  TokenSigner signer(kAuthnTokenValiditySeconds, kKeyRotationSeconds);
+  TokenSigner signer(kAuthnTokenValiditySeconds, kAuthzTokenValiditySeconds, kKeyRotationSeconds);
 
   int64_t seq_num_first_key;
   NO_FATALS(CheckAndAddNextKey(kIterNum, &signer, &seq_num_first_key));
@@ -192,12 +240,10 @@ TEST_F(TokenTest, TestTokenSignerNonSparseSequenceNumbers) {
 // import should be greater than any sequence number the TokenSigner has seen
 // during the import.
 TEST_F(TokenTest, TestTokenSignerAddKeyAfterImport) {
-  static const int64_t kAuthnTokenValiditySeconds = 8;
   static const int64_t kKeyRotationSeconds = 8;
-  static const int64_t kKeyValiditySeconds =
-      kAuthnTokenValiditySeconds + 2 * kKeyRotationSeconds;
 
-  TokenSigner signer(kAuthnTokenValiditySeconds, kKeyRotationSeconds);
+  TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, kKeyRotationSeconds);
+  const int64_t key_validity_seconds = signer.key_validity_seconds_;
   const TokenVerifier& verifier(signer.verifier());
 
   static const int64_t kExpiredKeySeqNum = 100;
@@ -206,7 +252,7 @@ TEST_F(TokenTest, TestTokenSignerAddKeyAfterImport) {
     // First, try to import already expired key to check that internal key
     // sequence number advances correspondingly.
     PrivateKey private_key;
-    ASSERT_OK(GeneratePrivateKey(512, &private_key));
+    ASSERT_OK(GeneratePrivateKey(kNumBits, &private_key));
     string private_key_str_der;
     ASSERT_OK(private_key.ToString(&private_key_str_der, DataFormat::DER));
     TokenSigningPrivateKeyPB pb;
@@ -215,9 +261,7 @@ TEST_F(TokenTest, TestTokenSignerAddKeyAfterImport) {
     pb.set_expire_unix_epoch_seconds(WallTime_Now() - 1);
 
     ASSERT_OK(signer.ImportKeys({pb}));
-  }
 
-  {
     // Check the result of importing keys: there should be no keys because
     // the only one we tried to import was already expired.
     vector<TokenSigningPublicKeyPB> public_keys(verifier.ExportKeys());
@@ -228,7 +272,7 @@ TEST_F(TokenTest, TestTokenSignerAddKeyAfterImport) {
     // Now import valid (not yet expired) key, but with sequence number less
     // than of the expired key.
     PrivateKey private_key;
-    ASSERT_OK(GeneratePrivateKey(512, &private_key));
+    ASSERT_OK(GeneratePrivateKey(kNumBits, &private_key));
     string private_key_str_der;
     ASSERT_OK(private_key.ToString(&private_key_str_der, DataFormat::DER));
     TokenSigningPrivateKeyPB pb;
@@ -237,19 +281,17 @@ TEST_F(TokenTest, TestTokenSignerAddKeyAfterImport) {
     // Set the TSK's expiration time: make the key valid but past its activity
     // interval.
     pb.set_expire_unix_epoch_seconds(
-        WallTime_Now() + (kKeyValiditySeconds - 2 * kKeyRotationSeconds - 1));
+        WallTime_Now() + (key_validity_seconds - 2 * kKeyRotationSeconds - 1));
 
     ASSERT_OK(signer.ImportKeys({pb}));
-  }
 
-  {
-    // Check the result of importing keys.
+    // Check the result of importing keys. The lower sequence number is
+    // accepted, even though we previously imported a key with a higher
+    // sequence number that was expired.
     vector<TokenSigningPublicKeyPB> public_keys(verifier.ExportKeys());
     ASSERT_EQ(1, public_keys.size());
     ASSERT_EQ(kKeySeqNum, public_keys[0].key_seq_num());
-  }
 
-  {
     // The newly imported key should be used to sign tokens.
     SignedTokenPB token = MakeUnsignedToken(WallTime_Now());
     ASSERT_OK(signer.SignToken(&token));
@@ -259,7 +301,7 @@ TEST_F(TokenTest, TestTokenSignerAddKeyAfterImport) {
   }
 
   {
-    std::unique_ptr<TokenSigningPrivateKey> key;
+    unique_ptr<TokenSigningPrivateKey> key;
     ASSERT_OK(signer.CheckNeedKey(&key));
     ASSERT_NE(nullptr, key.get());
     ASSERT_EQ(kExpiredKeySeqNum + 1, key->key_seq_num());
@@ -289,15 +331,19 @@ TEST_F(TokenTest, TestTokenSignerAddKeyAfterImport) {
 // less or equal to the sequence number of the most 'recent' key.
 TEST_F(TokenTest, TestAddKeyConstraints) {
   {
-    TokenSigner signer(1, 1);
-    std::unique_ptr<TokenSigningPrivateKey> key;
+    // If a signer has not created a TSK yet, it will create a key, and will
+    // happily accept the generated key.
+    TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, 1);
+    unique_ptr<TokenSigningPrivateKey> key;
     ASSERT_OK(signer.CheckNeedKey(&key));
     ASSERT_NE(nullptr, key.get());
     ASSERT_OK(signer.AddKey(std::move(key)));
   }
   {
-    TokenSigner signer(1, 1);
-    std::unique_ptr<TokenSigningPrivateKey> key;
+    // If the key sequence number added to the signer isn't monotonically
+    // increasing, the signer will complain.
+    TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, 1);
+    unique_ptr<TokenSigningPrivateKey> key;
     ASSERT_OK(signer.CheckNeedKey(&key));
     ASSERT_NE(nullptr, key.get());
     const int64_t key_seq_num = key->key_seq_num();
@@ -308,10 +354,11 @@ TEST_F(TokenTest, TestAddKeyConstraints) {
                         ": invalid key sequence number, should be at least ");
   }
   {
-    TokenSigner signer(1, 1);
+    // Test importing expired keys. The signer should be OK with it.
+    TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, 1);
     static const int64_t kKeySeqNum = 100;
     PrivateKey private_key;
-    ASSERT_OK(GeneratePrivateKey(512, &private_key));
+    ASSERT_OK(GeneratePrivateKey(kNumBits, &private_key));
     string private_key_str_der;
     ASSERT_OK(private_key.ToString(&private_key_str_der, DataFormat::DER));
     TokenSigningPrivateKeyPB pb;
@@ -321,7 +368,9 @@ TEST_F(TokenTest, TestAddKeyConstraints) {
     pb.set_expire_unix_epoch_seconds(WallTime_Now() - 1);
     ASSERT_OK(signer.ImportKeys({pb}));
 
-    std::unique_ptr<TokenSigningPrivateKey> key;
+    // Generated keys thereafter are expected to have higher sequence numbers
+    // than the imported expired keys.
+    unique_ptr<TokenSigningPrivateKey> key;
     ASSERT_OK(signer.CheckNeedKey(&key));
     ASSERT_NE(nullptr, key.get());
     const int64_t key_seq_num = key->key_seq_num();
@@ -334,31 +383,62 @@ TEST_F(TokenTest, TestAddKeyConstraints) {
   }
 }
 
-TEST_F(TokenTest, TestGenerateAuthTokenNoUserName) {
-  TokenSigner signer(10, 10);
+TEST_F(TokenTest, TestGenerateAuthnTokenNoUserName) {
+  TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, 10);
   SignedTokenPB signed_token_pb;
   const Status& s = signer.GenerateAuthnToken("", &signed_token_pb);
   EXPECT_TRUE(s.IsInvalidArgument()) << s.ToString();
   ASSERT_STR_CONTAINS(s.ToString(), "no username provided for authn token");
 }
 
+TEST_F(TokenTest, TestGenerateAuthzToken) {
+  // We cannot generate a token with no username associated with it.
+  auto verifier(make_shared<TokenVerifier>());
+  TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, 10, verifier);
+  TablePrivilegePB table_privilege;
+  SignedTokenPB signed_token_pb;
+  Status s = signer.GenerateAuthzToken("", table_privilege, &signed_token_pb);
+  EXPECT_TRUE(s.IsInvalidArgument()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "no username provided for authz token");
+
+  // Generated tokens will have the specified privileges.
+  const string kAuthorized = "authzed";
+  unique_ptr<TokenSigningPrivateKey> key;
+  ASSERT_OK(signer.CheckNeedKey(&key));
+  ASSERT_NE(nullptr, key.get());
+  ASSERT_OK(signer.AddKey(std::move(key)));
+  ASSERT_OK(signer.GenerateAuthzToken(kAuthorized,
+                                      table_privilege,
+                                      &signed_token_pb));
+  ASSERT_TRUE(signed_token_pb.has_token_data());
+  TokenPB token_pb;
+  ASSERT_EQ(VerificationResult::VALID,
+            verifier->VerifyTokenSignature(signed_token_pb, &token_pb));
+  ASSERT_TRUE(token_pb.has_authz());
+  ASSERT_EQ(kAuthorized, token_pb.authz().username());
+  ASSERT_TRUE(token_pb.authz().has_table_privilege());
+  ASSERT_EQ(SecureDebugString(table_privilege),
+            SecureDebugString(token_pb.authz().table_privilege()));
+}
+
 TEST_F(TokenTest, TestIsCurrentKeyValid) {
-  static const int64_t kAuthnTokenValiditySeconds = 1;
+  // This test sleeps for a key validity period, so set it up to be short.
+  static const int64_t kShortTokenValiditySeconds = 1;
   static const int64_t kKeyRotationSeconds = 1;
-  static const int64_t kKeyValiditySeconds =
-      kAuthnTokenValiditySeconds + 2 * kKeyRotationSeconds;
 
-  TokenSigner signer(kAuthnTokenValiditySeconds, kKeyRotationSeconds);
+  TokenSigner signer(kShortTokenValiditySeconds, kShortTokenValiditySeconds, kKeyRotationSeconds);
+  static const int64_t key_validity_seconds = signer.key_validity_seconds_;
+
   EXPECT_FALSE(signer.IsCurrentKeyValid());
   {
-    std::unique_ptr<TokenSigningPrivateKey> key;
+    unique_ptr<TokenSigningPrivateKey> key;
     ASSERT_OK(signer.CheckNeedKey(&key));
     // No keys are available yet, so should be able to add.
     ASSERT_NE(nullptr, key.get());
     ASSERT_OK(signer.AddKey(std::move(key)));
   }
   EXPECT_TRUE(signer.IsCurrentKeyValid());
-  SleepFor(MonoDelta::FromSeconds(kKeyValiditySeconds));
+  SleepFor(MonoDelta::FromSeconds(key_validity_seconds));
   // The key should expire after its validity interval.
   EXPECT_FALSE(signer.IsCurrentKeyValid());
 
@@ -369,8 +449,8 @@ TEST_F(TokenTest, TestIsCurrentKeyValid) {
 
 TEST_F(TokenTest, TestTokenSignerAddKeys) {
   {
-    TokenSigner signer(10, 10);
-    std::unique_ptr<TokenSigningPrivateKey> key;
+    TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, 10);
+    unique_ptr<TokenSigningPrivateKey> key;
     ASSERT_OK(signer.CheckNeedKey(&key));
     // No keys are available yet, so should be able to add.
     ASSERT_NE(nullptr, key.get());
@@ -384,8 +464,8 @@ TEST_F(TokenTest, TestTokenSignerAddKeys) {
   {
     // Special configuration for TokenSigner: rotation interval is zero,
     // so should be able to add two keys right away.
-    TokenSigner signer(10, 0);
-    std::unique_ptr<TokenSigningPrivateKey> key;
+    TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, 0);
+    unique_ptr<TokenSigningPrivateKey> key;
     ASSERT_OK(signer.CheckNeedKey(&key));
     // No keys are available yet, so should be able to add.
     ASSERT_NE(nullptr, key.get());
@@ -406,8 +486,8 @@ TEST_F(TokenTest, TestTokenSignerAddKeys) {
     // It should not need next key right away, but should need next key after
     // the rotation interval.
     static const int64_t kKeyRotationIntervalSeconds = 8;
-    TokenSigner signer(10, kKeyRotationIntervalSeconds);
-    std::unique_ptr<TokenSigningPrivateKey> key;
+    TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, kKeyRotationIntervalSeconds);
+    unique_ptr<TokenSigningPrivateKey> key;
     ASSERT_OK(signer.CheckNeedKey(&key));
     // No keys are available yet, so should be able to add.
     ASSERT_NE(nullptr, key.get());
@@ -433,7 +513,7 @@ TEST_F(TokenTest, TestTokenSignerAddKeys) {
 // Test how key rotation works.
 TEST_F(TokenTest, TestTokenSignerSignVerifyExport) {
   // Key rotation interval 0 allows adding 2 keys in a row with no delay.
-  TokenSigner signer(10, 0);
+  TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, 0);
   const TokenVerifier& verifier(signer.verifier());
 
   // Should start off with no signing keys.
@@ -447,7 +527,7 @@ TEST_F(TokenTest, TestTokenSignerSignVerifyExport) {
   // Generate and set a new key.
   int64_t signing_key_seq_num;
   {
-    std::unique_ptr<TokenSigningPrivateKey> key;
+    unique_ptr<TokenSigningPrivateKey> key;
     ASSERT_OK(signer.CheckNeedKey(&key));
     ASSERT_NE(nullptr, key.get());
     signing_key_seq_num = key->key_seq_num();
@@ -470,7 +550,7 @@ TEST_F(TokenTest, TestTokenSignerSignVerifyExport) {
   // Set next key and check that we return the right keys.
   int64_t next_signing_key_seq_num;
   {
-    std::unique_ptr<TokenSigningPrivateKey> key;
+    unique_ptr<TokenSigningPrivateKey> key;
     ASSERT_OK(signer.CheckNeedKey(&key));
     ASSERT_NE(nullptr, key.get());
     next_signing_key_seq_num = key->key_seq_num();
@@ -482,7 +562,7 @@ TEST_F(TokenTest, TestTokenSignerSignVerifyExport) {
   ASSERT_EQ(0, verifier.ExportKeys(next_signing_key_seq_num).size());
 
   // The first key should be used for signing: the next one is saved
-  // for the next round.
+  // for the next rotation.
   {
     SignedTokenPB token = MakeUnsignedToken(WallTime_Now());
     ASSERT_OK(signer.SignToken(&token));
@@ -498,11 +578,11 @@ TEST_F(TokenTest, TestExportKeys) {
   // and have an appropriate expiration.
   const int64_t key_exp_seconds = 30;
   const int64_t key_rotation_seconds = 10;
-  TokenSigner signer(key_exp_seconds - 2 * key_rotation_seconds,
+  TokenSigner signer(key_exp_seconds - 2 * key_rotation_seconds, 0,
                      key_rotation_seconds);
   int64_t key_seq_num;
   {
-    std::unique_ptr<TokenSigningPrivateKey> key;
+    unique_ptr<TokenSigningPrivateKey> key;
     ASSERT_OK(signer.CheckNeedKey(&key));
     ASSERT_NE(nullptr, key.get());
     key_seq_num = key->key_seq_num();
@@ -523,9 +603,9 @@ TEST_F(TokenTest, TestExportKeys) {
 // Test that the TokenVerifier can import keys exported by the TokenSigner
 // and then verify tokens signed by it.
 TEST_F(TokenTest, TestEndToEnd_Valid) {
-  TokenSigner signer(10, 10);
+  TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, 10);
   {
-    std::unique_ptr<TokenSigningPrivateKey> key;
+    unique_ptr<TokenSigningPrivateKey> key;
     ASSERT_OK(signer.CheckNeedKey(&key));
     ASSERT_NE(nullptr, key.get());
     ASSERT_OK(signer.AddKey(std::move(key)));
@@ -546,9 +626,9 @@ TEST_F(TokenTest, TestEndToEnd_Valid) {
 // See VerificationResult.
 TEST_F(TokenTest, TestEndToEnd_InvalidCases) {
   // Key rotation interval 0 allows adding 2 keys in a row with no delay.
-  TokenSigner signer(10, 0);
+  TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, 0);
   {
-    std::unique_ptr<TokenSigningPrivateKey> key;
+    unique_ptr<TokenSigningPrivateKey> key;
     ASSERT_OK(signer.CheckNeedKey(&key));
     ASSERT_NE(nullptr, key.get());
     ASSERT_OK(signer.AddKey(std::move(key)));
@@ -599,7 +679,7 @@ TEST_F(TokenTest, TestEndToEnd_InvalidCases) {
   // verify, we expect the verifier to complain the key is unknown.
   {
     {
-      std::unique_ptr<TokenSigningPrivateKey> key;
+      unique_ptr<TokenSigningPrivateKey> key;
       ASSERT_OK(signer.CheckNeedKey(&key));
       ASSERT_NE(nullptr, key.get());
       ASSERT_OK(signer.AddKey(std::move(key)));
@@ -673,5 +753,131 @@ TEST_F(TokenTest, TestTokenVerifierImportKeys) {
   }
 }
 
+// Test using different token validity intervals.
+TEST_F(TokenTest, TestVaryingTokenValidityIntervals) {
+  constexpr int kShortValiditySeconds = 2;
+  const int kLongValiditySeconds = kShortValiditySeconds * 3;
+  auto verifier(make_shared<TokenVerifier>());
+  TokenSigner signer(kLongValiditySeconds, kShortValiditySeconds, 10, verifier);
+  unique_ptr<TokenSigningPrivateKey> key;
+  ASSERT_OK(signer.CheckNeedKey(&key));
+  ASSERT_NE(nullptr, key.get());
+  ASSERT_OK(signer.AddKey(std::move(key)));
+
+  const TablePrivilegePB table_privilege;
+  SignedTokenPB signed_authn;
+  SignedTokenPB signed_authz;
+  ASSERT_OK(signer.GenerateAuthnToken(kUser, &signed_authn));
+  ASSERT_OK(signer.GenerateAuthzToken(kUser, table_privilege, &signed_authz));
+  TokenPB authn_token;
+  TokenPB authz_token;
+  ASSERT_EQ(VerificationResult::VALID, verifier->VerifyTokenSignature(signed_authn, &authn_token));
+  ASSERT_EQ(VerificationResult::VALID, verifier->VerifyTokenSignature(signed_authz, &authz_token));
+
+  // Wait for the authz validity interval to pass and verify its expiration.
+  SleepFor(MonoDelta::FromSeconds(1 + kShortValiditySeconds));
+  EXPECT_EQ(VerificationResult::VALID, verifier->VerifyTokenSignature(signed_authn, &authn_token));
+  EXPECT_EQ(VerificationResult::EXPIRED_TOKEN,
+            verifier->VerifyTokenSignature(signed_authz, &authz_token));
+
+  // Wait for the authn validity interval to pass and verify its expiration.
+  SleepFor(MonoDelta::FromSeconds(kLongValiditySeconds - kShortValiditySeconds));
+  EXPECT_EQ(VerificationResult::EXPIRED_TOKEN,
+            verifier->VerifyTokenSignature(signed_authn, &authn_token));
+  EXPECT_EQ(VerificationResult::EXPIRED_TOKEN,
+            verifier->VerifyTokenSignature(signed_authz, &authz_token));
+}
+
+// Test to check the invariant that all tokens signed within a TSK's activity
+// interval must be expired by the end of the TSK's validity interval.
+TEST_F(TokenTest, TestKeyValidity) {
+  if (!AllowSlowTests()) {
+    LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+    return;
+  }
+  // Note: this test's runtime is roughly the length of a key-validity
+  // interval, which is determined by the token validity intervals and the key
+  // rotation interval.
+  const int kShortValiditySeconds = 2;
+  const int kLongValiditySeconds = 6;
+  const int kKeyRotationSeconds = 5;
+  auto verifier(make_shared<TokenVerifier>());
+  TokenSigner signer(kLongValiditySeconds, kShortValiditySeconds, kKeyRotationSeconds, verifier);
+  unique_ptr<TokenSigningPrivateKey> key;
+  ASSERT_OK(signer.CheckNeedKey(&key));
+  ASSERT_NE(nullptr, key.get());
+  ASSERT_OK(signer.AddKey(std::move(key)));
+
+  // First, start a countdown for the first TSK's validity interval. Any token
+  // signed during the first TSK's activity interval must be expired once this
+  // latch counts down.
+  vector<thread> threads;
+  CountDownLatch first_tsk_validity_latch(1);
+  const double key_validity_seconds = signer.key_validity_seconds_;
+  threads.emplace_back([&first_tsk_validity_latch, key_validity_seconds] {
+    SleepFor(MonoDelta::FromSeconds(key_validity_seconds));
+    LOG(INFO) << Substitute("First TSK's validity interval of $0 secs has finished!",
+                            key_validity_seconds);
+    first_tsk_validity_latch.CountDown();
+  });
+
+  // Set up a second TSK so our threads can rotate TSKs when the time comes.
+  while (true) {
+    KLOG_EVERY_N_SECS(INFO, 1) << "Waiting for a second key...";
+    unique_ptr<TokenSigningPrivateKey> tsk;
+    ASSERT_OK(signer.CheckNeedKey(&tsk));
+    if (tsk) {
+      LOG(INFO) << "Added second key!";
+      ASSERT_OK(signer.AddKey(std::move(tsk)));
+      break;
+    }
+    SleepFor(MonoDelta::FromMilliseconds(50));
+  }
+
+  // Utility lambda to check that the token is expired.
+  const auto verify_expired = [&verifier] (const SignedTokenPB& signed_token,
+                                           const string& token_type) {
+    TokenPB token_pb;
+    const auto result = verifier->VerifyTokenSignature(signed_token, &token_pb);
+    const auto expire_secs = token_pb.expire_unix_epoch_seconds();
+    ASSERT_EQ(VerificationResult::EXPIRED_TOKEN, result)
+        << Substitute("validation result '$0': $1 token expires at $2, now $3",
+                      VerificationResultToString(result), token_type,
+                      expire_secs, WallTime_Now());
+  };
+
+  // Create a thread that repeatedly signs new authn tokens, returning the
+  // final one signed by TSK with seq_num 0. At the end of the key validity
+  // period, this token will not be valid.
+  vector<SignedTokenPB> tsks(2);
+  vector<Status> results(2);
+  threads.emplace_back([&] {
+    results[0] = SignUntilRotatePast(&signer,
+        [&] (SignedTokenPB* signed_token) {
+          return signer.GenerateAuthnToken(kUser, signed_token);
+        },
+        "authn", 0, &tsks[0]);
+    first_tsk_validity_latch.Wait();
+  });
+
+  // Do the same for authz tokens.
+  threads.emplace_back([&] {
+    results[1] = SignUntilRotatePast(&signer,
+        [&] (SignedTokenPB* signed_token) {
+          return signer.GenerateAuthzToken(kUser, TablePrivilegePB(), signed_token);
+        },
+        "authz", 0, &tsks[1]);
+    first_tsk_validity_latch.Wait();
+  });
+
+  for (auto& t : threads) {
+    t.join();
+  }
+  EXPECT_OK(results[0]);
+  EXPECT_OK(results[1]);
+  NO_FATALS(verify_expired(tsks[0], "authn"));
+  NO_FATALS(verify_expired(tsks[1], "authz"));
+}
+
 } // namespace security
 } // namespace kudu
diff --git a/be/src/kudu/security/token.proto b/be/src/kudu/security/token.proto
index e27ccdb..ea63ce2 100644
--- a/be/src/kudu/security/token.proto
+++ b/be/src/kudu/security/token.proto
@@ -21,11 +21,42 @@ option java_package = "org.apache.kudu.security";
 
 import "kudu/util/pb_util.proto";
 
+message ColumnPrivilegePB {
+  // If set, the user has privileges to select and apply predicates on the
+  // column during scans.
+  optional bool scan_privilege = 1;
+};
+
+message TablePrivilegePB {
+  // The ID of the table to which the privileges apply.
+  optional string table_id = 1;
+
+  // If set, the user is authorized to select and apply predicates to all
+  // columns when scanning the table, and `column_privileges` is ignored. If
+  // unset, the user may only scan and apply predicates to columns with the
+  // privileges specified in `column_privileges`.
+  optional bool scan_privilege = 2;
+
+  // If set, the user is authorized to insert rows into the table.
+  optional bool insert_privilege= 3;
+
+  // If set, the user is authorized to update rows in the table.
+  optional bool update_privilege = 4;
+
+  // If set, the user is authorized to delete rows in the table.
+  optional bool delete_privilege = 5;
+
+  // Per-column privileges, indexed by column ID.
+  map<int32, ColumnPrivilegePB> column_privileges = 6;
+};
+
 message AuthnTokenPB {
   optional string username = 1;
 };
 
 message AuthzTokenPB {
+  optional string username = 1;
+  optional TablePrivilegePB table_privilege = 2;
 };
 
 message TokenPB {
diff --git a/be/src/kudu/security/token_signer.cc b/be/src/kudu/security/token_signer.cc
index 08c84be..aaf1be8 100644
--- a/be/src/kudu/security/token_signer.cc
+++ b/be/src/kudu/security/token_signer.cc
@@ -56,17 +56,21 @@ namespace kudu {
 namespace security {
 
 TokenSigner::TokenSigner(int64_t authn_token_validity_seconds,
+                         int64_t authz_token_validity_seconds,
                          int64_t key_rotation_seconds,
                          shared_ptr<TokenVerifier> verifier)
     : verifier_(verifier ? std::move(verifier)
                          : std::make_shared<TokenVerifier>()),
       authn_token_validity_seconds_(authn_token_validity_seconds),
+      authz_token_validity_seconds_(authz_token_validity_seconds),
       key_rotation_seconds_(key_rotation_seconds),
       // The TSK propagation interval is equal to the rotation interval.
-      key_validity_seconds_(2 * key_rotation_seconds_ + authn_token_validity_seconds_),
+      key_validity_seconds_(2 * key_rotation_seconds_ +
+          std::max(authn_token_validity_seconds_, authz_token_validity_seconds)),
       last_key_seq_num_(-1) {
   CHECK_GE(key_rotation_seconds_, 0);
   CHECK_GE(authn_token_validity_seconds_, 0);
+  CHECK_GE(authz_token_validity_seconds_, 0);
   CHECK(verifier_);
 }
 
@@ -134,6 +138,27 @@ Status TokenSigner::ImportKeys(const vector<TokenSigningPrivateKeyPB>& keys) {
   return Status::OK();
 }
 
+Status TokenSigner::GenerateAuthzToken(string username,
+                                       TablePrivilegePB privilege,
+                                       SignedTokenPB* signed_token) const {
+  if (username.empty()) {
+    return Status::InvalidArgument("no username provided for authz token");
+  }
+  TokenPB token;
+  token.set_expire_unix_epoch_seconds(WallTime_Now() + authz_token_validity_seconds_);
+  AuthzTokenPB* authz = token.mutable_authz();
+  authz->set_username(std::move(username));
+  *authz->mutable_table_privilege() = std::move(privilege);
+
+  SignedTokenPB ret;
+  if (!token.SerializeToString(ret.mutable_token_data())) {
+    return Status::RuntimeError("could not serialize authz token");
+  }
+  RETURN_NOT_OK(SignToken(&ret));
+  *signed_token = std::move(ret);
+  return Status::OK();
+}
+
 Status TokenSigner::GenerateAuthnToken(string username,
                                        SignedTokenPB* signed_token) const {
   if (username.empty()) {
@@ -162,7 +187,7 @@ Status TokenSigner::SignToken(SignedTokenPB* token) const {
     return Status::IllegalState("no token signing key");
   }
   const TokenSigningPrivateKey* key = tsk_deque_.front().get();
-  RETURN_NOT_OK_PREPEND(key->Sign(token), "could not sign authn token");
+  RETURN_NOT_OK_PREPEND(key->Sign(token), "could not sign token");
   return Status::OK();
 }
 
@@ -194,9 +219,10 @@ Status TokenSigner::CheckNeedKey(unique_ptr<TokenSigningPrivateKey>* tsk) const
     // It's enough to have just one active key and next key ready to be
     // activated when it's time to do so.  However, it does not mean the
     // process of key refreshment is about to stop once there are two keys
-    // in the queue: the TryRotate() method (which should be called periodically
-    // along with CheckNeedKey()/AddKey() pair) will eventually pop the
-    // current key out of the keys queue once the key enters its inactive phase.
+    // in the queue: the TryRotateKey() method (which should be called
+    // periodically along with CheckNeedKey()/AddKey() pair) will eventually
+    // pop the current key out of the keys queue once the key enters its
+    // inactive phase.
     tsk->reset();
     return Status::OK();
   }
diff --git a/be/src/kudu/security/token_signer.h b/be/src/kudu/security/token_signer.h
index df1e3eb..36f5a65 100644
--- a/be/src/kudu/security/token_signer.h
+++ b/be/src/kudu/security/token_signer.h
@@ -33,6 +33,7 @@ class Status;
 
 namespace security {
 class SignedTokenPB;
+class TablePrivilegePB;
 class TokenSigningPrivateKey;
 class TokenSigningPrivateKeyPB;
 class TokenVerifier;
@@ -74,16 +75,18 @@ class TokenVerifier;
 // '-' propagation interval
 //       The TSK is already created but not yet used to sign tokens. However,
 //       its public part is already being sent to the components which
-//       may be involved in validation of tokens signed by the key.
+//       may be involved in validation of tokens signed by the key. This is
+//       exactly 'tsk_propagation_interval' below.
 //
 // 'A' activity interval
 //       The TSK is used to sign tokens. It's assumed that the components which
-//       are involved in token verification have already received
-//       the corresponding public part of the TSK.
+//       are involved in token verification have already received the
+//       corresponding public part of the TSK.
 //
 // '=' inactivity interval
-//       The TSK is no longer used to sign tokens. However, it's still sent
-//       to other components which validate token signatures.
+//       The TSK is no longer used to sign tokens. However, its public part is
+//       still sent to other components and can be used to validate token
+//       signatures.
 //
 // Shortly after the TSK's expiration the token signing components stop
 // propagating its public part.
@@ -98,8 +101,8 @@ class TokenVerifier;
 //       implications, so it's worth considering rolling twice at startup.
 //
 // For example, consider the following configuration for token signing keys:
-//   validity period:      4 days
-//   rotation interval:    1 days
+//   key validity period:  4 days
+//   rotation interval:    1 day
 //   propagation interval: 1 day
 //
 // Day      1    2    3    4    5    6    7    8
@@ -111,24 +114,34 @@ class TokenVerifier;
 //                              ...............
 // authn token:                     <**********>
 //
-// 'A' indicates the 'Originator Usage Period' (a.k.a. 'Activity Interval'),
-// i.e. the period in which the key is being used to sign tokens.
+// 'A' indicates 'Activity Interval', i.e. the period during which the key is
+// being used to sign tokens. In cryptographic terms, this is the 'Originator
+// Usage Period'. Note that the Activity Interval is identically the rotation
+// interval -- a key is active for some amount of time, after which, we rotate.
 //
-// '<...>' indicates the 'Recipient Usage Period': the period in which
-// the verifier may get tokens signed by the TSK and should consider them
-// for verification. The start of the recipient usage period is not crucial
-// in that regard, but the end of that period is -- after the TSK is expired,
-// a verifier should consider tokens signed by that TSK invalid and stop
-// accepting them even if the token signature is correct and the expiration.
+// '<...>' in cryptographic terms, indicates the 'Recipient Usage Period': the
+// period during which the public part of a key is used to verify tokens, and
+// verifiers will consider tokens signed by the given key as valid. At the end
+// of this period, a verifier should consider tokens signed by the given TSK
+// invalid and stop accepting them, even if the token signature is correct. The
+// start of the period is not crucial to the validity of a token, so we don't
+// perform verification for it.
 //
 // '<***>' indicates the validity interval for an authn token.
 //
-// When configuring key rotation and authn token validity interval durations,
+// When configuring key rotation and token validity interval durations,
 // consider the following constraint:
 //
-//   max_token_validity < tsk_validity_period -
+// Eq 1.
+//   max_token_validity = tsk_validity_period -
 //       (tsk_propagation_interval + tsk_rotation_interval)
 //
+// Note how if the validity period for a token created at the end of the
+// Activity Interval were to extend any farther than the above
+// 'max_token_validity', it would be considered valid beyond the end of the
+// 'tsk_validity_period', which would break the constraint that the token only
+// be valid within the TSK's validity period.
+//
 // The idea is that the token validity interval should be contained in the
 // corresponding TSK's validity interval. If the TSK is already expired at the
 // time of token verification, the token is considered invalid and the
@@ -139,7 +152,7 @@ class TokenVerifier;
 //
 // * A TSK is issued at 00:00:00 on day 4.
 // * An authn token generated and signed by current/active TSK at 23:59:59 on
-//   day 6. That's at the very end of the TSK's activity interval.
+//   day 5. That's at the very end of the TSK's activity interval.
 // * From the diagram above it's clear that if the authn token validity
 //   interval were set to something longer than TSK inactivity interval
 //   (which is 2 days with for the specified parameters), an attempt to verify
@@ -179,23 +192,29 @@ class TokenVerifier;
 //
 class TokenSigner {
  public:
-  // The 'key_validity_seconds' and 'key_rotation_seconds' parameters define
-  // the schedule of TSK rotation. See the class comment above for details.
+  // The token validity and 'key_rotation_seconds' parameters define the
+  // schedule of TSK rotation. See the class comment above for details.
   //
   // Any newly imported or generated keys are automatically imported into the
   // passed 'verifier'. If no verifier passed as a parameter, TokenSigner
   // creates one on its own. In either case, it's possible to access
   // the embedded TokenVerifier instance using the verifier() accessor.
   //
-  // The 'authn_token_validity_seconds' parameter is used to specify validity
-  // interval for the generated authn tokens and with 'key_rotation_seconds'
-  // it defines validity interval of the newly generated TSK:
-  //   key_validity = 2 * key_rotation + authn_token_validity.
+  // The 'authn_token_validity_seconds' and 'authz_token_validity_seconds'
+  // parameters are used to specify validity intervals for the generated tokens
+  // and, with 'key_rotation_seconds', they define the validity interval of the
+  // newly generated TSK:
+  //
+  // Eq 2.
+  //   key_validity =
+  //      2 * key_rotation + max(authn_token_validity, authz_token_validity)
   //
-  // That corresponds to the maximum possible token lifetime for the effective
-  // TSK validity and rotation intervals: see the class comment above for
-  // details.
+  // This selects the 'max_token_validity' in Eq 1 as the higher of the authn
+  // and authz token validity intervals, and based on that, calculates the
+  // effective TSK validity period based on the provided rotation interval.
+  // See the above class comment for details.
   TokenSigner(int64_t authn_token_validity_seconds,
+              int64_t authz_token_validity_seconds,
               int64_t key_rotation_seconds,
               std::shared_ptr<TokenVerifier> verifier = nullptr);
   ~TokenSigner();
@@ -266,6 +285,16 @@ class TokenSigner {
   // See the class comment above for more information about the intended usage.
   Status TryRotateKey(bool* has_rotated = nullptr) WARN_UNUSED_RESULT;
 
+  // Populates 'signed_token' with a signed authorization token with the given
+  // 'username' and table privilege. Returns an error if 'username' is empty,
+  // or if the created authz token could not be serialized for some reason.
+  Status GenerateAuthzToken(std::string username,
+                            TablePrivilegePB privilege,
+                            SignedTokenPB* signed_token) const WARN_UNUSED_RESULT;
+
+  // Populates 'signed_token' with a signed authentication token with the given
+  // 'username'. Returns an error if 'username' is empty, or if the created
+  // authn token could not be serialized for some reason.
   Status GenerateAuthnToken(std::string username,
                             SignedTokenPB* signed_token) const WARN_UNUSED_RESULT;
 
@@ -279,6 +308,9 @@ class TokenSigner {
 
  private:
   FRIEND_TEST(TokenTest, TestEndToEnd_InvalidCases);
+  FRIEND_TEST(TokenTest, TestIsCurrentKeyValid);
+  FRIEND_TEST(TokenTest, TestTokenSignerAddKeyAfterImport);
+  FRIEND_TEST(TokenTest, TestKeyValidity);
 
   static Status GenerateSigningKey(int64_t key_seq_num,
                                    int64_t key_expiration,
@@ -286,8 +318,9 @@ class TokenSigner {
 
   std::shared_ptr<TokenVerifier> verifier_;
 
-  // Validity interval for the generated authn tokens.
+  // Validity intervals for the generated tokens.
   const int64_t authn_token_validity_seconds_;
+  const int64_t authz_token_validity_seconds_;
 
   // TSK rotation interval: number of seconds between consecutive activations
   // of new token signing keys. Note that in current implementation it defines
diff --git a/be/src/kudu/security/token_verifier.cc b/be/src/kudu/security/token_verifier.cc
index 1ae20db..e3aef4c 100644
--- a/be/src/kudu/security/token_verifier.cc
+++ b/be/src/kudu/security/token_verifier.cc
@@ -123,7 +123,7 @@ VerificationResult TokenVerifier::VerifyTokenSignature(const SignedTokenPB& sign
 
   for (auto flag : token->incompatible_features()) {
     if (!TokenPB::Feature_IsValid(flag)) {
-      KLOG_EVERY_N_SECS(WARNING, 60) << "received authentication token with unknown feature; "
+      KLOG_EVERY_N_SECS(WARNING, 60) << "received token with unknown feature; "
                                         "server needs to be updated";
       return VerificationResult::INCOMPATIBLE_FEATURE;
     }
@@ -151,17 +151,17 @@ const char* VerificationResultToString(VerificationResult r) {
     case security::VerificationResult::VALID:
       return "valid";
     case security::VerificationResult::INVALID_TOKEN:
-      return "invalid authentication token";
+      return "invalid token";
     case security::VerificationResult::INVALID_SIGNATURE:
-      return "invalid authentication token signature";
+      return "invalid token signature";
     case security::VerificationResult::EXPIRED_TOKEN:
-      return "authentication token expired";
+      return "token expired";
     case security::VerificationResult::EXPIRED_SIGNING_KEY:
-      return "authentication token signing key expired";
+      return "token signing key expired";
     case security::VerificationResult::UNKNOWN_SIGNING_KEY:
-      return "authentication token signed with unknown key";
+      return "token signed with unknown key";
     case security::VerificationResult::INCOMPATIBLE_FEATURE:
-      return "authentication token uses incompatible feature";
+      return "token uses incompatible feature";
     default:
       LOG(FATAL) << "unexpected VerificationResult value: "
                  << static_cast<int>(r);
diff --git a/be/src/kudu/util/test_util.cc b/be/src/kudu/util/test_util.cc
index c960441..c514d0e 100644
--- a/be/src/kudu/util/test_util.cc
+++ b/be/src/kudu/util/test_util.cc
@@ -36,6 +36,7 @@
 #include <sys/param.h> // for MAXPATHLEN
 #endif
 
+#include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 #include <gtest/gtest-spi.h>
@@ -43,6 +44,7 @@
 #include "kudu/gutil/strings/numbers.h"
 #include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/strcat.h"
+#include "kudu/gutil/strings/stringpiece.h"
 #include "kudu/gutil/strings/strip.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/strings/util.h"
@@ -54,6 +56,7 @@
 #include "kudu/util/slice.h"
 #include "kudu/util/spinlock_profiling.h"
 #include "kudu/util/status.h"
+#include "kudu/util/string_case.h"
 #include "kudu/util/subprocess.h"
 
 DEFINE_string(test_leave_files, "on_failure",
@@ -62,6 +65,7 @@ DEFINE_string(test_leave_files, "on_failure",
 
 DEFINE_int32(test_random_seed, 0, "Random seed to use for randomized tests");
 
+using boost::optional;
 using std::string;
 using std::vector;
 using strings::Substitute;
@@ -378,7 +382,9 @@ int CountOpenFds(Env* env, const string& path_pattern) {
 }
 
 namespace {
-Status WaitForBind(pid_t pid, uint16_t* port, const char* kind, MonoDelta timeout) {
+Status WaitForBind(pid_t pid, uint16_t* port,
+                   const optional<const string&>& addr,
+                   const char* kind, MonoDelta timeout) {
   // In general, processes do not expose the port they bind to, and
   // reimplementing lsof involves parsing a lot of files in /proc/. So,
   // requiring lsof for tests and parsing its output seems more
@@ -394,15 +400,52 @@ Status WaitForBind(pid_t pid, uint16_t* port, const char* kind, MonoDelta timeou
     "-a", "-i", kind
   };
 
+  // The '-Ffn' flag gets lsof to output something like:
+  //   p5801
+  //   f548
+  //   n127.0.0.1:43954->127.0.0.1:43617
+  //   f549
+  //   n*:8038
+  //
+  // The first line is the pid. We ignore it.
+  // Subsequent lines come in pairs. In each pair, the first half of the pair
+  // is file descriptor number, we ignore it.
+  // The second half has the bind address and port.
+  //
+  // In this example, the first pair is an outbound TCP socket. We ignore it.
+  // The second pair is the listening TCP socket bind address and port.
+  //
+  // We use the first encountered listening TCP socket, since that's most likely
+  // to be the primary service port. When searching, we use the provided bind
+  // address if there is any, otherwise we use '*' (same as '0.0.0.0') which
+  // matches all addresses on the local machine.
+  string addr_pattern = Substitute("n$0:", (!addr || *addr == "0.0.0.0") ? "*" : *addr);
   MonoTime deadline = MonoTime::Now() + timeout;
   string lsof_out;
+  int32_t p = -1;
 
   for (int64_t i = 1; ; i++) {
     lsof_out.clear();
-    Status s = Subprocess::Call(cmd, "", &lsof_out);
+    Status s = Subprocess::Call(cmd, "", &lsof_out).AndThen([&] () {
+      StripTrailingNewline(&lsof_out);
+      vector<string> lines = strings::Split(lsof_out, "\n");
+      for (int index = 2; index < lines.size(); index += 2) {
+        StringPiece cur_line(lines[index]);
+        if (HasPrefixString(cur_line.ToString(), addr_pattern) &&
+            !cur_line.contains("->")) {
+          cur_line.remove_prefix(addr_pattern.size());
+          if (!safe_strto32(cur_line.data(), cur_line.size(), &p)) {
+            return Status::RuntimeError("unexpected lsof output", lsof_out);
+          }
+
+          return Status::OK();
+        }
+      }
+
+      return Status::RuntimeError("unexpected lsof output", lsof_out);
+    });
 
     if (s.ok()) {
-      StripTrailingNewline(&lsof_out);
       break;
     }
     if (deadline < MonoTime::Now()) {
@@ -412,22 +455,6 @@ Status WaitForBind(pid_t pid, uint16_t* port, const char* kind, MonoDelta timeou
     SleepFor(MonoDelta::FromMilliseconds(i * 10));
   }
 
-  // The '-Ffn' flag gets lsof to output something like:
-  //   p19730
-  //   f123
-  //   n*:41254
-  // The first line is the pid. We ignore it.
-  // The second line is the file descriptor number. We ignore it.
-  // The third line has the bind address and port.
-  // Subsequent lines show active connections.
-  vector<string> lines = strings::Split(lsof_out, "\n");
-  int32_t p = -1;
-  if (lines.size() < 3 ||
-      lines[2].substr(0, 3) != "n*:" ||
-      !safe_strto32(lines[2].substr(3), &p) ||
-      p <= 0) {
-    return Status::RuntimeError("unexpected lsof output", lsof_out);
-  }
   CHECK(p > 0 && p < std::numeric_limits<uint16_t>::max()) << "parsed invalid port: " << p;
   VLOG(1) << "Determined bound port: " << p;
   *port = p;
@@ -435,12 +462,31 @@ Status WaitForBind(pid_t pid, uint16_t* port, const char* kind, MonoDelta timeou
 }
 } // anonymous namespace
 
-Status WaitForTcpBind(pid_t pid, uint16_t* port, MonoDelta timeout) {
-  return WaitForBind(pid, port, "4TCP", timeout);
+Status WaitForTcpBind(pid_t pid, uint16_t* port,
+                      const optional<const string&>& addr,
+                      MonoDelta timeout) {
+  return WaitForBind(pid, port, addr, "4TCP", timeout);
+}
+
+Status WaitForUdpBind(pid_t pid, uint16_t* port,
+                      const optional<const string&>& addr,
+                      MonoDelta timeout) {
+  return WaitForBind(pid, port, addr, "4UDP", timeout);
 }
 
-Status WaitForUdpBind(pid_t pid, uint16_t* port, MonoDelta timeout) {
-  return WaitForBind(pid, port, "4UDP", timeout);
+Status FindHomeDir(const string& name, const string& bin_dir, string* home_dir) {
+  string name_upper;
+  ToUpperCase(name, &name_upper);
+
+  string env_var = Substitute("$0_HOME", name_upper);
+  const char* env = std::getenv(env_var.c_str());
+  string dir = env == nullptr ? JoinPathSegments(bin_dir, Substitute("$0-home", name)) : env;
+
+  if (!Env::Default()->FileExists(dir)) {
+    return Status::NotFound(Substitute("$0 directory does not exist", env_var), dir);
+  }
+  *home_dir = dir;
+  return Status::OK();
 }
 
 } // namespace kudu
diff --git a/be/src/kudu/util/test_util.h b/be/src/kudu/util/test_util.h
index 8090fbc..d320e3e 100644
--- a/be/src/kudu/util/test_util.h
+++ b/be/src/kudu/util/test_util.h
@@ -26,11 +26,19 @@
 #include <memory>
 #include <string>
 
+#include <boost/optional/optional.hpp>
 #include <gtest/gtest.h>
 
 #include "kudu/gutil/port.h"
 #include "kudu/util/monotime.h"
 
+#define SKIP_IF_SLOW_NOT_ALLOWED() do { \
+  if (!AllowSlowTests()) { \
+    LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run"; \
+    return; \
+  } \
+} while (0)
+
 #define ASSERT_EVENTUALLY(expr) do { \
   AssertEventually(expr); \
   NO_PENDING_FATALS(); \
@@ -136,11 +144,26 @@ void AssertEventually(const std::function<void(void)>& f,
 // unlike the usual behavior of path globs.
 int CountOpenFds(Env* env, const std::string& path_pattern);
 
-// Waits for the subprocess to bind to any listening TCP port, and returns the port.
-Status WaitForTcpBind(pid_t pid, uint16_t* port, MonoDelta timeout) WARN_UNUSED_RESULT;
+// Waits for the subprocess to bind to any listening TCP port on the provided
+// IP address (if the address is not provided, it is a wildcard binding), and
+// returns the port.
+Status WaitForTcpBind(pid_t pid, uint16_t* port,
+                      const boost::optional<const std::string&>& addr,
+                      MonoDelta timeout) WARN_UNUSED_RESULT;
 
-// Waits for the subprocess to bind to any listening UDP port, and returns the port.
-Status WaitForUdpBind(pid_t pid, uint16_t* port, MonoDelta timeout) WARN_UNUSED_RESULT;
+// Similar to above, but waits for the subprocess to bind to any UDP port.
+Status WaitForUdpBind(pid_t pid, uint16_t* port,
+                      const boost::optional<const std::string&>& addr,
+                      MonoDelta timeout) WARN_UNUSED_RESULT;
+
+// Find the home directory of a Java-style application, e.g. JAVA_HOME or
+// HADOOP_HOME.
+//
+// Checks the environment, or falls back to a symlink in the bin installation
+// directory.
+Status FindHomeDir(const std::string& name,
+                   const std::string& bin_dir,
+                   std::string* home_dir) WARN_UNUSED_RESULT;
 
 } // namespace kudu
 #endif