You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2019/07/01 20:20:31 UTC

[impala] 02/02: Update kudu/security from 9ebcb77aa911aae76c48e717af24e643cb81908d

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b8e20905a80f6541efdb2161333fb1552a976135
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Fri Jun 28 11:16:45 2019 -0700

    Update kudu/security from 9ebcb77aa911aae76c48e717af24e643cb81908d
    
    This updates the kudu security code to the latest version, which
    includes support for GSSAPI calls, necessary for SPNEGO.
    
    This is a straight rsync of the kudu/util source code except for some
    minor CMakeLists changes that were carried over from the old version.
    
    Change-Id: Ie7c91193fd49f8ca1234b23cf61fc90c1fdbe2e0
    Reviewed-on: http://gerrit.cloudera.org:8080/13767
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/kudu/security/CMakeLists.txt     |   2 +
 be/src/kudu/security/gssapi.cc          | 164 ++++++++++++++++
 be/src/kudu/security/gssapi.h           |  59 ++++++
 be/src/kudu/security/init.cc            |  12 ++
 be/src/kudu/security/openssl_util.cc    |  41 +++-
 be/src/kudu/security/test/mini_kdc.cc   |  34 +++-
 be/src/kudu/security/test/mini_kdc.h    |   8 +
 be/src/kudu/security/tls_socket-test.cc |  71 ++++---
 be/src/kudu/security/tls_socket.cc      |  12 +-
 be/src/kudu/security/token-test.cc      | 322 ++++++++++++++++++++++++++------
 be/src/kudu/security/token.proto        |  31 +++
 be/src/kudu/security/token_signer.cc    |  36 +++-
 be/src/kudu/security/token_signer.h     |  89 ++++++---
 be/src/kudu/security/token_verifier.cc  |  14 +-
 be/src/kudu/util/test_util.cc           |  92 ++++++---
 be/src/kudu/util/test_util.h            |  31 ++-
 16 files changed, 847 insertions(+), 171 deletions(-)

diff --git a/be/src/kudu/security/CMakeLists.txt b/be/src/kudu/security/CMakeLists.txt
index 63d6341..9533523 100644
--- a/be/src/kudu/security/CMakeLists.txt
+++ b/be/src/kudu/security/CMakeLists.txt
@@ -68,6 +68,7 @@ set(SECURITY_SRCS
   cert.cc
   crypto.cc
   kerberos_util.cc
+  gssapi.cc
   init.cc
   openssl_util.cc
   ${PORTED_X509_CHECK_HOST_CC}
@@ -86,6 +87,7 @@ set(SECURITY_LIBS
   kudu_util
   token_proto
 
+  gssapi_krb5
   krb5
   openssl_crypto
   openssl_ssl)
diff --git a/be/src/kudu/security/gssapi.cc b/be/src/kudu/security/gssapi.cc
new file mode 100644
index 0000000..6797ec3
--- /dev/null
+++ b/be/src/kudu/security/gssapi.cc
@@ -0,0 +1,164 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/security/gssapi.h"
+
+#include <cstring>
+#include <string>
+#include <utility>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/strings/escaping.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+
+using std::string;
+
+namespace kudu {
+namespace gssapi {
+
+namespace {
+
+string ErrorToString(OM_uint32 code, OM_uint32 type) {
+  string description;
+  OM_uint32 message_context = 0;
+
+  do {
+    if (!description.empty()) {
+      description.append(": ");
+    }
+    OM_uint32 minor = 0;
+    gss_buffer_desc buf;
+    gss_display_status(&minor, code, type, GSS_C_NULL_OID, &message_context, &buf);
+    const char* err = static_cast<const char*>(buf.value);
+    // The error message buffer returned has an explicit length, but in some cases
+    // this length appears to include the null terminator character, and in others
+    // it doesn't. So, we trim off any null terminators if necessary.
+    int err_len = strnlen(err, buf.length);
+    description.append(err, err_len);
+
+    // NOTE: call gss_release_buffer explicitly here instead of ReleaseBufferOrWarn
+    // to avoid the potential for infinite recursion back into ErrorToString.
+    gss_release_buffer(&minor, &buf);
+  } while (message_context != 0);
+
+  return description;
+}
+
+// Wrap a call to F(...), which is typically one of the gss_* functions from
+// the gssapi library. These functions all return a major status code and
+// also provide a minor status code as their first out-param.
+template <class F, class... Args>
+Status WrapGssCall(F func, Args&&... args) {
+  OM_uint32 minor = 0;
+  OM_uint32 major = func(&minor, std::forward<Args>(args)...);
+  return MajorMinorToStatus(major, minor);
+}
+
+void ReleaseNameOrWarn(gss_name_t name) {
+  if (name == GSS_C_NO_NAME) return;
+  WARN_NOT_OK(WrapGssCall(gss_release_name, &name), "Unable to release GSS name");
+}
+
+void ReleaseBufferOrWarn(gss_buffer_t buf) {
+  if (buf == GSS_C_NO_BUFFER) return;
+  WARN_NOT_OK(WrapGssCall(gss_release_buffer, buf), "Unable to release GSS buffer");
+}
+
+void ReleaseContextOrWarn(gss_ctx_id_t ctx) {
+  if (ctx == GSS_C_NO_CONTEXT) return;
+  WARN_NOT_OK(WrapGssCall(gss_delete_sec_context, &ctx, GSS_C_NO_BUFFER),
+              "Unable to release GSS context");
+}
+
+} // anonymous namespace
+
+
+Status MajorMinorToStatus(OM_uint32 major, OM_uint32 minor) {
+  if (GSS_ERROR(major)) {
+    string maj_str = ErrorToString(major, GSS_C_GSS_CODE);
+    string min_str = minor != 0 ? ErrorToString(minor, GSS_C_MECH_CODE) : "";
+    return Status::NotAuthorized(maj_str, min_str);
+  }
+  if (major == GSS_S_CONTINUE_NEEDED) {
+    return Status::Incomplete("");
+  }
+
+  return Status::OK();
+}
+
+
+Status SpnegoStep(const string& in_token_b64,
+                  string* out_token_b64,
+                  bool* complete,
+                  string* authenticated_principal) {
+  string token;
+  if (!strings::Base64Unescape(in_token_b64, &token)) {
+    return Status::InvalidArgument("invalid base64 encoding for token");
+  }
+
+  // Workaround MIT krb5 bug [1] fixed in krb5 1.16 and 1.15.3:
+  //
+  // gssint_get_mech_type_oid() was missing some length verification that could
+  // cause reads past the end of the input token. So, we extend the actual
+  // allocation of the input token an extra 256 bytes of padding.
+  //
+  // Without this fix, our ASAN builds fail with an out-of-bounds read.
+  //
+  // [1] http://krbdev.mit.edu/rt/Ticket/History.html?id=8620
+  size_t real_token_size = token.size();
+  token.resize(real_token_size + 256);
+
+  gss_buffer_desc input_token {real_token_size, const_cast<char*>(token.data())};
+
+  gss_ctx_id_t ctx = GSS_C_NO_CONTEXT;
+  gss_name_t client_name = GSS_C_NO_NAME;
+  SCOPED_CLEANUP({ ReleaseNameOrWarn(client_name); });
+
+  gss_buffer_desc out_token {0, nullptr};
+  SCOPED_CLEANUP({ ReleaseBufferOrWarn(&out_token); });
+  Status s = WrapGssCall(gss_accept_sec_context, &ctx, GSS_C_NO_CREDENTIAL,
+                         &input_token, GSS_C_NO_CHANNEL_BINDINGS,
+                         &client_name, /*mech_type=*/ nullptr,
+                         &out_token, /*ret_flags=*/nullptr,
+                         /*time_rec=*/nullptr, /*delegated_cred_handle=*/nullptr);
+  SCOPED_CLEANUP({ ReleaseContextOrWarn(ctx); });
+  if (!s.ok() && !s.IsIncomplete()) {
+    return s;
+  }
+  *complete = s.ok();
+
+  if (*complete) {
+    gss_buffer_desc name_buf {0, nullptr};
+    SCOPED_CLEANUP({ ReleaseBufferOrWarn(&name_buf); });
+    RETURN_NOT_OK_PREPEND(WrapGssCall(gss_display_name, client_name, &name_buf, nullptr),
+                          "Unable to extract authenticated principal name");
+    authenticated_principal->assign(reinterpret_cast<char*>(name_buf.value),
+                                    name_buf.length);
+  }
+
+  string out_token_str(reinterpret_cast<char*>(out_token.value),
+                       out_token.length);
+  strings::Base64Escape(out_token_str, out_token_b64);
+
+  return Status::OK();
+}
+
+
+} // namespace gssapi
+} // namespace kudu
diff --git a/be/src/kudu/security/gssapi.h b/be/src/kudu/security/gssapi.h
new file mode 100644
index 0000000..036b3d4
--- /dev/null
+++ b/be/src/kudu/security/gssapi.h
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <string>
+
+#include <gssapi/gssapi.h>
+
+namespace kudu {
+
+class Status;
+
+namespace gssapi {
+
+// Convert the given major/minor GSSAPI error codes into a Status.
+Status MajorMinorToStatus(OM_uint32 major, OM_uint32 minor);
+
+// Run a step of SPNEGO authentication.
+//
+// 'in_token_b64' is the base64-encoded token provided by the client, which may be empty
+// if the client did not provide any such token (e.g. if the HTTP 'Authorization' header
+// was not present).
+
+// 'out_token_b64' is the base64-encoded output token to send back to the client
+// during this round of negotiation.
+//
+// If any error occurs (eg an invalid token is provided), a bad Status is returned.
+//
+// An OK status indicates that the negotiation is proceeding successfully, or has
+// completed, whereas a non-OK status indicates an error or an unsuccessful
+// authentication (in which case the out-parameters will not be modified).
+//
+// In the case of an OK status, '*complete' indicates whether any further rounds are
+// required. On completion of negotiation, 'authenticated_principal' will be set to the
+// full principal name of the remote user.
+//
+// NOTE: per the SPNEGO protocol, the final "complete" negotiation stage may
+// include a token.
+Status SpnegoStep(const std::string& in_token_b64,
+                  std::string* out_token_b64,
+                  bool* complete,
+                  std::string* authenticated_principal);
+
+} // namespace gssapi
+} // namespace kudu
diff --git a/be/src/kudu/security/init.cc b/be/src/kudu/security/init.cc
index 64ee1a5..200411f 100644
--- a/be/src/kudu/security/init.cc
+++ b/be/src/kudu/security/init.cc
@@ -48,6 +48,13 @@
 #include "kudu/util/status.h"
 #include "kudu/util/thread.h"
 
+#if defined(__APPLE__)
+// Almost all functions in the krb5 API are marked as deprecated in favor
+// of GSS.framework in macOS.
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
+#endif // #if defined(__APPLE__)
+
 #ifndef __APPLE__
 static constexpr bool kDefaultSystemAuthToLocal = true;
 #else
@@ -56,6 +63,7 @@ static constexpr bool kDefaultSystemAuthToLocal = true;
 // implementation.
 static constexpr bool kDefaultSystemAuthToLocal = false;
 #endif
+
 DEFINE_bool(use_system_auth_to_local, kDefaultSystemAuthToLocal,
             "When enabled, use the system krb5 library to map Kerberos principal "
             "names to local (short) usernames. If not enabled, the first component "
@@ -475,3 +483,7 @@ Status InitKerberosForServer(const std::string& raw_principal, const std::string
 
 } // namespace security
 } // namespace kudu
+
+#if defined(__APPLE__)
+#pragma GCC diagnostic pop
+#endif // #if defined(__APPLE__)
diff --git a/be/src/kudu/security/openssl_util.cc b/be/src/kudu/security/openssl_util.cc
index a32140f..e96fcb3 100644
--- a/be/src/kudu/security/openssl_util.cc
+++ b/be/src/kudu/security/openssl_util.cc
@@ -82,25 +82,49 @@ void LockingCB(int mode, int type, const char* /*file*/, int /*line*/) {
 #endif
 
 Status CheckOpenSSLInitialized() {
+#if OPENSSL_VERSION_NUMBER < 0x10100000L
+  // Starting with OpenSSL 1.1.0, the old thread API became obsolete
+  // (see changelist 2e52e7df5 in the OpenSSL upstream repo), and
+  // CRYPTO_get_locking_callback() always returns nullptr. Also, the library
+  // always initializes its internals for multi-threaded usage.
+  // Another point is that starting with version 1.1.0, SSL_CTX_new()
+  // initializes the OpenSSL library under the hood, so SSL_CTX_new() would
+  // not return nullptr unless there was an error during the initialization
+  // of the library. That makes this code in CheckOpenSSLInitialized() obsolete
+  // starting with OpenSSL version 1.1.0.
+  //
+  // Starting with OpenSSL 1.1.0, there isn't a straightforward way to
+  // determine whether the library has already been initialized if using just
+  // the API (well, there is CRYPTO_secure_malloc_initialized() but that's just
+  // for the crypto library and it's implementation-dependent). But from the
+  // other side, the whole idea that this method should check whether the
+  // library has already been initialized is not relevant anymore: even if it's
+  // not yet initialized, the first call to SSL_CTX_new() (from, say,
+  // TlsContext::Init()) will initialize the library under the hood, so the
+  // library will be ready for multi-thread usage by Kudu.
   if (!CRYPTO_get_locking_callback()) {
     return Status::RuntimeError("Locking callback not initialized");
   }
   auto ctx = ssl_make_unique(SSL_CTX_new(SSLv23_method()));
   if (!ctx) {
     ERR_clear_error();
-    return Status::RuntimeError("SSL library appears uninitialized (cannot create SSL_CTX)");
+    return Status::RuntimeError(
+        "SSL library appears uninitialized (cannot create SSL_CTX)");
   }
+#endif
   return Status::OK();
 }
 
 void DoInitializeOpenSSL() {
-#if OPENSSL_VERSION_NUMBER > 0x10100000L
+#if OPENSSL_VERSION_NUMBER >= 0x10100000L
   // The OPENSSL_init_ssl manpage [1] says "As of version 1.1.0 OpenSSL will
   // automatically allocate all resources it needs so no explicit initialisation
   // is required." However, eliding library initialization leads to a memory
-  // leak in some versions of OpenSSL 1.1 when the first OpenSSL is
-  // ERR_peek_error [2]. In Kudu this is often the
-  // case due to prolific application of SCOPED_OPENSSL_NO_PENDING_ERRORS.
+  // leak in some versions of OpenSSL 1.1 when the first OpenSSL call is
+  // ERR_peek_error (see [2] for details; the issue was addressed in OpenSSL
+  // 1.1.0i (OPENSSL_VERSION_NUMBER 0x1010009f)). In Kudu this is often the
+  // case due to prolific application of the SCOPED_OPENSSL_NO_PENDING_ERRORS
+  // macro.
   //
   // Rather than determine whether this particular OpenSSL instance is
   // leak-free, we'll initialize the library explicitly.
@@ -130,8 +154,11 @@ void DoInitializeOpenSSL() {
   auto ctx = ssl_make_unique(SSL_CTX_new(SSLv23_method()));
   if (ctx) {
     LOG(WARNING) << "It appears that OpenSSL has been previously initialized by "
-                 << "code outside of Kudu. Please use kudu::client::DisableOpenSSLInitialization() "
-                 << "to avoid potential crashes due to conflicting initialization.";
+                    "code outside of Kudu. Please first properly initialize "
+                    "OpenSSL for multi-threaded usage (setting thread callback "
+                    "functions for OpenSSL of versions earlier than 1.1.0) and "
+                    "then call kudu::client::DisableOpenSSLInitialization() "
+                    "to avoid potential crashes due to conflicting initialization.";
     // Continue anyway; all of the below is idempotent, except for the locking callback,
     // which we check before overriding. They aren't thread-safe, however -- that's why
     // we try to get embedding applications to do the right thing here rather than risk a
diff --git a/be/src/kudu/security/test/mini_kdc.cc b/be/src/kudu/security/test/mini_kdc.cc
index 4f987c5..1662770 100644
--- a/be/src/kudu/security/test/mini_kdc.cc
+++ b/be/src/kudu/security/test/mini_kdc.cc
@@ -25,6 +25,7 @@
 #include <string>
 #include <utility>
 
+#include <boost/optional/optional.hpp>
 #include <glog/logging.h>
 
 #include "kudu/gutil/strings/strip.h"
@@ -37,6 +38,7 @@
 #include "kudu/util/subprocess.h"
 #include "kudu/util/test_util.h"
 
+using boost::none;
 using std::map;
 using std::string;
 using std::unique_ptr;
@@ -61,9 +63,7 @@ MiniKdc::MiniKdc(MiniKdcOptions options)
     options_.realm = "KRBTEST.COM";
   }
   if (options_.data_root.empty()) {
-    // We hardcode "/tmp" here since the original function which initializes a random test
-    // directory (GetTestDataDirectory()), depends on gmock.
-    options_.data_root = JoinPathSegments("/tmp", "krb5kdc");
+    options_.data_root = JoinPathSegments(GetTestDataDirectory(), "krb5kdc");
   }
   if (options_.ticket_lifetime.empty()) {
     options_.ticket_lifetime = "24h";
@@ -152,8 +152,10 @@ Status MiniKdc::Start() {
   RETURN_NOT_OK(kdc_process_->Start());
 
   const bool need_config_update = (options_.port == 0);
-  // Wait for KDC to start listening on its ports and commencing operation.
-  RETURN_NOT_OK(WaitForUdpBind(kdc_process_->pid(), &options_.port, MonoDelta::FromSeconds(1)));
+  // Wait for KDC to start listening on its ports and commencing operation
+  // with a wildcard binding.
+  RETURN_NOT_OK(WaitForUdpBind(kdc_process_->pid(), &options_.port,
+                               /*addr=*/none, MonoDelta::FromSeconds(1)));
 
   if (need_config_update) {
     // If we asked for an ephemeral port, grab the actual ports and
@@ -274,6 +276,28 @@ Status MiniKdc::CreateServiceKeytab(const string& spn,
   return Status::OK();
 }
 
+Status MiniKdc::RandomizePrincipalKey(const string& spn) {
+  SCOPED_LOG_SLOW_EXECUTION(WARNING, 100, Substitute("randomizing key for $0", spn));
+  string kadmin;
+  RETURN_NOT_OK(GetBinaryPath("kadmin.local", &kadmin));
+  RETURN_NOT_OK(Subprocess::Call(MakeArgv({
+          kadmin, "-q", Substitute("change_password -randkey $0", spn)})));
+  return Status::OK();
+}
+
+Status MiniKdc::CreateKeytabForExistingPrincipal(const string& spn) {
+  SCOPED_LOG_SLOW_EXECUTION(WARNING, 100, Substitute("creating keytab for $0", spn));
+  string kt_path = spn;
+  StripString(&kt_path, "/", '_');
+  kt_path = JoinPathSegments(options_.data_root, kt_path) + ".keytab";
+
+  string kadmin;
+  RETURN_NOT_OK(GetBinaryPath("kadmin.local", &kadmin));
+  RETURN_NOT_OK(Subprocess::Call(MakeArgv({
+          kadmin, "-q", Substitute("xst -norandkey -k $0 $1", kt_path, spn)})));
+  return Status::OK();
+}
+
 Status MiniKdc::Kinit(const string& username) {
   SCOPED_LOG_SLOW_EXECUTION(WARNING, 100, Substitute("kinit for $0", username));
   string kinit;
diff --git a/be/src/kudu/security/test/mini_kdc.h b/be/src/kudu/security/test/mini_kdc.h
index e282cc4..94990a2 100644
--- a/be/src/kudu/security/test/mini_kdc.h
+++ b/be/src/kudu/security/test/mini_kdc.h
@@ -92,6 +92,14 @@ class MiniKdc {
   // will be reset and a new keytab will be generated.
   Status CreateServiceKeytab(const std::string& spn, std::string* path);
 
+  // Randomize the key for the given SPN. This invalidates any previously-produced
+  // keytabs.
+  Status RandomizePrincipalKey(const std::string& spn);
+
+  // Creates a keytab for an existing principal.
+  // 'spn' is the desired service principal name (e.g. "kudu/foo.example.com").
+  Status CreateKeytabForExistingPrincipal(const std::string& spn);
+
   // Kinit a user to the mini KDC.
   Status Kinit(const std::string& username) WARN_UNUSED_RESULT;
 
diff --git a/be/src/kudu/security/tls_socket-test.cc b/be/src/kudu/security/tls_socket-test.cc
index 001a206..b88cdf4 100644
--- a/be/src/kudu/security/tls_socket-test.cc
+++ b/be/src/kudu/security/tls_socket-test.cc
@@ -58,7 +58,7 @@ using std::vector;
 namespace kudu {
 namespace security {
 
-const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
+const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
 
 // Size is big enough to not fit into output socket buffer of default size
 // (controlled by setsockopt() with SO_SNDBUF).
@@ -155,18 +155,18 @@ class EchoServer {
         CHECK_OK(sock->SetRecvTimeout(kTimeout));
         unique_ptr<uint8_t[]> buf(new uint8_t[kEchoChunkSize]);
         // An "echo" loop for kEchoChunkSize byte buffers.
-        while (!stop_) {
+        while (!is_stopped_) {
           size_t n;
           Status s = sock->BlockingRecv(buf.get(), kEchoChunkSize, &n, MonoTime::Now() + kTimeout);
           if (!s.ok()) {
-            CHECK(stop_) << "unexpected error reading: " << s.ToString();
+            CHECK(is_stopped_) << "unexpected error reading: " << s.ToString();
           }
 
           LOG(INFO) << "server echoing " << n << " bytes";
           size_t written;
           s = sock->BlockingWrite(buf.get(), n, &written, MonoTime::Now() + kTimeout);
           if (!s.ok()) {
-            CHECK(stop_) << "unexpected error writing: " << s.ToString();
+            CHECK(is_stopped_) << "unexpected error writing: " << s.ToString();
           }
           if (slow_read_) {
             SleepFor(MonoDelta::FromMilliseconds(10));
@@ -185,12 +185,12 @@ class EchoServer {
     return listen_addr_;
   }
 
-  bool stopped() const {
-    return stop_;
+  bool is_stopped() const {
+    return is_stopped_;
   }
 
   void Stop() {
-    stop_ = true;
+    is_stopped_ = true;
   }
   void Join() {
     thread_.join();
@@ -208,38 +208,49 @@ class EchoServer {
   thread thread_;
   pthread_t pthread_;
   CountDownLatch pthread_sync_;
-  std::atomic<bool> stop_ { false };
+  std::atomic<bool> is_stopped_ { false };
 
   bool slow_read_ = false;
 };
 
 void handler(int /* signal */) {}
 
+// Test that errors returned when reading from a TLS socket are reasonable and
+// contain the address of the remote.
 TEST_F(TlsSocketTest, TestRecvFailure) {
-    EchoServer server;
-    server.Start();
-    unique_ptr<Socket> client_sock;
-    NO_FATALS(ConnectClient(server.listen_addr(), &client_sock));
-    unique_ptr<uint8_t[]> buf(new uint8_t[kEchoChunkSize]);
-
-    SleepFor(MonoDelta::FromMilliseconds(100));
-    server.Stop();
-
-    size_t nwritten;
-    ASSERT_OK(client_sock->BlockingWrite(buf.get(), kEchoChunkSize, &nwritten,
-        MonoTime::Now() + kTimeout));
-    size_t nread;
+  EchoServer server;
+  server.Start();
+  unique_ptr<Socket> client_sock;
+  NO_FATALS(ConnectClient(server.listen_addr(), &client_sock));
+  unique_ptr<uint8_t[]> buf(new uint8_t[kEchoChunkSize]);
 
-    ASSERT_OK(client_sock->BlockingRecv(buf.get(), kEchoChunkSize, &nread,
-        MonoTime::Now() + kTimeout));
+  // The client writes to the server. This blocks on the server receiving,
+  // so once this completes the server is in its "echo loop".
+  size_t nwritten_unused;
+  ASSERT_OK(client_sock->BlockingWrite(buf.get(),
+                                       kEchoChunkSize,
+                                       &nwritten_unused,
+                                       MonoTime::Now() + kTimeout));
 
-    Status s = client_sock->BlockingRecv(buf.get(), kEchoChunkSize, &nread,
-        MonoTime::Now() + kTimeout);
+  // Stop the server. The client can still Recv once from the server because
+  // the server is trying to echo what was sent by the client.
+  server.Stop();
 
-    ASSERT_TRUE(!s.ok());
-    ASSERT_TRUE(s.IsNetworkError());
-    ASSERT_STR_MATCHES(s.message().ToString(), "BlockingRecv error: failed to read from "
-                                               "TLS socket \\(remote: 127.0.0.1:[0-9]+\\): ");
+  // The first Recv succeeds. This unblocks the server and causes it to exit.
+  size_t nread_unused;
+  ASSERT_OK(client_sock->BlockingRecv(buf.get(),
+                                      kEchoChunkSize,
+                                      &nread_unused,
+                                      MonoTime::Now() + kTimeout));
+
+  // The server has closed the connection, so the second Recv will fail.
+  Status s = client_sock->BlockingRecv(buf.get(),
+                                       kEchoChunkSize,
+                                       &nread_unused,
+                                       MonoTime::Now() + kTimeout);
+  ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
+  ASSERT_STR_MATCHES(s.message().ToString(), "BlockingRecv error: failed to read from "
+                                             "TLS socket \\(remote: 127.0.0.1:[0-9]+\\): ");
 }
 
 // Test for failures to handle EINTR during TLS connection
@@ -257,7 +268,7 @@ TEST_F(TlsSocketTest, TestTlsSocketInterrupted) {
 
   // Start a thread to send signals to the server thread.
   thread killer([&]() {
-      while (!server.stopped()) {
+      while (!server.is_stopped()) {
         PCHECK(pthread_kill(server.pthread(), SIGUSR2) == 0);
         SleepFor(MonoDelta::FromMicroseconds(rand() % 10));
       }
diff --git a/be/src/kudu/security/tls_socket.cc b/be/src/kudu/security/tls_socket.cc
index 355f04b..a586315 100644
--- a/be/src/kudu/security/tls_socket.cc
+++ b/be/src/kudu/security/tls_socket.cc
@@ -33,6 +33,9 @@
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/net/socket.h"
 
+using std::string;
+using strings::Substitute;
+
 namespace kudu {
 namespace security {
 
@@ -114,9 +117,10 @@ Status TlsSocket::Recv(uint8_t *buf, int32_t amt, int32_t *nread) {
   int save_errno = errno;
   if (bytes_read <= 0) {
     Sockaddr remote;
-    Socket::GetPeerAddress(&remote);
-    std::string kErrString = strings::Substitute("failed to read from TLS socket (remote: $0)",
-                                                 remote.ToString());
+    Status s = GetPeerAddress(&remote);
+    const string remote_str = s.ok() ? remote.ToString() : "unknown";
+    string kErrString = Substitute("failed to read from TLS socket (remote: $0)",
+                                   remote_str);
 
     if (bytes_read == 0 && SSL_get_shutdown(ssl_.get()) == SSL_RECEIVED_SHUTDOWN) {
       return Status::NetworkError(kErrString, ErrnoToString(ESHUTDOWN), ESHUTDOWN);
@@ -124,7 +128,7 @@ Status TlsSocket::Recv(uint8_t *buf, int32_t amt, int32_t *nread) {
     auto error_code = SSL_get_error(ssl_.get(), bytes_read);
     if (error_code == SSL_ERROR_WANT_READ) {
       if (save_errno != 0) {
-        return Status::NetworkError("SSL_read error from " + remote.ToString(),
+        return Status::NetworkError("SSL_read error from " + remote_str,
                                     ErrnoToString(save_errno), save_errno);
       }
       // Nothing available to read yet.
diff --git a/be/src/kudu/security/token-test.cc b/be/src/kudu/security/token-test.cc
index 7172a51..a4d7804 100644
--- a/be/src/kudu/security/token-test.cc
+++ b/be/src/kudu/security/token-test.cc
@@ -18,14 +18,16 @@
 #include <cstdint>
 #include <deque>
 #include <memory>
+#include <ostream>
 #include <string>
+#include <thread>
 #include <utility>
 #include <vector>
 
-#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
+#include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/walltime.h"
 #include "kudu/security/crypto.h"
 #include "kudu/security/openssl_util.h"
@@ -33,23 +35,68 @@
 #include "kudu/security/token_signer.h"
 #include "kudu/security/token_signing_key.h"
 #include "kudu/security/token_verifier.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/logging.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/pb_util.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
-DECLARE_int32(tsk_num_rsa_bits);
-
-using std::string;
+using kudu::pb_util::SecureDebugString;
 using std::make_shared;
+using std::string;
+using std::thread;
 using std::unique_ptr;
 using std::vector;
+using strings::Substitute;
 
 namespace kudu {
 namespace security {
 
 namespace {
 
+// Dummy variables to use when their values don't matter much.
+const int kNumBits = 512;
+const int64_t kTokenValiditySeconds = 10;
+const char kUser[] = "user";
+
+// Repeatedly signs tokens and attempts to rotate TSKs until the active TSK's
+// sequence number passes `seq_num`, returning the last token signed by the TSK
+// at `seq_num`. This token is roughly the last possible token signed in the
+// TSK's activity interval.
+// The TokenGenerator 'generate_token' is a lambda that fills in a
+// SignedTokenPB and returns a Status.
+template <class TokenGenerator>
+Status SignUntilRotatePast(TokenSigner* signer, TokenGenerator generate_token,
+                           const string& token_type, int64_t seq_num,
+                           SignedTokenPB* last_signed_by_tsk) {
+  SignedTokenPB last_signed;
+  RETURN_NOT_OK_PREPEND(generate_token(&last_signed),
+      Substitute("Failed to generate first $0 token", token_type));
+  DCHECK_EQ(seq_num, last_signed.signing_key_seq_num())
+      << Substitute("Unexpected starting seq_num for $0 token", token_type);
+
+  auto cur_seq_num = seq_num;
+  while (cur_seq_num == seq_num) {
+    SleepFor(MonoDelta::FromMilliseconds(50));
+    KLOG_EVERY_N_SECS(INFO, 1) <<
+        Substitute("Generating $0 token for activity interval $1", token_type, seq_num);
+    RETURN_NOT_OK_PREPEND(signer->TryRotateKey(), "Failed to attempt to rotate key");
+    SignedTokenPB signed_token;
+    RETURN_NOT_OK_PREPEND(generate_token(&signed_token),
+        Substitute("Failed to generate $0 token", token_type));
+    // We want to return the last token signed by the `seq_num` TSK, so only
+    // update it when appropriate.
+    cur_seq_num = signed_token.signing_key_seq_num();
+    if (cur_seq_num == seq_num) {
+      last_signed = std::move(signed_token);
+    }
+  }
+  *last_signed_by_tsk = std::move(last_signed);
+  return Status::OK();
+}
+
 SignedTokenPB MakeUnsignedToken(int64_t expiration) {
   SignedTokenPB ret;
   TokenPB token;
@@ -70,7 +117,7 @@ SignedTokenPB MakeIncompatibleToken() {
 // Generate public key as a string in DER format for tests.
 Status GeneratePublicKeyStrDer(string* ret) {
   PrivateKey private_key;
-  RETURN_NOT_OK(GeneratePrivateKey(512, &private_key));
+  RETURN_NOT_OK(GeneratePrivateKey(kNumBits, &private_key));
   PublicKey public_key;
   RETURN_NOT_OK(private_key.GetPublicKey(&public_key));
   string public_key_str_der;
@@ -85,7 +132,7 @@ Status GenerateTokenSigningKey(int64_t seq_num,
                                unique_ptr<TokenSigningPrivateKey>* tsk) {
   {
     unique_ptr<PrivateKey> private_key(new PrivateKey);
-    RETURN_NOT_OK(GeneratePrivateKey(512, private_key.get()));
+    RETURN_NOT_OK(GeneratePrivateKey(kNumBits, private_key.get()));
     tsk->reset(new TokenSigningPrivateKey(
         seq_num, expire_time_seconds, std::move(private_key)));
   }
@@ -99,14 +146,14 @@ void CheckAndAddNextKey(int iter_num,
   ASSERT_NE(nullptr, key_seq_num);
   int64_t seq_num;
   {
-    std::unique_ptr<TokenSigningPrivateKey> key;
+    unique_ptr<TokenSigningPrivateKey> key;
     ASSERT_OK(signer->CheckNeedKey(&key));
     ASSERT_NE(nullptr, key.get());
     seq_num = key->key_seq_num();
   }
 
   for (int i = 0; i < iter_num; ++i) {
-    std::unique_ptr<TokenSigningPrivateKey> key;
+    unique_ptr<TokenSigningPrivateKey> key;
     ASSERT_OK(signer->CheckNeedKey(&key));
     ASSERT_NE(nullptr, key.get());
     ASSERT_EQ(seq_num, key->key_seq_num());
@@ -124,7 +171,7 @@ class TokenTest : public KuduTest {
 };
 
 TEST_F(TokenTest, TestInit) {
-  TokenSigner signer(10, 10);
+  TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, 10);
   const TokenVerifier& verifier(signer.verifier());
 
   SignedTokenPB token = MakeUnsignedToken(WallTime_Now());
@@ -133,7 +180,7 @@ TEST_F(TokenTest, TestInit) {
 
   static const int64_t kKeySeqNum = 100;
   PrivateKey private_key;
-  ASSERT_OK(GeneratePrivateKey(512, &private_key));
+  ASSERT_OK(GeneratePrivateKey(kNumBits, &private_key));
   string private_key_str_der;
   ASSERT_OK(private_key.ToString(&private_key_str_der, DataFormat::DER));
   TokenSigningPrivateKeyPB pb;
@@ -163,9 +210,10 @@ TEST_F(TokenTest, TestInit) {
 TEST_F(TokenTest, TestTokenSignerNonSparseSequenceNumbers) {
   static const int kIterNum = 3;
   static const int64_t kAuthnTokenValiditySeconds = 1;
+  static const int64_t kAuthzTokenValiditySeconds = 1;
   static const int64_t kKeyRotationSeconds = 1;
 
-  TokenSigner signer(kAuthnTokenValiditySeconds, kKeyRotationSeconds);
+  TokenSigner signer(kAuthnTokenValiditySeconds, kAuthzTokenValiditySeconds, kKeyRotationSeconds);
 
   int64_t seq_num_first_key;
   NO_FATALS(CheckAndAddNextKey(kIterNum, &signer, &seq_num_first_key));
@@ -192,12 +240,10 @@ TEST_F(TokenTest, TestTokenSignerNonSparseSequenceNumbers) {
 // import should be greater than any sequence number the TokenSigner has seen
 // during the import.
 TEST_F(TokenTest, TestTokenSignerAddKeyAfterImport) {
-  static const int64_t kAuthnTokenValiditySeconds = 8;
   static const int64_t kKeyRotationSeconds = 8;
-  static const int64_t kKeyValiditySeconds =
-      kAuthnTokenValiditySeconds + 2 * kKeyRotationSeconds;
 
-  TokenSigner signer(kAuthnTokenValiditySeconds, kKeyRotationSeconds);
+  TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, kKeyRotationSeconds);
+  const int64_t key_validity_seconds = signer.key_validity_seconds_;
   const TokenVerifier& verifier(signer.verifier());
 
   static const int64_t kExpiredKeySeqNum = 100;
@@ -206,7 +252,7 @@ TEST_F(TokenTest, TestTokenSignerAddKeyAfterImport) {
     // First, try to import already expired key to check that internal key
     // sequence number advances correspondingly.
     PrivateKey private_key;
-    ASSERT_OK(GeneratePrivateKey(512, &private_key));
+    ASSERT_OK(GeneratePrivateKey(kNumBits, &private_key));
     string private_key_str_der;
     ASSERT_OK(private_key.ToString(&private_key_str_der, DataFormat::DER));
     TokenSigningPrivateKeyPB pb;
@@ -215,9 +261,7 @@ TEST_F(TokenTest, TestTokenSignerAddKeyAfterImport) {
     pb.set_expire_unix_epoch_seconds(WallTime_Now() - 1);
 
     ASSERT_OK(signer.ImportKeys({pb}));
-  }
 
-  {
     // Check the result of importing keys: there should be no keys because
     // the only one we tried to import was already expired.
     vector<TokenSigningPublicKeyPB> public_keys(verifier.ExportKeys());
@@ -228,7 +272,7 @@ TEST_F(TokenTest, TestTokenSignerAddKeyAfterImport) {
     // Now import valid (not yet expired) key, but with sequence number less
     // than of the expired key.
     PrivateKey private_key;
-    ASSERT_OK(GeneratePrivateKey(512, &private_key));
+    ASSERT_OK(GeneratePrivateKey(kNumBits, &private_key));
     string private_key_str_der;
     ASSERT_OK(private_key.ToString(&private_key_str_der, DataFormat::DER));
     TokenSigningPrivateKeyPB pb;
@@ -237,19 +281,17 @@ TEST_F(TokenTest, TestTokenSignerAddKeyAfterImport) {
     // Set the TSK's expiration time: make the key valid but past its activity
     // interval.
     pb.set_expire_unix_epoch_seconds(
-        WallTime_Now() + (kKeyValiditySeconds - 2 * kKeyRotationSeconds - 1));
+        WallTime_Now() + (key_validity_seconds - 2 * kKeyRotationSeconds - 1));
 
     ASSERT_OK(signer.ImportKeys({pb}));
-  }
 
-  {
-    // Check the result of importing keys.
+    // Check the result of importing keys. The lower sequence number is
+    // accepted, even though we previously imported a key with a higher
+    // sequence number that was expired.
     vector<TokenSigningPublicKeyPB> public_keys(verifier.ExportKeys());
     ASSERT_EQ(1, public_keys.size());
     ASSERT_EQ(kKeySeqNum, public_keys[0].key_seq_num());
-  }
 
-  {
     // The newly imported key should be used to sign tokens.
     SignedTokenPB token = MakeUnsignedToken(WallTime_Now());
     ASSERT_OK(signer.SignToken(&token));
@@ -259,7 +301,7 @@ TEST_F(TokenTest, TestTokenSignerAddKeyAfterImport) {
   }
 
   {
-    std::unique_ptr<TokenSigningPrivateKey> key;
+    unique_ptr<TokenSigningPrivateKey> key;
     ASSERT_OK(signer.CheckNeedKey(&key));
     ASSERT_NE(nullptr, key.get());
     ASSERT_EQ(kExpiredKeySeqNum + 1, key->key_seq_num());
@@ -289,15 +331,19 @@ TEST_F(TokenTest, TestTokenSignerAddKeyAfterImport) {
 // less or equal to the sequence number of the most 'recent' key.
 TEST_F(TokenTest, TestAddKeyConstraints) {
   {
-    TokenSigner signer(1, 1);
-    std::unique_ptr<TokenSigningPrivateKey> key;
+    // If a signer has not created a TSK yet, it will create a key, and will
+    // happily accept the generated key.
+    TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, 1);
+    unique_ptr<TokenSigningPrivateKey> key;
     ASSERT_OK(signer.CheckNeedKey(&key));
     ASSERT_NE(nullptr, key.get());
     ASSERT_OK(signer.AddKey(std::move(key)));
   }
   {
-    TokenSigner signer(1, 1);
-    std::unique_ptr<TokenSigningPrivateKey> key;
+    // If the key sequence number added to the signer isn't monotonically
+    // increasing, the signer will complain.
+    TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, 1);
+    unique_ptr<TokenSigningPrivateKey> key;
     ASSERT_OK(signer.CheckNeedKey(&key));
     ASSERT_NE(nullptr, key.get());
     const int64_t key_seq_num = key->key_seq_num();
@@ -308,10 +354,11 @@ TEST_F(TokenTest, TestAddKeyConstraints) {
                         ": invalid key sequence number, should be at least ");
   }
   {
-    TokenSigner signer(1, 1);
+    // Test importing expired keys. The signer should be OK with it.
+    TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, 1);
     static const int64_t kKeySeqNum = 100;
     PrivateKey private_key;
-    ASSERT_OK(GeneratePrivateKey(512, &private_key));
+    ASSERT_OK(GeneratePrivateKey(kNumBits, &private_key));
     string private_key_str_der;
     ASSERT_OK(private_key.ToString(&private_key_str_der, DataFormat::DER));
     TokenSigningPrivateKeyPB pb;
@@ -321,7 +368,9 @@ TEST_F(TokenTest, TestAddKeyConstraints) {
     pb.set_expire_unix_epoch_seconds(WallTime_Now() - 1);
     ASSERT_OK(signer.ImportKeys({pb}));
 
-    std::unique_ptr<TokenSigningPrivateKey> key;
+    // Generated keys thereafter are expected to have higher sequence numbers
+    // than the imported expired keys.
+    unique_ptr<TokenSigningPrivateKey> key;
     ASSERT_OK(signer.CheckNeedKey(&key));
     ASSERT_NE(nullptr, key.get());
     const int64_t key_seq_num = key->key_seq_num();
@@ -334,31 +383,62 @@ TEST_F(TokenTest, TestAddKeyConstraints) {
   }
 }
 
-TEST_F(TokenTest, TestGenerateAuthTokenNoUserName) {
-  TokenSigner signer(10, 10);
+TEST_F(TokenTest, TestGenerateAuthnTokenNoUserName) {
+  TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, 10);
   SignedTokenPB signed_token_pb;
   const Status& s = signer.GenerateAuthnToken("", &signed_token_pb);
   EXPECT_TRUE(s.IsInvalidArgument()) << s.ToString();
   ASSERT_STR_CONTAINS(s.ToString(), "no username provided for authn token");
 }
 
+TEST_F(TokenTest, TestGenerateAuthzToken) {
+  // We cannot generate tokens with no username associated with it.
+  auto verifier(make_shared<TokenVerifier>());
+  TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, 10, verifier);
+  TablePrivilegePB table_privilege;
+  SignedTokenPB signed_token_pb;
+  Status s = signer.GenerateAuthzToken("", table_privilege, &signed_token_pb);
+  EXPECT_TRUE(s.IsInvalidArgument()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "no username provided for authz token");
+
+  // Generated tokens will have the specified privileges.
+  const string kAuthorized = "authzed";
+  unique_ptr<TokenSigningPrivateKey> key;
+  ASSERT_OK(signer.CheckNeedKey(&key));
+  ASSERT_NE(nullptr, key.get());
+  ASSERT_OK(signer.AddKey(std::move(key)));
+  ASSERT_OK(signer.GenerateAuthzToken(kAuthorized,
+                                      table_privilege,
+                                      &signed_token_pb));
+  ASSERT_TRUE(signed_token_pb.has_token_data());
+  TokenPB token_pb;
+  ASSERT_EQ(VerificationResult::VALID,
+            verifier->VerifyTokenSignature(signed_token_pb, &token_pb));
+  ASSERT_TRUE(token_pb.has_authz());
+  ASSERT_EQ(kAuthorized, token_pb.authz().username());
+  ASSERT_TRUE(token_pb.authz().has_table_privilege());
+  ASSERT_EQ(SecureDebugString(table_privilege),
+            SecureDebugString(token_pb.authz().table_privilege()));
+}
+
 TEST_F(TokenTest, TestIsCurrentKeyValid) {
-  static const int64_t kAuthnTokenValiditySeconds = 1;
+  // This test sleeps for a key validity period, so set it up to be short.
+  static const int64_t kShortTokenValiditySeconds = 1;
   static const int64_t kKeyRotationSeconds = 1;
-  static const int64_t kKeyValiditySeconds =
-      kAuthnTokenValiditySeconds + 2 * kKeyRotationSeconds;
 
-  TokenSigner signer(kAuthnTokenValiditySeconds, kKeyRotationSeconds);
+  TokenSigner signer(kShortTokenValiditySeconds, kShortTokenValiditySeconds, kKeyRotationSeconds);
+  static const int64_t key_validity_seconds = signer.key_validity_seconds_;
+
   EXPECT_FALSE(signer.IsCurrentKeyValid());
   {
-    std::unique_ptr<TokenSigningPrivateKey> key;
+    unique_ptr<TokenSigningPrivateKey> key;
     ASSERT_OK(signer.CheckNeedKey(&key));
     // No keys are available yet, so should be able to add.
     ASSERT_NE(nullptr, key.get());
     ASSERT_OK(signer.AddKey(std::move(key)));
   }
   EXPECT_TRUE(signer.IsCurrentKeyValid());
-  SleepFor(MonoDelta::FromSeconds(kKeyValiditySeconds));
+  SleepFor(MonoDelta::FromSeconds(key_validity_seconds));
   // The key should expire after its validity interval.
   EXPECT_FALSE(signer.IsCurrentKeyValid());
 
@@ -369,8 +449,8 @@ TEST_F(TokenTest, TestIsCurrentKeyValid) {
 
 TEST_F(TokenTest, TestTokenSignerAddKeys) {
   {
-    TokenSigner signer(10, 10);
-    std::unique_ptr<TokenSigningPrivateKey> key;
+    TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, 10);
+    unique_ptr<TokenSigningPrivateKey> key;
     ASSERT_OK(signer.CheckNeedKey(&key));
     // No keys are available yet, so should be able to add.
     ASSERT_NE(nullptr, key.get());
@@ -384,8 +464,8 @@ TEST_F(TokenTest, TestTokenSignerAddKeys) {
   {
     // Special configuration for TokenSigner: rotation interval is zero,
     // so should be able to add two keys right away.
-    TokenSigner signer(10, 0);
-    std::unique_ptr<TokenSigningPrivateKey> key;
+    TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, 0);
+    unique_ptr<TokenSigningPrivateKey> key;
     ASSERT_OK(signer.CheckNeedKey(&key));
     // No keys are available yet, so should be able to add.
     ASSERT_NE(nullptr, key.get());
@@ -406,8 +486,8 @@ TEST_F(TokenTest, TestTokenSignerAddKeys) {
     // It should not need next key right away, but should need next key after
     // the rotation interval.
     static const int64_t kKeyRotationIntervalSeconds = 8;
-    TokenSigner signer(10, kKeyRotationIntervalSeconds);
-    std::unique_ptr<TokenSigningPrivateKey> key;
+    TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, kKeyRotationIntervalSeconds);
+    unique_ptr<TokenSigningPrivateKey> key;
     ASSERT_OK(signer.CheckNeedKey(&key));
     // No keys are available yet, so should be able to add.
     ASSERT_NE(nullptr, key.get());
@@ -433,7 +513,7 @@ TEST_F(TokenTest, TestTokenSignerAddKeys) {
 // Test how key rotation works.
 TEST_F(TokenTest, TestTokenSignerSignVerifyExport) {
   // Key rotation interval 0 allows adding 2 keys in a row with no delay.
-  TokenSigner signer(10, 0);
+  TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, 0);
   const TokenVerifier& verifier(signer.verifier());
 
   // Should start off with no signing keys.
@@ -447,7 +527,7 @@ TEST_F(TokenTest, TestTokenSignerSignVerifyExport) {
   // Generate and set a new key.
   int64_t signing_key_seq_num;
   {
-    std::unique_ptr<TokenSigningPrivateKey> key;
+    unique_ptr<TokenSigningPrivateKey> key;
     ASSERT_OK(signer.CheckNeedKey(&key));
     ASSERT_NE(nullptr, key.get());
     signing_key_seq_num = key->key_seq_num();
@@ -470,7 +550,7 @@ TEST_F(TokenTest, TestTokenSignerSignVerifyExport) {
   // Set next key and check that we return the right keys.
   int64_t next_signing_key_seq_num;
   {
-    std::unique_ptr<TokenSigningPrivateKey> key;
+    unique_ptr<TokenSigningPrivateKey> key;
     ASSERT_OK(signer.CheckNeedKey(&key));
     ASSERT_NE(nullptr, key.get());
     next_signing_key_seq_num = key->key_seq_num();
@@ -482,7 +562,7 @@ TEST_F(TokenTest, TestTokenSignerSignVerifyExport) {
   ASSERT_EQ(0, verifier.ExportKeys(next_signing_key_seq_num).size());
 
   // The first key should be used for signing: the next one is saved
-  // for the next round.
+  // for the next rotation.
   {
     SignedTokenPB token = MakeUnsignedToken(WallTime_Now());
     ASSERT_OK(signer.SignToken(&token));
@@ -498,11 +578,11 @@ TEST_F(TokenTest, TestExportKeys) {
   // and have an appropriate expiration.
   const int64_t key_exp_seconds = 30;
   const int64_t key_rotation_seconds = 10;
-  TokenSigner signer(key_exp_seconds - 2 * key_rotation_seconds,
+  TokenSigner signer(key_exp_seconds - 2 * key_rotation_seconds, 0,
                      key_rotation_seconds);
   int64_t key_seq_num;
   {
-    std::unique_ptr<TokenSigningPrivateKey> key;
+    unique_ptr<TokenSigningPrivateKey> key;
     ASSERT_OK(signer.CheckNeedKey(&key));
     ASSERT_NE(nullptr, key.get());
     key_seq_num = key->key_seq_num();
@@ -523,9 +603,9 @@ TEST_F(TokenTest, TestExportKeys) {
 // Test that the TokenVerifier can import keys exported by the TokenSigner
 // and then verify tokens signed by it.
 TEST_F(TokenTest, TestEndToEnd_Valid) {
-  TokenSigner signer(10, 10);
+  TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, 10);
   {
-    std::unique_ptr<TokenSigningPrivateKey> key;
+    unique_ptr<TokenSigningPrivateKey> key;
     ASSERT_OK(signer.CheckNeedKey(&key));
     ASSERT_NE(nullptr, key.get());
     ASSERT_OK(signer.AddKey(std::move(key)));
@@ -546,9 +626,9 @@ TEST_F(TokenTest, TestEndToEnd_Valid) {
 // See VerificationResult.
 TEST_F(TokenTest, TestEndToEnd_InvalidCases) {
   // Key rotation interval 0 allows adding 2 keys in a row with no delay.
-  TokenSigner signer(10, 0);
+  TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, 0);
   {
-    std::unique_ptr<TokenSigningPrivateKey> key;
+    unique_ptr<TokenSigningPrivateKey> key;
     ASSERT_OK(signer.CheckNeedKey(&key));
     ASSERT_NE(nullptr, key.get());
     ASSERT_OK(signer.AddKey(std::move(key)));
@@ -599,7 +679,7 @@ TEST_F(TokenTest, TestEndToEnd_InvalidCases) {
   // verify, we expect the verifier to complain the key is unknown.
   {
     {
-      std::unique_ptr<TokenSigningPrivateKey> key;
+      unique_ptr<TokenSigningPrivateKey> key;
       ASSERT_OK(signer.CheckNeedKey(&key));
       ASSERT_NE(nullptr, key.get());
       ASSERT_OK(signer.AddKey(std::move(key)));
@@ -673,5 +753,131 @@ TEST_F(TokenTest, TestTokenVerifierImportKeys) {
   }
 }
 
+// Test using different token validity intervals.
+TEST_F(TokenTest, TestVaryingTokenValidityIntervals) {
+  constexpr int kShortValiditySeconds = 2;
+  const int kLongValiditySeconds = kShortValiditySeconds * 3;
+  auto verifier(make_shared<TokenVerifier>());
+  TokenSigner signer(kLongValiditySeconds, kShortValiditySeconds, 10, verifier);
+  unique_ptr<TokenSigningPrivateKey> key;
+  ASSERT_OK(signer.CheckNeedKey(&key));
+  ASSERT_NE(nullptr, key.get());
+  ASSERT_OK(signer.AddKey(std::move(key)));
+
+  const TablePrivilegePB table_privilege;
+  SignedTokenPB signed_authn;
+  SignedTokenPB signed_authz;
+  ASSERT_OK(signer.GenerateAuthnToken(kUser, &signed_authn));
+  ASSERT_OK(signer.GenerateAuthzToken(kUser, table_privilege, &signed_authz));
+  TokenPB authn_token;
+  TokenPB authz_token;
+  ASSERT_EQ(VerificationResult::VALID, verifier->VerifyTokenSignature(signed_authn, &authn_token));
+  ASSERT_EQ(VerificationResult::VALID, verifier->VerifyTokenSignature(signed_authz, &authz_token));
+
+  // Wait for the authz validity interval to pass and verify its expiration.
+  SleepFor(MonoDelta::FromSeconds(1 + kShortValiditySeconds));
+  EXPECT_EQ(VerificationResult::VALID, verifier->VerifyTokenSignature(signed_authn, &authn_token));
+  EXPECT_EQ(VerificationResult::EXPIRED_TOKEN,
+            verifier->VerifyTokenSignature(signed_authz, &authz_token));
+
+  // Wait for the authn validity interval to pass and verify its expiration.
+  SleepFor(MonoDelta::FromSeconds(kLongValiditySeconds - kShortValiditySeconds));
+  EXPECT_EQ(VerificationResult::EXPIRED_TOKEN,
+            verifier->VerifyTokenSignature(signed_authn, &authn_token));
+  EXPECT_EQ(VerificationResult::EXPIRED_TOKEN,
+            verifier->VerifyTokenSignature(signed_authz, &authz_token));
+}
+
+// Test to check the invariant that all tokens signed within a TSK's activity
+// interval must be expired by the end of the TSK's validity interval.
+TEST_F(TokenTest, TestKeyValidity) {
+  if (!AllowSlowTests()) {
+    LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+    return;
+  }
+  // Note: this test's runtime is roughly the length of a key-validity
+  // interval, which is determined by the token validity intervals and the key
+  // rotation interval.
+  const int kShortValiditySeconds = 2;
+  const int kLongValiditySeconds = 6;
+  const int kKeyRotationSeconds = 5;
+  auto verifier(make_shared<TokenVerifier>());
+  TokenSigner signer(kLongValiditySeconds, kShortValiditySeconds, kKeyRotationSeconds, verifier);
+  unique_ptr<TokenSigningPrivateKey> key;
+  ASSERT_OK(signer.CheckNeedKey(&key));
+  ASSERT_NE(nullptr, key.get());
+  ASSERT_OK(signer.AddKey(std::move(key)));
+
+  // First, start a countdown for the first TSK's validity interval. Any token
+  // signed during the first TSK's activity interval must be expired once this
+  // latch counts down.
+  vector<thread> threads;
+  CountDownLatch first_tsk_validity_latch(1);
+  const double key_validity_seconds = signer.key_validity_seconds_;
+  threads.emplace_back([&first_tsk_validity_latch, key_validity_seconds] {
+    SleepFor(MonoDelta::FromSeconds(key_validity_seconds));
+    LOG(INFO) << Substitute("First TSK's validity interval of $0 secs has finished!",
+                            key_validity_seconds);
+    first_tsk_validity_latch.CountDown();
+  });
+
+  // Set up a second TSK so our threads can rotate TSKs when the time comes.
+  while (true) {
+    KLOG_EVERY_N_SECS(INFO, 1) << "Waiting for a second key...";
+    unique_ptr<TokenSigningPrivateKey> tsk;
+    ASSERT_OK(signer.CheckNeedKey(&tsk));
+    if (tsk) {
+      LOG(INFO) << "Added second key!";
+      ASSERT_OK(signer.AddKey(std::move(tsk)));
+      break;
+    }
+    SleepFor(MonoDelta::FromMilliseconds(50));
+  }
+
+  // Utility lambda to check that the token is expired.
+  const auto verify_expired = [&verifier] (const SignedTokenPB& signed_token,
+                                           const string& token_type) {
+    TokenPB token_pb;
+    const auto result = verifier->VerifyTokenSignature(signed_token, &token_pb);
+    const auto expire_secs = token_pb.expire_unix_epoch_seconds();
+    ASSERT_EQ(VerificationResult::EXPIRED_TOKEN, result)
+        << Substitute("validation result '$0': $1 token expires at $2, now $3",
+                      VerificationResultToString(result), token_type,
+                      expire_secs, WallTime_Now());
+  };
+
+  // Create a thread that repeatedly signs new authn tokens, returning the
+  // final one signed by TSK with seq_num 0. At the end of the key validity
+  // period, this token will not be valid.
+  vector<SignedTokenPB> tsks(2);
+  vector<Status> results(2);
+  threads.emplace_back([&] {
+    results[0] = SignUntilRotatePast(&signer,
+        [&] (SignedTokenPB* signed_token) {
+          return signer.GenerateAuthnToken(kUser, signed_token);
+        },
+        "authn", 0, &tsks[0]);
+    first_tsk_validity_latch.Wait();
+  });
+
+  // Do the same for authz tokens.
+  threads.emplace_back([&] {
+    results[1] = SignUntilRotatePast(&signer,
+        [&] (SignedTokenPB* signed_token) {
+          return signer.GenerateAuthzToken(kUser, TablePrivilegePB(), signed_token);
+        },
+        "authz", 0, &tsks[1]);
+    first_tsk_validity_latch.Wait();
+  });
+
+  for (auto& t : threads) {
+    t.join();
+  }
+  EXPECT_OK(results[0]);
+  EXPECT_OK(results[1]);
+  NO_FATALS(verify_expired(tsks[0], "authn"));
+  NO_FATALS(verify_expired(tsks[1], "authz"));
+}
+
 } // namespace security
 } // namespace kudu
diff --git a/be/src/kudu/security/token.proto b/be/src/kudu/security/token.proto
index e27ccdb..ea63ce2 100644
--- a/be/src/kudu/security/token.proto
+++ b/be/src/kudu/security/token.proto
@@ -21,11 +21,42 @@ option java_package = "org.apache.kudu.security";
 
 import "kudu/util/pb_util.proto";
 
+message ColumnPrivilegePB {
+  // If set, the user has privileges to select and apply predicates on the
+  // column during scans.
+  optional bool scan_privilege = 1;
+};
+
+message TablePrivilegePB {
+  // The ID of the table to which the privileges apply.
+  optional string table_id = 1;
+
+  // If set, the user is authorized to select and apply predicates to all
+  // columns when scanning the table, and `column_privileges` is ignored. If
+  // unset, the user may only scan and apply predicates to columns with the
+  // privileges specified in `column_privileges`.
+  optional bool scan_privilege = 2;
+
+  // If set, the user is authorized to insert rows into the table.
+  optional bool insert_privilege= 3;
+
+  // If set, the user is authorized to update rows in the table.
+  optional bool update_privilege = 4;
+
+  // If set, the user is authorized to delete rows in the table.
+  optional bool delete_privilege = 5;
+
+  // Per-column privileges, indexed by column ID.
+  map<int32, ColumnPrivilegePB> column_privileges = 6;
+};
+
 message AuthnTokenPB {
   optional string username = 1;
 };
 
 message AuthzTokenPB {
+  optional string username = 1;
+  optional TablePrivilegePB table_privilege = 2;
 };
 
 message TokenPB {
diff --git a/be/src/kudu/security/token_signer.cc b/be/src/kudu/security/token_signer.cc
index 08c84be..aaf1be8 100644
--- a/be/src/kudu/security/token_signer.cc
+++ b/be/src/kudu/security/token_signer.cc
@@ -56,17 +56,21 @@ namespace kudu {
 namespace security {
 
 TokenSigner::TokenSigner(int64_t authn_token_validity_seconds,
+                         int64_t authz_token_validity_seconds,
                          int64_t key_rotation_seconds,
                          shared_ptr<TokenVerifier> verifier)
     : verifier_(verifier ? std::move(verifier)
                          : std::make_shared<TokenVerifier>()),
       authn_token_validity_seconds_(authn_token_validity_seconds),
+      authz_token_validity_seconds_(authz_token_validity_seconds),
       key_rotation_seconds_(key_rotation_seconds),
       // The TSK propagation interval is equal to the rotation interval.
-      key_validity_seconds_(2 * key_rotation_seconds_ + authn_token_validity_seconds_),
+      key_validity_seconds_(2 * key_rotation_seconds_ +
+          std::max(authn_token_validity_seconds_, authz_token_validity_seconds)),
       last_key_seq_num_(-1) {
   CHECK_GE(key_rotation_seconds_, 0);
   CHECK_GE(authn_token_validity_seconds_, 0);
+  CHECK_GE(authz_token_validity_seconds_, 0);
   CHECK(verifier_);
 }
 
@@ -134,6 +138,27 @@ Status TokenSigner::ImportKeys(const vector<TokenSigningPrivateKeyPB>& keys) {
   return Status::OK();
 }
 
+Status TokenSigner::GenerateAuthzToken(string username,
+                                       TablePrivilegePB privilege,
+                                       SignedTokenPB* signed_token) const {
+  if (username.empty()) {
+    return Status::InvalidArgument("no username provided for authz token");
+  }
+  TokenPB token;
+  token.set_expire_unix_epoch_seconds(WallTime_Now() + authz_token_validity_seconds_);
+  AuthzTokenPB* authz = token.mutable_authz();
+  authz->set_username(std::move(username));
+  *authz->mutable_table_privilege() = std::move(privilege);
+
+  SignedTokenPB ret;
+  if (!token.SerializeToString(ret.mutable_token_data())) {
+    return Status::RuntimeError("could not serialize authz token");
+  }
+  RETURN_NOT_OK(SignToken(&ret));
+  *signed_token = std::move(ret);
+  return Status::OK();
+}
+
 Status TokenSigner::GenerateAuthnToken(string username,
                                        SignedTokenPB* signed_token) const {
   if (username.empty()) {
@@ -162,7 +187,7 @@ Status TokenSigner::SignToken(SignedTokenPB* token) const {
     return Status::IllegalState("no token signing key");
   }
   const TokenSigningPrivateKey* key = tsk_deque_.front().get();
-  RETURN_NOT_OK_PREPEND(key->Sign(token), "could not sign authn token");
+  RETURN_NOT_OK_PREPEND(key->Sign(token), "could not sign token");
   return Status::OK();
 }
 
@@ -194,9 +219,10 @@ Status TokenSigner::CheckNeedKey(unique_ptr<TokenSigningPrivateKey>* tsk) const
     // It's enough to have just one active key and next key ready to be
     // activated when it's time to do so.  However, it does not mean the
     // process of key refreshment is about to stop once there are two keys
-    // in the queue: the TryRotate() method (which should be called periodically
-    // along with CheckNeedKey()/AddKey() pair) will eventually pop the
-    // current key out of the keys queue once the key enters its inactive phase.
+    // in the queue: the TryRotateKey() method (which should be called
+    // periodically along with CheckNeedKey()/AddKey() pair) will eventually
+    // pop the current key out of the keys queue once the key enters its
+    // inactive phase.
     tsk->reset();
     return Status::OK();
   }
diff --git a/be/src/kudu/security/token_signer.h b/be/src/kudu/security/token_signer.h
index df1e3eb..36f5a65 100644
--- a/be/src/kudu/security/token_signer.h
+++ b/be/src/kudu/security/token_signer.h
@@ -33,6 +33,7 @@ class Status;
 
 namespace security {
 class SignedTokenPB;
+class TablePrivilegePB;
 class TokenSigningPrivateKey;
 class TokenSigningPrivateKeyPB;
 class TokenVerifier;
@@ -74,16 +75,18 @@ class TokenVerifier;
 // '-' propagation interval
 //       The TSK is already created but not yet used to sign tokens. However,
 //       its public part is already being sent to the components which
-//       may be involved in validation of tokens signed by the key.
+//       may be involved in validation of tokens signed by the key. This is
+//       exactly 'tsk_propagation_interval' below.
 //
 // 'A' activity interval
 //       The TSK is used to sign tokens. It's assumed that the components which
-//       are involved in token verification have already received
-//       the corresponding public part of the TSK.
+//       are involved in token verification have already received the
+//       corresponding public part of the TSK.
 //
 // '=' inactivity interval
-//       The TSK is no longer used to sign tokens. However, it's still sent
-//       to other components which validate token signatures.
+//       The TSK is no longer used to sign tokens. However, its public part is
+//       still sent to other components and can be used to validate token
+//       signatures.
 //
 // Shortly after the TSK's expiration the token signing components stop
 // propagating its public part.
@@ -98,8 +101,8 @@ class TokenVerifier;
 //       implications, so it's worth considering rolling twice at startup.
 //
 // For example, consider the following configuration for token signing keys:
-//   validity period:      4 days
-//   rotation interval:    1 days
+//   key validity period:  4 days
+//   rotation interval:    1 day
 //   propagation interval: 1 day
 //
 // Day      1    2    3    4    5    6    7    8
@@ -111,24 +114,34 @@ class TokenVerifier;
 //                              ...............
 // authn token:                     <**********>
 //
-// 'A' indicates the 'Originator Usage Period' (a.k.a. 'Activity Interval'),
-// i.e. the period in which the key is being used to sign tokens.
+// 'A' indicates 'Activity Interval', i.e. the period during which the key is
+// being used to sign tokens. In cryptographic terms, this is the 'Originator
+// Usage Period'. Note that the Activity Interval is identically the rotation
+// interval -- a key is active for some amount of time, after which, we rotate.
 //
-// '<...>' indicates the 'Recipient Usage Period': the period in which
-// the verifier may get tokens signed by the TSK and should consider them
-// for verification. The start of the recipient usage period is not crucial
-// in that regard, but the end of that period is -- after the TSK is expired,
-// a verifier should consider tokens signed by that TSK invalid and stop
-// accepting them even if the token signature is correct and the expiration.
+// '<...>' in cryptographic terms, indicates the 'Recipient Usage Period': the
+// period during which the public part of a key is used to sign tokens, and
+// verifiers will consider tokens signed by the given key as valid. At the end
+// of this period, a verifier should consider tokens signed by the given TSK
+// invalid and stop accepting them, even if the token signature is correct. The
+// start of the period is not crucial to the validity of a token, so we don't
+// perform verification for it.
 //
 // '<***>' indicates the validity interval for an authn token.
 //
-// When configuring key rotation and authn token validity interval durations,
+// When configuring key rotation and token validity interval durations,
 // consider the following constraint:
 //
-//   max_token_validity < tsk_validity_period -
+// Eq 1.
+//   max_token_validity = tsk_validity_period -
 //       (tsk_propagation_interval + tsk_rotation_interval)
 //
+// Note how if the validity period for a token created at the end of the
+// Activity Interval were to extend any farther than the above
+// 'max_token_validity', it would be considered valid beyond the end of the
+// 'tsk_validity_period', which would break the constraint that the token only
+// be valid within the TSK's validity period.
+//
 // The idea is that the token validity interval should be contained in the
 // corresponding TSK's validity interval. If the TSK is already expired at the
 // time of token verification, the token is considered invalid and the
@@ -139,7 +152,7 @@ class TokenVerifier;
 //
 // * A TSK is issued at 00:00:00 on day 4.
 // * An authn token generated and signed by current/active TSK at 23:59:59 on
-//   day 6. That's at the very end of the TSK's activity interval.
+//   day 5. That's at the very end of the TSK's activity interval.
 // * From the diagram above it's clear that if the authn token validity
 //   interval were set to something longer than TSK inactivity interval
 //   (which is 2 days with for the specified parameters), an attempt to verify
@@ -179,23 +192,29 @@ class TokenVerifier;
 //
 class TokenSigner {
  public:
-  // The 'key_validity_seconds' and 'key_rotation_seconds' parameters define
-  // the schedule of TSK rotation. See the class comment above for details.
+  // The token validity and 'key_rotation_seconds' parameters define the
+  // schedule of TSK rotation. See the class comment above for details.
   //
   // Any newly imported or generated keys are automatically imported into the
   // passed 'verifier'. If no verifier passed as a parameter, TokenSigner
   // creates one on its own. In either case, it's possible to access
   // the embedded TokenVerifier instance using the verifier() accessor.
   //
-  // The 'authn_token_validity_seconds' parameter is used to specify validity
-  // interval for the generated authn tokens and with 'key_rotation_seconds'
-  // it defines validity interval of the newly generated TSK:
-  //   key_validity = 2 * key_rotation + authn_token_validity.
+  // The 'authn_token_validity_seconds' and 'authz_token_validity_seconds'
+  // parameters are used to specify validity intervals for the generated tokens
+  // and with 'key_rotation_seconds' it defines validity interval of the newly
+  // generated TSK:
+  //
+  // Eq 2.
+  //   key_validity =
+  //      2 * key_rotation + max(authn_token_validity, authz_token_validity)
   //
-  // That corresponds to the maximum possible token lifetime for the effective
-  // TSK validity and rotation intervals: see the class comment above for
-  // details.
+  // This selects the 'max_token_validity' in Eq 1 as the higher of the authn
+  // and authz token validity intervals, and based on that, calculates the
+  // effective TSK validity period based on the provided rotation interval.
+  // See the above class comment for details.
   TokenSigner(int64_t authn_token_validity_seconds,
+              int64_t authz_token_validity_seconds,
               int64_t key_rotation_seconds,
               std::shared_ptr<TokenVerifier> verifier = nullptr);
   ~TokenSigner();
@@ -266,6 +285,16 @@ class TokenSigner {
   // See the class comment above for more information about the intended usage.
   Status TryRotateKey(bool* has_rotated = nullptr) WARN_UNUSED_RESULT;
 
+  // Populates 'signed_token' with a signed authorization token with the given
+  // 'username' and table privilege. Returns an error if 'username' is empty,
+  // or if the created authn token could not be serialized for some reason.
+  Status GenerateAuthzToken(std::string username,
+                            TablePrivilegePB privilege,
+                            SignedTokenPB* signed_token) const WARN_UNUSED_RESULT;
+
+  // Populates 'signed_token' with a signed authentication token with the given
+  // 'username'. Returns an error if 'username' is empty, or if the created
+  // authz token could not be serialized for some reason.
   Status GenerateAuthnToken(std::string username,
                             SignedTokenPB* signed_token) const WARN_UNUSED_RESULT;
 
@@ -279,6 +308,9 @@ class TokenSigner {
 
  private:
   FRIEND_TEST(TokenTest, TestEndToEnd_InvalidCases);
+  FRIEND_TEST(TokenTest, TestIsCurrentKeyValid);
+  FRIEND_TEST(TokenTest, TestTokenSignerAddKeyAfterImport);
+  FRIEND_TEST(TokenTest, TestKeyValidity);
 
   static Status GenerateSigningKey(int64_t key_seq_num,
                                    int64_t key_expiration,
@@ -286,8 +318,9 @@ class TokenSigner {
 
   std::shared_ptr<TokenVerifier> verifier_;
 
-  // Validity interval for the generated authn tokens.
+  // Validity intervals for the generated tokens.
   const int64_t authn_token_validity_seconds_;
+  const int64_t authz_token_validity_seconds_;
 
   // TSK rotation interval: number of seconds between consecutive activations
   // of new token signing keys. Note that in current implementation it defines
diff --git a/be/src/kudu/security/token_verifier.cc b/be/src/kudu/security/token_verifier.cc
index 1ae20db..e3aef4c 100644
--- a/be/src/kudu/security/token_verifier.cc
+++ b/be/src/kudu/security/token_verifier.cc
@@ -123,7 +123,7 @@ VerificationResult TokenVerifier::VerifyTokenSignature(const SignedTokenPB& sign
 
   for (auto flag : token->incompatible_features()) {
     if (!TokenPB::Feature_IsValid(flag)) {
-      KLOG_EVERY_N_SECS(WARNING, 60) << "received authentication token with unknown feature; "
+      KLOG_EVERY_N_SECS(WARNING, 60) << "received token with unknown feature; "
                                         "server needs to be updated";
       return VerificationResult::INCOMPATIBLE_FEATURE;
     }
@@ -151,17 +151,17 @@ const char* VerificationResultToString(VerificationResult r) {
     case security::VerificationResult::VALID:
       return "valid";
     case security::VerificationResult::INVALID_TOKEN:
-      return "invalid authentication token";
+      return "invalid token";
     case security::VerificationResult::INVALID_SIGNATURE:
-      return "invalid authentication token signature";
+      return "invalid token signature";
     case security::VerificationResult::EXPIRED_TOKEN:
-      return "authentication token expired";
+      return "token expired";
     case security::VerificationResult::EXPIRED_SIGNING_KEY:
-      return "authentication token signing key expired";
+      return "token signing key expired";
     case security::VerificationResult::UNKNOWN_SIGNING_KEY:
-      return "authentication token signed with unknown key";
+      return "token signed with unknown key";
     case security::VerificationResult::INCOMPATIBLE_FEATURE:
-      return "authentication token uses incompatible feature";
+      return "token uses incompatible feature";
     default:
       LOG(FATAL) << "unexpected VerificationResult value: "
                  << static_cast<int>(r);
diff --git a/be/src/kudu/util/test_util.cc b/be/src/kudu/util/test_util.cc
index c960441..c514d0e 100644
--- a/be/src/kudu/util/test_util.cc
+++ b/be/src/kudu/util/test_util.cc
@@ -36,6 +36,7 @@
 #include <sys/param.h> // for MAXPATHLEN
 #endif
 
+#include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 #include <gtest/gtest-spi.h>
@@ -43,6 +44,7 @@
 #include "kudu/gutil/strings/numbers.h"
 #include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/strcat.h"
+#include "kudu/gutil/strings/stringpiece.h"
 #include "kudu/gutil/strings/strip.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/strings/util.h"
@@ -54,6 +56,7 @@
 #include "kudu/util/slice.h"
 #include "kudu/util/spinlock_profiling.h"
 #include "kudu/util/status.h"
+#include "kudu/util/string_case.h"
 #include "kudu/util/subprocess.h"
 
 DEFINE_string(test_leave_files, "on_failure",
@@ -62,6 +65,7 @@ DEFINE_string(test_leave_files, "on_failure",
 
 DEFINE_int32(test_random_seed, 0, "Random seed to use for randomized tests");
 
+using boost::optional;
 using std::string;
 using std::vector;
 using strings::Substitute;
@@ -378,7 +382,9 @@ int CountOpenFds(Env* env, const string& path_pattern) {
 }
 
 namespace {
-Status WaitForBind(pid_t pid, uint16_t* port, const char* kind, MonoDelta timeout) {
+Status WaitForBind(pid_t pid, uint16_t* port,
+                   const optional<const string&>& addr,
+                   const char* kind, MonoDelta timeout) {
   // In general, processes do not expose the port they bind to, and
   // reimplementing lsof involves parsing a lot of files in /proc/. So,
   // requiring lsof for tests and parsing its output seems more
@@ -394,15 +400,52 @@ Status WaitForBind(pid_t pid, uint16_t* port, const char* kind, MonoDelta timeou
     "-a", "-i", kind
   };
 
+  // The '-Ffn' flag gets lsof to output something like:
+  //   p5801
+  //   f548
+  //   n127.0.0.1:43954->127.0.0.1:43617
+  //   f549
+  //   n*:8038
+  //
+  // The first line is the pid. We ignore it.
+  // Subsequent lines come in pairs. In each pair, the first half of the pair
+  // is file descriptor number, we ignore it.
+  // The second half has the bind address and port.
+  //
+  // In this example, the first pair is an outbound TCP socket. We ignore it.
+  // The second pair is the listening TCP socket bind address and port.
+  //
+  // We use the first encountered listening TCP socket, since that's most likely
+  // to be the primary service port. When searching, we use the provided bind
+  // address if there is any, otherwise we use '*' (same as '0.0.0.0') which
+  // matches all addresses on the local machine.
+  string addr_pattern = Substitute("n$0:", (!addr || *addr == "0.0.0.0") ? "*" : *addr);
   MonoTime deadline = MonoTime::Now() + timeout;
   string lsof_out;
+  int32_t p = -1;
 
   for (int64_t i = 1; ; i++) {
     lsof_out.clear();
-    Status s = Subprocess::Call(cmd, "", &lsof_out);
+    Status s = Subprocess::Call(cmd, "", &lsof_out).AndThen([&] () {
+      StripTrailingNewline(&lsof_out);
+      vector<string> lines = strings::Split(lsof_out, "\n");
+      for (int index = 2; index < lines.size(); index += 2) {
+        StringPiece cur_line(lines[index]);
+        if (HasPrefixString(cur_line.ToString(), addr_pattern) &&
+            !cur_line.contains("->")) {
+          cur_line.remove_prefix(addr_pattern.size());
+          if (!safe_strto32(cur_line.data(), cur_line.size(), &p)) {
+            return Status::RuntimeError("unexpected lsof output", lsof_out);
+          }
+
+          return Status::OK();
+        }
+      }
+
+      return Status::RuntimeError("unexpected lsof output", lsof_out);
+    });
 
     if (s.ok()) {
-      StripTrailingNewline(&lsof_out);
       break;
     }
     if (deadline < MonoTime::Now()) {
@@ -412,22 +455,6 @@ Status WaitForBind(pid_t pid, uint16_t* port, const char* kind, MonoDelta timeou
     SleepFor(MonoDelta::FromMilliseconds(i * 10));
   }
 
-  // The '-Ffn' flag gets lsof to output something like:
-  //   p19730
-  //   f123
-  //   n*:41254
-  // The first line is the pid. We ignore it.
-  // The second line is the file descriptor number. We ignore it.
-  // The third line has the bind address and port.
-  // Subsequent lines show active connections.
-  vector<string> lines = strings::Split(lsof_out, "\n");
-  int32_t p = -1;
-  if (lines.size() < 3 ||
-      lines[2].substr(0, 3) != "n*:" ||
-      !safe_strto32(lines[2].substr(3), &p) ||
-      p <= 0) {
-    return Status::RuntimeError("unexpected lsof output", lsof_out);
-  }
   CHECK(p > 0 && p < std::numeric_limits<uint16_t>::max()) << "parsed invalid port: " << p;
   VLOG(1) << "Determined bound port: " << p;
   *port = p;
@@ -435,12 +462,31 @@ Status WaitForBind(pid_t pid, uint16_t* port, const char* kind, MonoDelta timeou
 }
 } // anonymous namespace
 
-Status WaitForTcpBind(pid_t pid, uint16_t* port, MonoDelta timeout) {
-  return WaitForBind(pid, port, "4TCP", timeout);
+Status WaitForTcpBind(pid_t pid, uint16_t* port,
+                      const optional<const string&>& addr,
+                      MonoDelta timeout) {
+  return WaitForBind(pid, port, addr, "4TCP", timeout);
+}
+
+Status WaitForUdpBind(pid_t pid, uint16_t* port,
+                      const optional<const string&>& addr,
+                      MonoDelta timeout) {
+  return WaitForBind(pid, port, addr, "4UDP", timeout);
 }
 
-Status WaitForUdpBind(pid_t pid, uint16_t* port, MonoDelta timeout) {
-  return WaitForBind(pid, port, "4UDP", timeout);
+Status FindHomeDir(const string& name, const string& bin_dir, string* home_dir) {
+  string name_upper;
+  ToUpperCase(name, &name_upper);
+
+  string env_var = Substitute("$0_HOME", name_upper);
+  const char* env = std::getenv(env_var.c_str());
+  string dir = env == nullptr ? JoinPathSegments(bin_dir, Substitute("$0-home", name)) : env;
+
+  if (!Env::Default()->FileExists(dir)) {
+    return Status::NotFound(Substitute("$0 directory does not exist", env_var), dir);
+  }
+  *home_dir = dir;
+  return Status::OK();
 }
 
 } // namespace kudu
diff --git a/be/src/kudu/util/test_util.h b/be/src/kudu/util/test_util.h
index 8090fbc..d320e3e 100644
--- a/be/src/kudu/util/test_util.h
+++ b/be/src/kudu/util/test_util.h
@@ -26,11 +26,19 @@
 #include <memory>
 #include <string>
 
+#include <boost/optional/optional.hpp>
 #include <gtest/gtest.h>
 
 #include "kudu/gutil/port.h"
 #include "kudu/util/monotime.h"
 
+#define SKIP_IF_SLOW_NOT_ALLOWED() do { \
+  if (!AllowSlowTests()) { \
+    LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run"; \
+    return; \
+  } \
+} while (0)
+
 #define ASSERT_EVENTUALLY(expr) do { \
   AssertEventually(expr); \
   NO_PENDING_FATALS(); \
@@ -136,11 +144,26 @@ void AssertEventually(const std::function<void(void)>& f,
 // unlike the usual behavior of path globs.
 int CountOpenFds(Env* env, const std::string& path_pattern);
 
-// Waits for the subprocess to bind to any listening TCP port, and returns the port.
-Status WaitForTcpBind(pid_t pid, uint16_t* port, MonoDelta timeout) WARN_UNUSED_RESULT;
+// Waits for the subprocess to bind to any listening TCP port on the provided
+// IP address (if the address is not provided, it is a wildcard binding), and
+// returns the port.
+Status WaitForTcpBind(pid_t pid, uint16_t* port,
+                      const boost::optional<const std::string&>& addr,
+                      MonoDelta timeout) WARN_UNUSED_RESULT;
 
-// Waits for the subprocess to bind to any listening UDP port, and returns the port.
-Status WaitForUdpBind(pid_t pid, uint16_t* port, MonoDelta timeout) WARN_UNUSED_RESULT;
+// Similar to above but binds to any listening UDP port.
+Status WaitForUdpBind(pid_t pid, uint16_t* port,
+                      const boost::optional<const std::string&>& addr,
+                      MonoDelta timeout) WARN_UNUSED_RESULT;
+
+// Find the home directory of a Java-style application, e.g. JAVA_HOME or
+// HADOOP_HOME.
+//
+// Checks the environment, or falls back to a symlink in the bin installation
+// directory.
+Status FindHomeDir(const std::string& name,
+                   const std::string& bin_dir,
+                   std::string* home_dir) WARN_UNUSED_RESULT;
 
 } // namespace kudu
 #endif