You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/17 03:14:31 UTC
[15/15] incubator-impala git commit: IMPALA-4669: [KRPC] Import RPC
library from kudu@314c9d8
IMPALA-4669: [KRPC] Import RPC library from kudu@314c9d8
Change-Id: I06ab5b56312e482a27fa484414c338438ad6972c
Reviewed-on: http://gerrit.cloudera.org:8080/5718
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/c7db60aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/c7db60aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/c7db60aa
Branch: refs/heads/master
Commit: c7db60aa46565c19634e8a791df3af8d116b9017
Parents: 852e1bb
Author: Henry Robinson <he...@cloudera.com>
Authored: Fri Oct 28 17:10:46 2016 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Aug 17 03:13:20 2017 +0000
----------------------------------------------------------------------
be/src/kudu/rpc/CMakeLists.txt | 128 +++
be/src/kudu/rpc/acceptor_pool.cc | 166 +++
be/src/kudu/rpc/acceptor_pool.h | 79 ++
be/src/kudu/rpc/blocking_ops.cc | 127 +++
be/src/kudu/rpc/blocking_ops.h | 62 ++
be/src/kudu/rpc/client_negotiation.cc | 776 ++++++++++++++
be/src/kudu/rpc/client_negotiation.h | 252 +++++
be/src/kudu/rpc/connection.cc | 732 ++++++++++++++
be/src/kudu/rpc/connection.h | 360 +++++++
be/src/kudu/rpc/constants.cc | 38 +
be/src/kudu/rpc/constants.h | 63 ++
be/src/kudu/rpc/exactly_once_rpc-test.cc | 589 +++++++++++
be/src/kudu/rpc/inbound_call.cc | 322 ++++++
be/src/kudu/rpc/inbound_call.h | 269 +++++
be/src/kudu/rpc/messenger.cc | 488 +++++++++
be/src/kudu/rpc/messenger.h | 354 +++++++
be/src/kudu/rpc/mt-rpc-test.cc | 291 ++++++
be/src/kudu/rpc/negotiation-test.cc | 1331 +++++++++++++++++++++++++
be/src/kudu/rpc/negotiation.cc | 317 ++++++
be/src/kudu/rpc/negotiation.h | 56 ++
be/src/kudu/rpc/outbound_call.cc | 509 ++++++++++
be/src/kudu/rpc/outbound_call.h | 363 +++++++
be/src/kudu/rpc/protoc-gen-krpc.cc | 674 +++++++++++++
be/src/kudu/rpc/proxy.cc | 115 +++
be/src/kudu/rpc/proxy.h | 121 +++
be/src/kudu/rpc/reactor-test.cc | 98 ++
be/src/kudu/rpc/reactor.cc | 750 ++++++++++++++
be/src/kudu/rpc/reactor.h | 370 +++++++
be/src/kudu/rpc/remote_method.cc | 49 +
be/src/kudu/rpc/remote_method.h | 51 +
be/src/kudu/rpc/remote_user.cc | 41 +
be/src/kudu/rpc/remote_user.h | 98 ++
be/src/kudu/rpc/request_tracker-test.cc | 83 ++
be/src/kudu/rpc/request_tracker.cc | 53 +
be/src/kudu/rpc/request_tracker.h | 85 ++
be/src/kudu/rpc/response_callback.h | 31 +
be/src/kudu/rpc/result_tracker.cc | 582 +++++++++++
be/src/kudu/rpc/result_tracker.h | 399 ++++++++
be/src/kudu/rpc/retriable_rpc.h | 296 ++++++
be/src/kudu/rpc/rpc-bench.cc | 260 +++++
be/src/kudu/rpc/rpc-test-base.h | 585 +++++++++++
be/src/kudu/rpc/rpc-test.cc | 808 +++++++++++++++
be/src/kudu/rpc/rpc.cc | 96 ++
be/src/kudu/rpc/rpc.h | 218 ++++
be/src/kudu/rpc/rpc_context.cc | 208 ++++
be/src/kudu/rpc/rpc_context.h | 224 +++++
be/src/kudu/rpc/rpc_controller.cc | 149 +++
be/src/kudu/rpc/rpc_controller.h | 256 +++++
be/src/kudu/rpc/rpc_header.proto | 365 +++++++
be/src/kudu/rpc/rpc_introspection.proto | 108 ++
be/src/kudu/rpc/rpc_service.h | 47 +
be/src/kudu/rpc/rpc_sidecar.cc | 102 ++
be/src/kudu/rpc/rpc_sidecar.h | 68 ++
be/src/kudu/rpc/rpc_stub-test.cc | 679 +++++++++++++
be/src/kudu/rpc/rpcz_store.cc | 255 +++++
be/src/kudu/rpc/rpcz_store.h | 74 ++
be/src/kudu/rpc/rtest.proto | 150 +++
be/src/kudu/rpc/rtest_diff_package.proto | 26 +
be/src/kudu/rpc/sasl_common.cc | 459 +++++++++
be/src/kudu/rpc/sasl_common.h | 126 +++
be/src/kudu/rpc/sasl_helper.cc | 134 +++
be/src/kudu/rpc/sasl_helper.h | 109 ++
be/src/kudu/rpc/serialization.cc | 199 ++++
be/src/kudu/rpc/serialization.h | 88 ++
be/src/kudu/rpc/server_negotiation.cc | 980 ++++++++++++++++++
be/src/kudu/rpc/server_negotiation.h | 248 +++++
be/src/kudu/rpc/service_if.cc | 149 +++
be/src/kudu/rpc/service_if.h | 137 +++
be/src/kudu/rpc/service_pool.cc | 219 ++++
be/src/kudu/rpc/service_pool.h | 98 ++
be/src/kudu/rpc/service_queue-test.cc | 144 +++
be/src/kudu/rpc/service_queue.cc | 142 +++
be/src/kudu/rpc/service_queue.h | 215 ++++
be/src/kudu/rpc/transfer.cc | 264 +++++
be/src/kudu/rpc/transfer.h | 203 ++++
be/src/kudu/rpc/user_credentials.cc | 57 ++
be/src/kudu/rpc/user_credentials.h | 47 +
77 files changed, 20264 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/CMakeLists.txt b/be/src/kudu/rpc/CMakeLists.txt
new file mode 100644
index 0000000..0cfe6e9
--- /dev/null
+++ b/be/src/kudu/rpc/CMakeLists.txt
@@ -0,0 +1,128 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#### Global header protobufs
+PROTOBUF_GENERATE_CPP(
+ RPC_HEADER_PROTO_SRCS RPC_HEADER_PROTO_HDRS RPC_HEADER_PROTO_TGTS
+ SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+ BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+ PROTO_FILES rpc_header.proto)
+ADD_EXPORTABLE_LIBRARY(rpc_header_proto
+ SRCS ${RPC_HEADER_PROTO_SRCS}
+ DEPS protobuf pb_util_proto token_proto
+ NONLINK_DEPS ${RPC_HEADER_PROTO_TGTS})
+
+PROTOBUF_GENERATE_CPP(
+ RPC_INTROSPECTION_PROTO_SRCS RPC_INTROSPECTION_PROTO_HDRS RPC_INTROSPECTION_PROTO_TGTS
+ SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+ BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+ PROTO_FILES rpc_introspection.proto)
+set(RPC_INTROSPECTION_PROTO_LIBS
+ rpc_header_proto
+ protobuf)
+ADD_EXPORTABLE_LIBRARY(rpc_introspection_proto
+ SRCS ${RPC_INTROSPECTION_PROTO_SRCS}
+ DEPS ${RPC_INTROSPECTION_PROTO_LIBS}
+ NONLINK_DEPS ${RPC_INTROSPECTION_PROTO_TGTS})
+
+### RPC library
+set(KRPC_SRCS
+ acceptor_pool.cc
+ blocking_ops.cc
+ client_negotiation.cc
+ connection.cc
+ constants.cc
+ inbound_call.cc
+ messenger.cc
+ negotiation.cc
+ outbound_call.cc
+ proxy.cc
+ reactor.cc
+ remote_method.cc
+ remote_user.cc
+ request_tracker.cc
+ result_tracker.cc
+ rpc.cc
+ rpc_context.cc
+ rpc_controller.cc
+ rpc_sidecar.cc
+ rpcz_store.cc
+ sasl_common.cc
+ sasl_helper.cc
+ serialization.cc
+ server_negotiation.cc
+ service_if.cc
+ service_pool.cc
+ service_queue.cc
+ user_credentials.cc
+ transfer.cc
+)
+
+set(KRPC_LIBS
+ cyrus_sasl
+ gutil
+ kudu_util
+ libev
+ rpc_header_proto
+ rpc_introspection_proto
+ security)
+
+ADD_EXPORTABLE_LIBRARY(krpc
+ SRCS ${KRPC_SRCS}
+ DEPS ${KRPC_LIBS})
+
+### RPC generator tool
+add_executable(protoc-gen-krpc protoc-gen-krpc.cc)
+target_link_libraries(protoc-gen-krpc
+ ${KUDU_BASE_LIBS}
+ rpc_header_proto
+ protoc
+ protobuf
+ gutil
+ kudu_util)
+
+#### RPC test
+PROTOBUF_GENERATE_CPP(
+ RPC_TEST_DIFF_PACKAGE_SRCS RPC_TEST_DIFF_PACKAGE_HDRS RPC_TEST_DIFF_PACKAGE_TGTS
+ SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+ BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+ PROTO_FILES rtest_diff_package.proto)
+add_library(rtest_diff_package_proto ${RPC_TEST_DIFF_PACKAGE_SRCS} ${RPC_TEST_DIFF_PACKAGE_HDRS})
+target_link_libraries(rtest_diff_package_proto rpc_header_proto)
+
+KRPC_GENERATE(
+ RTEST_KRPC_SRCS RTEST_KRPC_HDRS RTEST_KRPC_TGTS
+ SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+ BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+ PROTO_FILES rtest.proto)
+add_library(rtest_krpc ${RTEST_KRPC_SRCS} ${RTEST_KRPC_HDRS})
+target_link_libraries(rtest_krpc
+ krpc
+ rpc_header_proto
+ rtest_diff_package_proto)
+
+# Tests
+set(KUDU_TEST_LINK_LIBS rtest_krpc krpc rpc_header_proto security-test ${KUDU_MIN_TEST_LIBS})
+ADD_KUDU_TEST(exactly_once_rpc-test)
+ADD_KUDU_TEST(mt-rpc-test RUN_SERIAL true)
+ADD_KUDU_TEST(negotiation-test)
+ADD_KUDU_TEST(reactor-test)
+ADD_KUDU_TEST(request_tracker-test)
+ADD_KUDU_TEST(rpc-bench RUN_SERIAL true)
+ADD_KUDU_TEST(rpc-test)
+ADD_KUDU_TEST(rpc_stub-test)
+ADD_KUDU_TEST(service_queue-test)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/acceptor_pool.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/acceptor_pool.cc b/be/src/kudu/rpc/acceptor_pool.cc
new file mode 100644
index 0000000..f3c935c
--- /dev/null
+++ b/be/src/kudu/rpc/acceptor_pool.cc
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/acceptor_pool.h"
+
+#include <pthread.h>
+
+#include <cinttypes>
+#include <cstdint>
+#include <iostream>
+#include <string>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+
+using google::protobuf::Message;
+using std::string;
+
+METRIC_DEFINE_counter(server, rpc_connections_accepted,
+ "RPC Connections Accepted",
+ kudu::MetricUnit::kConnections,
+ "Number of incoming TCP connections made to the RPC server");
+
+DEFINE_int32(rpc_acceptor_listen_backlog, 128,
+ "Socket backlog parameter used when listening for RPC connections. "
+ "This defines the maximum length to which the queue of pending "
+ "TCP connections inbound to the RPC server may grow. If a connection "
+ "request arrives when the queue is full, the client may receive "
+ "an error. Higher values may help the server ride over bursts of "
+ "new inbound connection requests.");
+TAG_FLAG(rpc_acceptor_listen_backlog, advanced);
+
+namespace kudu {
+namespace rpc {
+
+AcceptorPool::AcceptorPool(Messenger* messenger, Socket* socket,
+ Sockaddr bind_address)
+ : messenger_(messenger),
+ socket_(socket->Release()),
+ bind_address_(std::move(bind_address)),
+ rpc_connections_accepted_(METRIC_rpc_connections_accepted.Instantiate(
+ messenger->metric_entity())),
+ closing_(false) {}
+
+AcceptorPool::~AcceptorPool() {
+ Shutdown();
+}
+
+Status AcceptorPool::Start(int num_threads) {
+ RETURN_NOT_OK(socket_.Listen(FLAGS_rpc_acceptor_listen_backlog));
+
+ for (int i = 0; i < num_threads; i++) {
+ scoped_refptr<kudu::Thread> new_thread;
+ Status s = kudu::Thread::Create("acceptor pool", "acceptor",
+ &AcceptorPool::RunThread, this, &new_thread);
+ if (!s.ok()) {
+ Shutdown();
+ return s;
+ }
+ threads_.push_back(new_thread);
+ }
+ return Status::OK();
+}
+
+void AcceptorPool::Shutdown() {
+ if (Acquire_CompareAndSwap(&closing_, false, true) != false) {
+ VLOG(2) << "Acceptor Pool on " << bind_address_.ToString()
+ << " already shut down";
+ return;
+ }
+
+#if defined(__linux__)
+ // Closing the socket will break us out of accept() if we're in it, and
+ // prevent future accepts.
+ WARN_NOT_OK(socket_.Shutdown(true, true),
+ strings::Substitute("Could not shut down acceptor socket on $0",
+ bind_address_.ToString()));
+#else
+ // Calling shutdown on an accepting (non-connected) socket is illegal on most
+ // platforms (but not Linux). Instead, the accepting threads are interrupted
+ // forcefully.
+ for (const scoped_refptr<kudu::Thread>& thread : threads_) {
+ pthread_cancel(thread.get()->pthread_id());
+ }
+#endif
+
+ for (const scoped_refptr<kudu::Thread>& thread : threads_) {
+ CHECK_OK(ThreadJoiner(thread.get()).Join());
+ }
+ threads_.clear();
+
+ // Close the socket: keeping the descriptor open and, possibly, receiving late
+ // not-to-be-read messages from the peer does not make much sense. The
+ // Socket::Close() method is called upon destruction of the aggregated socket_
+ // object as well. However, the typical ownership pattern of an AcceptorPool
+ // object includes two references wrapped via a shared_ptr smart pointer: one
+ // is held by Messenger, another by RpcServer. If not calling Socket::Close()
+ // here, it would necessary to wait until Messenger::Shutdown() is called for
+ // the corresponding messenger object to close this socket.
+ ignore_result(socket_.Close());
+}
+
+Sockaddr AcceptorPool::bind_address() const {
+ return bind_address_;
+}
+
+Status AcceptorPool::GetBoundAddress(Sockaddr* addr) const {
+ return socket_.GetSocketAddress(addr);
+}
+
+void AcceptorPool::RunThread() {
+ while (true) {
+ Socket new_sock;
+ Sockaddr remote;
+ VLOG(2) << "calling accept() on socket " << socket_.GetFd()
+ << " listening on " << bind_address_.ToString();
+ Status s = socket_.Accept(&new_sock, &remote, Socket::FLAG_NONBLOCKING);
+ if (!s.ok()) {
+ if (Release_Load(&closing_)) {
+ break;
+ }
+ KLOG_EVERY_N_SECS(WARNING, 1) << "AcceptorPool: accept failed: " << s.ToString()
+ << THROTTLE_MSG;
+ continue;
+ }
+ s = new_sock.SetNoDelay(true);
+ if (!s.ok()) {
+ KLOG_EVERY_N_SECS(WARNING, 1) << "Acceptor with remote = " << remote.ToString()
+ << " failed to set TCP_NODELAY on a newly accepted socket: "
+ << s.ToString() << THROTTLE_MSG;
+ continue;
+ }
+ rpc_connections_accepted_->Increment();
+ messenger_->RegisterInboundSocket(&new_sock, remote);
+ }
+ VLOG(1) << "AcceptorPool shutting down.";
+}
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/acceptor_pool.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/acceptor_pool.h b/be/src/kudu/rpc/acceptor_pool.h
new file mode 100644
index 0000000..92b7fc5
--- /dev/null
+++ b/be/src/kudu/rpc/acceptor_pool.h
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_RPC_ACCEPTOR_POOL_H
+#define KUDU_RPC_ACCEPTOR_POOL_H
+
+#include <vector>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Counter;
+class Socket;
+
+namespace rpc {
+
+class Messenger;
+
+// A pool of threads calling accept() to create new connections.
+// Acceptor pool threads terminate when they notice that the messenger has been
+// shut down, if Shutdown() is called, or if the pool object is destructed.
+class AcceptorPool {
+ public:
+ // Create a new acceptor pool. Calls socket::Release to take ownership of the
+ // socket.
+ // 'socket' must be already bound, but should not yet be listening.
+ AcceptorPool(Messenger *messenger, Socket *socket, Sockaddr bind_address);
+ ~AcceptorPool();
+
+ // Start listening and accepting connections.
+ Status Start(int num_threads);
+ void Shutdown();
+
+ // Return the address that the pool is bound to. If the port is specified as
+ // 0, then this will always return port 0.
+ Sockaddr bind_address() const;
+
+ // Return the address that the pool is bound to. This only works while the
+ // socket is open, and if the specified port is 0 then this will return the
+ // actual port that was bound.
+ Status GetBoundAddress(Sockaddr* addr) const;
+
+ private:
+ void RunThread();
+
+ Messenger *messenger_;
+ Socket socket_;
+ Sockaddr bind_address_;
+ std::vector<scoped_refptr<kudu::Thread> > threads_;
+
+ scoped_refptr<Counter> rpc_connections_accepted_;
+
+ Atomic32 closing_;
+
+ DISALLOW_COPY_AND_ASSIGN(AcceptorPool);
+};
+
+} // namespace rpc
+} // namespace kudu
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/blocking_ops.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/blocking_ops.cc b/be/src/kudu/rpc/blocking_ops.cc
new file mode 100644
index 0000000..64ae2c0
--- /dev/null
+++ b/be/src/kudu/rpc/blocking_ops.cc
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/blocking_ops.h"
+
+#include <stdint.h>
+#include <string.h>
+
+#include <glog/logging.h>
+#include <google/protobuf/message_lite.h>
+
+#include "kudu/gutil/endian.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/constants.h"
+#include "kudu/rpc/serialization.h"
+#include "kudu/rpc/transfer.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace rpc {
+
+using google::protobuf::MessageLite;
+
+const char kHTTPHeader[] = "HTTP";
+
+Status EnsureBlockingMode(const Socket* sock) {
+ bool is_nonblocking;
+ RETURN_NOT_OK(sock->IsNonBlocking(&is_nonblocking));
+ if (is_nonblocking) {
+ return Status::IllegalState("Underlying socket is not set to blocking mode!");
+ }
+ return Status::OK();
+}
+
+Status SendFramedMessageBlocking(Socket* sock, const MessageLite& header, const MessageLite& msg,
+ const MonoTime& deadline) {
+ DCHECK(sock != nullptr);
+ DCHECK(header.IsInitialized()) << "header protobuf must be initialized";
+ DCHECK(msg.IsInitialized()) << "msg protobuf must be initialized";
+
+ RETURN_NOT_OK(EnsureBlockingMode(sock));
+
+ // Ensure we are in blocking mode.
+ // These blocking calls are typically not in the fast path, so doing this for all build types.
+ bool is_non_blocking = false;
+ RETURN_NOT_OK(sock->IsNonBlocking(&is_non_blocking));
+ DCHECK(!is_non_blocking) << "Socket must be in blocking mode to use SendFramedMessage";
+
+ // Serialize message
+ faststring param_buf;
+ serialization::SerializeMessage(msg, ¶m_buf);
+
+ // Serialize header and initial length
+ faststring header_buf;
+ serialization::SerializeHeader(header, param_buf.size(), &header_buf);
+
+ // Write header & param to stream
+ size_t nsent;
+ RETURN_NOT_OK(sock->BlockingWrite(header_buf.data(), header_buf.size(), &nsent, deadline));
+ RETURN_NOT_OK(sock->BlockingWrite(param_buf.data(), param_buf.size(), &nsent, deadline));
+
+ return Status::OK();
+}
+
+Status ReceiveFramedMessageBlocking(Socket* sock, faststring* recv_buf,
+ MessageLite* header, Slice* param_buf, const MonoTime& deadline) {
+ DCHECK(sock != nullptr);
+ DCHECK(recv_buf != nullptr);
+ DCHECK(header != nullptr);
+ DCHECK(param_buf != nullptr);
+
+ RETURN_NOT_OK(EnsureBlockingMode(sock));
+
+ // Read the message prefix, which specifies the length of the payload.
+ recv_buf->clear();
+ recv_buf->resize(kMsgLengthPrefixLength);
+ size_t recvd = 0;
+ RETURN_NOT_OK(sock->BlockingRecv(recv_buf->data(), kMsgLengthPrefixLength, &recvd, deadline));
+ uint32_t payload_len = NetworkByteOrder::Load32(recv_buf->data());
+
+ // Verify that the payload size isn't out of bounds.
+ // This can happen because of network corruption, or a naughty client.
+ if (PREDICT_FALSE(payload_len > FLAGS_rpc_max_message_size)) {
+ // A common user mistake is to try to speak the Kudu RPC protocol to an
+ // HTTP endpoint, or vice versa.
+ if (memcmp(recv_buf->data(), kHTTPHeader, strlen(kHTTPHeader)) == 0) {
+ return Status::IOError(
+ "received invalid RPC message which appears to be an HTTP response. "
+ "Verify that you have specified a valid RPC port and not an HTTP port.");
+ }
+
+ return Status::IOError(
+ strings::Substitute(
+ "received invalid message of size $0 which exceeds"
+ " the rpc_max_message_size of $1 bytes",
+ payload_len, FLAGS_rpc_max_message_size));
+ }
+
+ // Read the message payload.
+ recvd = 0;
+ recv_buf->resize(payload_len + kMsgLengthPrefixLength);
+ RETURN_NOT_OK(sock->BlockingRecv(recv_buf->data() + kMsgLengthPrefixLength,
+ payload_len, &recvd, deadline));
+ RETURN_NOT_OK(serialization::ParseMessage(Slice(*recv_buf), header, param_buf));
+ return Status::OK();
+}
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/blocking_ops.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/blocking_ops.h b/be/src/kudu/rpc/blocking_ops.h
new file mode 100644
index 0000000..01bb7a6
--- /dev/null
+++ b/be/src/kudu/rpc/blocking_ops.h
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_RPC_BLOCKING_OPS_H
+#define KUDU_RPC_BLOCKING_OPS_H
+
+#include <set>
+#include <string>
+
+namespace google {
+namespace protobuf {
+class MessageLite;
+} // namespace protobuf
+} // namespace google
+
+namespace kudu {
+
+class faststring;
+class MonoTime;
+class Slice;
+class Sockaddr;
+class Socket;
+class Status;
+
+namespace rpc {
+
+// Returns OK if socket is in blocking mode. Otherwise, returns an error.
+Status EnsureBlockingMode(const Socket* sock);
+
+// Encode and send a message over a socket.
+// header: Request or Response header protobuf.
+// msg: Protobuf message to send. This message must be fully initialized.
+// deadline: Latest time allowed for receive to complete before timeout.
+Status SendFramedMessageBlocking(Socket* sock, const google::protobuf::MessageLite& header,
+ const google::protobuf::MessageLite& msg, const MonoTime& deadline);
+
+// Receive a full message frame from the server.
+// recv_buf: buffer to use for reading the data from the socket.
+// header: Request or Response header protobuf.
+// param_buf: Slice into recv_buf containing unparsed RPC param protobuf data.
+// deadline: Latest time allowed for receive to complete before timeout.
+Status ReceiveFramedMessageBlocking(Socket* sock, faststring* recv_buf,
+ google::protobuf::MessageLite* header, Slice* param_buf, const MonoTime& deadline);
+
+} // namespace rpc
+} // namespace kudu
+
+#endif // KUDU_RPC_BLOCKING_OPS_H
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/client_negotiation.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/client_negotiation.cc b/be/src/kudu/rpc/client_negotiation.cc
new file mode 100644
index 0000000..8ce5190
--- /dev/null
+++ b/be/src/kudu/rpc/client_negotiation.cc
@@ -0,0 +1,776 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/client_negotiation.h"
+
+#include <string.h>
+
+#include <map>
+#include <memory>
+#include <set>
+#include <string>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <sasl/sasl.h>
+
+#include "kudu/gutil/casts.h"
+#include "kudu/gutil/endian.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
+#include "kudu/rpc/blocking_ops.h"
+#include "kudu/rpc/constants.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/sasl_common.h"
+#include "kudu/rpc/sasl_helper.h"
+#include "kudu/rpc/serialization.h"
+#include "kudu/security/cert.h"
+#include "kudu/security/tls_context.h"
+#include "kudu/security/tls_handshake.h"
+#include "kudu/security/tls_socket.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/trace.h"
+
+using std::map;
+using std::set;
+using std::string;
+using std::unique_ptr;
+
+using strings::Substitute;
+
+DECLARE_bool(rpc_encrypt_loopback_connections);
+
+namespace kudu {
+namespace rpc {
+
+static int ClientNegotiationGetoptCb(ClientNegotiation* client_negotiation,
+ const char* plugin_name,
+ const char* option,
+ const char** result,
+ unsigned* len) {
+ return client_negotiation->GetOptionCb(plugin_name, option, result, len);
+}
+
+static int ClientNegotiationSimpleCb(ClientNegotiation* client_negotiation,
+ int id,
+ const char** result,
+ unsigned* len) {
+ return client_negotiation->SimpleCb(id, result, len);
+}
+
+static int ClientNegotiationSecretCb(sasl_conn_t* conn,
+ ClientNegotiation* client_negotiation,
+ int id,
+ sasl_secret_t** psecret) {
+ return client_negotiation->SecretCb(conn, id, psecret);
+}
+
+// Return an appropriately-typed Status object based on an ErrorStatusPB returned
+// from an Error RPC.
+// In case there is no relevant Status type, return a RuntimeError.
+static Status StatusFromRpcError(const ErrorStatusPB& error) {
+ DCHECK(error.IsInitialized()) << "Error status PB must be initialized";
+ if (PREDICT_FALSE(!error.has_code())) {
+ return Status::RuntimeError(error.message());
+ }
+ const string code_name = ErrorStatusPB::RpcErrorCodePB_Name(error.code());
+ switch (error.code()) {
+ case ErrorStatusPB_RpcErrorCodePB_FATAL_UNAUTHORIZED: // fall-through
+ case ErrorStatusPB_RpcErrorCodePB_FATAL_INVALID_AUTHENTICATION_TOKEN:
+ return Status::NotAuthorized(code_name, error.message());
+ case ErrorStatusPB_RpcErrorCodePB_ERROR_UNAVAILABLE:
+ return Status::ServiceUnavailable(code_name, error.message());
+ default:
+ return Status::RuntimeError(code_name, error.message());
+ }
+}
+
+ClientNegotiation::ClientNegotiation(unique_ptr<Socket> socket,
+ const security::TlsContext* tls_context,
+ const boost::optional<security::SignedTokenPB>& authn_token,
+ RpcEncryption encryption)
+ : socket_(std::move(socket)),
+ helper_(SaslHelper::CLIENT),
+ tls_context_(tls_context),
+ encryption_(encryption),
+ tls_negotiated_(false),
+ authn_token_(authn_token),
+ psecret_(nullptr, std::free),
+ negotiated_authn_(AuthenticationType::INVALID),
+ negotiated_mech_(SaslMechanism::INVALID),
+ deadline_(MonoTime::Max()) {
+ callbacks_.push_back(SaslBuildCallback(SASL_CB_GETOPT,
+ reinterpret_cast<int (*)()>(&ClientNegotiationGetoptCb), this));
+ callbacks_.push_back(SaslBuildCallback(SASL_CB_AUTHNAME,
+ reinterpret_cast<int (*)()>(&ClientNegotiationSimpleCb), this));
+ callbacks_.push_back(SaslBuildCallback(SASL_CB_PASS,
+ reinterpret_cast<int (*)()>(&ClientNegotiationSecretCb), this));
+ callbacks_.push_back(SaslBuildCallback(SASL_CB_LIST_END, nullptr, nullptr));
+ DCHECK(socket_);
+ DCHECK(tls_context_);
+}
+
+Status ClientNegotiation::EnablePlain(const string& user, const string& pass) {
+ RETURN_NOT_OK(helper_.EnablePlain());
+ plain_auth_user_ = user;
+ plain_pass_ = pass;
+ return Status::OK();
+}
+
+Status ClientNegotiation::EnableGSSAPI() {
+ return helper_.EnableGSSAPI();
+}
+
+SaslMechanism::Type ClientNegotiation::negotiated_mechanism() const {
+ return negotiated_mech_;
+}
+
+void ClientNegotiation::set_server_fqdn(const string& domain_name) {
+ helper_.set_server_fqdn(domain_name);
+}
+
+void ClientNegotiation::set_deadline(const MonoTime& deadline) {
+ deadline_ = deadline;
+}
+
+Status ClientNegotiation::Negotiate(unique_ptr<ErrorStatusPB>* rpc_error) {
+ TRACE("Beginning negotiation");
+
+ // Ensure we can use blocking calls on the socket during negotiation.
+ RETURN_NOT_OK(EnsureBlockingMode(socket_.get()));
+
+ // Step 1: send the connection header.
+ RETURN_NOT_OK(SendConnectionHeader());
+
+ faststring recv_buf;
+
+ { // Step 2: send and receive the NEGOTIATE step messages.
+ RETURN_NOT_OK(SendNegotiate());
+ NegotiatePB response;
+ RETURN_NOT_OK(RecvNegotiatePB(&response, &recv_buf, rpc_error));
+ RETURN_NOT_OK(HandleNegotiate(response));
+ TRACE("Negotiated authn=$0", AuthenticationTypeToString(negotiated_authn_));
+ }
+
+ // Step 3: if both ends support TLS, do a TLS handshake.
+ // TODO(KUDU-1921): allow the client to require TLS.
+ if (encryption_ != RpcEncryption::DISABLED &&
+ ContainsKey(server_features_, TLS)) {
+ RETURN_NOT_OK(tls_context_->InitiateHandshake(security::TlsHandshakeType::CLIENT,
+ &tls_handshake_));
+
+ if (negotiated_authn_ == AuthenticationType::SASL) {
+ // When using SASL authentication, verifying the server's certificate is
+ // not necessary. This allows the client to still use TLS encryption for
+ // connections to servers which only have a self-signed certificate.
+ tls_handshake_.set_verification_mode(security::TlsVerificationMode::VERIFY_NONE);
+ }
+
+ // To initiate the TLS handshake, we pretend as if the server sent us an
+ // empty TLS_HANDSHAKE token.
+ NegotiatePB initial;
+ initial.set_step(NegotiatePB::TLS_HANDSHAKE);
+ initial.set_tls_handshake("");
+ Status s = HandleTlsHandshake(initial);
+
+ while (s.IsIncomplete()) {
+ NegotiatePB response;
+ RETURN_NOT_OK(RecvNegotiatePB(&response, &recv_buf, rpc_error));
+ s = HandleTlsHandshake(response);
+ }
+ RETURN_NOT_OK(s);
+ tls_negotiated_ = true;
+ }
+
+ // Step 4: Authentication
+ switch (negotiated_authn_) {
+ case AuthenticationType::SASL:
+ RETURN_NOT_OK(AuthenticateBySasl(&recv_buf, rpc_error));
+ break;
+ case AuthenticationType::TOKEN:
+ RETURN_NOT_OK(AuthenticateByToken(&recv_buf, rpc_error));
+ break;
+ case AuthenticationType::CERTIFICATE:
+ // The TLS handshake has already authenticated the server.
+ break;
+ case AuthenticationType::INVALID: LOG(FATAL) << "unreachable";
+ }
+
+ // Step 5: Send connection context.
+ RETURN_NOT_OK(SendConnectionContext());
+
+ TRACE("Negotiation successful");
+ return Status::OK();
+}
+
+Status ClientNegotiation::SendNegotiatePB(const NegotiatePB& msg) {
+ RequestHeader header;
+ header.set_call_id(kNegotiateCallId);
+
+ DCHECK(socket_);
+ DCHECK(msg.IsInitialized()) << "message must be initialized";
+ DCHECK(msg.has_step()) << "message must have a step";
+
+ TRACE("Sending $0 NegotiatePB request", NegotiatePB::NegotiateStep_Name(msg.step()));
+ return SendFramedMessageBlocking(socket(), header, msg, deadline_);
+}
+
+Status ClientNegotiation::RecvNegotiatePB(NegotiatePB* msg,
+ faststring* buffer,
+ unique_ptr<ErrorStatusPB>* rpc_error) {
+ ResponseHeader header;
+ Slice param_buf;
+ RETURN_NOT_OK(ReceiveFramedMessageBlocking(socket(), buffer, &header, ¶m_buf, deadline_));
+ RETURN_NOT_OK(helper_.CheckNegotiateCallId(header.call_id()));
+
+ if (header.is_error()) {
+ return ParseError(param_buf, rpc_error);
+ }
+
+ RETURN_NOT_OK(helper_.ParseNegotiatePB(param_buf, msg));
+ TRACE("Received $0 NegotiatePB response", NegotiatePB::NegotiateStep_Name(msg->step()));
+ return Status::OK();
+}
+
+Status ClientNegotiation::ParseError(const Slice& err_data,
+ unique_ptr<ErrorStatusPB>* rpc_error) {
+ unique_ptr<ErrorStatusPB> error(new ErrorStatusPB);
+ if (!error->ParseFromArray(err_data.data(), err_data.size())) {
+ return Status::IOError("invalid error response, missing fields",
+ error->InitializationErrorString());
+ }
+ Status s = StatusFromRpcError(*error);
+ TRACE("Received error response from server: $0", s.ToString());
+
+ if (rpc_error) {
+ rpc_error->swap(error);
+ }
+ return s;
+}
+
+Status ClientNegotiation::SendConnectionHeader() {
+ const uint8_t buflen = kMagicNumberLength + kHeaderFlagsLength;
+ uint8_t buf[buflen];
+ serialization::SerializeConnHeader(buf);
+ size_t nsent;
+ return socket()->BlockingWrite(buf, buflen, &nsent, deadline_);
+}
+
+Status ClientNegotiation::InitSaslClient() {
+ RETURN_NOT_OK(SaslInit());
+
+ // TODO(KUDU-1922): consider setting SASL_SUCCESS_DATA
+ unsigned flags = 0;
+
+ sasl_conn_t* sasl_conn = nullptr;
+ RETURN_NOT_OK_PREPEND(WrapSaslCall(nullptr /* no conn */, [&]() {
+ return sasl_client_new(
+ kSaslProtoName, // Registered name of the service using SASL. Required.
+ helper_.server_fqdn(), // The fully qualified domain name of the remote server.
+ nullptr, // Local and remote IP address strings. (we don't use
+ nullptr, // any mechanisms which require this info.)
+ &callbacks_[0], // Connection-specific callbacks.
+ flags,
+ &sasl_conn);
+ }), "Unable to create new SASL client");
+ sasl_conn_.reset(sasl_conn);
+ return Status::OK();
+}
+
+Status ClientNegotiation::SendNegotiate() {
+ NegotiatePB msg;
+ msg.set_step(NegotiatePB::NEGOTIATE);
+
+ // Advertise our supported features.
+ client_features_ = kSupportedClientRpcFeatureFlags;
+
+ if (encryption_ != RpcEncryption::DISABLED) {
+ client_features_.insert(TLS);
+ // If the remote peer is local, then we allow using TLS for authentication
+ // without encryption or integrity.
+ if (socket_->IsLoopbackConnection() && !FLAGS_rpc_encrypt_loopback_connections) {
+ client_features_.insert(TLS_AUTHENTICATION_ONLY);
+ }
+ }
+
+ for (RpcFeatureFlag feature : client_features_) {
+ msg.add_supported_features(feature);
+ }
+
+ if (!helper_.EnabledMechs().empty()) {
+ msg.add_authn_types()->mutable_sasl();
+ }
+ if (tls_context_->has_signed_cert() && !tls_context_->is_external_cert()) {
+ // We only provide authenticated TLS if the certificates are generated
+ // by the internal CA.
+ msg.add_authn_types()->mutable_certificate();
+ }
+ if (authn_token_ && tls_context_->has_trusted_cert()) {
+ // TODO(KUDU-1924): check that the authn token is not expired. Can this be done
+ // reliably on clients?
+ msg.add_authn_types()->mutable_token();
+ }
+
+ if (PREDICT_FALSE(msg.authn_types().empty())) {
+ return Status::NotAuthorized("client is not configured with an authentication type");
+ }
+
+ RETURN_NOT_OK(SendNegotiatePB(msg));
+ return Status::OK();
+}
+
+Status ClientNegotiation::HandleNegotiate(const NegotiatePB& response) {
+ if (PREDICT_FALSE(response.step() != NegotiatePB::NEGOTIATE)) {
+ return Status::NotAuthorized("expected NEGOTIATE step",
+ NegotiatePB::NegotiateStep_Name(response.step()));
+ }
+ TRACE("Received NEGOTIATE response from server");
+
+ // Fill in the set of features supported by the server.
+ for (int flag : response.supported_features()) {
+ // We only add the features that our local build knows about.
+ RpcFeatureFlag feature_flag = RpcFeatureFlag_IsValid(flag) ?
+ static_cast<RpcFeatureFlag>(flag) : UNKNOWN;
+ if (feature_flag != UNKNOWN) {
+ server_features_.insert(feature_flag);
+ }
+ }
+
+ if (encryption_ == RpcEncryption::REQUIRED &&
+ !ContainsKey(server_features_, RpcFeatureFlag::TLS)) {
+ return Status::NotAuthorized("server does not support required TLS encryption");
+ }
+
+ // Get the authentication type which the server would like to use.
+ DCHECK_LE(response.authn_types().size(), 1);
+ if (response.authn_types().empty()) {
+ // If the server doesn't send back an authentication type, default to SASL
+ // in order to maintain backwards compatibility.
+ negotiated_authn_ = AuthenticationType::SASL;
+ } else {
+ const auto& authn_type = response.authn_types(0);
+ switch (authn_type.type_case()) {
+ case AuthenticationTypePB::kSasl:
+ negotiated_authn_ = AuthenticationType::SASL;
+ break;
+ case AuthenticationTypePB::kToken:
+ // TODO(todd): we should also be checking tls_context_->has_trusted_cert()
+ // here to match the original logic we used to advertise TOKEN support,
+ // or perhaps just check explicitly whether we advertised TOKEN.
+ if (!authn_token_) {
+ return Status::RuntimeError(
+ "server chose token authentication, but client has no token");
+ }
+ negotiated_authn_ = AuthenticationType::TOKEN;
+ return Status::OK();
+ case AuthenticationTypePB::kCertificate:
+ if (!tls_context_->has_signed_cert()) {
+ return Status::RuntimeError(
+ "server chose certificate authentication, but client has no certificate");
+ }
+ negotiated_authn_ = AuthenticationType::CERTIFICATE;
+ return Status::OK();
+ case AuthenticationTypePB::TYPE_NOT_SET:
+ return Status::RuntimeError("server chose an unknown authentication type");
+ }
+ }
+
+ DCHECK_EQ(negotiated_authn_, AuthenticationType::SASL);
+
+ // Build a map of the SASL mechanisms offered by the server.
+ const set<SaslMechanism::Type>& client_mechs = helper_.EnabledMechs();
+ set<SaslMechanism::Type> server_mechs;
+ for (const NegotiatePB::SaslMechanism& sasl_mech : response.sasl_mechanisms()) {
+ auto mech = SaslMechanism::value_of(sasl_mech.mechanism());
+ if (mech == SaslMechanism::INVALID) {
+ continue;
+ }
+ server_mechs.insert(mech);
+ }
+
+ // Determine which SASL mechanism to use for authenticating the connection.
+ // We pick the most preferred mechanism which is supported by both parties.
+ // The preference list in order of most to least preferred:
+ // * GSSAPI
+ // * PLAIN
+ set<SaslMechanism::Type> common_mechs = STLSetIntersection(client_mechs, server_mechs);
+
+ if (common_mechs.empty()) {
+ if (ContainsKey(server_mechs, SaslMechanism::GSSAPI) &&
+ !ContainsKey(client_mechs, SaslMechanism::GSSAPI)) {
+ return Status::NotAuthorized("server requires authentication, "
+ "but client does not have Kerberos enabled");
+ }
+ if (!ContainsKey(server_mechs, SaslMechanism::GSSAPI) &&
+ ContainsKey(client_mechs, SaslMechanism::GSSAPI)) {
+ return Status::NotAuthorized("client requires authentication, "
+ "but server does not have Kerberos enabled");
+ }
+ string msg = Substitute("client/server supported SASL mechanism mismatch; "
+ "client mechanisms: [$0], server mechanisms: [$1]",
+ JoinMapped(client_mechs, SaslMechanism::name_of, ", "),
+ JoinMapped(server_mechs, SaslMechanism::name_of, ", "));
+
+ // For now, there should never be a SASL mechanism mismatch that isn't due
+ // to one of the sides requiring Kerberos and the other not having it, so
+ // lets sanity check that.
+ DLOG(FATAL) << msg;
+ return Status::NotAuthorized(msg);
+ }
+
+ // TODO(KUDU-1921): allow the client to require authentication.
+ if (ContainsKey(common_mechs, SaslMechanism::GSSAPI)) {
+ negotiated_mech_ = SaslMechanism::GSSAPI;
+ } else {
+ DCHECK(ContainsKey(common_mechs, SaslMechanism::PLAIN));
+ negotiated_mech_ = SaslMechanism::PLAIN;
+ }
+
+ return Status::OK();
+}
+
+Status ClientNegotiation::SendTlsHandshake(string tls_token) {
+ TRACE("Sending TLS_HANDSHAKE message to server");
+ NegotiatePB msg;
+ msg.set_step(NegotiatePB::TLS_HANDSHAKE);
+ msg.mutable_tls_handshake()->swap(tls_token);
+ return SendNegotiatePB(msg);
+}
+
+Status ClientNegotiation::HandleTlsHandshake(const NegotiatePB& response) {
+ if (PREDICT_FALSE(response.step() != NegotiatePB::TLS_HANDSHAKE)) {
+ return Status::NotAuthorized("expected TLS_HANDSHAKE step",
+ NegotiatePB::NegotiateStep_Name(response.step()));
+ }
+ TRACE("Received TLS_HANDSHAKE response from server");
+
+ if (PREDICT_FALSE(!response.has_tls_handshake())) {
+ return Status::NotAuthorized("No TLS handshake token in TLS_HANDSHAKE response from server");
+ }
+
+ string token;
+ Status s = tls_handshake_.Continue(response.tls_handshake(), &token);
+ if (s.IsIncomplete()) {
+ // Another roundtrip is required to complete the handshake.
+ RETURN_NOT_OK(SendTlsHandshake(std::move(token)));
+ }
+
+ // Check that the handshake step didn't produce an error. Will also propagate
+ // an Incomplete status.
+ RETURN_NOT_OK(s);
+
+ // TLS handshake is finished.
+ DCHECK(token.empty());
+
+ if (ContainsKey(server_features_, TLS_AUTHENTICATION_ONLY) &&
+ ContainsKey(client_features_, TLS_AUTHENTICATION_ONLY)) {
+ TRACE("Negotiated auth-only $0 with cipher suite $1",
+ tls_handshake_.GetProtocol(), tls_handshake_.GetCipherSuite());
+ return tls_handshake_.FinishNoWrap(*socket_);
+ }
+
+ TRACE("Negotiated $0 with cipher suite $1",
+ tls_handshake_.GetProtocol(), tls_handshake_.GetCipherSuite());
+ return tls_handshake_.Finish(&socket_);
+}
+
+Status ClientNegotiation::AuthenticateBySasl(faststring* recv_buf,
+ unique_ptr<ErrorStatusPB>* rpc_error) {
+ RETURN_NOT_OK(InitSaslClient());
+ Status s = SendSaslInitiate();
+
+ // HandleSasl[Initiate, Challenge] return incomplete if an additional
+ // challenge step is required, or OK if a SASL_SUCCESS message is expected.
+ while (s.IsIncomplete()) {
+ NegotiatePB challenge;
+ RETURN_NOT_OK(RecvNegotiatePB(&challenge, recv_buf, rpc_error));
+ s = HandleSaslChallenge(challenge);
+ }
+
+ // Propagate failure from SendSaslInitiate or HandleSaslChallenge.
+ RETURN_NOT_OK(s);
+
+ // Server challenges are over; we now expect the success message.
+ NegotiatePB success;
+ RETURN_NOT_OK(RecvNegotiatePB(&success, recv_buf, rpc_error));
+ return HandleSaslSuccess(success);
+}
+
+Status ClientNegotiation::AuthenticateByToken(faststring* recv_buf,
+ unique_ptr<ErrorStatusPB>* rpc_error) {
+ // Sanity check that TLS has been negotiated. Sending the token on an
+ // unencrypted channel is a big no-no.
+ CHECK(tls_negotiated_);
+
+ // Send the token to the server.
+ NegotiatePB pb;
+ pb.set_step(NegotiatePB::TOKEN_EXCHANGE);
+ pb.mutable_authn_token()->Swap(authn_token_.get_ptr());
+ RETURN_NOT_OK(SendNegotiatePB(pb));
+ pb.Clear();
+
+ // Check that the server responds with a non-error TOKEN_EXCHANGE message.
+ RETURN_NOT_OK(RecvNegotiatePB(&pb, recv_buf, rpc_error));
+ if (pb.step() != NegotiatePB::TOKEN_EXCHANGE) {
+ return Status::NotAuthorized("expected TOKEN_EXCHANGE step",
+ NegotiatePB::NegotiateStep_Name(pb.step()));
+ }
+
+ return Status::OK();
+}
+
+Status ClientNegotiation::SendSaslInitiate() {
+ TRACE("Initiating SASL $0 handshake", SaslMechanism::name_of(negotiated_mech_));
+
+ // At this point we've already chosen the SASL mechanism to use
+ // (negotiated_mech_), but we need to let the SASL library know. SASL likes to
+ // choose the mechanism from among a list of possible options, so we simply
+ // provide it one option, and then check that it picks that option.
+
+ const char* init_msg = nullptr;
+ unsigned init_msg_len = 0;
+ const char* negotiated_mech = nullptr;
+
+ /* select a mechanism for a connection
+ * mechlist -- mechanisms server has available (punctuation ignored)
+ * output:
+ * prompt_need -- on SASL_INTERACT, list of prompts needed to continue
+ * clientout -- the initial client response to send to the server
+ * mech -- set to mechanism name
+ *
+ * Returns:
+ * SASL_OK -- success
+ * SASL_CONTINUE -- negotiation required
+ * SASL_NOMEM -- not enough memory
+ * SASL_NOMECH -- no mechanism meets requested properties
+ * SASL_INTERACT -- user interaction needed to fill in prompt_need list
+ */
+ TRACE("Calling sasl_client_start()");
+ const Status s = WrapSaslCall(sasl_conn_.get(), [&]() {
+ return sasl_client_start(
+ sasl_conn_.get(), // The SASL connection context created by init()
+ SaslMechanism::name_of(negotiated_mech_), // The list of mechanisms to negotiate.
+ nullptr, // Disables INTERACT return if NULL.
+ &init_msg, // Filled in on success.
+ &init_msg_len, // Filled in on success.
+ &negotiated_mech); // Filled in on success.
+ });
+
+ if (PREDICT_FALSE(!s.IsIncomplete() && !s.ok())) {
+ return s;
+ }
+
+ // Check that the SASL library is using the mechanism that we picked.
+ DCHECK_EQ(SaslMechanism::value_of(negotiated_mech), negotiated_mech_);
+
+ // If the negotiated mechanism is GSSAPI (Kerberos), configure SASL to use
+ // integrity protection so that the channel bindings and nonce can be
+ // verified.
+ if (negotiated_mech_ == SaslMechanism::GSSAPI) {
+ RETURN_NOT_OK(EnableIntegrityProtection(sasl_conn_.get()));
+ }
+
+ NegotiatePB msg;
+ msg.set_step(NegotiatePB::SASL_INITIATE);
+ msg.mutable_token()->assign(init_msg, init_msg_len);
+ msg.add_sasl_mechanisms()->set_mechanism(negotiated_mech);
+ RETURN_NOT_OK(SendNegotiatePB(msg));
+ return s;
+}
+
+Status ClientNegotiation::SendSaslResponse(const char* resp_msg, unsigned resp_msg_len) {
+ NegotiatePB reply;
+ reply.set_step(NegotiatePB::SASL_RESPONSE);
+ reply.mutable_token()->assign(resp_msg, resp_msg_len);
+ return SendNegotiatePB(reply);
+}
+
+Status ClientNegotiation::HandleSaslChallenge(const NegotiatePB& response) {
+ if (PREDICT_FALSE(response.step() != NegotiatePB::SASL_CHALLENGE)) {
+ return Status::NotAuthorized("expected SASL_CHALLENGE step",
+ NegotiatePB::NegotiateStep_Name(response.step()));
+ }
+ TRACE("Received SASL_CHALLENGE response from server");
+ if (PREDICT_FALSE(!response.has_token())) {
+ return Status::NotAuthorized("no token in SASL_CHALLENGE response from server");
+ }
+
+ const char* out = nullptr;
+ unsigned out_len = 0;
+ const Status s = DoSaslStep(response.token(), &out, &out_len);
+ if (PREDICT_FALSE(!s.IsIncomplete() && !s.ok())) {
+ return s;
+ }
+
+ RETURN_NOT_OK(SendSaslResponse(out, out_len));
+ return s;
+}
+
+Status ClientNegotiation::HandleSaslSuccess(const NegotiatePB& response) {
+ if (PREDICT_FALSE(response.step() != NegotiatePB::SASL_SUCCESS)) {
+ return Status::NotAuthorized("expected SASL_SUCCESS step",
+ NegotiatePB::NegotiateStep_Name(response.step()));
+ }
+ TRACE("Received SASL_SUCCESS response from server");
+
+ if (negotiated_mech_ == SaslMechanism::GSSAPI) {
+ if (response.has_nonce()) {
+ // Grab the nonce from the server, if it has sent one. We'll send it back
+ // later with SASL integrity protection as part of the connection context.
+ nonce_ = response.nonce();
+ }
+
+ if (tls_negotiated_) {
+ // Check the channel bindings provided by the server against the expected channel bindings.
+ if (!response.has_channel_bindings()) {
+ return Status::NotAuthorized("no channel bindings provided by server");
+ }
+
+ security::Cert cert;
+ RETURN_NOT_OK(tls_handshake_.GetRemoteCert(&cert));
+
+ string expected_channel_bindings;
+ RETURN_NOT_OK_PREPEND(cert.GetServerEndPointChannelBindings(&expected_channel_bindings),
+ "failed to generate channel bindings");
+
+ string received_channel_bindings;
+ RETURN_NOT_OK_PREPEND(SaslDecode(sasl_conn_.get(),
+ response.channel_bindings(),
+ &received_channel_bindings),
+ "failed to decode channel bindings");
+
+ if (expected_channel_bindings != received_channel_bindings) {
+ Sockaddr addr;
+ ignore_result(socket_->GetPeerAddress(&addr));
+
+ LOG(WARNING) << "Received invalid channel bindings from server "
+ << addr.ToString()
+ << ", this could indicate an active network man-in-the-middle";
+ return Status::NotAuthorized("channel bindings do not match");
+ }
+ }
+ }
+
+ return Status::OK();
+}
+
+Status ClientNegotiation::DoSaslStep(const string& in, const char** out, unsigned* out_len) {
+ TRACE("Calling sasl_client_step()");
+
+ return WrapSaslCall(sasl_conn_.get(), [&]() {
+ return sasl_client_step(sasl_conn_.get(), in.c_str(), in.length(), nullptr, out, out_len);
+ });
+}
+
+Status ClientNegotiation::SendConnectionContext() {
+ TRACE("Sending connection context");
+ RequestHeader header;
+ header.set_call_id(kConnectionContextCallId);
+
+ ConnectionContextPB conn_context;
+ // This field is deprecated but used by servers <Kudu 1.1. Newer server versions ignore
+ // this and use the SASL-provided username instead.
+ conn_context.mutable_deprecated_user_info()->set_real_user(
+ plain_auth_user_.empty() ? "cpp-client" : plain_auth_user_);
+
+ if (nonce_) {
+ // Reply with the SASL-protected nonce. We only set the nonce when using SASL GSSAPI.
+ RETURN_NOT_OK(SaslEncode(sasl_conn_.get(), *nonce_, conn_context.mutable_encoded_nonce()));
+ }
+
+ return SendFramedMessageBlocking(socket(), header, conn_context, deadline_);
+}
+
+int ClientNegotiation::GetOptionCb(const char* plugin_name, const char* option,
+ const char** result, unsigned* len) {
+ return helper_.GetOptionCb(plugin_name, option, result, len);
+}
+
+// Used for PLAIN.
+// SASL callback for SASL_CB_USER, SASL_CB_AUTHNAME, SASL_CB_LANGUAGE
+int ClientNegotiation::SimpleCb(int id, const char** result, unsigned* len) {
+ if (PREDICT_FALSE(!helper_.IsPlainEnabled())) {
+ LOG(DFATAL) << "Simple callback called, but PLAIN auth is not enabled";
+ return SASL_FAIL;
+ }
+ if (PREDICT_FALSE(result == nullptr)) {
+ LOG(DFATAL) << "result outparam is NULL";
+ return SASL_BADPARAM;
+ }
+ switch (id) {
+ // TODO(unknown): Support impersonation?
+ // For impersonation, USER is the impersonated user, AUTHNAME is the "sudoer".
+ case SASL_CB_USER:
+ TRACE("callback for SASL_CB_USER");
+ *result = plain_auth_user_.c_str();
+ if (len != nullptr) *len = plain_auth_user_.length();
+ break;
+ case SASL_CB_AUTHNAME:
+ TRACE("callback for SASL_CB_AUTHNAME");
+ *result = plain_auth_user_.c_str();
+ if (len != nullptr) *len = plain_auth_user_.length();
+ break;
+ case SASL_CB_LANGUAGE:
+ LOG(DFATAL) << "Unable to handle SASL callback type SASL_CB_LANGUAGE"
+ << "(" << id << ")";
+ return SASL_BADPARAM;
+ default:
+ LOG(DFATAL) << "Unexpected SASL callback type: " << id;
+ return SASL_BADPARAM;
+ }
+
+ return SASL_OK;
+}
+
+// Used for PLAIN.
+// SASL callback for SASL_CB_PASS: User password.
+int ClientNegotiation::SecretCb(sasl_conn_t* conn, int id, sasl_secret_t** psecret) {
+ if (PREDICT_FALSE(!helper_.IsPlainEnabled())) {
+ LOG(DFATAL) << "Plain secret callback called, but PLAIN auth is not enabled";
+ return SASL_FAIL;
+ }
+ switch (id) {
+ case SASL_CB_PASS: {
+ if (!conn || !psecret) return SASL_BADPARAM;
+
+ size_t len = plain_pass_.length();
+ *psecret = reinterpret_cast<sasl_secret_t*>(malloc(sizeof(sasl_secret_t) + len));
+ if (!*psecret) {
+ return SASL_NOMEM;
+ }
+ psecret_.reset(*psecret); // Ensure that we free() this structure later.
+ (*psecret)->len = len;
+ memcpy((*psecret)->data, plain_pass_.c_str(), len + 1);
+ break;
+ }
+ default:
+ LOG(DFATAL) << "Unexpected SASL callback type: " << id;
+ return SASL_BADPARAM;
+ }
+
+ return SASL_OK;
+}
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/client_negotiation.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/client_negotiation.h b/be/src/kudu/rpc/client_negotiation.h
new file mode 100644
index 0000000..aa8c6d9
--- /dev/null
+++ b/be/src/kudu/rpc/client_negotiation.h
@@ -0,0 +1,252 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdlib>
+#include <memory>
+#include <set>
+#include <string>
+#include <vector>
+
+#include <boost/optional.hpp>
+#include <sasl/sasl.h>
+
+#include "kudu/rpc/negotiation.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/sasl_common.h"
+#include "kudu/rpc/sasl_helper.h"
+#include "kudu/security/tls_handshake.h"
+#include "kudu/security/token.pb.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+namespace security {
+class TlsContext;
+}
+
+namespace rpc {
+
+class NegotiatePB;
+class NegotiatePB_SaslAuth;
+class ResponseHeader;
+
+// Class for doing KRPC negotiation with a remote server over a bidirectional socket.
+// Operations on this class are NOT thread-safe.
+class ClientNegotiation {
+ public:
+ // Creates a new client negotiation instance, taking ownership of the
+ // provided socket. After completing the negotiation process by setting the
+ // desired options and calling Negotiate(), the socket can be retrieved with
+ // 'release_socket'.
+ //
+ // The provided TlsContext must outlive this negotiation instance.
+ ClientNegotiation(std::unique_ptr<Socket> socket,
+ const security::TlsContext* tls_context,
+ const boost::optional<security::SignedTokenPB>& authn_token,
+ RpcEncryption encryption);
+
+ // Enable PLAIN authentication.
+ // Must be called before Negotiate().
+ Status EnablePlain(const std::string& user,
+ const std::string& pass);
+
+ // Enable GSSAPI authentication.
+ // Must be called before Negotiate().
+ Status EnableGSSAPI();
+
+ // Returns mechanism negotiated by this connection.
+ // Must be called after Negotiate().
+ SaslMechanism::Type negotiated_mechanism() const;
+
+ // Returns the negotiated authentication type for the connection.
+ // Must be called after Negotiate().
+ AuthenticationType negotiated_authn() const {
+ DCHECK_NE(negotiated_authn_, AuthenticationType::INVALID);
+ return negotiated_authn_;
+ }
+
+ // Returns true if TLS was negotiated.
+ // Must be called after Negotiate().
+ bool tls_negotiated() const {
+ return tls_negotiated_;
+ }
+
+ // Returns the set of RPC system features supported by the remote server.
+ // Must be called before Negotiate().
+ std::set<RpcFeatureFlag> server_features() const {
+ return server_features_;
+ }
+
+ // Returns the set of RPC system features supported by the remote server.
+ // Must be called after Negotiate().
+ // Subsequent calls to this method or server_features() will return an empty set.
+ std::set<RpcFeatureFlag> take_server_features() {
+ return std::move(server_features_);
+ }
+
+ // Specify the fully-qualified domain name of the remote server.
+ // Must be called before Negotiate(). Required for some mechanisms.
+ void set_server_fqdn(const std::string& domain_name);
+
+ // Set deadline for connection negotiation.
+ void set_deadline(const MonoTime& deadline);
+
+ Socket* socket() { return socket_.get(); }
+
+ // Takes and returns the socket owned by this client negotiation. The caller
+ // will own the socket after this call, and the negotiation instance should no
+ // longer be used. Must be called after Negotiate(). Subsequent calls to this
+ // method or socket() will return a null pointer.
+ std::unique_ptr<Socket> release_socket() { return std::move(socket_); }
+
+ // Negotiate with the remote server. Should only be called once per
+ // ClientNegotiation and socket instance, after all options have been set.
+ //
+ // Returns OK on success, otherwise may return NotAuthorized, NotSupported, or
+ // another non-OK status.
+ Status Negotiate(std::unique_ptr<ErrorStatusPB>* rpc_error = nullptr);
+
+ // SASL callback for plugin options, supported mechanisms, etc.
+ // Returns SASL_FAIL if the option is not handled, which does not fail the handshake.
+ int GetOptionCb(const char* plugin_name, const char* option,
+ const char** result, unsigned* len);
+
+ // SASL callback for SASL_CB_USER, SASL_CB_AUTHNAME, SASL_CB_LANGUAGE
+ int SimpleCb(int id, const char** result, unsigned* len);
+
+ // SASL callback for SASL_CB_PASS
+ int SecretCb(sasl_conn_t* conn, int id, sasl_secret_t** psecret);
+
+ private:
+
+ // Encode and send the specified negotiate request message to the server.
+ Status SendNegotiatePB(const NegotiatePB& msg) WARN_UNUSED_RESULT;
+
+ // Receive a negotiate response message from the server, deserializing it into 'msg'.
+ // Validates that the response is not an error.
+ Status RecvNegotiatePB(NegotiatePB* msg,
+ faststring* buffer,
+ std::unique_ptr<ErrorStatusPB>* rpc_error) WARN_UNUSED_RESULT;
+
+ // Parse error status message from raw bytes of an ErrorStatusPB.
+ Status ParseError(const Slice& err_data,
+ std::unique_ptr<ErrorStatusPB>* rpc_error) WARN_UNUSED_RESULT;
+
+ Status SendConnectionHeader() WARN_UNUSED_RESULT;
+
+ // Initialize the SASL client negotiation instance.
+ Status InitSaslClient() WARN_UNUSED_RESULT;
+
+ // Send a NEGOTIATE step message to the server.
+ Status SendNegotiate() WARN_UNUSED_RESULT;
+
+ // Handle NEGOTIATE step response from the server.
+ Status HandleNegotiate(const NegotiatePB& response) WARN_UNUSED_RESULT;
+
+ // Send a TLS_HANDSHAKE request message to the server with the provided token.
+ Status SendTlsHandshake(std::string tls_token) WARN_UNUSED_RESULT;
+
+ // Handle a TLS_HANDSHAKE response message from the server.
+ Status HandleTlsHandshake(const NegotiatePB& response) WARN_UNUSED_RESULT;
+
+ // Authenticate to the server using SASL.
+ // 'recv_buf' allows a receive buffer to be reused.
+ Status AuthenticateBySasl(faststring* recv_buf,
+ std::unique_ptr<ErrorStatusPB>* rpc_error) WARN_UNUSED_RESULT;
+
+ // Authenticate to the server using a token.
+ // 'recv_buf' allows a receive buffer to be reused.
+ Status AuthenticateByToken(faststring* recv_buf,
+ std::unique_ptr<ErrorStatusPB> *rpc_error) WARN_UNUSED_RESULT;
+
+ // Send an SASL_INITIATE message to the server.
+ // Returns:
+ // Status::OK if the SASL_SUCCESS message is expected next.
+ // Status::Incomplete if the SASL_CHALLENGE message is expected next.
+ // Any other status indicates an error.
+ Status SendSaslInitiate() WARN_UNUSED_RESULT;
+
+ // Send a SASL_RESPONSE message to the server.
+ Status SendSaslResponse(const char* resp_msg, unsigned resp_msg_len) WARN_UNUSED_RESULT;
+
+ // Handle case when server sends SASL_CHALLENGE response.
+ // Returns:
+ // Status::OK if a SASL_SUCCESS message is expected next.
+ // Status::Incomplete if another SASL_CHALLENGE message is expected.
+ // Any other status indicates an error.
+ Status HandleSaslChallenge(const NegotiatePB& response) WARN_UNUSED_RESULT;
+
+ // Handle case when server sends SASL_SUCCESS response.
+ Status HandleSaslSuccess(const NegotiatePB& response) WARN_UNUSED_RESULT;
+
+ // Perform a client-side step of the SASL negotiation.
+ // Input is what came from the server. Output is what we will send back to the server.
+ // Returns:
+ // Status::OK if sasl_client_step returns SASL_OK.
+ // Status::Incomplete if sasl_client_step returns SASL_CONTINUE
+ // otherwise returns an appropriate error status.
+ Status DoSaslStep(const std::string& in, const char** out, unsigned* out_len) WARN_UNUSED_RESULT;
+
+ Status SendConnectionContext() WARN_UNUSED_RESULT;
+
+ // The socket to the remote server.
+ std::unique_ptr<Socket> socket_;
+
+ // SASL state.
+ std::vector<sasl_callback_t> callbacks_;
+ std::unique_ptr<sasl_conn_t, SaslDeleter> sasl_conn_;
+ SaslHelper helper_;
+ boost::optional<std::string> nonce_;
+
+ // TLS state.
+ const security::TlsContext* tls_context_;
+ security::TlsHandshake tls_handshake_;
+ const RpcEncryption encryption_;
+ bool tls_negotiated_;
+
+ // TSK state.
+ boost::optional<security::SignedTokenPB> authn_token_;
+
+ // Authentication state.
+ std::string plain_auth_user_;
+ std::string plain_pass_;
+ std::unique_ptr<sasl_secret_t, decltype(std::free)*> psecret_;
+
+ // The set of features advertised by the client. Filled in when we send
+ // the first message. This is not necessarily constant since some features
+ // may be dynamically enabled.
+ std::set<RpcFeatureFlag> client_features_;
+
+ // The set of features supported by the server. Filled in during negotiation.
+ std::set<RpcFeatureFlag> server_features_;
+
+ // The authentication type. Filled in during negotiation.
+ AuthenticationType negotiated_authn_;
+
+ // The SASL mechanism used by the connection. Filled in during negotiation.
+ SaslMechanism::Type negotiated_mech_;
+
+ // Negotiation timeout deadline.
+ MonoTime deadline_;
+};
+
+} // namespace rpc
+} // namespace kudu