You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2017/02/17 08:17:21 UTC

[1/2] kudu git commit: [security] Negotiate authentication type during RPC setup

Repository: kudu
Updated Branches:
  refs/heads/master 0b0781bee -> 4bac53279


http://git-wip-us.apache.org/repos/asf/kudu/blob/4bac5327/src/kudu/security/tls_handshake-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/security/tls_handshake-test.cc b/src/kudu/security/tls_handshake-test.cc
index 0d8b9a8..6aa019a 100644
--- a/src/kudu/security/tls_handshake-test.cc
+++ b/src/kudu/security/tls_handshake-test.cc
@@ -40,42 +40,28 @@ namespace security {
 
 using ca::CertSigner;
 
-enum class CertType {
-  NONE,
-  SELF_SIGNED,
-  SIGNED,
-};
-
 struct Case {
-  CertType client_cert;
+  PkiConfig client_pki;
   TlsVerificationMode client_verification;
-  CertType server_cert;
+  PkiConfig server_pki;
   TlsVerificationMode server_verification;
   Status expected_status;
 };
 
 // Beautifies CLI test output.
 std::ostream& operator<<(std::ostream& o, Case c) {
-
-  auto cert_type_name = [] (const CertType& cert_type) {
-    switch (cert_type) {
-      case CertType::NONE: return "NONE";
-      case CertType::SELF_SIGNED: return "SELF_SIGNED";
-      case CertType::SIGNED: return "SIGNED";
-    }
-  };
-
   auto verification_mode_name = [] (const TlsVerificationMode& verification_mode) {
     switch (verification_mode) {
       case TlsVerificationMode::VERIFY_NONE: return "NONE";
       case TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST: return "REMOTE_CERT_AND_HOST";
     }
+    return "unreachable";
   };
 
-  o << "{client-cert: " << cert_type_name(c.client_cert) << ", "
+  o << "{client-pki: " << c.client_pki << ", "
     << "client-verification: " << verification_mode_name(c.client_verification) << ", "
-    << "server-cert: " << cert_type_name(c.client_cert) << ", "
-    << "server-verification: " << verification_mode_name(c.client_verification) << ", "
+    << "server-pki: " << c.server_pki << ", "
+    << "server-verification: " << verification_mode_name(c.server_verification) << ", "
     << "expected-status: " << c.expected_status.ToString() << "}";
 
   return o;
@@ -87,11 +73,6 @@ class TestTlsHandshake : public KuduTest,
   void SetUp() override {
     KuduTest::SetUp();
 
-    // Tune down the RSA key length in order to speed up tests. We would tune it
-    // smaller, but at 512 bits OpenSSL returns a "digest too big for rsa key"
-    // error during negotiation.
-    FLAGS_server_rsa_key_length_bits = 1024;
-
     ASSERT_OK(client_tls_.Init());
     ASSERT_OK(server_tls_.Init());
   }
@@ -144,38 +125,14 @@ class TestTlsHandshake : public KuduTest,
   string key_path_;
 };
 
-namespace {
-Status InitTlsContextCert(const PrivateKey& ca_key,
-                          const Cert& ca_cert,
-                          TlsContext* tls_context,
-                          CertType cert_type) {
-  RETURN_NOT_OK(tls_context->AddTrustedCertificate(ca_cert));
-  switch (cert_type) {
-    case CertType::SIGNED: {
-      Cert cert;
-      RETURN_NOT_OK(tls_context->GenerateSelfSignedCertAndKey("test-uuid"));
-      RETURN_NOT_OK(CertSigner(&ca_cert, &ca_key).Sign(*tls_context->GetCsrIfNecessary(), &cert));
-      RETURN_NOT_OK(tls_context->AdoptSignedCert(cert));
-      break;
-    }
-    case CertType::SELF_SIGNED:
-      RETURN_NOT_OK(tls_context->GenerateSelfSignedCertAndKey("test-uuid"));
-      break;
-    case CertType::NONE:
-      break;
-  }
-  return Status::OK();
-}
-} // anonymous namespace
-
 TEST_F(TestTlsHandshake, TestHandshakeSequence) {
   PrivateKey ca_key;
   Cert ca_cert;
   ASSERT_OK(GenerateSelfSignedCAForTests(&ca_key, &ca_cert));
 
   // Both client and server have certs and CA.
-  ASSERT_OK(InitTlsContextCert(ca_key, ca_cert, &client_tls_, CertType::SIGNED));
-  ASSERT_OK(InitTlsContextCert(ca_key, ca_cert, &server_tls_, CertType::SIGNED));
+  ASSERT_OK(ConfigureTlsContext(PkiConfig::SIGNED, ca_cert, ca_key, &client_tls_));
+  ASSERT_OK(ConfigureTlsContext(PkiConfig::SIGNED, ca_cert, ca_key, &server_tls_));
 
   TlsHandshake server;
   TlsHandshake client;
@@ -278,8 +235,8 @@ TEST_P(TestTlsHandshake, TestHandshake) {
   Cert ca_cert;
   ASSERT_OK(GenerateSelfSignedCAForTests(&ca_key, &ca_cert));
 
-  ASSERT_OK(InitTlsContextCert(ca_key, ca_cert, &client_tls_, test_case.client_cert));
-  ASSERT_OK(InitTlsContextCert(ca_key, ca_cert, &server_tls_, test_case.server_cert));
+  ASSERT_OK(ConfigureTlsContext(test_case.client_pki, ca_cert, ca_key, &client_tls_));
+  ASSERT_OK(ConfigureTlsContext(test_case.server_pki, ca_cert, ca_key, &server_tls_));
 
   Status s = RunHandshake(test_case.client_verification, test_case.server_verification);
 
@@ -295,56 +252,85 @@ INSTANTIATE_TEST_CASE_P(CertCombinations,
         // has a self-signed cert, since we don't expect those to occur in
         // practice.
 
-        Case { CertType::NONE, TlsVerificationMode::VERIFY_NONE,
-               CertType::SELF_SIGNED, TlsVerificationMode::VERIFY_NONE,
+        Case { PkiConfig::NONE, TlsVerificationMode::VERIFY_NONE,
+               PkiConfig::SELF_SIGNED, TlsVerificationMode::VERIFY_NONE,
                Status::OK() },
-        Case { CertType::NONE, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
-               CertType::SELF_SIGNED, TlsVerificationMode::VERIFY_NONE,
+        Case { PkiConfig::NONE, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               PkiConfig::SELF_SIGNED, TlsVerificationMode::VERIFY_NONE,
                Status::RuntimeError("client error:.*certificate verify failed") },
-        Case { CertType::NONE, TlsVerificationMode::VERIFY_NONE,
-               CertType::SELF_SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+        Case { PkiConfig::NONE, TlsVerificationMode::VERIFY_NONE,
+               PkiConfig::SELF_SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
                Status::RuntimeError("server error:.*peer did not return a certificate") },
-        Case { CertType::NONE, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
-               CertType::SELF_SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+        Case { PkiConfig::NONE, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               PkiConfig::SELF_SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
                Status::RuntimeError("client error:.*certificate verify failed") },
 
-        Case { CertType::NONE, TlsVerificationMode::VERIFY_NONE,
-               CertType::SIGNED, TlsVerificationMode::VERIFY_NONE,
+        Case { PkiConfig::NONE, TlsVerificationMode::VERIFY_NONE,
+               PkiConfig::SIGNED, TlsVerificationMode::VERIFY_NONE,
                Status::OK() },
-        Case { CertType::NONE, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
-               CertType::SIGNED, TlsVerificationMode::VERIFY_NONE,
+        Case { PkiConfig::NONE, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               PkiConfig::SIGNED, TlsVerificationMode::VERIFY_NONE,
+               Status::RuntimeError("client error:.*certificate verify failed") },
+        Case { PkiConfig::NONE, TlsVerificationMode::VERIFY_NONE,
+               PkiConfig::SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               Status::RuntimeError("server error:.*peer did not return a certificate") },
+        Case { PkiConfig::NONE, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               PkiConfig::SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               Status::RuntimeError("client error:.*certificate verify failed") },
+
+        Case { PkiConfig::TRUSTED, TlsVerificationMode::VERIFY_NONE,
+               PkiConfig::SELF_SIGNED, TlsVerificationMode::VERIFY_NONE,
                Status::OK() },
-        Case { CertType::NONE, TlsVerificationMode::VERIFY_NONE,
-               CertType::SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+        Case { PkiConfig::TRUSTED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               PkiConfig::SELF_SIGNED, TlsVerificationMode::VERIFY_NONE,
+               Status::RuntimeError("client error:.*certificate verify failed") },
+        Case { PkiConfig::TRUSTED, TlsVerificationMode::VERIFY_NONE,
+               PkiConfig::SELF_SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
                Status::RuntimeError("server error:.*peer did not return a certificate") },
-        Case { CertType::NONE, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
-               CertType::SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+        Case { PkiConfig::TRUSTED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               PkiConfig::SELF_SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               Status::RuntimeError("client error:.*certificate verify failed") },
+
+        Case { PkiConfig::TRUSTED, TlsVerificationMode::VERIFY_NONE,
+               PkiConfig::SIGNED, TlsVerificationMode::VERIFY_NONE,
+               Status::OK() },
+        Case { PkiConfig::TRUSTED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               PkiConfig::SIGNED, TlsVerificationMode::VERIFY_NONE,
+               Status::OK() },
+        Case { PkiConfig::TRUSTED, TlsVerificationMode::VERIFY_NONE,
+               PkiConfig::SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               Status::RuntimeError("server error:.*peer did not return a certificate") },
+        Case { PkiConfig::TRUSTED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               PkiConfig::SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
                Status::RuntimeError("server error:.*peer did not return a certificate") },
 
-        Case { CertType::SIGNED, TlsVerificationMode::VERIFY_NONE,
-               CertType::SELF_SIGNED, TlsVerificationMode::VERIFY_NONE,
+        Case { PkiConfig::SIGNED, TlsVerificationMode::VERIFY_NONE,
+               PkiConfig::SELF_SIGNED, TlsVerificationMode::VERIFY_NONE,
                Status::OK() },
-        Case { CertType::SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
-               CertType::SELF_SIGNED, TlsVerificationMode::VERIFY_NONE,
+        Case { PkiConfig::SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               PkiConfig::SELF_SIGNED, TlsVerificationMode::VERIFY_NONE,
                Status::RuntimeError("client error:.*certificate verify failed") },
-        Case { CertType::SIGNED, TlsVerificationMode::VERIFY_NONE,
-               CertType::SELF_SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
-               Status::OK() },
-        Case { CertType::SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
-               CertType::SELF_SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+        Case { PkiConfig::SIGNED, TlsVerificationMode::VERIFY_NONE,
+               PkiConfig::SELF_SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               // OpenSSL 1.0.0 returns "no certificate returned" for this case,
+               // which appears to be a bug.
+               Status::RuntimeError("server error:.*(certificate verify failed|"
+                                                    "no certificate returned)") },
+        Case { PkiConfig::SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               PkiConfig::SELF_SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
                Status::RuntimeError("client error:.*certificate verify failed") },
 
-        Case { CertType::SIGNED, TlsVerificationMode::VERIFY_NONE,
-               CertType::SIGNED, TlsVerificationMode::VERIFY_NONE,
+        Case { PkiConfig::SIGNED, TlsVerificationMode::VERIFY_NONE,
+               PkiConfig::SIGNED, TlsVerificationMode::VERIFY_NONE,
                Status::OK() },
-        Case { CertType::SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
-               CertType::SIGNED, TlsVerificationMode::VERIFY_NONE,
+        Case { PkiConfig::SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               PkiConfig::SIGNED, TlsVerificationMode::VERIFY_NONE,
                Status::OK() },
-        Case { CertType::SIGNED, TlsVerificationMode::VERIFY_NONE,
-               CertType::SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+        Case { PkiConfig::SIGNED, TlsVerificationMode::VERIFY_NONE,
+               PkiConfig::SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
                Status::OK() },
-        Case { CertType::SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
-               CertType::SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+        Case { PkiConfig::SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               PkiConfig::SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
                Status::OK() }
 ));
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bac5327/src/kudu/security/tls_handshake.cc
----------------------------------------------------------------------
diff --git a/src/kudu/security/tls_handshake.cc b/src/kudu/security/tls_handshake.cc
index 26bfa9f..f57f24f 100644
--- a/src/kudu/security/tls_handshake.cc
+++ b/src/kudu/security/tls_handshake.cc
@@ -141,6 +141,8 @@ Status TlsHandshake::Verify(const Socket& socket) const {
     return Status::OK();
   }
 
+  // TODO(PKI): KUDU-1886: Do hostname verification.
+  /*
   TRACE("Verifying peer cert");
 
   // Get the peer's hostname
@@ -167,6 +169,7 @@ Status TlsHandshake::Verify(const Socket& socket) const {
     return Status::RuntimeError("TLS certificate hostname verification error", GetOpenSSLErrors());
   }
   DCHECK_EQ(match, 1);
+  */
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bac5327/src/kudu/security/token-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/security/token-test.cc b/src/kudu/security/token-test.cc
index 0bb751d..ddc6a5c 100644
--- a/src/kudu/security/token-test.cc
+++ b/src/kudu/security/token-test.cc
@@ -131,13 +131,14 @@ TEST_F(TokenTest, TestEndToEnd_Valid) {
   ASSERT_OK(signer.RotateSigningKey());
 
   // Make and sign a token.
-  SignedTokenPB token = MakeUnsignedToken(WallTime_Now() + 600);
-  ASSERT_OK(signer.SignToken(&token));
+  SignedTokenPB signed_token = MakeUnsignedToken(WallTime_Now() + 600);
+  ASSERT_OK(signer.SignToken(&signed_token));
 
   // Try to verify it.
   TokenVerifier verifier;
   ASSERT_OK(verifier.ImportPublicKeys(signer.GetTokenSigningPublicKeys(0)));
-  ASSERT_EQ(VerificationResult::VALID, verifier.VerifyTokenSignature(token));
+  TokenPB token;
+  ASSERT_EQ(VerificationResult::VALID, verifier.VerifyTokenSignature(signed_token, &token));
 }
 
 // Test all of the possible cases covered by token verification.
@@ -151,32 +152,40 @@ TEST_F(TokenTest, TestEndToEnd_InvalidCases) {
 
   // Make and sign a token, but corrupt the data in it.
   {
-    SignedTokenPB token = MakeUnsignedToken(WallTime_Now() + 600);
-    ASSERT_OK(signer.SignToken(&token));
-    token.set_token_data("xyz");
-    ASSERT_EQ(VerificationResult::INVALID_TOKEN, verifier.VerifyTokenSignature(token));
+    SignedTokenPB signed_token = MakeUnsignedToken(WallTime_Now() + 600);
+    ASSERT_OK(signer.SignToken(&signed_token));
+    signed_token.set_token_data("xyz");
+    TokenPB token;
+    ASSERT_EQ(VerificationResult::INVALID_TOKEN,
+              verifier.VerifyTokenSignature(signed_token, &token));
   }
 
   // Make and sign a token, but corrupt the signature.
   {
-    SignedTokenPB token = MakeUnsignedToken(WallTime_Now() + 600);
-    ASSERT_OK(signer.SignToken(&token));
-    token.set_signature("xyz");
-    ASSERT_EQ(VerificationResult::INVALID_SIGNATURE, verifier.VerifyTokenSignature(token));
+    SignedTokenPB signed_token = MakeUnsignedToken(WallTime_Now() + 600);
+    ASSERT_OK(signer.SignToken(&signed_token));
+    signed_token.set_signature("xyz");
+    TokenPB token;
+    ASSERT_EQ(VerificationResult::INVALID_SIGNATURE,
+              verifier.VerifyTokenSignature(signed_token, &token));
   }
 
   // Make and sign a token, but set it to be already expired.
   {
-    SignedTokenPB token = MakeUnsignedToken(WallTime_Now() - 10);
-    ASSERT_OK(signer.SignToken(&token));
-    ASSERT_EQ(VerificationResult::EXPIRED_TOKEN, verifier.VerifyTokenSignature(token));
+    SignedTokenPB signed_token = MakeUnsignedToken(WallTime_Now() - 10);
+    ASSERT_OK(signer.SignToken(&signed_token));
+    TokenPB token;
+    ASSERT_EQ(VerificationResult::EXPIRED_TOKEN,
+              verifier.VerifyTokenSignature(signed_token, &token));
   }
 
   // Make and sign a token which uses an incompatible feature flag.
   {
-    SignedTokenPB token = MakeIncompatibleToken();
-    ASSERT_OK(signer.SignToken(&token));
-    ASSERT_EQ(VerificationResult::INCOMPATIBLE_FEATURE, verifier.VerifyTokenSignature(token));
+    SignedTokenPB signed_token = MakeIncompatibleToken();
+    ASSERT_OK(signer.SignToken(&signed_token));
+    TokenPB token;
+    ASSERT_EQ(VerificationResult::INCOMPATIBLE_FEATURE,
+              verifier.VerifyTokenSignature(signed_token, &token));
   }
 
   // Rotate to a new key, but don't inform the verifier of it yet. When we
@@ -184,9 +193,11 @@ TEST_F(TokenTest, TestEndToEnd_InvalidCases) {
   ASSERT_OK(signer.RotateSigningKey());
   ASSERT_OK(signer.RotateSigningKey());
   {
-    SignedTokenPB token = MakeUnsignedToken(WallTime_Now() + 600);
-    ASSERT_OK(signer.SignToken(&token));
-    ASSERT_EQ(VerificationResult::UNKNOWN_SIGNING_KEY, verifier.VerifyTokenSignature(token));
+    SignedTokenPB signed_token = MakeUnsignedToken(WallTime_Now() + 600);
+    ASSERT_OK(signer.SignToken(&signed_token));
+    TokenPB token;
+    ASSERT_EQ(VerificationResult::UNKNOWN_SIGNING_KEY,
+              verifier.VerifyTokenSignature(signed_token, &token));
   }
 
   // Rotate to a signing key which is already expired, and inform the verifier
@@ -198,9 +209,11 @@ TEST_F(TokenTest, TestEndToEnd_InvalidCases) {
   ASSERT_OK(verifier.ImportPublicKeys(signer.GetTokenSigningPublicKeys(
       verifier.GetMaxKnownKeySequenceNumber())));
   {
-    SignedTokenPB token = MakeUnsignedToken(WallTime_Now() + 600);
-    ASSERT_OK(signer.SignToken(&token));
-    ASSERT_EQ(VerificationResult::EXPIRED_SIGNING_KEY, verifier.VerifyTokenSignature(token));
+    SignedTokenPB signed_token = MakeUnsignedToken(WallTime_Now() + 600);
+    ASSERT_OK(signer.SignToken(&signed_token));
+    TokenPB token;
+    ASSERT_EQ(VerificationResult::EXPIRED_SIGNING_KEY,
+              verifier.VerifyTokenSignature(signed_token, &token));
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bac5327/src/kudu/security/token_signer.h
----------------------------------------------------------------------
diff --git a/src/kudu/security/token_signer.h b/src/kudu/security/token_signer.h
index 2824be0..259b7b5 100644
--- a/src/kudu/security/token_signer.h
+++ b/src/kudu/security/token_signer.h
@@ -98,7 +98,7 @@ class TokenSigner {
   ~TokenSigner();
 
   // Sign the given token using the current TSK.
-  Status SignToken(SignedTokenPB* token) const;
+  Status SignToken(SignedTokenPB* token) const WARN_UNUSED_RESULT;
 
   // Returns the set of valid public keys with sequence numbers greater
   // than 'after_sequence_number'.
@@ -108,7 +108,7 @@ class TokenSigner {
   // Rotate to a new token-signing key.
   //
   // See class documentation for more information.
-  Status RotateSigningKey();
+  Status RotateSigningKey() WARN_UNUSED_RESULT;
 
  private:
   // Protects following fields.

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bac5327/src/kudu/security/token_verifier.cc
----------------------------------------------------------------------
diff --git a/src/kudu/security/token_verifier.cc b/src/kudu/security/token_verifier.cc
index 2150c91..cb1c0a1 100644
--- a/src/kudu/security/token_verifier.cc
+++ b/src/kudu/security/token_verifier.cc
@@ -82,28 +82,25 @@ Status TokenVerifier::ImportPublicKeys(const vector<TokenSigningPublicKeyPB>& pu
 }
 
 // Verify the signature on the given token.
-VerificationResult TokenVerifier::VerifyTokenSignature(
-    const SignedTokenPB& signed_token) const {
+VerificationResult TokenVerifier::VerifyTokenSignature(const SignedTokenPB& signed_token,
+                                                       TokenPB* token) const {
   if (!signed_token.has_signature() ||
       !signed_token.has_signing_key_seq_num() ||
       !signed_token.has_token_data()) {
     return VerificationResult::INVALID_TOKEN;
   }
 
-  // TODO(perf): should we return the deserialized TokenPB here
-  // since callers are probably going to need it, anyway?
-  TokenPB token;
-  if (!token.ParseFromString(signed_token.token_data()) ||
-      !token.has_expire_unix_epoch_seconds()) {
+  if (!token->ParseFromString(signed_token.token_data()) ||
+      !token->has_expire_unix_epoch_seconds()) {
     return VerificationResult::INVALID_TOKEN;
   }
 
   int64_t now = WallTime_Now();
-  if (token.expire_unix_epoch_seconds() < now) {
+  if (token->expire_unix_epoch_seconds() < now) {
     return VerificationResult::EXPIRED_TOKEN;
   }
 
-  for (auto flag : token.incompatible_features()) {
+  for (auto flag : token->incompatible_features()) {
     if (!TokenPB::Feature_IsValid(flag)) {
       return VerificationResult::INCOMPATIBLE_FEATURE;
     }

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bac5327/src/kudu/security/token_verifier.h
----------------------------------------------------------------------
diff --git a/src/kudu/security/token_verifier.h b/src/kudu/security/token_verifier.h
index a35af22..3466fda 100644
--- a/src/kudu/security/token_verifier.h
+++ b/src/kudu/security/token_verifier.h
@@ -25,9 +25,10 @@
 namespace kudu {
 namespace security {
 
+class SignedTokenPB;
+class TokenPB;
 class TokenSigningPublicKey;
 class TokenSigningPublicKeyPB;
-class SignedTokenPB;
 enum class VerificationResult;
 
 // Class responsible for verifying tokens provided to a server.
@@ -64,8 +65,10 @@ class TokenVerifier {
   Status ImportPublicKeys(const std::vector<TokenSigningPublicKeyPB>& public_keys)
     WARN_UNUSED_RESULT;
 
-  // Verify the signature on the given token.
-  VerificationResult VerifyTokenSignature(const SignedTokenPB& signed_token) const;
+  // Verify the signature on the given signed token, and deserialize the
+  // contents into 'token'.
+  VerificationResult VerifyTokenSignature(const SignedTokenPB& signed_token,
+                                          TokenPB* token) const;
 
   // TODO(PKI): should expire out old key versions at some point. eg only
   // keep old key versions until their expiration is an hour or two in the past?

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bac5327/src/kudu/server/server_base.h
----------------------------------------------------------------------
diff --git a/src/kudu/server/server_base.h b/src/kudu/server/server_base.h
index 6f17985..a76a338 100644
--- a/src/kudu/server/server_base.h
+++ b/src/kudu/server/server_base.h
@@ -49,6 +49,7 @@ class ServiceIf;
 
 namespace security {
 class TlsContext;
+class TokenVerifier;
 } // namespace security
 
 namespace server {
@@ -76,9 +77,12 @@ class ServerBase {
 
   FsManager* fs_manager() { return fs_manager_.get(); }
 
-  const security::TlsContext& tls_context() { return messenger_->tls_context(); }
+  const security::TlsContext& tls_context() const { return messenger_->tls_context(); }
   security::TlsContext* mutable_tls_context() { return messenger_->mutable_tls_context(); }
 
+  const security::TokenVerifier& token_verifier() const { return messenger_->token_verifier(); }
+  security::TokenVerifier* mutable_token_verifier() { return messenger_->mutable_token_verifier(); }
+
   // Return the instance identifier of this server.
   // This may not be called until after the server is Started.
   const NodeInstancePB& instance_pb() const;

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bac5327/src/kudu/tserver/heartbeater.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/heartbeater.cc b/src/kudu/tserver/heartbeater.cc
index f0438ad..512f680 100644
--- a/src/kudu/tserver/heartbeater.cc
+++ b/src/kudu/tserver/heartbeater.cc
@@ -32,6 +32,7 @@
 #include "kudu/master/master.proxy.h"
 #include "kudu/security/cert.h"
 #include "kudu/security/tls_context.h"
+#include "kudu/security/token_verifier.h"
 #include "kudu/server/webserver.h"
 #include "kudu/tserver/tablet_server.h"
 #include "kudu/tserver/tablet_server_options.h"
@@ -372,6 +373,10 @@ Status Heartbeater::Thread::DoHeartbeat() {
   // TODO(PKI): send the version number of the latest CA cert which we know about.
   // The response should include new CA certs.
 
+  // Send the most recently known TSK sequence number so that the master can
+  // send us knew ones if they exist.
+  req.set_latest_tsk_seq_num(server_->token_verifier().GetMaxKnownKeySequenceNumber());
+
   if (send_full_tablet_report_) {
     LOG(INFO) << Substitute(
         "Master $0 was elected leader, sending a full tablet report...",
@@ -437,6 +442,15 @@ Status Heartbeater::Thread::DoHeartbeat() {
         "failed to adopt master-signed X509 cert");
   }
 
+  // Import TSKs.
+  if (!last_hb_response_.tsks().empty()) {
+    vector<security::TokenSigningPublicKeyPB> tsks(last_hb_response_.tsks().begin(),
+                                                   last_hb_response_.tsks().end());
+    RETURN_NOT_OK_PREPEND(
+        server_->mutable_token_verifier()->ImportPublicKeys(tsks),
+        "failed to import token signing public keys from master heartbeat");
+  }
+
   MarkTabletReportAcknowledged(req.tablet_report());
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bac5327/src/kudu/util/test_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/test_util.cc b/src/kudu/util/test_util.cc
index 4228e85..217f4dc 100644
--- a/src/kudu/util/test_util.cc
+++ b/src/kudu/util/test_util.cc
@@ -72,8 +72,8 @@ KuduTest::KuduTest()
     // Disable log redaction.
     {"log_redact_user_data", "false"},
     // Reduce default RSA key length for faster tests.
-    {"server_rsa_key_length_bits", "512"},
-    {"master_ca_rsa_key_length_bits", "512"}
+    {"server_rsa_key_length_bits", "1024"},
+    {"master_ca_rsa_key_length_bits", "1024"}
   };
   for (const auto& e : flags_for_tests) {
     // We don't check for errors here, because we have some default flags that


[2/2] kudu git commit: [security] Negotiate authentication type during RPC setup

Posted by to...@apache.org.
[security] Negotiate authentication type during RPC setup

This commit introduces the concept of an authentication type to the RPC
negotiation sequence. The current valid authentication types are SASL,
CERTIFICATE, and TOKEN. Early in the negotiation sequence the client and
server decide on an authentication type to use for the connection based
on the mutually supported capabilities. If either side does not support
the new authentication negotiation, the connection automatically falls
back to SASL authentication in order to maintain backwards
compatibility.

This commit also adds TSKs to the master<->tserver heartbeat protocol,
so that tokens may be verified on the server. The client's authn token
has been moved to the messenger to make it more accessible to
negotiation.

negotiation-test has been updated with a general test runner for
different negotiation configurations.  There are so many possible
negotiation configurations that an exhaustive set of tests is not
attempted, but it should be straightforward to add additional tests in
the future.

Change-Id: I8ed9a1a474990dbfe9b71173adffdec95ec02b6c
Reviewed-on: http://gerrit.cloudera.org:8080/5988
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/4bac5327
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/4bac5327
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/4bac5327

Branch: refs/heads/master
Commit: 4bac532799de315e0720af167403b38be0e3e421
Parents: 0b0781b
Author: Dan Burkert <da...@apache.org>
Authored: Wed Feb 8 13:36:28 2017 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Fri Feb 17 08:02:35 2017 +0000

----------------------------------------------------------------------
 src/kudu/client/client-internal.cc              |   8 +-
 src/kudu/client/client-internal.h               |   4 -
 src/kudu/client/client-test.cc                  |   2 +-
 .../integration-tests/external_mini_cluster.cc  |   9 +-
 src/kudu/master/master.proto                    |   7 +
 src/kudu/rpc/CMakeLists.txt                     |   2 +-
 src/kudu/rpc/client_negotiation.cc              | 148 +++-
 src/kudu/rpc/client_negotiation.h               |  27 +-
 src/kudu/rpc/messenger.cc                       |   2 +
 src/kudu/rpc/messenger.h                        |  21 +
 src/kudu/rpc/negotiation-test.cc                | 783 +++++++++++++------
 src/kudu/rpc/negotiation.cc                     |  22 +-
 src/kudu/rpc/negotiation.h                      |  10 +
 src/kudu/rpc/rpc_header.proto                   |  35 +
 src/kudu/rpc/server_negotiation.cc              | 243 ++++--
 src/kudu/rpc/server_negotiation.h               |  43 +-
 src/kudu/security/security-test-util.cc         |  34 +
 src/kudu/security/security-test-util.h          |  23 +
 src/kudu/security/tls_context.h                 |   9 +-
 src/kudu/security/tls_handshake-test.cc         | 158 ++--
 src/kudu/security/tls_handshake.cc              |   3 +
 src/kudu/security/token-test.cc                 |  59 +-
 src/kudu/security/token_signer.h                |   4 +-
 src/kudu/security/token_verifier.cc             |  15 +-
 src/kudu/security/token_verifier.h              |   9 +-
 src/kudu/server/server_base.h                   |   6 +-
 src/kudu/tserver/heartbeater.cc                 |  14 +
 src/kudu/util/test_util.cc                      |   4 +-
 28 files changed, 1256 insertions(+), 448 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/4bac5327/src/kudu/client/client-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-internal.cc b/src/kudu/client/client-internal.cc
index f761a9b..0b64cf2 100644
--- a/src/kudu/client/client-internal.cc
+++ b/src/kudu/client/client-internal.cc
@@ -618,14 +618,16 @@ void KuduClient::Data::ConnectedToClusterCb(
     }
   }
 
+  // Adopt the authentication token from the response, if it's been set.
+  if (connect_response.has_authn_token()) {
+    messenger_->set_authn_token(connect_response.authn_token());
+  }
+
   vector<StatusCallback> cbs;
   {
     std::lock_guard<simple_spinlock> l(leader_master_lock_);
     cbs.swap(leader_master_callbacks_);
     leader_master_rpc_.reset();
-    if (connect_response.has_authn_token()) {
-      authn_token_ = connect_response.authn_token();
-    }
 
     if (status.ok()) {
       leader_master_hostport_ = HostPort(leader_addr);

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bac5327/src/kudu/client/client-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-internal.h b/src/kudu/client/client-internal.h
index 146a886..864a734 100644
--- a/src/kudu/client/client-internal.h
+++ b/src/kudu/client/client-internal.h
@@ -233,10 +233,6 @@ class KuduClient::Data {
   scoped_refptr<internal::ConnectToClusterRpc> leader_master_rpc_;
   std::vector<StatusCallback> leader_master_callbacks_;
 
-  // The latest authentication token that this client should use to talk
-  // to the cluster.
-  boost::optional<security::SignedTokenPB> authn_token_;
-
   // Protects 'leader_master_rpc_', 'leader_master_hostport_',
   // and master_proxy_
   //

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bac5327/src/kudu/client/client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 26d2f6b..6f652c9 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -4655,7 +4655,7 @@ TEST_F(ClientTest, TestConnectToClusterCompatibility) {
 // certificate authority and authentication token from the master.
 TEST_F(ClientTest, TestGetSecurityInfoFromMaster) {
   // Client is already connected when the test starts.
-  ASSERT_TRUE(client_->data_->authn_token_ != boost::none);
+  ASSERT_TRUE(client_->data_->messenger_->authn_token() != boost::none);
   ASSERT_EQ(1, client_->data_->messenger_->tls_context().trusted_cert_count_for_tests());
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bac5327/src/kudu/integration-tests/external_mini_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/external_mini_cluster.cc b/src/kudu/integration-tests/external_mini_cluster.cc
index a3a3f1c..3f4b67d 100644
--- a/src/kudu/integration-tests/external_mini_cluster.cc
+++ b/src/kudu/integration-tests/external_mini_cluster.cc
@@ -624,10 +624,11 @@ Status ExternalDaemon::StartProcess(const vector<string>& user_flags) {
   // rely on forcefully cutting power to a machine or equivalent.
   argv.push_back("--never_fsync");
 
-  // Generate smaller RSA keys -- generating a 512-bit key is >15x faster
+  // Generate smaller RSA keys -- generating a 1024-bit key is faster
   // than generating the default 2048-bit key, and we don't care about
-  // strong encryption in tests.
-  argv.push_back("--server_rsa_key_length_bits=512");
+  // strong encryption in tests. Setting it lower (e.g. 512 bits) results
+  // in OpenSSL errors RSA_sign:digest too big for rsa key:rsa_sign.c:122.
+  argv.push_back("--server_rsa_key_length_bits=1024");
 
   // Disable minidumps by default since many tests purposely inject faults.
   argv.push_back("--enable_minidumps=false");
@@ -1011,7 +1012,7 @@ Status ExternalMaster::Start() {
   vector<string> flags;
 
   // Generate smaller RSA keys. See note above for server_rsa_key_length_bits.
-  flags.push_back("--master_ca_rsa_key_length_bits=512");
+  flags.push_back("--master_ca_rsa_key_length_bits=1024");
 
   flags.push_back("--fs_wal_dir=" + data_dir_);
   flags.push_back("--fs_data_dirs=" + data_dir_);

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bac5327/src/kudu/master/master.proto
----------------------------------------------------------------------
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index 9527ab5..02ddd1d 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -269,6 +269,10 @@ message TSHeartbeatRequestPB {
   // If the tablet server needs its certificate signed, the CSR
   // in DER format.
   optional bytes csr_der = 5;
+
+  // The most recently known TSK sequence number. Allows the master to
+  // selectively notify the tablet server of more recent TSKs.
+  optional int64 latest_tsk_seq_num = 6;
 }
 
 message TSHeartbeatResponsePB {
@@ -307,6 +311,9 @@ message TSHeartbeatResponsePB {
   // independent certs to be trusted. They may or may not have any signing
   // relationship between them.
   repeated bytes ca_cert_der = 8;
+
+  // Token signing keys which the tablet server should begin trusting.
+  repeated security.TokenSigningPublicKeyPB tsks = 9;
 }
 
 //////////////////////////////

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bac5327/src/kudu/rpc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/CMakeLists.txt b/src/kudu/rpc/CMakeLists.txt
index 95b4268..6cfb3b6 100644
--- a/src/kudu/rpc/CMakeLists.txt
+++ b/src/kudu/rpc/CMakeLists.txt
@@ -23,7 +23,7 @@ PROTOBUF_GENERATE_CPP(
   PROTO_FILES rpc_header.proto)
 ADD_EXPORTABLE_LIBRARY(rpc_header_proto
   SRCS ${RPC_HEADER_PROTO_SRCS}
-  DEPS protobuf pb_util_proto
+  DEPS protobuf pb_util_proto token_proto
   NONLINK_DEPS ${RPC_HEADER_PROTO_TGTS})
 
 PROTOBUF_GENERATE_CPP(

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bac5327/src/kudu/rpc/client_negotiation.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/client_negotiation.cc b/src/kudu/rpc/client_negotiation.cc
index 63b3929..a58348c 100644
--- a/src/kudu/rpc/client_negotiation.cc
+++ b/src/kudu/rpc/client_negotiation.cc
@@ -104,11 +104,14 @@ static Status StatusFromRpcError(const ErrorStatusPB& error) {
 }
 
 ClientNegotiation::ClientNegotiation(unique_ptr<Socket> socket,
-                                     const security::TlsContext* tls_context)
+                                     const security::TlsContext* tls_context,
+                                     const boost::optional<security::SignedTokenPB>& authn_token)
     : socket_(std::move(socket)),
       helper_(SaslHelper::CLIENT),
       tls_context_(tls_context),
       tls_negotiated_(false),
+      authn_token_(authn_token),
+      negotiated_authn_(AuthenticationType::INVALID),
       negotiated_mech_(SaslMechanism::INVALID),
       deadline_(MonoTime::Max()) {
   callbacks_.push_back(SaslBuildCallback(SASL_CB_GETOPT,
@@ -169,15 +172,10 @@ Status ClientNegotiation::Negotiate() {
     RETURN_NOT_OK(tls_context_->InitiateHandshake(security::TlsHandshakeType::CLIENT,
                                                   &tls_handshake_));
 
-    if (negotiated_mech_ == SaslMechanism::GSSAPI ||
-        negotiated_mech_ == SaslMechanism::PLAIN) {
-      // When using GSSAPI, we don't verify the server's certificate. Instead,
-      // we rely on Kerberos authentication, and use channel binding to tie the
-      // SASL authentication to the TLS channel.
-      //
-      // When using 'PLAIN' authentication, this implies that strong authentication
-      // is not enabled. So, we are just using TLS for encryption and don't need to
-      // validate a cert.
+    if (negotiated_authn_ == AuthenticationType::SASL) {
+      // When using SASL authentication, verifying the server's certificate is
+      // not necessary. This allows the client to still use TLS encryption for
+      // connections to servers which only have a self-signed certificate.
       tls_handshake_.set_verification_mode(security::TlsVerificationMode::VERIFY_NONE);
     }
 
@@ -197,27 +195,18 @@ Status ClientNegotiation::Negotiate() {
     tls_negotiated_ = true;
   }
 
-  // Step 4: SASL negotiation.
-  RETURN_NOT_OK(InitSaslClient());
-  RETURN_NOT_OK(SendSaslInitiate());
-  for (bool cont = true; cont; ) {
-    NegotiatePB response;
-    RETURN_NOT_OK(RecvNegotiatePB(&response, &recv_buf));
-    Status s;
-    switch (response.step()) {
-      // SASL_CHALLENGE: Server sent us a follow-up to an SASL_INITIATE or SASL_RESPONSE request.
-      case NegotiatePB::SASL_CHALLENGE:
-        RETURN_NOT_OK(HandleSaslChallenge(response));
-        break;
-      // SASL_SUCCESS: Server has accepted our authentication request. Negotiation successful.
-      case NegotiatePB::SASL_SUCCESS:
-        RETURN_NOT_OK(HandleSaslSuccess(response));
-        cont = false;
-        break;
-      default:
-        return Status::NotAuthorized("expected SASL_CHALLENGE or SASL_SUCCESS step",
-                                     NegotiatePB::NegotiateStep_Name(response.step()));
-    }
+  // Step 4: Authentication
+  switch (negotiated_authn_) {
+    case AuthenticationType::SASL:
+      RETURN_NOT_OK(AuthenticateBySasl(&recv_buf));
+      break;
+    case AuthenticationType::TOKEN:
+      RETURN_NOT_OK(AuthenticateByToken(&recv_buf));
+      break;
+    case AuthenticationType::CERTIFICATE:
+      // The TLS handshake has already authenticated the server.
+      break;
+    case AuthenticationType::INVALID: LOG(FATAL) << "unreachable";
   }
 
   // Step 5: Send connection context.
@@ -310,6 +299,22 @@ Status ClientNegotiation::SendNegotiate() {
     msg.add_supported_features(feature);
   }
 
+  if (!helper_.EnabledMechs().empty()) {
+    msg.add_authn_types()->mutable_sasl();
+  }
+  if (tls_context_->has_signed_cert()) {
+    msg.add_authn_types()->mutable_certificate();
+  }
+  if (authn_token_ && tls_context_->has_trusted_cert()) {
+    // TODO(PKI): check that the authn token is not expired. Can this be done
+    // reliably on clients?
+    msg.add_authn_types()->mutable_token();
+  }
+
+  if (PREDICT_FALSE(msg.authn_types().empty())) {
+    return Status::NotAuthorized("client is not configured with an authentication type");
+  }
+
   RETURN_NOT_OK(SendNegotiatePB(msg));
   return Status::OK();
 }
@@ -331,6 +336,39 @@ Status ClientNegotiation::HandleNegotiate(const NegotiatePB& response) {
     }
   }
 
+  // Get the authentication type which the server would like to use.
+  DCHECK_LE(response.authn_types().size(), 1);
+  if (response.authn_types().empty()) {
+    // If the server doesn't send back an authentication type, default to SASL
+    // in order to maintain backwards compatibility.
+    negotiated_authn_ = AuthenticationType::SASL;
+  } else {
+    const auto& authn_type = response.authn_types(0);
+    switch (authn_type.type_case()) {
+      case AuthenticationTypePB::kSasl:
+        negotiated_authn_ = AuthenticationType::SASL;
+        break;
+      case AuthenticationTypePB::kToken:
+        if (!authn_token_) {
+          return Status::RuntimeError(
+              "server chose token authentication, but client has no token");
+        }
+        negotiated_authn_ = AuthenticationType::TOKEN;
+        return Status::OK();
+      case AuthenticationTypePB::kCertificate:
+        if (!tls_context_->has_signed_cert()) {
+          return Status::RuntimeError(
+              "server chose certificate authentication, but client has no certificate");
+        }
+        negotiated_authn_ = AuthenticationType::CERTIFICATE;
+        return Status::OK();
+      case AuthenticationTypePB::TYPE_NOT_SET:
+        return Status::RuntimeError("server chose an unknown authentication type");
+    }
+  }
+
+  DCHECK_EQ(negotiated_authn_, AuthenticationType::SASL);
+
   // Build a map of the SASL mechanisms offered by the server.
   const set<SaslMechanism::Type>& client_mechs = helper_.EnabledMechs();
   set<SaslMechanism::Type> server_mechs;
@@ -428,6 +466,50 @@ Status ClientNegotiation::HandleTlsHandshake(const NegotiatePB& response) {
   return tls_handshake_.Finish(&socket_);
 }
 
+Status ClientNegotiation::AuthenticateBySasl(faststring* recv_buf) {
+  RETURN_NOT_OK(InitSaslClient());
+  RETURN_NOT_OK(SendSaslInitiate());
+  NegotiatePB response;
+  while (true) {
+    RETURN_NOT_OK(RecvNegotiatePB(&response, recv_buf));
+    Status s;
+    switch (response.step()) {
+      // SASL_CHALLENGE: Server sent us a follow-up to a SASL_INITIATE or SASL_RESPONSE request.
+      case NegotiatePB::SASL_CHALLENGE:
+        RETURN_NOT_OK(HandleSaslChallenge(response));
+        break;
+      // SASL_SUCCESS: Server has accepted our authentication request. Negotiation successful.
+      case NegotiatePB::SASL_SUCCESS:
+        return HandleSaslSuccess(response);
+      default:
+        return Status::NotAuthorized("expected SASL_CHALLENGE or SASL_SUCCESS step",
+                                     NegotiatePB::NegotiateStep_Name(response.step()));
+    }
+  }
+}
+
+Status ClientNegotiation::AuthenticateByToken(faststring* recv_buf) {
+  // Sanity check that TLS has been negotiated. Sending the token on an
+  // unencrypted channel is a big no-no.
+  CHECK(tls_negotiated_);
+
+  // Send the token to the server.
+  NegotiatePB pb;
+  pb.set_step(NegotiatePB::TOKEN_EXCHANGE);
+  pb.mutable_authn_token()->Swap(authn_token_.get_ptr());
+  RETURN_NOT_OK(SendNegotiatePB(pb));
+  pb.Clear();
+
+  // Check that the server responds with a non-error TOKEN_EXCHANGE message.
+  RETURN_NOT_OK(RecvNegotiatePB(&pb, recv_buf));
+  if (pb.step() != NegotiatePB::TOKEN_EXCHANGE) {
+    return Status::NotAuthorized("expected TOKEN_EXCHANGE step",
+                                 NegotiatePB::NegotiateStep_Name(pb.step()));
+  }
+
+  return Status::OK();
+}
+
 Status ClientNegotiation::SendSaslInitiate() {
   TRACE("Initiating SASL $0 handshake", negotiated_mech_);
 
@@ -512,7 +594,9 @@ Status ClientNegotiation::HandleSaslChallenge(const NegotiatePB& response) {
 Status ClientNegotiation::HandleSaslSuccess(const NegotiatePB& response) {
   TRACE("Received SASL_SUCCESS response from server");
 
-  if (tls_negotiated_ && negotiated_mech_ == SaslMechanism::Type::GSSAPI) {
+  if (tls_negotiated_ &&
+      negotiated_authn_ == AuthenticationType::SASL &&
+      negotiated_mech_ == SaslMechanism::GSSAPI) {
     // Check the channel bindings provided by the server against the expected
     // channel bindings.
     security::Cert cert;

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bac5327/src/kudu/rpc/client_negotiation.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/client_negotiation.h b/src/kudu/rpc/client_negotiation.h
index b33694e..c68e9ec 100644
--- a/src/kudu/rpc/client_negotiation.h
+++ b/src/kudu/rpc/client_negotiation.h
@@ -22,13 +22,16 @@
 #include <string>
 #include <vector>
 
+#include <boost/optional.hpp>
 #include <sasl/sasl.h>
 
 #include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/rpc/negotiation.h"
 #include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/rpc/sasl_common.h"
 #include "kudu/rpc/sasl_helper.h"
 #include "kudu/security/tls_handshake.h"
+#include "kudu/security/token.pb.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/socket.h"
 #include "kudu/util/status.h"
@@ -56,7 +59,8 @@ class ClientNegotiation {
   //
   // The provided TlsContext must outlive this negotiation instance.
   explicit ClientNegotiation(std::unique_ptr<Socket> socket,
-                             const security::TlsContext* tls_context);
+                             const security::TlsContext* tls_context,
+                             const boost::optional<security::SignedTokenPB>& authn_token);
 
   // Enable PLAIN authentication.
   // Must be called before Negotiate().
@@ -71,6 +75,13 @@ class ClientNegotiation {
   // Must be called after Negotiate().
   SaslMechanism::Type negotiated_mechanism() const;
 
+  // Returns the negotiated authentication type for the connection.
+  // Must be called after Negotiate().
+  AuthenticationType negotiated_authn() const {
+    DCHECK_NE(negotiated_authn_, AuthenticationType::INVALID);
+    return negotiated_authn_;
+  }
+
   // Returns true if TLS was negotiated.
   // Must be called after Negotiate().
   bool tls_negotiated() const {
@@ -152,6 +163,14 @@ class ClientNegotiation {
   // Handle a TLS_HANDSHAKE response message from the server.
   Status HandleTlsHandshake(const NegotiatePB& response) WARN_UNUSED_RESULT;
 
+  // Authenticate to the server using SASL.
+  // 'recv_buf' allows a receive buffer to be reused.
+  Status AuthenticateBySasl(faststring* recv_buf) WARN_UNUSED_RESULT;
+
+  // Authenticate to the server using a token.
+  // 'recv_buf' allows a receive buffer to be reused.
+  Status AuthenticateByToken(faststring* recv_buf) WARN_UNUSED_RESULT;
+
   // Send an SASL_INITIATE message to the server.
   Status SendSaslInitiate() WARN_UNUSED_RESULT;
 
@@ -190,6 +209,9 @@ class ClientNegotiation {
   security::TlsHandshake tls_handshake_;
   bool tls_negotiated_;
 
+  // TSK state.
+  boost::optional<security::SignedTokenPB> authn_token_;
+
   // Authentication state.
   std::string plain_auth_user_;
   std::string plain_pass_;
@@ -203,6 +225,9 @@ class ClientNegotiation {
   // The set of features supported by the server. Filled in during negotiation.
   std::set<RpcFeatureFlag> server_features_;
 
+  // The authentication type. Filled in during negotiation.
+  AuthenticationType negotiated_authn_;
+
   // The SASL mechanism used by the connection. Filled in during negotiation.
   SaslMechanism::Type negotiated_mech_;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bac5327/src/kudu/rpc/messenger.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/messenger.cc b/src/kudu/rpc/messenger.cc
index 81116de..591e7d0 100644
--- a/src/kudu/rpc/messenger.cc
+++ b/src/kudu/rpc/messenger.cc
@@ -44,6 +44,7 @@
 #include "kudu/rpc/server_negotiation.h"
 #include "kudu/rpc/transfer.h"
 #include "kudu/security/tls_context.h"
+#include "kudu/security/token_verifier.h"
 #include "kudu/util/errno.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/metrics.h"
@@ -292,6 +293,7 @@ Messenger::Messenger(const MessengerBuilder &bld)
   : name_(bld.name_),
     closing_(false),
     tls_context_(new security::TlsContext()),
+    token_verifier_(new security::TokenVerifier()),
     rpcz_store_(new RpczStore()),
     metric_entity_(bld.metric_entity_),
     retain_self_(this) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bac5327/src/kudu/rpc/messenger.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h
index c9fda69..9bbecec 100644
--- a/src/kudu/rpc/messenger.h
+++ b/src/kudu/rpc/messenger.h
@@ -31,6 +31,7 @@
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/rpc/response_callback.h"
+#include "kudu/security/token.pb.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
@@ -44,6 +45,7 @@ class ThreadPool;
 
 namespace security {
 class TlsContext;
+class TokenVerifier;
 }
 
 namespace rpc {
@@ -195,6 +197,18 @@ class Messenger {
   const security::TlsContext& tls_context() const { return *tls_context_; }
   security::TlsContext* mutable_tls_context() { return tls_context_.get(); }
 
+  const security::TokenVerifier& token_verifier() const { return *token_verifier_; }
+  security::TokenVerifier* mutable_token_verifier() { return token_verifier_.get(); }
+
+  boost::optional<security::SignedTokenPB> authn_token() const {
+    std::lock_guard<simple_spinlock> l(authn_token_lock_);
+    return authn_token_;
+  }
+  void set_authn_token(const security::SignedTokenPB& token) {
+    std::lock_guard<simple_spinlock> l(authn_token_lock_);
+    authn_token_ = token;
+  }
+
   ThreadPool* negotiation_pool() const { return negotiation_pool_.get(); }
 
   RpczStore* rpcz_store() { return rpcz_store_.get(); }
@@ -250,6 +264,13 @@ class Messenger {
 
   std::unique_ptr<security::TlsContext> tls_context_;
 
+  // A TokenVerifier, which can verify client provided authentication tokens.
+  std::unique_ptr<security::TokenVerifier> token_verifier_;
+
+  // An optional token, which can be used to authenticate to a server.
+  mutable simple_spinlock authn_token_lock_;
+  boost::optional<security::SignedTokenPB> authn_token_;
+
   std::unique_ptr<RpczStore> rpcz_store_;
 
   scoped_refptr<MetricEntity> metric_entity_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bac5327/src/kudu/rpc/negotiation-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/negotiation-test.cc b/src/kudu/rpc/negotiation-test.cc
index 9cd4f97..3c36e4e 100644
--- a/src/kudu/rpc/negotiation-test.cc
+++ b/src/kudu/rpc/negotiation-test.cc
@@ -22,31 +22,35 @@
 
 #include <functional>
 #include <memory>
+#include <ostream>
 #include <string>
 #include <thread>
 
+#include <gflags/gflags.h>
 #include <gtest/gtest.h>
 #include <sasl/sasl.h>
 
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/walltime.h"
 #include "kudu/rpc/client_negotiation.h"
 #include "kudu/rpc/constants.h"
-#include "kudu/rpc/sasl_common.h"
+#include "kudu/rpc/negotiation.h"
 #include "kudu/rpc/server_negotiation.h"
+#include "kudu/security/crypto.h"
+#include "kudu/security/security-test-util.h"
 #include "kudu/security/test/mini_kdc.h"
 #include "kudu/security/tls_context.h"
 #include "kudu/security/tls_socket.h"
+#include "kudu/security/token_signer.h"
+#include "kudu/security/token_verifier.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/net/socket.h"
 #include "kudu/util/subprocess.h"
 
-using std::string;
-using std::thread;
-using std::unique_ptr;
-
 // HACK: MIT Kerberos doesn't have any way of determining its version number,
 // but the error messages in krb5-1.10 and earlier are broken due to
 // a bug: http://krbdev.mit.edu/rt/Ticket/Display.html?id=6973
@@ -62,11 +66,74 @@ DEFINE_bool(is_test_child, false,
             "Used by tests which require clean processes. "
             "See TestDisableInit.");
 DECLARE_bool(rpc_encrypt_loopback_connections);
+DECLARE_int32(server_rsa_key_length_bits);
+
+using std::string;
+using std::thread;
+using std::unique_ptr;
+
+using kudu::security::Cert;
+using kudu::security::PkiConfig;
+using kudu::security::PrivateKey;
+using kudu::security::SignedTokenPB;
+using kudu::security::TlsContext;
+using kudu::security::TokenSigner;
+using kudu::security::TokenVerifier;
 
 namespace kudu {
 namespace rpc {
 
-class TestNegotiation : public RpcTestBase {
+// The negotiation configuration for a client or server endpoint.
+struct EndpointConfig {
+  // The PKI configuration.
+  PkiConfig pki;
+  // The supported SASL mechanisms.
+  vector<SaslMechanism::Type> sasl_mechs;
+  // For the client, whether the client has the token.
+  // For the server, whether the server has the TSK.
+  bool token;
+};
+std::ostream& operator<<(std::ostream& o, EndpointConfig config) {
+  auto bool_string = [] (bool b) { return b ? "true" : "false"; };
+  o << "{pki: " << config.pki
+    << ", sasl-mechs: [" << JoinMapped(config.sasl_mechs, SaslMechanism::name_of, ", ")
+    << "], token: " << bool_string(config.token)
+    << "}";
+  return o;
+}
+
+// A description of a negotiation sequence, including client and server
+// configuration, as well as expected results.
+struct NegotiationDescriptor {
+  EndpointConfig client;
+  EndpointConfig server;
+
+  bool rpc_encrypt_loopback;
+
+  // The expected client status from negotiating.
+  Status client_status;
+  // The expected server status from negotiating.
+  Status server_status;
+
+  // The expected negotiated authentication type.
+  AuthenticationType negotiated_authn;
+
+  // The expected SASL mechanism, if SASL authentication is negotiated.
+  SaslMechanism::Type negotiated_mech;
+
+  // Whether the negotiation is expected to perform a TLS handshake.
+  bool tls_negotiated;
+};
+std::ostream& operator<<(std::ostream& o, NegotiationDescriptor c) {
+  auto bool_string = [] (bool b) { return b ? "true" : "false"; };
+  o << "{client: " << c.client
+    << ", server: " << c.server
+    << "}, rpc-encrypt-loopback: " << bool_string(c.rpc_encrypt_loopback);
+  return o;
+}
+
+class TestNegotiation : public RpcTestBase,
+                        public ::testing::WithParamInterface<NegotiationDescriptor> {
  public:
   void SetUp() override {
     RpcTestBase::SetUp();
@@ -74,6 +141,479 @@ class TestNegotiation : public RpcTestBase {
   }
 };
 
+TEST_P(TestNegotiation, TestNegotiation) {
+  NegotiationDescriptor desc = GetParam();
+
+  // Generate a trusted root certificate.
+  PrivateKey ca_key;
+  Cert ca_cert;
+  ASSERT_OK(GenerateSelfSignedCAForTests(&ca_key, &ca_cert));
+
+  // Create and configure a TLS context for each endpoint.
+  TlsContext client_tls_context;
+  TlsContext server_tls_context;
+  ASSERT_OK(client_tls_context.Init());
+  ASSERT_OK(server_tls_context.Init());
+  ASSERT_OK(ConfigureTlsContext(desc.client.pki, ca_cert, ca_key, &client_tls_context));
+  ASSERT_OK(ConfigureTlsContext(desc.server.pki, ca_cert, ca_key, &server_tls_context));
+
+  FLAGS_rpc_encrypt_loopback_connections = desc.rpc_encrypt_loopback;
+
+  // Generate an optional client token and server token verifier.
+  TokenSigner token_signer(1);
+  TokenVerifier token_verifier;
+  ASSERT_OK(token_signer.RotateSigningKey());
+  boost::optional<SignedTokenPB> authn_token;
+  if (desc.client.token) {
+    authn_token = SignedTokenPB();
+    security::TokenPB token;
+    token.set_expire_unix_epoch_seconds(WallTime_Now() + 60);
+    token.mutable_authn()->set_username("client-token");
+    ASSERT_TRUE(token.SerializeToString(authn_token->mutable_token_data()));
+    ASSERT_OK(token_signer.SignToken(&*authn_token));
+  }
+  if (desc.server.token) {
+    ASSERT_OK(token_verifier.ImportPublicKeys(token_signer.GetTokenSigningPublicKeys(0)));
+  }
+
+  // Create the listening socket, client socket, and server socket.
+  Socket listening_socket;
+  ASSERT_OK(listening_socket.Init(0));
+  ASSERT_OK(listening_socket.BindAndListen(Sockaddr(), 1));
+  Sockaddr server_addr;
+  ASSERT_OK(listening_socket.GetSocketAddress(&server_addr));
+
+  unique_ptr<Socket> client_socket(new Socket());
+  ASSERT_OK(client_socket->Init(0));
+  client_socket->Connect(server_addr);
+
+  unique_ptr<Socket> server_socket(new Socket());
+  Sockaddr client_addr;
+  CHECK_OK(listening_socket.Accept(server_socket.get(), &client_addr, 0));
+
+  // Create and configure the client and server negotiation instances.
+  ClientNegotiation client_negotiation(std::move(client_socket),
+                                       &client_tls_context,
+                                       authn_token);
+  ServerNegotiation server_negotiation(std::move(server_socket),
+                                       &server_tls_context,
+                                       &token_verifier);
+
+  // Set client and server SASL mechanisms.
+  MiniKdc kdc;
+  bool kdc_started = false;
+  auto start_kdc_once = [&] () {
+    if (!kdc_started) {
+      kdc_started = true;
+      RETURN_NOT_OK(kdc.Start());
+    }
+    return Status::OK();
+  };
+  for (auto mech : desc.client.sasl_mechs) {
+    switch (mech) {
+      case SaslMechanism::INVALID: break;
+      case SaslMechanism::PLAIN:
+        ASSERT_OK(client_negotiation.EnablePlain("client-plain", "client-password"));
+        break;
+      case SaslMechanism::GSSAPI:
+        ASSERT_OK(start_kdc_once());
+        ASSERT_OK(kdc.CreateUserPrincipal("client-gssapi"));
+        ASSERT_OK(kdc.Kinit("client-gssapi"));
+        ASSERT_OK(kdc.SetKrb5Environment());
+        client_negotiation.set_server_fqdn("127.0.0.1");
+        ASSERT_OK(client_negotiation.EnableGSSAPI());
+        break;
+    }
+  }
+  for (auto mech : desc.server.sasl_mechs) {
+    switch (mech) {
+      case SaslMechanism::INVALID: break;
+      case SaslMechanism::PLAIN:
+        ASSERT_OK(server_negotiation.EnablePlain());
+        break;
+      case SaslMechanism::GSSAPI:
+        ASSERT_OK(start_kdc_once());
+        // Create the server principal and keytab.
+        string kt_path;
+        ASSERT_OK(kdc.CreateServiceKeytab("kudu/127.0.0.1", &kt_path));
+        CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/));
+        server_negotiation.set_server_fqdn("127.0.0.1");
+        ASSERT_OK(server_negotiation.EnableGSSAPI());
+        break;
+    }
+  }
+
+  // Run the client/server negotiation. Because negotiation is blocking, it
+  // has to be done on separate threads.
+  Status client_status;
+  Status server_status;
+  thread client_thread([&] () {
+      client_status = client_negotiation.Negotiate();
+      // Close the socket so that the server will not block forever on error.
+      client_negotiation.socket()->Close();
+  });
+  thread server_thread([&] () {
+      server_status = server_negotiation.Negotiate();
+      // Close the socket so that the client will not block forever on error.
+      server_negotiation.socket()->Close();
+  });
+  client_thread.join();
+  server_thread.join();
+
+  // Check the negotiation outcome against the expected outcome.
+  EXPECT_EQ(desc.client_status.CodeAsString(), client_status.CodeAsString());
+  EXPECT_EQ(desc.server_status.CodeAsString(), server_status.CodeAsString());
+  ASSERT_STR_MATCHES(client_status.ToString(), desc.client_status.ToString());
+  ASSERT_STR_MATCHES(server_status.ToString(), desc.server_status.ToString());
+
+  if (client_status.ok()) {
+    EXPECT_TRUE(server_status.ok());
+
+    // Make sure the negotiations agree with the expected values.
+    EXPECT_EQ(desc.negotiated_authn, client_negotiation.negotiated_authn());
+    EXPECT_EQ(desc.negotiated_mech, client_negotiation.negotiated_mechanism());
+    EXPECT_EQ(desc.negotiated_authn, server_negotiation.negotiated_authn());
+    EXPECT_EQ(desc.negotiated_mech, server_negotiation.negotiated_mechanism());
+    EXPECT_EQ(desc.tls_negotiated, server_negotiation.tls_negotiated());
+    EXPECT_EQ(desc.tls_negotiated, server_negotiation.tls_negotiated());
+
+    bool client_tls_socket = dynamic_cast<security::TlsSocket*>(client_negotiation.socket());
+    bool server_tls_socket = dynamic_cast<security::TlsSocket*>(server_negotiation.socket());
+    EXPECT_EQ(desc.rpc_encrypt_loopback, client_tls_socket);
+    EXPECT_EQ(desc.rpc_encrypt_loopback, server_tls_socket);
+
+    // Check that the expected user subject is authenticated.
+    // TODO(PKI): reenable this once subjects are handled correctly among authn types.
+    /*
+    switch (server_negotiation.negotiated_authn()) {
+      case AuthenticationType::SASL:
+        switch (server_negotiation.negotiated_mechanism()) {
+          case SaslMechanism::PLAIN:
+            EXPECT_EQ("client-plain", server_negotiation.authenticated_user());
+            break;
+          case SaslMechanism::GSSAPI:
+            EXPECT_EQ("client-gssapi", server_negotiation.authenticated_user());
+            break;
+          case SaslMechanism::INVALID: LOG(FATAL) << "invalid mechanism negotiated";
+        }
+        break;
+      case AuthenticationType::CERTIFICATE:
+          EXPECT_EQ("client-certificate", server_negotiation.authenticated_user());
+          break;
+      case AuthenticationType::TOKEN:
+          EXPECT_EQ("client-token", server_negotiation.authenticated_user());
+          break;
+      case AuthenticationType::INVALID: LOG(FATAL) << "invalid authentication negotiated";
+    }
+    */
+  }
+}
+
+INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
+                        TestNegotiation,
+                        ::testing::Values(
+
+        // client: no authn/mechs
+        // server: no authn/mechs
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::NONE,
+            {},
+            false,
+          },
+          EndpointConfig {
+            PkiConfig::NONE,
+            {},
+            false,
+          },
+          false,
+          Status::NotAuthorized(".*client is not configured with an authentication type"),
+          Status::NetworkError(""),
+          AuthenticationType::INVALID,
+          SaslMechanism::INVALID,
+          false,
+        },
+
+        // client: PLAIN
+        // server: no authn/mechs
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::NONE,
+            { SaslMechanism::PLAIN },
+            false,
+          },
+          EndpointConfig {
+            PkiConfig::NONE,
+            {},
+            false,
+          },
+          false,
+          Status::NotAuthorized(".* server mechanism list is empty"),
+          Status::NotAuthorized(".* server mechanism list is empty"),
+          AuthenticationType::INVALID,
+          SaslMechanism::INVALID,
+          false,
+        },
+
+        // client: PLAIN
+        // server: PLAIN
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::NONE,
+            { SaslMechanism::PLAIN },
+            false,
+          },
+          EndpointConfig {
+            PkiConfig::NONE,
+            { SaslMechanism::PLAIN },
+            false,
+          },
+          false,
+          Status::OK(),
+          Status::OK(),
+          AuthenticationType::SASL,
+          SaslMechanism::PLAIN,
+          false,
+        },
+
+        // client: GSSAPI
+        // server: GSSAPI
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::NONE,
+            { SaslMechanism::GSSAPI },
+            false,
+          },
+          EndpointConfig {
+            PkiConfig::NONE,
+            { SaslMechanism::GSSAPI },
+            false,
+          },
+          false,
+          Status::OK(),
+          Status::OK(),
+          AuthenticationType::SASL,
+          SaslMechanism::GSSAPI,
+          false,
+        },
+
+        // client: GSSAPI, PLAIN
+        // server: GSSAPI, PLAIN
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::NONE,
+            { SaslMechanism::GSSAPI, SaslMechanism::PLAIN },
+            false,
+          },
+          EndpointConfig {
+            PkiConfig::NONE,
+            { SaslMechanism::GSSAPI, SaslMechanism::PLAIN },
+            false,
+          },
+          false,
+          Status::OK(),
+          Status::OK(),
+          AuthenticationType::SASL,
+          SaslMechanism::GSSAPI,
+          false,
+        },
+
+        // client: GSSAPI, PLAIN
+        // server: GSSAPI
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::NONE,
+            { SaslMechanism::GSSAPI, SaslMechanism::PLAIN },
+            false,
+          },
+          EndpointConfig {
+            PkiConfig::NONE,
+            { SaslMechanism::GSSAPI },
+            false,
+          },
+          false,
+          Status::OK(),
+          Status::OK(),
+          AuthenticationType::SASL,
+          SaslMechanism::GSSAPI,
+          false,
+        },
+
+        // client: PLAIN
+        // server: GSSAPI
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::NONE,
+            { SaslMechanism::PLAIN },
+            false,
+          },
+          EndpointConfig {
+            PkiConfig::NONE,
+            { SaslMechanism::GSSAPI },
+            false,
+          },
+          false,
+          Status::NotAuthorized(".*client does not have Kerberos enabled"),
+          Status::NetworkError(""),
+          AuthenticationType::INVALID,
+          SaslMechanism::INVALID,
+          false,
+        },
+
+        // client: GSSAPI,
+        // server: GSSAPI, self-signed cert
+        // loopback encryption
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::NONE,
+            { SaslMechanism::GSSAPI },
+            false,
+          },
+          EndpointConfig {
+            PkiConfig::SELF_SIGNED,
+            { SaslMechanism::GSSAPI },
+            false,
+          },
+          true,
+          Status::OK(),
+          Status::OK(),
+          AuthenticationType::SASL,
+          SaslMechanism::GSSAPI,
+          true,
+        },
+
+        // client: GSSAPI, signed-cert
+        // server: GSSAPI, self-signed cert
+        // This tests that the server will not advertise CERTIFICATE authentication,
+        // since it doesn't have a trusted cert.
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::SIGNED,
+            { SaslMechanism::GSSAPI },
+            false,
+          },
+          EndpointConfig {
+            PkiConfig::SELF_SIGNED,
+            { SaslMechanism::GSSAPI },
+            false,
+          },
+          false,
+          Status::OK(),
+          Status::OK(),
+          AuthenticationType::SASL,
+          SaslMechanism::GSSAPI,
+          true,
+        },
+
+        // client: PLAIN,
+        // server: PLAIN, self-signed cert
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::NONE,
+            { SaslMechanism::PLAIN },
+            false,
+          },
+          EndpointConfig {
+            PkiConfig::SELF_SIGNED,
+            { SaslMechanism::PLAIN },
+            false,
+          },
+          false,
+          Status::OK(),
+          Status::OK(),
+          AuthenticationType::SASL,
+          SaslMechanism::PLAIN,
+          true,
+        },
+
+        // client: signed-cert
+        // server: signed-cert
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::SIGNED,
+            { SaslMechanism::GSSAPI },
+            false,
+          },
+          EndpointConfig {
+            PkiConfig::SIGNED,
+            { SaslMechanism::GSSAPI },
+            false,
+          },
+          false,
+          Status::OK(),
+          Status::OK(),
+          AuthenticationType::CERTIFICATE,
+          SaslMechanism::INVALID,
+          true,
+        },
+
+        // client: token, trusted cert
+        // server: token, signed-cert, GSSAPI
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::TRUSTED,
+            { },
+            true,
+          },
+          EndpointConfig {
+            PkiConfig::SIGNED,
+            { SaslMechanism::PLAIN },
+            true,
+          },
+          false,
+          Status::OK(),
+          Status::OK(),
+          AuthenticationType::TOKEN,
+          SaslMechanism::INVALID,
+          true,
+        },
+
+        // client: PLAIN, token
+        // server: PLAIN, token, signed cert
+        // Test that the client won't negotiate token authn if it doesn't have a
+        // trusted cert. We aren't expecting this to happen in practice (the
+        // token and trusted CA cert should come as a package).
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::NONE,
+            { SaslMechanism::PLAIN },
+            true,
+          },
+          EndpointConfig {
+            PkiConfig::SIGNED,
+            { SaslMechanism::PLAIN },
+            true,
+          },
+          false,
+          Status::OK(),
+          Status::OK(),
+          AuthenticationType::SASL,
+          SaslMechanism::PLAIN,
+          true,
+        },
+
+        // client: PLAIN, GSSAPI, signed-cert, token
+        // server: PLAIN, GSSAPI, signed-cert, token
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::SIGNED,
+            { SaslMechanism::PLAIN, SaslMechanism::GSSAPI },
+            true,
+          },
+          EndpointConfig {
+            PkiConfig::SIGNED,
+            { SaslMechanism::PLAIN, SaslMechanism::GSSAPI },
+            true,
+          },
+          false,
+          Status::OK(),
+          Status::OK(),
+          AuthenticationType::CERTIFICATE,
+          SaslMechanism::INVALID,
+          true,
+        }
+));
+
 // A "Callable" that takes a socket for use with starting a thread.
 // Can be used for ServerNegotiation or ClientNegotiation threads.
 typedef std::function<void(unique_ptr<Socket>)> SocketCallable;
@@ -112,33 +652,7 @@ static void RunNegotiationTest(const SocketCallable& server_runner,
 
 ////////////////////////////////////////////////////////////////////////////////
 
-static void RunPlainNegotiationServer(unique_ptr<Socket> socket) {
-  security::TlsContext tls_context;
-  CHECK_OK(tls_context.Init());
-  ServerNegotiation server_negotiation(std::move(socket), &tls_context);
-  CHECK_OK(server_negotiation.EnablePlain());
-  CHECK_OK(server_negotiation.Negotiate());
-  CHECK(ContainsKey(server_negotiation.client_features(), APPLICATION_FEATURE_FLAGS));
-  CHECK_EQ("my-username", server_negotiation.authenticated_user());
-}
-
-static void RunPlainNegotiationClient(unique_ptr<Socket> socket) {
-  security::TlsContext tls_context;
-  CHECK_OK(tls_context.Init());
-  ClientNegotiation client_negotiation(std::move(socket), &tls_context);
-  CHECK_OK(client_negotiation.EnablePlain("my-username", "ignored password"));
-  CHECK_OK(client_negotiation.Negotiate());
-  CHECK(ContainsKey(client_negotiation.server_features(), APPLICATION_FEATURE_FLAGS));
-}
-
-// Test SASL negotiation using the PLAIN mechanism over a socket.
-TEST_F(TestNegotiation, TestPlainNegotiation) {
-  RunNegotiationTest(RunPlainNegotiationServer, RunPlainNegotiationClient);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-
+#ifndef __APPLE__
 template<class T>
 using CheckerFunction = std::function<void(const Status&, T&)>;
 
@@ -146,9 +660,10 @@ using CheckerFunction = std::function<void(const Status&, T&)>;
 // 'post_check' after negotiation to verify the result.
 static void RunGSSAPINegotiationServer(unique_ptr<Socket> socket,
                                        const CheckerFunction<ServerNegotiation>& post_check) {
-  security::TlsContext tls_context;
+  TlsContext tls_context;
   CHECK_OK(tls_context.Init());
-  ServerNegotiation server_negotiation(std::move(socket), &tls_context);
+  TokenVerifier token_verifier;
+  ServerNegotiation server_negotiation(std::move(socket), &tls_context, &token_verifier);
   server_negotiation.set_server_fqdn("127.0.0.1");
   CHECK_OK(server_negotiation.EnableGSSAPI());
   post_check(server_negotiation.Negotiate(), server_negotiation);
@@ -158,118 +673,14 @@ static void RunGSSAPINegotiationServer(unique_ptr<Socket> socket,
 // 'post_check' after negotiation to verify the result.
 static void RunGSSAPINegotiationClient(unique_ptr<Socket> conn,
                                        const CheckerFunction<ClientNegotiation>& post_check) {
-  security::TlsContext tls_context;
+  TlsContext tls_context;
   CHECK_OK(tls_context.Init());
-  ClientNegotiation client_negotiation(std::move(conn), &tls_context);
+  ClientNegotiation client_negotiation(std::move(conn), &tls_context, boost::none);
   client_negotiation.set_server_fqdn("127.0.0.1");
   CHECK_OK(client_negotiation.EnableGSSAPI());
   post_check(client_negotiation.Negotiate(), client_negotiation);
 }
 
-// Test configuring a client to allow but not require Kerberos/GSSAPI,
-// and connect to a server which requires Kerberos/GSSAPI.
-//
-// They should negotiate to use Kerberos/GSSAPI.
-TEST_F(TestNegotiation, TestRestrictiveServer_NonRestrictiveClient) {
-  MiniKdc kdc;
-  ASSERT_OK(kdc.Start());
-
-  // Create the server principal and keytab.
-  string kt_path;
-  ASSERT_OK(kdc.CreateServiceKeytab("kudu/127.0.0.1", &kt_path));
-  CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/));
-
-  // Create and kinit as a client user.
-  ASSERT_OK(kdc.CreateUserPrincipal("testuser"));
-  ASSERT_OK(kdc.Kinit("testuser"));
-  ASSERT_OK(kdc.SetKrb5Environment());
-
-  // Authentication should now succeed on both sides.
-  RunNegotiationTest(
-      std::bind(RunGSSAPINegotiationServer, std::placeholders::_1,
-                [](const Status& s, ServerNegotiation& server) {
-                  CHECK_OK(s);
-                  CHECK_EQ(SaslMechanism::GSSAPI, server.negotiated_mechanism());
-                  CHECK_EQ("testuser", server.authenticated_user());
-                }),
-      [](unique_ptr<Socket> socket) {
-        security::TlsContext tls_context;
-        CHECK_OK(tls_context.Init());
-        ClientNegotiation client_negotiation(std::move(socket), &tls_context);
-        client_negotiation.set_server_fqdn("127.0.0.1");
-        // The client enables both PLAIN and GSSAPI.
-        CHECK_OK(client_negotiation.EnablePlain("foo", "bar"));
-        CHECK_OK(client_negotiation.EnableGSSAPI());
-        CHECK_OK(client_negotiation.Negotiate());
-        CHECK_EQ(SaslMechanism::GSSAPI, client_negotiation.negotiated_mechanism());
-      });
-}
-
-// Test configuring a client to only support PLAIN, and a server which
-// only supports GSSAPI. This would happen, for example, if an old Kudu
-// client tries to talk to a secure-only cluster.
-TEST_F(TestNegotiation, TestNoMatchingMechanisms) {
-  MiniKdc kdc;
-  ASSERT_OK(kdc.Start());
-
-  // Create the server principal and keytab.
-  string kt_path;
-  ASSERT_OK(kdc.CreateServiceKeytab("kudu/localhost", &kt_path));
-  CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/));
-
-  RunNegotiationTest(
-      std::bind(RunGSSAPINegotiationServer, std::placeholders::_1,
-                [](const Status& s, ServerNegotiation& server) {
-                  // The client fails to find a matching mechanism and
-                  // doesn't send any failure message to the server.
-                  // Instead, it just disconnects.
-                  //
-                  // TODO(todd): this could produce a better message!
-                  ASSERT_STR_CONTAINS(s.ToString(), "got EOF from remote");
-                }),
-      [](unique_ptr<Socket> socket) {
-        security::TlsContext tls_context;
-        CHECK_OK(tls_context.Init());
-        ClientNegotiation client_negotiation(std::move(socket), &tls_context);
-        client_negotiation.set_server_fqdn("127.0.0.1");
-        // The client enables both PLAIN and GSSAPI.
-        CHECK_OK(client_negotiation.EnablePlain("foo", "bar"));
-        Status s = client_negotiation.Negotiate();
-        ASSERT_STR_CONTAINS(s.ToString(), "client does not have Kerberos enabled");
-      });
-}
-
-// Test SASL negotiation using the GSSAPI (kerberos) mechanism over a socket.
-TEST_F(TestNegotiation, TestGSSAPINegotiation) {
-  MiniKdc kdc;
-  ASSERT_OK(kdc.Start());
-
-  // Create the server principal and keytab.
-  string kt_path;
-  ASSERT_OK(kdc.CreateServiceKeytab("kudu/127.0.0.1", &kt_path));
-  CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/));
-
-  // Create and kinit as a client user.
-  ASSERT_OK(kdc.CreateUserPrincipal("testuser"));
-  ASSERT_OK(kdc.Kinit("testuser"));
-  ASSERT_OK(kdc.SetKrb5Environment());
-
-  // Authentication should succeed on both sides.
-  RunNegotiationTest(
-      std::bind(RunGSSAPINegotiationServer, std::placeholders::_1,
-                [](const Status& s, ServerNegotiation& server) {
-                  CHECK_OK(s);
-                  CHECK_EQ(SaslMechanism::GSSAPI, server.negotiated_mechanism());
-                  CHECK_EQ("testuser", server.authenticated_user());
-                }),
-      std::bind(RunGSSAPINegotiationClient, std::placeholders::_1,
-                [](const Status& s, ClientNegotiation& client) {
-                  CHECK_OK(s);
-                  CHECK_EQ(SaslMechanism::GSSAPI, client.negotiated_mechanism());
-                }));
-}
-
-#ifndef __APPLE__
 // Test invalid SASL negotiations using the GSSAPI (kerberos) mechanism over a socket.
 // This test is ignored on macOS because the system Kerberos implementation
 // (Heimdal) caches the non-existence of client credentials, which causes futher
@@ -401,9 +812,10 @@ TEST_F(TestNegotiation, TestPreflight) {
 ////////////////////////////////////////////////////////////////////////////////
 
 static void RunTimeoutExpectingServer(unique_ptr<Socket> socket) {
-  security::TlsContext tls_context;
+  TlsContext tls_context;
   CHECK_OK(tls_context.Init());
-  ServerNegotiation server_negotiation(std::move(socket), &tls_context);
+  TokenVerifier token_verifier;
+  ServerNegotiation server_negotiation(std::move(socket), &tls_context, &token_verifier);
   CHECK_OK(server_negotiation.EnablePlain());
   Status s = server_negotiation.Negotiate();
   ASSERT_TRUE(s.IsNetworkError()) << "Expected client to time out and close the connection. Got: "
@@ -411,9 +823,9 @@ static void RunTimeoutExpectingServer(unique_ptr<Socket> socket) {
 }
 
 static void RunTimeoutNegotiationClient(unique_ptr<Socket> sock) {
-  security::TlsContext tls_context;
+  TlsContext tls_context;
   CHECK_OK(tls_context.Init());
-  ClientNegotiation client_negotiation(std::move(sock), &tls_context);
+  ClientNegotiation client_negotiation(std::move(sock), &tls_context, boost::none);
   CHECK_OK(client_negotiation.EnablePlain("test", "test"));
   MonoTime deadline = MonoTime::Now() - MonoDelta::FromMilliseconds(100L);
   client_negotiation.set_deadline(deadline);
@@ -430,9 +842,10 @@ TEST_F(TestNegotiation, TestClientTimeout) {
 ////////////////////////////////////////////////////////////////////////////////
 
 static void RunTimeoutNegotiationServer(unique_ptr<Socket> socket) {
-  security::TlsContext tls_context;
+  TlsContext tls_context;
   CHECK_OK(tls_context.Init());
-  ServerNegotiation server_negotiation(std::move(socket), &tls_context);
+  TokenVerifier token_verifier;
+  ServerNegotiation server_negotiation(std::move(socket), &tls_context, &token_verifier);
   CHECK_OK(server_negotiation.EnablePlain());
   MonoTime deadline = MonoTime::Now() - MonoDelta::FromMilliseconds(100L);
   server_negotiation.set_deadline(deadline);
@@ -442,9 +855,9 @@ static void RunTimeoutNegotiationServer(unique_ptr<Socket> socket) {
 }
 
 static void RunTimeoutExpectingClient(unique_ptr<Socket> socket) {
-  security::TlsContext tls_context;
+  TlsContext tls_context;
   CHECK_OK(tls_context.Init());
-  ClientNegotiation client_negotiation(std::move(socket), &tls_context);
+  ClientNegotiation client_negotiation(std::move(socket), &tls_context, boost::none);
   CHECK_OK(client_negotiation.EnablePlain("test", "test"));
   Status s = client_negotiation.Negotiate();
   ASSERT_TRUE(s.IsNetworkError()) << "Expected server to time out and close the connection. Got: "
@@ -560,77 +973,5 @@ TEST_F(TestDisableInit, TestMultipleSaslInit_NoMutexImpl) {
 }
 #endif
 
-////////////////////////////////////////////////////////////////////////////////
-
-// Server which has TLS and SASL GSSAPI enabled.
-static void RunTlsGssapiNegotiationServer(unique_ptr<Socket> socket) {
-  security::TlsContext tls_context;
-  ASSERT_OK(tls_context.Init());
-
-  // TODO(PKI): switch this to use an in-memory cert and key.
-  std::string server_cert_path = JoinPathSegments(GetTestDataDirectory(), "server-cert.pem");
-  std::string private_key_path = JoinPathSegments(GetTestDataDirectory(), "server-key.pem");
-  ASSERT_OK(security::CreateSSLServerCert(server_cert_path));
-  ASSERT_OK(security::CreateSSLPrivateKey(private_key_path));
-  ASSERT_OK(tls_context.LoadCertificateAndKey(server_cert_path, private_key_path));
-
-  ServerNegotiation server_negotiation(std::move(socket), &tls_context);
-  server_negotiation.set_server_fqdn("127.0.0.1");
-  ASSERT_OK(server_negotiation.EnableGSSAPI());
-
-  ASSERT_OK(server_negotiation.Negotiate());
-  ASSERT_TRUE(ContainsKey(server_negotiation.client_features(), APPLICATION_FEATURE_FLAGS));
-  ASSERT_TRUE(ContainsKey(server_negotiation.client_features(), TLS));
-
-  ASSERT_EQ(SaslMechanism::GSSAPI, server_negotiation.negotiated_mechanism());
-  ASSERT_EQ("testuser", server_negotiation.authenticated_user());
-
-  CHECK(server_negotiation.tls_negotiated());
-  bool is_tls_socket = dynamic_cast<security::TlsSocket*>(server_negotiation.socket());
-  CHECK_EQ(is_tls_socket, FLAGS_rpc_encrypt_loopback_connections);
-}
-
-static void RunTlsGssapiNegotiationClient(unique_ptr<Socket> socket) {
-  security::TlsContext tls_context;
-  ASSERT_OK(tls_context.Init());
-
-  ClientNegotiation client_negotiation(std::move(socket), &tls_context);
-  client_negotiation.set_server_fqdn("127.0.0.1");
-  CHECK_OK(client_negotiation.EnableGSSAPI());
-  CHECK_OK(client_negotiation.Negotiate());
-  CHECK(ContainsKey(client_negotiation.server_features(), APPLICATION_FEATURE_FLAGS));
-  CHECK(ContainsKey(client_negotiation.server_features(), TLS));
-
-  CHECK(client_negotiation.tls_negotiated());
-  bool is_tls_socket = dynamic_cast<security::TlsSocket*>(client_negotiation.socket());
-  CHECK_EQ(is_tls_socket, FLAGS_rpc_encrypt_loopback_connections);
-}
-
-// Test TLS and GSSAPI (kerberos) SASL negotiation.
-TEST_F(TestNegotiation, TestTlsWithGssapiNegotiation) {
-  MiniKdc kdc;
-  ASSERT_OK(kdc.Start());
-
-  // Create the server principal and keytab.
-  string kt_path;
-  ASSERT_OK(kdc.CreateServiceKeytab("kudu/127.0.0.1", &kt_path));
-  CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/));
-
-  // Create and kinit as a client user.
-  ASSERT_OK(kdc.CreateUserPrincipal("testuser"));
-  ASSERT_OK(kdc.Kinit("testuser"));
-  ASSERT_OK(kdc.SetKrb5Environment());
-
-  // Authentication should succeed on both sides.
-  // We run with both flag values for the loopback TLS encryption flag.
-  for (bool encrypt_loopback : { false, true }) {
-    SCOPED_TRACE(encrypt_loopback);
-    FLAGS_rpc_encrypt_loopback_connections = encrypt_loopback;
-    RunNegotiationTest(RunTlsGssapiNegotiationServer, RunTlsGssapiNegotiationClient);
-  }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
 } // namespace rpc
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bac5327/src/kudu/rpc/negotiation.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/negotiation.cc b/src/kudu/rpc/negotiation.cc
index c31b1f8..d8cdfe3 100644
--- a/src/kudu/rpc/negotiation.cc
+++ b/src/kudu/rpc/negotiation.cc
@@ -67,6 +67,16 @@ using strings::Substitute;
 namespace kudu {
 namespace rpc {
 
+std::ostream& operator<<(std::ostream& o, AuthenticationType authentication_type) {
+  switch (authentication_type) {
+    case AuthenticationType::INVALID: o << "INVALID"; break;
+    case AuthenticationType::SASL: o << "SASL"; break;
+    case AuthenticationType::TOKEN: o << "TOKEN"; break;
+    case AuthenticationType::CERTIFICATE: o << "CERTIFICATE"; break;
+  }
+  return o;
+}
+
 // Wait for the client connection to be established and become ready for writing.
 static Status WaitForClientConnect(Socket* socket, const MonoTime& deadline) {
   TRACE("Waiting for socket to connect");
@@ -137,8 +147,10 @@ static Status DisableSocketTimeouts(Socket* socket) {
 
 // Perform client negotiation. We don't LOG() anything, we leave that to our caller.
 static Status DoClientNegotiation(Connection* conn, MonoTime deadline) {
-  const auto* tls_context = &conn->reactor_thread()->reactor()->messenger()->tls_context();
-  ClientNegotiation client_negotiation(conn->release_socket(), tls_context);
+  const auto* messenger = conn->reactor_thread()->reactor()->messenger();
+  ClientNegotiation client_negotiation(conn->release_socket(),
+                                       &messenger->tls_context(),
+                                       messenger->authn_token());
 
   // Note that the fqdn is an IP address here: we've already lost whatever DNS
   // name the client was attempting to use. Unless krb5 is configured with 'rdns
@@ -186,8 +198,10 @@ static Status DoServerNegotiation(Connection* conn, const MonoTime& deadline) {
   }
 
   // Create a new ServerNegotiation to handle the synchronous negotiation.
-  const auto* tls_context = &conn->reactor_thread()->reactor()->messenger()->tls_context();
-  ServerNegotiation server_negotiation(conn->release_socket(), tls_context);
+  const auto* messenger = conn->reactor_thread()->reactor()->messenger();
+  ServerNegotiation server_negotiation(conn->release_socket(),
+                                       &messenger->tls_context(),
+                                       &messenger->token_verifier());
 
   if (FLAGS_server_require_kerberos) {
     RETURN_NOT_OK(server_negotiation.EnableGSSAPI());

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bac5327/src/kudu/rpc/negotiation.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/negotiation.h b/src/kudu/rpc/negotiation.h
index 64d009b..11b108a 100644
--- a/src/kudu/rpc/negotiation.h
+++ b/src/kudu/rpc/negotiation.h
@@ -17,6 +17,8 @@
 #ifndef KUDU_RPC_NEGOTIATION_H
 #define KUDU_RPC_NEGOTIATION_H
 
+#include <ostream>
+
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/util/monotime.h"
 
@@ -26,6 +28,14 @@ namespace rpc {
 
 class Connection;
 
+enum class AuthenticationType {
+  INVALID,
+  SASL,
+  TOKEN,
+  CERTIFICATE,
+};
+std::ostream& operator<<(std::ostream& o, AuthenticationType authentication_type);
+
 class Negotiation {
  public:
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bac5327/src/kudu/rpc/rpc_header.proto
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_header.proto b/src/kudu/rpc/rpc_header.proto
index 5d426e3..b295fed 100644
--- a/src/kudu/rpc/rpc_header.proto
+++ b/src/kudu/rpc/rpc_header.proto
@@ -22,6 +22,7 @@ package kudu.rpc;
 option java_package = "org.apache.kudu.rpc";
 
 import "google/protobuf/descriptor.proto";
+import "kudu/security/token.proto";
 import "kudu/util/pb_util.proto";
 
 // The Kudu RPC protocol is similar to the RPC protocol of Hadoop and HBase.
@@ -85,6 +86,32 @@ enum RpcFeatureFlag {
   TLS_AUTHENTICATION_ONLY = 3;
 };
 
+// An authentication type. This is modeled as a oneof in case any of these
+// authentication types, or any authentication types in the future, need to add
+// extra type-specific parameters during negotiation.
+message AuthenticationTypePB {
+  message Sasl {};
+  message Token {};
+  message Certificate {};
+
+  oneof type {
+    // The server and client mutually authenticate via SASL.
+    Sasl sasl = 1;
+
+    // The server authenticates the client via a signed token, and the client
+    // authenticates the server by verifying its certificate has been signed by
+    // a trusted CA.
+    //
+    // Token authentication requires the connection to be TLS encrypted.
+    Token token = 2;
+
+    // The server and client mutually authenticate by certificate.
+    //
+    // Certificate authentication requires the connection to be TLS encrypted.
+    Certificate certificate = 3;
+  }
+}
+
 // Message type passed back & forth for the SASL negotiation.
 message NegotiatePB {
   enum NegotiateStep {
@@ -95,6 +122,7 @@ message NegotiatePB {
     SASL_CHALLENGE = 3;
     SASL_RESPONSE  = 4;
     TLS_HANDSHAKE  = 5;
+    TOKEN_EXCHANGE = 6;
   }
 
   message SaslMechanism {
@@ -137,6 +165,13 @@ message NegotiatePB {
   // During the NEGOTIATE step, contains the supported SASL mechanisms.
   // During the SASL_INITIATE step, contains the single chosen SASL mechanism.
   repeated SaslMechanism sasl_mechanisms = 4;
+
+  // During the client to server NEGOTIATE step, contains the supported authentication types.
+  // During the server to client NEGOTIATE step, contains the chosen authentication type.
+  repeated AuthenticationTypePB authn_types = 7;
+
+  // During the TOKEN_EXCHANGE step, contains the client's signed authentication token.
+  optional security.SignedTokenPB authn_token = 8;
 }
 
 message RemoteMethodPB {

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bac5327/src/kudu/rpc/server_negotiation.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/server_negotiation.cc b/src/kudu/rpc/server_negotiation.cc
index cf10a12..76f0214 100644
--- a/src/kudu/rpc/server_negotiation.cc
+++ b/src/kudu/rpc/server_negotiation.cc
@@ -39,6 +39,8 @@
 #include "kudu/security/tls_context.h"
 #include "kudu/security/tls_handshake.h"
 #include "kudu/security/tls_socket.h"
+#include "kudu/security/token_verifier.h"
+#include "kudu/util/logging.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/net/socket.h"
 #include "kudu/util/scoped_cleanup.h"
@@ -71,11 +73,14 @@ static int ServerNegotiationPlainAuthCb(sasl_conn_t* conn,
 }
 
 ServerNegotiation::ServerNegotiation(unique_ptr<Socket> socket,
-                                     const security::TlsContext* tls_context)
+                                     const security::TlsContext* tls_context,
+                                     const security::TokenVerifier* token_verifier)
     : socket_(std::move(socket)),
       helper_(SaslHelper::SERVER),
       tls_context_(tls_context),
       tls_negotiated_(false),
+      token_verifier_(token_verifier),
+      negotiated_authn_(AuthenticationType::INVALID),
       negotiated_mech_(SaslMechanism::INVALID),
       deadline_(MonoTime::Max()) {
   callbacks_.push_back(SaslBuildCallback(SASL_CB_GETOPT,
@@ -113,11 +118,12 @@ void ServerNegotiation::set_deadline(const MonoTime& deadline) {
 Status ServerNegotiation::Negotiate() {
   TRACE("Beginning negotiation");
 
-  // Wait until starting negotiation to check that the socket and tls_context
-  // are not null, since the socket and tls_context need not be set for
+  // Wait until starting negotiation to check that the socket, tls_context, and
+  // token_verifier are not null, since they do not need to be set for
   // PreflightCheckGSSAPI.
   DCHECK(socket_);
   DCHECK(tls_context_);
+  DCHECK(token_verifier_);
 
   // Ensure we can use blocking calls on the socket during negotiation.
   RETURN_NOT_OK(EnsureBlockingMode(socket_.get()));
@@ -139,10 +145,11 @@ Status ServerNegotiation::Negotiate() {
     RETURN_NOT_OK(tls_context_->InitiateHandshake(security::TlsHandshakeType::SERVER,
                                                   &tls_handshake_));
 
-    // The server does not verify the client's certificate.
-    // TODO(PKI): Add client-certificate authn support, which will verify the
-    // client cert.
-    tls_handshake_.set_verification_mode(security::TlsVerificationMode::VERIFY_NONE);
+    if (negotiated_authn_ != AuthenticationType::CERTIFICATE) {
+      // The server does not need to verify the client's certificate unless it's
+      // being used for authentication.
+      tls_handshake_.set_verification_mode(security::TlsVerificationMode::VERIFY_NONE);
+    }
 
     while (true) {
       NegotiatePB request;
@@ -154,25 +161,19 @@ Status ServerNegotiation::Negotiate() {
     tls_negotiated_ = true;
   }
 
-  // Step 4: SASL negotiation.
-  RETURN_NOT_OK(InitSaslServer());
-  {
-    NegotiatePB request;
-    RETURN_NOT_OK(RecvNegotiatePB(&request, &recv_buf));
-    Status s = HandleSaslInitiate(request);
-
-    while (s.IsIncomplete()) {
-      RETURN_NOT_OK(RecvNegotiatePB(&request, &recv_buf));
-      s = HandleSaslResponse(request);
-    }
-    RETURN_NOT_OK(s);
+  // Step 4: Authentication
+  switch (negotiated_authn_) {
+    case AuthenticationType::SASL:
+      RETURN_NOT_OK(AuthenticateBySasl(&recv_buf));
+      break;
+    case AuthenticationType::TOKEN:
+      RETURN_NOT_OK(AuthenticateByToken(&recv_buf));
+      break;
+    case AuthenticationType::CERTIFICATE:
+      RETURN_NOT_OK(AuthenticateByCertificate());
+      break;
+    case AuthenticationType::INVALID: LOG(FATAL) << "unreachable";
   }
-  const char* username = nullptr;
-  int rc = sasl_getprop(sasl_conn_.get(), SASL_USERNAME,
-                        reinterpret_cast<const void**>(&username));
-  // We expect that SASL_USERNAME will always get set.
-  CHECK(rc == SASL_OK && username != nullptr) << "No username on authenticated connection";
-  authenticated_user_ = username;
 
   // Step 5: Receive connection context.
   RETURN_NOT_OK(RecvConnectionContext(&recv_buf));
@@ -193,7 +194,7 @@ Status ServerNegotiation::PreflightCheckGSSAPI() {
   //
   // We aren't going to actually send/receive any messages, but
   // this makes it easier to reuse the initialization code.
-  ServerNegotiation server(nullptr, nullptr);
+  ServerNegotiation server(nullptr, nullptr, nullptr);
   Status s = server.EnableGSSAPI();
   if (!s.ok()) {
     return Status::RuntimeError(s.message());
@@ -336,28 +337,68 @@ Status ServerNegotiation::HandleNegotiate(const NegotiatePB& request) {
     }
   }
 
-  set<SaslMechanism::Type> server_mechs = helper_.EnabledMechs();
-  if (PREDICT_FALSE(server_mechs.empty())) {
-    // This will happen if no mechanisms are enabled before calling Init()
-    Status s = Status::NotAuthorized("SASL server mechanism list is empty!");
-    LOG(ERROR) << s.ToString();
-    TRACE("Sending FATAL_UNAUTHORIZED response to client");
-    RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
-    return s;
+  // Find the set of mutually supported authentication types.
+  set<AuthenticationType> authn_types;
+  if (request.authn_types().empty()) {
+    // If the client doesn't send any support authentication types, we assume
+    // support for SASL. This preserves backwards compatibility with clients who
+    // don't support security features.
+    authn_types.insert(AuthenticationType::SASL);
+  } else {
+    for (const auto& type : request.authn_types()) {
+      switch (type.type_case()) {
+        case AuthenticationTypePB::kSasl:
+          authn_types.insert(AuthenticationType::SASL);
+          break;
+        case AuthenticationTypePB::kToken:
+          authn_types.insert(AuthenticationType::TOKEN);
+          break;
+        case AuthenticationTypePB::kCertificate:
+          authn_types.insert(AuthenticationType::CERTIFICATE);
+          break;
+        case AuthenticationTypePB::TYPE_NOT_SET: {
+          Sockaddr addr;
+          RETURN_NOT_OK(socket_->GetPeerAddress(&addr));
+          KLOG_EVERY_N_SECS(WARNING, 60)
+              << "client supports unknown authentication type, consider updating server, address: "
+              << addr.ToString();
+          break;
+        }
+      }
+    }
+
+    if (authn_types.empty()) {
+      Status s = Status::NotSupported("no mutually supported authentication types");
+      RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+      return s;
+    }
   }
 
-  RETURN_NOT_OK(SendNegotiate(server_mechs));
-  return Status::OK();
-}
+  if (ContainsKey(authn_types, AuthenticationType::CERTIFICATE) &&
+      tls_context_->has_signed_cert()) {
+    // If the client supports it and we are locally configured with TLS and have
+    // a CA-signed cert, choose cert authn.
+    // TODO(PKI): consider adding the fingerprint of the CA cert which signed
+    // the client's cert to the authentication message.
+    negotiated_authn_ = AuthenticationType::CERTIFICATE;
+  } else if (ContainsKey(authn_types, AuthenticationType::TOKEN) &&
+             token_verifier_->GetMaxKnownKeySequenceNumber() >= 0 &&
+             tls_context_->has_signed_cert()) {
+    // If the client supports it, we have a TSK to verify the client's token,
+    // and we have a signed-cert so the client can verify us, choose token authn.
+    // TODO(PKI): consider adding the TSK sequence number to the authentication
+    // message.
+    negotiated_authn_ = AuthenticationType::TOKEN;
+  } else {
+    // Otherwise we always can fallback to SASL.
+    DCHECK(ContainsKey(authn_types, AuthenticationType::SASL));
+    negotiated_authn_ = AuthenticationType::SASL;
+  }
 
-Status ServerNegotiation::SendNegotiate(const set<SaslMechanism::Type>& server_mechs) {
+  // Fill in the NEGOTIATE step response for the client.
   NegotiatePB response;
   response.set_step(NegotiatePB::NEGOTIATE);
 
-  for (auto mechanism : server_mechs) {
-    response.add_sasl_mechanisms()->set_mechanism(SaslMechanism::name_of(mechanism));
-  }
-
   // Tell the client which features we support.
   server_features_ = kSupportedServerRpcFeatureFlags;
   if (tls_context_->has_cert()) {
@@ -373,8 +414,34 @@ Status ServerNegotiation::SendNegotiate(const set<SaslMechanism::Type>& server_m
     response.add_supported_features(feature);
   }
 
-  RETURN_NOT_OK(SendNegotiatePB(response));
-  return Status::OK();
+  switch (negotiated_authn_) {
+    case AuthenticationType::CERTIFICATE:
+      response.add_authn_types()->mutable_certificate();
+      break;
+    case AuthenticationType::TOKEN:
+      response.add_authn_types()->mutable_token();
+      break;
+    case AuthenticationType::SASL: {
+      response.add_authn_types()->mutable_sasl();
+      const set<SaslMechanism::Type>& server_mechs = helper_.EnabledMechs();
+      if (PREDICT_FALSE(server_mechs.empty())) {
+        // This will happen if no mechanisms are enabled before calling Init()
+        Status s = Status::NotAuthorized("SASL server mechanism list is empty!");
+        LOG(ERROR) << s.ToString();
+        TRACE("Sending FATAL_UNAUTHORIZED response to client");
+        RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+        return s;
+      }
+
+      for (auto mechanism : server_mechs) {
+        response.add_sasl_mechanisms()->set_mechanism(SaslMechanism::name_of(mechanism));
+      }
+      break;
+    }
+    case AuthenticationType::INVALID: LOG(FATAL) << "unreachable";
+  }
+
+  return SendNegotiatePB(response);
 }
 
 Status ServerNegotiation::HandleTlsHandshake(const NegotiatePB& request) {
@@ -425,6 +492,94 @@ Status ServerNegotiation::SendTlsHandshake(string tls_token) {
   return SendNegotiatePB(msg);
 }
 
+Status ServerNegotiation::AuthenticateBySasl(faststring* recv_buf) {
+  RETURN_NOT_OK(InitSaslServer());
+
+  NegotiatePB request;
+  RETURN_NOT_OK(RecvNegotiatePB(&request, recv_buf));
+  Status s = HandleSaslInitiate(request);
+
+  while (s.IsIncomplete()) {
+    RETURN_NOT_OK(RecvNegotiatePB(&request, recv_buf));
+    s = HandleSaslResponse(request);
+  }
+  RETURN_NOT_OK(s);
+
+  const char* username = nullptr;
+  int rc = sasl_getprop(sasl_conn_.get(), SASL_USERNAME,
+                        reinterpret_cast<const void**>(&username));
+  // We expect that SASL_USERNAME will always get set.
+  CHECK(rc == SASL_OK && username != nullptr) << "No username on authenticated connection";
+  authenticated_user_ = username;
+
+  return Status::OK();
+}
+
+Status ServerNegotiation::AuthenticateByToken(faststring* recv_buf) {
+  // Sanity check that TLS has been negotiated. Receiving the token on an
+  // unencrypted channel is a big no-no.
+  CHECK(tls_negotiated_);
+
+  // Receive the token from the client.
+  NegotiatePB pb;
+  RETURN_NOT_OK(RecvNegotiatePB(&pb, recv_buf));
+
+  if (pb.step() != NegotiatePB::TOKEN_EXCHANGE) {
+    Status s =  Status::NotAuthorized("expected TOKEN_EXCHANGE step",
+                                      NegotiatePB::NegotiateStep_Name(pb.step()));
+  }
+  if (!pb.has_authn_token()) {
+    Status s = Status::NotAuthorized("TOKEN_EXCHANGE message must include an authentication token");
+  }
+
+  // TODO(PKI): propagate the specific token verification failure back to the client,
+  // so it knows how to intelligently retry.
+  security::TokenPB token;
+  switch (token_verifier_->VerifyTokenSignature(pb.authn_token(), &token)) {
+    case security::VerificationResult::VALID: break;
+    case security::VerificationResult::INVALID_TOKEN:
+      return Status::NotAuthorized("invalid authentication token");
+    case security::VerificationResult::INVALID_SIGNATURE:
+      return Status::NotAuthorized("invalid authentication token signature");
+    case security::VerificationResult::EXPIRED_TOKEN:
+      return Status::NotAuthorized("authentication token expired");
+    case security::VerificationResult::EXPIRED_SIGNING_KEY:
+      return Status::NotAuthorized("authentication token signing key expired");
+    case security::VerificationResult::UNKNOWN_SIGNING_KEY:
+      return Status::NotAuthorized("authentication token signed with unknown key");
+    case security::VerificationResult::INCOMPATIBLE_FEATURE:
+      return Status::NotAuthorized("authentication token uses incompatible feature");
+  }
+
+  if (!token.has_authn()) {
+    return Status::NotAuthorized("non-authentication token presented for authentication");
+  }
+  if (!token.authn().has_username()) {
+    // This is a runtime error because there should be no way a client could
+    // get a signed authn token without a subject.
+    return Status::RuntimeError("authentication token has no username");
+  }
+  authenticated_user_ = token.authn().username();
+
+  // Respond with success message.
+  pb.Clear();
+  pb.set_step(NegotiatePB::TOKEN_EXCHANGE);
+  return SendNegotiatePB(pb);
+}
+
+Status ServerNegotiation::AuthenticateByCertificate() {
+  // Sanity check that TLS has been negotiated. Cert-based authentication is
+  // only possible with TLS.
+  CHECK(tls_negotiated_);
+
+  // Grab the subject from the client's cert.
+  security::Cert cert;
+  RETURN_NOT_OK(tls_handshake_.GetRemoteCert(&cert));
+  authenticated_user_ = cert.SubjectName();
+
+  return Status::OK();
+}
+
 Status ServerNegotiation::HandleSaslInitiate(const NegotiatePB& request) {
   if (PREDICT_FALSE(request.step() != NegotiatePB::SASL_INITIATE)) {
     Status s =  Status::NotAuthorized("expected SASL_INITIATE step",

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bac5327/src/kudu/rpc/server_negotiation.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/server_negotiation.h b/src/kudu/rpc/server_negotiation.h
index ee5d395..f615d3b 100644
--- a/src/kudu/rpc/server_negotiation.h
+++ b/src/kudu/rpc/server_negotiation.h
@@ -25,6 +25,7 @@
 #include <sasl/sasl.h>
 
 #include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/rpc/negotiation.h"
 #include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/rpc/sasl_common.h"
 #include "kudu/rpc/sasl_helper.h"
@@ -39,6 +40,7 @@ class Slice;
 
 namespace security {
 class TlsContext;
+class TokenVerifier;
 }
 
 namespace rpc {
@@ -54,7 +56,8 @@ class ServerNegotiation {
   //
   // The provided TlsContext must outlive this negotiation instance.
   explicit ServerNegotiation(std::unique_ptr<Socket> socket,
-                             const security::TlsContext* tls_context);
+                             const security::TlsContext* tls_context,
+                             const security::TokenVerifier* token_verifier);
 
   // Enable PLAIN authentication.
   // Despite PLAIN authentication taking a username and password, we disregard
@@ -70,6 +73,13 @@ class ServerNegotiation {
   // Must be called after Negotiate().
   SaslMechanism::Type negotiated_mechanism() const;
 
+  // Returns the negotiated authentication type for the connection.
+  // Must be called after Negotiate().
+  AuthenticationType negotiated_authn() const {
+    DCHECK_NE(negotiated_authn_, AuthenticationType::INVALID);
+    return negotiated_authn_;
+  }
+
   // Returns true if TLS was negotiated.
   // Must be called after Negotiate().
   bool tls_negotiated() const {
@@ -146,18 +156,32 @@ class ServerNegotiation {
   // Initialize the SASL server negotiation instance.
   Status InitSaslServer() WARN_UNUSED_RESULT;
 
-  // Handle case when client sends NEGOTIATE request.
+  // Handle case when client sends NEGOTIATE request. Builds the set of
+  // client-supported RPC features, determines a mutually supported
+  // authentication type to use for the connection, and sends a NEGOTIATE
+  // response.
   Status HandleNegotiate(const NegotiatePB& request) WARN_UNUSED_RESULT;
 
-  // Send a NEGOTIATE response to the client with the list of available mechanisms.
-  Status SendNegotiate(const std::set<SaslMechanism::Type>& server_mechs) WARN_UNUSED_RESULT;
-
   // Handle a TLS_HANDSHAKE request message from the server.
   Status HandleTlsHandshake(const NegotiatePB& request) WARN_UNUSED_RESULT;
 
   // Send a TLS_HANDSHAKE response message to the server with the provided token.
   Status SendTlsHandshake(std::string tls_token) WARN_UNUSED_RESULT;
 
+  // Authenticate the client using SASL. Populates the 'authenticated_user_'
+  // field with the SASL principal.
+  // 'recv_buf' allows a receive buffer to be reused.
+  Status AuthenticateBySasl(faststring* recv_buf) WARN_UNUSED_RESULT;
+
+  // Authenticate the client using a token. Populates the
+  // 'authenticated_user_' field with the token's principal.
+  // 'recv_buf' allows a receive buffer to be reused.
+  Status AuthenticateByToken(faststring* recv_buf) WARN_UNUSED_RESULT;
+
+  // Authenticate the client using the client's TLS certificate. Populates the
+  // 'authenticated_user_' field with the certificate's subject.
+  Status AuthenticateByCertificate() WARN_UNUSED_RESULT;
+
   // Handle case when client sends SASL_INITIATE request.
   // Returns Status::OK if the SASL negotiation is complete, or
   // Status::Incomplete if a SASL_RESPONSE step is expected.
@@ -191,6 +215,9 @@ class ServerNegotiation {
   security::TlsHandshake tls_handshake_;
   bool tls_negotiated_;
 
+  // TSK state.
+  const security::TokenVerifier* token_verifier_;
+
   // The set of features supported by the client and server. Filled in during negotiation.
   std::set<RpcFeatureFlag> client_features_;
   std::set<RpcFeatureFlag> server_features_;
@@ -199,7 +226,11 @@ class ServerNegotiation {
   // negotiation.
   std::string authenticated_user_;
 
-  // The SASL mechanism. Filled in during negotiation.
+  // The authentication type. Filled in during negotiation.
+  AuthenticationType negotiated_authn_;
+
+  // The SASL mechanism. Filled in during negotiation if the negotiated
+  // authentication type is SASL.
   SaslMechanism::Type negotiated_mech_;
 
   // Negotiation timeout deadline.

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bac5327/src/kudu/security/security-test-util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/security/security-test-util.cc b/src/kudu/security/security-test-util.cc
index c01f562..3bb7bfc 100644
--- a/src/kudu/security/security-test-util.cc
+++ b/src/kudu/security/security-test-util.cc
@@ -22,6 +22,7 @@
 #include "kudu/security/ca/cert_management.h"
 #include "kudu/security/cert.h"
 #include "kudu/security/crypto.h"
+#include "kudu/security/tls_context.h"
 
 namespace kudu {
 namespace security {
@@ -39,6 +40,39 @@ Status GenerateSelfSignedCAForTests(PrivateKey* ca_key, Cert* ca_cert) {
   return Status::OK();
 }
 
+std::ostream& operator<<(std::ostream& o, PkiConfig c) {
+    switch (c) {
+      case PkiConfig::NONE: o << "NONE"; break;
+      case PkiConfig::SELF_SIGNED: o << "SELF_SIGNED"; break;
+      case PkiConfig::TRUSTED: o << "TRUSTED"; break;
+      case PkiConfig::SIGNED: o << "SIGNED"; break;
+    }
+    return o;
+}
+
+Status ConfigureTlsContext(PkiConfig config,
+                           const Cert& ca_cert,
+                           const PrivateKey& ca_key,
+                           TlsContext* tls_context) {
+  switch (config) {
+    case PkiConfig::NONE: break;
+    case PkiConfig::SELF_SIGNED:
+      RETURN_NOT_OK(tls_context->GenerateSelfSignedCertAndKey("test-uuid"));
+      break;
+    case PkiConfig::TRUSTED:
+      RETURN_NOT_OK(tls_context->AddTrustedCertificate(ca_cert));
+      break;
+    case PkiConfig::SIGNED: {
+      RETURN_NOT_OK(tls_context->AddTrustedCertificate(ca_cert));
+      RETURN_NOT_OK(tls_context->GenerateSelfSignedCertAndKey("test-uuid"));
+      Cert cert;
+      RETURN_NOT_OK(CertSigner(&ca_cert, &ca_key).Sign(*tls_context->GetCsrIfNecessary(), &cert));
+      RETURN_NOT_OK(tls_context->AdoptSignedCert(cert));
+      break;
+    };
+  }
+  return Status::OK();
+}
 
 } // namespace security
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bac5327/src/kudu/security/security-test-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/security/security-test-util.h b/src/kudu/security/security-test-util.h
index df4730f..e4851f8 100644
--- a/src/kudu/security/security-test-util.h
+++ b/src/kudu/security/security-test-util.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <ostream>
 #include <string>
 
 #include "kudu/util/env.h"
@@ -27,6 +28,8 @@ namespace security {
 
 class Cert;
 class PrivateKey;
+class PublicKey;
+class TlsContext;
 
 // TODO(todd): consolidate these certs with those in
 // security/test/test_certs.h once we support configuring a password
@@ -101,5 +104,25 @@ dc+JVPKL8Fe4a8fmsI6ndcZQ9qpOdZM5WOD0ldKRc+SsrYKkTmOOJQ==
 
 Status GenerateSelfSignedCAForTests(PrivateKey* ca_key, Cert* ca_cert);
 
+// Describes the options for configuring a TlsContext.
+enum class PkiConfig {
+  // The TLS context has no TLS cert and no trusted certs.
+  NONE,
+  // The TLS context has a self-signed TLS cert and no trusted certs.
+  SELF_SIGNED,
+  // The TLS context has no TLS cert and a trusted cert.
+  TRUSTED,
+  // The TLS context has a signed TLS cert and trusts the corresonding signing cert.
+  SIGNED,
+};
+
+// PkiConfig pretty-printer.
+std::ostream& operator<<(std::ostream& o, PkiConfig c);
+
+Status ConfigureTlsContext(PkiConfig config,
+                           const Cert& ca_cert,
+                           const PrivateKey& ca_key,
+                           TlsContext* tls_context);
+
 } // namespace security
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bac5327/src/kudu/security/tls_context.h
----------------------------------------------------------------------
diff --git a/src/kudu/security/tls_context.h b/src/kudu/security/tls_context.h
index e1bf1a9..3e904bc 100644
--- a/src/kudu/security/tls_context.h
+++ b/src/kudu/security/tls_context.h
@@ -79,12 +79,19 @@ class TlsContext {
   }
 
   // Returns true if this TlsContext has been configured with a CA-signed TLS
-  // cert and key for use with TLS-encrypted connections.
+  // cert and key for use with TLS-encrypted connections. If this method returns
+  // true, then 'has_trusted_cert' will also return true.
   bool has_signed_cert() const {
     MutexLock lock(lock_);
     return has_cert_ && !csr_;
   }
 
+  // Returns true if this TlsContext has at least one certificate in its trust store.
+  bool has_trusted_cert() const {
+    MutexLock lock(lock_);
+    return trusted_cert_count_ > 0;
+  }
+
   // Adds 'cert' as a trusted root CA certificate.
   //
   // This determines whether other peers are trusted. It also must be called for