You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2017/03/10 06:08:21 UTC

kudu git commit: KUDU-1923: rpc_encryption flag is not enforced

Repository: kudu
Updated Branches:
  refs/heads/branch-1.3.x be476c211 -> dfd9b491f


KUDU-1923: rpc_encryption flag is not enforced

The rpc_encryption flag wasn't being taken into account during
negotiation, so TLS encryption was in effect optional regardless of
configuration.

Change-Id: Ifdf17c6bb59ca4af1215376c5221a90bd1d93b64
Reviewed-on: http://gerrit.cloudera.org:8080/6342
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/dfd9b491
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/dfd9b491
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/dfd9b491

Branch: refs/heads/branch-1.3.x
Commit: dfd9b491f635a89b75889f8277316c9b78143883
Parents: be476c2
Author: Dan Burkert <da...@apache.org>
Authored: Thu Mar 9 16:58:22 2017 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Fri Mar 10 06:05:21 2017 +0000

----------------------------------------------------------------------
 src/kudu/integration-tests/security-itest.cc |  10 ++
 src/kudu/rpc/client_negotiation.cc           |  25 +++--
 src/kudu/rpc/client_negotiation.h            |   8 +-
 src/kudu/rpc/constants.cc                    |  10 +-
 src/kudu/rpc/messenger.h                     |   1 +
 src/kudu/rpc/negotiation-test.cc             | 109 ++++++++++++++++++++--
 src/kudu/rpc/negotiation.cc                  |  13 ++-
 src/kudu/rpc/negotiation.h                   |   2 +
 src/kudu/rpc/reactor.cc                      |   3 +-
 src/kudu/rpc/server_negotiation.cc           |  25 +++--
 src/kudu/rpc/server_negotiation.h            |   8 +-
 11 files changed, 177 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/dfd9b491/src/kudu/integration-tests/security-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/security-itest.cc b/src/kudu/integration-tests/security-itest.cc
index cdac377..4dec15f 100644
--- a/src/kudu/integration-tests/security-itest.cc
+++ b/src/kudu/integration-tests/security-itest.cc
@@ -206,5 +206,15 @@ TEST_F(SecurityITest, TestDisableWebUI) {
   NO_FATALS(SmokeTestCluster());
 }
 
+// Test disabling authentication and encryption.
+TEST_F(SecurityITest, TestDisableAuthenticationEncryption) {
+  cluster_opts_.extra_master_flags.push_back("--rpc_authentication=disabled");
+  cluster_opts_.extra_tserver_flags.push_back("--rpc_authentication=disabled");
+  cluster_opts_.extra_master_flags.push_back("--rpc_encryption=disabled");
+  cluster_opts_.extra_tserver_flags.push_back("--rpc_encryption=disabled");
+  cluster_opts_.enable_kerberos = false;
+  StartCluster();
+  NO_FATALS(SmokeTestCluster());
+}
 
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/dfd9b491/src/kudu/rpc/client_negotiation.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/client_negotiation.cc b/src/kudu/rpc/client_negotiation.cc
index eb282fa..9a3acfa 100644
--- a/src/kudu/rpc/client_negotiation.cc
+++ b/src/kudu/rpc/client_negotiation.cc
@@ -37,6 +37,7 @@
 #include "kudu/gutil/strings/util.h"
 #include "kudu/rpc/blocking_ops.h"
 #include "kudu/rpc/constants.h"
+#include "kudu/rpc/messenger.h"
 #include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/rpc/sasl_common.h"
 #include "kudu/rpc/sasl_helper.h"
@@ -104,10 +105,12 @@ static Status StatusFromRpcError(const ErrorStatusPB& error) {
 
 ClientNegotiation::ClientNegotiation(unique_ptr<Socket> socket,
                                      const security::TlsContext* tls_context,
-                                     const boost::optional<security::SignedTokenPB>& authn_token)
+                                     const boost::optional<security::SignedTokenPB>& authn_token,
+                                     RpcEncryption encryption)
     : socket_(std::move(socket)),
       helper_(SaslHelper::CLIENT),
       tls_context_(tls_context),
+      encryption_(encryption),
       tls_negotiated_(false),
       authn_token_(authn_token),
       psecret_(nullptr, std::free),
@@ -169,7 +172,8 @@ Status ClientNegotiation::Negotiate() {
 
   // Step 3: if both ends support TLS, do a TLS handshake.
   // TODO(PKI): allow the client to require TLS.
-  if (ContainsKey(server_features_, TLS)) {
+  if (encryption_ != RpcEncryption::DISABLED &&
+      ContainsKey(server_features_, TLS)) {
     RETURN_NOT_OK(tls_context_->InitiateHandshake(security::TlsHandshakeType::CLIENT,
                                                   &tls_handshake_));
 
@@ -290,10 +294,14 @@ Status ClientNegotiation::SendNegotiate() {
 
   // Advertise our supported features.
   client_features_ = kSupportedClientRpcFeatureFlags;
-  // If the remote peer is local, then we allow using TLS for authentication
-  // without encryption or integrity.
-  if (socket_->IsLoopbackConnection() && !FLAGS_rpc_encrypt_loopback_connections) {
-    client_features_.insert(TLS_AUTHENTICATION_ONLY);
+
+  if (encryption_ != RpcEncryption::DISABLED) {
+    client_features_.insert(TLS);
+    // If the remote peer is local, then we allow using TLS for authentication
+    // without encryption or integrity.
+    if (socket_->IsLoopbackConnection() && !FLAGS_rpc_encrypt_loopback_connections) {
+      client_features_.insert(TLS_AUTHENTICATION_ONLY);
+    }
   }
 
   for (RpcFeatureFlag feature : client_features_) {
@@ -337,6 +345,11 @@ Status ClientNegotiation::HandleNegotiate(const NegotiatePB& response) {
     }
   }
 
+  if (encryption_ == RpcEncryption::REQUIRED &&
+      !ContainsKey(server_features_, RpcFeatureFlag::TLS)) {
+    return Status::NotAuthorized("server does not support required TLS encryption");
+  }
+
   // Get the authentication type which the server would like to use.
   DCHECK_LE(response.authn_types().size(), 1);
   if (response.authn_types().empty()) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/dfd9b491/src/kudu/rpc/client_negotiation.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/client_negotiation.h b/src/kudu/rpc/client_negotiation.h
index e2ee09d..701076b 100644
--- a/src/kudu/rpc/client_negotiation.h
+++ b/src/kudu/rpc/client_negotiation.h
@@ -58,9 +58,10 @@ class ClientNegotiation {
   // 'release_socket'.
   //
   // The provided TlsContext must outlive this negotiation instance.
-  explicit ClientNegotiation(std::unique_ptr<Socket> socket,
-                             const security::TlsContext* tls_context,
-                             const boost::optional<security::SignedTokenPB>& authn_token);
+  ClientNegotiation(std::unique_ptr<Socket> socket,
+                    const security::TlsContext* tls_context,
+                    const boost::optional<security::SignedTokenPB>& authn_token,
+                    RpcEncryption encryption);
 
   // Enable PLAIN authentication.
   // Must be called before Negotiate().
@@ -213,6 +214,7 @@ class ClientNegotiation {
   // TLS state.
   const security::TlsContext* tls_context_;
   security::TlsHandshake tls_handshake_;
+  const RpcEncryption encryption_;
   bool tls_negotiated_;
 
   // TSK state.

http://git-wip-us.apache.org/repos/asf/kudu/blob/dfd9b491/src/kudu/rpc/constants.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/constants.cc b/src/kudu/rpc/constants.cc
index c6a93ce..bcf9985 100644
--- a/src/kudu/rpc/constants.cc
+++ b/src/kudu/rpc/constants.cc
@@ -26,13 +26,13 @@ const char* const kMagicNumber = "hrpc";
 const char* const kSaslAppName = "kudu";
 const char* const kSaslProtoName = "kudu";
 
-// The server supports the TLS flag if there is a TLS certificate available.
-// The flag is added during negotiation if this is the case.
+// NOTE: the TLS flag is dynamically added based on the local encryption
+// configuration.
 //
-// NOTE: the TLS_AUTHENTICATION_ONLY flag is dynamically added on both sides
-// based on the remote peer's address.
+// NOTE: the TLS_AUTHENTICATION_ONLY flag is dynamically added on both
+// sides based on the remote peer's address.
 set<RpcFeatureFlag> kSupportedServerRpcFeatureFlags = { APPLICATION_FEATURE_FLAGS };
-set<RpcFeatureFlag> kSupportedClientRpcFeatureFlags = { APPLICATION_FEATURE_FLAGS, TLS };
+set<RpcFeatureFlag> kSupportedClientRpcFeatureFlags = { APPLICATION_FEATURE_FLAGS };
 
 } // namespace rpc
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/dfd9b491/src/kudu/rpc/messenger.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h
index 2ada341..fdabdbc 100644
--- a/src/kudu/rpc/messenger.h
+++ b/src/kudu/rpc/messenger.h
@@ -225,6 +225,7 @@ class Messenger {
   }
 
   RpcAuthentication authentication() const { return authentication_; }
+  RpcEncryption encryption() const { return encryption_; }
 
   ThreadPool* negotiation_pool() const { return negotiation_pool_.get(); }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/dfd9b491/src/kudu/rpc/negotiation-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/negotiation-test.cc b/src/kudu/rpc/negotiation-test.cc
index 3efbb8b..38c8ed9 100644
--- a/src/kudu/rpc/negotiation-test.cc
+++ b/src/kudu/rpc/negotiation-test.cc
@@ -97,13 +97,22 @@ struct EndpointConfig {
   // For the client, whether the client has the token.
   // For the server, whether the server has the TSK.
   bool token;
+  RpcEncryption encryption;
 };
 std::ostream& operator<<(std::ostream& o, EndpointConfig config) {
   auto bool_string = [] (bool b) { return b ? "true" : "false"; };
   o << "{pki: " << config.pki
     << ", sasl-mechs: [" << JoinMapped(config.sasl_mechs, SaslMechanism::name_of, ", ")
     << "], token: " << bool_string(config.token)
-    << "}";
+    << ", encryption: ";
+
+  switch (config.encryption) {
+    case RpcEncryption::DISABLED: o << "DISABLED"; break;
+    case RpcEncryption::OPTIONAL: o << "OPTIONAL"; break;
+    case RpcEncryption::REQUIRED: o << "REQUIRED"; break;
+  }
+
+  o << "}";
   return o;
 }
 
@@ -205,10 +214,12 @@ TEST_P(TestNegotiation, TestNegotiation) {
   // Create and configure the client and server negotiation instances.
   ClientNegotiation client_negotiation(std::move(client_socket),
                                        &client_tls_context,
-                                       authn_token);
+                                       authn_token,
+                                       desc.client.encryption);
   ServerNegotiation server_negotiation(std::move(server_socket),
                                        &server_tls_context,
-                                       &token_verifier);
+                                       &token_verifier,
+                                       desc.server.encryption);
 
   // Set client and server SASL mechanisms.
   MiniKdc kdc;
@@ -347,11 +358,13 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             PkiConfig::NONE,
             {},
             false,
+            RpcEncryption::OPTIONAL,
           },
           EndpointConfig {
             PkiConfig::NONE,
             {},
             false,
+            RpcEncryption::OPTIONAL,
           },
           false,
           Status::NotAuthorized(".*client is not configured with an authentication type"),
@@ -368,11 +381,13 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             PkiConfig::NONE,
             { SaslMechanism::PLAIN },
             false,
+            RpcEncryption::OPTIONAL,
           },
           EndpointConfig {
             PkiConfig::NONE,
             {},
             false,
+            RpcEncryption::OPTIONAL,
           },
           false,
           Status::NotAuthorized(".* server mechanism list is empty"),
@@ -389,11 +404,13 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             PkiConfig::NONE,
             { SaslMechanism::PLAIN },
             false,
+            RpcEncryption::OPTIONAL
           },
           EndpointConfig {
             PkiConfig::NONE,
             { SaslMechanism::PLAIN },
             false,
+            RpcEncryption::DISABLED,
           },
           false,
           Status::OK(),
@@ -410,11 +427,13 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             PkiConfig::NONE,
             { SaslMechanism::GSSAPI },
             false,
+            RpcEncryption::OPTIONAL,
           },
           EndpointConfig {
             PkiConfig::NONE,
             { SaslMechanism::GSSAPI },
             false,
+            RpcEncryption::DISABLED,
           },
           false,
           Status::OK(),
@@ -431,11 +450,13 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             PkiConfig::NONE,
             { SaslMechanism::GSSAPI, SaslMechanism::PLAIN },
             false,
+            RpcEncryption::OPTIONAL,
           },
           EndpointConfig {
             PkiConfig::NONE,
             { SaslMechanism::GSSAPI, SaslMechanism::PLAIN },
             false,
+            RpcEncryption::DISABLED,
           },
           false,
           Status::OK(),
@@ -452,11 +473,13 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             PkiConfig::NONE,
             { SaslMechanism::GSSAPI, SaslMechanism::PLAIN },
             false,
+            RpcEncryption::OPTIONAL,
           },
           EndpointConfig {
             PkiConfig::NONE,
             { SaslMechanism::GSSAPI },
             false,
+            RpcEncryption::DISABLED,
           },
           false,
           Status::OK(),
@@ -473,11 +496,13 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             PkiConfig::NONE,
             { SaslMechanism::PLAIN },
             false,
+            RpcEncryption::OPTIONAL,
           },
           EndpointConfig {
             PkiConfig::NONE,
             { SaslMechanism::GSSAPI },
             false,
+            RpcEncryption::DISABLED,
           },
           false,
           Status::NotAuthorized(".*client does not have Kerberos enabled"),
@@ -495,11 +520,13 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             PkiConfig::NONE,
             { SaslMechanism::GSSAPI },
             false,
+            RpcEncryption::OPTIONAL,
           },
           EndpointConfig {
             PkiConfig::SELF_SIGNED,
             { SaslMechanism::GSSAPI },
             false,
+            RpcEncryption::OPTIONAL,
           },
           true,
           Status::OK(),
@@ -518,11 +545,13 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             PkiConfig::SIGNED,
             { SaslMechanism::GSSAPI },
             false,
+            RpcEncryption::OPTIONAL,
           },
           EndpointConfig {
             PkiConfig::SELF_SIGNED,
             { SaslMechanism::GSSAPI },
             false,
+            RpcEncryption::OPTIONAL,
           },
           false,
           Status::OK(),
@@ -539,11 +568,13 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             PkiConfig::NONE,
             { SaslMechanism::PLAIN },
             false,
+            RpcEncryption::OPTIONAL,
           },
           EndpointConfig {
             PkiConfig::SELF_SIGNED,
             { SaslMechanism::PLAIN },
             false,
+            RpcEncryption::OPTIONAL,
           },
           false,
           Status::OK(),
@@ -560,11 +591,13 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             PkiConfig::SIGNED,
             { SaslMechanism::GSSAPI },
             false,
+            RpcEncryption::OPTIONAL,
           },
           EndpointConfig {
             PkiConfig::SIGNED,
             { SaslMechanism::GSSAPI },
             false,
+            RpcEncryption::OPTIONAL,
           },
           false,
           Status::OK(),
@@ -581,11 +614,13 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             PkiConfig::TRUSTED,
             { },
             true,
+            RpcEncryption::OPTIONAL,
           },
           EndpointConfig {
             PkiConfig::SIGNED,
             { SaslMechanism::PLAIN },
             true,
+            RpcEncryption::OPTIONAL,
           },
           false,
           Status::OK(),
@@ -605,11 +640,13 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             PkiConfig::NONE,
             { SaslMechanism::PLAIN },
             true,
+            RpcEncryption::OPTIONAL,
           },
           EndpointConfig {
             PkiConfig::SIGNED,
             { SaslMechanism::PLAIN },
             true,
+            RpcEncryption::OPTIONAL,
           },
           false,
           Status::OK(),
@@ -626,11 +663,13 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             PkiConfig::SIGNED,
             { SaslMechanism::PLAIN, SaslMechanism::GSSAPI },
             true,
+            RpcEncryption::OPTIONAL,
           },
           EndpointConfig {
             PkiConfig::SIGNED,
             { SaslMechanism::PLAIN, SaslMechanism::GSSAPI },
             true,
+            RpcEncryption::OPTIONAL,
           },
           false,
           Status::OK(),
@@ -638,6 +677,52 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
           AuthenticationType::CERTIFICATE,
           SaslMechanism::INVALID,
           true,
+        },
+
+        // client: PLAIN, TLS disabled
+        // server: PLAIN, TLS required
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::NONE,
+            { SaslMechanism::PLAIN },
+            false,
+            RpcEncryption::DISABLED,
+          },
+          EndpointConfig {
+            PkiConfig::SIGNED,
+            { SaslMechanism::PLAIN },
+            false,
+            RpcEncryption::REQUIRED,
+          },
+          false,
+          Status::NotAuthorized(".*client does not support required TLS encryption"),
+          Status::NotAuthorized(".*client does not support required TLS encryption"),
+          AuthenticationType::SASL,
+          SaslMechanism::PLAIN,
+          true,
+        },
+
+        // client: PLAIN, TLS required
+        // server: PLAIN, TLS disabled
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::NONE,
+            { SaslMechanism::PLAIN },
+            false,
+            RpcEncryption::REQUIRED,
+          },
+          EndpointConfig {
+            PkiConfig::SIGNED,
+            { SaslMechanism::PLAIN },
+            false,
+            RpcEncryption::DISABLED,
+          },
+          false,
+          Status::NotAuthorized(".*server does not support required TLS encryption"),
+          Status::NetworkError(""),
+          AuthenticationType::SASL,
+          SaslMechanism::PLAIN,
+          true,
         }
 ));
 
@@ -690,7 +775,8 @@ static void RunGSSAPINegotiationServer(unique_ptr<Socket> socket,
   TlsContext tls_context;
   CHECK_OK(tls_context.Init());
   TokenVerifier token_verifier;
-  ServerNegotiation server_negotiation(std::move(socket), &tls_context, &token_verifier);
+  ServerNegotiation server_negotiation(std::move(socket), &tls_context,
+                                       &token_verifier, RpcEncryption::OPTIONAL);
   server_negotiation.set_server_fqdn("127.0.0.1");
   CHECK_OK(server_negotiation.EnableGSSAPI());
   post_check(server_negotiation.Negotiate(), server_negotiation);
@@ -702,7 +788,8 @@ static void RunGSSAPINegotiationClient(unique_ptr<Socket> conn,
                                        const CheckerFunction<ClientNegotiation>& post_check) {
   TlsContext tls_context;
   CHECK_OK(tls_context.Init());
-  ClientNegotiation client_negotiation(std::move(conn), &tls_context, boost::none);
+  ClientNegotiation client_negotiation(std::move(conn), &tls_context,
+                                       boost::none, RpcEncryption::OPTIONAL);
   client_negotiation.set_server_fqdn("127.0.0.1");
   CHECK_OK(client_negotiation.EnableGSSAPI());
   post_check(client_negotiation.Negotiate(), client_negotiation);
@@ -843,7 +930,8 @@ static void RunTimeoutExpectingServer(unique_ptr<Socket> socket) {
   TlsContext tls_context;
   CHECK_OK(tls_context.Init());
   TokenVerifier token_verifier;
-  ServerNegotiation server_negotiation(std::move(socket), &tls_context, &token_verifier);
+  ServerNegotiation server_negotiation(std::move(socket), &tls_context,
+                                       &token_verifier, RpcEncryption::OPTIONAL);
   CHECK_OK(server_negotiation.EnablePlain());
   Status s = server_negotiation.Negotiate();
   ASSERT_TRUE(s.IsNetworkError()) << "Expected client to time out and close the connection. Got: "
@@ -853,7 +941,8 @@ static void RunTimeoutExpectingServer(unique_ptr<Socket> socket) {
 static void RunTimeoutNegotiationClient(unique_ptr<Socket> sock) {
   TlsContext tls_context;
   CHECK_OK(tls_context.Init());
-  ClientNegotiation client_negotiation(std::move(sock), &tls_context, boost::none);
+  ClientNegotiation client_negotiation(std::move(sock), &tls_context,
+                                       boost::none, RpcEncryption::OPTIONAL);
   CHECK_OK(client_negotiation.EnablePlain("test", "test"));
   MonoTime deadline = MonoTime::Now() - MonoDelta::FromMilliseconds(100L);
   client_negotiation.set_deadline(deadline);
@@ -873,7 +962,8 @@ static void RunTimeoutNegotiationServer(unique_ptr<Socket> socket) {
   TlsContext tls_context;
   CHECK_OK(tls_context.Init());
   TokenVerifier token_verifier;
-  ServerNegotiation server_negotiation(std::move(socket), &tls_context, &token_verifier);
+  ServerNegotiation server_negotiation(std::move(socket), &tls_context,
+                                       &token_verifier, RpcEncryption::OPTIONAL);
   CHECK_OK(server_negotiation.EnablePlain());
   MonoTime deadline = MonoTime::Now() - MonoDelta::FromMilliseconds(100L);
   server_negotiation.set_deadline(deadline);
@@ -885,7 +975,8 @@ static void RunTimeoutNegotiationServer(unique_ptr<Socket> socket) {
 static void RunTimeoutExpectingClient(unique_ptr<Socket> socket) {
   TlsContext tls_context;
   CHECK_OK(tls_context.Init());
-  ClientNegotiation client_negotiation(std::move(socket), &tls_context, boost::none);
+  ClientNegotiation client_negotiation(std::move(socket), &tls_context,
+                                       boost::none, RpcEncryption::OPTIONAL);
   CHECK_OK(client_negotiation.EnablePlain("test", "test"));
   Status s = client_negotiation.Negotiate();
   ASSERT_TRUE(s.IsNetworkError()) << "Expected server to time out and close the connection. Got: "

http://git-wip-us.apache.org/repos/asf/kudu/blob/dfd9b491/src/kudu/rpc/negotiation.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/negotiation.cc b/src/kudu/rpc/negotiation.cc
index e912939..db742ca 100644
--- a/src/kudu/rpc/negotiation.cc
+++ b/src/kudu/rpc/negotiation.cc
@@ -155,11 +155,13 @@ static Status DisableSocketTimeouts(Socket* socket) {
 // Perform client negotiation. We don't LOG() anything, we leave that to our caller.
 static Status DoClientNegotiation(Connection* conn,
                                   RpcAuthentication authentication,
+                                  RpcEncryption encryption,
                                   MonoTime deadline) {
   const auto* messenger = conn->reactor_thread()->reactor()->messenger();
   ClientNegotiation client_negotiation(conn->release_socket(),
                                        &messenger->tls_context(),
-                                       messenger->authn_token());
+                                       messenger->authn_token(),
+                                       encryption);
 
   // Note that the fqdn is an IP address here: we've already lost whatever DNS
   // name the client was attempting to use. Unless krb5 is configured with 'rdns
@@ -214,6 +216,7 @@ static Status DoClientNegotiation(Connection* conn,
 // Perform server negotiation. We don't LOG() anything, we leave that to our caller.
 static Status DoServerNegotiation(Connection* conn,
                                   RpcAuthentication authentication,
+                                  RpcEncryption encryption,
                                   const MonoTime& deadline) {
   if (authentication == RpcAuthentication::REQUIRED &&
       FLAGS_keytab_file.empty() &&
@@ -233,7 +236,8 @@ static Status DoServerNegotiation(Connection* conn,
   const auto* messenger = conn->reactor_thread()->reactor()->messenger();
   ServerNegotiation server_negotiation(conn->release_socket(),
                                        &messenger->tls_context(),
-                                       &messenger->token_verifier());
+                                       &messenger->token_verifier(),
+                                       encryption);
 
   if (authentication != RpcAuthentication::DISABLED && !FLAGS_keytab_file.empty()) {
     RETURN_NOT_OK(server_negotiation.EnableGSSAPI());
@@ -259,12 +263,13 @@ static Status DoServerNegotiation(Connection* conn,
 
 void Negotiation::RunNegotiation(const scoped_refptr<Connection>& conn,
                                  RpcAuthentication authentication,
+                                 RpcEncryption encryption,
                                  MonoTime deadline) {
   Status s;
   if (conn->direction() == Connection::SERVER) {
-    s = DoServerNegotiation(conn.get(), authentication, deadline);
+    s = DoServerNegotiation(conn.get(), authentication, encryption, deadline);
   } else {
-    s = DoClientNegotiation(conn.get(), authentication, deadline);
+    s = DoClientNegotiation(conn.get(), authentication, encryption, deadline);
   }
 
   if (PREDICT_FALSE(!s.ok())) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/dfd9b491/src/kudu/rpc/negotiation.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/negotiation.h b/src/kudu/rpc/negotiation.h
index 715fca2..2ca459b 100644
--- a/src/kudu/rpc/negotiation.h
+++ b/src/kudu/rpc/negotiation.h
@@ -27,6 +27,7 @@ namespace rpc {
 
 class Connection;
 enum class RpcAuthentication;
+enum class RpcEncryption;
 
 enum class AuthenticationType {
   INVALID,
@@ -44,6 +45,7 @@ class Negotiation {
   // Perform negotiation for a connection (either server or client)
   static void RunNegotiation(const scoped_refptr<Connection>& conn,
                              RpcAuthentication authentication,
+                             RpcEncryption encryption,
                              MonoTime deadline);
  private:
   DISALLOW_IMPLICIT_CONSTRUCTORS(Negotiation);

http://git-wip-us.apache.org/repos/asf/kudu/blob/dfd9b491/src/kudu/rpc/reactor.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/reactor.cc b/src/kudu/rpc/reactor.cc
index 754bca4..87056d1 100644
--- a/src/kudu/rpc/reactor.cc
+++ b/src/kudu/rpc/reactor.cc
@@ -372,8 +372,9 @@ Status ReactorThread::StartConnectionNegotiation(const scoped_refptr<Connection>
   ADOPT_TRACE(trace.get());
   TRACE("Submitting negotiation task for $0", conn->ToString());
   auto authentication = reactor()->messenger()->authentication();
+  auto encryption = reactor()->messenger()->encryption();
   RETURN_NOT_OK(reactor()->messenger()->negotiation_pool()->SubmitClosure(
-        Bind(&Negotiation::RunNegotiation, conn, authentication, deadline)));
+        Bind(&Negotiation::RunNegotiation, conn, authentication, encryption, deadline)));
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/dfd9b491/src/kudu/rpc/server_negotiation.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/server_negotiation.cc b/src/kudu/rpc/server_negotiation.cc
index ab31c0b..8b92113 100644
--- a/src/kudu/rpc/server_negotiation.cc
+++ b/src/kudu/rpc/server_negotiation.cc
@@ -33,6 +33,7 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/blocking_ops.h"
 #include "kudu/rpc/constants.h"
+#include "kudu/rpc/messenger.h"
 #include "kudu/rpc/serialization.h"
 #include "kudu/security/cert.h"
 #include "kudu/security/crypto.h"
@@ -75,10 +76,12 @@ static int ServerNegotiationPlainAuthCb(sasl_conn_t* conn,
 
 ServerNegotiation::ServerNegotiation(unique_ptr<Socket> socket,
                                      const security::TlsContext* tls_context,
-                                     const security::TokenVerifier* token_verifier)
+                                     const security::TokenVerifier* token_verifier,
+                                     RpcEncryption encryption)
     : socket_(std::move(socket)),
       helper_(SaslHelper::SERVER),
       tls_context_(tls_context),
+      encryption_(encryption),
       tls_negotiated_(false),
       token_verifier_(token_verifier),
       negotiated_authn_(AuthenticationType::INVALID),
@@ -137,8 +140,9 @@ Status ServerNegotiation::Negotiate() {
   }
 
   // Step 3: if both ends support TLS, do a TLS handshake.
-  // TODO(dan): allow the server to require TLS.
-  if (tls_context_->has_cert() && ContainsKey(client_features_, TLS)) {
+  if (encryption_ != RpcEncryption::DISABLED &&
+      tls_context_->has_cert() &&
+      ContainsKey(client_features_, TLS)) {
     RETURN_NOT_OK(tls_context_->InitiateHandshake(security::TlsHandshakeType::SERVER,
                                                   &tls_handshake_));
 
@@ -191,7 +195,7 @@ Status ServerNegotiation::PreflightCheckGSSAPI() {
   //
   // We aren't going to actually send/receive any messages, but
   // this makes it easier to reuse the initialization code.
-  ServerNegotiation server(nullptr, nullptr, nullptr);
+  ServerNegotiation server(nullptr, nullptr, nullptr, RpcEncryption::OPTIONAL);
   Status s = server.EnableGSSAPI();
   if (!s.ok()) {
     return Status::RuntimeError(s.message());
@@ -336,6 +340,13 @@ Status ServerNegotiation::HandleNegotiate(const NegotiatePB& request) {
     }
   }
 
+  if (encryption_ == RpcEncryption::REQUIRED &&
+      !ContainsKey(client_features_, RpcFeatureFlag::TLS)) {
+    Status s = Status::NotAuthorized("client does not support required TLS encryption");
+    RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+    return s;
+  }
+
   // Find the set of mutually supported authentication types.
   set<AuthenticationType> authn_types;
   if (request.authn_types().empty()) {
@@ -373,7 +384,8 @@ Status ServerNegotiation::HandleNegotiate(const NegotiatePB& request) {
     }
   }
 
-  if (ContainsKey(authn_types, AuthenticationType::CERTIFICATE) &&
+  if (encryption_ != RpcEncryption::DISABLED &&
+      ContainsKey(authn_types, AuthenticationType::CERTIFICATE) &&
       tls_context_->has_signed_cert()) {
     // If the client supports it and we are locally configured with TLS and have
     // a CA-signed cert, choose cert authn.
@@ -382,6 +394,7 @@ Status ServerNegotiation::HandleNegotiate(const NegotiatePB& request) {
     negotiated_authn_ = AuthenticationType::CERTIFICATE;
   } else if (ContainsKey(authn_types, AuthenticationType::TOKEN) &&
              token_verifier_->GetMaxKnownKeySequenceNumber() >= 0 &&
+             encryption_ != RpcEncryption::DISABLED &&
              tls_context_->has_signed_cert()) {
     // If the client supports it, we have a TSK to verify the client's token,
     // and we have a signed-cert so the client can verify us, choose token authn.
@@ -400,7 +413,7 @@ Status ServerNegotiation::HandleNegotiate(const NegotiatePB& request) {
 
   // Tell the client which features we support.
   server_features_ = kSupportedServerRpcFeatureFlags;
-  if (tls_context_->has_cert()) {
+  if (tls_context_->has_cert() && encryption_ != RpcEncryption::DISABLED) {
     server_features_.insert(TLS);
     // If the remote peer is local, then we allow using TLS for authentication
     // without encryption or integrity.

http://git-wip-us.apache.org/repos/asf/kudu/blob/dfd9b491/src/kudu/rpc/server_negotiation.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/server_negotiation.h b/src/kudu/rpc/server_negotiation.h
index 28209c5..07e0057 100644
--- a/src/kudu/rpc/server_negotiation.h
+++ b/src/kudu/rpc/server_negotiation.h
@@ -55,9 +55,10 @@ class ServerNegotiation {
   // release_socket().
   //
   // The provided TlsContext must outlive this negotiation instance.
-  explicit ServerNegotiation(std::unique_ptr<Socket> socket,
-                             const security::TlsContext* tls_context,
-                             const security::TokenVerifier* token_verifier);
+  ServerNegotiation(std::unique_ptr<Socket> socket,
+                    const security::TlsContext* tls_context,
+                    const security::TokenVerifier* token_verifier,
+                    RpcEncryption encryption);
 
   // Enable PLAIN authentication.
   // Despite PLAIN authentication taking a username and password, we disregard
@@ -215,6 +216,7 @@ class ServerNegotiation {
   // TLS state.
   const security::TlsContext* tls_context_;
   security::TlsHandshake tls_handshake_;
+  const RpcEncryption encryption_;
   bool tls_negotiated_;
 
   // TSK state.