You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2018/02/23 21:09:34 UTC
[1/2] kudu git commit: KUDU-2291 (part 8): fix a TSAN issue with
libunwind initialization
Repository: kudu
Updated Branches:
refs/heads/master 2a5a12fbd -> 57fe0c3db
KUDU-2291 (part 8): fix a TSAN issue with libunwind initialization
libunwind uses double-checked locking for initialization, which isn't
technically safe. We previously tried to work around this by calling into the
stack trace library before starting any kudu::Threads, but that still left us
open to races in unit tests like rw_mutex-test which uses std::thread.
This patch changes the forced single-threaded initialization to use GoogleOnce
instead.
Prior to this patch, looping rw_mutex-test on TSAN failed 12/1000 times.
With the patch it passed 1000/1000.
Change-Id: I522b6553e9cb9a30d7106ff55ad119f7df1f949c
Reviewed-on: http://gerrit.cloudera.org:8080/9409
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mp...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/20ba3c7b
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/20ba3c7b
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/20ba3c7b
Branch: refs/heads/master
Commit: 20ba3c7ba6c64c4a1521e7aab06a99a93cc01d45
Parents: 2a5a12f
Author: Todd Lipcon <to...@apache.org>
Authored: Thu Feb 22 16:59:38 2018 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Fri Feb 23 20:42:19 2018 +0000
----------------------------------------------------------------------
src/kudu/util/debug-util.cc | 26 ++++++++++++++++++++++++++
src/kudu/util/thread.cc | 5 -----
2 files changed, 26 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/20ba3c7b/src/kudu/util/debug-util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/debug-util.cc b/src/kudu/util/debug-util.cc
index 0af7abe..5a142bc 100644
--- a/src/kudu/util/debug-util.cc
+++ b/src/kudu/util/debug-util.cc
@@ -44,10 +44,12 @@
#include <libunwind.h>
#endif
+#include "kudu/gutil/basictypes.h"
#include "kudu/gutil/dynamic_annotations.h"
#include "kudu/gutil/hash/city.h"
#include "kudu/gutil/linux_syscall_support.h"
#include "kudu/gutil/macros.h"
+#include "kudu/gutil/once.h"
#include "kudu/gutil/spinlock.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/strings/numbers.h"
@@ -305,6 +307,19 @@ bool InitSignalHandlerUnlocked(int signum) {
return state == INITIALIZED;
}
+#ifdef __linux__
+GoogleOnceType g_prime_libunwind_once;
+
+void PrimeLibunwind() {
+ // The first call into libunwind does some unsafe double-checked locking
+ // for initialization. So, we make sure that the first call is not concurrent
+ // with any other call.
+ unw_cursor_t cursor;
+ unw_context_t uc;
+ unw_getcontext(&uc);
+ RAW_CHECK(unw_init_local(&cursor, &uc) >= 0, "unw_init_local failed");
+}
+#endif
} // anonymous namespace
Status SetStackTraceSignal(int signum) {
@@ -392,6 +407,15 @@ Status StackTraceCollector::TriggerAsync(int64_t tid, StackTrace* stack) {
return Status::NotSupported("unable to take thread stack: signal handler unavailable");
}
}
+ // Ensure that libunwind is primed for use before we send any signals. Otherwise
+ // we can hit a deadlock with the following stack:
+ // GoogleOnceInit() [waits on the 'once' to finish, but will never finish]
+ // StackTrace::Collect()
+ // <signal handler>
+ // PrimeLibUnwind
+ // GoogleOnceInit() [not yet initted, so starts initializing]
+ // StackTrace::Collect()
+ GoogleOnceInit(&g_prime_libunwind_once, &PrimeLibunwind);
std::unique_ptr<SignalData> data(new SignalData());
// Set the target TID in our communication structure, so if we end up with any
@@ -545,6 +569,8 @@ void StackTrace::Collect(int skip_frames) {
const int kMaxDepth = arraysize(frames_);
#ifdef __linux__
+ GoogleOnceInit(&g_prime_libunwind_once, &PrimeLibunwind);
+
unw_cursor_t cursor;
unw_context_t uc;
unw_getcontext(&uc);
http://git-wip-us.apache.org/repos/asf/kudu/blob/20ba3c7b/src/kudu/util/thread.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/thread.cc b/src/kudu/util/thread.cc
index b5db351..3275e6c 100644
--- a/src/kudu/util/thread.cc
+++ b/src/kudu/util/thread.cc
@@ -41,7 +41,6 @@
#include <glog/logging.h>
#include "kudu/gutil/atomicops.h"
-#include "kudu/gutil/basictypes.h"
#include "kudu/gutil/bind.h"
#include "kudu/gutil/bind_helpers.h"
#include "kudu/gutil/dynamic_annotations.h"
@@ -49,7 +48,6 @@
#include "kudu/gutil/once.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
-#include "kudu/util/debug-util.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/kernel_stack_watchdog.h"
#include "kudu/util/logging.h"
@@ -407,9 +405,6 @@ void ThreadMgr::ThreadPathHandler(const WebCallbackRegistry::WebRequest& req,
}
static void InitThreading() {
- // Warm up the stack trace library. This avoids a race in libunwind initialization
- // by making sure we initialize it before we start any other threads.
- ignore_result(GetStackTraceHex());
thread_manager.reset(new ThreadMgr());
}
[2/2] kudu git commit: KUDU-2191 (5/n): Add Kerberos SASL support to
the HMS client
Posted by da...@apache.org.
KUDU-2191 (5/n): Add Kerberos SASL support to the HMS client
The bulk of this commit is adding a new Thrift transport type,
SaslClientTransport, which facilitates SASL GSSAPI negotiation, as well
as integrity/privacy channel protection. The new transport is based on
Impala's version with some significant changes:
- Impala has a client and server SASL transport, necessitating a common
superclass (SaslTransport). Since we only need a client transport, I
collapsed all of the logic into a single class, which I think makes the
code easier to follow.
- The transport uses Kudu helper types where possible, e.g., faststring
buffers, and our existing SASL utility infrastructure.
- Integrity and privacy channel protection are implemented.
There are no standlone unit-tests for the transport, since that would
require implementing the server-specific counterpart. Instead, the class
is tested indirectly through using the HMS client to communicate with a
Kerberos-enabled HMS instance.
Change-Id: I8f217ae05fd36c8ee88fe20eeccd73d49233a345
Reviewed-on: http://gerrit.cloudera.org:8080/8692
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/57fe0c3d
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/57fe0c3d
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/57fe0c3d
Branch: refs/heads/master
Commit: 57fe0c3db086c9fc61fbcef9b2d879422b387a7e
Parents: 20ba3c7
Author: Dan Burkert <da...@apache.org>
Authored: Tue Nov 14 18:14:06 2017 -0800
Committer: Dan Burkert <da...@apache.org>
Committed: Fri Feb 23 21:09:17 2018 +0000
----------------------------------------------------------------------
src/kudu/hms/CMakeLists.txt | 7 +-
src/kudu/hms/hms_client-test.cc | 119 +++++-
src/kudu/hms/hms_client.cc | 54 ++-
src/kudu/hms/hms_client.h | 15 +-
src/kudu/hms/mini_hms.cc | 91 ++++-
src/kudu/hms/mini_hms.h | 16 +
src/kudu/hms/sasl_client_transport.cc | 402 +++++++++++++++++++
src/kudu/hms/sasl_client_transport.h | 176 ++++++++
.../mini-cluster/external_mini_cluster-test.cc | 25 +-
src/kudu/mini-cluster/external_mini_cluster.cc | 11 +
src/kudu/rpc/client_negotiation.cc | 8 +-
src/kudu/rpc/sasl_common.cc | 89 ++--
src/kudu/rpc/sasl_common.h | 55 ++-
src/kudu/rpc/server_negotiation.cc | 9 +-
14 files changed, 994 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/57fe0c3d/src/kudu/hms/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/hms/CMakeLists.txt b/src/kudu/hms/CMakeLists.txt
index e449047..f50eead 100644
--- a/src/kudu/hms/CMakeLists.txt
+++ b/src/kudu/hms/CMakeLists.txt
@@ -24,10 +24,13 @@ target_link_libraries(hms_thrift thrift)
add_dependencies(hms_thrift ${HMS_THRIFT_TGTS})
set(HMS_SRCS
- hms_client.cc)
+ hms_client.cc
+ sasl_client_transport.cc)
set(HMS_DEPS
+ gflags
glog
hms_thrift
+ krpc
kudu_util)
add_library(kudu_hms ${HMS_SRCS})
@@ -62,6 +65,7 @@ set(MINI_HMS_SRCS
add_library(mini_hms ${MINI_HMS_SRCS})
target_link_libraries(mini_hms
gutil
+ krpc
kudu_test_util
kudu_util)
add_dependencies(mini_hms hms-plugin)
@@ -71,6 +75,7 @@ if (NOT NO_TESTS)
set(KUDU_TEST_LINK_LIBS
kudu_hms
mini_hms
+ mini_kdc
${KUDU_MIN_TEST_LIBS})
# This test has to run serially since otherwise starting the HMS can take a very
http://git-wip-us.apache.org/repos/asf/kudu/blob/57fe0c3d/src/kudu/hms/hms_client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/hms/hms_client-test.cc b/src/kudu/hms/hms_client-test.cc
index 9ec0f4c..87601eb 100644
--- a/src/kudu/hms/hms_client-test.cc
+++ b/src/kudu/hms/hms_client-test.cc
@@ -22,12 +22,15 @@
#include <utility>
#include <vector>
+#include <boost/optional/optional.hpp>
#include <glog/stl_logging.h> // IWYU pragma: keep
#include <gtest/gtest.h>
#include "kudu/hms/hive_metastore_constants.h"
#include "kudu/hms/hive_metastore_types.h"
#include "kudu/hms/mini_hms.h"
+#include "kudu/rpc/sasl_common.h"
+#include "kudu/security/test/mini_kdc.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
@@ -36,6 +39,8 @@
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
+using boost::optional;
+using kudu::rpc::SaslProtection;
using std::make_pair;
using std::string;
using std::vector;
@@ -43,7 +48,8 @@ using std::vector;
namespace kudu {
namespace hms {
-class HmsClientTest : public KuduTest {
+class HmsClientTest : public KuduTest,
+ public ::testing::WithParamInterface<optional<SaslProtection::Type>> {
public:
Status CreateTable(HmsClient* client,
@@ -75,11 +81,47 @@ class HmsClientTest : public KuduTest {
}
};
-TEST_F(HmsClientTest, TestHmsOperations) {
+INSTANTIATE_TEST_CASE_P(ProtectionTypes,
+ HmsClientTest,
+ ::testing::Values(boost::none
+ , SaslProtection::kIntegrity
+// On macos, krb5 has issues repeatedly spinning up new KDCs ('unable to reach
+// any KDC in realm KRBTEST.COM, tried 1 KDC'). Integrity protection gives us
+// good coverage, so we disable the other variants.
+#ifndef __APPLE__
+ , SaslProtection::kAuthentication
+ , SaslProtection::kPrivacy
+#endif
+ ));
+
+TEST_P(HmsClientTest, TestHmsOperations) {
+ optional<SaslProtection::Type> protection = GetParam();
+ MiniKdc kdc;
MiniHms hms;
+ HmsClientOptions hms_client_opts;
+
+ if (protection) {
+ ASSERT_OK(kdc.Start());
+
+ string spn = "hive/127.0.0.1";
+ string ktpath;
+ ASSERT_OK(kdc.CreateServiceKeytab(spn, &ktpath));
+
+ ASSERT_OK(rpc::SaslInit());
+ hms.EnableKerberos(kdc.GetEnvVars()["KRB5_CONFIG"],
+ spn,
+ ktpath,
+ *protection);
+
+ ASSERT_OK(kdc.CreateUserPrincipal("alice"));
+ ASSERT_OK(kdc.Kinit("alice"));
+ ASSERT_OK(kdc.SetKrb5Environment());
+ hms_client_opts.enable_kerberos = true;
+ }
+
ASSERT_OK(hms.Start());
- HmsClient client(hms.address(), HmsClientOptions());
+ HmsClient client(hms.address(), hms_client_opts);
ASSERT_OK(client.Start());
// Create a database.
@@ -200,6 +242,77 @@ TEST_F(HmsClientTest, TestHmsOperations) {
ASSERT_OK(client.Stop());
}
+TEST_P(HmsClientTest, TestLargeObjects) {
+ optional<SaslProtection::Type> protection = GetParam();
+
+ MiniKdc kdc;
+ MiniHms hms;
+ HmsClientOptions hms_client_opts;
+
+ if (protection) {
+ ASSERT_OK(kdc.Start());
+
+ string spn = "hive/127.0.0.1";
+ string ktpath;
+ ASSERT_OK(kdc.CreateServiceKeytab(spn, &ktpath));
+
+ ASSERT_OK(rpc::SaslInit());
+ hms.EnableKerberos(kdc.GetEnvVars()["KRB5_CONFIG"],
+ spn,
+ ktpath,
+ *protection);
+
+ ASSERT_OK(kdc.CreateUserPrincipal("alice"));
+ ASSERT_OK(kdc.Kinit("alice"));
+ ASSERT_OK(kdc.SetKrb5Environment());
+ hms_client_opts.enable_kerberos = true;
+ }
+
+ ASSERT_OK(hms.Start());
+
+ HmsClient client(hms.address(), hms_client_opts);
+ ASSERT_OK(client.Start());
+
+ string database_name = "default";
+ string table_name = "big_table";
+
+ hive::Table table;
+ table.dbName = database_name;
+ table.tableName = table_name;
+ table.tableType = "MANAGED_TABLE";
+ hive::FieldSchema partition_key;
+ partition_key.name = "c1";
+ partition_key.type = "int";
+ table.partitionKeys.emplace_back(std::move(partition_key));
+
+ ASSERT_OK(client.CreateTable(table));
+
+ // Add a bunch of partitions to the table. This ensures we can send and
+ // receive really large thrift objects. We have to add the partitions in small
+ // batches, otherwise Derby chokes.
+ const int kBatches = 25;
+ const int kPartitionsPerBatch = 40;
+
+ for (int batch_idx = 0; batch_idx < kBatches; batch_idx++) {
+ vector<hive::Partition> partitions;
+ for (int partition_idx = 0; partition_idx < kPartitionsPerBatch; partition_idx++) {
+ hive::Partition partition;
+ partition.dbName = database_name;
+ partition.tableName = table_name;
+ partition.values = { std::to_string(batch_idx * kPartitionsPerBatch + partition_idx) };
+ partitions.emplace_back(std::move(partition));
+ }
+
+ ASSERT_OK(client.AddPartitions(database_name, table_name, std::move(partitions)));
+ }
+
+ ASSERT_OK(client.GetTable(database_name, table_name, &table));
+
+ vector<hive::Partition> partitions;
+ ASSERT_OK(client.GetPartitions(database_name, table_name, &partitions));
+ ASSERT_EQ(kBatches * kPartitionsPerBatch, partitions.size());
+}
+
TEST_F(HmsClientTest, TestHmsFaultHandling) {
MiniHms hms;
ASSERT_OK(hms.Start());
http://git-wip-us.apache.org/repos/asf/kudu/blob/57fe0c3d/src/kudu/hms/hms_client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/hms/hms_client.cc b/src/kudu/hms/hms_client.cc
index c2c2a67..6e2f4c6 100644
--- a/src/kudu/hms/hms_client.cc
+++ b/src/kudu/hms/hms_client.cc
@@ -37,16 +37,29 @@
#include "kudu/gutil/strings/substitute.h"
#include "kudu/hms/ThriftHiveMetastore.h"
#include "kudu/hms/hive_metastore_constants.h"
+#include "kudu/hms/sasl_client_transport.h"
+#include "kudu/util/flag_tags.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/status.h"
#include "kudu/util/stopwatch.h"
+// Default to 100 MiB to match Thrift TSaslTransport.receiveSaslMessage and the
+// HMS metastore.server.max.message.size config.
+DEFINE_int32(hms_client_max_buf_size, 100 * 1024 * 1024,
+ "Maximum size of Hive MetaStore objects that can be received by the "
+ "HMS client in bytes.");
+TAG_FLAG(hms_client_max_buf_size, experimental);
+// Note: despite being marked as a runtime flag, the new buf size value will
+// only take effect for new HMS clients.
+TAG_FLAG(hms_client_max_buf_size, runtime);
+
using apache::thrift::TException;
using apache::thrift::protocol::TBinaryProtocol;
using apache::thrift::protocol::TJSONProtocol;
using apache::thrift::transport::TBufferedTransport;
using apache::thrift::transport::TMemoryBuffer;
using apache::thrift::transport::TSocket;
+using apache::thrift::transport::TTransport;
using apache::thrift::transport::TTransportException;
using std::make_shared;
using std::shared_ptr;
@@ -76,6 +89,8 @@ namespace hms {
return Status::IllegalState((msg), e.what()); \
} catch (const hive::MetaException& e) { \
return Status::RemoteError((msg), e.what()); \
+ } catch (const SaslException& e) { \
+ return e.status().CloneAndPrepend((msg)); \
} catch (const TTransportException& e) { \
switch (e.getType()) { \
case TTransportException::TIMED_OUT: return Status::TimedOut((msg), e.what()); \
@@ -85,6 +100,8 @@ namespace hms {
} \
} catch (const TException& e) { \
return Status::IOError((msg), e.what()); \
+ } catch (const std::exception& e) { \
+ return Status::RuntimeError((msg), e.what()); \
}
const char* const HmsClient::kKuduTableIdKey = "kudu.table_id";
@@ -122,7 +139,16 @@ HmsClient::HmsClient(const HostPort& hms_address, const HmsClientOptions& option
socket->setSendTimeout(options.send_timeout.ToMilliseconds());
socket->setRecvTimeout(options.recv_timeout.ToMilliseconds());
socket->setConnTimeout(options.conn_timeout.ToMilliseconds());
- auto transport = make_shared<TBufferedTransport>(std::move(socket));
+ shared_ptr<TTransport> transport;
+
+ if (options.enable_kerberos) {
+ transport = make_shared<SaslClientTransport>(hms_address.host(),
+ std::move(socket),
+ FLAGS_hms_client_max_buf_size);
+ } else {
+ transport = make_shared<TBufferedTransport>(std::move(socket));
+ }
+
auto protocol = make_shared<TBinaryProtocol>(std::move(transport));
client_ = hive::ThriftHiveMetastoreClient(std::move(protocol));
}
@@ -267,6 +293,32 @@ Status HmsClient::GetNotificationEvents(int64_t last_event_id,
return Status::OK();
}
+Status HmsClient::AddPartitions(const string& database_name,
+ const string& table_name,
+ vector<hive::Partition> partitions) {
+ SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, "add HMS table partitions");
+ hive::AddPartitionsRequest request;
+ hive::AddPartitionsResult response;
+
+ request.dbName = database_name;
+ request.tblName = table_name;
+ request.parts = std::move(partitions);
+
+ HMS_RET_NOT_OK(client_.add_partitions_req(response, request),
+ "failed to add Hive MetaStore table partitions");
+ return Status::OK();
+}
+
+Status HmsClient::GetPartitions(const string& database_name,
+ const string& table_name,
+ vector<hive::Partition>* partitions) {
+ SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, "get HMS table partitions");
+ HMS_RET_NOT_OK(client_.get_partitions(*partitions, database_name, table_name, -1),
+ "failed to get Hive Metastore table partitions");
+ return Status::OK();
+}
+
+
Status HmsClient::DeserializeJsonTable(Slice json, hive::Table* table) {
shared_ptr<TMemoryBuffer> membuffer(new TMemoryBuffer(json.size()));
membuffer->write(json.data(), json.size());
http://git-wip-us.apache.org/repos/asf/kudu/blob/57fe0c3d/src/kudu/hms/hms_client.h
----------------------------------------------------------------------
diff --git a/src/kudu/hms/hms_client.h b/src/kudu/hms/hms_client.h
index 0ce17f9..a71fcf6 100644
--- a/src/kudu/hms/hms_client.h
+++ b/src/kudu/hms/hms_client.h
@@ -52,6 +52,9 @@ struct HmsClientOptions {
// Thrift socket connect timeout.
MonoDelta conn_timeout = MonoDelta::FromSeconds(60);
+
+ // Whether to use SASL Kerberos authentication when connecting to the HMS.
+ bool enable_kerberos = false;
};
// A client for the Hive MetaStore.
@@ -76,8 +79,6 @@ struct HmsClientOptions {
// handling connection retries, because the higher-level construct which is
// handling HA deployments will naturally want to retry across HMS instances as
// opposed to retrying repeatedly on a single instance.
-//
-// TODO(dan): this client does not yet handle Kerberized HMS deployments.
class HmsClient {
public:
@@ -160,6 +161,16 @@ class HmsClient {
int32_t max_events,
std::vector<hive::NotificationEvent>* events) WARN_UNUSED_RESULT;
+ // Adds partitions to an HMS table.
+ Status AddPartitions(const std::string& database_name,
+ const std::string& table_name,
+ std::vector<hive::Partition> partitions) WARN_UNUSED_RESULT;
+
+ // Retrieves the partitions of an HMS table.
+ Status GetPartitions(const std::string& database_name,
+ const std::string& table_name,
+ std::vector<hive::Partition>* partitions) WARN_UNUSED_RESULT;
+
// Deserializes a JSON encoded table.
//
// Notification event log messages often include table objects serialized as
http://git-wip-us.apache.org/repos/asf/kudu/blob/57fe0c3d/src/kudu/hms/mini_hms.cc
----------------------------------------------------------------------
diff --git a/src/kudu/hms/mini_hms.cc b/src/kudu/hms/mini_hms.cc
index c7de53d..078341d 100644
--- a/src/kudu/hms/mini_hms.cc
+++ b/src/kudu/hms/mini_hms.cc
@@ -38,6 +38,7 @@
#include "kudu/util/subprocess.h"
#include "kudu/util/test_util.h"
+using kudu::rpc::SaslProtection;
using std::map;
using std::string;
using std::unique_ptr;
@@ -52,8 +53,21 @@ MiniHms::~MiniHms() {
WARN_NOT_OK(Stop(), "Failed to stop MiniHms");
}
-namespace {
+void MiniHms::EnableKerberos(string krb5_conf,
+ string service_principal,
+ string keytab_file,
+ SaslProtection::Type protection) {
+ CHECK(!hms_process_);
+ CHECK(!krb5_conf.empty());
+ CHECK(!service_principal.empty());
+ CHECK(!keytab_file.empty());
+ krb5_conf_ = std::move(krb5_conf);
+ service_principal_ = std::move(service_principal);
+ keytab_file_ = std::move(keytab_file);
+ protection_ = protection;
+}
+namespace {
Status FindHomeDir(const char* name, const string& bin_dir, string* home_dir) {
string name_upper;
ToUpperCase(name, &name_upper);
@@ -67,7 +81,6 @@ Status FindHomeDir(const char* name, const string& bin_dir, string* home_dir) {
}
return Status::OK();
}
-
} // anonymous namespace
Status MiniHms::Start() {
@@ -92,6 +105,7 @@ Status MiniHms::Start() {
auto tmp_dir = GetTestDataDirectory();
RETURN_NOT_OK(CreateHiveSite(tmp_dir));
+ RETURN_NOT_OK(CreateCoreSite(tmp_dir));
// Comma-separated list of additional jars to add to the HMS classpath.
string aux_jars = Substitute("$0/hms-plugin.jar", bin_dir);
@@ -101,7 +115,11 @@ Status MiniHms::Start() {
{ "HIVE_AUX_JARS_PATH", aux_jars },
{ "HIVE_CONF_DIR", tmp_dir },
{ "JAVA_TOOL_OPTIONS", "-Dhive.log.level=WARN -Dhive.root.logger=console" },
+ { "HADOOP_CONF_DIR", tmp_dir },
};
+ if (!krb5_conf_.empty()) {
+ env_vars["JAVA_TOOL_OPTIONS"] += Substitute(" -Djava.security.krb5.conf=$0", krb5_conf_);
+ }
// Start the HMS.
hms_process_.reset(new Subprocess({
@@ -150,10 +168,19 @@ Status MiniHms::Resume() {
}
Status MiniHms::CreateHiveSite(const string& tmp_dir) const {
- // 'datanucleus.schema.autoCreateAll' and 'hive.metastore.schema.verification'
- // allow Hive to startup and run without first running the schemaTool.
- // 'hive.metastore.event.db.listener.timetolive' configures how long the
- // Metastore will store notification log events before GCing them.
+
+ // - datanucleus.schema.autoCreateAll
+ // - hive.metastore.schema.verification
+ // Allow Hive to startup and run without first running the schemaTool.
+ //
+ // - hive.metastore.event.db.listener.timetolive
+ // Configures how long the Metastore will store notification log events
+ // before GCing them.
+ //
+ // - hive.metastore.sasl.enabled
+ // - hive.metastore.kerberos.keytab.file
+ // - hive.metastore.kerberos.principal
+ // Configures the HMS to use Kerberos for its Thrift RPC interface.
static const string kFileTemplate = R"(
<configuration>
<property>
@@ -188,17 +215,67 @@ Status MiniHms::CreateHiveSite(const string& tmp_dir) const {
<name>hive.metastore.event.db.listener.timetolive</name>
<value>$0s</value>
</property>
+
+ <property>
+ <name>hive.metastore.sasl.enabled</name>
+ <value>$2</value>
+ </property>
+
+ <property>
+ <name>hive.metastore.kerberos.keytab.file</name>
+ <value>$3</value>
+ </property>
+
+ <property>
+ <name>hive.metastore.kerberos.principal</name>
+ <value>$4</value>
+ </property>
+
+ <property>
+ <name>hadoop.rpc.protection</name>
+ <value>$5</value>
+ </property>
</configuration>
)";
string file_contents = strings::Substitute(kFileTemplate,
notification_log_ttl_.ToSeconds(),
- tmp_dir);
+ tmp_dir,
+ !keytab_file_.empty(),
+ keytab_file_,
+ service_principal_,
+ SaslProtection::name_of(protection_));
return WriteStringToFile(Env::Default(),
file_contents,
JoinPathSegments(tmp_dir, "hive-site.xml"));
}
+Status MiniHms::CreateCoreSite(const string& tmp_dir) const {
+
+ // - hadoop.security.authentication
+ // The HMS uses Hadoop's UGI contraption which will refuse to login a user
+ // with Kerberos unless this special property is set. The property must
+ // not be in hive-site.xml because a new Configuration object is created
+ // to search for the property, and it only checks places Hadoop knows
+ // about.
+
+ static const string kFileTemplate = R"(
+<configuration>
+ <property>
+ <name>hadoop.security.authentication</name>
+ <value>$0</value>
+ </property>
+</configuration>
+ )";
+
+ string file_contents = strings::Substitute(kFileTemplate,
+ keytab_file_.empty() ? "simple" : "kerberos");
+
+ return WriteStringToFile(Env::Default(),
+ file_contents,
+ JoinPathSegments(tmp_dir, "core-site.xml"));
+}
+
} // namespace hms
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/57fe0c3d/src/kudu/hms/mini_hms.h
----------------------------------------------------------------------
diff --git a/src/kudu/hms/mini_hms.h b/src/kudu/hms/mini_hms.h
index 4bef966..52f480b 100644
--- a/src/kudu/hms/mini_hms.h
+++ b/src/kudu/hms/mini_hms.h
@@ -24,6 +24,7 @@
#include <glog/logging.h>
#include "kudu/gutil/port.h"
+#include "kudu/rpc/sasl_common.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/status.h"
@@ -45,6 +46,12 @@ class MiniHms {
notification_log_ttl_ = ttl;
}
+ // Configures the mini HMS to use Kerberos.
+ void EnableKerberos(std::string krb5_conf,
+ std::string service_principal,
+ std::string keytab_file,
+ rpc::SaslProtection::Type protection);
+
// Starts the mini Hive metastore.
//
// If the MiniHms has already been started and stopped, it will be restarted
@@ -71,12 +78,21 @@ class MiniHms {
// Creates a hive-site.xml for the mini HMS.
Status CreateHiveSite(const std::string& tmp_dir) const WARN_UNUSED_RESULT;
+ // Creates a core-site.xml for the mini HMS.
+ Status CreateCoreSite(const std::string& tmp_dir) const WARN_UNUSED_RESULT;
+
// Waits for the metastore process to bind to a port.
Status WaitForHmsPorts() WARN_UNUSED_RESULT;
std::unique_ptr<Subprocess> hms_process_;
MonoDelta notification_log_ttl_ = MonoDelta::FromSeconds(86400);
uint16_t port_ = 0;
+
+ // Kerberos configuration
+ std::string krb5_conf_;
+ std::string service_principal_;
+ std::string keytab_file_;
+ rpc::SaslProtection::Type protection_ = rpc::SaslProtection::kAuthentication;
};
} // namespace hms
http://git-wip-us.apache.org/repos/asf/kudu/blob/57fe0c3d/src/kudu/hms/sasl_client_transport.cc
----------------------------------------------------------------------
diff --git a/src/kudu/hms/sasl_client_transport.cc b/src/kudu/hms/sasl_client_transport.cc
new file mode 100644
index 0000000..95d1222
--- /dev/null
+++ b/src/kudu/hms/sasl_client_transport.cc
@@ -0,0 +1,402 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/hms/sasl_client_transport.h"
+
+#include <algorithm>
+#include <cstring>
+#include <limits>
+#include <memory>
+#include <ostream>
+#include <string>
+
+#include <glog/logging.h>
+#include <thrift/transport/TTransport.h>
+
+#include "kudu/gutil/endian.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/human_readable.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/sasl_common.h"
+#include "kudu/rpc/sasl_helper.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+
+using apache::thrift::transport::TTransportException;
+using std::shared_ptr;
+using std::string;
+using strings::Substitute;
+
+namespace kudu {
+
+using rpc::SaslMechanism;
+using rpc::WrapSaslCall;
+
+namespace hms {
+
+namespace {
+
+// SASL negotiation frames are sent with an 8-bit status and a 32-bit length.
+const uint32_t kSaslHeaderSize = sizeof(uint8_t) + sizeof(uint32_t);
+
+// Frame headers consist of a 32-bit length.
+const uint32_t kFrameHeaderSize = sizeof(uint32_t);
+
+// SASL SASL_CB_GETOPT callback function.
+int GetoptCb(SaslClientTransport* client_transport,
+ const char* plugin_name,
+ const char* option,
+ const char** result,
+ unsigned* len) {
+ return client_transport->GetOptionCb(plugin_name, option, result, len);
+}
+
+// SASL SASL_CB_CANON_USER callback function.
+int CanonUserCb(sasl_conn_t* /*conn*/,
+ void* /*context*/,
+ const char* in, unsigned inlen,
+ unsigned /*flags*/,
+ const char* /*user_realm*/,
+ char* out, unsigned out_max, unsigned* out_len) {
+ CHECK_LE(inlen, out_max);
+ memcpy(out, in, inlen);
+ *out_len = inlen;
+ return SASL_OK;
+}
+
+// SASL SASL_CB_USER callback function.
+int UserCb(void* /*context*/, int id, const char** result, unsigned* len) {
+ CHECK_EQ(SASL_CB_USER, id);
+
+ // Setting the username to the empty string causes the remote end to use the
+ // clients Kerberos principal, which is correct.
+ *result = "";
+ if (len != nullptr) *len = 0;
+ return SASL_OK;
+}
+} // anonymous namespace
+
+SaslClientTransport::SaslClientTransport(const string& server_fqdn,
+ shared_ptr<TTransport> transport,
+ size_t max_recv_buf_size)
+ : transport_(std::move(transport)),
+ sasl_helper_(rpc::SaslHelper::CLIENT),
+ sasl_callbacks_({
+ rpc::SaslBuildCallback(SASL_CB_GETOPT, reinterpret_cast<int (*)()>(&GetoptCb), this),
+ rpc::SaslBuildCallback(SASL_CB_CANON_USER,
+ reinterpret_cast<int (*)()>(&CanonUserCb),
+ this),
+ rpc::SaslBuildCallback(SASL_CB_USER, reinterpret_cast<int (*)()>(&UserCb), nullptr),
+ rpc::SaslBuildCallback(SASL_CB_LIST_END, nullptr, nullptr)
+ }),
+ needs_wrap_(false),
+ max_recv_buf_size_(max_recv_buf_size),
+ // Set a reasonable max send buffer size for negotiation. Once negotiation
+ // is complete the negotiated value will be used.
+ max_send_buf_size_(64 * 1024) {
+ sasl_helper_.set_server_fqdn(server_fqdn);
+ sasl_helper_.EnableGSSAPI();
+ ResetWriteBuf();
+}
+
+bool SaslClientTransport::isOpen() {
+ return transport_->isOpen();
+}
+
+bool SaslClientTransport::peek() {
+ return !read_slice_.empty() || transport_->peek();
+}
+
+void SaslClientTransport::open() {
+ transport_->open();
+ DCHECK(transport_->isOpen());
+ try {
+ Negotiate();
+ } catch (...) {
+ transport_->close();
+ throw;
+ }
+}
+
+void SaslClientTransport::close() {
+ transport_->close();
+ sasl_conn_.reset();
+}
+
+void SaslClientTransport::ReadFrame() {
+ DCHECK_EQ(0, read_buf_.size());
+ DCHECK(read_slice_.empty());
+
+ uint8_t payload_len_buf[kFrameHeaderSize];
+ transport_->readAll(payload_len_buf, kFrameHeaderSize);
+ size_t payload_len = NetworkByteOrder::Load32(payload_len_buf);
+
+ if (payload_len > 1024 * 1024) {
+ KLOG_EVERY_N_SECS(WARNING, 60) << "Received large Thrift SASL frame: "
+ << HumanReadableNumBytes::ToString(payload_len);
+ if (payload_len > max_recv_buf_size_) {
+ throw TTransportException(Substitute("Thrift SASL frame is too long: $0/$1",
+ HumanReadableNumBytes::ToString(payload_len),
+ HumanReadableNumBytes::ToString(max_recv_buf_size_)));
+ }
+ }
+
+ read_buf_.reserve(kFrameHeaderSize + payload_len);
+ read_buf_.append(payload_len_buf, kFrameHeaderSize);
+ read_buf_.resize(kFrameHeaderSize + payload_len);
+ transport_->readAll(&read_buf_.data()[kFrameHeaderSize], payload_len);
+
+ if (needs_wrap_) {
+ // Point read_slice_ directly at the SASL library's internal buffer. This
+ // avoids having to copy the decoded data back into read_buf_.
+ Status s = rpc::SaslDecode(sasl_conn_.get(), read_buf_, &read_slice_);
+ if (!s.ok()) {
+ throw SaslException(s);
+ }
+ ResetReadBuf();
+ } else {
+ read_slice_ = read_buf_;
+ read_slice_.remove_prefix(kFrameHeaderSize);
+ }
+}
+
+uint32_t SaslClientTransport::read(uint8_t* buf, uint32_t len) {
+ // If there is nothing left to read in the buffer, then fill it.
+ if (read_slice_.empty()) {
+ ReadFrame();
+ }
+
+ uint32_t n = std::min(read_slice_.size(), static_cast<size_t>(len));
+ memcpy(buf, read_slice_.data(), n);
+ read_slice_.remove_prefix(n);
+ if (read_slice_.empty()) {
+ ResetReadBuf();
+ }
+ return n;
+}
+
+void SaslClientTransport::write(const uint8_t* buf, uint32_t len) {
+ // Check that we've already preallocated space in the buffer for the frame-header.
+ DCHECK(write_buf_.size() >= kFrameHeaderSize);
+
+ // Check if the amount to write would overflow a frame.
+ while (write_buf_.size() + len > max_send_buf_size_) {
+ uint32_t n = max_send_buf_size_ - write_buf_.size();
+ write_buf_.append(buf, n);
+ flush();
+ buf += n;
+ len -= n;
+ }
+
+ write_buf_.append(buf, len);
+}
+
+void SaslClientTransport::flush() {
+ if (needs_wrap_) {
+ Slice plaintext(write_buf_);
+ plaintext.remove_prefix(kFrameHeaderSize);
+ Slice ciphertext;
+ Status s = rpc::SaslEncode(sasl_conn_.get(), plaintext, &ciphertext);
+ if (!s.ok()) {
+ throw SaslException(s);
+ }
+
+ // Note: when the SASL C library encodes the plaintext, it prefixes the
+ // ciphertext with the length. Since this happens to match the SASL/Thrift
+ // frame format, we can send the ciphertext unmodified to the remote server.
+ transport_->write(ciphertext.data(), ciphertext.size());
+ } else {
+ size_t payload_len = write_buf_.size() - kFrameHeaderSize;
+ NetworkByteOrder::Store32(write_buf_.data(), payload_len);
+ transport_->write(write_buf_.data(), write_buf_.size());
+ }
+
+ transport_->flush();
+ ResetWriteBuf();
+}
+
+void SaslClientTransport::Negotiate() {
+ SetupSaslContext();
+
+ faststring recv_buf;
+ SendSaslStart();
+
+ for (;;) {
+ NegotiationStatus status = ReceiveSaslMessage(&recv_buf);
+
+ if (status == TSASL_COMPLETE) {
+ throw SaslException(
+ Status::IllegalState("Received SASL COMPLETE status, but handshake is not finished"));
+ }
+ CHECK_EQ(status, TSASL_OK);
+
+ const char* out;
+ unsigned out_len;
+ Status s = WrapSaslCall(sasl_conn_.get(), [&] {
+ return sasl_client_step(sasl_conn_.get(),
+ reinterpret_cast<const char*>(recv_buf.data()),
+ recv_buf.size(),
+ nullptr,
+ &out,
+ &out_len);
+ });
+
+ if (PREDICT_FALSE(!s.IsIncomplete() && !s.ok())) {
+ throw SaslException(std::move(s));
+ }
+
+ SendSaslMessage(status, Slice(out, out_len));
+ transport_->flush();
+
+ if (s.ok()) {
+ break;
+ }
+ }
+
+ NegotiationStatus status = ReceiveSaslMessage(&recv_buf);
+ if (status != TSASL_COMPLETE) {
+ throw SaslException(
+ Status::IllegalState("Received SASL OK status, but expected SASL COMPLETE"));
+ }
+ DCHECK_EQ(0, recv_buf.size());
+
+ needs_wrap_ = rpc::NeedsWrap(sasl_conn_.get());
+ max_send_buf_size_ = rpc::GetMaxSendBufferSize(sasl_conn_.get());
+ VLOG(2) << "Thrift SASL GSSAPI negotiation complete"
+ << "; needs wrap: " << (needs_wrap_ ? "true" : "false")
+ << ", max send frame length: "
+ << HumanReadableNumBytes::ToStringWithoutRounding(max_send_buf_size_)
+ << ", max receive frame length: "
+ << HumanReadableNumBytes::ToStringWithoutRounding(max_recv_buf_size_);
+}
+
+void SaslClientTransport::SendSaslMessage(NegotiationStatus status, Slice payload) {
+ uint8_t header[kSaslHeaderSize];
+ header[0] = status;
+ DCHECK_LT(payload.size(), std::numeric_limits<int32_t>::max());
+ NetworkByteOrder::Store32(&header[1], payload.size());
+ transport_->write(header, kSaslHeaderSize);
+ if (!payload.empty()) {
+ transport_->write(payload.data(), payload.size());
+ }
+}
+
+NegotiationStatus SaslClientTransport::ReceiveSaslMessage(faststring* payload) {
+ // Read the fixed-length message header.
+ uint8_t header[kSaslHeaderSize];
+ transport_->readAll(header, kSaslHeaderSize);
+ size_t len = NetworkByteOrder::Load32(&header[1]);
+
+ // Handle status errors.
+ switch (header[0]) {
+ case TSASL_OK:
+ case TSASL_COMPLETE: break;
+ case TSASL_BAD:
+ case TSASL_ERROR:
+ throw SaslException(Status::RuntimeError("SASL peer indicated failure"));
+ // The Thrift client should never receive TSASL_START.
+ case TSASL_START:
+ default:
+ throw SaslException(Status::RuntimeError("Unexpected SASL status",
+ std::to_string(header[0])));
+ }
+
+ // Read the message payload.
+ if (len > max_recv_buf_size_) {
+ throw SaslException(Status::RuntimeError(Substitute(
+ "SASL negotiation message payload exceeds maximum length: $0/$1",
+ HumanReadableNumBytes::ToString(len),
+ HumanReadableNumBytes::ToString(max_recv_buf_size_))));
+ }
+ payload->resize(len);
+ transport_->readAll(payload->data(), len);
+
+ return static_cast<NegotiationStatus>(header[0]);
+}
+
+void SaslClientTransport::SendSaslStart() {
+ const char* init_msg = nullptr;
+ unsigned init_msg_len = 0;
+ const char* negotiated_mech = nullptr;
+
+ Status s = WrapSaslCall(sasl_conn_.get(), [&] {
+ return sasl_client_start(
+ sasl_conn_.get(), // The SASL connection context created by sasl_client_new()
+ SaslMechanism::name_of(SaslMechanism::GSSAPI), // The mechanism to use.
+ nullptr, // Disables INTERACT return if NULL.
+ &init_msg, // Filled in on success.
+ &init_msg_len, // Filled in on success.
+ &negotiated_mech); // Filled in on success.
+ });
+
+ if (PREDICT_FALSE(!s.IsIncomplete() && !s.ok())) {
+ throw SaslException(std::move(s));
+ }
+
+ // Check that the SASL library is using the mechanism that we picked.
+ DCHECK_EQ(SaslMechanism::value_of(negotiated_mech), SaslMechanism::GSSAPI);
+ s = rpc::EnableProtection(sasl_conn_.get(),
+ rpc::SaslProtection::kAuthentication,
+ max_recv_buf_size_);
+ if (!s.ok()) {
+ throw SaslException(s);
+ }
+
+ // These two calls comprise a single message in the thrift-sasl protocol.
+ SendSaslMessage(TSASL_START, Slice(negotiated_mech));
+ SendSaslMessage(TSASL_OK, Slice(init_msg, init_msg_len));
+ transport_->flush();
+}
+
+int SaslClientTransport::GetOptionCb(const char* plugin_name, const char* option,
+ const char** result, unsigned* len) {
+ return sasl_helper_.GetOptionCb(plugin_name, option, result, len);
+}
+
+void SaslClientTransport::SetupSaslContext() {
+ sasl_conn_t* sasl_conn = nullptr;
+ Status s = WrapSaslCall(nullptr /* no conn */, [&] {
+ return sasl_client_new(
+ // TODO(dan): make the service name configurable.
+ "hive", // Registered name of the service using SASL. Required.
+ sasl_helper_.server_fqdn(), // The fully qualified domain name of the remote server.
+ nullptr, // Local and remote IP address strings. (we don't use
+ nullptr, // any mechanisms which require this info.)
+ sasl_callbacks_.data(), // Connection-specific callbacks.
+ 0, // flags
+ &sasl_conn);
+ });
+ if (!s.ok()) {
+ throw SaslException(s);
+ }
+ sasl_conn_.reset(sasl_conn);
+}
+
+void SaslClientTransport::ResetReadBuf() {
+ read_buf_.clear();
+ read_buf_.shrink_to_fit();
+}
+
+void SaslClientTransport::ResetWriteBuf() {
+ write_buf_.resize(kFrameHeaderSize);
+ write_buf_.shrink_to_fit();
+}
+
+} // namespace hms
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/57fe0c3d/src/kudu/hms/sasl_client_transport.h
----------------------------------------------------------------------
diff --git a/src/kudu/hms/sasl_client_transport.h b/src/kudu/hms/sasl_client_transport.h
new file mode 100644
index 0000000..a2bc7f3
--- /dev/null
+++ b/src/kudu/hms/sasl_client_transport.h
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <sasl/sasl.h>
+#include <thrift/transport/TTransportException.h>
+#include <thrift/transport/TVirtualTransport.h>
+
+#include "kudu/rpc/sasl_helper.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+
+namespace apache {
+namespace thrift {
+namespace transport {
+class TTransport;
+} // namespace transport
+} // namespace thrift
+} // namespace apache
+
+namespace kudu {
+namespace rpc {
+struct SaslDeleter;
+} // namespace rpc
+namespace hms {
+
+// An exception representing a SASL or Kerberos failure.
+class SaslException : public apache::thrift::transport::TTransportException {
+ public:
+ explicit SaslException(Status status)
+ : TTransportException(status.ToString()),
+ status_(std::move(status)) {
+ }
+
+ const Status& status() const {
+ return status_;
+ }
+
+ private:
+ Status status_;
+};
+
+// An enum describing the possible states of the SASL negotiation protocol.
+enum NegotiationStatus {
+ TSASL_INVALID = -1,
+ TSASL_START = 1,
+ TSASL_OK = 2,
+ TSASL_BAD = 3,
+ TSASL_ERROR = 4,
+ TSASL_COMPLETE = 5
+};
+
+// A Thrift transport which uses SASL GSSAPI to authenticate as a client to a
+// remote server.
+//
+// SaslClientTransport internally holds buffers, so it does not need the
+// underlying transport to be buffered.
+class SaslClientTransport
+ : public apache::thrift::transport::TVirtualTransport<SaslClientTransport> {
+ public:
+ SaslClientTransport(const std::string& server_fqdn,
+ std::shared_ptr<TTransport> transport,
+ size_t max_recv_buf_size);
+
+ ~SaslClientTransport() override = default;
+
+ bool isOpen() override;
+
+ bool peek() override;
+
+ void open() override;
+
+ void close() override;
+
+ uint32_t read(uint8_t* buf, uint32_t len);
+
+ void write(const uint8_t* buf, uint32_t len);
+
+ void flush() override;
+
+ int GetOptionCb(const char* plugin_name, const char* option,
+ const char** result, unsigned* len);
+
+ private:
+
+ // Runs SASL negotiation with the remote server.
+ void Negotiate();
+
+ // Sends a SASL negotiation message to the underlying transport.
+ //
+ // Send a SASL negotiation message using the Thrift framing protocol:
+ //
+ // - 1 byte of status
+ // - 4 bytes of remaining length
+ // - var-len payload
+ void SendSaslMessage(NegotiationStatus status, Slice payload);
+
+ // Receives a SASL negotiation message from the underlying transport.
+ //
+ // The returned negotiation status will be of type OK or COMPLETE, all
+ // other statuses result in an exception.
+ NegotiationStatus ReceiveSaslMessage(faststring* payload);
+
+ // Initializes SASL state.
+ void SetupSaslContext();
+
+ // Sends the initial SASL connection message.
+ void SendSaslStart();
+
+ // Reads a frame from the underlying transport, storing the payload into
+ // read_slice_. If the connection is using SASL auth-conf or auth-int
+ // protection the data is automatically decoded.
+ void ReadFrame();
+
+ // Resets the read buffer to empty, and deallocates its internal buffer.
+ void ResetReadBuf();
+
+ // Resets the write buffer to the size of a frame header, and deallocates its
+ // internal buffer.
+ void ResetWriteBuf();
+
+ // The underlying transport. Typically a TCP socket.
+ std::shared_ptr<TTransport> transport_;
+
+ // SASL state.
+ rpc::SaslHelper sasl_helper_;
+ std::unique_ptr<sasl_conn_t, rpc::SaslDeleter> sasl_conn_;
+ std::vector<sasl_callback_t> sasl_callbacks_;
+
+ // Whether the connection is using auth-int or auth-conf protection.
+ bool needs_wrap_;
+
+ // The negotiated SASL maximum buffer sizes. These correspond to the maximum
+ // sized frames that can be received or sent.
+ //
+ // Note: the Java implementation of the Thrift SASL transport does not respect
+ // the negotiated maximum buffer size (THRIFT-4483) and never splits a message
+ // into multiple frames, so we end up having to set the recv buf size to match
+ // the largest serialized Thrift message we want to be able to receive.
+ size_t max_recv_buf_size_;
+ size_t max_send_buf_size_;
+
+ // The read buffer and slice. The slice points to the remaining frame data
+ // which hasn't been read yet.
+ faststring read_buf_;
+ Slice read_slice_;
+
+ // The write buffer.
+ faststring write_buf_;
+};
+
+} // namespace hms
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/57fe0c3d/src/kudu/mini-cluster/external_mini_cluster-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/mini-cluster/external_mini_cluster-test.cc b/src/kudu/mini-cluster/external_mini_cluster-test.cc
index c6787f4..8cd357c 100644
--- a/src/kudu/mini-cluster/external_mini_cluster-test.cc
+++ b/src/kudu/mini-cluster/external_mini_cluster-test.cc
@@ -76,12 +76,12 @@ class ExternalMiniClusterTest : public KuduTest,
public testing::WithParamInterface<pair<Kerberos, HiveMetastore>> {
};
-// TODO(dan): Add ENABLED/ENABLED when the mini HMS supports Kerberos.
INSTANTIATE_TEST_CASE_P(KerberosOnAndOff,
ExternalMiniClusterTest,
testing::Values(make_pair(Kerberos::DISABLED, HiveMetastore::DISABLED),
make_pair(Kerberos::ENABLED, HiveMetastore::DISABLED),
- make_pair(Kerberos::DISABLED, HiveMetastore::ENABLED)));
+ make_pair(Kerberos::DISABLED, HiveMetastore::ENABLED),
+ make_pair(Kerberos::ENABLED, HiveMetastore::ENABLED)));
void SmokeTestKerberizedCluster(ExternalMiniClusterOptions opts) {
ASSERT_TRUE(opts.enable_kerberos);
@@ -190,6 +190,17 @@ TEST_P(ExternalMiniClusterTest, TestBasicOperation) {
ASSERT_EQ(ts_rpc.ToString(), ts->bound_rpc_hostport().ToString());
ASSERT_EQ(ts_http.ToString(), ts->bound_http_hostport().ToString());
+ // Verify that the HMS is reachable.
+ if (opts.enable_hive_metastore) {
+ hms::HmsClientOptions hms_client_opts;
+ hms_client_opts.enable_kerberos = opts.enable_kerberos;
+ hms::HmsClient hms_client(cluster.hms()->address(), hms_client_opts);
+ ASSERT_OK(hms_client.Start());
+ vector<string> tables;
+ ASSERT_OK(hms_client.GetAllTables("default", &tables));
+ ASSERT_TRUE(tables.empty()) << "tables: " << tables;
+ }
+
// Verify that, in a Kerberized cluster, if we drop our Kerberos environment,
// we can't make RPCs to a server.
if (opts.enable_kerberos) {
@@ -202,16 +213,6 @@ TEST_P(ExternalMiniClusterTest, TestBasicOperation) {
"but client does not have Kerberos credentials available");
}
- // Verify that the HMS is reachable.
- if (opts.enable_hive_metastore) {
- hms::HmsClient hms_client(cluster.hms()->address(), hms::HmsClientOptions());
- ASSERT_OK(hms_client.Start());
- vector<string> tables;
- ASSERT_OK(hms_client.GetAllTables("default", &tables));
- ASSERT_TRUE(tables.empty()) << "tables: " << tables;
- }
-
- // Test that if we inject a fault into a tablet server's boot process
// ExternalTabletServer::Restart() still returns OK, even if the tablet server crashed.
ts->Shutdown();
ts->mutable_flags()->push_back("--fault_before_start=1.0");
http://git-wip-us.apache.org/repos/asf/kudu/blob/57fe0c3d/src/kudu/mini-cluster/external_mini_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/mini-cluster/external_mini_cluster.cc b/src/kudu/mini-cluster/external_mini_cluster.cc
index 55a75fa..b90c4ef 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.cc
+++ b/src/kudu/mini-cluster/external_mini_cluster.cc
@@ -44,6 +44,7 @@
#include "kudu/master/master.proxy.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/sasl_common.h"
#include "kudu/security/test/mini_kdc.h"
#include "kudu/server/server_base.pb.h"
#include "kudu/server/server_base.proxy.h"
@@ -185,6 +186,16 @@ Status ExternalMiniCluster::Start() {
if (opts_.enable_hive_metastore) {
hms_.reset(new hms::MiniHms());
+
+ if (opts_.enable_kerberos) {
+ string spn = "hive/127.0.0.1";
+ string ktpath;
+ RETURN_NOT_OK_PREPEND(kdc_->CreateServiceKeytab(spn, &ktpath),
+ "could not create keytab");
+ hms_->EnableKerberos(kdc_->GetEnvVars()["KRB5_CONFIG"], spn, ktpath,
+ rpc::SaslProtection::kAuthentication);
+ }
+
RETURN_NOT_OK_PREPEND(hms_->Start(),
"Failed to start the Hive Metastore");
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/57fe0c3d/src/kudu/rpc/client_negotiation.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/client_negotiation.cc b/src/kudu/rpc/client_negotiation.cc
index 71dde92..02175f6 100644
--- a/src/kudu/rpc/client_negotiation.cc
+++ b/src/kudu/rpc/client_negotiation.cc
@@ -596,7 +596,7 @@ Status ClientNegotiation::SendSaslInitiate() {
// integrity protection so that the channel bindings and nonce can be
// verified.
if (negotiated_mech_ == SaslMechanism::GSSAPI) {
- RETURN_NOT_OK(EnableIntegrityProtection(sasl_conn_.get()));
+ RETURN_NOT_OK(EnableProtection(sasl_conn_.get(), SaslProtection::kIntegrity));
}
NegotiatePB msg;
@@ -662,7 +662,7 @@ Status ClientNegotiation::HandleSaslSuccess(const NegotiatePB& response) {
RETURN_NOT_OK_PREPEND(cert.GetServerEndPointChannelBindings(&expected_channel_bindings),
"failed to generate channel bindings");
- string received_channel_bindings;
+ Slice received_channel_bindings;
RETURN_NOT_OK_PREPEND(SaslDecode(sasl_conn_.get(),
response.channel_bindings(),
&received_channel_bindings),
@@ -704,7 +704,9 @@ Status ClientNegotiation::SendConnectionContext() {
if (nonce_) {
// Reply with the SASL-protected nonce. We only set the nonce when using SASL GSSAPI.
- RETURN_NOT_OK(SaslEncode(sasl_conn_.get(), *nonce_, conn_context.mutable_encoded_nonce()));
+ Slice ciphertext;
+ RETURN_NOT_OK(SaslEncode(sasl_conn_.get(), *nonce_, &ciphertext));
+ *conn_context.mutable_encoded_nonce() = ciphertext.ToString();
}
return SendFramedMessageBlocking(socket(), header, conn_context, deadline_);
http://git-wip-us.apache.org/repos/asf/kudu/blob/57fe0c3d/src/kudu/rpc/sasl_common.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/sasl_common.cc b/src/kudu/rpc/sasl_common.cc
index a377d16..645e854 100644
--- a/src/kudu/rpc/sasl_common.cc
+++ b/src/kudu/rpc/sasl_common.cc
@@ -17,7 +17,6 @@
#include "kudu/rpc/sasl_common.h"
-#include <algorithm>
#include <cstdio>
#include <cstring>
#include <limits>
@@ -46,7 +45,7 @@ namespace rpc {
const char* const kSaslMechPlain = "PLAIN";
const char* const kSaslMechGSSAPI = "GSSAPI";
-extern const size_t kSaslMaxOutBufLen = 1024;
+extern const size_t kSaslMaxBufSize = 1024;
// See WrapSaslCall().
static __thread string* g_auth_failure_capture = nullptr;
@@ -351,45 +350,44 @@ Status WrapSaslCall(sasl_conn_t* conn, const std::function<int()>& call) {
}
}
-Status SaslEncode(sasl_conn_t* conn, const std::string& plaintext, std::string* encoded) {
- size_t offset = 0;
-
- // The SASL library can only encode up to a maximum amount at a time, so we
- // have to call encode multiple times if our input is larger than this max.
- while (offset < plaintext.size()) {
- const char* out;
- unsigned out_len;
- size_t len = std::min(kSaslMaxOutBufLen, plaintext.size() - offset);
-
- RETURN_NOT_OK(WrapSaslCall(conn, [&]() {
- return sasl_encode(conn, plaintext.data() + offset, len, &out, &out_len);
- }));
+bool NeedsWrap(sasl_conn_t* sasl_conn) {
+ const unsigned* ssf;
+ int rc = sasl_getprop(sasl_conn, SASL_SSF, reinterpret_cast<const void**>(&ssf));
+ CHECK_EQ(rc, SASL_OK) << "Failed to get SSF property on authenticated SASL connection";
+ return *ssf != 0;
+}
- encoded->append(out, out_len);
- offset += len;
- }
+uint32_t GetMaxSendBufferSize(sasl_conn_t* sasl_conn) {
+ const unsigned* max_buf_size;
+ int rc = sasl_getprop(sasl_conn, SASL_MAXOUTBUF, reinterpret_cast<const void**>(&max_buf_size));
+ CHECK_EQ(rc, SASL_OK)
+ << "Failed to get max output buffer property on authenticated SASL connection";
+ return *max_buf_size;
+}
+Status SaslEncode(sasl_conn_t* conn, Slice plaintext, Slice* ciphertext) {
+ const char* out;
+ unsigned out_len;
+ RETURN_NOT_OK_PREPEND(WrapSaslCall(conn, [&] {
+ return sasl_encode(conn,
+ reinterpret_cast<const char*>(plaintext.data()),
+ plaintext.size(),
+ &out, &out_len);
+ }), "SASL encode failed");
+ *ciphertext = Slice(out, out_len);
return Status::OK();
}
-Status SaslDecode(sasl_conn_t* conn, const string& encoded, string* plaintext) {
- size_t offset = 0;
-
- // The SASL library can only decode up to a maximum amount at a time, so we
- // have to call decode multiple times if our input is larger than this max.
- while (offset < encoded.size()) {
- const char* out;
- unsigned out_len;
- size_t len = std::min(kSaslMaxOutBufLen, encoded.size() - offset);
-
- RETURN_NOT_OK(WrapSaslCall(conn, [&]() {
- return sasl_decode(conn, encoded.data() + offset, len, &out, &out_len);
- }));
-
- plaintext->append(out, out_len);
- offset += len;
- }
-
+Status SaslDecode(sasl_conn_t* conn, Slice ciphertext, Slice* plaintext) {
+ const char* out;
+ unsigned out_len;
+ RETURN_NOT_OK_PREPEND(WrapSaslCall(conn, [&] {
+ return sasl_decode(conn,
+ reinterpret_cast<const char*>(ciphertext.data()),
+ ciphertext.size(),
+ &out, &out_len);
+ }), "SASL decode failed");
+ *plaintext = Slice(out, out_len);
return Status::OK();
}
@@ -425,14 +423,16 @@ sasl_callback_t SaslBuildCallback(int id, int (*proc)(void), void* context) {
return callback;
}
-Status EnableIntegrityProtection(sasl_conn_t* sasl_conn) {
+Status EnableProtection(sasl_conn_t* sasl_conn,
+ SaslProtection::Type minimum_protection,
+ size_t max_recv_buf_size) {
sasl_security_properties_t sec_props;
memset(&sec_props, 0, sizeof(sec_props));
- sec_props.min_ssf = 1;
+ sec_props.min_ssf = minimum_protection;
sec_props.max_ssf = std::numeric_limits<sasl_ssf_t>::max();
- sec_props.maxbufsize = kSaslMaxOutBufLen;
+ sec_props.maxbufsize = max_recv_buf_size;
- RETURN_NOT_OK_PREPEND(WrapSaslCall(sasl_conn, [&] () {
+ RETURN_NOT_OK_PREPEND(WrapSaslCall(sasl_conn, [&] {
return sasl_setprop(sasl_conn, SASL_SEC_PROPS, &sec_props);
}), "failed to set SASL security properties");
return Status::OK();
@@ -457,5 +457,14 @@ const char* SaslMechanism::name_of(SaslMechanism::Type val) {
}
}
+const char* SaslProtection::name_of(SaslProtection::Type val) {
+ switch (val) {
+ case SaslProtection::kAuthentication: return "authentication";
+ case SaslProtection::kIntegrity: return "integrity";
+ case SaslProtection::kPrivacy: return "privacy";
+ }
+ LOG(FATAL) << "unknown SASL protection type: " << val;
+}
+
} // namespace rpc
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/57fe0c3d/src/kudu/rpc/sasl_common.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/sasl_common.h b/src/kudu/rpc/sasl_common.h
index 888e7cb..2454cfd 100644
--- a/src/kudu/rpc/sasl_common.h
+++ b/src/kudu/rpc/sasl_common.h
@@ -19,13 +19,15 @@
#define KUDU_RPC_SASL_COMMON_H
#include <cstddef>
+#include <cstdint>
#include <functional>
-#include <string>
#include <set>
+#include <string>
#include <sasl/sasl.h>
#include "kudu/gutil/port.h"
+#include "kudu/util/slice.h"
#include "kudu/util/status.h"
namespace kudu {
@@ -37,7 +39,7 @@ namespace rpc {
// Constants
extern const char* const kSaslMechPlain;
extern const char* const kSaslMechGSSAPI;
-extern const size_t kSaslMaxOutBufLen;
+extern const size_t kSaslMaxBufSize;
struct SaslMechanism {
enum Type {
@@ -49,6 +51,18 @@ struct SaslMechanism {
static const char* name_of(Type val);
};
+struct SaslProtection {
+ enum Type {
+ // SASL authentication without integrity or privacy.
+ kAuthentication = 0,
+ // Integrity protection, i.e. messages are HMAC'd.
+ kIntegrity = 1,
+ // Privacy protection, i.e. messages are encrypted.
+ kPrivacy = 2,
+ };
+ static const char* name_of(Type val);
+};
+
// Initialize the SASL library.
// appname: Name of the application for logging messages & sasl plugin configuration.
// Note that this string must remain allocated for the lifetime of the program.
@@ -93,19 +107,38 @@ std::set<SaslMechanism::Type> SaslListAvailableMechs();
// context: An object to pass to the callback as the context pointer, or NULL.
sasl_callback_t SaslBuildCallback(int id, int (*proc)(void), void* context);
-// Require integrity protection on the SASL connection. Should be called before
-// initiating the SASL negotiation.
-Status EnableIntegrityProtection(sasl_conn_t* sasl_conn) WARN_UNUSED_RESULT;
+// Enable SASL integrity and privacy protection on the connection. Also allows
+// setting the minimum required protection level, and the maximum receive buffer
+// size.
+Status EnableProtection(sasl_conn_t* sasl_conn,
+ SaslProtection::Type minimum_protection = SaslProtection::kAuthentication,
+ size_t max_recv_buf_size = kSaslMaxBufSize) WARN_UNUSED_RESULT;
-// Encode the provided data and append it to 'encoded'.
+// Returns true if the SASL connection has been negotiated with auth-int or
+// auth-conf. 'sasl_conn' must already be negotiated.
+bool NeedsWrap(sasl_conn_t* sasl_conn);
+
+// Retrieves the negotiated maximum send buffer size for auth-int or auth-conf
+// protected channels.
+uint32_t GetMaxSendBufferSize(sasl_conn_t* sasl_conn) WARN_UNUSED_RESULT;
+
+// Encode the provided data.
+//
+// The plaintext data must not be longer than the negotiated maximum buffer size.
+//
+// The output 'ciphertext' slice is only valid until the next use of the SASL connection.
Status SaslEncode(sasl_conn_t* conn,
- const std::string& plaintext,
- std::string* encoded) WARN_UNUSED_RESULT;
+ Slice plaintext,
+ Slice* ciphertext) WARN_UNUSED_RESULT;
-// Decode the provided SASL-encoded data and append it to 'plaintext'.
+// Decode the provided SASL-encoded data.
+//
+// The decoded plaintext must not be longer than the negotiated maximum buffer size.
+//
+// The output 'plaintext' slice is only valid until the next use of the SASL connection.
Status SaslDecode(sasl_conn_t* conn,
- const std::string& encoded,
- std::string* plaintext) WARN_UNUSED_RESULT;
+ Slice ciphertext,
+ Slice* plaintext) WARN_UNUSED_RESULT;
// Deleter for sasl_conn_t instances, for use with gscoped_ptr after calling sasl_*_new()
struct SaslDeleter {
http://git-wip-us.apache.org/repos/asf/kudu/blob/57fe0c3d/src/kudu/rpc/server_negotiation.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/server_negotiation.cc b/src/kudu/rpc/server_negotiation.cc
index b12b54b..a623853 100644
--- a/src/kudu/rpc/server_negotiation.cc
+++ b/src/kudu/rpc/server_negotiation.cc
@@ -789,7 +789,7 @@ Status ServerNegotiation::HandleSaslInitiate(const NegotiatePB& request) {
// integrity protection so that the channel bindings and nonce can be
// verified.
if (negotiated_mech_ == SaslMechanism::GSSAPI) {
- RETURN_NOT_OK(EnableIntegrityProtection(sasl_conn_.get()));
+ RETURN_NOT_OK(EnableProtection(sasl_conn_.get(), SaslProtection::kIntegrity));
}
const char* server_out = nullptr;
@@ -884,9 +884,12 @@ Status ServerNegotiation::SendSaslSuccess() {
string plaintext_channel_bindings;
RETURN_NOT_OK(cert.GetServerEndPointChannelBindings(&plaintext_channel_bindings));
+
+ Slice ciphertext;
RETURN_NOT_OK(SaslEncode(sasl_conn_.get(),
plaintext_channel_bindings,
- response.mutable_channel_bindings()));
+ &ciphertext));
+ *response.mutable_channel_bindings() = ciphertext.ToString();
}
}
@@ -919,7 +922,7 @@ Status ServerNegotiation::RecvConnectionContext(faststring* recv_buf) {
return Status::NotAuthorized("ConnectionContextPB wrapped nonce missing");
}
- string decoded_nonce;
+ Slice decoded_nonce;
s = SaslDecode(sasl_conn_.get(), conn_context.encoded_nonce(), &decoded_nonce);
if (!s.ok()) {
return Status::NotAuthorized("failed to decode nonce", s.message());