You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/17 03:14:31 UTC

[15/15] incubator-impala git commit: IMPALA-4669: [KRPC] Import RPC library from kudu@314c9d8

IMPALA-4669: [KRPC] Import RPC library from kudu@314c9d8

Change-Id: I06ab5b56312e482a27fa484414c338438ad6972c
Reviewed-on: http://gerrit.cloudera.org:8080/5718
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/c7db60aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/c7db60aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/c7db60aa

Branch: refs/heads/master
Commit: c7db60aa46565c19634e8a791df3af8d116b9017
Parents: 852e1bb
Author: Henry Robinson <he...@cloudera.com>
Authored: Fri Oct 28 17:10:46 2016 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Aug 17 03:13:20 2017 +0000

----------------------------------------------------------------------
 be/src/kudu/rpc/CMakeLists.txt           |  128 +++
 be/src/kudu/rpc/acceptor_pool.cc         |  166 +++
 be/src/kudu/rpc/acceptor_pool.h          |   79 ++
 be/src/kudu/rpc/blocking_ops.cc          |  127 +++
 be/src/kudu/rpc/blocking_ops.h           |   62 ++
 be/src/kudu/rpc/client_negotiation.cc    |  776 ++++++++++++++
 be/src/kudu/rpc/client_negotiation.h     |  252 +++++
 be/src/kudu/rpc/connection.cc            |  732 ++++++++++++++
 be/src/kudu/rpc/connection.h             |  360 +++++++
 be/src/kudu/rpc/constants.cc             |   38 +
 be/src/kudu/rpc/constants.h              |   63 ++
 be/src/kudu/rpc/exactly_once_rpc-test.cc |  589 +++++++++++
 be/src/kudu/rpc/inbound_call.cc          |  322 ++++++
 be/src/kudu/rpc/inbound_call.h           |  269 +++++
 be/src/kudu/rpc/messenger.cc             |  488 +++++++++
 be/src/kudu/rpc/messenger.h              |  354 +++++++
 be/src/kudu/rpc/mt-rpc-test.cc           |  291 ++++++
 be/src/kudu/rpc/negotiation-test.cc      | 1331 +++++++++++++++++++++++++
 be/src/kudu/rpc/negotiation.cc           |  317 ++++++
 be/src/kudu/rpc/negotiation.h            |   56 ++
 be/src/kudu/rpc/outbound_call.cc         |  509 ++++++++++
 be/src/kudu/rpc/outbound_call.h          |  363 +++++++
 be/src/kudu/rpc/protoc-gen-krpc.cc       |  674 +++++++++++++
 be/src/kudu/rpc/proxy.cc                 |  115 +++
 be/src/kudu/rpc/proxy.h                  |  121 +++
 be/src/kudu/rpc/reactor-test.cc          |   98 ++
 be/src/kudu/rpc/reactor.cc               |  750 ++++++++++++++
 be/src/kudu/rpc/reactor.h                |  370 +++++++
 be/src/kudu/rpc/remote_method.cc         |   49 +
 be/src/kudu/rpc/remote_method.h          |   51 +
 be/src/kudu/rpc/remote_user.cc           |   41 +
 be/src/kudu/rpc/remote_user.h            |   98 ++
 be/src/kudu/rpc/request_tracker-test.cc  |   83 ++
 be/src/kudu/rpc/request_tracker.cc       |   53 +
 be/src/kudu/rpc/request_tracker.h        |   85 ++
 be/src/kudu/rpc/response_callback.h      |   31 +
 be/src/kudu/rpc/result_tracker.cc        |  582 +++++++++++
 be/src/kudu/rpc/result_tracker.h         |  399 ++++++++
 be/src/kudu/rpc/retriable_rpc.h          |  296 ++++++
 be/src/kudu/rpc/rpc-bench.cc             |  260 +++++
 be/src/kudu/rpc/rpc-test-base.h          |  585 +++++++++++
 be/src/kudu/rpc/rpc-test.cc              |  808 +++++++++++++++
 be/src/kudu/rpc/rpc.cc                   |   96 ++
 be/src/kudu/rpc/rpc.h                    |  218 ++++
 be/src/kudu/rpc/rpc_context.cc           |  208 ++++
 be/src/kudu/rpc/rpc_context.h            |  224 +++++
 be/src/kudu/rpc/rpc_controller.cc        |  149 +++
 be/src/kudu/rpc/rpc_controller.h         |  256 +++++
 be/src/kudu/rpc/rpc_header.proto         |  365 +++++++
 be/src/kudu/rpc/rpc_introspection.proto  |  108 ++
 be/src/kudu/rpc/rpc_service.h            |   47 +
 be/src/kudu/rpc/rpc_sidecar.cc           |  102 ++
 be/src/kudu/rpc/rpc_sidecar.h            |   68 ++
 be/src/kudu/rpc/rpc_stub-test.cc         |  679 +++++++++++++
 be/src/kudu/rpc/rpcz_store.cc            |  255 +++++
 be/src/kudu/rpc/rpcz_store.h             |   74 ++
 be/src/kudu/rpc/rtest.proto              |  150 +++
 be/src/kudu/rpc/rtest_diff_package.proto |   26 +
 be/src/kudu/rpc/sasl_common.cc           |  459 +++++++++
 be/src/kudu/rpc/sasl_common.h            |  126 +++
 be/src/kudu/rpc/sasl_helper.cc           |  134 +++
 be/src/kudu/rpc/sasl_helper.h            |  109 ++
 be/src/kudu/rpc/serialization.cc         |  199 ++++
 be/src/kudu/rpc/serialization.h          |   88 ++
 be/src/kudu/rpc/server_negotiation.cc    |  980 ++++++++++++++++++
 be/src/kudu/rpc/server_negotiation.h     |  248 +++++
 be/src/kudu/rpc/service_if.cc            |  149 +++
 be/src/kudu/rpc/service_if.h             |  137 +++
 be/src/kudu/rpc/service_pool.cc          |  219 ++++
 be/src/kudu/rpc/service_pool.h           |   98 ++
 be/src/kudu/rpc/service_queue-test.cc    |  144 +++
 be/src/kudu/rpc/service_queue.cc         |  142 +++
 be/src/kudu/rpc/service_queue.h          |  215 ++++
 be/src/kudu/rpc/transfer.cc              |  264 +++++
 be/src/kudu/rpc/transfer.h               |  203 ++++
 be/src/kudu/rpc/user_credentials.cc      |   57 ++
 be/src/kudu/rpc/user_credentials.h       |   47 +
 77 files changed, 20264 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/CMakeLists.txt b/be/src/kudu/rpc/CMakeLists.txt
new file mode 100644
index 0000000..0cfe6e9
--- /dev/null
+++ b/be/src/kudu/rpc/CMakeLists.txt
@@ -0,0 +1,128 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#### Global header protobufs
+PROTOBUF_GENERATE_CPP(
+  RPC_HEADER_PROTO_SRCS RPC_HEADER_PROTO_HDRS RPC_HEADER_PROTO_TGTS
+  SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+  BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+  PROTO_FILES rpc_header.proto)
+ADD_EXPORTABLE_LIBRARY(rpc_header_proto
+  SRCS ${RPC_HEADER_PROTO_SRCS}
+  DEPS protobuf pb_util_proto token_proto
+  NONLINK_DEPS ${RPC_HEADER_PROTO_TGTS})
+
+PROTOBUF_GENERATE_CPP(
+  RPC_INTROSPECTION_PROTO_SRCS RPC_INTROSPECTION_PROTO_HDRS RPC_INTROSPECTION_PROTO_TGTS
+  SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+  BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+  PROTO_FILES rpc_introspection.proto)
+set(RPC_INTROSPECTION_PROTO_LIBS
+  rpc_header_proto
+  protobuf)
+ADD_EXPORTABLE_LIBRARY(rpc_introspection_proto
+  SRCS ${RPC_INTROSPECTION_PROTO_SRCS}
+  DEPS ${RPC_INTROSPECTION_PROTO_LIBS}
+  NONLINK_DEPS ${RPC_INTROSPECTION_PROTO_TGTS})
+
+### RPC library
+set(KRPC_SRCS
+    acceptor_pool.cc
+    blocking_ops.cc
+    client_negotiation.cc
+    connection.cc
+    constants.cc
+    inbound_call.cc
+    messenger.cc
+    negotiation.cc
+    outbound_call.cc
+    proxy.cc
+    reactor.cc
+    remote_method.cc
+    remote_user.cc
+    request_tracker.cc
+    result_tracker.cc
+    rpc.cc
+    rpc_context.cc
+    rpc_controller.cc
+    rpc_sidecar.cc
+    rpcz_store.cc
+    sasl_common.cc
+    sasl_helper.cc
+    serialization.cc
+    server_negotiation.cc
+    service_if.cc
+    service_pool.cc
+    service_queue.cc
+    user_credentials.cc
+    transfer.cc
+)
+
+set(KRPC_LIBS
+  cyrus_sasl
+  gutil
+  kudu_util
+  libev
+  rpc_header_proto
+  rpc_introspection_proto
+  security)
+
+ADD_EXPORTABLE_LIBRARY(krpc
+  SRCS ${KRPC_SRCS}
+  DEPS ${KRPC_LIBS})
+
+### RPC generator tool
+add_executable(protoc-gen-krpc protoc-gen-krpc.cc)
+target_link_libraries(protoc-gen-krpc
+    ${KUDU_BASE_LIBS}
+    rpc_header_proto
+    protoc
+    protobuf
+    gutil
+    kudu_util)
+
+#### RPC test
+PROTOBUF_GENERATE_CPP(
+  RPC_TEST_DIFF_PACKAGE_SRCS RPC_TEST_DIFF_PACKAGE_HDRS RPC_TEST_DIFF_PACKAGE_TGTS
+  SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+  BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+  PROTO_FILES rtest_diff_package.proto)
+add_library(rtest_diff_package_proto ${RPC_TEST_DIFF_PACKAGE_SRCS} ${RPC_TEST_DIFF_PACKAGE_HDRS})
+target_link_libraries(rtest_diff_package_proto rpc_header_proto)
+
+KRPC_GENERATE(
+  RTEST_KRPC_SRCS RTEST_KRPC_HDRS RTEST_KRPC_TGTS
+  SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+  BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+  PROTO_FILES rtest.proto)
+add_library(rtest_krpc ${RTEST_KRPC_SRCS} ${RTEST_KRPC_HDRS})
+target_link_libraries(rtest_krpc
+  krpc
+  rpc_header_proto
+  rtest_diff_package_proto)
+
+# Tests
+set(KUDU_TEST_LINK_LIBS rtest_krpc krpc rpc_header_proto security-test ${KUDU_MIN_TEST_LIBS})
+ADD_KUDU_TEST(exactly_once_rpc-test)
+ADD_KUDU_TEST(mt-rpc-test RUN_SERIAL true)
+ADD_KUDU_TEST(negotiation-test)
+ADD_KUDU_TEST(reactor-test)
+ADD_KUDU_TEST(request_tracker-test)
+ADD_KUDU_TEST(rpc-bench RUN_SERIAL true)
+ADD_KUDU_TEST(rpc-test)
+ADD_KUDU_TEST(rpc_stub-test)
+ADD_KUDU_TEST(service_queue-test)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/acceptor_pool.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/acceptor_pool.cc b/be/src/kudu/rpc/acceptor_pool.cc
new file mode 100644
index 0000000..f3c935c
--- /dev/null
+++ b/be/src/kudu/rpc/acceptor_pool.cc
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/acceptor_pool.h"
+
+#include <pthread.h>
+
+#include <cinttypes>
+#include <cstdint>
+#include <iostream>
+#include <string>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+
+using google::protobuf::Message;
+using std::string;
+
+METRIC_DEFINE_counter(server, rpc_connections_accepted,
+                      "RPC Connections Accepted",
+                      kudu::MetricUnit::kConnections,
+                      "Number of incoming TCP connections made to the RPC server");
+
+DEFINE_int32(rpc_acceptor_listen_backlog, 128,
+             "Socket backlog parameter used when listening for RPC connections. "
+             "This defines the maximum length to which the queue of pending "
+             "TCP connections inbound to the RPC server may grow. If a connection "
+             "request arrives when the queue is full, the client may receive "
+             "an error. Higher values may help the server ride over bursts of "
+             "new inbound connection requests.");
+TAG_FLAG(rpc_acceptor_listen_backlog, advanced);
+
+namespace kudu {
+namespace rpc {
+
+AcceptorPool::AcceptorPool(Messenger* messenger, Socket* socket,
+                           Sockaddr bind_address)
+    : messenger_(messenger),
+      socket_(socket->Release()),
+      bind_address_(std::move(bind_address)),
+      rpc_connections_accepted_(METRIC_rpc_connections_accepted.Instantiate(
+          messenger->metric_entity())),
+      closing_(false) {}
+
+AcceptorPool::~AcceptorPool() {
+  Shutdown();
+}
+
+Status AcceptorPool::Start(int num_threads) {
+  RETURN_NOT_OK(socket_.Listen(FLAGS_rpc_acceptor_listen_backlog));
+
+  for (int i = 0; i < num_threads; i++) {
+    scoped_refptr<kudu::Thread> new_thread;
+    Status s = kudu::Thread::Create("acceptor pool", "acceptor",
+        &AcceptorPool::RunThread, this, &new_thread);
+    if (!s.ok()) {
+      Shutdown();
+      return s;
+    }
+    threads_.push_back(new_thread);
+  }
+  return Status::OK();
+}
+
+void AcceptorPool::Shutdown() {
+  if (Acquire_CompareAndSwap(&closing_, false, true) != false) {
+    VLOG(2) << "Acceptor Pool on " << bind_address_.ToString()
+            << " already shut down";
+    return;
+  }
+
+#if defined(__linux__)
+  // Closing the socket will break us out of accept() if we're in it, and
+  // prevent future accepts.
+  WARN_NOT_OK(socket_.Shutdown(true, true),
+              strings::Substitute("Could not shut down acceptor socket on $0",
+                                  bind_address_.ToString()));
+#else
+  // Calling shutdown on an accepting (non-connected) socket is illegal on most
+  // platforms (but not Linux). Instead, the accepting threads are interrupted
+  // forcefully.
+  for (const scoped_refptr<kudu::Thread>& thread : threads_) {
+    pthread_cancel(thread.get()->pthread_id());
+  }
+#endif
+
+  for (const scoped_refptr<kudu::Thread>& thread : threads_) {
+    CHECK_OK(ThreadJoiner(thread.get()).Join());
+  }
+  threads_.clear();
+
+  // Close the socket: keeping the descriptor open and, possibly, receiving late
+  // not-to-be-read messages from the peer does not make much sense. The
+  // Socket::Close() method is called upon destruction of the aggregated socket_
+  // object as well. However, the typical ownership pattern of an AcceptorPool
+  // object includes two references wrapped via a shared_ptr smart pointer: one
+  // is held by Messenger, another by RpcServer. If not calling Socket::Close()
+  // here, it would  necessary to wait until Messenger::Shutdown() is called for
+  // the corresponding messenger object to close this socket.
+  ignore_result(socket_.Close());
+}
+
+Sockaddr AcceptorPool::bind_address() const {
+  return bind_address_;
+}
+
+Status AcceptorPool::GetBoundAddress(Sockaddr* addr) const {
+  return socket_.GetSocketAddress(addr);
+}
+
+void AcceptorPool::RunThread() {
+  while (true) {
+    Socket new_sock;
+    Sockaddr remote;
+    VLOG(2) << "calling accept() on socket " << socket_.GetFd()
+            << " listening on " << bind_address_.ToString();
+    Status s = socket_.Accept(&new_sock, &remote, Socket::FLAG_NONBLOCKING);
+    if (!s.ok()) {
+      if (Release_Load(&closing_)) {
+        break;
+      }
+      KLOG_EVERY_N_SECS(WARNING, 1) << "AcceptorPool: accept failed: " << s.ToString()
+                                    << THROTTLE_MSG;
+      continue;
+    }
+    s = new_sock.SetNoDelay(true);
+    if (!s.ok()) {
+      KLOG_EVERY_N_SECS(WARNING, 1) << "Acceptor with remote = " << remote.ToString()
+          << " failed to set TCP_NODELAY on a newly accepted socket: "
+          << s.ToString() << THROTTLE_MSG;
+      continue;
+    }
+    rpc_connections_accepted_->Increment();
+    messenger_->RegisterInboundSocket(&new_sock, remote);
+  }
+  VLOG(1) << "AcceptorPool shutting down.";
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/acceptor_pool.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/acceptor_pool.h b/be/src/kudu/rpc/acceptor_pool.h
new file mode 100644
index 0000000..92b7fc5
--- /dev/null
+++ b/be/src/kudu/rpc/acceptor_pool.h
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_RPC_ACCEPTOR_POOL_H
+#define KUDU_RPC_ACCEPTOR_POOL_H
+
+#include <vector>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Counter;
+class Socket;
+
+namespace rpc {
+
+class Messenger;
+
+// A pool of threads calling accept() to create new connections.
+// Acceptor pool threads terminate when they notice that the messenger has been
+// shut down, if Shutdown() is called, or if the pool object is destructed.
+class AcceptorPool {
+ public:
+  // Create a new acceptor pool.  Calls socket::Release to take ownership of the
+  // socket.
+  // 'socket' must be already bound, but should not yet be listening.
+  AcceptorPool(Messenger *messenger, Socket *socket, Sockaddr bind_address);
+  ~AcceptorPool();
+
+  // Start listening and accepting connections.
+  Status Start(int num_threads);
+  void Shutdown();
+
+  // Return the address that the pool is bound to. If the port is specified as
+  // 0, then this will always return port 0.
+  Sockaddr bind_address() const;
+
+  // Return the address that the pool is bound to. This only works while the
+  // socket is open, and if the specified port is 0 then this will return the
+  // actual port that was bound.
+  Status GetBoundAddress(Sockaddr* addr) const;
+
+ private:
+  void RunThread();
+
+  Messenger *messenger_;
+  Socket socket_;
+  Sockaddr bind_address_;
+  std::vector<scoped_refptr<kudu::Thread> > threads_;
+
+  scoped_refptr<Counter> rpc_connections_accepted_;
+
+  Atomic32 closing_;
+
+  DISALLOW_COPY_AND_ASSIGN(AcceptorPool);
+};
+
+} // namespace rpc
+} // namespace kudu
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/blocking_ops.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/blocking_ops.cc b/be/src/kudu/rpc/blocking_ops.cc
new file mode 100644
index 0000000..64ae2c0
--- /dev/null
+++ b/be/src/kudu/rpc/blocking_ops.cc
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/blocking_ops.h"
+
+#include <stdint.h>
+#include <string.h>
+
+#include <glog/logging.h>
+#include <google/protobuf/message_lite.h>
+
+#include "kudu/gutil/endian.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/constants.h"
+#include "kudu/rpc/serialization.h"
+#include "kudu/rpc/transfer.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace rpc {
+
+using google::protobuf::MessageLite;
+
+const char kHTTPHeader[] = "HTTP";
+
+Status EnsureBlockingMode(const Socket* sock) {
+  bool is_nonblocking;
+  RETURN_NOT_OK(sock->IsNonBlocking(&is_nonblocking));
+  if (is_nonblocking) {
+    return Status::IllegalState("Underlying socket is not set to blocking mode!");
+  }
+  return Status::OK();
+}
+
+Status SendFramedMessageBlocking(Socket* sock, const MessageLite& header, const MessageLite& msg,
+    const MonoTime& deadline) {
+  DCHECK(sock != nullptr);
+  DCHECK(header.IsInitialized()) << "header protobuf must be initialized";
+  DCHECK(msg.IsInitialized()) << "msg protobuf must be initialized";
+
+  RETURN_NOT_OK(EnsureBlockingMode(sock));
+
+  // Ensure we are in blocking mode.
+  // These blocking calls are typically not in the fast path, so doing this for all build types.
+  bool is_non_blocking = false;
+  RETURN_NOT_OK(sock->IsNonBlocking(&is_non_blocking));
+  DCHECK(!is_non_blocking) << "Socket must be in blocking mode to use SendFramedMessage";
+
+  // Serialize message
+  faststring param_buf;
+  serialization::SerializeMessage(msg, &param_buf);
+
+  // Serialize header and initial length
+  faststring header_buf;
+  serialization::SerializeHeader(header, param_buf.size(), &header_buf);
+
+  // Write header & param to stream
+  size_t nsent;
+  RETURN_NOT_OK(sock->BlockingWrite(header_buf.data(), header_buf.size(), &nsent, deadline));
+  RETURN_NOT_OK(sock->BlockingWrite(param_buf.data(), param_buf.size(), &nsent, deadline));
+
+  return Status::OK();
+}
+
+Status ReceiveFramedMessageBlocking(Socket* sock, faststring* recv_buf,
+    MessageLite* header, Slice* param_buf, const MonoTime& deadline) {
+  DCHECK(sock != nullptr);
+  DCHECK(recv_buf != nullptr);
+  DCHECK(header != nullptr);
+  DCHECK(param_buf != nullptr);
+
+  RETURN_NOT_OK(EnsureBlockingMode(sock));
+
+  // Read the message prefix, which specifies the length of the payload.
+  recv_buf->clear();
+  recv_buf->resize(kMsgLengthPrefixLength);
+  size_t recvd = 0;
+  RETURN_NOT_OK(sock->BlockingRecv(recv_buf->data(), kMsgLengthPrefixLength, &recvd, deadline));
+  uint32_t payload_len = NetworkByteOrder::Load32(recv_buf->data());
+
+  // Verify that the payload size isn't out of bounds.
+  // This can happen because of network corruption, or a naughty client.
+  if (PREDICT_FALSE(payload_len > FLAGS_rpc_max_message_size)) {
+    // A common user mistake is to try to speak the Kudu RPC protocol to an
+    // HTTP endpoint, or vice versa.
+    if (memcmp(recv_buf->data(), kHTTPHeader, strlen(kHTTPHeader)) == 0) {
+      return Status::IOError(
+          "received invalid RPC message which appears to be an HTTP response. "
+          "Verify that you have specified a valid RPC port and not an HTTP port.");
+    }
+
+    return Status::IOError(
+        strings::Substitute(
+            "received invalid message of size $0 which exceeds"
+            " the rpc_max_message_size of $1 bytes",
+            payload_len, FLAGS_rpc_max_message_size));
+  }
+
+  // Read the message payload.
+  recvd = 0;
+  recv_buf->resize(payload_len + kMsgLengthPrefixLength);
+  RETURN_NOT_OK(sock->BlockingRecv(recv_buf->data() + kMsgLengthPrefixLength,
+                payload_len, &recvd, deadline));
+  RETURN_NOT_OK(serialization::ParseMessage(Slice(*recv_buf), header, param_buf));
+  return Status::OK();
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/blocking_ops.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/blocking_ops.h b/be/src/kudu/rpc/blocking_ops.h
new file mode 100644
index 0000000..01bb7a6
--- /dev/null
+++ b/be/src/kudu/rpc/blocking_ops.h
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_RPC_BLOCKING_OPS_H
+#define KUDU_RPC_BLOCKING_OPS_H
+
+#include <set>
+#include <string>
+
+namespace google {
+namespace protobuf {
+class MessageLite;
+} // namespace protobuf
+} // namespace google
+
+namespace kudu {
+
+class faststring;
+class MonoTime;
+class Slice;
+class Sockaddr;
+class Socket;
+class Status;
+
+namespace rpc {
+
+// Returns OK if socket is in blocking mode. Otherwise, returns an error.
+Status EnsureBlockingMode(const Socket* sock);
+
+// Encode and send a message over a socket.
+// header: Request or Response header protobuf.
+// msg: Protobuf message to send. This message must be fully initialized.
+// deadline: Latest time allowed for receive to complete before timeout.
+Status SendFramedMessageBlocking(Socket* sock, const google::protobuf::MessageLite& header,
+    const google::protobuf::MessageLite& msg, const MonoTime& deadline);
+
+// Receive a full message frame from the server.
+// recv_buf: buffer to use for reading the data from the socket.
+// header: Request or Response header protobuf.
+// param_buf: Slice into recv_buf containing unparsed RPC param protobuf data.
+// deadline: Latest time allowed for receive to complete before timeout.
+Status ReceiveFramedMessageBlocking(Socket* sock, faststring* recv_buf,
+    google::protobuf::MessageLite* header, Slice* param_buf, const MonoTime& deadline);
+
+} // namespace rpc
+} // namespace kudu
+
+#endif  // KUDU_RPC_BLOCKING_OPS_H

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/client_negotiation.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/client_negotiation.cc b/be/src/kudu/rpc/client_negotiation.cc
new file mode 100644
index 0000000..8ce5190
--- /dev/null
+++ b/be/src/kudu/rpc/client_negotiation.cc
@@ -0,0 +1,776 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/client_negotiation.h"
+
+#include <string.h>
+
+#include <map>
+#include <memory>
+#include <set>
+#include <string>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <sasl/sasl.h>
+
+#include "kudu/gutil/casts.h"
+#include "kudu/gutil/endian.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
+#include "kudu/rpc/blocking_ops.h"
+#include "kudu/rpc/constants.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/sasl_common.h"
+#include "kudu/rpc/sasl_helper.h"
+#include "kudu/rpc/serialization.h"
+#include "kudu/security/cert.h"
+#include "kudu/security/tls_context.h"
+#include "kudu/security/tls_handshake.h"
+#include "kudu/security/tls_socket.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/trace.h"
+
+using std::map;
+using std::set;
+using std::string;
+using std::unique_ptr;
+
+using strings::Substitute;
+
+DECLARE_bool(rpc_encrypt_loopback_connections);
+
+namespace kudu {
+namespace rpc {
+
+static int ClientNegotiationGetoptCb(ClientNegotiation* client_negotiation,
+                                     const char* plugin_name,
+                                     const char* option,
+                                     const char** result,
+                                     unsigned* len) {
+  return client_negotiation->GetOptionCb(plugin_name, option, result, len);
+}
+
+static int ClientNegotiationSimpleCb(ClientNegotiation* client_negotiation,
+                                     int id,
+                                     const char** result,
+                                     unsigned* len) {
+  return client_negotiation->SimpleCb(id, result, len);
+}
+
+static int ClientNegotiationSecretCb(sasl_conn_t* conn,
+                                     ClientNegotiation* client_negotiation,
+                                     int id,
+                                     sasl_secret_t** psecret) {
+  return client_negotiation->SecretCb(conn, id, psecret);
+}
+
+// Return an appropriately-typed Status object based on an ErrorStatusPB returned
+// from an Error RPC.
+// In case there is no relevant Status type, return a RuntimeError.
+static Status StatusFromRpcError(const ErrorStatusPB& error) {
+  DCHECK(error.IsInitialized()) << "Error status PB must be initialized";
+  if (PREDICT_FALSE(!error.has_code())) {
+    return Status::RuntimeError(error.message());
+  }
+  const string code_name = ErrorStatusPB::RpcErrorCodePB_Name(error.code());
+  switch (error.code()) {
+    case ErrorStatusPB_RpcErrorCodePB_FATAL_UNAUTHORIZED: // fall-through
+    case ErrorStatusPB_RpcErrorCodePB_FATAL_INVALID_AUTHENTICATION_TOKEN:
+      return Status::NotAuthorized(code_name, error.message());
+    case ErrorStatusPB_RpcErrorCodePB_ERROR_UNAVAILABLE:
+      return Status::ServiceUnavailable(code_name, error.message());
+    default:
+      return Status::RuntimeError(code_name, error.message());
+  }
+}
+
+ClientNegotiation::ClientNegotiation(unique_ptr<Socket> socket,
+                                     const security::TlsContext* tls_context,
+                                     const boost::optional<security::SignedTokenPB>& authn_token,
+                                     RpcEncryption encryption)
+    : socket_(std::move(socket)),
+      helper_(SaslHelper::CLIENT),
+      tls_context_(tls_context),
+      encryption_(encryption),
+      tls_negotiated_(false),
+      authn_token_(authn_token),
+      psecret_(nullptr, std::free),
+      negotiated_authn_(AuthenticationType::INVALID),
+      negotiated_mech_(SaslMechanism::INVALID),
+      deadline_(MonoTime::Max()) {
+  callbacks_.push_back(SaslBuildCallback(SASL_CB_GETOPT,
+      reinterpret_cast<int (*)()>(&ClientNegotiationGetoptCb), this));
+  callbacks_.push_back(SaslBuildCallback(SASL_CB_AUTHNAME,
+      reinterpret_cast<int (*)()>(&ClientNegotiationSimpleCb), this));
+  callbacks_.push_back(SaslBuildCallback(SASL_CB_PASS,
+      reinterpret_cast<int (*)()>(&ClientNegotiationSecretCb), this));
+  callbacks_.push_back(SaslBuildCallback(SASL_CB_LIST_END, nullptr, nullptr));
+  DCHECK(socket_);
+  DCHECK(tls_context_);
+}
+
+Status ClientNegotiation::EnablePlain(const string& user, const string& pass) {
+  RETURN_NOT_OK(helper_.EnablePlain());
+  plain_auth_user_ = user;
+  plain_pass_ = pass;
+  return Status::OK();
+}
+
+Status ClientNegotiation::EnableGSSAPI() {
+  return helper_.EnableGSSAPI();
+}
+
+SaslMechanism::Type ClientNegotiation::negotiated_mechanism() const {
+  return negotiated_mech_;
+}
+
+void ClientNegotiation::set_server_fqdn(const string& domain_name) {
+  helper_.set_server_fqdn(domain_name);
+}
+
+void ClientNegotiation::set_deadline(const MonoTime& deadline) {
+  deadline_ = deadline;
+}
+
+Status ClientNegotiation::Negotiate(unique_ptr<ErrorStatusPB>* rpc_error) {
+  TRACE("Beginning negotiation");
+
+  // Ensure we can use blocking calls on the socket during negotiation.
+  RETURN_NOT_OK(EnsureBlockingMode(socket_.get()));
+
+  // Step 1: send the connection header.
+  RETURN_NOT_OK(SendConnectionHeader());
+
+  faststring recv_buf;
+
+  { // Step 2: send and receive the NEGOTIATE step messages.
+    RETURN_NOT_OK(SendNegotiate());
+    NegotiatePB response;
+    RETURN_NOT_OK(RecvNegotiatePB(&response, &recv_buf, rpc_error));
+    RETURN_NOT_OK(HandleNegotiate(response));
+    TRACE("Negotiated authn=$0", AuthenticationTypeToString(negotiated_authn_));
+  }
+
+  // Step 3: if both ends support TLS, do a TLS handshake.
+  // TODO(KUDU-1921): allow the client to require TLS.
+  if (encryption_ != RpcEncryption::DISABLED &&
+      ContainsKey(server_features_, TLS)) {
+    RETURN_NOT_OK(tls_context_->InitiateHandshake(security::TlsHandshakeType::CLIENT,
+                                                  &tls_handshake_));
+
+    if (negotiated_authn_ == AuthenticationType::SASL) {
+      // When using SASL authentication, verifying the server's certificate is
+      // not necessary. This allows the client to still use TLS encryption for
+      // connections to servers which only have a self-signed certificate.
+      tls_handshake_.set_verification_mode(security::TlsVerificationMode::VERIFY_NONE);
+    }
+
+    // To initiate the TLS handshake, we pretend as if the server sent us an
+    // empty TLS_HANDSHAKE token.
+    NegotiatePB initial;
+    initial.set_step(NegotiatePB::TLS_HANDSHAKE);
+    initial.set_tls_handshake("");
+    Status s = HandleTlsHandshake(initial);
+
+    while (s.IsIncomplete()) {
+      NegotiatePB response;
+      RETURN_NOT_OK(RecvNegotiatePB(&response, &recv_buf, rpc_error));
+      s = HandleTlsHandshake(response);
+    }
+    RETURN_NOT_OK(s);
+    tls_negotiated_ = true;
+  }
+
+  // Step 4: Authentication
+  switch (negotiated_authn_) {
+    case AuthenticationType::SASL:
+      RETURN_NOT_OK(AuthenticateBySasl(&recv_buf, rpc_error));
+      break;
+    case AuthenticationType::TOKEN:
+      RETURN_NOT_OK(AuthenticateByToken(&recv_buf, rpc_error));
+      break;
+    case AuthenticationType::CERTIFICATE:
+      // The TLS handshake has already authenticated the server.
+      break;
+    case AuthenticationType::INVALID: LOG(FATAL) << "unreachable";
+  }
+
+  // Step 5: Send connection context.
+  RETURN_NOT_OK(SendConnectionContext());
+
+  TRACE("Negotiation successful");
+  return Status::OK();
+}
+
+Status ClientNegotiation::SendNegotiatePB(const NegotiatePB& msg) {
+  RequestHeader header;
+  header.set_call_id(kNegotiateCallId);
+
+  DCHECK(socket_);
+  DCHECK(msg.IsInitialized()) << "message must be initialized";
+  DCHECK(msg.has_step()) << "message must have a step";
+
+  TRACE("Sending $0 NegotiatePB request", NegotiatePB::NegotiateStep_Name(msg.step()));
+  return SendFramedMessageBlocking(socket(), header, msg, deadline_);
+}
+
+Status ClientNegotiation::RecvNegotiatePB(NegotiatePB* msg,
+                                          faststring* buffer,
+                                          unique_ptr<ErrorStatusPB>* rpc_error) {
+  ResponseHeader header;
+  Slice param_buf;
+  RETURN_NOT_OK(ReceiveFramedMessageBlocking(socket(), buffer, &header, &param_buf, deadline_));
+  RETURN_NOT_OK(helper_.CheckNegotiateCallId(header.call_id()));
+
+  if (header.is_error()) {
+    return ParseError(param_buf, rpc_error);
+  }
+
+  RETURN_NOT_OK(helper_.ParseNegotiatePB(param_buf, msg));
+  TRACE("Received $0 NegotiatePB response", NegotiatePB::NegotiateStep_Name(msg->step()));
+  return Status::OK();
+}
+
+Status ClientNegotiation::ParseError(const Slice& err_data,
+                                     unique_ptr<ErrorStatusPB>* rpc_error) {
+  unique_ptr<ErrorStatusPB> error(new ErrorStatusPB);
+  if (!error->ParseFromArray(err_data.data(), err_data.size())) {
+    return Status::IOError("invalid error response, missing fields",
+                           error->InitializationErrorString());
+  }
+  Status s = StatusFromRpcError(*error);
+  TRACE("Received error response from server: $0", s.ToString());
+
+  if (rpc_error) {
+    rpc_error->swap(error);
+  }
+  return s;
+}
+
+Status ClientNegotiation::SendConnectionHeader() {
+  const uint8_t buflen = kMagicNumberLength + kHeaderFlagsLength;
+  uint8_t buf[buflen];
+  serialization::SerializeConnHeader(buf);
+  size_t nsent;
+  return socket()->BlockingWrite(buf, buflen, &nsent, deadline_);
+}
+
+Status ClientNegotiation::InitSaslClient() {
+  RETURN_NOT_OK(SaslInit());
+
+  // TODO(KUDU-1922): consider setting SASL_SUCCESS_DATA
+  unsigned flags = 0;
+
+  sasl_conn_t* sasl_conn = nullptr;
+  RETURN_NOT_OK_PREPEND(WrapSaslCall(nullptr /* no conn */, [&]() {
+      return sasl_client_new(
+          kSaslProtoName,               // Registered name of the service using SASL. Required.
+          helper_.server_fqdn(),        // The fully qualified domain name of the remote server.
+          nullptr,                      // Local and remote IP address strings. (we don't use
+          nullptr,                      // any mechanisms which require this info.)
+          &callbacks_[0],               // Connection-specific callbacks.
+          flags,
+          &sasl_conn);
+    }), "Unable to create new SASL client");
+  sasl_conn_.reset(sasl_conn);
+  return Status::OK();
+}
+
+Status ClientNegotiation::SendNegotiate() {
+  NegotiatePB msg;
+  msg.set_step(NegotiatePB::NEGOTIATE);
+
+  // Advertise our supported features.
+  client_features_ = kSupportedClientRpcFeatureFlags;
+
+  if (encryption_ != RpcEncryption::DISABLED) {
+    client_features_.insert(TLS);
+    // If the remote peer is local, then we allow using TLS for authentication
+    // without encryption or integrity.
+    if (socket_->IsLoopbackConnection() && !FLAGS_rpc_encrypt_loopback_connections) {
+      client_features_.insert(TLS_AUTHENTICATION_ONLY);
+    }
+  }
+
+  for (RpcFeatureFlag feature : client_features_) {
+    msg.add_supported_features(feature);
+  }
+
+  if (!helper_.EnabledMechs().empty()) {
+    msg.add_authn_types()->mutable_sasl();
+  }
+  if (tls_context_->has_signed_cert() && !tls_context_->is_external_cert()) {
+    // We only provide authenticated TLS if the certificates are generated
+    // by the internal CA.
+    msg.add_authn_types()->mutable_certificate();
+  }
+  if (authn_token_ && tls_context_->has_trusted_cert()) {
+    // TODO(KUDU-1924): check that the authn token is not expired. Can this be done
+    // reliably on clients?
+    msg.add_authn_types()->mutable_token();
+  }
+
+  if (PREDICT_FALSE(msg.authn_types().empty())) {
+    return Status::NotAuthorized("client is not configured with an authentication type");
+  }
+
+  RETURN_NOT_OK(SendNegotiatePB(msg));
+  return Status::OK();
+}
+
+Status ClientNegotiation::HandleNegotiate(const NegotiatePB& response) {
+  if (PREDICT_FALSE(response.step() != NegotiatePB::NEGOTIATE)) {
+    return Status::NotAuthorized("expected NEGOTIATE step",
+                                 NegotiatePB::NegotiateStep_Name(response.step()));
+  }
+  TRACE("Received NEGOTIATE response from server");
+
+  // Fill in the set of features supported by the server.
+  for (int flag : response.supported_features()) {
+    // We only add the features that our local build knows about.
+    RpcFeatureFlag feature_flag = RpcFeatureFlag_IsValid(flag) ?
+                                  static_cast<RpcFeatureFlag>(flag) : UNKNOWN;
+    if (feature_flag != UNKNOWN) {
+      server_features_.insert(feature_flag);
+    }
+  }
+
+  if (encryption_ == RpcEncryption::REQUIRED &&
+      !ContainsKey(server_features_, RpcFeatureFlag::TLS)) {
+    return Status::NotAuthorized("server does not support required TLS encryption");
+  }
+
+  // Get the authentication type which the server would like to use.
+  DCHECK_LE(response.authn_types().size(), 1);
+  if (response.authn_types().empty()) {
+    // If the server doesn't send back an authentication type, default to SASL
+    // in order to maintain backwards compatibility.
+    negotiated_authn_ = AuthenticationType::SASL;
+  } else {
+    const auto& authn_type = response.authn_types(0);
+    switch (authn_type.type_case()) {
+      case AuthenticationTypePB::kSasl:
+        negotiated_authn_ = AuthenticationType::SASL;
+        break;
+      case AuthenticationTypePB::kToken:
+        // TODO(todd): we should also be checking tls_context_->has_trusted_cert()
+        // here to match the original logic we used to advertise TOKEN support,
+        // or perhaps just check explicitly whether we advertised TOKEN.
+        if (!authn_token_) {
+          return Status::RuntimeError(
+              "server chose token authentication, but client has no token");
+        }
+        negotiated_authn_ = AuthenticationType::TOKEN;
+        return Status::OK();
+      case AuthenticationTypePB::kCertificate:
+        if (!tls_context_->has_signed_cert()) {
+          return Status::RuntimeError(
+              "server chose certificate authentication, but client has no certificate");
+        }
+        negotiated_authn_ = AuthenticationType::CERTIFICATE;
+        return Status::OK();
+      case AuthenticationTypePB::TYPE_NOT_SET:
+        return Status::RuntimeError("server chose an unknown authentication type");
+    }
+  }
+
+  DCHECK_EQ(negotiated_authn_, AuthenticationType::SASL);
+
+  // Build a map of the SASL mechanisms offered by the server.
+  const set<SaslMechanism::Type>& client_mechs = helper_.EnabledMechs();
+  set<SaslMechanism::Type> server_mechs;
+  for (const NegotiatePB::SaslMechanism& sasl_mech : response.sasl_mechanisms()) {
+    auto mech = SaslMechanism::value_of(sasl_mech.mechanism());
+    if (mech == SaslMechanism::INVALID) {
+      continue;
+    }
+    server_mechs.insert(mech);
+  }
+
+  // Determine which SASL mechanism to use for authenticating the connection.
+  // We pick the most preferred mechanism which is supported by both parties.
+  // The preference list in order of most to least preferred:
+  //  * GSSAPI
+  //  * PLAIN
+  set<SaslMechanism::Type> common_mechs = STLSetIntersection(client_mechs, server_mechs);
+
+  if (common_mechs.empty()) {
+    if (ContainsKey(server_mechs, SaslMechanism::GSSAPI) &&
+        !ContainsKey(client_mechs, SaslMechanism::GSSAPI)) {
+      return Status::NotAuthorized("server requires authentication, "
+                                   "but client does not have Kerberos enabled");
+    }
+    if (!ContainsKey(server_mechs, SaslMechanism::GSSAPI) &&
+        ContainsKey(client_mechs, SaslMechanism::GSSAPI)) {
+      return Status::NotAuthorized("client requires authentication, "
+                                   "but server does not have Kerberos enabled");
+    }
+    string msg = Substitute("client/server supported SASL mechanism mismatch; "
+                            "client mechanisms: [$0], server mechanisms: [$1]",
+                            JoinMapped(client_mechs, SaslMechanism::name_of, ", "),
+                            JoinMapped(server_mechs, SaslMechanism::name_of, ", "));
+
+    // For now, there should never be a SASL mechanism mismatch that isn't due
+    // to one of the sides requiring Kerberos and the other not having it, so
+    // lets sanity check that.
+    DLOG(FATAL) << msg;
+    return Status::NotAuthorized(msg);
+  }
+
+  // TODO(KUDU-1921): allow the client to require authentication.
+  if (ContainsKey(common_mechs, SaslMechanism::GSSAPI)) {
+    negotiated_mech_ = SaslMechanism::GSSAPI;
+  } else {
+    DCHECK(ContainsKey(common_mechs, SaslMechanism::PLAIN));
+    negotiated_mech_ = SaslMechanism::PLAIN;
+  }
+
+  return Status::OK();
+}
+
+Status ClientNegotiation::SendTlsHandshake(string tls_token) {
+  TRACE("Sending TLS_HANDSHAKE message to server");
+  NegotiatePB msg;
+  msg.set_step(NegotiatePB::TLS_HANDSHAKE);
+  msg.mutable_tls_handshake()->swap(tls_token);
+  return SendNegotiatePB(msg);
+}
+
+Status ClientNegotiation::HandleTlsHandshake(const NegotiatePB& response) {
+  if (PREDICT_FALSE(response.step() != NegotiatePB::TLS_HANDSHAKE)) {
+    return Status::NotAuthorized("expected TLS_HANDSHAKE step",
+                                 NegotiatePB::NegotiateStep_Name(response.step()));
+  }
+  TRACE("Received TLS_HANDSHAKE response from server");
+
+  if (PREDICT_FALSE(!response.has_tls_handshake())) {
+    return Status::NotAuthorized("No TLS handshake token in TLS_HANDSHAKE response from server");
+  }
+
+  string token;
+  Status s = tls_handshake_.Continue(response.tls_handshake(), &token);
+  if (s.IsIncomplete()) {
+    // Another roundtrip is required to complete the handshake.
+    RETURN_NOT_OK(SendTlsHandshake(std::move(token)));
+  }
+
+  // Check that the handshake step didn't produce an error. Will also propagate
+  // an Incomplete status.
+  RETURN_NOT_OK(s);
+
+  // TLS handshake is finished.
+  DCHECK(token.empty());
+
+  if (ContainsKey(server_features_, TLS_AUTHENTICATION_ONLY) &&
+      ContainsKey(client_features_, TLS_AUTHENTICATION_ONLY)) {
+    TRACE("Negotiated auth-only $0 with cipher suite $1",
+          tls_handshake_.GetProtocol(), tls_handshake_.GetCipherSuite());
+    return tls_handshake_.FinishNoWrap(*socket_);
+  }
+
+  TRACE("Negotiated $0 with cipher suite $1",
+        tls_handshake_.GetProtocol(), tls_handshake_.GetCipherSuite());
+  return tls_handshake_.Finish(&socket_);
+}
+
+Status ClientNegotiation::AuthenticateBySasl(faststring* recv_buf,
+                                             unique_ptr<ErrorStatusPB>* rpc_error) {
+  RETURN_NOT_OK(InitSaslClient());
+  Status s = SendSaslInitiate();
+
+  // HandleSasl[Initiate, Challenge] return incomplete if an additional
+  // challenge step is required, or OK if a SASL_SUCCESS message is expected.
+  while (s.IsIncomplete()) {
+    NegotiatePB challenge;
+    RETURN_NOT_OK(RecvNegotiatePB(&challenge, recv_buf, rpc_error));
+    s = HandleSaslChallenge(challenge);
+  }
+
+  // Propagate failure from SendSaslInitiate or HandleSaslChallenge.
+  RETURN_NOT_OK(s);
+
+  // Server challenges are over; we now expect the success message.
+  NegotiatePB success;
+  RETURN_NOT_OK(RecvNegotiatePB(&success, recv_buf, rpc_error));
+  return HandleSaslSuccess(success);
+}
+
+Status ClientNegotiation::AuthenticateByToken(faststring* recv_buf,
+                                              unique_ptr<ErrorStatusPB>* rpc_error) {
+  // Sanity check that TLS has been negotiated. Sending the token on an
+  // unencrypted channel is a big no-no.
+  CHECK(tls_negotiated_);
+
+  // Send the token to the server.
+  NegotiatePB pb;
+  pb.set_step(NegotiatePB::TOKEN_EXCHANGE);
+  pb.mutable_authn_token()->Swap(authn_token_.get_ptr());
+  RETURN_NOT_OK(SendNegotiatePB(pb));
+  pb.Clear();
+
+  // Check that the server responds with a non-error TOKEN_EXCHANGE message.
+  RETURN_NOT_OK(RecvNegotiatePB(&pb, recv_buf, rpc_error));
+  if (pb.step() != NegotiatePB::TOKEN_EXCHANGE) {
+    return Status::NotAuthorized("expected TOKEN_EXCHANGE step",
+                                 NegotiatePB::NegotiateStep_Name(pb.step()));
+  }
+
+  return Status::OK();
+}
+
+Status ClientNegotiation::SendSaslInitiate() {
+  TRACE("Initiating SASL $0 handshake", SaslMechanism::name_of(negotiated_mech_));
+
+  // At this point we've already chosen the SASL mechanism to use
+  // (negotiated_mech_), but we need to let the SASL library know. SASL likes to
+  // choose the mechanism from among a list of possible options, so we simply
+  // provide it one option, and then check that it picks that option.
+
+  const char* init_msg = nullptr;
+  unsigned init_msg_len = 0;
+  const char* negotiated_mech = nullptr;
+
+  /* select a mechanism for a connection
+   *  mechlist      -- mechanisms server has available (punctuation ignored)
+   * output:
+   *  prompt_need   -- on SASL_INTERACT, list of prompts needed to continue
+   *  clientout     -- the initial client response to send to the server
+   *  mech          -- set to mechanism name
+   *
+   * Returns:
+   *  SASL_OK       -- success
+   *  SASL_CONTINUE -- negotiation required
+   *  SASL_NOMEM    -- not enough memory
+   *  SASL_NOMECH   -- no mechanism meets requested properties
+   *  SASL_INTERACT -- user interaction needed to fill in prompt_need list
+   */
+  TRACE("Calling sasl_client_start()");
+  const Status s = WrapSaslCall(sasl_conn_.get(), [&]() {
+      return sasl_client_start(
+          sasl_conn_.get(),                         // The SASL connection context created by init()
+          SaslMechanism::name_of(negotiated_mech_), // The list of mechanisms to negotiate.
+          nullptr,                                  // Disables INTERACT return if NULL.
+          &init_msg,                                // Filled in on success.
+          &init_msg_len,                            // Filled in on success.
+          &negotiated_mech);                        // Filled in on success.
+  });
+
+  if (PREDICT_FALSE(!s.IsIncomplete() && !s.ok())) {
+    return s;
+  }
+
+  // Check that the SASL library is using the mechanism that we picked.
+  DCHECK_EQ(SaslMechanism::value_of(negotiated_mech), negotiated_mech_);
+
+  // If the negotiated mechanism is GSSAPI (Kerberos), configure SASL to use
+  // integrity protection so that the channel bindings and nonce can be
+  // verified.
+  if (negotiated_mech_ == SaslMechanism::GSSAPI) {
+    RETURN_NOT_OK(EnableIntegrityProtection(sasl_conn_.get()));
+  }
+
+  NegotiatePB msg;
+  msg.set_step(NegotiatePB::SASL_INITIATE);
+  msg.mutable_token()->assign(init_msg, init_msg_len);
+  msg.add_sasl_mechanisms()->set_mechanism(negotiated_mech);
+  RETURN_NOT_OK(SendNegotiatePB(msg));
+  return s;
+}
+
+Status ClientNegotiation::SendSaslResponse(const char* resp_msg, unsigned resp_msg_len) {
+  NegotiatePB reply;
+  reply.set_step(NegotiatePB::SASL_RESPONSE);
+  reply.mutable_token()->assign(resp_msg, resp_msg_len);
+  return SendNegotiatePB(reply);
+}
+
+Status ClientNegotiation::HandleSaslChallenge(const NegotiatePB& response) {
+  if (PREDICT_FALSE(response.step() != NegotiatePB::SASL_CHALLENGE)) {
+    return Status::NotAuthorized("expected SASL_CHALLENGE step",
+                                 NegotiatePB::NegotiateStep_Name(response.step()));
+  }
+  TRACE("Received SASL_CHALLENGE response from server");
+  if (PREDICT_FALSE(!response.has_token())) {
+    return Status::NotAuthorized("no token in SASL_CHALLENGE response from server");
+  }
+
+  const char* out = nullptr;
+  unsigned out_len = 0;
+  const Status s = DoSaslStep(response.token(), &out, &out_len);
+  if (PREDICT_FALSE(!s.IsIncomplete() && !s.ok())) {
+    return s;
+  }
+
+  RETURN_NOT_OK(SendSaslResponse(out, out_len));
+  return s;
+}
+
+Status ClientNegotiation::HandleSaslSuccess(const NegotiatePB& response) {
+  if (PREDICT_FALSE(response.step() != NegotiatePB::SASL_SUCCESS)) {
+    return Status::NotAuthorized("expected SASL_SUCCESS step",
+                                 NegotiatePB::NegotiateStep_Name(response.step()));
+  }
+  TRACE("Received SASL_SUCCESS response from server");
+
+  if (negotiated_mech_ == SaslMechanism::GSSAPI) {
+    if (response.has_nonce()) {
+      // Grab the nonce from the server, if it has sent one. We'll send it back
+      // later with SASL integrity protection as part of the connection context.
+      nonce_ = response.nonce();
+    }
+
+    if (tls_negotiated_) {
+      // Check the channel bindings provided by the server against the expected channel bindings.
+      if (!response.has_channel_bindings()) {
+        return Status::NotAuthorized("no channel bindings provided by server");
+      }
+
+      security::Cert cert;
+      RETURN_NOT_OK(tls_handshake_.GetRemoteCert(&cert));
+
+      string expected_channel_bindings;
+      RETURN_NOT_OK_PREPEND(cert.GetServerEndPointChannelBindings(&expected_channel_bindings),
+                            "failed to generate channel bindings");
+
+      string received_channel_bindings;
+      RETURN_NOT_OK_PREPEND(SaslDecode(sasl_conn_.get(),
+                                       response.channel_bindings(),
+                                       &received_channel_bindings),
+                            "failed to decode channel bindings");
+
+      if (expected_channel_bindings != received_channel_bindings) {
+        Sockaddr addr;
+        ignore_result(socket_->GetPeerAddress(&addr));
+
+        LOG(WARNING) << "Received invalid channel bindings from server "
+                    << addr.ToString()
+                    << ", this could indicate an active network man-in-the-middle";
+        return Status::NotAuthorized("channel bindings do not match");
+      }
+    }
+  }
+
+  return Status::OK();
+}
+
+Status ClientNegotiation::DoSaslStep(const string& in, const char** out, unsigned* out_len) {
+  TRACE("Calling sasl_client_step()");
+
+  return WrapSaslCall(sasl_conn_.get(), [&]() {
+      return sasl_client_step(sasl_conn_.get(), in.c_str(), in.length(), nullptr, out, out_len);
+  });
+}
+
+Status ClientNegotiation::SendConnectionContext() {
+  TRACE("Sending connection context");
+  RequestHeader header;
+  header.set_call_id(kConnectionContextCallId);
+
+  ConnectionContextPB conn_context;
+  // This field is deprecated but used by servers <Kudu 1.1. Newer server versions ignore
+  // this and use the SASL-provided username instead.
+  conn_context.mutable_deprecated_user_info()->set_real_user(
+      plain_auth_user_.empty() ? "cpp-client" : plain_auth_user_);
+
+  if (nonce_) {
+    // Reply with the SASL-protected nonce. We only set the nonce when using SASL GSSAPI.
+    RETURN_NOT_OK(SaslEncode(sasl_conn_.get(), *nonce_, conn_context.mutable_encoded_nonce()));
+  }
+
+  return SendFramedMessageBlocking(socket(), header, conn_context, deadline_);
+}
+
+int ClientNegotiation::GetOptionCb(const char* plugin_name, const char* option,
+                            const char** result, unsigned* len) {
+  return helper_.GetOptionCb(plugin_name, option, result, len);
+}
+
+// Used for PLAIN.
+// SASL callback for SASL_CB_USER, SASL_CB_AUTHNAME, SASL_CB_LANGUAGE
+int ClientNegotiation::SimpleCb(int id, const char** result, unsigned* len) {
+  if (PREDICT_FALSE(!helper_.IsPlainEnabled())) {
+    LOG(DFATAL) << "Simple callback called, but PLAIN auth is not enabled";
+    return SASL_FAIL;
+  }
+  if (PREDICT_FALSE(result == nullptr)) {
+    LOG(DFATAL) << "result outparam is NULL";
+    return SASL_BADPARAM;
+  }
+  switch (id) {
+    // TODO(unknown): Support impersonation?
+    // For impersonation, USER is the impersonated user, AUTHNAME is the "sudoer".
+    case SASL_CB_USER:
+      TRACE("callback for SASL_CB_USER");
+      *result = plain_auth_user_.c_str();
+      if (len != nullptr) *len = plain_auth_user_.length();
+      break;
+    case SASL_CB_AUTHNAME:
+      TRACE("callback for SASL_CB_AUTHNAME");
+      *result = plain_auth_user_.c_str();
+      if (len != nullptr) *len = plain_auth_user_.length();
+      break;
+    case SASL_CB_LANGUAGE:
+      LOG(DFATAL) << "Unable to handle SASL callback type SASL_CB_LANGUAGE"
+        << "(" << id << ")";
+      return SASL_BADPARAM;
+    default:
+      LOG(DFATAL) << "Unexpected SASL callback type: " << id;
+      return SASL_BADPARAM;
+  }
+
+  return SASL_OK;
+}
+
+// Used for PLAIN.
+// SASL callback for SASL_CB_PASS: User password.
+int ClientNegotiation::SecretCb(sasl_conn_t* conn, int id, sasl_secret_t** psecret) {
+  if (PREDICT_FALSE(!helper_.IsPlainEnabled())) {
+    LOG(DFATAL) << "Plain secret callback called, but PLAIN auth is not enabled";
+    return SASL_FAIL;
+  }
+  switch (id) {
+    case SASL_CB_PASS: {
+      if (!conn || !psecret) return SASL_BADPARAM;
+
+      size_t len = plain_pass_.length();
+      *psecret = reinterpret_cast<sasl_secret_t*>(malloc(sizeof(sasl_secret_t) + len));
+      if (!*psecret) {
+        return SASL_NOMEM;
+      }
+      psecret_.reset(*psecret);  // Ensure that we free() this structure later.
+      (*psecret)->len = len;
+      memcpy((*psecret)->data, plain_pass_.c_str(), len + 1);
+      break;
+    }
+    default:
+      LOG(DFATAL) << "Unexpected SASL callback type: " << id;
+      return SASL_BADPARAM;
+  }
+
+  return SASL_OK;
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/client_negotiation.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/client_negotiation.h b/be/src/kudu/rpc/client_negotiation.h
new file mode 100644
index 0000000..aa8c6d9
--- /dev/null
+++ b/be/src/kudu/rpc/client_negotiation.h
@@ -0,0 +1,252 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdlib>
+#include <memory>
+#include <set>
+#include <string>
+#include <vector>
+
+#include <boost/optional.hpp>
+#include <sasl/sasl.h>
+
+#include "kudu/rpc/negotiation.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/sasl_common.h"
+#include "kudu/rpc/sasl_helper.h"
+#include "kudu/security/tls_handshake.h"
+#include "kudu/security/token.pb.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+namespace security {
+class TlsContext;
+}
+
+namespace rpc {
+
+class NegotiatePB;
+class NegotiatePB_SaslAuth;
+class ResponseHeader;
+
+// Class for doing KRPC negotiation with a remote server over a bidirectional socket.
+// Operations on this class are NOT thread-safe.
+class ClientNegotiation {
+ public:
+  // Creates a new client negotiation instance, taking ownership of the
+  // provided socket. After completing the negotiation process by setting the
+  // desired options and calling Negotiate(), the socket can be retrieved with
+  // 'release_socket'.
+  //
+  // The provided TlsContext must outlive this negotiation instance.
+  ClientNegotiation(std::unique_ptr<Socket> socket,
+                    const security::TlsContext* tls_context,
+                    const boost::optional<security::SignedTokenPB>& authn_token,
+                    RpcEncryption encryption);
+
+  // Enable PLAIN authentication.
+  // Must be called before Negotiate().
+  Status EnablePlain(const std::string& user,
+                     const std::string& pass);
+
+  // Enable GSSAPI authentication.
+  // Must be called before Negotiate().
+  Status EnableGSSAPI();
+
+  // Returns mechanism negotiated by this connection.
+  // Must be called after Negotiate().
+  SaslMechanism::Type negotiated_mechanism() const;
+
+  // Returns the negotiated authentication type for the connection.
+  // Must be called after Negotiate().
+  AuthenticationType negotiated_authn() const {
+    DCHECK_NE(negotiated_authn_, AuthenticationType::INVALID);
+    return negotiated_authn_;
+  }
+
+  // Returns true if TLS was negotiated.
+  // Must be called after Negotiate().
+  bool tls_negotiated() const {
+    return tls_negotiated_;
+  }
+
+  // Returns the set of RPC system features supported by the remote server.
+  // Must be called before Negotiate().
+  std::set<RpcFeatureFlag> server_features() const {
+    return server_features_;
+  }
+
+  // Returns the set of RPC system features supported by the remote server.
+  // Must be called after Negotiate().
+  // Subsequent calls to this method or server_features() will return an empty set.
+  std::set<RpcFeatureFlag> take_server_features() {
+    return std::move(server_features_);
+  }
+
+  // Specify the fully-qualified domain name of the remote server.
+  // Must be called before Negotiate(). Required for some mechanisms.
+  void set_server_fqdn(const std::string& domain_name);
+
+  // Set deadline for connection negotiation.
+  void set_deadline(const MonoTime& deadline);
+
+  Socket* socket() { return socket_.get(); }
+
+  // Takes and returns the socket owned by this client negotiation. The caller
+  // will own the socket after this call, and the negotiation instance should no
+  // longer be used. Must be called after Negotiate(). Subsequent calls to this
+  // method or socket() will return a null pointer.
+  std::unique_ptr<Socket> release_socket() { return std::move(socket_); }
+
+  // Negotiate with the remote server. Should only be called once per
+  // ClientNegotiation and socket instance, after all options have been set.
+  //
+  // Returns OK on success, otherwise may return NotAuthorized, NotSupported, or
+  // another non-OK status.
+  Status Negotiate(std::unique_ptr<ErrorStatusPB>* rpc_error = nullptr);
+
+  // SASL callback for plugin options, supported mechanisms, etc.
+  // Returns SASL_FAIL if the option is not handled, which does not fail the handshake.
+  int GetOptionCb(const char* plugin_name, const char* option,
+                  const char** result, unsigned* len);
+
+  // SASL callback for SASL_CB_USER, SASL_CB_AUTHNAME, SASL_CB_LANGUAGE
+  int SimpleCb(int id, const char** result, unsigned* len);
+
+  // SASL callback for SASL_CB_PASS
+  int SecretCb(sasl_conn_t* conn, int id, sasl_secret_t** psecret);
+
+ private:
+
+  // Encode and send the specified negotiate request message to the server.
+  Status SendNegotiatePB(const NegotiatePB& msg) WARN_UNUSED_RESULT;
+
+  // Receive a negotiate response message from the server, deserializing it into 'msg'.
+  // Validates that the response is not an error.
+  Status RecvNegotiatePB(NegotiatePB* msg,
+                         faststring* buffer,
+                         std::unique_ptr<ErrorStatusPB>* rpc_error) WARN_UNUSED_RESULT;
+
+  // Parse error status message from raw bytes of an ErrorStatusPB.
+  Status ParseError(const Slice& err_data,
+                    std::unique_ptr<ErrorStatusPB>* rpc_error) WARN_UNUSED_RESULT;
+
+  Status SendConnectionHeader() WARN_UNUSED_RESULT;
+
+  // Initialize the SASL client negotiation instance.
+  Status InitSaslClient() WARN_UNUSED_RESULT;
+
+  // Send a NEGOTIATE step message to the server.
+  Status SendNegotiate() WARN_UNUSED_RESULT;
+
+  // Handle NEGOTIATE step response from the server.
+  Status HandleNegotiate(const NegotiatePB& response) WARN_UNUSED_RESULT;
+
+  // Send a TLS_HANDSHAKE request message to the server with the provided token.
+  Status SendTlsHandshake(std::string tls_token) WARN_UNUSED_RESULT;
+
+  // Handle a TLS_HANDSHAKE response message from the server.
+  Status HandleTlsHandshake(const NegotiatePB& response) WARN_UNUSED_RESULT;
+
+  // Authenticate to the server using SASL.
+  // 'recv_buf' allows a receive buffer to be reused.
+  Status AuthenticateBySasl(faststring* recv_buf,
+                            std::unique_ptr<ErrorStatusPB>* rpc_error) WARN_UNUSED_RESULT;
+
+  // Authenticate to the server using a token.
+  // 'recv_buf' allows a receive buffer to be reused.
+  Status AuthenticateByToken(faststring* recv_buf,
+                             std::unique_ptr<ErrorStatusPB> *rpc_error) WARN_UNUSED_RESULT;
+
+  // Send an SASL_INITIATE message to the server.
+  // Returns:
+  //  Status::OK if the SASL_SUCCESS message is expected next.
+  //  Status::Incomplete if the SASL_CHALLENGE message is expected next.
+  //  Any other status indicates an error.
+  Status SendSaslInitiate() WARN_UNUSED_RESULT;
+
+  // Send a SASL_RESPONSE message to the server.
+  Status SendSaslResponse(const char* resp_msg, unsigned resp_msg_len) WARN_UNUSED_RESULT;
+
+  // Handle case when server sends SASL_CHALLENGE response.
+  // Returns:
+  //  Status::OK if a SASL_SUCCESS message is expected next.
+  //  Status::Incomplete if another SASL_CHALLENGE message is expected.
+  //  Any other status indicates an error.
+  Status HandleSaslChallenge(const NegotiatePB& response) WARN_UNUSED_RESULT;
+
+  // Handle case when server sends SASL_SUCCESS response.
+  Status HandleSaslSuccess(const NegotiatePB& response) WARN_UNUSED_RESULT;
+
+  // Perform a client-side step of the SASL negotiation.
+  // Input is what came from the server. Output is what we will send back to the server.
+  // Returns:
+  //   Status::OK if sasl_client_step returns SASL_OK.
+  //   Status::Incomplete if sasl_client_step returns SASL_CONTINUE
+  // otherwise returns an appropriate error status.
+  Status DoSaslStep(const std::string& in, const char** out, unsigned* out_len) WARN_UNUSED_RESULT;
+
+  Status SendConnectionContext() WARN_UNUSED_RESULT;
+
+  // The socket to the remote server.
+  std::unique_ptr<Socket> socket_;
+
+  // SASL state.
+  std::vector<sasl_callback_t> callbacks_;
+  std::unique_ptr<sasl_conn_t, SaslDeleter> sasl_conn_;
+  SaslHelper helper_;
+  boost::optional<std::string> nonce_;
+
+  // TLS state.
+  const security::TlsContext* tls_context_;
+  security::TlsHandshake tls_handshake_;
+  const RpcEncryption encryption_;
+  bool tls_negotiated_;
+
+  // TSK state.
+  boost::optional<security::SignedTokenPB> authn_token_;
+
+  // Authentication state.
+  std::string plain_auth_user_;
+  std::string plain_pass_;
+  std::unique_ptr<sasl_secret_t, decltype(std::free)*> psecret_;
+
+  // The set of features advertised by the client. Filled in when we send
+  // the first message. This is not necessarily constant since some features
+  // may be dynamically enabled.
+  std::set<RpcFeatureFlag> client_features_;
+
+  // The set of features supported by the server. Filled in during negotiation.
+  std::set<RpcFeatureFlag> server_features_;
+
+  // The authentication type. Filled in during negotiation.
+  AuthenticationType negotiated_authn_;
+
+  // The SASL mechanism used by the connection. Filled in during negotiation.
+  SaslMechanism::Type negotiated_mech_;
+
+  // Negotiation timeout deadline.
+  MonoTime deadline_;
+};
+
+} // namespace rpc
+} // namespace kudu