You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2018/09/22 00:39:35 UTC

kudu git commit: Add thrift module for common thrift utilities

Repository: kudu
Updated Branches:
  refs/heads/master 6129467d9 -> ddaa766e2


Add thrift module for common thrift utilities

The HMS patch series inlined all necessary Thrift utilities into the hms
module, since it was the only use of Thrift in the codebase. Now that
we're also planning on having a Sentry client, it makes sense to
properly abstract the common Thrift code into its own shared module.

Change-Id: I6f6f843f42b37cb1170df03da01fc0790fe94acb
Reviewed-on: http://gerrit.cloudera.org:8080/11493
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/ddaa766e
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/ddaa766e
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/ddaa766e

Branch: refs/heads/master
Commit: ddaa766e2fdf177a86ed3eb57e6b917730b75843
Parents: 6129467
Author: Dan Burkert <da...@apache.org>
Authored: Fri Sep 21 12:40:18 2018 -0700
Committer: Dan Burkert <da...@apache.org>
Committed: Sat Sep 22 00:39:23 2018 +0000

----------------------------------------------------------------------
 CMakeLists.txt                                  |   1 +
 src/kudu/hms/CMakeLists.txt                     |   4 +-
 src/kudu/hms/hms_catalog-test.cc                |   3 +-
 src/kudu/hms/hms_catalog.cc                     |  14 +-
 src/kudu/hms/hms_catalog.h                      |   2 -
 src/kudu/hms/hms_client-test.cc                 |  11 +-
 src/kudu/hms/hms_client.cc                      |  66 +--
 src/kudu/hms/hms_client.h                       |  27 +-
 src/kudu/hms/sasl_client_transport.cc           | 402 -------------------
 src/kudu/hms/sasl_client_transport.h            | 176 --------
 src/kudu/integration-tests/master_hms-itest.cc  |   4 +-
 .../mini-cluster/external_mini_cluster-test.cc  |   3 +-
 src/kudu/thrift/CMakeLists.txt                  |  33 ++
 src/kudu/thrift/client.cc                       |  85 ++++
 src/kudu/thrift/client.h                        |  65 +++
 src/kudu/thrift/sasl_client_transport.cc        | 402 +++++++++++++++++++
 src/kudu/thrift/sasl_client_transport.h         | 176 ++++++++
 src/kudu/tools/kudu-tool-test.cc                |  10 +-
 18 files changed, 806 insertions(+), 678 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/ddaa766e/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 6c9d35e..df02893 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1477,6 +1477,7 @@ add_subdirectory(src/kudu/security)
 add_subdirectory(src/kudu/sentry)
 add_subdirectory(src/kudu/server)
 add_subdirectory(src/kudu/tablet)
+add_subdirectory(src/kudu/thrift)
 add_subdirectory(src/kudu/tools)
 add_subdirectory(src/kudu/tserver)
 add_subdirectory(src/kudu/util)

http://git-wip-us.apache.org/repos/asf/kudu/blob/ddaa766e/src/kudu/hms/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/hms/CMakeLists.txt b/src/kudu/hms/CMakeLists.txt
index 90794a6..287307d 100644
--- a/src/kudu/hms/CMakeLists.txt
+++ b/src/kudu/hms/CMakeLists.txt
@@ -34,14 +34,14 @@ add_dependencies(hms_thrift ${HMS_THRIFT_TGTS})
 
 set(HMS_SRCS
   hms_catalog.cc
-  hms_client.cc
-  sasl_client_transport.cc)
+  hms_client.cc)
 set(HMS_DEPS
   gflags
   glog
   hms_thrift
   krpc
   kudu_common
+  kudu_thrift
   kudu_util)
 
 add_library(kudu_hms ${HMS_SRCS})

http://git-wip-us.apache.org/repos/asf/kudu/blob/ddaa766e/src/kudu/hms/hms_catalog-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/hms/hms_catalog-test.cc b/src/kudu/hms/hms_catalog-test.cc
index cf9977b..a2bfabf 100644
--- a/src/kudu/hms/hms_catalog-test.cc
+++ b/src/kudu/hms/hms_catalog-test.cc
@@ -37,6 +37,7 @@
 #include "kudu/hms/mini_hms.h"
 #include "kudu/rpc/sasl_common.h"
 #include "kudu/security/test/mini_kdc.h"
+#include "kudu/thrift/client.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
@@ -161,7 +162,7 @@ class HmsCatalogTest : public KuduTest {
   void SetUp() override {
     bool enable_kerberos = EnableKerberos();
 
-    HmsClientOptions hms_client_opts;
+    thrift::ClientOptions hms_client_opts;
 
     hms_.reset(new hms::MiniHms());
     if (enable_kerberos) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/ddaa766e/src/kudu/hms/hms_catalog.cc
----------------------------------------------------------------------
diff --git a/src/kudu/hms/hms_catalog.cc b/src/kudu/hms/hms_catalog.cc
index f62f9ba..aeaf226 100644
--- a/src/kudu/hms/hms_catalog.cc
+++ b/src/kudu/hms/hms_catalog.cc
@@ -42,6 +42,7 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/hms/hive_metastore_types.h"
 #include "kudu/hms/hms_client.h"
+#include "kudu/thrift/client.h"
 #include "kudu/util/async_util.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/net/net_util.h"
@@ -100,6 +101,14 @@ TAG_FLAG(hive_metastore_conn_timeout, advanced);
 TAG_FLAG(hive_metastore_conn_timeout, experimental);
 TAG_FLAG(hive_metastore_conn_timeout, runtime);
 
+DEFINE_int32(hive_metastore_max_message_size, 100 * 1024 * 1024,
+             "Maximum size of Hive Metastore objects that can be received by the "
+             "HMS client in bytes. Should match the metastore.server.max.message.size "
+             "configuration.");
+TAG_FLAG(hive_metastore_max_message_size, advanced);
+TAG_FLAG(hive_metastore_max_message_size, experimental);
+TAG_FLAG(hive_metastore_max_message_size, runtime);
+
 namespace kudu {
 namespace hms {
 
@@ -109,7 +118,7 @@ const char* const HmsCatalog::kInvalidTableError = "when the Hive Metastore inte
 
 HmsCatalog::HmsCatalog(string master_addresses)
     : master_addresses_(std::move(master_addresses)),
-      hms_client_(HostPort("", 0), hms_client_options_),
+      hms_client_(HostPort("", 0), thrift::ClientOptions()),
       reconnect_after_(MonoTime::Now()),
       reconnect_failure_(Status::OK()),
       consecutive_reconnect_failures_(0),
@@ -402,11 +411,12 @@ Status HmsCatalog::Execute(Task task) {
 Status HmsCatalog::Reconnect() {
   Status s;
 
-  HmsClientOptions options;
+  thrift::ClientOptions options;
   options.send_timeout = MonoDelta::FromSeconds(FLAGS_hive_metastore_send_timeout);
   options.recv_timeout = MonoDelta::FromSeconds(FLAGS_hive_metastore_recv_timeout);
   options.conn_timeout = MonoDelta::FromSeconds(FLAGS_hive_metastore_conn_timeout);
   options.enable_kerberos = FLAGS_hive_metastore_sasl_enabled;
+  options.max_buf_size = FLAGS_hive_metastore_max_message_size;
 
   // Try reconnecting to each HMS in sequence, returning the first one which
   // succeeds. In order to avoid getting 'stuck' on a partially failed HMS, we

http://git-wip-us.apache.org/repos/asf/kudu/blob/ddaa766e/src/kudu/hms/hms_catalog.h
----------------------------------------------------------------------
diff --git a/src/kudu/hms/hms_catalog.h b/src/kudu/hms/hms_catalog.h
index de95ccf..a13f885 100644
--- a/src/kudu/hms/hms_catalog.h
+++ b/src/kudu/hms/hms_catalog.h
@@ -208,8 +208,6 @@ class HmsCatalog {
 
   // Fields only used by the threadpool thread:
 
-  // Options to use when creating the HMS client.
-  hms::HmsClientOptions hms_client_options_;
   // The HMS client.
   hms::HmsClient hms_client_;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/ddaa766e/src/kudu/hms/hms_client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/hms/hms_client-test.cc b/src/kudu/hms/hms_client-test.cc
index fa8d9bf..e677cd0 100644
--- a/src/kudu/hms/hms_client-test.cc
+++ b/src/kudu/hms/hms_client-test.cc
@@ -36,6 +36,7 @@
 #include "kudu/hms/mini_hms.h"
 #include "kudu/rpc/sasl_common.h"
 #include "kudu/security/test/mini_kdc.h"
+#include "kudu/thrift/client.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
@@ -107,7 +108,7 @@ TEST_P(HmsClientTest, TestHmsOperations) {
   optional<SaslProtection::Type> protection = GetParam();
   MiniKdc kdc;
   MiniHms hms;
-  HmsClientOptions hms_client_opts;
+  thrift::ClientOptions hms_client_opts;
 
   if (protection) {
     ASSERT_OK(kdc.Start());
@@ -282,7 +283,7 @@ TEST_P(HmsClientTest, TestLargeObjects) {
 
   MiniKdc kdc;
   MiniHms hms;
-  HmsClientOptions hms_client_opts;
+  thrift::ClientOptions hms_client_opts;
 
   if (protection) {
     ASSERT_OK(kdc.Start());
@@ -352,7 +353,7 @@ TEST_F(HmsClientTest, TestHmsFaultHandling) {
   MiniHms hms;
   ASSERT_OK(hms.Start());
 
-  HmsClientOptions options;
+  thrift::ClientOptions options;
   options.recv_timeout = MonoDelta::FromMilliseconds(500),
   options.send_timeout = MonoDelta::FromMilliseconds(500);
   HmsClient client(hms.address(), options);
@@ -386,7 +387,7 @@ TEST_F(HmsClientTest, TestHmsConnect) {
   Sockaddr loopback;
   ASSERT_OK(loopback.ParseString("127.0.0.1", 0));
 
-  HmsClientOptions options;
+  thrift::ClientOptions options;
   options.recv_timeout = MonoDelta::FromMilliseconds(100),
   options.send_timeout = MonoDelta::FromMilliseconds(100);
   options.conn_timeout = MonoDelta::FromMilliseconds(100);
@@ -435,7 +436,7 @@ TEST_F(HmsClientTest, TestCaseSensitivity) {
   MiniHms hms;
   ASSERT_OK(hms.Start());
 
-  HmsClient client(hms.address(), HmsClientOptions());
+  HmsClient client(hms.address(), thrift::ClientOptions());
   ASSERT_OK(client.Start());
 
   // Create a database.

http://git-wip-us.apache.org/repos/asf/kudu/blob/ddaa766e/src/kudu/hms/hms_client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/hms/hms_client.cc b/src/kudu/hms/hms_client.cc
index 257e150..ef37616 100644
--- a/src/kudu/hms/hms_client.cc
+++ b/src/kudu/hms/hms_client.cc
@@ -20,55 +20,35 @@
 #include <algorithm>
 #include <exception>
 #include <memory>
-#include <mutex>
-#include <ostream>
 #include <string>
 #include <vector>
 
 #include <boost/algorithm/string/predicate.hpp>
-#include <gflags/gflags.h>
 #include <glog/logging.h>
-#include <thrift/TOutput.h>
 #include <thrift/Thrift.h>
-#include <thrift/protocol/TBinaryProtocol.h>
 #include <thrift/protocol/TJSONProtocol.h>
 #include <thrift/protocol/TProtocol.h>
 #include <thrift/transport/TBufferTransports.h>
-#include <thrift/transport/TSocket.h>
 #include <thrift/transport/TTransport.h>
 #include <thrift/transport/TTransportException.h>
 
-#include "kudu/gutil/macros.h"
 #include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/strip.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/hms/ThriftHiveMetastore.h"
 #include "kudu/hms/hive_metastore_types.h"
-#include "kudu/hms/sasl_client_transport.h"
-#include "kudu/util/flag_tags.h"
-#include "kudu/util/net/net_util.h"
+#include "kudu/thrift/client.h"
+#include "kudu/thrift/sasl_client_transport.h"
 #include "kudu/util/status.h"
 #include "kudu/util/stopwatch.h"
 
-// Default to 100 MiB to match Thrift TSaslTransport.receiveSaslMessage and the
-// HMS metastore.server.max.message.size config.
-DEFINE_int32(hms_client_max_buf_size, 100 * 1024 * 1024,
-             "Maximum size of Hive Metastore objects that can be received by the "
-             "HMS client in bytes.");
-TAG_FLAG(hms_client_max_buf_size, experimental);
-// Note: despite being marked as a runtime flag, the new buf size value will
-// only take effect for new HMS clients.
-TAG_FLAG(hms_client_max_buf_size, runtime);
-
 using apache::thrift::TException;
-using apache::thrift::protocol::TBinaryProtocol;
 using apache::thrift::protocol::TJSONProtocol;
-using apache::thrift::transport::TBufferedTransport;
 using apache::thrift::transport::TMemoryBuffer;
-using apache::thrift::transport::TSocket;
-using apache::thrift::transport::TTransport;
 using apache::thrift::transport::TTransportException;
-using std::make_shared;
+using kudu::thrift::ClientOptions;
+using kudu::thrift::CreateClientProtocol;
+using kudu::thrift::SaslException;
 using std::shared_ptr;
 using std::string;
 using std::vector;
@@ -140,40 +120,8 @@ const uint16_t HmsClient::kDefaultHmsPort = 9083;
 
 const int kSlowExecutionWarningThresholdMs = 1000;
 
-namespace {
-// A logging callback for Thrift.
-//
-// Normally this would be defined in a more neutral location (e.g. Impala
-// defines it in thrift-util.cc), but since Hive is currently Kudu's only user
-// of Thrift, it's nice to have the log messsages originate from hms_client.cc.
-void ThriftOutputFunction(const char* output) {
-  LOG(INFO) << output;
-}
-} // anonymous namespace
-
-HmsClient::HmsClient(const HostPort& hms_address, const HmsClientOptions& options)
-      : client_(nullptr) {
-  static std::once_flag set_thrift_logging_callback;
-  std::call_once(set_thrift_logging_callback, [] {
-      apache::thrift::GlobalOutput.setOutputFunction(ThriftOutputFunction);
-  });
-
-  auto socket = make_shared<TSocket>(hms_address.host(), hms_address.port());
-  socket->setSendTimeout(options.send_timeout.ToMilliseconds());
-  socket->setRecvTimeout(options.recv_timeout.ToMilliseconds());
-  socket->setConnTimeout(options.conn_timeout.ToMilliseconds());
-  shared_ptr<TTransport> transport;
-
-  if (options.enable_kerberos) {
-    transport = make_shared<SaslClientTransport>(hms_address.host(),
-                                                 std::move(socket),
-                                                 FLAGS_hms_client_max_buf_size);
-  } else {
-    transport = make_shared<TBufferedTransport>(std::move(socket));
-  }
-
-  auto protocol = make_shared<TBinaryProtocol>(std::move(transport));
-  client_ = hive::ThriftHiveMetastoreClient(std::move(protocol));
+HmsClient::HmsClient(const HostPort& address, const ClientOptions& options)
+      : client_(hive::ThriftHiveMetastoreClient(CreateClientProtocol(address, options))) {
 }
 
 HmsClient::~HmsClient() {

http://git-wip-us.apache.org/repos/asf/kudu/blob/ddaa766e/src/kudu/hms/hms_client.h
----------------------------------------------------------------------
diff --git a/src/kudu/hms/hms_client.h b/src/kudu/hms/hms_client.h
index 6c97d1c..c081366 100644
--- a/src/kudu/hms/hms_client.h
+++ b/src/kudu/hms/hms_client.h
@@ -24,7 +24,6 @@
 #include "kudu/gutil/port.h"
 #include "kudu/hms/ThriftHiveMetastore.h"
 #include "kudu/hms/hive_metastore_types.h"
-#include "kudu/util/monotime.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 
@@ -32,6 +31,10 @@ namespace kudu {
 
 class HostPort;
 
+namespace thrift {
+struct ClientOptions;
+}
+
 namespace hms {
 
 // Whether to drop child-objects when dropping an HMS object.
@@ -40,21 +43,6 @@ enum class Cascade {
   kFalse,
 };
 
-struct HmsClientOptions {
-
-  // Thrift socket send timeout
-  MonoDelta send_timeout = MonoDelta::FromSeconds(60);
-
-  // Thrift socket receive timeout.
-  MonoDelta recv_timeout = MonoDelta::FromSeconds(60);
-
-  // Thrift socket connect timeout.
-  MonoDelta conn_timeout = MonoDelta::FromSeconds(60);
-
-  // Whether to use SASL Kerberos authentication when connecting to the HMS.
-  bool enable_kerberos = false;
-};
-
 // A client for the Hive Metastore.
 //
 // All operations are synchronous, and may block.
@@ -102,11 +90,8 @@ class HmsClient {
 
   static const uint16_t kDefaultHmsPort;
 
-  // Create an HmsClient connection to the proided HMS Thrift RPC address.
-  //
-  // The individual timeouts may be set to enforce per-operation
-  // (read/write/connect) timeouts.
-  HmsClient(const HostPort& hms_address, const HmsClientOptions& options);
+  // Create an HmsClient connection to the provided HMS Thrift RPC address.
+  HmsClient(const HostPort& address, const thrift::ClientOptions& options);
   ~HmsClient();
 
   // Starts the HMS client.

http://git-wip-us.apache.org/repos/asf/kudu/blob/ddaa766e/src/kudu/hms/sasl_client_transport.cc
----------------------------------------------------------------------
diff --git a/src/kudu/hms/sasl_client_transport.cc b/src/kudu/hms/sasl_client_transport.cc
deleted file mode 100644
index 95d1222..0000000
--- a/src/kudu/hms/sasl_client_transport.cc
+++ /dev/null
@@ -1,402 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "kudu/hms/sasl_client_transport.h"
-
-#include <algorithm>
-#include <cstring>
-#include <limits>
-#include <memory>
-#include <ostream>
-#include <string>
-
-#include <glog/logging.h>
-#include <thrift/transport/TTransport.h>
-
-#include "kudu/gutil/endian.h"
-#include "kudu/gutil/port.h"
-#include "kudu/gutil/strings/human_readable.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/rpc/sasl_common.h"
-#include "kudu/rpc/sasl_helper.h"
-#include "kudu/util/faststring.h"
-#include "kudu/util/logging.h"
-#include "kudu/util/slice.h"
-#include "kudu/util/status.h"
-
-using apache::thrift::transport::TTransportException;
-using std::shared_ptr;
-using std::string;
-using strings::Substitute;
-
-namespace kudu {
-
-using rpc::SaslMechanism;
-using rpc::WrapSaslCall;
-
-namespace hms {
-
-namespace {
-
-// SASL negotiation frames are sent with an 8-bit status and a 32-bit length.
-const uint32_t kSaslHeaderSize = sizeof(uint8_t) + sizeof(uint32_t);
-
-// Frame headers consist of a 32-bit length.
-const uint32_t kFrameHeaderSize = sizeof(uint32_t);
-
-// SASL SASL_CB_GETOPT callback function.
-int GetoptCb(SaslClientTransport* client_transport,
-             const char* plugin_name,
-             const char* option,
-             const char** result,
-             unsigned* len) {
-  return client_transport->GetOptionCb(plugin_name, option, result, len);
-}
-
-// SASL SASL_CB_CANON_USER callback function.
-int CanonUserCb(sasl_conn_t* /*conn*/,
-                void* /*context*/,
-                const char* in, unsigned inlen,
-                unsigned /*flags*/,
-                const char* /*user_realm*/,
-                char* out, unsigned out_max, unsigned* out_len) {
-  CHECK_LE(inlen, out_max);
-  memcpy(out, in, inlen);
-  *out_len = inlen;
-  return SASL_OK;
-}
-
-// SASL SASL_CB_USER callback function.
-int UserCb(void* /*context*/, int id, const char** result, unsigned* len) {
-  CHECK_EQ(SASL_CB_USER, id);
-
-  // Setting the username to the empty string causes the remote end to use the
-  // clients Kerberos principal, which is correct.
-  *result = "";
-  if (len != nullptr) *len = 0;
-  return SASL_OK;
-}
-} // anonymous namespace
-
-SaslClientTransport::SaslClientTransport(const string& server_fqdn,
-                                         shared_ptr<TTransport> transport,
-                                         size_t max_recv_buf_size)
-    : transport_(std::move(transport)),
-      sasl_helper_(rpc::SaslHelper::CLIENT),
-      sasl_callbacks_({
-          rpc::SaslBuildCallback(SASL_CB_GETOPT, reinterpret_cast<int (*)()>(&GetoptCb), this),
-          rpc::SaslBuildCallback(SASL_CB_CANON_USER,
-                                 reinterpret_cast<int (*)()>(&CanonUserCb),
-                                 this),
-          rpc::SaslBuildCallback(SASL_CB_USER, reinterpret_cast<int (*)()>(&UserCb), nullptr),
-          rpc::SaslBuildCallback(SASL_CB_LIST_END, nullptr, nullptr)
-      }),
-      needs_wrap_(false),
-      max_recv_buf_size_(max_recv_buf_size),
-      // Set a reasonable max send buffer size for negotiation. Once negotiation
-      // is complete the negotiated value will be used.
-      max_send_buf_size_(64 * 1024) {
-  sasl_helper_.set_server_fqdn(server_fqdn);
-  sasl_helper_.EnableGSSAPI();
-  ResetWriteBuf();
-}
-
-bool SaslClientTransport::isOpen() {
-  return transport_->isOpen();
-}
-
-bool SaslClientTransport::peek() {
-  return !read_slice_.empty() || transport_->peek();
-}
-
-void SaslClientTransport::open() {
-  transport_->open();
-  DCHECK(transport_->isOpen());
-  try {
-    Negotiate();
-  } catch (...) {
-    transport_->close();
-    throw;
-  }
-}
-
-void SaslClientTransport::close() {
-  transport_->close();
-  sasl_conn_.reset();
-}
-
-void SaslClientTransport::ReadFrame() {
-  DCHECK_EQ(0, read_buf_.size());
-  DCHECK(read_slice_.empty());
-
-  uint8_t payload_len_buf[kFrameHeaderSize];
-  transport_->readAll(payload_len_buf, kFrameHeaderSize);
-  size_t payload_len = NetworkByteOrder::Load32(payload_len_buf);
-
-  if (payload_len > 1024 * 1024) {
-    KLOG_EVERY_N_SECS(WARNING, 60) << "Received large Thrift SASL frame: "
-                                   << HumanReadableNumBytes::ToString(payload_len);
-    if (payload_len > max_recv_buf_size_) {
-      throw TTransportException(Substitute("Thrift SASL frame is too long: $0/$1",
-                                           HumanReadableNumBytes::ToString(payload_len),
-                                           HumanReadableNumBytes::ToString(max_recv_buf_size_)));
-    }
-  }
-
-  read_buf_.reserve(kFrameHeaderSize + payload_len);
-  read_buf_.append(payload_len_buf, kFrameHeaderSize);
-  read_buf_.resize(kFrameHeaderSize + payload_len);
-  transport_->readAll(&read_buf_.data()[kFrameHeaderSize], payload_len);
-
-  if (needs_wrap_) {
-    // Point read_slice_ directly at the SASL library's internal buffer. This
-    // avoids having to copy the decoded data back into read_buf_.
-    Status s = rpc::SaslDecode(sasl_conn_.get(), read_buf_, &read_slice_);
-    if (!s.ok()) {
-      throw SaslException(s);
-    }
-    ResetReadBuf();
-  } else {
-    read_slice_ = read_buf_;
-    read_slice_.remove_prefix(kFrameHeaderSize);
-  }
-}
-
-uint32_t SaslClientTransport::read(uint8_t* buf, uint32_t len) {
-  // If there is nothing left to read in the buffer, then fill it.
-  if (read_slice_.empty()) {
-    ReadFrame();
-  }
-
-  uint32_t n = std::min(read_slice_.size(), static_cast<size_t>(len));
-  memcpy(buf, read_slice_.data(), n);
-  read_slice_.remove_prefix(n);
-  if (read_slice_.empty()) {
-    ResetReadBuf();
-  }
-  return n;
-}
-
-void SaslClientTransport::write(const uint8_t* buf, uint32_t len) {
-  // Check that we've already preallocated space in the buffer for the frame-header.
-  DCHECK(write_buf_.size() >= kFrameHeaderSize);
-
-  // Check if the amount to write would overflow a frame.
-  while (write_buf_.size() + len > max_send_buf_size_) {
-    uint32_t n = max_send_buf_size_ - write_buf_.size();
-    write_buf_.append(buf, n);
-    flush();
-    buf += n;
-    len -= n;
-  }
-
-  write_buf_.append(buf, len);
-}
-
-void SaslClientTransport::flush() {
-  if (needs_wrap_) {
-    Slice plaintext(write_buf_);
-    plaintext.remove_prefix(kFrameHeaderSize);
-    Slice ciphertext;
-    Status s = rpc::SaslEncode(sasl_conn_.get(), plaintext, &ciphertext);
-    if (!s.ok()) {
-      throw SaslException(s);
-    }
-
-    // Note: when the SASL C library encodes the plaintext, it prefixes the
-    // ciphertext with the length. Since this happens to match the SASL/Thrift
-    // frame format, we can send the ciphertext unmodified to the remote server.
-    transport_->write(ciphertext.data(), ciphertext.size());
-  } else {
-    size_t payload_len = write_buf_.size() - kFrameHeaderSize;
-    NetworkByteOrder::Store32(write_buf_.data(), payload_len);
-    transport_->write(write_buf_.data(), write_buf_.size());
-  }
-
-  transport_->flush();
-  ResetWriteBuf();
-}
-
-void SaslClientTransport::Negotiate() {
-  SetupSaslContext();
-
-  faststring recv_buf;
-  SendSaslStart();
-
-  for (;;) {
-    NegotiationStatus status = ReceiveSaslMessage(&recv_buf);
-
-    if (status == TSASL_COMPLETE) {
-        throw SaslException(
-            Status::IllegalState("Received SASL COMPLETE status, but handshake is not finished"));
-    }
-    CHECK_EQ(status, TSASL_OK);
-
-    const char* out;
-    unsigned out_len;
-    Status s = WrapSaslCall(sasl_conn_.get(), [&] {
-        return sasl_client_step(sasl_conn_.get(),
-                                reinterpret_cast<const char*>(recv_buf.data()),
-                                recv_buf.size(),
-                                nullptr,
-                                &out,
-                                &out_len);
-    });
-
-    if (PREDICT_FALSE(!s.IsIncomplete() && !s.ok())) {
-      throw SaslException(std::move(s));
-    }
-
-    SendSaslMessage(status, Slice(out, out_len));
-    transport_->flush();
-
-    if (s.ok()) {
-      break;
-    }
-  }
-
-  NegotiationStatus status = ReceiveSaslMessage(&recv_buf);
-  if (status != TSASL_COMPLETE) {
-    throw SaslException(
-        Status::IllegalState("Received SASL OK status, but expected SASL COMPLETE"));
-  }
-  DCHECK_EQ(0, recv_buf.size());
-
-  needs_wrap_ = rpc::NeedsWrap(sasl_conn_.get());
-  max_send_buf_size_ = rpc::GetMaxSendBufferSize(sasl_conn_.get());
-  VLOG(2) << "Thrift SASL GSSAPI negotiation complete"
-          << "; needs wrap: " << (needs_wrap_ ? "true" : "false")
-          << ", max send frame length: "
-          << HumanReadableNumBytes::ToStringWithoutRounding(max_send_buf_size_)
-          << ", max receive frame length: "
-          << HumanReadableNumBytes::ToStringWithoutRounding(max_recv_buf_size_);
-}
-
-void SaslClientTransport::SendSaslMessage(NegotiationStatus status, Slice payload) {
-  uint8_t header[kSaslHeaderSize];
-  header[0] = status;
-  DCHECK_LT(payload.size(), std::numeric_limits<int32_t>::max());
-  NetworkByteOrder::Store32(&header[1], payload.size());
-  transport_->write(header, kSaslHeaderSize);
-  if (!payload.empty()) {
-    transport_->write(payload.data(), payload.size());
-  }
-}
-
-NegotiationStatus SaslClientTransport::ReceiveSaslMessage(faststring* payload) {
-  // Read the fixed-length message header.
-  uint8_t header[kSaslHeaderSize];
-  transport_->readAll(header, kSaslHeaderSize);
-  size_t len = NetworkByteOrder::Load32(&header[1]);
-
-  // Handle status errors.
-  switch (header[0]) {
-    case TSASL_OK:
-    case TSASL_COMPLETE: break;
-    case TSASL_BAD:
-    case TSASL_ERROR:
-      throw SaslException(Status::RuntimeError("SASL peer indicated failure"));
-    // The Thrift client should never receive TSASL_START.
-    case TSASL_START:
-    default:
-      throw SaslException(Status::RuntimeError("Unexpected SASL status",
-                                               std::to_string(header[0])));
-  }
-
-  // Read the message payload.
-  if (len > max_recv_buf_size_) {
-    throw SaslException(Status::RuntimeError(Substitute(
-            "SASL negotiation message payload exceeds maximum length: $0/$1",
-            HumanReadableNumBytes::ToString(len),
-            HumanReadableNumBytes::ToString(max_recv_buf_size_))));
-  }
-  payload->resize(len);
-  transport_->readAll(payload->data(), len);
-
-  return static_cast<NegotiationStatus>(header[0]);
-}
-
-void SaslClientTransport::SendSaslStart() {
-  const char* init_msg = nullptr;
-  unsigned init_msg_len = 0;
-  const char* negotiated_mech = nullptr;
-
-  Status s = WrapSaslCall(sasl_conn_.get(), [&] {
-      return sasl_client_start(
-          sasl_conn_.get(),            // The SASL connection context created by sasl_client_new()
-          SaslMechanism::name_of(SaslMechanism::GSSAPI), // The mechanism to use.
-          nullptr,                                       // Disables INTERACT return if NULL.
-          &init_msg,                                     // Filled in on success.
-          &init_msg_len,                                 // Filled in on success.
-          &negotiated_mech);                             // Filled in on success.
-  });
-
-  if (PREDICT_FALSE(!s.IsIncomplete() && !s.ok())) {
-    throw SaslException(std::move(s));
-  }
-
-  // Check that the SASL library is using the mechanism that we picked.
-  DCHECK_EQ(SaslMechanism::value_of(negotiated_mech), SaslMechanism::GSSAPI);
-  s = rpc::EnableProtection(sasl_conn_.get(),
-                            rpc::SaslProtection::kAuthentication,
-                            max_recv_buf_size_);
-  if (!s.ok()) {
-    throw SaslException(s);
-  }
-
-  // These two calls comprise a single message in the thrift-sasl protocol.
-  SendSaslMessage(TSASL_START, Slice(negotiated_mech));
-  SendSaslMessage(TSASL_OK, Slice(init_msg, init_msg_len));
-  transport_->flush();
-}
-
-int SaslClientTransport::GetOptionCb(const char* plugin_name, const char* option,
-                                     const char** result, unsigned* len) {
-  return sasl_helper_.GetOptionCb(plugin_name, option, result, len);
-}
-
-void SaslClientTransport::SetupSaslContext() {
-  sasl_conn_t* sasl_conn = nullptr;
-  Status s = WrapSaslCall(nullptr /* no conn */, [&] {
-      return sasl_client_new(
-          // TODO(dan): make the service name configurable.
-          "hive",                       // Registered name of the service using SASL. Required.
-          sasl_helper_.server_fqdn(),   // The fully qualified domain name of the remote server.
-          nullptr,                      // Local and remote IP address strings. (we don't use
-          nullptr,                      // any mechanisms which require this info.)
-          sasl_callbacks_.data(),       // Connection-specific callbacks.
-          0,                            // flags
-          &sasl_conn);
-      });
-  if (!s.ok()) {
-    throw SaslException(s);
-  }
-  sasl_conn_.reset(sasl_conn);
-}
-
-void SaslClientTransport::ResetReadBuf() {
-  read_buf_.clear();
-  read_buf_.shrink_to_fit();
-}
-
-void SaslClientTransport::ResetWriteBuf() {
-  write_buf_.resize(kFrameHeaderSize);
-  write_buf_.shrink_to_fit();
-}
-
-} // namespace hms
-} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/ddaa766e/src/kudu/hms/sasl_client_transport.h
----------------------------------------------------------------------
diff --git a/src/kudu/hms/sasl_client_transport.h b/src/kudu/hms/sasl_client_transport.h
deleted file mode 100644
index a2bc7f3..0000000
--- a/src/kudu/hms/sasl_client_transport.h
+++ /dev/null
@@ -1,176 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <cstddef>
-#include <cstdint>
-#include <memory>
-#include <string>
-#include <utility>
-#include <vector>
-
-#include <sasl/sasl.h>
-#include <thrift/transport/TTransportException.h>
-#include <thrift/transport/TVirtualTransport.h>
-
-#include "kudu/rpc/sasl_helper.h"
-#include "kudu/util/faststring.h"
-#include "kudu/util/slice.h"
-#include "kudu/util/status.h"
-
-namespace apache {
-namespace thrift {
-namespace transport {
-class TTransport;
-} // namespace transport
-} // namespace thrift
-} // namespace apache
-
-namespace kudu {
-namespace rpc {
-struct SaslDeleter;
-} // namespace rpc
-namespace hms {
-
-// An exception representing a SASL or Kerberos failure.
-class SaslException : public apache::thrift::transport::TTransportException {
- public:
-  explicit SaslException(Status status)
-    : TTransportException(status.ToString()),
-      status_(std::move(status)) {
-  }
-
-  const Status& status() const {
-    return status_;
-  }
-
- private:
-  Status status_;
-};
-
-// An enum describing the possible states of the SASL negotiation protocol.
-enum NegotiationStatus {
-  TSASL_INVALID = -1,
-  TSASL_START = 1,
-  TSASL_OK = 2,
-  TSASL_BAD = 3,
-  TSASL_ERROR = 4,
-  TSASL_COMPLETE = 5
-};
-
-// A Thrift transport which uses SASL GSSAPI to authenticate as a client to a
-// remote server.
-//
-// SaslClientTransport internally holds buffers, so it does not need the
-// underlying transport to be buffered.
-class SaslClientTransport
-    : public apache::thrift::transport::TVirtualTransport<SaslClientTransport> {
- public:
-  SaslClientTransport(const std::string& server_fqdn,
-                      std::shared_ptr<TTransport> transport,
-                      size_t max_recv_buf_size);
-
-  ~SaslClientTransport() override = default;
-
-  bool isOpen() override;
-
-  bool peek() override;
-
-  void open() override;
-
-  void close() override;
-
-  uint32_t read(uint8_t* buf, uint32_t len);
-
-  void write(const uint8_t* buf, uint32_t len);
-
-  void flush() override;
-
-  int GetOptionCb(const char* plugin_name, const char* option,
-                  const char** result, unsigned* len);
-
- private:
-
-  // Runs SASL negotiation with the remote server.
-  void Negotiate();
-
-  // Sends a SASL negotiation message to the underlying transport.
-  //
-  // Send a SASL negotiation message using the Thrift framing protocol:
-  //
-  // - 1 byte of status
-  // - 4 bytes of remaining length
-  // - var-len payload
-  void SendSaslMessage(NegotiationStatus status, Slice payload);
-
-  // Receives a SASL negotiation message from the underlying transport.
-  //
-  // The returned negotiation status will be of type OK or COMPLETE, all
-  // other statuses result in an exception.
-  NegotiationStatus ReceiveSaslMessage(faststring* payload);
-
-  // Initializes SASL state.
-  void SetupSaslContext();
-
-  // Sends the initial SASL connection message.
-  void SendSaslStart();
-
-  // Reads a frame from the underlying transport, storing the payload into
-  // read_slice_. If the connection is using SASL auth-conf or auth-int
-  // protection the data is automatically decoded.
-  void ReadFrame();
-
-  // Resets the read buffer to empty, and deallocates its internal buffer.
-  void ResetReadBuf();
-
-  // Resets the write buffer to the size of a frame header, and deallocates its
-  // internal buffer.
-  void ResetWriteBuf();
-
-  // The underlying transport. Typically a TCP socket.
-  std::shared_ptr<TTransport> transport_;
-
-  // SASL state.
-  rpc::SaslHelper sasl_helper_;
-  std::unique_ptr<sasl_conn_t, rpc::SaslDeleter> sasl_conn_;
-  std::vector<sasl_callback_t> sasl_callbacks_;
-
-  // Whether the connection is using auth-int or auth-conf protection.
-  bool needs_wrap_;
-
-  // The negotiated SASL maximum buffer sizes. These correspond to the maximum
-  // sized frames that can be received or sent.
-  //
-  // Note: the Java implementation of the Thrift SASL transport does not respect
-  // the negotiated maximum buffer size (THRIFT-4483) and never splits a message
-  // into multiple frames, so we end up having to set the recv buf size to match
-  // the largest serialized Thrift message we want to be able to receive.
-  size_t max_recv_buf_size_;
-  size_t max_send_buf_size_;
-
-  // The read buffer and slice. The slice points to the remaining frame data
-  // which hasn't been read yet.
-  faststring read_buf_;
-  Slice read_slice_;
-
-  // The write buffer.
-  faststring write_buf_;
-};
-
-} // namespace hms
-} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/ddaa766e/src/kudu/integration-tests/master_hms-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/master_hms-itest.cc b/src/kudu/integration-tests/master_hms-itest.cc
index 9b25dfb..1cd33b8 100644
--- a/src/kudu/integration-tests/master_hms-itest.cc
+++ b/src/kudu/integration-tests/master_hms-itest.cc
@@ -39,6 +39,7 @@
 #include "kudu/mini-cluster/external_mini_cluster.h"
 #include "kudu/mini-cluster/mini_cluster.h"
 #include "kudu/security/test/mini_kdc.h"
+#include "kudu/thrift/client.h"
 #include "kudu/util/decimal_util.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/status.h"
@@ -57,7 +58,6 @@ using client::KuduTableCreator;
 using client::sp::shared_ptr;
 using cluster::ExternalMiniClusterOptions;
 using hms::HmsClient;
-using hms::HmsClientOptions;
 using std::string;
 using std::unique_ptr;
 using std::vector;
@@ -79,7 +79,7 @@ class MasterHmsTest : public ExternalMiniClusterITestBase {
     opts.extra_master_flags.emplace_back("--hive_metastore_notification_log_poll_period_seconds=1");
     StartClusterWithOpts(std::move(opts));
 
-    HmsClientOptions hms_opts;
+    thrift::ClientOptions hms_opts;
     hms_opts.enable_kerberos = EnableKerberos();
     hms_client_.reset(new HmsClient(cluster_->hms()->address(), hms_opts));
     ASSERT_OK(hms_client_->Start());

http://git-wip-us.apache.org/repos/asf/kudu/blob/ddaa766e/src/kudu/mini-cluster/external_mini_cluster-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/mini-cluster/external_mini_cluster-test.cc b/src/kudu/mini-cluster/external_mini_cluster-test.cc
index ff99c80..e337499 100644
--- a/src/kudu/mini-cluster/external_mini_cluster-test.cc
+++ b/src/kudu/mini-cluster/external_mini_cluster-test.cc
@@ -34,6 +34,7 @@
 #include "kudu/hms/mini_hms.h"
 #include "kudu/mini-cluster/mini_cluster.h"
 #include "kudu/security/test/mini_kdc.h"
+#include "kudu/thrift/client.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/status.h"
@@ -193,7 +194,7 @@ TEST_P(ExternalMiniClusterTest, TestBasicOperation) {
 
   // Verify that the HMS is reachable.
   if (opts.hms_mode == HmsMode::ENABLE_HIVE_METASTORE) {
-    hms::HmsClientOptions hms_client_opts;
+    thrift::ClientOptions hms_client_opts;
     hms_client_opts.enable_kerberos = opts.enable_kerberos;
     hms::HmsClient hms_client(cluster.hms()->address(), hms_client_opts);
     ASSERT_OK(hms_client.Start());

http://git-wip-us.apache.org/repos/asf/kudu/blob/ddaa766e/src/kudu/thrift/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/thrift/CMakeLists.txt b/src/kudu/thrift/CMakeLists.txt
new file mode 100644
index 0000000..c956fa4
--- /dev/null
+++ b/src/kudu/thrift/CMakeLists.txt
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+##############################
+# kudu_thrift
+##############################
+
+# Sources compiled into the kudu_thrift library.
+set(THRIFT_SRCS
+  client.cc
+  sasl_client_transport.cc)
+# Libraries that kudu_thrift is linked against.
+set(THRIFT_DEPS
+  gflags
+  glog
+  krpc
+  kudu_util
+  thrift)
+
+# Build the library from the sources above and attach its link dependencies.
+add_library(kudu_thrift ${THRIFT_SRCS})
+target_link_libraries(kudu_thrift ${THRIFT_DEPS})

http://git-wip-us.apache.org/repos/asf/kudu/blob/ddaa766e/src/kudu/thrift/client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/thrift/client.cc b/src/kudu/thrift/client.cc
new file mode 100644
index 0000000..7c109db
--- /dev/null
+++ b/src/kudu/thrift/client.cc
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/thrift/client.h"
+
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <utility>
+
+#include <glog/logging.h>
+#include <thrift/TOutput.h>
+#include <thrift/protocol/TBinaryProtocol.h>
+#include <thrift/transport/TBufferTransports.h>
+#include <thrift/transport/TSocket.h>
+
+#include "kudu/thrift/sasl_client_transport.h"
+#include "kudu/util/net/net_util.h"
+
+namespace apache {
+namespace thrift {
+namespace transport {
+class TTransport;
+}  // namespace transport
+}  // namespace thrift
+}  // namespace apache
+
+using apache::thrift::protocol::TBinaryProtocol;
+using apache::thrift::protocol::TProtocol;
+using apache::thrift::transport::TBufferedTransport;
+using apache::thrift::transport::TSocket;
+using apache::thrift::transport::TTransport;
+using std::shared_ptr;
+using std::make_shared;
+
+namespace kudu {
+namespace thrift {
+
+namespace {
+// Output hook installed into Thrift: forwards each message Thrift hands us to
+// the INFO log.
+void ThriftOutputFunction(const char* message) {
+  LOG(INFO) << message;
+}
+} // anonymous namespace
+
+// Assembles a binary-protocol Thrift client stack for 'address': a TSocket
+// configured with the timeouts from 'options', wrapped in either a SASL
+// transport (Kerberos enabled) or a plain buffered transport. The transport is
+// not opened here.
+shared_ptr<TProtocol> CreateClientProtocol(const HostPort& address, const ClientOptions& options) {
+  // Install the global Thrift logging hook exactly once, however many clients
+  // are created.
+  static std::once_flag thrift_logging_installed;
+  std::call_once(thrift_logging_installed, [] {
+      apache::thrift::GlobalOutput.setOutputFunction(ThriftOutputFunction);
+  });
+
+  shared_ptr<TSocket> sock = make_shared<TSocket>(address.host(), address.port());
+  sock->setSendTimeout(options.send_timeout.ToMilliseconds());
+  sock->setRecvTimeout(options.recv_timeout.ToMilliseconds());
+  sock->setConnTimeout(options.conn_timeout.ToMilliseconds());
+
+  // Pick the transport layered on top of the socket.
+  shared_ptr<TTransport> transport;
+  if (!options.enable_kerberos) {
+    transport = make_shared<TBufferedTransport>(std::move(sock));
+  } else {
+    transport = make_shared<SaslClientTransport>(address.host(),
+                                                 std::move(sock),
+                                                 options.max_buf_size);
+  }
+
+  return make_shared<TBinaryProtocol>(std::move(transport));
+}
+
+} // namespace thrift
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/ddaa766e/src/kudu/thrift/client.h
----------------------------------------------------------------------
diff --git a/src/kudu/thrift/client.h b/src/kudu/thrift/client.h
new file mode 100644
index 0000000..8b6a7d8
--- /dev/null
+++ b/src/kudu/thrift/client.h
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Utilities for working with Thrift clients.
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+
+#include "kudu/util/monotime.h"
+
+namespace apache {
+namespace thrift {
+namespace protocol {
+class TProtocol;
+} // namespace protocol
+} // namespace thrift
+} // namespace apache
+
+namespace kudu {
+
+class HostPort;
+
+namespace thrift {
+
+// Options for a Thrift client connection. Every field has a default, so a
+// default-constructed ClientOptions is usable as-is.
+struct ClientOptions {
+
+  // Thrift socket send timeout.
+  MonoDelta send_timeout = MonoDelta::FromSeconds(60);
+
+  // Thrift socket receive timeout.
+  MonoDelta recv_timeout = MonoDelta::FromSeconds(60);
+
+  // Thrift socket connect timeout.
+  MonoDelta conn_timeout = MonoDelta::FromSeconds(60);
+
+  // Whether to use SASL Kerberos authentication.
+  bool enable_kerberos = false;
+
+  // Maximum size of objects which can be received on the Thrift connection.
+  // Defaults to 100MiB to match Thrift TSaslTransport.receiveSaslMessage.
+  //
+  // CreateClientProtocol() only consults this value when enable_kerberos is
+  // set, where it becomes the SASL transport's maximum receive buffer size.
+  int32_t max_buf_size = 100 * 1024 * 1024;
+};
+
+// Creates a Thrift binary protocol for talking to the server at 'address'.
+//
+// The protocol sits on a socket configured with the timeouts in 'options'.
+// When 'options.enable_kerberos' is set the socket is wrapped in a SASL
+// (Kerberos) client transport, otherwise in a buffered transport. The
+// returned protocol's transport is not opened by this function.
+std::shared_ptr<apache::thrift::protocol::TProtocol> CreateClientProtocol(
+    const HostPort& address, const ClientOptions& options);
+
+} // namespace thrift
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/ddaa766e/src/kudu/thrift/sasl_client_transport.cc
----------------------------------------------------------------------
diff --git a/src/kudu/thrift/sasl_client_transport.cc b/src/kudu/thrift/sasl_client_transport.cc
new file mode 100644
index 0000000..8a8d5a9
--- /dev/null
+++ b/src/kudu/thrift/sasl_client_transport.cc
@@ -0,0 +1,402 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/thrift/sasl_client_transport.h"
+
+#include <algorithm>
+#include <cstring>
+#include <limits>
+#include <memory>
+#include <ostream>
+#include <string>
+
+#include <glog/logging.h>
+#include <thrift/transport/TTransport.h>
+
+#include "kudu/gutil/endian.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/human_readable.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/sasl_common.h"
+#include "kudu/rpc/sasl_helper.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+
+using apache::thrift::transport::TTransportException;
+using std::shared_ptr;
+using std::string;
+using strings::Substitute;
+
+namespace kudu {
+
+using rpc::SaslMechanism;
+using rpc::WrapSaslCall;
+
+namespace thrift {
+
+namespace {
+
+// Size of a SASL negotiation message header: an 8-bit status followed by a
+// 32-bit payload length.
+constexpr uint32_t kSaslHeaderSize = sizeof(uint8_t) + sizeof(uint32_t);
+
+// Size of a data frame header: a single 32-bit payload length.
+constexpr uint32_t kFrameHeaderSize = sizeof(uint32_t);
+
+// SASL SASL_CB_GETOPT callback function.
+//
+// The SASL library invokes this callback with the opaque context pointer that
+// was registered with it, so the first parameter is declared as 'void*' to
+// match the signature the library actually calls through. 'context' must be
+// the SaslClientTransport that registered the callback; the option lookup is
+// forwarded to it and its result code is returned unchanged.
+int GetoptCb(void* context,
+             const char* plugin_name,
+             const char* option,
+             const char** result,
+             unsigned* len) {
+  auto* client_transport = static_cast<SaslClientTransport*>(context);
+  return client_transport->GetOptionCb(plugin_name, option, result, len);
+}
+
+// SASL SASL_CB_CANON_USER callback function.
+//
+// Performs no canonicalization: the 'inlen' bytes of 'in' are copied verbatim
+// into 'out' and their count is reported through 'out_len'. Crashes if the
+// name does not fit into the 'out_max'-byte output buffer.
+int CanonUserCb(sasl_conn_t* /*conn*/,
+                void* /*context*/,
+                const char* in, unsigned inlen,
+                unsigned /*flags*/,
+                const char* /*user_realm*/,
+                char* out, unsigned out_max, unsigned* out_len) {
+  CHECK_LE(inlen, out_max);
+
+  *out_len = inlen;
+  memcpy(out, in, inlen);
+  return SASL_OK;
+}
+
+// SASL SASL_CB_USER callback function.
+//
+// Always reports an empty username; 'len' is optional and only written when
+// the caller supplied it.
+int UserCb(void* /*context*/, int id, const char** result, unsigned* len) {
+  CHECK_EQ(SASL_CB_USER, id);
+
+  // Setting the username to the empty string causes the remote end to use the
+  // client's Kerberos principal, which is correct.
+  if (len) {
+    *len = 0;
+  }
+  *result = "";
+  return SASL_OK;
+}
+} // anonymous namespace
+
+// Constructs a SASL client transport layered over 'transport'.
+//
+// 'server_fqdn' is recorded in the SASL helper as the remote server's name and
+// 'max_recv_buf_size' caps the size of the frames and negotiation messages
+// this transport will accept. The constructor performs no I/O; negotiation
+// happens in open().
+SaslClientTransport::SaslClientTransport(const string& server_fqdn,
+                                         shared_ptr<TTransport> transport,
+                                         size_t max_recv_buf_size)
+    : transport_(std::move(transport)),
+      sasl_helper_(rpc::SaslHelper::CLIENT),
+      // Callback table later passed to sasl_client_new(); the SASL_CB_LIST_END
+      // entry terminates it. 'this' is supplied with the GETOPT and CANON_USER
+      // entries (presumably as the callback context pointer -- confirm against
+      // rpc::SaslBuildCallback).
+      sasl_callbacks_({
+          rpc::SaslBuildCallback(SASL_CB_GETOPT, reinterpret_cast<int (*)()>(&GetoptCb), this),
+          rpc::SaslBuildCallback(SASL_CB_CANON_USER,
+                                 reinterpret_cast<int (*)()>(&CanonUserCb),
+                                 this),
+          rpc::SaslBuildCallback(SASL_CB_USER, reinterpret_cast<int (*)()>(&UserCb), nullptr),
+          rpc::SaslBuildCallback(SASL_CB_LIST_END, nullptr, nullptr)
+      }),
+      // Wrapping is only switched on once negotiation has decided it is needed.
+      needs_wrap_(false),
+      max_recv_buf_size_(max_recv_buf_size),
+      // Set a reasonable max send buffer size for negotiation. Once negotiation
+      // is complete the negotiated value will be used.
+      max_send_buf_size_(64 * 1024) {
+  sasl_helper_.set_server_fqdn(server_fqdn);
+  sasl_helper_.EnableGSSAPI();
+  // Reserve room for the frame header at the front of the write buffer.
+  ResetWriteBuf();
+}
+
+// Reports whether the underlying transport is open.
+bool SaslClientTransport::isOpen() {
+  const bool underlying_open = transport_->isOpen();
+  return underlying_open;
+}
+
+// Returns true if data is available: either already-buffered frame data that
+// has not been consumed yet, or whatever the underlying transport reports.
+bool SaslClientTransport::peek() {
+  if (!read_slice_.empty()) {
+    return true;
+  }
+  return transport_->peek();
+}
+
+// Opens the underlying transport and then runs SASL negotiation over it.
+//
+// If negotiation throws, the underlying transport is closed again before the
+// exception is propagated to the caller.
+void SaslClientTransport::open() {
+  transport_->open();
+  DCHECK(transport_->isOpen());
+  try {
+    Negotiate();
+  } catch (...) {
+    // Do not leave a connected but unauthenticated transport behind.
+    transport_->close();
+    throw;
+  }
+}
+
+// Closes the underlying transport and discards all per-connection state.
+//
+// Besides the SASL context this drops any buffered read/write data and the
+// negotiated wrap flag. When wrapping is in use read_slice_ points into a
+// buffer owned by the SASL connection, so it must not outlive sasl_conn_:
+// leaving it set would let peek()/read() touch freed memory after close().
+void SaslClientTransport::close() {
+  transport_->close();
+
+  // Drop the read state before destroying the SASL connection that may own
+  // the memory read_slice_ refers to.
+  read_slice_ = Slice();
+  ResetReadBuf();
+
+  // Unflushed bytes belong to the connection being closed; keep only the
+  // reserved frame header so a later open() starts from a clean buffer.
+  ResetWriteBuf();
+
+  // Protection is renegotiated by the next open().
+  needs_wrap_ = false;
+  sasl_conn_.reset();
+}
+
+// Reads one frame from the underlying transport and leaves its payload in
+// read_slice_. If the connection uses SASL protection (needs_wrap_) the frame
+// is decoded first.
+//
+// Must only be called when no buffered read data remains.
+//
+// Throws TTransportException if the frame's payload length exceeds
+// max_recv_buf_size_, and SaslException if decoding fails.
+void SaslClientTransport::ReadFrame() {
+  DCHECK_EQ(0, read_buf_.size());
+  DCHECK(read_slice_.empty());
+
+  uint8_t payload_len_buf[kFrameHeaderSize];
+  transport_->readAll(payload_len_buf, kFrameHeaderSize);
+  size_t payload_len = NetworkByteOrder::Load32(payload_len_buf);
+
+  if (payload_len > 1024 * 1024) {
+    KLOG_EVERY_N_SECS(WARNING, 60) << "Received large Thrift SASL frame: "
+                                   << HumanReadableNumBytes::ToString(payload_len);
+  }
+
+  // Enforce the configured limit for every frame, not only for those above the
+  // warning threshold: the limit may be configured below 1MiB, and the length
+  // comes straight off the wire and drives the allocation below.
+  if (payload_len > max_recv_buf_size_) {
+    throw TTransportException(Substitute("Thrift SASL frame is too long: $0/$1",
+                                         HumanReadableNumBytes::ToString(payload_len),
+                                         HumanReadableNumBytes::ToString(max_recv_buf_size_)));
+  }
+
+  // Keep the length header at the front of the buffer, followed by the payload.
+  read_buf_.reserve(kFrameHeaderSize + payload_len);
+  read_buf_.append(payload_len_buf, kFrameHeaderSize);
+  read_buf_.resize(kFrameHeaderSize + payload_len);
+  transport_->readAll(&read_buf_.data()[kFrameHeaderSize], payload_len);
+
+  if (needs_wrap_) {
+    // Point read_slice_ directly at the SASL library's internal buffer. This
+    // avoids having to copy the decoded data back into read_buf_.
+    Status s = rpc::SaslDecode(sasl_conn_.get(), read_buf_, &read_slice_);
+    if (!s.ok()) {
+      throw SaslException(s);
+    }
+    // The encoded bytes are no longer needed once decoded.
+    ResetReadBuf();
+  } else {
+    // Unprotected frame: expose the payload in place, skipping the header.
+    read_slice_ = read_buf_;
+    read_slice_.remove_prefix(kFrameHeaderSize);
+  }
+}
+
+// Copies up to 'len' bytes of frame payload into 'buf' and returns the number
+// of bytes copied. A new frame is fetched first when nothing is buffered.
+uint32_t SaslClientTransport::read(uint8_t* buf, uint32_t len) {
+  // Refill from the underlying transport when the current frame is used up.
+  if (read_slice_.empty()) {
+    ReadFrame();
+  }
+
+  // Hand out whichever is smaller: what the caller asked for or what is left.
+  const size_t available = read_slice_.size();
+  const size_t wanted = len;
+  uint32_t n = available < wanted ? available : wanted;
+  memcpy(buf, read_slice_.data(), n);
+  read_slice_.remove_prefix(n);
+
+  // Release the read buffer as soon as the frame has been fully consumed.
+  if (read_slice_.empty()) {
+    ResetReadBuf();
+  }
+  return n;
+}
+
+// Appends 'len' bytes from 'buf' to the pending frame. Whenever the pending
+// frame would grow past max_send_buf_size_, it is topped up to exactly that
+// size and flushed, and the remainder continues in a fresh frame.
+void SaslClientTransport::write(const uint8_t* buf, uint32_t len) {
+  // Check that we've already preallocated space in the buffer for the frame-header.
+  DCHECK(write_buf_.size() >= kFrameHeaderSize);
+
+  for (;;) {
+    // Stop splitting once the rest fits into the current frame.
+    if (write_buf_.size() + len <= max_send_buf_size_) {
+      break;
+    }
+    uint32_t room = max_send_buf_size_ - write_buf_.size();
+    write_buf_.append(buf, room);
+    flush();
+    buf += room;
+    len -= room;
+  }
+
+  write_buf_.append(buf, len);
+}
+
+// Sends the pending frame to the underlying transport, flushes it, and resets
+// the write buffer. Throws SaslException if SASL encoding fails.
+void SaslClientTransport::flush() {
+  if (!needs_wrap_) {
+    // Unprotected connection: fill in the length header reserved at the front
+    // of the buffer and send header and payload together.
+    size_t payload_len = write_buf_.size() - kFrameHeaderSize;
+    NetworkByteOrder::Store32(write_buf_.data(), payload_len);
+    transport_->write(write_buf_.data(), write_buf_.size());
+  } else {
+    // Protected connection: encode everything after the reserved header.
+    Slice plaintext(write_buf_);
+    plaintext.remove_prefix(kFrameHeaderSize);
+    Slice ciphertext;
+    Status s = rpc::SaslEncode(sasl_conn_.get(), plaintext, &ciphertext);
+    if (!s.ok()) {
+      throw SaslException(s);
+    }
+
+    // Note: when the SASL C library encodes the plaintext, it prefixes the
+    // ciphertext with the length. Since this happens to match the SASL/Thrift
+    // frame format, we can send the ciphertext unmodified to the remote server.
+    transport_->write(ciphertext.data(), ciphertext.size());
+  }
+
+  transport_->flush();
+  ResetWriteBuf();
+}
+
+// Runs the client side of the SASL negotiation over the underlying transport.
+//
+// Creates the SASL context, sends the start message, then repeatedly receives
+// a server message, feeds it to sasl_client_step() and sends the response
+// back until the step reports success. The server must then answer with a
+// COMPLETE message. On success needs_wrap_ and max_send_buf_size_ are updated
+// from the negotiated connection.
+//
+// Throws SaslException on any SASL failure or unexpected negotiation status.
+void SaslClientTransport::Negotiate() {
+  SetupSaslContext();
+
+  faststring recv_buf;
+  SendSaslStart();
+
+  for (;;) {
+    NegotiationStatus status = ReceiveSaslMessage(&recv_buf);
+
+    // The server may not declare completion while our side still has steps
+    // left to run.
+    if (status == TSASL_COMPLETE) {
+        throw SaslException(
+            Status::IllegalState("Received SASL COMPLETE status, but handshake is not finished"));
+    }
+    // ReceiveSaslMessage() only ever returns OK or COMPLETE.
+    CHECK_EQ(status, TSASL_OK);
+
+    // Filled in by sasl_client_step() with the response to send back.
+    const char* out;
+    unsigned out_len;
+    Status s = WrapSaslCall(sasl_conn_.get(), [&] {
+        return sasl_client_step(sasl_conn_.get(),
+                                reinterpret_cast<const char*>(recv_buf.data()),
+                                recv_buf.size(),
+                                nullptr,
+                                &out,
+                                &out_len);
+    });
+
+    // An Incomplete status means more steps are required; it is not an error.
+    if (PREDICT_FALSE(!s.IsIncomplete() && !s.ok())) {
+      throw SaslException(std::move(s));
+    }
+
+    SendSaslMessage(status, Slice(out, out_len));
+    transport_->flush();
+
+    // The client side of the handshake is finished once the step succeeds.
+    if (s.ok()) {
+      break;
+    }
+  }
+
+  // The server confirms the end of the handshake with a COMPLETE message.
+  NegotiationStatus status = ReceiveSaslMessage(&recv_buf);
+  if (status != TSASL_COMPLETE) {
+    throw SaslException(
+        Status::IllegalState("Received SASL OK status, but expected SASL COMPLETE"));
+  }
+  DCHECK_EQ(0, recv_buf.size());
+
+  // Record the negotiated protection and send frame size for the data phase.
+  needs_wrap_ = rpc::NeedsWrap(sasl_conn_.get());
+  max_send_buf_size_ = rpc::GetMaxSendBufferSize(sasl_conn_.get());
+  VLOG(2) << "Thrift SASL GSSAPI negotiation complete"
+          << "; needs wrap: " << (needs_wrap_ ? "true" : "false")
+          << ", max send frame length: "
+          << HumanReadableNumBytes::ToStringWithoutRounding(max_send_buf_size_)
+          << ", max receive frame length: "
+          << HumanReadableNumBytes::ToStringWithoutRounding(max_recv_buf_size_);
+}
+
+// Writes one SASL negotiation message to the underlying transport: a 1-byte
+// status, a 4-byte payload length, then the payload itself if there is one.
+// The transport is not flushed here.
+void SaslClientTransport::SendSaslMessage(NegotiationStatus status, Slice payload) {
+  DCHECK_LT(payload.size(), std::numeric_limits<int32_t>::max());
+
+  uint8_t header[kSaslHeaderSize];
+  header[0] = status;
+  NetworkByteOrder::Store32(&header[1], payload.size());
+  transport_->write(header, kSaslHeaderSize);
+
+  // Nothing more to send for an empty payload.
+  if (payload.empty()) {
+    return;
+  }
+  transport_->write(payload.data(), payload.size());
+}
+
+// Reads one SASL negotiation message from the underlying transport and stores
+// its payload in 'payload'.
+//
+// Returns the message status, which is always OK or COMPLETE; every other
+// status, as well as a payload longer than max_recv_buf_size_, results in a
+// SaslException.
+NegotiationStatus SaslClientTransport::ReceiveSaslMessage(faststring* payload) {
+  // Read the fixed-length message header.
+  uint8_t header[kSaslHeaderSize];
+  transport_->readAll(header, kSaslHeaderSize);
+  const uint8_t status_byte = header[0];
+  const size_t payload_len = NetworkByteOrder::Load32(&header[1]);
+
+  // The peer reported a failure of its own.
+  if (status_byte == TSASL_BAD || status_byte == TSASL_ERROR) {
+    throw SaslException(Status::RuntimeError("SASL peer indicated failure"));
+  }
+  // Anything other than OK/COMPLETE is unexpected, including TSASL_START,
+  // which the Thrift client should never receive.
+  if (status_byte != TSASL_OK && status_byte != TSASL_COMPLETE) {
+    throw SaslException(Status::RuntimeError("Unexpected SASL status",
+                                             std::to_string(status_byte)));
+  }
+
+  // Read the message payload.
+  if (payload_len > max_recv_buf_size_) {
+    throw SaslException(Status::RuntimeError(Substitute(
+            "SASL negotiation message payload exceeds maximum length: $0/$1",
+            HumanReadableNumBytes::ToString(payload_len),
+            HumanReadableNumBytes::ToString(max_recv_buf_size_))));
+  }
+  payload->resize(payload_len);
+  transport_->readAll(payload->data(), payload_len);
+
+  return static_cast<NegotiationStatus>(status_byte);
+}
+
+// Starts the SASL exchange with the GSSAPI mechanism, enables protection on
+// the connection, and sends the opening START + OK message pair to the server.
+// Throws SaslException if any SASL call fails.
+void SaslClientTransport::SendSaslStart() {
+  // Outputs of sasl_client_start().
+  const char* negotiated_mech = nullptr;
+  const char* init_msg = nullptr;
+  unsigned init_msg_len = 0;
+
+  Status s = WrapSaslCall(sasl_conn_.get(), [&] {
+      return sasl_client_start(
+          sasl_conn_.get(),            // The SASL connection context created by sasl_client_new()
+          SaslMechanism::name_of(SaslMechanism::GSSAPI), // The mechanism to use.
+          nullptr,                                       // Disables INTERACT return if NULL.
+          &init_msg,                                     // Filled in on success.
+          &init_msg_len,                                 // Filled in on success.
+          &negotiated_mech);                             // Filled in on success.
+  });
+
+  // An Incomplete status is not treated as a failure at this point.
+  if (PREDICT_FALSE(!(s.ok() || s.IsIncomplete()))) {
+    throw SaslException(std::move(s));
+  }
+
+  // Check that the SASL library is using the mechanism that we picked.
+  DCHECK_EQ(SaslMechanism::value_of(negotiated_mech), SaslMechanism::GSSAPI);
+
+  s = rpc::EnableProtection(sasl_conn_.get(),
+                            rpc::SaslProtection::kAuthentication,
+                            max_recv_buf_size_);
+  if (!s.ok()) {
+    throw SaslException(s);
+  }
+
+  // These two calls comprise a single message in the thrift-sasl protocol.
+  SendSaslMessage(TSASL_START, Slice(negotiated_mech));
+  SendSaslMessage(TSASL_OK, Slice(init_msg, init_msg_len));
+  transport_->flush();
+}
+
+// Answers a SASL option lookup by delegating to the SASL helper and returning
+// its result code unchanged.
+int SaslClientTransport::GetOptionCb(const char* plugin_name, const char* option,
+                                     const char** result, unsigned* len) {
+  const int rc = sasl_helper_.GetOptionCb(plugin_name, option, result, len);
+  return rc;
+}
+
+// Creates a fresh SASL client connection context and stores it in sasl_conn_.
+// Throws SaslException if the SASL library fails to create it.
+void SaslClientTransport::SetupSaslContext() {
+  sasl_conn_t* new_conn = nullptr;
+  Status s = WrapSaslCall(nullptr /* no conn */, [&] {
+      return sasl_client_new(
+          // TODO(dan): make the service name configurable.
+          "hive",                       // Registered name of the service using SASL. Required.
+          sasl_helper_.server_fqdn(),   // The fully qualified domain name of the remote server.
+          nullptr,                      // Local and remote IP address strings. (we don't use
+          nullptr,                      // any mechanisms which require this info.)
+          sasl_callbacks_.data(),       // Connection-specific callbacks.
+          0,                            // flags
+          &new_conn);
+      });
+  if (!s.ok()) {
+    throw SaslException(s);
+  }
+
+  // Take ownership of the new context, replacing any previous one.
+  sasl_conn_.reset(new_conn);
+}
+
+// Empties the read buffer and asks it to give back its unused capacity.
+void SaslClientTransport::ResetReadBuf() {
+  read_buf_.clear();
+  read_buf_.shrink_to_fit();
+}
+
+// Resizes the write buffer to exactly the space reserved for a frame header
+// and asks it to give back its unused capacity. The header bytes themselves
+// are filled in later.
+void SaslClientTransport::ResetWriteBuf() {
+  write_buf_.resize(kFrameHeaderSize);
+  write_buf_.shrink_to_fit();
+}
+
+} // namespace thrift
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/ddaa766e/src/kudu/thrift/sasl_client_transport.h
----------------------------------------------------------------------
diff --git a/src/kudu/thrift/sasl_client_transport.h b/src/kudu/thrift/sasl_client_transport.h
new file mode 100644
index 0000000..ef9936f
--- /dev/null
+++ b/src/kudu/thrift/sasl_client_transport.h
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <sasl/sasl.h>
+#include <thrift/transport/TTransportException.h>
+#include <thrift/transport/TVirtualTransport.h>
+
+#include "kudu/rpc/sasl_helper.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+
+namespace apache {
+namespace thrift {
+namespace transport {
+class TTransport;  // Forward declaration; this header only holds it via std::shared_ptr.
+} // namespace transport
+} // namespace thrift
+} // namespace apache
+
+namespace kudu {
+namespace rpc {
+struct SaslDeleter;  // sasl_conn_'s deleter; NOTE(review): needs a complete type there.
+} // namespace rpc
+namespace thrift {
+
+// A TTransportException for SASL or Kerberos failures which retains the originating Status.
+class SaslException : public apache::thrift::transport::TTransportException {
+ public:
+  explicit SaslException(Status status)
+    : TTransportException(status.ToString()),  // Base is initialized before the move below.
+      status_(std::move(status)) {
+  }
+
+  const Status& status() const {  // The Status this exception was constructed from.
+    return status_;
+  }
+
+ private:
+  Status status_;
+};
+
+// Status codes attached to SASL negotiation messages (see SendSaslMessage).
+enum NegotiationStatus {
+  TSASL_INVALID = -1,  // NOTE(review): presumably a placeholder, not a protocol value.
+  TSASL_START = 1,     // Sent first by SendSaslStart(), with the negotiated mechanism.
+  TSASL_OK = 2,        // Sent with the initial message; accepted by ReceiveSaslMessage().
+  TSASL_BAD = 3,       // Not OK/COMPLETE: ReceiveSaslMessage() raises an exception.
+  TSASL_ERROR = 4,     // Not OK/COMPLETE: ReceiveSaslMessage() raises an exception.
+  TSASL_COMPLETE = 5   // Accepted by ReceiveSaslMessage().
+};
+
+// A Thrift transport which uses SASL GSSAPI to authenticate as a client to a
+// remote server.
+//
+// SaslClientTransport internally holds buffers, so it does not need the
+// underlying transport to be buffered.
+class SaslClientTransport
+    : public apache::thrift::transport::TVirtualTransport<SaslClientTransport> {
+ public:
+  SaslClientTransport(const std::string& server_fqdn,
+                      std::shared_ptr<TTransport> transport,
+                      size_t max_recv_buf_size);
+
+  ~SaslClientTransport() override = default;
+
+  bool isOpen() override;
+
+  bool peek() override;
+
+  void open() override;
+
+  void close() override;
+
+  uint32_t read(uint8_t* buf, uint32_t len);  // No 'override': called via the CRTP base.
+
+  void write(const uint8_t* buf, uint32_t len);  // No 'override': called via the CRTP base.
+
+  void flush() override;
+
+  int GetOptionCb(const char* plugin_name, const char* option,
+                  const char** result, unsigned* len);  // Forwards to sasl_helper_.
+
+ private:
+
+  // Runs SASL negotiation with the remote server.
+  void Negotiate();
+
+  // Sends a SASL negotiation message to the underlying transport.
+  //
+  // Each message is laid out using the thrift-sasl negotiation framing:
+  //
+  // - 1 byte of status
+  // - 4 bytes of remaining length
+  // - var-len payload
+  void SendSaslMessage(NegotiationStatus status, Slice payload);
+
+  // Receives a SASL negotiation message from the underlying transport.
+  //
+  // The returned negotiation status will be either TSASL_OK or TSASL_COMPLETE;
+  // any other status results in an exception.
+  NegotiationStatus ReceiveSaslMessage(faststring* payload);
+
+  // Initializes SASL state.
+  void SetupSaslContext();
+
+  // Sends the initial SASL connection message.
+  void SendSaslStart();
+
+  // Reads a frame from the underlying transport, storing the payload into
+  // read_slice_. If the connection is using SASL auth-conf or auth-int
+  // protection the data is automatically decoded.
+  void ReadFrame();
+
+  // Resets the read buffer to empty, and deallocates its internal buffer.
+  void ResetReadBuf();
+
+  // Resets the write buffer to the size of a frame header, and deallocates its
+  // internal buffer.
+  void ResetWriteBuf();
+
+  // The underlying transport. Typically a TCP socket.
+  std::shared_ptr<TTransport> transport_;
+
+  // SASL state.
+  rpc::SaslHelper sasl_helper_;
+  std::unique_ptr<sasl_conn_t, rpc::SaslDeleter> sasl_conn_;
+  std::vector<sasl_callback_t> sasl_callbacks_;
+
+  // Whether the connection is using auth-int or auth-conf protection.
+  bool needs_wrap_;
+
+  // The negotiated SASL maximum buffer sizes. These correspond to the maximum
+  // sized frames that can be received or sent.
+  //
+  // Note: the Java implementation of the Thrift SASL transport does not respect
+  // the negotiated maximum buffer size (THRIFT-4483) and never splits a message
+  // into multiple frames, so we end up having to set the recv buf size to match
+  // the largest serialized Thrift message we want to be able to receive.
+  size_t max_recv_buf_size_;
+  size_t max_send_buf_size_;
+
+  // The read buffer and slice. The slice points to the remaining frame data
+  // which hasn't been read yet.
+  faststring read_buf_;
+  Slice read_slice_;
+
+  // The write buffer.
+  faststring write_buf_;
+};
+
+} // namespace thrift
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/ddaa766e/src/kudu/tools/kudu-tool-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 66b8648..4df871a 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -70,13 +70,13 @@
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/escaping.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/numbers.h"
 #include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/strcat.h"
 #include "kudu/gutil/strings/strip.h"
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/gutil/strings/escaping.h"
 #include "kudu/gutil/strings/util.h"
 #include "kudu/hms/hive_metastore_types.h"
 #include "kudu/hms/hms_catalog.h"
@@ -97,6 +97,7 @@
 #include "kudu/tablet/tablet.pb.h"
 #include "kudu/tablet/tablet_metadata.h"
 #include "kudu/tablet/tablet_replica.h"
+#include "kudu/thrift/client.h"
 #include "kudu/tools/tool.pb.h"
 #include "kudu/tools/tool_action_common.h"
 #include "kudu/tools/tool_replica_util.h"
@@ -159,7 +160,6 @@ using kudu::fs::FsReport;
 using kudu::fs::WritableBlock;
 using kudu::hms::HmsCatalog;
 using kudu::hms::HmsClient;
-using kudu::hms::HmsClientOptions;
 using kudu::itest::MiniClusterFsInspector;
 using kudu::itest::TServerDetails;
 using kudu::log::Log;
@@ -2410,7 +2410,7 @@ TEST_P(ToolTestKerberosParameterized, TestHmsDowngrade) {
   NO_FATALS(StartExternalMiniCluster(std::move(opts)));
 
   string master_addr = cluster_->master()->bound_rpc_addr().ToString();
-  HmsClientOptions hms_opts;
+  thrift::ClientOptions hms_opts;
   hms_opts.enable_kerberos = EnableKerberos();
   HmsClient hms_client(cluster_->hms()->address(), hms_opts);
   ASSERT_OK(hms_client.Start());
@@ -2455,7 +2455,7 @@ TEST_P(ToolTestKerberosParameterized, TestCheckAndAutomaticFixHmsMetadata) {
   NO_FATALS(StartExternalMiniCluster(std::move(opts)));
 
   string master_addr = cluster_->master()->bound_rpc_addr().ToString();
-  HmsClientOptions hms_opts;
+  thrift::ClientOptions hms_opts;
   hms_opts.enable_kerberos = EnableKerberos();
   HmsClient hms_client(cluster_->hms()->address(), hms_opts);
   ASSERT_OK(hms_client.Start());
@@ -2726,7 +2726,7 @@ TEST_P(ToolTestKerberosParameterized, TestCheckAndManualFixHmsMetadata) {
   NO_FATALS(StartExternalMiniCluster(std::move(opts)));
 
   string master_addr = cluster_->master()->bound_rpc_addr().ToString();
-  HmsClientOptions hms_opts;
+  thrift::ClientOptions hms_opts;
   hms_opts.enable_kerberos = EnableKerberos();
   HmsClient hms_client(cluster_->hms()->address(), hms_opts);
   ASSERT_OK(hms_client.Start());