You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2019/07/01 20:20:31 UTC
[impala] 02/02: Update kudu/security from
9ebcb77aa911aae76c48e717af24e643cb81908d
This is an automated email from the ASF dual-hosted git repository.
joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit b8e20905a80f6541efdb2161333fb1552a976135
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Fri Jun 28 11:16:45 2019 -0700
Update kudu/security from 9ebcb77aa911aae76c48e717af24e643cb81908d
This updates the kudu security code to the latest version, which
includes support for GSSAPI calls, necessary for SPNEGO.
This is a straight rsync of the kudu/util source code except for some
minor CMakeLists changes that were carried over from the old version.
Change-Id: Ie7c91193fd49f8ca1234b23cf61fc90c1fdbe2e0
Reviewed-on: http://gerrit.cloudera.org:8080/13767
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/kudu/security/CMakeLists.txt | 2 +
be/src/kudu/security/gssapi.cc | 164 ++++++++++++++++
be/src/kudu/security/gssapi.h | 59 ++++++
be/src/kudu/security/init.cc | 12 ++
be/src/kudu/security/openssl_util.cc | 41 +++-
be/src/kudu/security/test/mini_kdc.cc | 34 +++-
be/src/kudu/security/test/mini_kdc.h | 8 +
be/src/kudu/security/tls_socket-test.cc | 71 ++++---
be/src/kudu/security/tls_socket.cc | 12 +-
be/src/kudu/security/token-test.cc | 322 ++++++++++++++++++++++++++------
be/src/kudu/security/token.proto | 31 +++
be/src/kudu/security/token_signer.cc | 36 +++-
be/src/kudu/security/token_signer.h | 89 ++++++---
be/src/kudu/security/token_verifier.cc | 14 +-
be/src/kudu/util/test_util.cc | 92 ++++++---
be/src/kudu/util/test_util.h | 31 ++-
16 files changed, 847 insertions(+), 171 deletions(-)
diff --git a/be/src/kudu/security/CMakeLists.txt b/be/src/kudu/security/CMakeLists.txt
index 63d6341..9533523 100644
--- a/be/src/kudu/security/CMakeLists.txt
+++ b/be/src/kudu/security/CMakeLists.txt
@@ -68,6 +68,7 @@ set(SECURITY_SRCS
cert.cc
crypto.cc
kerberos_util.cc
+ gssapi.cc
init.cc
openssl_util.cc
${PORTED_X509_CHECK_HOST_CC}
@@ -86,6 +87,7 @@ set(SECURITY_LIBS
kudu_util
token_proto
+ gssapi_krb5
krb5
openssl_crypto
openssl_ssl)
diff --git a/be/src/kudu/security/gssapi.cc b/be/src/kudu/security/gssapi.cc
new file mode 100644
index 0000000..6797ec3
--- /dev/null
+++ b/be/src/kudu/security/gssapi.cc
@@ -0,0 +1,164 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/security/gssapi.h"
+
+#include <cstring>
+#include <string>
+#include <utility>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/strings/escaping.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+
+using std::string;
+
+namespace kudu {
+namespace gssapi {
+
+namespace {
+
+string ErrorToString(OM_uint32 code, OM_uint32 type) {
+ string description;
+ OM_uint32 message_context = 0;
+
+ do {
+ if (!description.empty()) {
+ description.append(": ");
+ }
+ OM_uint32 minor = 0;
+ gss_buffer_desc buf;
+ gss_display_status(&minor, code, type, GSS_C_NULL_OID, &message_context, &buf);
+ const char* err = static_cast<const char*>(buf.value);
+ // The error message buffer returned has an explicit length, but in some cases
+ // this length appears to include the null terminator character, and in others
+ // it doesn't. So, we trim off any null terminators if necessary.
+ int err_len = strnlen(err, buf.length);
+ description.append(err, err_len);
+
+ // NOTE: call gss_release_buffer explicitly here instead of ReleaseBufferOrWarn
+ // to avoid the potential for infinite recursion back into ErrorToString.
+ gss_release_buffer(&minor, &buf);
+ } while (message_context != 0);
+
+ return description;
+}
+
+// Wrap a call to F(...), which is typically one of the gss_* functions from
+// the gssapi library. These functions all return a major status code and
+// also provide a minor status code as their first out-param.
+template <class F, class... Args>
+Status WrapGssCall(F func, Args&&... args) {
+ OM_uint32 minor = 0;
+ OM_uint32 major = func(&minor, std::forward<Args>(args)...);
+ return MajorMinorToStatus(major, minor);
+}
+
+void ReleaseNameOrWarn(gss_name_t name) {
+ if (name == GSS_C_NO_NAME) return;
+ WARN_NOT_OK(WrapGssCall(gss_release_name, &name), "Unable to release GSS name");
+}
+
+void ReleaseBufferOrWarn(gss_buffer_t buf) {
+ if (buf == GSS_C_NO_BUFFER) return;
+ WARN_NOT_OK(WrapGssCall(gss_release_buffer, buf), "Unable to release GSS buffer");
+}
+
+void ReleaseContextOrWarn(gss_ctx_id_t ctx) {
+ if (ctx == GSS_C_NO_CONTEXT) return;
+ WARN_NOT_OK(WrapGssCall(gss_delete_sec_context, &ctx, GSS_C_NO_BUFFER),
+ "Unable to release GSS context");
+}
+
+} // anonymous namespace
+
+
+Status MajorMinorToStatus(OM_uint32 major, OM_uint32 minor) {
+ if (GSS_ERROR(major)) {
+ string maj_str = ErrorToString(major, GSS_C_GSS_CODE);
+ string min_str = minor != 0 ? ErrorToString(minor, GSS_C_MECH_CODE) : "";
+ return Status::NotAuthorized(maj_str, min_str);
+ }
+ if (major == GSS_S_CONTINUE_NEEDED) {
+ return Status::Incomplete("");
+ }
+
+ return Status::OK();
+}
+
+
+Status SpnegoStep(const string& in_token_b64,
+ string* out_token_b64,
+ bool* complete,
+ string* authenticated_principal) {
+ string token;
+ if (!strings::Base64Unescape(in_token_b64, &token)) {
+ return Status::InvalidArgument("invalid base64 encoding for token");
+ }
+
+ // Work around an MIT krb5 bug [1], fixed in krb5 1.16 and 1.15.3:
+ //
+ // gssint_get_mech_type_oid() was missing some length verification that could
+ // cause reads past the end of the input token. So, we extend the actual
+ // allocation of the input token by an extra 256 bytes of padding.
+ //
+ // Without this fix, our ASAN builds fail with an out-of-bounds read.
+ //
+ // [1] http://krbdev.mit.edu/rt/Ticket/History.html?id=8620
+ size_t real_token_size = token.size();
+ token.resize(real_token_size + 256);
+
+ gss_buffer_desc input_token {real_token_size, const_cast<char*>(token.data())};
+
+ gss_ctx_id_t ctx = GSS_C_NO_CONTEXT;
+ gss_name_t client_name = GSS_C_NO_NAME;
+ SCOPED_CLEANUP({ ReleaseNameOrWarn(client_name); });
+
+ gss_buffer_desc out_token {0, nullptr};
+ SCOPED_CLEANUP({ ReleaseBufferOrWarn(&out_token); });
+ Status s = WrapGssCall(gss_accept_sec_context, &ctx, GSS_C_NO_CREDENTIAL,
+ &input_token, GSS_C_NO_CHANNEL_BINDINGS,
+ &client_name, /*mech_type=*/ nullptr,
+ &out_token, /*ret_flags=*/nullptr,
+ /*time_rec=*/nullptr, /*delegated_cred_handle=*/nullptr);
+ SCOPED_CLEANUP({ ReleaseContextOrWarn(ctx); });
+ if (!s.ok() && !s.IsIncomplete()) {
+ return s;
+ }
+ *complete = s.ok();
+
+ if (*complete) {
+ gss_buffer_desc name_buf {0, nullptr};
+ SCOPED_CLEANUP({ ReleaseBufferOrWarn(&name_buf); });
+ RETURN_NOT_OK_PREPEND(WrapGssCall(gss_display_name, client_name, &name_buf, nullptr),
+ "Unable to extract authenticated principal name");
+ authenticated_principal->assign(reinterpret_cast<char*>(name_buf.value),
+ name_buf.length);
+ }
+
+ string out_token_str(reinterpret_cast<char*>(out_token.value),
+ out_token.length);
+ strings::Base64Escape(out_token_str, out_token_b64);
+
+ return Status::OK();
+}
+
+
+} // namespace gssapi
+} // namespace kudu
diff --git a/be/src/kudu/security/gssapi.h b/be/src/kudu/security/gssapi.h
new file mode 100644
index 0000000..036b3d4
--- /dev/null
+++ b/be/src/kudu/security/gssapi.h
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <string>
+
+#include <gssapi/gssapi.h>
+
+namespace kudu {
+
+class Status;
+
+namespace gssapi {
+
+// Convert the given major/minor GSSAPI error codes into a Status.
+Status MajorMinorToStatus(OM_uint32 major, OM_uint32 minor);
+
+// Run a step of SPNEGO authentication.
+//
+// 'in_token_b64' is the base64-encoded token provided by the client, which may be empty
+// if the client did not provide any such token (e.g. if the HTTP 'Authorization' header
+// was not present).
+//
+// 'out_token_b64' is the base64-encoded output token to send back to the client
+// during this round of negotiation.
+//
+// If any error occurs (e.g. an invalid token is provided), a bad Status is returned.
+//
+// An OK status indicates that the negotiation is proceeding successfully, or has
+// completed, whereas a non-OK status indicates an error or an unsuccessful
+// authentication ('out_token_b64' and 'authenticated_principal' are then left unmodified).
+//
+// In the case of an OK status, '*complete' indicates whether any further rounds are
+// required. On completion of negotiation, 'authenticated_principal' will be set to the
+// full principal name of the remote user.
+//
+// NOTE: per the SPNEGO protocol, the final "complete" negotiation stage may
+// include a token.
+Status SpnegoStep(const std::string& in_token_b64,
+ std::string* out_token_b64,
+ bool* complete,
+ std::string* authenticated_principal);
+
+} // namespace gssapi
+} // namespace kudu
diff --git a/be/src/kudu/security/init.cc b/be/src/kudu/security/init.cc
index 64ee1a5..200411f 100644
--- a/be/src/kudu/security/init.cc
+++ b/be/src/kudu/security/init.cc
@@ -48,6 +48,13 @@
#include "kudu/util/status.h"
#include "kudu/util/thread.h"
+#if defined(__APPLE__)
+// Almost all functions in the krb5 API are marked as deprecated in favor
+// of GSS.framework in macOS.
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
+#endif // #if defined(__APPLE__)
+
#ifndef __APPLE__
static constexpr bool kDefaultSystemAuthToLocal = true;
#else
@@ -56,6 +63,7 @@ static constexpr bool kDefaultSystemAuthToLocal = true;
// implementation.
static constexpr bool kDefaultSystemAuthToLocal = false;
#endif
+
DEFINE_bool(use_system_auth_to_local, kDefaultSystemAuthToLocal,
"When enabled, use the system krb5 library to map Kerberos principal "
"names to local (short) usernames. If not enabled, the first component "
@@ -475,3 +483,7 @@ Status InitKerberosForServer(const std::string& raw_principal, const std::string
} // namespace security
} // namespace kudu
+
+#if defined(__APPLE__)
+#pragma GCC diagnostic pop
+#endif // #if defined(__APPLE__)
diff --git a/be/src/kudu/security/openssl_util.cc b/be/src/kudu/security/openssl_util.cc
index a32140f..e96fcb3 100644
--- a/be/src/kudu/security/openssl_util.cc
+++ b/be/src/kudu/security/openssl_util.cc
@@ -82,25 +82,49 @@ void LockingCB(int mode, int type, const char* /*file*/, int /*line*/) {
#endif
Status CheckOpenSSLInitialized() {
+#if OPENSSL_VERSION_NUMBER < 0x10100000L
+ // Starting with OpenSSL 1.1.0, the old thread API became obsolete
+ // (see changelist 2e52e7df5 in the OpenSSL upstream repo), and
+ // CRYPTO_get_locking_callback() always returns nullptr. Also, the library
+ // always initializes its internals for multi-threaded usage.
+ // Another point is that starting with version 1.1.0, SSL_CTX_new()
+ // initializes the OpenSSL library under the hood, so SSL_CTX_new() would
+ // not return nullptr unless there was an error during the initialization
+ // of the library. That makes this code in CheckOpenSSLInitialized() obsolete
+ // starting with OpenSSL version 1.1.0.
+ //
+ // Starting with OpenSSL 1.1.0, there isn't a straightforward way to
+ // determine whether the library has already been initialized if using just
+ // the API (well, there is CRYPTO_secure_malloc_initialized() but that's just
+ // for the crypto library and it's implementation-dependent). On the other
+ // hand, the whole idea that this method should check whether the
+ // library has already been initialized is not relevant anymore: even if it's
+ // not yet initialized, the first call to SSL_CTX_new() (from, say,
+ // TlsContext::Init()) will initialize the library under the hood, so the
+ // library will be ready for multi-thread usage by Kudu.
if (!CRYPTO_get_locking_callback()) {
return Status::RuntimeError("Locking callback not initialized");
}
auto ctx = ssl_make_unique(SSL_CTX_new(SSLv23_method()));
if (!ctx) {
ERR_clear_error();
- return Status::RuntimeError("SSL library appears uninitialized (cannot create SSL_CTX)");
+ return Status::RuntimeError(
+ "SSL library appears uninitialized (cannot create SSL_CTX)");
}
+#endif
return Status::OK();
}
void DoInitializeOpenSSL() {
-#if OPENSSL_VERSION_NUMBER > 0x10100000L
+#if OPENSSL_VERSION_NUMBER >= 0x10100000L
// The OPENSSL_init_ssl manpage [1] says "As of version 1.1.0 OpenSSL will
// automatically allocate all resources it needs so no explicit initialisation
// is required." However, eliding library initialization leads to a memory
- // leak in some versions of OpenSSL 1.1 when the first OpenSSL is
- // ERR_peek_error [2]. In Kudu this is often the
- // case due to prolific application of SCOPED_OPENSSL_NO_PENDING_ERRORS.
+ // leak in some versions of OpenSSL 1.1 when the first OpenSSL call is
+ // ERR_peek_error (see [2] for details; the issue was addressed in OpenSSL
+ // 1.1.0i (OPENSSL_VERSION_NUMBER 0x1010009f)). In Kudu this is often the
+ // case due to prolific application of the SCOPED_OPENSSL_NO_PENDING_ERRORS
+ // macro.
//
// Rather than determine whether this particular OpenSSL instance is
// leak-free, we'll initialize the library explicitly.
@@ -130,8 +154,11 @@ void DoInitializeOpenSSL() {
auto ctx = ssl_make_unique(SSL_CTX_new(SSLv23_method()));
if (ctx) {
LOG(WARNING) << "It appears that OpenSSL has been previously initialized by "
- << "code outside of Kudu. Please use kudu::client::DisableOpenSSLInitialization() "
- << "to avoid potential crashes due to conflicting initialization.";
+ "code outside of Kudu. Please first properly initialize "
+ "OpenSSL for multi-threaded usage (setting thread callback "
+ "functions for OpenSSL of versions earlier than 1.1.0) and "
+ "then call kudu::client::DisableOpenSSLInitialization() "
+ "to avoid potential crashes due to conflicting initialization.";
// Continue anyway; all of the below is idempotent, except for the locking callback,
// which we check before overriding. They aren't thread-safe, however -- that's why
// we try to get embedding applications to do the right thing here rather than risk a
diff --git a/be/src/kudu/security/test/mini_kdc.cc b/be/src/kudu/security/test/mini_kdc.cc
index 4f987c5..1662770 100644
--- a/be/src/kudu/security/test/mini_kdc.cc
+++ b/be/src/kudu/security/test/mini_kdc.cc
@@ -25,6 +25,7 @@
#include <string>
#include <utility>
+#include <boost/optional/optional.hpp>
#include <glog/logging.h>
#include "kudu/gutil/strings/strip.h"
@@ -37,6 +38,7 @@
#include "kudu/util/subprocess.h"
#include "kudu/util/test_util.h"
+using boost::none;
using std::map;
using std::string;
using std::unique_ptr;
@@ -61,9 +63,7 @@ MiniKdc::MiniKdc(MiniKdcOptions options)
options_.realm = "KRBTEST.COM";
}
if (options_.data_root.empty()) {
- // We hardcode "/tmp" here since the original function which initializes a random test
- // directory (GetTestDataDirectory()), depends on gmock.
- options_.data_root = JoinPathSegments("/tmp", "krb5kdc");
+ options_.data_root = JoinPathSegments(GetTestDataDirectory(), "krb5kdc");
}
if (options_.ticket_lifetime.empty()) {
options_.ticket_lifetime = "24h";
@@ -152,8 +152,10 @@ Status MiniKdc::Start() {
RETURN_NOT_OK(kdc_process_->Start());
const bool need_config_update = (options_.port == 0);
- // Wait for KDC to start listening on its ports and commencing operation.
- RETURN_NOT_OK(WaitForUdpBind(kdc_process_->pid(), &options_.port, MonoDelta::FromSeconds(1)));
+ // Wait for KDC to start listening on its ports and commencing operation
+ // with a wildcard binding.
+ RETURN_NOT_OK(WaitForUdpBind(kdc_process_->pid(), &options_.port,
+ /*addr=*/none, MonoDelta::FromSeconds(1)));
if (need_config_update) {
// If we asked for an ephemeral port, grab the actual ports and
@@ -274,6 +276,28 @@ Status MiniKdc::CreateServiceKeytab(const string& spn,
return Status::OK();
}
+Status MiniKdc::RandomizePrincipalKey(const string& spn) {
+ SCOPED_LOG_SLOW_EXECUTION(WARNING, 100, Substitute("randomizing key for $0", spn));
+ string kadmin;
+ RETURN_NOT_OK(GetBinaryPath("kadmin.local", &kadmin));
+ RETURN_NOT_OK(Subprocess::Call(MakeArgv({
+ kadmin, "-q", Substitute("change_password -randkey $0", spn)})));
+ return Status::OK();
+}
+
+Status MiniKdc::CreateKeytabForExistingPrincipal(const string& spn) {
+ SCOPED_LOG_SLOW_EXECUTION(WARNING, 100, Substitute("creating keytab for $0", spn));
+ string kt_path = spn;
+ StripString(&kt_path, "/", '_');
+ kt_path = JoinPathSegments(options_.data_root, kt_path) + ".keytab";
+
+ string kadmin;
+ RETURN_NOT_OK(GetBinaryPath("kadmin.local", &kadmin));
+ RETURN_NOT_OK(Subprocess::Call(MakeArgv({
+ kadmin, "-q", Substitute("xst -norandkey -k $0 $1", kt_path, spn)})));
+ return Status::OK();
+}
+
Status MiniKdc::Kinit(const string& username) {
SCOPED_LOG_SLOW_EXECUTION(WARNING, 100, Substitute("kinit for $0", username));
string kinit;
diff --git a/be/src/kudu/security/test/mini_kdc.h b/be/src/kudu/security/test/mini_kdc.h
index e282cc4..94990a2 100644
--- a/be/src/kudu/security/test/mini_kdc.h
+++ b/be/src/kudu/security/test/mini_kdc.h
@@ -92,6 +92,14 @@ class MiniKdc {
// will be reset and a new keytab will be generated.
Status CreateServiceKeytab(const std::string& spn, std::string* path);
+ // Randomize the key for the given SPN. This invalidates any previously-produced
+ // keytabs.
+ Status RandomizePrincipalKey(const std::string& spn);
+
+ // Creates a keytab for an existing principal.
+ // 'spn' is the desired service principal name (e.g. "kudu/foo.example.com").
+ Status CreateKeytabForExistingPrincipal(const std::string& spn);
+
// Kinit a user to the mini KDC.
Status Kinit(const std::string& username) WARN_UNUSED_RESULT;
diff --git a/be/src/kudu/security/tls_socket-test.cc b/be/src/kudu/security/tls_socket-test.cc
index 001a206..b88cdf4 100644
--- a/be/src/kudu/security/tls_socket-test.cc
+++ b/be/src/kudu/security/tls_socket-test.cc
@@ -58,7 +58,7 @@ using std::vector;
namespace kudu {
namespace security {
-const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
+const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
// Size is big enough to not fit into output socket buffer of default size
// (controlled by setsockopt() with SO_SNDBUF).
@@ -155,18 +155,18 @@ class EchoServer {
CHECK_OK(sock->SetRecvTimeout(kTimeout));
unique_ptr<uint8_t[]> buf(new uint8_t[kEchoChunkSize]);
// An "echo" loop for kEchoChunkSize byte buffers.
- while (!stop_) {
+ while (!is_stopped_) {
size_t n;
Status s = sock->BlockingRecv(buf.get(), kEchoChunkSize, &n, MonoTime::Now() + kTimeout);
if (!s.ok()) {
- CHECK(stop_) << "unexpected error reading: " << s.ToString();
+ CHECK(is_stopped_) << "unexpected error reading: " << s.ToString();
}
LOG(INFO) << "server echoing " << n << " bytes";
size_t written;
s = sock->BlockingWrite(buf.get(), n, &written, MonoTime::Now() + kTimeout);
if (!s.ok()) {
- CHECK(stop_) << "unexpected error writing: " << s.ToString();
+ CHECK(is_stopped_) << "unexpected error writing: " << s.ToString();
}
if (slow_read_) {
SleepFor(MonoDelta::FromMilliseconds(10));
@@ -185,12 +185,12 @@ class EchoServer {
return listen_addr_;
}
- bool stopped() const {
- return stop_;
+ bool is_stopped() const {
+ return is_stopped_;
}
void Stop() {
- stop_ = true;
+ is_stopped_ = true;
}
void Join() {
thread_.join();
@@ -208,38 +208,49 @@ class EchoServer {
thread thread_;
pthread_t pthread_;
CountDownLatch pthread_sync_;
- std::atomic<bool> stop_ { false };
+ std::atomic<bool> is_stopped_ { false };
bool slow_read_ = false;
};
void handler(int /* signal */) {}
+// Test that errors returned when reading from a TLS socket are reasonable and
+// contain the address of the remote.
TEST_F(TlsSocketTest, TestRecvFailure) {
- EchoServer server;
- server.Start();
- unique_ptr<Socket> client_sock;
- NO_FATALS(ConnectClient(server.listen_addr(), &client_sock));
- unique_ptr<uint8_t[]> buf(new uint8_t[kEchoChunkSize]);
-
- SleepFor(MonoDelta::FromMilliseconds(100));
- server.Stop();
-
- size_t nwritten;
- ASSERT_OK(client_sock->BlockingWrite(buf.get(), kEchoChunkSize, &nwritten,
- MonoTime::Now() + kTimeout));
- size_t nread;
+ EchoServer server;
+ server.Start();
+ unique_ptr<Socket> client_sock;
+ NO_FATALS(ConnectClient(server.listen_addr(), &client_sock));
+ unique_ptr<uint8_t[]> buf(new uint8_t[kEchoChunkSize]);
- ASSERT_OK(client_sock->BlockingRecv(buf.get(), kEchoChunkSize, &nread,
- MonoTime::Now() + kTimeout));
+ // The client writes to the server. This blocks on the server receiving,
+ // so once this completes the server is in its "echo loop".
+ size_t nwritten_unused;
+ ASSERT_OK(client_sock->BlockingWrite(buf.get(),
+ kEchoChunkSize,
+ &nwritten_unused,
+ MonoTime::Now() + kTimeout));
- Status s = client_sock->BlockingRecv(buf.get(), kEchoChunkSize, &nread,
- MonoTime::Now() + kTimeout);
+ // Stop the server. The client can still Recv once from the server because
+ // the server is trying to echo what was sent by the client.
+ server.Stop();
- ASSERT_TRUE(!s.ok());
- ASSERT_TRUE(s.IsNetworkError());
- ASSERT_STR_MATCHES(s.message().ToString(), "BlockingRecv error: failed to read from "
- "TLS socket \\(remote: 127.0.0.1:[0-9]+\\): ");
+ // The first Recv succeeds. This unblocks the server and causes it to exit.
+ size_t nread_unused;
+ ASSERT_OK(client_sock->BlockingRecv(buf.get(),
+ kEchoChunkSize,
+ &nread_unused,
+ MonoTime::Now() + kTimeout));
+
+ // The server has closed the connection, so the second Recv will fail.
+ Status s = client_sock->BlockingRecv(buf.get(),
+ kEchoChunkSize,
+ &nread_unused,
+ MonoTime::Now() + kTimeout);
+ ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
+ ASSERT_STR_MATCHES(s.message().ToString(), "BlockingRecv error: failed to read from "
+ "TLS socket \\(remote: 127.0.0.1:[0-9]+\\): ");
}
// Test for failures to handle EINTR during TLS connection
@@ -257,7 +268,7 @@ TEST_F(TlsSocketTest, TestTlsSocketInterrupted) {
// Start a thread to send signals to the server thread.
thread killer([&]() {
- while (!server.stopped()) {
+ while (!server.is_stopped()) {
PCHECK(pthread_kill(server.pthread(), SIGUSR2) == 0);
SleepFor(MonoDelta::FromMicroseconds(rand() % 10));
}
diff --git a/be/src/kudu/security/tls_socket.cc b/be/src/kudu/security/tls_socket.cc
index 355f04b..a586315 100644
--- a/be/src/kudu/security/tls_socket.cc
+++ b/be/src/kudu/security/tls_socket.cc
@@ -33,6 +33,9 @@
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/net/socket.h"
+using std::string;
+using strings::Substitute;
+
namespace kudu {
namespace security {
@@ -114,9 +117,10 @@ Status TlsSocket::Recv(uint8_t *buf, int32_t amt, int32_t *nread) {
int save_errno = errno;
if (bytes_read <= 0) {
Sockaddr remote;
- Socket::GetPeerAddress(&remote);
- std::string kErrString = strings::Substitute("failed to read from TLS socket (remote: $0)",
- remote.ToString());
+ Status s = GetPeerAddress(&remote);
+ const string remote_str = s.ok() ? remote.ToString() : "unknown";
+ string kErrString = Substitute("failed to read from TLS socket (remote: $0)",
+ remote_str);
if (bytes_read == 0 && SSL_get_shutdown(ssl_.get()) == SSL_RECEIVED_SHUTDOWN) {
return Status::NetworkError(kErrString, ErrnoToString(ESHUTDOWN), ESHUTDOWN);
@@ -124,7 +128,7 @@ Status TlsSocket::Recv(uint8_t *buf, int32_t amt, int32_t *nread) {
auto error_code = SSL_get_error(ssl_.get(), bytes_read);
if (error_code == SSL_ERROR_WANT_READ) {
if (save_errno != 0) {
- return Status::NetworkError("SSL_read error from " + remote.ToString(),
+ return Status::NetworkError("SSL_read error from " + remote_str,
ErrnoToString(save_errno), save_errno);
}
// Nothing available to read yet.
diff --git a/be/src/kudu/security/token-test.cc b/be/src/kudu/security/token-test.cc
index 7172a51..a4d7804 100644
--- a/be/src/kudu/security/token-test.cc
+++ b/be/src/kudu/security/token-test.cc
@@ -18,14 +18,16 @@
#include <cstdint>
#include <deque>
#include <memory>
+#include <ostream>
#include <string>
+#include <thread>
#include <utility>
#include <vector>
-#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
+#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/walltime.h"
#include "kudu/security/crypto.h"
#include "kudu/security/openssl_util.h"
@@ -33,23 +35,68 @@
#include "kudu/security/token_signer.h"
#include "kudu/security/token_signing_key.h"
#include "kudu/security/token_verifier.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/logging.h"
#include "kudu/util/monotime.h"
+#include "kudu/util/pb_util.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
-DECLARE_int32(tsk_num_rsa_bits);
-
-using std::string;
+using kudu::pb_util::SecureDebugString;
using std::make_shared;
+using std::string;
+using std::thread;
using std::unique_ptr;
using std::vector;
+using strings::Substitute;
namespace kudu {
namespace security {
namespace {
+// Dummy variables to use when their values don't matter much.
+const int kNumBits = 512;
+const int64_t kTokenValiditySeconds = 10;
+const char kUser[] = "user";
+
+// Repeatedly signs tokens and attempts to rotate TSKs until the active TSK's
+// sequence number passes `seq_num`, returning the last token signed by the TSK
+// at `seq_num`. This token is roughly the last possible token signed in the
+// TSK's activity interval.
+// The TokenGenerator 'generate_token' is a lambda that fills in a
+// SignedTokenPB and returns a Status.
+template <class TokenGenerator>
+Status SignUntilRotatePast(TokenSigner* signer, TokenGenerator generate_token,
+ const string& token_type, int64_t seq_num,
+ SignedTokenPB* last_signed_by_tsk) {
+ SignedTokenPB last_signed;
+ RETURN_NOT_OK_PREPEND(generate_token(&last_signed),
+ Substitute("Failed to generate first $0 token", token_type));
+ DCHECK_EQ(seq_num, last_signed.signing_key_seq_num())
+ << Substitute("Unexpected starting seq_num for $0 token", token_type);
+
+ auto cur_seq_num = seq_num;
+ while (cur_seq_num == seq_num) {
+ SleepFor(MonoDelta::FromMilliseconds(50));
+ KLOG_EVERY_N_SECS(INFO, 1) <<
+ Substitute("Generating $0 token for activity interval $1", token_type, seq_num);
+ RETURN_NOT_OK_PREPEND(signer->TryRotateKey(), "Failed to attempt to rotate key");
+ SignedTokenPB signed_token;
+ RETURN_NOT_OK_PREPEND(generate_token(&signed_token),
+ Substitute("Failed to generate $0 token", token_type));
+ // We want to return the last token signed by the `seq_num` TSK, so only
+ // update it when appropriate.
+ cur_seq_num = signed_token.signing_key_seq_num();
+ if (cur_seq_num == seq_num) {
+ last_signed = std::move(signed_token);
+ }
+ }
+ *last_signed_by_tsk = std::move(last_signed);
+ return Status::OK();
+}
+
SignedTokenPB MakeUnsignedToken(int64_t expiration) {
SignedTokenPB ret;
TokenPB token;
@@ -70,7 +117,7 @@ SignedTokenPB MakeIncompatibleToken() {
// Generate public key as a string in DER format for tests.
Status GeneratePublicKeyStrDer(string* ret) {
PrivateKey private_key;
- RETURN_NOT_OK(GeneratePrivateKey(512, &private_key));
+ RETURN_NOT_OK(GeneratePrivateKey(kNumBits, &private_key));
PublicKey public_key;
RETURN_NOT_OK(private_key.GetPublicKey(&public_key));
string public_key_str_der;
@@ -85,7 +132,7 @@ Status GenerateTokenSigningKey(int64_t seq_num,
unique_ptr<TokenSigningPrivateKey>* tsk) {
{
unique_ptr<PrivateKey> private_key(new PrivateKey);
- RETURN_NOT_OK(GeneratePrivateKey(512, private_key.get()));
+ RETURN_NOT_OK(GeneratePrivateKey(kNumBits, private_key.get()));
tsk->reset(new TokenSigningPrivateKey(
seq_num, expire_time_seconds, std::move(private_key)));
}
@@ -99,14 +146,14 @@ void CheckAndAddNextKey(int iter_num,
ASSERT_NE(nullptr, key_seq_num);
int64_t seq_num;
{
- std::unique_ptr<TokenSigningPrivateKey> key;
+ unique_ptr<TokenSigningPrivateKey> key;
ASSERT_OK(signer->CheckNeedKey(&key));
ASSERT_NE(nullptr, key.get());
seq_num = key->key_seq_num();
}
for (int i = 0; i < iter_num; ++i) {
- std::unique_ptr<TokenSigningPrivateKey> key;
+ unique_ptr<TokenSigningPrivateKey> key;
ASSERT_OK(signer->CheckNeedKey(&key));
ASSERT_NE(nullptr, key.get());
ASSERT_EQ(seq_num, key->key_seq_num());
@@ -124,7 +171,7 @@ class TokenTest : public KuduTest {
};
TEST_F(TokenTest, TestInit) {
- TokenSigner signer(10, 10);
+ TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, 10);
const TokenVerifier& verifier(signer.verifier());
SignedTokenPB token = MakeUnsignedToken(WallTime_Now());
@@ -133,7 +180,7 @@ TEST_F(TokenTest, TestInit) {
static const int64_t kKeySeqNum = 100;
PrivateKey private_key;
- ASSERT_OK(GeneratePrivateKey(512, &private_key));
+ ASSERT_OK(GeneratePrivateKey(kNumBits, &private_key));
string private_key_str_der;
ASSERT_OK(private_key.ToString(&private_key_str_der, DataFormat::DER));
TokenSigningPrivateKeyPB pb;
@@ -163,9 +210,10 @@ TEST_F(TokenTest, TestInit) {
TEST_F(TokenTest, TestTokenSignerNonSparseSequenceNumbers) {
static const int kIterNum = 3;
static const int64_t kAuthnTokenValiditySeconds = 1;
+ static const int64_t kAuthzTokenValiditySeconds = 1;
static const int64_t kKeyRotationSeconds = 1;
- TokenSigner signer(kAuthnTokenValiditySeconds, kKeyRotationSeconds);
+ TokenSigner signer(kAuthnTokenValiditySeconds, kAuthzTokenValiditySeconds, kKeyRotationSeconds);
int64_t seq_num_first_key;
NO_FATALS(CheckAndAddNextKey(kIterNum, &signer, &seq_num_first_key));
@@ -192,12 +240,10 @@ TEST_F(TokenTest, TestTokenSignerNonSparseSequenceNumbers) {
// import should be greater than any sequence number the TokenSigner has seen
// during the import.
TEST_F(TokenTest, TestTokenSignerAddKeyAfterImport) {
- static const int64_t kAuthnTokenValiditySeconds = 8;
static const int64_t kKeyRotationSeconds = 8;
- static const int64_t kKeyValiditySeconds =
- kAuthnTokenValiditySeconds + 2 * kKeyRotationSeconds;
- TokenSigner signer(kAuthnTokenValiditySeconds, kKeyRotationSeconds);
+ TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, kKeyRotationSeconds);
+ const int64_t key_validity_seconds = signer.key_validity_seconds_;
const TokenVerifier& verifier(signer.verifier());
static const int64_t kExpiredKeySeqNum = 100;
@@ -206,7 +252,7 @@ TEST_F(TokenTest, TestTokenSignerAddKeyAfterImport) {
// First, try to import already expired key to check that internal key
// sequence number advances correspondingly.
PrivateKey private_key;
- ASSERT_OK(GeneratePrivateKey(512, &private_key));
+ ASSERT_OK(GeneratePrivateKey(kNumBits, &private_key));
string private_key_str_der;
ASSERT_OK(private_key.ToString(&private_key_str_der, DataFormat::DER));
TokenSigningPrivateKeyPB pb;
@@ -215,9 +261,7 @@ TEST_F(TokenTest, TestTokenSignerAddKeyAfterImport) {
pb.set_expire_unix_epoch_seconds(WallTime_Now() - 1);
ASSERT_OK(signer.ImportKeys({pb}));
- }
- {
// Check the result of importing keys: there should be no keys because
// the only one we tried to import was already expired.
vector<TokenSigningPublicKeyPB> public_keys(verifier.ExportKeys());
@@ -228,7 +272,7 @@ TEST_F(TokenTest, TestTokenSignerAddKeyAfterImport) {
// Now import valid (not yet expired) key, but with sequence number less
// than of the expired key.
PrivateKey private_key;
- ASSERT_OK(GeneratePrivateKey(512, &private_key));
+ ASSERT_OK(GeneratePrivateKey(kNumBits, &private_key));
string private_key_str_der;
ASSERT_OK(private_key.ToString(&private_key_str_der, DataFormat::DER));
TokenSigningPrivateKeyPB pb;
@@ -237,19 +281,17 @@ TEST_F(TokenTest, TestTokenSignerAddKeyAfterImport) {
// Set the TSK's expiration time: make the key valid but past its activity
// interval.
pb.set_expire_unix_epoch_seconds(
- WallTime_Now() + (kKeyValiditySeconds - 2 * kKeyRotationSeconds - 1));
+ WallTime_Now() + (key_validity_seconds - 2 * kKeyRotationSeconds - 1));
ASSERT_OK(signer.ImportKeys({pb}));
- }
- {
- // Check the result of importing keys.
+ // Check the result of importing keys. The lower sequence number is
+ // accepted, even though we previously imported a key with a higher
+ // sequence number that was expired.
vector<TokenSigningPublicKeyPB> public_keys(verifier.ExportKeys());
ASSERT_EQ(1, public_keys.size());
ASSERT_EQ(kKeySeqNum, public_keys[0].key_seq_num());
- }
- {
// The newly imported key should be used to sign tokens.
SignedTokenPB token = MakeUnsignedToken(WallTime_Now());
ASSERT_OK(signer.SignToken(&token));
@@ -259,7 +301,7 @@ TEST_F(TokenTest, TestTokenSignerAddKeyAfterImport) {
}
{
- std::unique_ptr<TokenSigningPrivateKey> key;
+ unique_ptr<TokenSigningPrivateKey> key;
ASSERT_OK(signer.CheckNeedKey(&key));
ASSERT_NE(nullptr, key.get());
ASSERT_EQ(kExpiredKeySeqNum + 1, key->key_seq_num());
@@ -289,15 +331,19 @@ TEST_F(TokenTest, TestTokenSignerAddKeyAfterImport) {
// less or equal to the sequence number of the most 'recent' key.
TEST_F(TokenTest, TestAddKeyConstraints) {
{
- TokenSigner signer(1, 1);
- std::unique_ptr<TokenSigningPrivateKey> key;
+ // If a signer has not created a TSK yet, it will create a key, and will
+ // happily accept the generated key.
+ TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, 1);
+ unique_ptr<TokenSigningPrivateKey> key;
ASSERT_OK(signer.CheckNeedKey(&key));
ASSERT_NE(nullptr, key.get());
ASSERT_OK(signer.AddKey(std::move(key)));
}
{
- TokenSigner signer(1, 1);
- std::unique_ptr<TokenSigningPrivateKey> key;
+ // If the key sequence number added to the signer isn't monotonically
+ // increasing, the signer will complain.
+ TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, 1);
+ unique_ptr<TokenSigningPrivateKey> key;
ASSERT_OK(signer.CheckNeedKey(&key));
ASSERT_NE(nullptr, key.get());
const int64_t key_seq_num = key->key_seq_num();
@@ -308,10 +354,11 @@ TEST_F(TokenTest, TestAddKeyConstraints) {
": invalid key sequence number, should be at least ");
}
{
- TokenSigner signer(1, 1);
+ // Test importing expired keys. The signer should be OK with it.
+ TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, 1);
static const int64_t kKeySeqNum = 100;
PrivateKey private_key;
- ASSERT_OK(GeneratePrivateKey(512, &private_key));
+ ASSERT_OK(GeneratePrivateKey(kNumBits, &private_key));
string private_key_str_der;
ASSERT_OK(private_key.ToString(&private_key_str_der, DataFormat::DER));
TokenSigningPrivateKeyPB pb;
@@ -321,7 +368,9 @@ TEST_F(TokenTest, TestAddKeyConstraints) {
pb.set_expire_unix_epoch_seconds(WallTime_Now() - 1);
ASSERT_OK(signer.ImportKeys({pb}));
- std::unique_ptr<TokenSigningPrivateKey> key;
+ // Generated keys thereafter are expected to have higher sequence numbers
+ // than the imported expired keys.
+ unique_ptr<TokenSigningPrivateKey> key;
ASSERT_OK(signer.CheckNeedKey(&key));
ASSERT_NE(nullptr, key.get());
const int64_t key_seq_num = key->key_seq_num();
@@ -334,31 +383,62 @@ TEST_F(TokenTest, TestAddKeyConstraints) {
}
}
-TEST_F(TokenTest, TestGenerateAuthTokenNoUserName) {
- TokenSigner signer(10, 10);
+TEST_F(TokenTest, TestGenerateAuthnTokenNoUserName) {
+ TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, 10);
SignedTokenPB signed_token_pb;
const Status& s = signer.GenerateAuthnToken("", &signed_token_pb);
EXPECT_TRUE(s.IsInvalidArgument()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), "no username provided for authn token");
}
+TEST_F(TokenTest, TestGenerateAuthzToken) {
+ // We cannot generate a token with no username associated with it.
+ auto verifier(make_shared<TokenVerifier>());
+ TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, 10, verifier);
+ TablePrivilegePB table_privilege;
+ SignedTokenPB signed_token_pb;
+ Status s = signer.GenerateAuthzToken("", table_privilege, &signed_token_pb);
+ EXPECT_TRUE(s.IsInvalidArgument()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "no username provided for authz token");
+
+ // Generated tokens will have the specified privileges.
+ const string kAuthorized = "authzed";
+ unique_ptr<TokenSigningPrivateKey> key;
+ ASSERT_OK(signer.CheckNeedKey(&key));
+ ASSERT_NE(nullptr, key.get());
+ ASSERT_OK(signer.AddKey(std::move(key)));
+ ASSERT_OK(signer.GenerateAuthzToken(kAuthorized,
+ table_privilege,
+ &signed_token_pb));
+ ASSERT_TRUE(signed_token_pb.has_token_data());
+ TokenPB token_pb;
+ ASSERT_EQ(VerificationResult::VALID,
+ verifier->VerifyTokenSignature(signed_token_pb, &token_pb));
+ ASSERT_TRUE(token_pb.has_authz());
+ ASSERT_EQ(kAuthorized, token_pb.authz().username());
+ ASSERT_TRUE(token_pb.authz().has_table_privilege());
+ ASSERT_EQ(SecureDebugString(table_privilege),
+ SecureDebugString(token_pb.authz().table_privilege()));
+}
+
TEST_F(TokenTest, TestIsCurrentKeyValid) {
- static const int64_t kAuthnTokenValiditySeconds = 1;
+ // This test sleeps for a key validity period, so set it up to be short.
+ static const int64_t kShortTokenValiditySeconds = 1;
static const int64_t kKeyRotationSeconds = 1;
- static const int64_t kKeyValiditySeconds =
- kAuthnTokenValiditySeconds + 2 * kKeyRotationSeconds;
- TokenSigner signer(kAuthnTokenValiditySeconds, kKeyRotationSeconds);
+ TokenSigner signer(kShortTokenValiditySeconds, kShortTokenValiditySeconds, kKeyRotationSeconds);
+ static const int64_t key_validity_seconds = signer.key_validity_seconds_;
+
EXPECT_FALSE(signer.IsCurrentKeyValid());
{
- std::unique_ptr<TokenSigningPrivateKey> key;
+ unique_ptr<TokenSigningPrivateKey> key;
ASSERT_OK(signer.CheckNeedKey(&key));
// No keys are available yet, so should be able to add.
ASSERT_NE(nullptr, key.get());
ASSERT_OK(signer.AddKey(std::move(key)));
}
EXPECT_TRUE(signer.IsCurrentKeyValid());
- SleepFor(MonoDelta::FromSeconds(kKeyValiditySeconds));
+ SleepFor(MonoDelta::FromSeconds(key_validity_seconds));
// The key should expire after its validity interval.
EXPECT_FALSE(signer.IsCurrentKeyValid());
@@ -369,8 +449,8 @@ TEST_F(TokenTest, TestIsCurrentKeyValid) {
TEST_F(TokenTest, TestTokenSignerAddKeys) {
{
- TokenSigner signer(10, 10);
- std::unique_ptr<TokenSigningPrivateKey> key;
+ TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, 10);
+ unique_ptr<TokenSigningPrivateKey> key;
ASSERT_OK(signer.CheckNeedKey(&key));
// No keys are available yet, so should be able to add.
ASSERT_NE(nullptr, key.get());
@@ -384,8 +464,8 @@ TEST_F(TokenTest, TestTokenSignerAddKeys) {
{
// Special configuration for TokenSigner: rotation interval is zero,
// so should be able to add two keys right away.
- TokenSigner signer(10, 0);
- std::unique_ptr<TokenSigningPrivateKey> key;
+ TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, 0);
+ unique_ptr<TokenSigningPrivateKey> key;
ASSERT_OK(signer.CheckNeedKey(&key));
// No keys are available yet, so should be able to add.
ASSERT_NE(nullptr, key.get());
@@ -406,8 +486,8 @@ TEST_F(TokenTest, TestTokenSignerAddKeys) {
// It should not need next key right away, but should need next key after
// the rotation interval.
static const int64_t kKeyRotationIntervalSeconds = 8;
- TokenSigner signer(10, kKeyRotationIntervalSeconds);
- std::unique_ptr<TokenSigningPrivateKey> key;
+ TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, kKeyRotationIntervalSeconds);
+ unique_ptr<TokenSigningPrivateKey> key;
ASSERT_OK(signer.CheckNeedKey(&key));
// No keys are available yet, so should be able to add.
ASSERT_NE(nullptr, key.get());
@@ -433,7 +513,7 @@ TEST_F(TokenTest, TestTokenSignerAddKeys) {
// Test how key rotation works.
TEST_F(TokenTest, TestTokenSignerSignVerifyExport) {
// Key rotation interval 0 allows adding 2 keys in a row with no delay.
- TokenSigner signer(10, 0);
+ TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, 0);
const TokenVerifier& verifier(signer.verifier());
// Should start off with no signing keys.
@@ -447,7 +527,7 @@ TEST_F(TokenTest, TestTokenSignerSignVerifyExport) {
// Generate and set a new key.
int64_t signing_key_seq_num;
{
- std::unique_ptr<TokenSigningPrivateKey> key;
+ unique_ptr<TokenSigningPrivateKey> key;
ASSERT_OK(signer.CheckNeedKey(&key));
ASSERT_NE(nullptr, key.get());
signing_key_seq_num = key->key_seq_num();
@@ -470,7 +550,7 @@ TEST_F(TokenTest, TestTokenSignerSignVerifyExport) {
// Set next key and check that we return the right keys.
int64_t next_signing_key_seq_num;
{
- std::unique_ptr<TokenSigningPrivateKey> key;
+ unique_ptr<TokenSigningPrivateKey> key;
ASSERT_OK(signer.CheckNeedKey(&key));
ASSERT_NE(nullptr, key.get());
next_signing_key_seq_num = key->key_seq_num();
@@ -482,7 +562,7 @@ TEST_F(TokenTest, TestTokenSignerSignVerifyExport) {
ASSERT_EQ(0, verifier.ExportKeys(next_signing_key_seq_num).size());
// The first key should be used for signing: the next one is saved
- // for the next round.
+ // for the next rotation.
{
SignedTokenPB token = MakeUnsignedToken(WallTime_Now());
ASSERT_OK(signer.SignToken(&token));
@@ -498,11 +578,11 @@ TEST_F(TokenTest, TestExportKeys) {
// and have an appropriate expiration.
const int64_t key_exp_seconds = 30;
const int64_t key_rotation_seconds = 10;
- TokenSigner signer(key_exp_seconds - 2 * key_rotation_seconds,
+ TokenSigner signer(key_exp_seconds - 2 * key_rotation_seconds, 0,
key_rotation_seconds);
int64_t key_seq_num;
{
- std::unique_ptr<TokenSigningPrivateKey> key;
+ unique_ptr<TokenSigningPrivateKey> key;
ASSERT_OK(signer.CheckNeedKey(&key));
ASSERT_NE(nullptr, key.get());
key_seq_num = key->key_seq_num();
@@ -523,9 +603,9 @@ TEST_F(TokenTest, TestExportKeys) {
// Test that the TokenVerifier can import keys exported by the TokenSigner
// and then verify tokens signed by it.
TEST_F(TokenTest, TestEndToEnd_Valid) {
- TokenSigner signer(10, 10);
+ TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, 10);
{
- std::unique_ptr<TokenSigningPrivateKey> key;
+ unique_ptr<TokenSigningPrivateKey> key;
ASSERT_OK(signer.CheckNeedKey(&key));
ASSERT_NE(nullptr, key.get());
ASSERT_OK(signer.AddKey(std::move(key)));
@@ -546,9 +626,9 @@ TEST_F(TokenTest, TestEndToEnd_Valid) {
// See VerificationResult.
TEST_F(TokenTest, TestEndToEnd_InvalidCases) {
// Key rotation interval 0 allows adding 2 keys in a row with no delay.
- TokenSigner signer(10, 0);
+ TokenSigner signer(kTokenValiditySeconds, kTokenValiditySeconds, 0);
{
- std::unique_ptr<TokenSigningPrivateKey> key;
+ unique_ptr<TokenSigningPrivateKey> key;
ASSERT_OK(signer.CheckNeedKey(&key));
ASSERT_NE(nullptr, key.get());
ASSERT_OK(signer.AddKey(std::move(key)));
@@ -599,7 +679,7 @@ TEST_F(TokenTest, TestEndToEnd_InvalidCases) {
// verify, we expect the verifier to complain the key is unknown.
{
{
- std::unique_ptr<TokenSigningPrivateKey> key;
+ unique_ptr<TokenSigningPrivateKey> key;
ASSERT_OK(signer.CheckNeedKey(&key));
ASSERT_NE(nullptr, key.get());
ASSERT_OK(signer.AddKey(std::move(key)));
@@ -673,5 +753,131 @@ TEST_F(TokenTest, TestTokenVerifierImportKeys) {
}
}
+// Test using different token validity intervals.
+TEST_F(TokenTest, TestVaryingTokenValidityIntervals) {
+ constexpr int kShortValiditySeconds = 2;
+ const int kLongValiditySeconds = kShortValiditySeconds * 3;
+ auto verifier(make_shared<TokenVerifier>());
+ TokenSigner signer(kLongValiditySeconds, kShortValiditySeconds, 10, verifier);
+ unique_ptr<TokenSigningPrivateKey> key;
+ ASSERT_OK(signer.CheckNeedKey(&key));
+ ASSERT_NE(nullptr, key.get());
+ ASSERT_OK(signer.AddKey(std::move(key)));
+
+ const TablePrivilegePB table_privilege;
+ SignedTokenPB signed_authn;
+ SignedTokenPB signed_authz;
+ ASSERT_OK(signer.GenerateAuthnToken(kUser, &signed_authn));
+ ASSERT_OK(signer.GenerateAuthzToken(kUser, table_privilege, &signed_authz));
+ TokenPB authn_token;
+ TokenPB authz_token;
+ ASSERT_EQ(VerificationResult::VALID, verifier->VerifyTokenSignature(signed_authn, &authn_token));
+ ASSERT_EQ(VerificationResult::VALID, verifier->VerifyTokenSignature(signed_authz, &authz_token));
+
+ // Wait for the authz validity interval to pass and verify its expiration.
+ SleepFor(MonoDelta::FromSeconds(1 + kShortValiditySeconds));
+ EXPECT_EQ(VerificationResult::VALID, verifier->VerifyTokenSignature(signed_authn, &authn_token));
+ EXPECT_EQ(VerificationResult::EXPIRED_TOKEN,
+ verifier->VerifyTokenSignature(signed_authz, &authz_token));
+
+ // Wait for the authn validity interval to pass and verify its expiration.
+ SleepFor(MonoDelta::FromSeconds(kLongValiditySeconds - kShortValiditySeconds));
+ EXPECT_EQ(VerificationResult::EXPIRED_TOKEN,
+ verifier->VerifyTokenSignature(signed_authn, &authn_token));
+ EXPECT_EQ(VerificationResult::EXPIRED_TOKEN,
+ verifier->VerifyTokenSignature(signed_authz, &authz_token));
+}
+
+// Test to check the invariant that all tokens signed within a TSK's activity
+// interval must be expired by the end of the TSK's validity interval.
+TEST_F(TokenTest, TestKeyValidity) {
+ if (!AllowSlowTests()) {
+ LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+ return;
+ }
+ // Note: this test's runtime is roughly the length of a key-validity
+ // interval, which is determined by the token validity intervals and the key
+ // rotation interval.
+ const int kShortValiditySeconds = 2;
+ const int kLongValiditySeconds = 6;
+ const int kKeyRotationSeconds = 5;
+ auto verifier(make_shared<TokenVerifier>());
+ TokenSigner signer(kLongValiditySeconds, kShortValiditySeconds, kKeyRotationSeconds, verifier);
+ unique_ptr<TokenSigningPrivateKey> key;
+ ASSERT_OK(signer.CheckNeedKey(&key));
+ ASSERT_NE(nullptr, key.get());
+ ASSERT_OK(signer.AddKey(std::move(key)));
+
+ // First, start a countdown for the first TSK's validity interval. Any token
+ // signed during the first TSK's activity interval must be expired once this
+ // latch counts down.
+ vector<thread> threads;
+ CountDownLatch first_tsk_validity_latch(1);
+ const double key_validity_seconds = signer.key_validity_seconds_;
+ threads.emplace_back([&first_tsk_validity_latch, key_validity_seconds] {
+ SleepFor(MonoDelta::FromSeconds(key_validity_seconds));
+ LOG(INFO) << Substitute("First TSK's validity interval of $0 secs has finished!",
+ key_validity_seconds);
+ first_tsk_validity_latch.CountDown();
+ });
+
+ // Set up a second TSK so our threads can rotate TSKs when the time comes.
+ while (true) {
+ KLOG_EVERY_N_SECS(INFO, 1) << "Waiting for a second key...";
+ unique_ptr<TokenSigningPrivateKey> tsk;
+ ASSERT_OK(signer.CheckNeedKey(&tsk));
+ if (tsk) {
+ LOG(INFO) << "Added second key!";
+ ASSERT_OK(signer.AddKey(std::move(tsk)));
+ break;
+ }
+ SleepFor(MonoDelta::FromMilliseconds(50));
+ }
+
+ // Utility lambda to check that the token is expired.
+ const auto verify_expired = [&verifier] (const SignedTokenPB& signed_token,
+ const string& token_type) {
+ TokenPB token_pb;
+ const auto result = verifier->VerifyTokenSignature(signed_token, &token_pb);
+ const auto expire_secs = token_pb.expire_unix_epoch_seconds();
+ ASSERT_EQ(VerificationResult::EXPIRED_TOKEN, result)
+ << Substitute("validation result '$0': $1 token expires at $2, now $3",
+ VerificationResultToString(result), token_type,
+ expire_secs, WallTime_Now());
+ };
+
+ // Create a thread that repeatedly signs new authn tokens, returning the
+ // final one signed by TSK with seq_num 0. At the end of the key validity
+ // period, this token will not be valid.
+ vector<SignedTokenPB> tsks(2);
+ vector<Status> results(2);
+ threads.emplace_back([&] {
+ results[0] = SignUntilRotatePast(&signer,
+ [&] (SignedTokenPB* signed_token) {
+ return signer.GenerateAuthnToken(kUser, signed_token);
+ },
+ "authn", 0, &tsks[0]);
+ first_tsk_validity_latch.Wait();
+ });
+
+ // Do the same for authz tokens.
+ threads.emplace_back([&] {
+ results[1] = SignUntilRotatePast(&signer,
+ [&] (SignedTokenPB* signed_token) {
+ return signer.GenerateAuthzToken(kUser, TablePrivilegePB(), signed_token);
+ },
+ "authz", 0, &tsks[1]);
+ first_tsk_validity_latch.Wait();
+ });
+
+ for (auto& t : threads) {
+ t.join();
+ }
+ EXPECT_OK(results[0]);
+ EXPECT_OK(results[1]);
+ NO_FATALS(verify_expired(tsks[0], "authn"));
+ NO_FATALS(verify_expired(tsks[1], "authz"));
+}
+
} // namespace security
} // namespace kudu
diff --git a/be/src/kudu/security/token.proto b/be/src/kudu/security/token.proto
index e27ccdb..ea63ce2 100644
--- a/be/src/kudu/security/token.proto
+++ b/be/src/kudu/security/token.proto
@@ -21,11 +21,42 @@ option java_package = "org.apache.kudu.security";
import "kudu/util/pb_util.proto";
+message ColumnPrivilegePB {
+ // If set, the user has privileges to select and apply predicates on the
+ // column during scans.
+ optional bool scan_privilege = 1;
+};
+
+message TablePrivilegePB {
+ // The ID of the table to which the privileges apply.
+ optional string table_id = 1;
+
+ // If set, the user is authorized to select and apply predicates to all
+ // columns when scanning the table, and `column_privileges` is ignored. If
+ // unset, the user may only scan and apply predicates to columns with the
+ // privileges specified in `column_privileges`.
+ optional bool scan_privilege = 2;
+
+ // If set, the user is authorized to insert rows into the table.
+ optional bool insert_privilege= 3;
+
+ // If set, the user is authorized to update rows in the table.
+ optional bool update_privilege = 4;
+
+ // If set, the user is authorized to delete rows in the table.
+ optional bool delete_privilege = 5;
+
+ // Per-column privileges, indexed by column ID.
+ map<int32, ColumnPrivilegePB> column_privileges = 6;
+};
+
message AuthnTokenPB {
optional string username = 1;
};
message AuthzTokenPB {
+ optional string username = 1;
+ optional TablePrivilegePB table_privilege = 2;
};
message TokenPB {
diff --git a/be/src/kudu/security/token_signer.cc b/be/src/kudu/security/token_signer.cc
index 08c84be..aaf1be8 100644
--- a/be/src/kudu/security/token_signer.cc
+++ b/be/src/kudu/security/token_signer.cc
@@ -56,17 +56,21 @@ namespace kudu {
namespace security {
TokenSigner::TokenSigner(int64_t authn_token_validity_seconds,
+ int64_t authz_token_validity_seconds,
int64_t key_rotation_seconds,
shared_ptr<TokenVerifier> verifier)
: verifier_(verifier ? std::move(verifier)
: std::make_shared<TokenVerifier>()),
authn_token_validity_seconds_(authn_token_validity_seconds),
+ authz_token_validity_seconds_(authz_token_validity_seconds),
key_rotation_seconds_(key_rotation_seconds),
// The TSK propagation interval is equal to the rotation interval.
- key_validity_seconds_(2 * key_rotation_seconds_ + authn_token_validity_seconds_),
+ key_validity_seconds_(2 * key_rotation_seconds_ +
+ std::max(authn_token_validity_seconds_, authz_token_validity_seconds)),
last_key_seq_num_(-1) {
CHECK_GE(key_rotation_seconds_, 0);
CHECK_GE(authn_token_validity_seconds_, 0);
+ CHECK_GE(authz_token_validity_seconds_, 0);
CHECK(verifier_);
}
@@ -134,6 +138,27 @@ Status TokenSigner::ImportKeys(const vector<TokenSigningPrivateKeyPB>& keys) {
return Status::OK();
}
+Status TokenSigner::GenerateAuthzToken(string username,
+ TablePrivilegePB privilege,
+ SignedTokenPB* signed_token) const {
+ if (username.empty()) {
+ return Status::InvalidArgument("no username provided for authz token");
+ }
+ TokenPB token;
+ token.set_expire_unix_epoch_seconds(WallTime_Now() + authz_token_validity_seconds_);
+ AuthzTokenPB* authz = token.mutable_authz();
+ authz->set_username(std::move(username));
+ *authz->mutable_table_privilege() = std::move(privilege);
+
+ SignedTokenPB ret;
+ if (!token.SerializeToString(ret.mutable_token_data())) {
+ return Status::RuntimeError("could not serialize authz token");
+ }
+ RETURN_NOT_OK(SignToken(&ret));
+ *signed_token = std::move(ret);
+ return Status::OK();
+}
+
Status TokenSigner::GenerateAuthnToken(string username,
SignedTokenPB* signed_token) const {
if (username.empty()) {
@@ -162,7 +187,7 @@ Status TokenSigner::SignToken(SignedTokenPB* token) const {
return Status::IllegalState("no token signing key");
}
const TokenSigningPrivateKey* key = tsk_deque_.front().get();
- RETURN_NOT_OK_PREPEND(key->Sign(token), "could not sign authn token");
+ RETURN_NOT_OK_PREPEND(key->Sign(token), "could not sign token");
return Status::OK();
}
@@ -194,9 +219,10 @@ Status TokenSigner::CheckNeedKey(unique_ptr<TokenSigningPrivateKey>* tsk) const
// It's enough to have just one active key and next key ready to be
// activated when it's time to do so. However, it does not mean the
// process of key refreshment is about to stop once there are two keys
- // in the queue: the TryRotate() method (which should be called periodically
- // along with CheckNeedKey()/AddKey() pair) will eventually pop the
- // current key out of the keys queue once the key enters its inactive phase.
+ // in the queue: the TryRotateKey() method (which should be called
+ // periodically along with CheckNeedKey()/AddKey() pair) will eventually
+ // pop the current key out of the keys queue once the key enters its
+ // inactive phase.
tsk->reset();
return Status::OK();
}
diff --git a/be/src/kudu/security/token_signer.h b/be/src/kudu/security/token_signer.h
index df1e3eb..36f5a65 100644
--- a/be/src/kudu/security/token_signer.h
+++ b/be/src/kudu/security/token_signer.h
@@ -33,6 +33,7 @@ class Status;
namespace security {
class SignedTokenPB;
+class TablePrivilegePB;
class TokenSigningPrivateKey;
class TokenSigningPrivateKeyPB;
class TokenVerifier;
@@ -74,16 +75,18 @@ class TokenVerifier;
// '-' propagation interval
// The TSK is already created but not yet used to sign tokens. However,
// its public part is already being sent to the components which
-// may be involved in validation of tokens signed by the key.
+// may be involved in validation of tokens signed by the key. This is
+// exactly 'tsk_propagation_interval' below.
//
// 'A' activity interval
// The TSK is used to sign tokens. It's assumed that the components which
-// are involved in token verification have already received
-// the corresponding public part of the TSK.
+// are involved in token verification have already received the
+// corresponding public part of the TSK.
//
// '=' inactivity interval
-// The TSK is no longer used to sign tokens. However, it's still sent
-// to other components which validate token signatures.
+// The TSK is no longer used to sign tokens. However, its public part is
+// still sent to other components and can be used to validate token
+// signatures.
//
// Shortly after the TSK's expiration the token signing components stop
// propagating its public part.
@@ -98,8 +101,8 @@ class TokenVerifier;
// implications, so it's worth considering rolling twice at startup.
//
// For example, consider the following configuration for token signing keys:
-// validity period: 4 days
-// rotation interval: 1 days
+// key validity period: 4 days
+// rotation interval: 1 day
// propagation interval: 1 day
//
// Day 1 2 3 4 5 6 7 8
@@ -111,24 +114,34 @@ class TokenVerifier;
// ...............
// authn token: <**********>
//
-// 'A' indicates the 'Originator Usage Period' (a.k.a. 'Activity Interval'),
-// i.e. the period in which the key is being used to sign tokens.
+// 'A' indicates 'Activity Interval', i.e. the period during which the key is
+// being used to sign tokens. In cryptographic terms, this is the 'Originator
+// Usage Period'. Note that the Activity Interval is identically the rotation
+// interval -- a key is active for some amount of time, after which, we rotate.
//
-// '<...>' indicates the 'Recipient Usage Period': the period in which
-// the verifier may get tokens signed by the TSK and should consider them
-// for verification. The start of the recipient usage period is not crucial
-// in that regard, but the end of that period is -- after the TSK is expired,
-// a verifier should consider tokens signed by that TSK invalid and stop
-// accepting them even if the token signature is correct and the expiration.
+// '<...>' in cryptographic terms, indicates the 'Recipient Usage Period': the
+// period during which the public part of a key is used to verify tokens, and
+// verifiers will consider tokens signed by the given key as valid. At the end
+// of this period, a verifier should consider tokens signed by the given TSK
+// invalid and stop accepting them, even if the token signature is correct. The
+// start of the period is not crucial to the validity of a token, so we don't
+// perform verification for it.
//
// '<***>' indicates the validity interval for an authn token.
//
-// When configuring key rotation and authn token validity interval durations,
+// When configuring key rotation and token validity interval durations,
// consider the following constraint:
//
-// max_token_validity < tsk_validity_period -
+// Eq 1.
+// max_token_validity = tsk_validity_period -
// (tsk_propagation_interval + tsk_rotation_interval)
//
+// Note how if the validity period for a token created at the end of the
+// Activity Interval were to extend any farther than the above
+// 'max_token_validity', it would be considered valid beyond the end of the
+// 'tsk_validity_period', which would break the constraint that the token only
+// be valid within the TSK's validity period.
+//
// The idea is that the token validity interval should be contained in the
// corresponding TSK's validity interval. If the TSK is already expired at the
// time of token verification, the token is considered invalid and the
@@ -139,7 +152,7 @@ class TokenVerifier;
//
// * A TSK is issued at 00:00:00 on day 4.
// * An authn token generated and signed by current/active TSK at 23:59:59 on
-// day 6. That's at the very end of the TSK's activity interval.
+// day 5. That's at the very end of the TSK's activity interval.
// * From the diagram above it's clear that if the authn token validity
// interval were set to something longer than TSK inactivity interval
// (which is 2 days with for the specified parameters), an attempt to verify
@@ -179,23 +192,29 @@ class TokenVerifier;
//
class TokenSigner {
public:
- // The 'key_validity_seconds' and 'key_rotation_seconds' parameters define
- // the schedule of TSK rotation. See the class comment above for details.
+ // The token validity and 'key_rotation_seconds' parameters define the
+ // schedule of TSK rotation. See the class comment above for details.
//
// Any newly imported or generated keys are automatically imported into the
// passed 'verifier'. If no verifier passed as a parameter, TokenSigner
// creates one on its own. In either case, it's possible to access
// the embedded TokenVerifier instance using the verifier() accessor.
//
- // The 'authn_token_validity_seconds' parameter is used to specify validity
- // interval for the generated authn tokens and with 'key_rotation_seconds'
- // it defines validity interval of the newly generated TSK:
- // key_validity = 2 * key_rotation + authn_token_validity.
+ // The 'authn_token_validity_seconds' and 'authz_token_validity_seconds'
+ // parameters specify the validity intervals for the generated tokens;
+ // together with 'key_rotation_seconds' they define the validity interval
+ // of a newly generated TSK:
+ //
+ // Eq 2.
+ // key_validity =
+ // 2 * key_rotation + max(authn_token_validity, authz_token_validity)
//
- // That corresponds to the maximum possible token lifetime for the effective
- // TSK validity and rotation intervals: see the class comment above for
- // details.
+ // This selects the 'max_token_validity' in Eq 1 as the higher of the authn
+ // and authz token validity intervals, and based on that, calculates the
+ // effective TSK validity period based on the provided rotation interval.
+ // See the above class comment for details.
TokenSigner(int64_t authn_token_validity_seconds,
+ int64_t authz_token_validity_seconds,
int64_t key_rotation_seconds,
std::shared_ptr<TokenVerifier> verifier = nullptr);
~TokenSigner();
@@ -266,6 +285,16 @@ class TokenSigner {
// See the class comment above for more information about the intended usage.
Status TryRotateKey(bool* has_rotated = nullptr) WARN_UNUSED_RESULT;
+ // Populates 'signed_token' with a signed authorization token with the given
+ // 'username' and table privilege. Returns an error if 'username' is empty,
+ // or if the created authz token could not be serialized for some reason.
+ Status GenerateAuthzToken(std::string username,
+ TablePrivilegePB privilege,
+ SignedTokenPB* signed_token) const WARN_UNUSED_RESULT;
+
+ // Populates 'signed_token' with a signed authentication token with the given
+ // 'username'. Returns an error if 'username' is empty, or if the created
+ // authn token could not be serialized for some reason.
Status GenerateAuthnToken(std::string username,
SignedTokenPB* signed_token) const WARN_UNUSED_RESULT;
@@ -279,6 +308,9 @@ class TokenSigner {
private:
FRIEND_TEST(TokenTest, TestEndToEnd_InvalidCases);
+ FRIEND_TEST(TokenTest, TestIsCurrentKeyValid);
+ FRIEND_TEST(TokenTest, TestTokenSignerAddKeyAfterImport);
+ FRIEND_TEST(TokenTest, TestKeyValidity);
static Status GenerateSigningKey(int64_t key_seq_num,
int64_t key_expiration,
@@ -286,8 +318,9 @@ class TokenSigner {
std::shared_ptr<TokenVerifier> verifier_;
- // Validity interval for the generated authn tokens.
+ // Validity intervals for the generated tokens.
const int64_t authn_token_validity_seconds_;
+ const int64_t authz_token_validity_seconds_;
// TSK rotation interval: number of seconds between consecutive activations
// of new token signing keys. Note that in current implementation it defines
diff --git a/be/src/kudu/security/token_verifier.cc b/be/src/kudu/security/token_verifier.cc
index 1ae20db..e3aef4c 100644
--- a/be/src/kudu/security/token_verifier.cc
+++ b/be/src/kudu/security/token_verifier.cc
@@ -123,7 +123,7 @@ VerificationResult TokenVerifier::VerifyTokenSignature(const SignedTokenPB& sign
for (auto flag : token->incompatible_features()) {
if (!TokenPB::Feature_IsValid(flag)) {
- KLOG_EVERY_N_SECS(WARNING, 60) << "received authentication token with unknown feature; "
+ KLOG_EVERY_N_SECS(WARNING, 60) << "received token with unknown feature; "
"server needs to be updated";
return VerificationResult::INCOMPATIBLE_FEATURE;
}
@@ -151,17 +151,17 @@ const char* VerificationResultToString(VerificationResult r) {
case security::VerificationResult::VALID:
return "valid";
case security::VerificationResult::INVALID_TOKEN:
- return "invalid authentication token";
+ return "invalid token";
case security::VerificationResult::INVALID_SIGNATURE:
- return "invalid authentication token signature";
+ return "invalid token signature";
case security::VerificationResult::EXPIRED_TOKEN:
- return "authentication token expired";
+ return "token expired";
case security::VerificationResult::EXPIRED_SIGNING_KEY:
- return "authentication token signing key expired";
+ return "token signing key expired";
case security::VerificationResult::UNKNOWN_SIGNING_KEY:
- return "authentication token signed with unknown key";
+ return "token signed with unknown key";
case security::VerificationResult::INCOMPATIBLE_FEATURE:
- return "authentication token uses incompatible feature";
+ return "token uses incompatible feature";
default:
LOG(FATAL) << "unexpected VerificationResult value: "
<< static_cast<int>(r);
diff --git a/be/src/kudu/util/test_util.cc b/be/src/kudu/util/test_util.cc
index c960441..c514d0e 100644
--- a/be/src/kudu/util/test_util.cc
+++ b/be/src/kudu/util/test_util.cc
@@ -36,6 +36,7 @@
#include <sys/param.h> // for MAXPATHLEN
#endif
+#include <boost/optional/optional.hpp>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <gtest/gtest-spi.h>
@@ -43,6 +44,7 @@
#include "kudu/gutil/strings/numbers.h"
#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/strcat.h"
+#include "kudu/gutil/strings/stringpiece.h"
#include "kudu/gutil/strings/strip.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/strings/util.h"
@@ -54,6 +56,7 @@
#include "kudu/util/slice.h"
#include "kudu/util/spinlock_profiling.h"
#include "kudu/util/status.h"
+#include "kudu/util/string_case.h"
#include "kudu/util/subprocess.h"
DEFINE_string(test_leave_files, "on_failure",
@@ -62,6 +65,7 @@ DEFINE_string(test_leave_files, "on_failure",
DEFINE_int32(test_random_seed, 0, "Random seed to use for randomized tests");
+using boost::optional;
using std::string;
using std::vector;
using strings::Substitute;
@@ -378,7 +382,9 @@ int CountOpenFds(Env* env, const string& path_pattern) {
}
namespace {
-Status WaitForBind(pid_t pid, uint16_t* port, const char* kind, MonoDelta timeout) {
+Status WaitForBind(pid_t pid, uint16_t* port,
+ const optional<const string&>& addr,
+ const char* kind, MonoDelta timeout) {
// In general, processes do not expose the port they bind to, and
// reimplementing lsof involves parsing a lot of files in /proc/. So,
// requiring lsof for tests and parsing its output seems more
@@ -394,15 +400,52 @@ Status WaitForBind(pid_t pid, uint16_t* port, const char* kind, MonoDelta timeou
"-a", "-i", kind
};
+ // The '-Ffn' flag gets lsof to output something like:
+ // p5801
+ // f548
+ // n127.0.0.1:43954->127.0.0.1:43617
+ // f549
+ // n*:8038
+ //
+ // The first line is the pid. We ignore it.
+ // Subsequent lines come in pairs. In each pair, the first half of the pair
+ // is the file descriptor number, which we ignore.
+ // The second half has the bind address and port.
+ //
+ // In this example, the first pair is an outbound TCP socket. We ignore it.
+ // The second pair is the listening TCP socket bind address and port.
+ //
+ // We use the first encountered listening TCP socket, since that's most likely
+ // to be the primary service port. When searching, we use the provided bind
+ // address if there is any, otherwise we use '*' (same as '0.0.0.0') which
+ // matches all addresses on the local machine.
+ string addr_pattern = Substitute("n$0:", (!addr || *addr == "0.0.0.0") ? "*" : *addr);
MonoTime deadline = MonoTime::Now() + timeout;
string lsof_out;
+ int32_t p = -1;
for (int64_t i = 1; ; i++) {
lsof_out.clear();
- Status s = Subprocess::Call(cmd, "", &lsof_out);
+ Status s = Subprocess::Call(cmd, "", &lsof_out).AndThen([&] () {
+ StripTrailingNewline(&lsof_out);
+ vector<string> lines = strings::Split(lsof_out, "\n");
+ for (int index = 2; index < lines.size(); index += 2) {
+ StringPiece cur_line(lines[index]);
+ if (HasPrefixString(cur_line.ToString(), addr_pattern) &&
+ !cur_line.contains("->")) {
+ cur_line.remove_prefix(addr_pattern.size());
+ if (!safe_strto32(cur_line.data(), cur_line.size(), &p)) {
+ return Status::RuntimeError("unexpected lsof output", lsof_out);
+ }
+
+ return Status::OK();
+ }
+ }
+
+ return Status::RuntimeError("unexpected lsof output", lsof_out);
+ });
if (s.ok()) {
- StripTrailingNewline(&lsof_out);
break;
}
if (deadline < MonoTime::Now()) {
@@ -412,22 +455,6 @@ Status WaitForBind(pid_t pid, uint16_t* port, const char* kind, MonoDelta timeou
SleepFor(MonoDelta::FromMilliseconds(i * 10));
}
- // The '-Ffn' flag gets lsof to output something like:
- // p19730
- // f123
- // n*:41254
- // The first line is the pid. We ignore it.
- // The second line is the file descriptor number. We ignore it.
- // The third line has the bind address and port.
- // Subsequent lines show active connections.
- vector<string> lines = strings::Split(lsof_out, "\n");
- int32_t p = -1;
- if (lines.size() < 3 ||
- lines[2].substr(0, 3) != "n*:" ||
- !safe_strto32(lines[2].substr(3), &p) ||
- p <= 0) {
- return Status::RuntimeError("unexpected lsof output", lsof_out);
- }
CHECK(p > 0 && p < std::numeric_limits<uint16_t>::max()) << "parsed invalid port: " << p;
VLOG(1) << "Determined bound port: " << p;
*port = p;
@@ -435,12 +462,31 @@ Status WaitForBind(pid_t pid, uint16_t* port, const char* kind, MonoDelta timeou
}
} // anonymous namespace
-Status WaitForTcpBind(pid_t pid, uint16_t* port, MonoDelta timeout) {
- return WaitForBind(pid, port, "4TCP", timeout);
+Status WaitForTcpBind(pid_t pid, uint16_t* port,
+ const optional<const string&>& addr,
+ MonoDelta timeout) {
+ return WaitForBind(pid, port, addr, "4TCP", timeout);
+}
+
+Status WaitForUdpBind(pid_t pid, uint16_t* port,
+ const optional<const string&>& addr,
+ MonoDelta timeout) {
+ return WaitForBind(pid, port, addr, "4UDP", timeout);
}
-Status WaitForUdpBind(pid_t pid, uint16_t* port, MonoDelta timeout) {
- return WaitForBind(pid, port, "4UDP", timeout);
+Status FindHomeDir(const string& name, const string& bin_dir, string* home_dir) {
+ string name_upper;
+ ToUpperCase(name, &name_upper);
+
+ string env_var = Substitute("$0_HOME", name_upper);
+ const char* env = std::getenv(env_var.c_str());
+ string dir = env == nullptr ? JoinPathSegments(bin_dir, Substitute("$0-home", name)) : env;
+
+ if (!Env::Default()->FileExists(dir)) {
+ return Status::NotFound(Substitute("$0 directory does not exist", env_var), dir);
+ }
+ *home_dir = dir;
+ return Status::OK();
}
} // namespace kudu
diff --git a/be/src/kudu/util/test_util.h b/be/src/kudu/util/test_util.h
index 8090fbc..d320e3e 100644
--- a/be/src/kudu/util/test_util.h
+++ b/be/src/kudu/util/test_util.h
@@ -26,11 +26,19 @@
#include <memory>
#include <string>
+#include <boost/optional/optional.hpp>
#include <gtest/gtest.h>
#include "kudu/gutil/port.h"
#include "kudu/util/monotime.h"
+#define SKIP_IF_SLOW_NOT_ALLOWED() do { \
+ if (!AllowSlowTests()) { \
+ LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run"; \
+ return; \
+ } \
+} while (0)
+
#define ASSERT_EVENTUALLY(expr) do { \
AssertEventually(expr); \
NO_PENDING_FATALS(); \
@@ -136,11 +144,26 @@ void AssertEventually(const std::function<void(void)>& f,
// unlike the usual behavior of path globs.
int CountOpenFds(Env* env, const std::string& path_pattern);
-// Waits for the subprocess to bind to any listening TCP port, and returns the port.
-Status WaitForTcpBind(pid_t pid, uint16_t* port, MonoDelta timeout) WARN_UNUSED_RESULT;
+// Waits for the subprocess to bind to any listening TCP port on the provided
+// IP address (if the address is not provided, it is a wildcard binding), and
+// returns the port.
+Status WaitForTcpBind(pid_t pid, uint16_t* port,
+ const boost::optional<const std::string&>& addr,
+ MonoDelta timeout) WARN_UNUSED_RESULT;
-// Waits for the subprocess to bind to any listening UDP port, and returns the port.
-Status WaitForUdpBind(pid_t pid, uint16_t* port, MonoDelta timeout) WARN_UNUSED_RESULT;
+// Similar to above, but waits for the subprocess to bind to any listening UDP port.
+Status WaitForUdpBind(pid_t pid, uint16_t* port,
+ const boost::optional<const std::string&>& addr,
+ MonoDelta timeout) WARN_UNUSED_RESULT;
+
+// Find the home directory of a Java-style application, e.g. JAVA_HOME or
+// HADOOP_HOME.
+//
+// Checks the environment, or falls back to a symlink in the bin installation
+// directory.
+Status FindHomeDir(const std::string& name,
+ const std::string& bin_dir,
+ std::string* home_dir) WARN_UNUSED_RESULT;
} // namespace kudu
#endif