You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2017/02/22 02:00:46 UTC

[1/6] kudu git commit: [security] tailored TokenSigner for system catalog

Repository: kudu
Updated Branches:
  refs/heads/master 91a35784e -> 2f95904fc


[security] tailored TokenSigner for system catalog

Updated the TokenSigner class in preparation for loading/storing TSKs
(Token Signing Keys) in the system catalog.

The expected use case for the TokenSigner is to call ImportKeys() in the
elected-as-leader callback with entries loaded from the system catalog
table, and then to run the CheckNeedKey()/AddKey()/TryRotateKey() sequence.
After that, the CheckNeedKey()/AddKey()/TryRotateKey() sequence must be
run periodically to generate and activate new TSKs.

Change-Id: Ie2417e2ccba6a1114db366b2f642f95362bf479c
Reviewed-on: http://gerrit.cloudera.org:8080/5930
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>
Reviewed-by: Dan Burkert <da...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/48cc975f
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/48cc975f
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/48cc975f

Branch: refs/heads/master
Commit: 48cc975f5ff05da565cde82961cfdd5271b1bcf9
Parents: 91a3578
Author: Alexey Serbin <as...@cloudera.com>
Authored: Tue Feb 7 11:49:24 2017 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Feb 21 22:56:09 2017 +0000

----------------------------------------------------------------------
 src/kudu/master/CMakeLists.txt         |   1 -
 src/kudu/master/authn_token_manager.cc |  98 ---------
 src/kudu/master/authn_token_manager.h  |  50 -----
 src/kudu/master/master-test.cc         |   3 +-
 src/kudu/master/master.cc              |  33 ++-
 src/kudu/master/master.h               |  10 +-
 src/kudu/master/master_service.cc      |   4 +-
 src/kudu/rpc/negotiation-test.cc       |  14 +-
 src/kudu/security/token-test.cc        | 319 +++++++++++++++++++++++-----
 src/kudu/security/token.proto          |  19 +-
 src/kudu/security/token_signer.cc      | 249 ++++++++++++++++++----
 src/kudu/security/token_signer.h       | 245 ++++++++++++++++-----
 src/kudu/security/token_signing_key.cc |  23 +-
 src/kudu/security/token_signing_key.h  |  15 +-
 src/kudu/security/token_verifier.cc    |  18 +-
 src/kudu/security/token_verifier.h     |  24 ++-
 src/kudu/tserver/heartbeater.cc        |   2 +-
 17 files changed, 797 insertions(+), 330 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/48cc975f/src/kudu/master/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/master/CMakeLists.txt b/src/kudu/master/CMakeLists.txt
index 54ee218..e504d8a 100644
--- a/src/kudu/master/CMakeLists.txt
+++ b/src/kudu/master/CMakeLists.txt
@@ -33,7 +33,6 @@ ADD_EXPORTABLE_LIBRARY(master_proto
   NONLINK_DEPS ${MASTER_KRPC_TGTS})
 
 set(MASTER_SRCS
-  authn_token_manager.cc
   catalog_manager.cc
   master.cc
   master_cert_authority.cc

http://git-wip-us.apache.org/repos/asf/kudu/blob/48cc975f/src/kudu/master/authn_token_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/authn_token_manager.cc b/src/kudu/master/authn_token_manager.cc
deleted file mode 100644
index b9b8306..0000000
--- a/src/kudu/master/authn_token_manager.cc
+++ /dev/null
@@ -1,98 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "kudu/master/authn_token_manager.h"
-
-#include <memory>
-#include <string>
-
-#include <gflags/gflags.h>
-
-#include "kudu/gutil/walltime.h"
-#include "kudu/security/token.pb.h"
-#include "kudu/security/token_signer.h"
-#include "kudu/util/flag_tags.h"
-#include "kudu/util/status.h"
-
-DEFINE_int64(authn_token_validity_seconds, 120,
-             "Period of time for which an issued authentication token is valid.");
-// TODO(PKI): docs for what actual effect this has, given we don't support
-// token renewal.
-// TODO(PKI): this is set extremely low, so that we don't forget to come back to
-// this and add rolling and refetching code.
-TAG_FLAG(authn_token_validity_seconds, experimental);
-
-
-using kudu::security::AuthnTokenPB;
-using kudu::security::SignedTokenPB;
-using kudu::security::TokenPB;
-using kudu::security::TokenSigner;
-using std::unique_ptr;
-using std::string;
-
-namespace kudu {
-namespace master {
-
-AuthnTokenManager::AuthnTokenManager() {
-}
-
-AuthnTokenManager::~AuthnTokenManager() {
-}
-
-Status AuthnTokenManager::Init(int64_t next_tsk_seq_num) {
-  CHECK(!signer_);
-  unique_ptr<TokenSigner> signer(new TokenSigner(next_tsk_seq_num));
-
-  // Roll twice at startup. See TokenSigner class documentation for reasoning.
-  RETURN_NOT_OK(signer->RotateSigningKey());
-  RETURN_NOT_OK(signer->RotateSigningKey());
-
-  // TODO(PKI): need to persist the public keys every time we roll. There's
-  // a bit of subtlety here: we shouldn't start exporting a key until it is
-  // properly persisted. Perhaps need some refactor, so we can do:
-  // 1) generate a new TSK
-  // 2) try to write the public portion to system table (keep in mind we could lose
-  //    leadership here)
-  // 3) pass it back to the TokenSigner as successful?
-
-  // TODO(PKI): manage a thread which periodically rolls the TSK. Otherwise
-  // we'll die after some number of days (whatever the validity is).
-
-  signer_ = std::move(signer);
-  return Status::OK();
-}
-
-Status AuthnTokenManager::GenerateToken(string username,
-                                        SignedTokenPB* signed_token) {
-  TokenPB token;
-
-  token.set_expire_unix_epoch_seconds(
-      WallTime_Now() + FLAGS_authn_token_validity_seconds);
-  AuthnTokenPB* authn = token.mutable_authn();
-  authn->mutable_username()->assign(std::move(username));
-  SignedTokenPB ret;
-
-  if (!token.SerializeToString(ret.mutable_token_data())) {
-    return Status::RuntimeError("could not serialize authn token");
-  }
-  RETURN_NOT_OK_PREPEND(signer_->SignToken(&ret), "could not sign authn token");
-  signed_token->Swap(&ret);
-  return Status::OK();
-}
-
-} // namespace master
-} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/48cc975f/src/kudu/master/authn_token_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/authn_token_manager.h b/src/kudu/master/authn_token_manager.h
deleted file mode 100644
index 701ee91..0000000
--- a/src/kudu/master/authn_token_manager.h
+++ /dev/null
@@ -1,50 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#pragma once
-
-#include <memory>
-#include <string>
-
-#include "kudu/gutil/macros.h"
-
-namespace kudu {
-class Status;
-
-namespace security {
-class SignedTokenPB;
-class TokenSigner;
-} // namespace security
-
-namespace master {
-
-class AuthnTokenManager {
- public:
-  AuthnTokenManager();
-  ~AuthnTokenManager();
-
-  Status Init(int64_t next_tsk_seq_num);
-
-  Status GenerateToken(std::string username,
-                       security::SignedTokenPB* signed_token);
-
- private:
-  std::unique_ptr<security::TokenSigner> signer_;
-  DISALLOW_COPY_AND_ASSIGN(AuthnTokenManager);
-};
-
-} // namespace master
-} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/48cc975f/src/kudu/master/master-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master-test.cc b/src/kudu/master/master-test.cc
index 7c6a046..020c4d4 100644
--- a/src/kudu/master/master-test.cc
+++ b/src/kudu/master/master-test.cc
@@ -1325,7 +1325,8 @@ TEST_F(MasterTest, TestConnectToMaster) {
   EXPECT_GT(resp.ca_cert_der(0).size(), 100) << "CA cert should be at least 100 bytes";
   ASSERT_TRUE(resp.has_authn_token()) << "should return an authn token";
   EXPECT_EQ(256, resp.authn_token().signature().size());
-  EXPECT_EQ(1, resp.authn_token().signing_key_seq_num());
+  ASSERT_TRUE(resp.authn_token().has_signing_key_seq_num());
+  EXPECT_GT(resp.authn_token().signing_key_seq_num(), -1);
 
   security::TokenPB token;
   ASSERT_TRUE(token.ParseFromString(resp.authn_token().token_data()));

http://git-wip-us.apache.org/repos/asf/kudu/blob/48cc975f/src/kudu/master/master.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master.cc b/src/kudu/master/master.cc
index 341531b..dfe28a4 100644
--- a/src/kudu/master/master.cc
+++ b/src/kudu/master/master.cc
@@ -18,16 +18,16 @@
 #include "kudu/master/master.h"
 
 #include <algorithm>
-#include <boost/bind.hpp>
-#include <glog/logging.h>
 #include <list>
 #include <memory>
 #include <vector>
 
+#include <boost/bind.hpp>
+#include <glog/logging.h>
+
 #include "kudu/cfile/block_cache.h"
 #include "kudu/common/wire_protocol.h"
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/master/authn_token_manager.h"
 #include "kudu/master/catalog_manager.h"
 #include "kudu/master/master_cert_authority.h"
 #include "kudu/master/master_service.h"
@@ -37,6 +37,8 @@
 #include "kudu/rpc/messenger.h"
 #include "kudu/rpc/service_if.h"
 #include "kudu/rpc/service_pool.h"
+#include "kudu/security/token_signer.h"
+#include "kudu/security/token_signing_key.h"
 #include "kudu/server/rpc_server.h"
 #include "kudu/tserver/tablet_copy_service.h"
 #include "kudu/tserver/tablet_service.h"
@@ -52,12 +54,26 @@ DEFINE_int32(master_registration_rpc_timeout_ms, 1500,
              "Timeout for retrieving master registration over RPC.");
 TAG_FLAG(master_registration_rpc_timeout_ms, experimental);
 
+DEFINE_int64(tsk_validity_seconds, 60 * 60 * 24 * 7,
+             "Number of seconds that a TSK (Token Signing Key) is valid for.");
+TAG_FLAG(tsk_validity_seconds, advanced);
+TAG_FLAG(tsk_validity_seconds, experimental);
+
+DEFINE_int64(tsk_rotation_seconds, 60 * 60 * 24 * 1,
+             "Number of seconds between consecutive activations of newly "
+             "generated TSKs (Token Signing Keys).");
+TAG_FLAG(tsk_rotation_seconds, advanced);
+TAG_FLAG(tsk_rotation_seconds, experimental);
+
 using std::min;
 using std::shared_ptr;
+using std::unique_ptr;
 using std::vector;
 
 using kudu::consensus::RaftPeerPB;
 using kudu::rpc::ServiceIf;
+using kudu::security::TokenSigner;
+using kudu::security::TokenSigningPrivateKey;
 using kudu::tserver::ConsensusServiceImpl;
 using kudu::tserver::TabletCopyServiceImpl;
 using strings::Substitute;
@@ -103,11 +119,18 @@ Status Master::Init() {
   // becomes a leader.
   cert_authority_.reset(new MasterCertAuthority(fs_manager_->uuid()));
 
+  token_signer_.reset(new TokenSigner(FLAGS_tsk_validity_seconds,
+                                      FLAGS_tsk_rotation_seconds));
   // TODO(PKI): this also will need to be wired together with CatalogManager
   // soon, including initializing the token manager with the proper next
   // sequence number.
-  authn_token_manager_.reset(new AuthnTokenManager());
-  RETURN_NOT_OK(authn_token_manager_->Init(1));
+  {
+    unique_ptr<TokenSigningPrivateKey> key;
+    RETURN_NOT_OK(token_signer_->CheckNeedKey(&key));
+    if (key) {
+      RETURN_NOT_OK(token_signer_->AddKey(std::move(key)));
+    }
+  }
 
   state_ = kInitialized;
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/kudu/blob/48cc975f/src/kudu/master/master.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/master.h b/src/kudu/master/master.h
index 33db987..1976d77 100644
--- a/src/kudu/master/master.h
+++ b/src/kudu/master/master.h
@@ -42,7 +42,11 @@ class ThreadPool;
 namespace rpc {
 class Messenger;
 class ServicePool;
-}
+} // namespace rpc
+
+namespace security {
+class TokenSigner;
+} // namespace security
 
 namespace master {
 
@@ -78,7 +82,7 @@ class Master : public server::ServerBase {
 
   MasterCertAuthority* cert_authority() { return cert_authority_.get(); }
 
-  AuthnTokenManager* authn_token_manager() { return authn_token_manager_.get(); }
+  security::TokenSigner* token_signer() { return token_signer_.get(); }
 
   TSManager* ts_manager() { return ts_manager_.get(); }
 
@@ -125,7 +129,7 @@ class Master : public server::ServerBase {
   MasterState state_;
 
   std::unique_ptr<MasterCertAuthority> cert_authority_;
-  std::unique_ptr<AuthnTokenManager> authn_token_manager_;
+  std::unique_ptr<security::TokenSigner> token_signer_;
   gscoped_ptr<TSManager> ts_manager_;
   gscoped_ptr<CatalogManager> catalog_manager_;
   gscoped_ptr<MasterPathHandlers> path_handlers_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/48cc975f/src/kudu/master/master_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master_service.cc b/src/kudu/master/master_service.cc
index 53b2503..2e28f07 100644
--- a/src/kudu/master/master_service.cc
+++ b/src/kudu/master/master_service.cc
@@ -23,7 +23,6 @@
 #include <vector>
 
 #include "kudu/common/wire_protocol.h"
-#include "kudu/master/authn_token_manager.h"
 #include "kudu/master/catalog_manager.h"
 #include "kudu/master/master.h"
 #include "kudu/master/master_cert_authority.h"
@@ -32,6 +31,7 @@
 #include "kudu/rpc/rpc_context.h"
 #include "kudu/rpc/user_credentials.h"
 #include "kudu/server/webserver.h"
+#include "kudu/security/token_signer.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/pb_util.h"
 
@@ -378,7 +378,7 @@ void MasterServiceImpl::ConnectToMaster(const ConnectToMasterRequestPB* /*req*/,
     // essentially allowing unlimited renewal, which is probably not what
     // we want.
     SignedTokenPB authn_token;
-    Status s = server_->authn_token_manager()->GenerateToken(
+    Status s = server_->token_signer()->GenerateAuthnToken(
         rpc->user_credentials().real_user(),
         &authn_token);
     if (!s.ok()) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/48cc975f/src/kudu/rpc/negotiation-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/negotiation-test.cc b/src/kudu/rpc/negotiation-test.cc
index 3c36e4e..fac759f 100644
--- a/src/kudu/rpc/negotiation-test.cc
+++ b/src/kudu/rpc/negotiation-test.cc
@@ -45,6 +45,7 @@
 #include "kudu/security/tls_context.h"
 #include "kudu/security/tls_socket.h"
 #include "kudu/security/token_signer.h"
+#include "kudu/security/token_signing_key.h"
 #include "kudu/security/token_verifier.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/sockaddr.h"
@@ -78,6 +79,7 @@ using kudu::security::PrivateKey;
 using kudu::security::SignedTokenPB;
 using kudu::security::TlsContext;
 using kudu::security::TokenSigner;
+using kudu::security::TokenSigningPrivateKey;
 using kudu::security::TokenVerifier;
 
 namespace kudu {
@@ -160,9 +162,15 @@ TEST_P(TestNegotiation, TestNegotiation) {
   FLAGS_rpc_encrypt_loopback_connections = desc.rpc_encrypt_loopback;
 
   // Generate an optional client token and server token verifier.
-  TokenSigner token_signer(1);
+  TokenSigner token_signer(60, 20);
+  {
+    unique_ptr<TokenSigningPrivateKey> key;
+    ASSERT_OK(token_signer.CheckNeedKey(&key));
+    // No keys are available yet, so should be able to add.
+    ASSERT_NE(nullptr, key.get());
+    ASSERT_OK(token_signer.AddKey(std::move(key)));
+  }
   TokenVerifier token_verifier;
-  ASSERT_OK(token_signer.RotateSigningKey());
   boost::optional<SignedTokenPB> authn_token;
   if (desc.client.token) {
     authn_token = SignedTokenPB();
@@ -173,7 +181,7 @@ TEST_P(TestNegotiation, TestNegotiation) {
     ASSERT_OK(token_signer.SignToken(&*authn_token));
   }
   if (desc.server.token) {
-    ASSERT_OK(token_verifier.ImportPublicKeys(token_signer.GetTokenSigningPublicKeys(0)));
+    ASSERT_OK(token_verifier.ImportKeys(token_signer.verifier().ExportKeys()));
   }
 
   // Create the listening socket, client socket, and server socket.

http://git-wip-us.apache.org/repos/asf/kudu/blob/48cc975f/src/kudu/security/token-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/security/token-test.cc b/src/kudu/security/token-test.cc
index ddc6a5c..fc9937b 100644
--- a/src/kudu/security/token-test.cc
+++ b/src/kudu/security/token-test.cc
@@ -15,16 +15,23 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "kudu/util/test_util.h"
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
 
 #include "kudu/gutil/walltime.h"
+#include "kudu/security/crypto.h"
 #include "kudu/security/token.pb.h"
 #include "kudu/security/token_signer.h"
+#include "kudu/security/token_signing_key.h"
 #include "kudu/security/token_verifier.h"
+#include "kudu/util/test_util.h"
 
-DECLARE_int32(token_signing_key_num_rsa_bits);
-DECLARE_int64(token_signing_key_validity_seconds);
+DECLARE_int32(tsk_num_rsa_bits);
 
+using std::unique_ptr;
 
 namespace kudu {
 namespace security {
@@ -48,87 +55,229 @@ SignedTokenPB MakeIncompatibleToken() {
   return ret;
 }
 
+// Generate public key as a string in DER format for tests.
+Status GeneratePublicKeyStrDer(string* ret) {
+  PrivateKey private_key;
+  RETURN_NOT_OK(GeneratePrivateKey(512, &private_key));
+  PublicKey public_key;
+  RETURN_NOT_OK(private_key.GetPublicKey(&public_key));
+  string public_key_str_der;
+  RETURN_NOT_OK(public_key.ToString(&public_key_str_der, DataFormat::DER));
+  *ret = public_key_str_der;
+  return Status::OK();
+}
+
+// Generate token signing key with the specified parameters.
+Status GenerateTokenSigningKey(int64_t seq_num,
+                               int64_t expire_time_seconds,
+                               unique_ptr<TokenSigningPrivateKey>* tsk) {
+  {
+    unique_ptr<PrivateKey> private_key(new PrivateKey);
+    RETURN_NOT_OK(GeneratePrivateKey(512, private_key.get()));
+    tsk->reset(new TokenSigningPrivateKey(
+        seq_num, expire_time_seconds, std::move(private_key)));
+  }
+  return Status::OK();
+}
+
 } // anonymous namespace
 
 class TokenTest : public KuduTest {
   void SetUp() override {
     KuduTest::SetUp();
     // Set the keylength smaller to make tests run faster.
-    FLAGS_token_signing_key_num_rsa_bits = 512;
+    FLAGS_tsk_num_rsa_bits = 512;
   }
 };
 
-TEST_F(TokenTest, TestSigner) {
+TEST_F(TokenTest, TestInit) {
+  TokenSigner signer(60, 20);
+  const TokenVerifier& verifier(signer.verifier());
+
   SignedTokenPB token = MakeUnsignedToken(WallTime_Now());
+  Status s = signer.SignToken(&token);
+  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+
+  static const int64_t kKeySeqNum = 100;
+  PrivateKey private_key;
+  ASSERT_OK(GeneratePrivateKey(512, &private_key));
+  string private_key_str_der;
+  ASSERT_OK(private_key.ToString(&private_key_str_der, DataFormat::DER));
+  TokenSigningPrivateKeyPB pb;
+  pb.set_rsa_key_der(private_key_str_der);
+  pb.set_key_seq_num(kKeySeqNum);
+  pb.set_expire_unix_epoch_seconds(WallTime_Now() + 120);
+
+  ASSERT_OK(signer.ImportKeys({pb}));
+  vector<TokenSigningPublicKeyPB> public_keys(verifier.ExportKeys());
+  ASSERT_EQ(1, public_keys.size());
+  ASSERT_EQ(kKeySeqNum, public_keys[0].key_seq_num());
+
+  // It should be possible to sign tokens once the signer is initialized.
+  ASSERT_OK(signer.SignToken(&token));
+  ASSERT_TRUE(token.has_signature());
+}
 
-  const int kStartingSeqNum = 123;
-  TokenSigner signer(kStartingSeqNum);
+TEST_F(TokenTest, TestTokenSignerAddKeys) {
+  {
+    TokenSigner signer(60, 20);
+    std::unique_ptr<TokenSigningPrivateKey> key;
+    ASSERT_OK(signer.CheckNeedKey(&key));
+    // No keys are available yet, so should be able to add.
+    ASSERT_NE(nullptr, key.get());
+    ASSERT_OK(signer.AddKey(std::move(key)));
+
+    ASSERT_OK(signer.CheckNeedKey(&key));
+    // It's not time to add next key yet.
+    ASSERT_EQ(nullptr, key.get());
+  }
+
+  {
+    // Special configuration for TokenSigner: rotation interval is zero,
+    // so should be able to add two keys right away.
+    TokenSigner signer(60, 0);
+    std::unique_ptr<TokenSigningPrivateKey> key;
+    ASSERT_OK(signer.CheckNeedKey(&key));
+    // No keys are available yet, so should be able to add.
+    ASSERT_NE(nullptr, key.get());
+    ASSERT_OK(signer.AddKey(std::move(key)));
+
+    // Should be able to add next key right away.
+    ASSERT_OK(signer.CheckNeedKey(&key));
+    ASSERT_NE(nullptr, key.get());
+    ASSERT_OK(signer.AddKey(std::move(key)));
+
+    // Active key and next key are already in place: no need for a new key.
+    ASSERT_OK(signer.CheckNeedKey(&key));
+    ASSERT_EQ(nullptr, key.get());
+  }
+
+  {
+    // Special configuration for TokenSigner: key rotation interval is just
+    // one second (the validity interval is 60 seconds). It should not
+    // need next key right away, but should need next key after 1 second.
+    TokenSigner signer(60, 1);
+    std::unique_ptr<TokenSigningPrivateKey> key;
+    ASSERT_OK(signer.CheckNeedKey(&key));
+    // No keys are available yet, so should be able to add.
+    ASSERT_NE(nullptr, key.get());
+    ASSERT_OK(signer.AddKey(std::move(key)));
+
+    // Should not need next key right away.
+    ASSERT_OK(signer.CheckNeedKey(&key));
+    ASSERT_EQ(nullptr, key.get());
+
+    SleepFor(MonoDelta::FromMilliseconds(1001));
+
+    // Should need next key after 1 second.
+    ASSERT_OK(signer.CheckNeedKey(&key));
+    ASSERT_NE(nullptr, key.get());
+    ASSERT_OK(signer.AddKey(std::move(key)));
+
+    // Active key and next key are already in place: no need for a new key.
+    ASSERT_OK(signer.CheckNeedKey(&key));
+    ASSERT_EQ(nullptr, key.get());
+  }
+}
+
+// Test token signing and public key export while the next key is added.
+TEST_F(TokenTest, TestTokenSignerSignVerifyExport) {
+  // Key rotation interval 0 allows adding 2 keys in a row with no delay.
+  TokenSigner signer(60, 0);
+  const TokenVerifier& verifier(signer.verifier());
 
   // Should start off with no signing keys.
-  ASSERT_TRUE(signer.GetTokenSigningPublicKeys(0).empty());
+  ASSERT_TRUE(verifier.ExportKeys().empty());
 
-  // Trying to sign a token when there is no TSK should give
-  // an error.
+  // Trying to sign a token when there is no TSK should give an error.
+  SignedTokenPB token = MakeUnsignedToken(WallTime_Now());
   Status s = signer.SignToken(&token);
-  ASSERT_TRUE(s.IsIllegalState());
+  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
 
-  // Rotate the TSK once.
-  ASSERT_OK(signer.RotateSigningKey());
+  // Generate and set a new key.
+  int64_t signing_key_seq_num;
+  {
+    std::unique_ptr<TokenSigningPrivateKey> key;
+    ASSERT_OK(signer.CheckNeedKey(&key));
+    ASSERT_NE(nullptr, key.get());
+    signing_key_seq_num = key->key_seq_num();
+    ASSERT_GT(signing_key_seq_num, -1);
+    ASSERT_OK(signer.AddKey(std::move(key)));
+  }
 
   // We should see the key now if we request TSKs starting at a
   // lower sequence number.
-  ASSERT_EQ(signer.GetTokenSigningPublicKeys(0).size(), 1);
+  ASSERT_EQ(1, verifier.ExportKeys().size());
   // We should not see the key if we ask for the sequence number
   // that it is assigned.
-  ASSERT_EQ(signer.GetTokenSigningPublicKeys(kStartingSeqNum).size(), 0);
+  ASSERT_EQ(0, verifier.ExportKeys(signing_key_seq_num).size());
 
-  // We should be able to sign a token, even though we have
-  // just one key.
+  // We should be able to sign a token now.
   ASSERT_OK(signer.SignToken(&token));
   ASSERT_TRUE(token.has_signature());
-  ASSERT_EQ(token.signing_key_seq_num(), kStartingSeqNum);
-
-  // Rotate again and check that we return the right keys.
-  ASSERT_OK(signer.RotateSigningKey());
-  ASSERT_EQ(signer.GetTokenSigningPublicKeys(0).size(), 2);
-  ASSERT_EQ(signer.GetTokenSigningPublicKeys(kStartingSeqNum).size(), 1);
-  ASSERT_EQ(signer.GetTokenSigningPublicKeys(kStartingSeqNum + 1).size(), 0);
+  ASSERT_EQ(signing_key_seq_num, token.signing_key_seq_num());
 
-  // We still use the original key for signing (we always use the second-to-latest
-  // key).
-  token = MakeUnsignedToken(WallTime_Now());
-  ASSERT_OK(signer.SignToken(&token));
-  ASSERT_TRUE(token.has_signature());
-  ASSERT_EQ(token.signing_key_seq_num(), kStartingSeqNum);
+  // Set next key and check that we return the right keys.
+  int64_t next_signing_key_seq_num;
+  {
+    std::unique_ptr<TokenSigningPrivateKey> key;
+    ASSERT_OK(signer.CheckNeedKey(&key));
+    ASSERT_NE(nullptr, key.get());
+    next_signing_key_seq_num = key->key_seq_num();
+    ASSERT_GT(next_signing_key_seq_num, signing_key_seq_num);
+    ASSERT_OK(signer.AddKey(std::move(key)));
+  }
+  ASSERT_EQ(2, verifier.ExportKeys().size());
+  ASSERT_EQ(1, verifier.ExportKeys(signing_key_seq_num).size());
+  ASSERT_EQ(0, verifier.ExportKeys(next_signing_key_seq_num).size());
 
-  // If we rotate one more time, we should start using the second key.
-  ASSERT_OK(signer.RotateSigningKey());
-  token = MakeUnsignedToken(WallTime_Now());
-  ASSERT_OK(signer.SignToken(&token));
-  ASSERT_TRUE(token.has_signature());
-  ASSERT_EQ(token.signing_key_seq_num(), kStartingSeqNum + 1);
+  // The first key should be used for signing: the next one is saved
+  // for the next round.
+  {
+    SignedTokenPB token = MakeUnsignedToken(WallTime_Now());
+    ASSERT_OK(signer.SignToken(&token));
+    ASSERT_TRUE(token.has_signature());
+    ASSERT_EQ(signing_key_seq_num, token.signing_key_seq_num());
+  }
 }
 
-// Test that the TokenSigner can export its public keys in protobuf form.
+// Test that the TokenSigner can export its public keys in protobuf form
+// via bound TokenVerifier.
 TEST_F(TokenTest, TestExportKeys) {
   // Test that the exported public keys don't contain private key material,
   // and have an appropriate expiration.
-  TokenSigner signer(1);
-  ASSERT_OK(signer.RotateSigningKey());
-  auto keys = signer.GetTokenSigningPublicKeys(0);
-  ASSERT_EQ(keys.size(), 1);
+  const int64_t key_exp_seconds = 60;
+  TokenSigner signer(key_exp_seconds, 30);
+  int64_t key_seq_num;
+  {
+    std::unique_ptr<TokenSigningPrivateKey> key;
+    ASSERT_OK(signer.CheckNeedKey(&key));
+    ASSERT_NE(nullptr, key.get());
+    key_seq_num = key->key_seq_num();
+    ASSERT_OK(signer.AddKey(std::move(key)));
+  }
+  const TokenVerifier& verifier(signer.verifier());
+  auto keys = verifier.ExportKeys();
+  ASSERT_EQ(1, keys.size());
   const TokenSigningPublicKeyPB& key = keys[0];
   ASSERT_TRUE(key.has_rsa_key_der());
-  ASSERT_EQ(key.key_seq_num(), 1);
+  ASSERT_EQ(key_seq_num, key.key_seq_num());
   ASSERT_TRUE(key.has_expire_unix_epoch_seconds());
-  ASSERT_GT(key.expire_unix_epoch_seconds(), WallTime_Now());
+  const int64_t now = WallTime_Now();
+  ASSERT_GT(key.expire_unix_epoch_seconds(), now);
+  ASSERT_LE(key.expire_unix_epoch_seconds(), now + key_exp_seconds);
 }
 
 // Test that the TokenVerifier can import keys exported by the TokenSigner
 // and then verify tokens signed by it.
 TEST_F(TokenTest, TestEndToEnd_Valid) {
-  TokenSigner signer(1);
-  ASSERT_OK(signer.RotateSigningKey());
+  TokenSigner signer(60, 20);
+  {
+    std::unique_ptr<TokenSigningPrivateKey> key;
+    ASSERT_OK(signer.CheckNeedKey(&key));
+    ASSERT_NE(nullptr, key.get());
+    ASSERT_OK(signer.AddKey(std::move(key)));
+  }
 
   // Make and sign a token.
   SignedTokenPB signed_token = MakeUnsignedToken(WallTime_Now() + 600);
@@ -136,7 +285,7 @@ TEST_F(TokenTest, TestEndToEnd_Valid) {
 
   // Try to verify it.
   TokenVerifier verifier;
-  ASSERT_OK(verifier.ImportPublicKeys(signer.GetTokenSigningPublicKeys(0)));
+  ASSERT_OK(verifier.ImportKeys(signer.verifier().ExportKeys()));
   TokenPB token;
   ASSERT_EQ(VerificationResult::VALID, verifier.VerifyTokenSignature(signed_token, &token));
 }
@@ -144,11 +293,17 @@ TEST_F(TokenTest, TestEndToEnd_Valid) {
 // Test all of the possible cases covered by token verification.
 // See VerificationResult.
 TEST_F(TokenTest, TestEndToEnd_InvalidCases) {
-  TokenSigner signer(1);
-  ASSERT_OK(signer.RotateSigningKey());
+  // Key rotation interval 0 allows adding 2 keys in a row with no delay.
+  TokenSigner signer(60, 0);
+  {
+    std::unique_ptr<TokenSigningPrivateKey> key;
+    ASSERT_OK(signer.CheckNeedKey(&key));
+    ASSERT_NE(nullptr, key.get());
+    ASSERT_OK(signer.AddKey(std::move(key)));
+  }
 
   TokenVerifier verifier;
-  ASSERT_OK(verifier.ImportPublicKeys(signer.GetTokenSigningPublicKeys(0)));
+  ASSERT_OK(verifier.ImportKeys(signer.verifier().ExportKeys()));
 
   // Make and sign a token, but corrupt the data in it.
   {
@@ -188,11 +343,18 @@ TEST_F(TokenTest, TestEndToEnd_InvalidCases) {
               verifier.VerifyTokenSignature(signed_token, &token));
   }
 
-  // Rotate to a new key, but don't inform the verifier of it yet. When we
+  // Set a new signing key, but don't inform the verifier of it yet. When we
   // verify, we expect the verifier to complain the key is unknown.
-  ASSERT_OK(signer.RotateSigningKey());
-  ASSERT_OK(signer.RotateSigningKey());
   {
+    {
+      std::unique_ptr<TokenSigningPrivateKey> key;
+      ASSERT_OK(signer.CheckNeedKey(&key));
+      ASSERT_NE(nullptr, key.get());
+      ASSERT_OK(signer.AddKey(std::move(key)));
+      bool has_rotated = false;
+      ASSERT_OK(signer.TryRotateKey(&has_rotated));
+      ASSERT_TRUE(has_rotated);
+    }
     SignedTokenPB signed_token = MakeUnsignedToken(WallTime_Now() + 600);
     ASSERT_OK(signer.SignToken(&signed_token));
     TokenPB token;
@@ -200,15 +362,21 @@ TEST_F(TokenTest, TestEndToEnd_InvalidCases) {
               verifier.VerifyTokenSignature(signed_token, &token));
   }
 
-  // Rotate to a signing key which is already expired, and inform the verifier
+  // Set a new signing key which is already expired, and inform the verifier
   // of all of the current keys. The verifier should recognize the key but
   // know that it's expired.
-  FLAGS_token_signing_key_validity_seconds = -10;
-  ASSERT_OK(signer.RotateSigningKey());
-  ASSERT_OK(signer.RotateSigningKey());
-  ASSERT_OK(verifier.ImportPublicKeys(signer.GetTokenSigningPublicKeys(
-      verifier.GetMaxKnownKeySequenceNumber())));
   {
+    {
+      unique_ptr<TokenSigningPrivateKey> tsk;
+      ASSERT_OK(GenerateTokenSigningKey(100, WallTime_Now() - 1, &tsk));
+      // This direct access is necessary because AddKey() does not allow to add
+      // an expired key.
+      TokenSigningPublicKeyPB tsk_public_pb;
+      tsk->ExportPublicKeyPB(&tsk_public_pb);
+      ASSERT_OK(verifier.ImportKeys({tsk_public_pb}));
+      signer.tsk_deque_.push_front(std::move(tsk));
+    }
+
     SignedTokenPB signed_token = MakeUnsignedToken(WallTime_Now() + 600);
     ASSERT_OK(signer.SignToken(&signed_token));
     TokenPB token;
@@ -217,5 +385,40 @@ TEST_F(TokenTest, TestEndToEnd_InvalidCases) {
   }
 }
 
+// Test functionality of the TokenVerifier::ImportKeys() method.
+TEST_F(TokenTest, TestTokenVerifierImportKeys) {
+  TokenVerifier verifier;
+
+  // An attempt to import no keys is fine.
+  ASSERT_OK(verifier.ImportKeys({}));
+  ASSERT_TRUE(verifier.ExportKeys().empty());
+
+  TokenSigningPublicKeyPB tsk_public_pb;
+  const auto exp_time = WallTime_Now() + 600;
+  tsk_public_pb.set_key_seq_num(100500);
+  tsk_public_pb.set_expire_unix_epoch_seconds(exp_time);
+  string public_key_str_der;
+  ASSERT_OK(GeneratePublicKeyStrDer(&public_key_str_der));
+  tsk_public_pb.set_rsa_key_der(public_key_str_der);
+
+  ASSERT_OK(verifier.ImportKeys({ tsk_public_pb }));
+  {
+    const auto& exported_tsks_public_pb = verifier.ExportKeys();
+    ASSERT_EQ(1, exported_tsks_public_pb.size());
+    EXPECT_EQ(tsk_public_pb.SerializeAsString(),
+              exported_tsks_public_pb[0].SerializeAsString());
+  }
+
+  // Re-importing the same key again is fine, and the total number
+  // of exported keys should not increase.
+  ASSERT_OK(verifier.ImportKeys({ tsk_public_pb }));
+  {
+    const auto& exported_tsks_public_pb = verifier.ExportKeys();
+    ASSERT_EQ(1, exported_tsks_public_pb.size());
+    EXPECT_EQ(tsk_public_pb.SerializeAsString(),
+              exported_tsks_public_pb[0].SerializeAsString());
+  }
+}
+
 } // namespace security
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/48cc975f/src/kudu/security/token.proto
----------------------------------------------------------------------
diff --git a/src/kudu/security/token.proto b/src/kudu/security/token.proto
index 26284be..c3f9ab6 100644
--- a/src/kudu/security/token.proto
+++ b/src/kudu/security/token.proto
@@ -72,14 +72,25 @@ message SignedTokenPB {
   optional int64 signing_key_seq_num = 3;
 };
 
-// A public key corresponding to the private key used to sign tokens.
+// A private key used to sign tokens.
+message TokenSigningPrivateKeyPB {
+  optional int64 key_seq_num = 1;
+
+  // The private key material, in DER format.
+  optional bytes rsa_key_der = 2 [ (kudu.REDACT) = true ];
+
+  // The time at which signatures made by this key should no longer be valid.
+  optional int64 expire_unix_epoch_seconds = 3;
+};
+
+// A public key corresponding to the private key used to sign tokens. Only
+// this part is necessary for token verification.
 message TokenSigningPublicKeyPB {
   optional int64 key_seq_num = 1;
 
   // The public key material, in DER format.
   optional bytes rsa_key_der = 2;
 
-  // The time at which signatures made by this key should no longer
-  // be valid.
+  // The time at which signatures made by this key should no longer be valid.
   optional int64 expire_unix_epoch_seconds = 3;
-};
\ No newline at end of file
+};

http://git-wip-us.apache.org/repos/asf/kudu/blob/48cc975f/src/kudu/security/token_signer.cc
----------------------------------------------------------------------
diff --git a/src/kudu/security/token_signer.cc b/src/kudu/security/token_signer.cc
index 46a8846..2cf85ae 100644
--- a/src/kudu/security/token_signer.cc
+++ b/src/kudu/security/token_signer.cc
@@ -17,86 +17,259 @@
 
 #include "kudu/security/token_signer.h"
 
+#include <algorithm>
 #include <map>
 #include <memory>
+#include <mutex>
+#include <string>
+#include <utility>
 #include <vector>
 
 #include <gflags/gflags.h>
 
 #include "kudu/gutil/walltime.h"
-#include "kudu/security/crypto.h"
 #include "kudu/security/openssl_util.h"
 #include "kudu/security/token.pb.h"
 #include "kudu/security/token_signing_key.h"
+#include "kudu/security/token_verifier.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/locks.h"
+#include "kudu/util/status.h"
 
-DEFINE_int32(token_signing_key_num_rsa_bits, 2048,
-             "number of bits used for token signing keys");
+DEFINE_int64(authn_token_validity_seconds, 120,
+             "Period of time for which an issued authentication token is valid.");
+// TODO(PKI): docs for what actual effect this has, given we don't support
+// token renewal.
+// TODO(PKI): this is set extremely low, so that we don't forget to come back to
+// this and add rolling and refetching code.
+TAG_FLAG(authn_token_validity_seconds, experimental);
+
+DEFINE_int32(tsk_num_rsa_bits, 2048,
+             "Number of bits used for token signing keys");
 // TODO(PKI) is 1024 enough for TSKs since they rotate frequently?
 // maybe it would verify faster?
-DEFINE_int64(token_signing_key_validity_seconds, 60 * 60 * 24 * 7,
-             "number of seconds that a token signing key is valid for");
-// TODO(PKI): add flag tags
 
 using std::lock_guard;
+using std::map;
+using std::string;
+using std::unique_lock;
 using std::unique_ptr;
 using std::vector;
 
 namespace kudu {
 namespace security {
 
-TokenSigner::TokenSigner(int64_t next_seq_num)
-    : next_seq_num_(next_seq_num) {
+TokenSigner::TokenSigner(int64_t key_validity_seconds,
+                         int64_t key_rotation_seconds)
+    : verifier_(new TokenVerifier),
+      key_validity_seconds_(key_validity_seconds),
+      key_rotation_seconds_(key_rotation_seconds),
+      next_key_seq_num_(0) {
 }
 
 TokenSigner::~TokenSigner() {
 }
 
-Status TokenSigner::RotateSigningKey() {
-  unique_ptr<PrivateKey> key(new PrivateKey());
-  RETURN_NOT_OK_PREPEND(
-      GeneratePrivateKey(FLAGS_token_signing_key_num_rsa_bits, key.get()),
-      "could not generate new RSA token-signing key");
-  int64_t expire = WallTime_Now() + FLAGS_token_signing_key_validity_seconds;
+Status TokenSigner::ImportKeys(const vector<TokenSigningPrivateKeyPB>& keys) {
   lock_guard<RWMutex> l(lock_);
-  int64_t seq = next_seq_num_++;
-  unique_ptr<TokenSigningPrivateKey> new_tsk(new TokenSigningPrivateKey(
-        seq, expire, unique_ptr<PrivateKey>(key.release())));
-  keys_by_seq_[seq] = std::move(new_tsk);
+
+  const int64_t now = WallTime_Now();
+  map<int64_t, unique_ptr<TokenSigningPrivateKey>> tsk_by_seq;
+  vector<TokenSigningPublicKeyPB> public_keys_pb;
+  public_keys_pb.reserve(keys.size());
+  for (const auto& key : keys) {
+    // Check the input for consistency.
+    CHECK(key.has_key_seq_num());
+    CHECK(key.has_expire_unix_epoch_seconds());
+    CHECK(key.has_rsa_key_der());
+
+    const int64_t key_seq_num = key.key_seq_num();
+    unique_ptr<TokenSigningPrivateKey> tsk(new TokenSigningPrivateKey(key));
+
+    // Advance the key sequence number, if needed.
+    next_key_seq_num_ = std::max(next_key_seq_num_, key_seq_num + 1);
+    const int64_t key_expire_time = tsk->expire_time();
+    if (key_expire_time <= now) {
+      // Do nothing else with an expired TSK.
+      continue;
+    }
+
+    // Need the public part of the key for the TokenVerifier.
+    {
+      TokenSigningPublicKeyPB public_key_pb;
+      tsk->ExportPublicKeyPB(&public_key_pb);
+      public_keys_pb.emplace_back(std::move(public_key_pb));
+    }
+
+    tsk_by_seq[key_seq_num] = std::move(tsk);
+    if (tsk_by_seq.size() > 2) {
+      tsk_by_seq.erase(tsk_by_seq.begin());
+    }
+  }
+  // Register the public parts of the imported keys with the TokenVerifier.
+  RETURN_NOT_OK(verifier_->ImportKeys(public_keys_pb));
+
+  // Use two most recent keys known so far (in terms of sequence numbers)
+  // for token signing.
+  for (auto& e : tsk_deque_) {
+    const int64_t seq_num = e->key_seq_num();
+    tsk_by_seq[seq_num] = std::move(e);
+  }
+  tsk_deque_.clear();
+  for (auto& e : tsk_by_seq) {
+    tsk_deque_.emplace_back(std::move(e.second));
+  }
+  while (tsk_deque_.size() > 2) {
+    tsk_deque_.pop_front();
+  }
+
+  return Status::OK();
+}
+
+Status TokenSigner::GenerateAuthnToken(string username,
+                                       SignedTokenPB* signed_token) const {
+  TokenPB token;
+  token.set_expire_unix_epoch_seconds(
+      WallTime_Now() + FLAGS_authn_token_validity_seconds);
+  AuthnTokenPB* authn = token.mutable_authn();
+  authn->mutable_username()->assign(std::move(username));
+
+  SignedTokenPB ret;
+  if (!token.SerializeToString(ret.mutable_token_data())) {
+    return Status::RuntimeError("could not serialize authn token");
+  }
+
+  RETURN_NOT_OK(SignToken(&ret));
+  signed_token->Swap(&ret);
   return Status::OK();
 }
 
 Status TokenSigner::SignToken(SignedTokenPB* token) const {
   CHECK(token);
   shared_lock<RWMutex> l(lock_);
-  if (keys_by_seq_.empty()) {
-    return Status::IllegalState("must generate a key before signing");
+  if (tsk_deque_.empty()) {
+    return Status::IllegalState("no token signing key");
+  }
+  TokenSigningPrivateKey* key = tsk_deque_.front().get();
+  RETURN_NOT_OK_PREPEND(key->Sign(token), "could not sign authn token");
+  return Status::OK();
+}
+
+Status TokenSigner::CheckNeedKey(unique_ptr<TokenSigningPrivateKey>* tsk) const {
+  CHECK(tsk);
+  const int64_t now = WallTime_Now();
+
+  unique_lock<RWMutex> l(lock_);
+  if (tsk_deque_.empty()) {
+    // No active key: need a new one.
+    const int64 key_seq_num = next_key_seq_num_++;
+    const int64 key_expiration = now + key_validity_seconds_;
+    // Generation of cryptographically strong key takes many CPU cycles;
+    // do not want to block other parallel activity.
+    l.unlock();
+    return GenerateSigningKey(key_seq_num, key_expiration, tsk);
   }
-  // If there is more than one key available, we use the second-latest key,
-  // since the latest one may not have yet propagated to other servers, etc.
-  auto it = keys_by_seq_.end();
-  --it;
-  if (it != keys_by_seq_.begin()) {
-    --it;
+
+  if (tsk_deque_.size() >= 2) {
+    // It does not make much sense to keep more than two keys in the queue.
+    // It's enough to have just one active key and next key ready to be
+    // activated when it's time to do so.  However, it does not mean the
+    // process of key refreshment is about to stop once there are two keys
+    // in the queue: the TryRotateKey() method (which should be called periodically
+    // along with CheckNeedKey()/AddKey() pair) will eventually pop the
+    // current key out of the keys queue once the key enters its inactive phase.
+    tsk->reset();
+    return Status::OK();
+  }
+
+  // The currently active key is in the front of the queue.
+  const auto* key = tsk_deque_.front().get();
+
+  // Check if it's time to generate a new token signing key.
+  //
+  //   <-----AAAAA===========>
+  //         ^
+  //        now
+  //
+  const auto key_creation_time = key->expire_time() - key_validity_seconds_;
+  if (key_creation_time + key_rotation_seconds_ <= now) {
+    // It's time to create and start propagating next key.
+    const int64 key_seq_num = next_key_seq_num_++;
+    const int64 key_expiration = now + key_validity_seconds_;
+    // Generation of cryptographically strong key takes many CPU cycles:
+    // do not want to block other parallel activity.
+    l.unlock();
+    return GenerateSigningKey(key_seq_num, key_expiration, tsk);
   }
-  const auto& tsk = it->second;
-  return tsk->Sign(token);
+
+  // It's not yet time to generate a new key.
+  tsk->reset();
+  return Status::OK();
 }
 
-std::vector<TokenSigningPublicKeyPB> TokenSigner::GetTokenSigningPublicKeys(
-    int64_t after_sequence_number) const {
-  vector<TokenSigningPublicKeyPB> ret;
-  shared_lock<RWMutex> l(lock_);
-  for (auto it = keys_by_seq_.upper_bound(after_sequence_number);
-       it != keys_by_seq_.end();
-       ++it) {
-    ret.emplace_back();
-    it->second->ExportPublicKeyPB(&ret.back());
+Status TokenSigner::AddKey(unique_ptr<TokenSigningPrivateKey> tsk) {
+  CHECK(tsk);
+  const int64_t seq_num = tsk->key_seq_num();
+  if (seq_num < 0) {
+    return Status::InvalidArgument("invalid key sequence number");
+  }
+  if (tsk->expire_time() <= WallTime_Now()) {
+    return Status::InvalidArgument("key has already expired");
+  }
+
+  lock_guard<RWMutex> l(lock_);
+  next_key_seq_num_ = std::max(next_key_seq_num_, seq_num + 1);
+  // Register the public part of the key in TokenVerifier first.
+  TokenSigningPublicKeyPB public_key_pb;
+  tsk->ExportPublicKeyPB(&public_key_pb);
+  RETURN_NOT_OK(verifier_->ImportKeys({public_key_pb}));
+
+  tsk_deque_.emplace_back(std::move(tsk));
+
+  return Status::OK();
+}
+
+Status TokenSigner::TryRotateKey(bool* has_rotated) {
+  lock_guard<RWMutex> l(lock_);
+  if (has_rotated) {
+    *has_rotated = false;
+  }
+  if (tsk_deque_.size() < 2) {
+    // No next key to rotate to.
+    return Status::OK();
+  }
+
+  const auto* key = tsk_deque_.front().get();
+  // Check if it's time to switch to next key. The key propagation interval
+  // is equal to the key rotation interval.
+  //
+  // current active key   <-----AAAAA===========>
+  //           next key        <-----AAAAA===========>
+  //                                 ^
+  //                                now
+  //
+  const auto key_creation_time = key->expire_time() - key_validity_seconds_;
+  if (key_creation_time + 2 * key_rotation_seconds_ <= WallTime_Now()) {
+    tsk_deque_.pop_front();
+    if (has_rotated) {
+      *has_rotated = true;
+    }
   }
-  return ret;
+  return Status::OK();
 }
 
+Status TokenSigner::GenerateSigningKey(int64_t key_seq_num,
+                                       int64_t key_expiration,
+                                       unique_ptr<TokenSigningPrivateKey>* tsk) {
+  unique_ptr<PrivateKey> key(new PrivateKey());
+  RETURN_NOT_OK_PREPEND(
+      GeneratePrivateKey(FLAGS_tsk_num_rsa_bits, key.get()),
+      "could not generate new RSA token-signing key");
+  tsk->reset(new TokenSigningPrivateKey(key_seq_num,
+                                        key_expiration,
+                                        std::move(key)));
+  return Status::OK();
+}
 
 } // namespace security
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/48cc975f/src/kudu/security/token_signer.h
----------------------------------------------------------------------
diff --git a/src/kudu/security/token_signer.h b/src/kudu/security/token_signer.h
index 259b7b5..5bc8a40 100644
--- a/src/kudu/security/token_signer.h
+++ b/src/kudu/security/token_signer.h
@@ -16,106 +16,243 @@
 // under the License.
 #pragma once
 
-#include "kudu/gutil/macros.h"
-
-#include <map>
+#include <cstdint>
+#include <deque>
 #include <memory>
+#include <string>
 #include <vector>
 
+#include <gtest/gtest_prod.h>
+
+#include "kudu/gutil/macros.h"
 #include "kudu/util/rw_mutex.h"
 
 namespace kudu {
-namespace security {
+class Status;
 
+namespace security {
 class SignedTokenPB;
+class TokenSigner;
 class TokenSigningPrivateKey;
+class TokenSigningPrivateKeyPB;
 class TokenSigningPublicKeyPB;
+class TokenVerifier;
 
 // Class responsible for managing Token Signing Keys (TSKs) and signing tokens.
 //
-// This class manages a set of private TSKs, each identified by a sequence number.
-// Callers can export their public TSK counterparts, optionally transfer them
-// to another node, and then import them into a TokenVerifier.
+// This class manages a set of private TSKs, each identified by a sequence
+// number. Callers can export their public TSK counterparts via the included
+// TokenVerifier, optionally transfer them to another node, and then import
+// them into a TokenVerifier.
 //
-// The class provides the ability to rotate the current TSK. This generates a new
-// key pair and assigns it a sequence number. Note that, when signing tokens,
-// the most recent key is not used. Rather, the second-most-recent key is used.
-// This ensures that there is plenty of time to transmit the public key for the
-// new TSK to all TokenVerifiers (eg on other servers, via heartbeats), before
-// the new key enters usage.
+// The class provides the ability to check whether it's time to generate and
+// activate a new key. Every generated private/public key pair is assigned a
+// sequence number. Note that, when signing tokens, the most recent key
+// (a.k.a. next key) is not used. Rather, the second-most-recent key, if it
+// exists, is used. This ensures that there is plenty of time to transmit the
+// public part of the new TSK to all TokenVerifiers (e.g. on other servers via
+// heartbeats or by other means), before the new key enters usage.
 //
 // On a fresh instance, with only one key, there is no "second most recent"
-// key. Thus, we fall back to signing tokens with the one available key.
-//
+// key. Thus, we fall back to signing tokens with the only available key.
 //
 // Key rotation schedules and validity periods
 // ===========================================
 // The TokenSigner does not automatically handle the rotation of keys.
-// Rotation must be performed by the caller using the 'RotateSigningKey()'
-// method. Typically, key rotation is performed much more frequently than
-// the validity period of the key, so that at any given point in time
-// there are several valid keys.
+// Rotation must be performed by an external caller using the combination of
+// 'CheckNeedKey()/AddKey()' and 'TryRotateKey()' methods. Typically,
+// key rotation is performed more frequently than the validity period
+// of the key, so that at any given point in time there are several valid keys.
+//
+// Below is the lifecycle of a TSK (token signing key):
+//
+//      <---AAAAA===============>
+//      ^                       ^
+// creation time          expiration time
+//
+// Prior to the creation time the TSK does not exist in the system.
 //
-// For example, consider a validity period of 4 days and a rotation interval of
-// 1 day:
+// '-' propagation interval
+//       The TSK is already created but not yet used to sign tokens. However,
+//       its public part is already being sent to the components which
+//       may be involved in validation of tokens signed by the key.
+//
+// 'A' activity interval
+//       The TSK is used to sign tokens. It's assumed that the components which
+//       are involved in token verification have already received
+//       the corresponding public part of the TSK.
+//
+// '=' inactivity interval
+//       The TSK is no longer used to sign tokens. However, it's still sent
+//       to other components which validate token signatures.
+//
+// Shortly after the TSK's expiration the token signing components stop
+// propagating its public part.
+//
+// The TSK is considered valid from its creation time until its expiration time.
+//
+// NOTE: The very first key created on system bootstrap does not have a
+//       propagation interval -- it becomes active immediately.
+//
+// For example, consider the following configuration for token signing keys:
+//   validity period:      4 days
+//   rotation interval:    1 day
+//   propagation interval: 1 day
 //
 // Day      1    2    3    4    5    6    7    8
 // ------------------------------------------------
 // Key 1:   <AAAAAAAAA==========>
-// Key 2:        <====AAAAA=========>
-// Key 3:             <====AAAAA========>
-// Key 4:                  <====AAAAA==========>
-//                               .............
-// 'A' indicates the 'Originator Usage Period' (the period in which the key
-// is being used to sign tokens).
+// Key 2:        <----AAAAA==========>
+// Key 3:             <----AAAAA==========>
+// Key 4:                  <----AAAAA==========>
+//                              ...............
+// 'A' indicates the 'Originator Usage Period' (a.k.a. 'Activity Interval'),
+// i.e. the period in which the key is being used to sign tokens.
 //
-// '<...>' indicates the 'Recipient Usage Period' (the period in which the
-// verifier will consider the key valid).
+// '<...>' indicates the 'Recipient Usage Period': the period in which
+// the verifier may get tokens signed by the TSK and should consider them
+// for verification. The start of the recipient usage period is not crucial
+// in that regard, but the end of that period is -- after the TSK is expired,
+// a verifier should consider tokens signed by that TSK invalid
+// and stop accepting them even if the token signature is correct.
 //
 // When configuring the rotation and validity, consider the following constraint:
 //
-//   max_token_validity < tsk_validity_period - 2 * tsk_rotation_interval
+//   max_token_validity < tsk_validity_period - tsk_propagation_interval - tsk_rotation_interval
 //
 // In the example above, this means that no token may be issued with a validity
-// longer than 2 days, without risking that the signing key would expire before
-// the token.
+// period longer than or equal to 2 days, without risking that the
+// signing/verification key would expire before the token.
 //
 // TODO(PKI): should we try to enforce this constraint in code?
 //
-// NOTE: one other result of the above is that the first key (Key 1) is actually
-// active for longer than the rest. This has some potential security implications,
-// so it's worth considering rolling twice at startup.
+// NOTE: One other result of the above is that the first key (Key 1) is actually
+//       active for longer than the rest. This has some potential security
+//       implications, so it's worth considering rolling twice at startup.
+//
+// NOTE: Current implementation of TokenSigner assumes the propagation
+//       interval is equal to the rotation interval.
+//
+// Typical usage pattern:
+//
+//    TokenSigner ts(key_validity_seconds, key_rotation_seconds);
+//    // Load existing TSKs from the system table.
+//    ...
+//    RETURN_NOT_OK(ts.ImportKeys(...));
+//
+//    // Check that there is a valid TSK to sign tokens.
+//    {
+//      unique_ptr<TokenSigningPrivateKey> key;
+//      RETURN_NOT_OK(ts.CheckNeedKey(&key));
+//      if (key) {
+//        // Store the newly generated key into the system table.
+//        ...
+//
+//        // Add the key into the queue of the TokenSigner.
+//        RETURN_NOT_OK(ts.AddKey(std::move(key)));
+//      }
+//    }
+//    // Check and switch to the next key, if it's time.
+//    RETURN_NOT_OK(ts.TryRotateKey());
+//
+//    ...
+//    // From time to time (much more often than the TSK validity/rotation interval)
+//    // call the 'CheckNeedKey()/AddKey() followed by TryRotateKey()' sequence.
+//    // It's a good idea to dedicate a separate periodic task for that.
+//    ...
 //
-// This class is thread-safe.
 class TokenSigner {
  public:
-  // Create a new TokenSigner. It will start assigning key sequence numbers
-  // at 'next_seq_num'.
-  //
-  // NOTE: this does not initialize an initial key. Call 'RotateSigningKey()'
-  // to initialize the first key.
-  explicit TokenSigner(int64_t next_seq_num);
+  // Parameters of the TokenSigner constructor define the TSK rotation schedule.
+  // See the class's comment just above for details.
+  TokenSigner(int64_t key_validity_seconds,
+              int64_t key_rotation_seconds);
   ~TokenSigner();
 
-  // Sign the given token using the current TSK.
-  Status SignToken(SignedTokenPB* token) const WARN_UNUSED_RESULT;
+  // Import token signing keys in PB format, notifying TokenVerifier
+  // and updating internal key sequence number. This method can be called
+  // multiple times. Depending on the input keys and current time, the instance
+  // might not be ready to sign tokens right after calling ImportKeys(),
+  // so an additional CheckNeedKey()/AddKey() cycle might be needed.
+  //
+  // See the class comment above for more information about the intended usage.
+  Status ImportKeys(const std::vector<TokenSigningPrivateKeyPB>& keys)
+      WARN_UNUSED_RESULT;
 
-  // Returns the set of valid public keys with sequence numbers greater
-  // than 'after_sequence_number'.
-  std::vector<TokenSigningPublicKeyPB> GetTokenSigningPublicKeys(
-      int64_t after_sequence_number) const;
+  // Check whether it's time to generate and add a new key. If so, the new key
+  // is generated and output into the 'tsk' parameter so it's possible to
+  // examine and otherwise process the key as needed (e.g. store it).
+  // After that, use AddKey() method to actually add the key into the
+  // TokenSigner's key queue.
+  //
+  // Every non-null key returned by this method has a key sequence number.
+  // The key sequence number always increases with newly generated keys.
+  // It's not a problem to call this method multiple times and then call
+  // AddKey() only once: that effectively discards all the generated keys
+  // except for the key passed to the AddKey() call as a parameter. In other
+  // words, it's possible and not a problem to have 'holes' in the key
+  // sequence numbers. Other components working with verification of
+  // the signed tokens should take that into account.
+  //
+  // See the class comment above for more information about the intended usage.
+  Status CheckNeedKey(std::unique_ptr<TokenSigningPrivateKey>* tsk) const
+      WARN_UNUSED_RESULT;
 
-  // Rotate to a new token-signing key.
+  // Add the new key into the token signing keys queue. Call TryRotateKey()
+  // to make this key active when it's time.
   //
-  // See class documentation for more information.
-  Status RotateSigningKey() WARN_UNUSED_RESULT;
+  // See the class comment above for more information about the intended usage.
+  Status AddKey(std::unique_ptr<TokenSigningPrivateKey> tsk) WARN_UNUSED_RESULT;
+
+  // Check whether it's possible and it's time to switch to next signing key
+  // from the token signing keys queue. A key can be added using the
+  // CheckNeedKey()/AddKey() method pair. If there is next key to switch to
+  // and it's time to do so, the method switches to the next key and reports
+  // on that via the 'has_rotated' parameter.
+  // The intended use case is to call TryRotateKey() periodically.
+  //
+  // See the class comment above for more information about the intended usage.
+  Status TryRotateKey(bool* has_rotated = nullptr) WARN_UNUSED_RESULT;
+
+  Status GenerateAuthnToken(std::string username,
+                            SignedTokenPB* signed_token) const
+      WARN_UNUSED_RESULT;
+
+  Status SignToken(SignedTokenPB* token) const WARN_UNUSED_RESULT;
+
+  const TokenVerifier& verifier() const { return *verifier_; }
 
  private:
-  // Protects following fields.
+  FRIEND_TEST(TokenTest, TestEndToEnd_InvalidCases);
+
+  static Status GenerateSigningKey(int64_t key_seq_num,
+                                   int64_t key_expiration,
+                                   std::unique_ptr<TokenSigningPrivateKey>* tsk)
+      WARN_UNUSED_RESULT;
+
+  std::unique_ptr<TokenVerifier> verifier_;
+
+  // Period of validity for newly created token signing keys. In other words,
+  // the expiration time for a new key is set to (now + key_validity_seconds_).
+  const int64_t key_validity_seconds_;
+
+  // TSK rotation interval: number of seconds between consecutive activations
+  // of new token signing keys.
+  const int64_t key_rotation_seconds_;
+
+  // Protects next_key_seq_num_ and tsk_deque_ members.
   mutable RWMutex lock_;
-  std::map<int64_t, std::unique_ptr<TokenSigningPrivateKey>> keys_by_seq_;
 
-  int64_t next_seq_num_;
+  // The sequence number to assign to next generated key.
+  // It's allowable to have 'holes' in the key sequence numbers, i.e. it's
+  // acceptable to have sequence numbers which do not correspond to any
+  // existing TSK. The only crucial point is to keep the key sequence numbers
+  // increasing.
+  mutable int64_t next_key_seq_num_;
+
+  // The currently active key is in the front of the queue,
+  // the newly added ones are pushed into back of the queue.
+  std::deque<std::unique_ptr<TokenSigningPrivateKey>> tsk_deque_;
 
   DISALLOW_COPY_AND_ASSIGN(TokenSigner);
 };

http://git-wip-us.apache.org/repos/asf/kudu/blob/48cc975f/src/kudu/security/token_signing_key.cc
----------------------------------------------------------------------
diff --git a/src/kudu/security/token_signing_key.cc b/src/kudu/security/token_signing_key.cc
index fdb2067..20bcfbe 100644
--- a/src/kudu/security/token_signing_key.cc
+++ b/src/kudu/security/token_signing_key.cc
@@ -56,10 +56,24 @@ bool TokenSigningPublicKey::VerifySignature(const SignedTokenPB& token) const {
 }
 
 TokenSigningPrivateKey::TokenSigningPrivateKey(
+    const TokenSigningPrivateKeyPB& pb)
+    : key_(new PrivateKey) {
+  CHECK_OK(key_->FromString(pb.rsa_key_der(), DataFormat::DER));
+  private_key_der_ = pb.rsa_key_der();
+  key_seq_num_ = pb.key_seq_num();
+  expire_time_ = pb.expire_unix_epoch_seconds();
+
+  PublicKey public_key;
+  CHECK_OK(key_->GetPublicKey(&public_key));
+  CHECK_OK(public_key.ToString(&public_key_der_, DataFormat::DER));
+}
+
+TokenSigningPrivateKey::TokenSigningPrivateKey(
     int64_t key_seq_num, int64_t expire_time, unique_ptr<PrivateKey> key)
     : key_(std::move(key)),
       key_seq_num_(key_seq_num),
       expire_time_(expire_time) {
+  CHECK_OK(key_->ToString(&private_key_der_, DataFormat::DER));
   PublicKey public_key;
   CHECK_OK(key_->GetPublicKey(&public_key));
   CHECK_OK(public_key.ToString(&public_key_der_, DataFormat::DER));
@@ -77,7 +91,14 @@ Status TokenSigningPrivateKey::Sign(SignedTokenPB* token) const {
   return Status::OK();
 }
 
-void TokenSigningPrivateKey::ExportPublicKeyPB(TokenSigningPublicKeyPB* pb) {
+void TokenSigningPrivateKey::ExportPB(TokenSigningPrivateKeyPB* pb) const {
+  pb->Clear();
+  pb->set_key_seq_num(key_seq_num_);
+  pb->set_rsa_key_der(private_key_der_);
+  pb->set_expire_unix_epoch_seconds(expire_time_);
+}
+
+void TokenSigningPrivateKey::ExportPublicKeyPB(TokenSigningPublicKeyPB* pb) const {
   pb->Clear();
   pb->set_key_seq_num(key_seq_num_);
   pb->set_rsa_key_der(public_key_der_);

http://git-wip-us.apache.org/repos/asf/kudu/blob/48cc975f/src/kudu/security/token_signing_key.h
----------------------------------------------------------------------
diff --git a/src/kudu/security/token_signing_key.h b/src/kudu/security/token_signing_key.h
index 899f63e..875ebbf 100644
--- a/src/kudu/security/token_signing_key.h
+++ b/src/kudu/security/token_signing_key.h
@@ -62,6 +62,7 @@ class TokenSigningPublicKey {
 // number and expiration date.
 class TokenSigningPrivateKey {
  public:
+  explicit TokenSigningPrivateKey(const TokenSigningPrivateKeyPB& pb);
   TokenSigningPrivateKey(int64_t key_seq_num,
                          int64_t expire_time,
                          std::unique_ptr<PrivateKey> key);
@@ -70,13 +71,23 @@ class TokenSigningPrivateKey {
   // Sign a token, and store the signature and signing key's sequence number.
   Status Sign(SignedTokenPB* token) const;
 
+  // Export data into corresponding PB structure.
+  void ExportPB(TokenSigningPrivateKeyPB* pb) const;
+
   // Export the public-key portion of this signing key.
-  void ExportPublicKeyPB(TokenSigningPublicKeyPB* pb);
+  void ExportPublicKeyPB(TokenSigningPublicKeyPB* pb) const;
+
+  int64_t key_seq_num() const { return key_seq_num_; }
+  int64_t expire_time() const { return expire_time_; }
 
  private:
   std::unique_ptr<PrivateKey> key_;
-  // The 'public_key_der_' is a serialized 'key_' in DER format: just a cache.
+  // The 'private_key_der_' is a serialized 'key_' in DER format: just a cache.
+  std::string private_key_der_;
+  // The 'public_key_der_' is serialized public part of 'key_' in DER format;
+  // just a cache.
   std::string public_key_der_;
+
   int64_t key_seq_num_;
   int64_t expire_time_;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/48cc975f/src/kudu/security/token_verifier.cc
----------------------------------------------------------------------
diff --git a/src/kudu/security/token_verifier.cc b/src/kudu/security/token_verifier.cc
index cb1c0a1..2f6ba03 100644
--- a/src/kudu/security/token_verifier.cc
+++ b/src/kudu/security/token_verifier.cc
@@ -17,6 +17,7 @@
 
 #include "kudu/security/token_verifier.h"
 
+#include <algorithm>
 #include <mutex>
 #include <string>
 #include <vector>
@@ -29,6 +30,7 @@
 
 using std::lock_guard;
 using std::string;
+using std::transform;
 using std::unique_ptr;
 using std::vector;
 
@@ -52,11 +54,11 @@ int64_t TokenVerifier::GetMaxKnownKeySequenceNumber() const {
 
 // Import a set of public keys provided by the token signer (typically
 // running on another node).
-Status TokenVerifier::ImportPublicKeys(const vector<TokenSigningPublicKeyPB>& public_keys) {
+Status TokenVerifier::ImportKeys(const vector<TokenSigningPublicKeyPB>& keys) {
   // Do the construction outside of the lock, to avoid holding the
   // lock while doing lots of allocation.
   vector<unique_ptr<TokenSigningPublicKey>> tsks;
-  for (const auto& pb : public_keys) {
+  for (const auto& pb : keys) {
     // Sanity check the key.
     if (!pb.has_rsa_key_der()) {
       return Status::RuntimeError(
@@ -81,6 +83,18 @@ Status TokenVerifier::ImportPublicKeys(const vector<TokenSigningPublicKeyPB>& pu
   return Status::OK();
 }
 
+std::vector<TokenSigningPublicKeyPB> TokenVerifier::ExportKeys(
+    int64_t after_sequence_number) const {
+  vector<TokenSigningPublicKeyPB> ret;
+  shared_lock<RWMutex> l(lock_);
+  ret.reserve(keys_by_seq_.size());
+  transform(keys_by_seq_.upper_bound(after_sequence_number),
+            keys_by_seq_.end(),
+            back_inserter(ret),
+            [](const KeysMap::value_type& e) { return e.second->pb(); });
+  return ret;
+}
+
 // Verify the signature on the given token.
 VerificationResult TokenVerifier::VerifyTokenSignature(const SignedTokenPB& signed_token,
                                                        TokenPB* token) const {

http://git-wip-us.apache.org/repos/asf/kudu/blob/48cc975f/src/kudu/security/token_verifier.h
----------------------------------------------------------------------
diff --git a/src/kudu/security/token_verifier.h b/src/kudu/security/token_verifier.h
index 3466fda..00c541f 100644
--- a/src/kudu/security/token_verifier.h
+++ b/src/kudu/security/token_verifier.h
@@ -56,14 +56,22 @@ class TokenVerifier {
 
   // Return the highest key sequence number known by this instance.
   //
-  // If no keys are known, returns -1.
+  // If no keys are known, return -1.
   int64_t GetMaxKnownKeySequenceNumber() const;
 
-  // Import a set of public keys provided by a TokenSigner instance (which might
-  // be running on a remote node). If any public keys already exist with matching key
-  // sequence numbers, they are replaced by the new keys.
-  Status ImportPublicKeys(const std::vector<TokenSigningPublicKeyPB>& public_keys)
-    WARN_UNUSED_RESULT;
+  // Import a set of public keys provided by a TokenSigner instance
+  // (which might be running on a remote node). If any public keys already
+  // exist with matching key sequence numbers, they are replaced by
+  // the new keys.
+  Status ImportKeys(const std::vector<TokenSigningPublicKeyPB>& keys)
+      WARN_UNUSED_RESULT;
+
+  // Export token signing public keys. If 'after_sequence_number' is
+  // specified, only public keys with sequence numbers greater than
+  // 'after_sequence_number' are exported. If the 'after_sequence_number'
+  // parameter is omitted, all known public keys are exported.
+  std::vector<TokenSigningPublicKeyPB> ExportKeys(
+      int64_t after_sequence_number = -1) const;
 
   // Verify the signature on the given signed token, and deserialize the
   // contents into 'token'.
@@ -77,9 +85,11 @@ class TokenVerifier {
   // void ExpireOldKeys();
 
  private:
+  typedef std::map<int64_t, std::unique_ptr<TokenSigningPublicKey>> KeysMap;
+
   // Lock protecting keys_by_seq_
   mutable RWMutex lock_;
-  std::map<int64_t, std::unique_ptr<TokenSigningPublicKey>> keys_by_seq_;
+  KeysMap keys_by_seq_;
 
   DISALLOW_COPY_AND_ASSIGN(TokenVerifier);
 };

http://git-wip-us.apache.org/repos/asf/kudu/blob/48cc975f/src/kudu/tserver/heartbeater.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/heartbeater.cc b/src/kudu/tserver/heartbeater.cc
index 512f680..5a292b4 100644
--- a/src/kudu/tserver/heartbeater.cc
+++ b/src/kudu/tserver/heartbeater.cc
@@ -447,7 +447,7 @@ Status Heartbeater::Thread::DoHeartbeat() {
     vector<security::TokenSigningPublicKeyPB> tsks(last_hb_response_.tsks().begin(),
                                                    last_hb_response_.tsks().end());
     RETURN_NOT_OK_PREPEND(
-        server_->mutable_token_verifier()->ImportPublicKeys(tsks),
+        server_->mutable_token_verifier()->ImportKeys(tsks),
         "failed to import token signing public keys from master heartbeat");
   }
 


[4/6] kudu git commit: [security] leader master sends public TSKs to tservers

Posted by to...@apache.org.
[security] leader master sends public TSKs to tservers

The leader master responds to tablet servers with the list of public TSK
parts in the TSHeartbeatResponsePB.tsks field.

Added a small integration test for verification.

Change-Id: Idd65bda944be9d365580e2d4b37b293b4dcff3e0
Reviewed-on: http://gerrit.cloudera.org:8080/6065
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/6ec83123
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/6ec83123
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/6ec83123

Branch: refs/heads/master
Commit: 6ec831234fd1682c22a6a2cb547dbaff96188f95
Parents: b5734ea
Author: Alexey Serbin <as...@cloudera.com>
Authored: Fri Feb 17 17:22:50 2017 -0800
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Wed Feb 22 01:44:09 2017 +0000

----------------------------------------------------------------------
 src/kudu/integration-tests/registration-test.cc | 10 ++++++++++
 src/kudu/master/master.proto                    |  1 -
 src/kudu/master/master_service.cc               | 11 ++++++++++-
 3 files changed, 20 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/6ec83123/src/kudu/integration-tests/registration-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/registration-test.cc b/src/kudu/integration-tests/registration-test.cc
index 168ce9e..fa91d7a 100644
--- a/src/kudu/integration-tests/registration-test.cc
+++ b/src/kudu/integration-tests/registration-test.cc
@@ -33,6 +33,7 @@
 #include "kudu/master/ts_descriptor.h"
 #include "kudu/security/test/test_certs.h"
 #include "kudu/security/tls_context.h"
+#include "kudu/security/token_verifier.h"
 #include "kudu/tserver/mini_tablet_server.h"
 #include "kudu/tserver/tablet_server.h"
 #include "kudu/util/curl_util.h"
@@ -221,6 +222,15 @@ TEST_F(RegistrationTest, TestTSGetsSignedX509Certificate) {
     }, MonoDelta::FromSeconds(10));
 }
 
+// Check that after the tablet server registers, it gets the list of valid
+// public token signing keys.
+TEST_F(RegistrationTest, TestTSGetsTskList) {
+  MiniTabletServer* ts = cluster_->mini_tablet_server(0);
+  AssertEventually([&](){
+      ASSERT_FALSE(ts->server()->token_verifier().ExportKeys().empty());
+    });
+}
+
 // Test that, if the tserver has HTTPS enabled, the master links to it
 // via https:// URLs and not http://.
 TEST_F(RegistrationTest, TestExposeHttpsURLs) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/6ec83123/src/kudu/master/master.proto
----------------------------------------------------------------------
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index 37deba4..9846968 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -608,7 +608,6 @@ message ConnectToMasterResponsePB {
 
   // If the client requested an authentication token, and security is
   // enabled on the cluster, the master returns a signed authn token.
-  // TODO(PKI): implement me!
   optional security.SignedTokenPB authn_token = 4;
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/6ec83123/src/kudu/master/master_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master_service.cc b/src/kudu/master/master_service.cc
index 2e28f07..4352025 100644
--- a/src/kudu/master/master_service.cc
+++ b/src/kudu/master/master_service.cc
@@ -32,6 +32,7 @@
 #include "kudu/rpc/user_credentials.h"
 #include "kudu/server/webserver.h"
 #include "kudu/security/token_signer.h"
+#include "kudu/security/token_verifier.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/pb_util.h"
 
@@ -164,7 +165,15 @@ void MasterServiceImpl::TSHeartbeat(const TSHeartbeatRequestPB* req,
     resp->add_ca_cert_der(server_->cert_authority()->ca_cert_der());
   }
 
-  // TODO(aserbin): 7. Send any active CA certs which the TS doesn't have.
+  // 7. Only leaders send public parts of non-expired TSK
+  // which the TS doesn't have.
+  if (is_leader_master && req->has_latest_tsk_seq_num()) {
+    auto tsk_public_keys = server_->token_signer()->verifier().ExportKeys(
+        req->latest_tsk_seq_num());
+    for (auto& key : tsk_public_keys) {
+      resp->add_tsks()->Swap(&key);
+    }
+  }
 
   rpc->RespondSuccess();
 }


[6/6] kudu git commit: [security] removed CheckInitCertAuthority method

Posted by to...@apache.org.
[security] removed CheckInitCertAuthority method

Remove the CatalogManager::CheckInitCertAuthority() method
since it is no longer used. This is a follow-up to commit
c06a3bc66e59e9467b599e85714825347aacf7ec.

This patch does not contain any functional changes.

Change-Id: Ia281ded7210d5e905005884d8b286a8ac1a547ed
Reviewed-on: http://gerrit.cloudera.org:8080/6101
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Alexey Serbin <as...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/2f95904f
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/2f95904f
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/2f95904f

Branch: refs/heads/master
Commit: 2f95904fc39fbe767b18cb193a271dcba5538215
Parents: 11ba0cc
Author: Alexey Serbin <as...@cloudera.com>
Authored: Tue Feb 21 17:07:30 2017 -0800
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Wed Feb 22 01:48:22 2017 +0000

----------------------------------------------------------------------
 src/kudu/master/catalog_manager.cc | 47 ---------------------------------
 src/kudu/master/catalog_manager.h  |  5 ----
 2 files changed, 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/2f95904f/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 59b990f..c5456ca 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -777,53 +777,6 @@ Status CatalogManager::StoreCertAuthorityInfo(const PrivateKey& key,
   return Status::OK();
 }
 
-// Check if CA private key and the certificate were loaded into the
-// memory. If not, generate new ones and store them into the system table
-// and then load into the memory.
-Status CatalogManager::CheckInitCertAuthority() {
-  using security::Cert;
-  using security::DataFormat;
-  using security::PrivateKey;
-
-  leader_lock_.AssertAcquiredForWriting();
-
-  MasterCertAuthority* ca = master_->cert_authority();
-  unique_ptr<PrivateKey> ca_private_key(new PrivateKey);
-  unique_ptr<Cert> ca_cert(new Cert);
-
-  SysCertAuthorityEntryPB info;
-  const Status s = sys_catalog_->GetCertAuthorityEntry(&info);
-  if (!(s.ok() || s.IsNotFound())) {
-    return s;
-  }
-  if (PREDICT_TRUE(s.ok())) {
-    // The loaded data is necessary to initialize master's cert authority.
-    RETURN_NOT_OK(ca_private_key->FromString(
-        info.private_key(), DataFormat::DER));
-    RETURN_NOT_OK(ca_cert->FromString(
-        info.certificate(), DataFormat::DER));
-    // Extra sanity check.
-    RETURN_NOT_OK(ca_cert->CheckKeyMatch(*ca_private_key));
-  } else {
-    // Generate new private key and corresponding CA certificate.
-    RETURN_NOT_OK(ca->Generate(ca_private_key.get(), ca_cert.get()));
-    RETURN_NOT_OK(ca_private_key->ToString(
-        info.mutable_private_key(), DataFormat::DER));
-    RETURN_NOT_OK(ca_cert->ToString(
-        info.mutable_certificate(), DataFormat::DER));
-    // Store the newly generated private key and certificate
-    // in the system table.
-    RETURN_NOT_OK(sys_catalog_->AddCertAuthorityEntry(info));
-    LOG(INFO) << "Wrote cert authority information into the system table.";
-  }
-
-  // Initialize/re-initialize the master's certificate authority component
-  // with the new private key and certificate.
-  RETURN_NOT_OK(ca->Init(std::move(ca_private_key), std::move(ca_cert)));
-
-  return Status::OK();
-}
-
 void CatalogManager::VisitTablesAndTabletsTask() {
   {
     // Hack to block this function until InitSysCatalogAsync() is finished.

http://git-wip-us.apache.org/repos/asf/kudu/blob/2f95904f/src/kudu/master/catalog_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index cf83363..fe8c4aa 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -591,11 +591,6 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
   Status StoreCertAuthorityInfo(const security::PrivateKey& key,
                                 const security::Cert& cert);
 
-  // Load existing root CA information from the system table, if present.
-  // If not, generate new ones, store them into the system table.
-  // After that, use the CA information to initialize the certificate authority.
-  Status CheckInitCertAuthority();
-
   // Helper for creating the initial TableInfo state
   // Leaves the table "write locked" with the new info in the
   // "dirty" state field.


[3/6] kudu git commit: java: support token authentication in negotiation

Posted by to...@apache.org.
java: support token authentication in negotiation

This adds support for the TOKEN authentication type in negotiation, as
well as a place to store the token and CA cert within the client. This
isn't quite hooked up to the ConnectToCluster RPC yet, so it won't
actually take effect for real connections, but a new unit test verifies
the code paths.

A later patch will actually hook this up to the real certs and tokens
once the appropriate server-side code has all landed.

Change-Id: I7da5809495d67b308828e83c857249b852f21db3
Reviewed-on: http://gerrit.cloudera.org:8080/6047
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/b5734ead
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/b5734ead
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/b5734ead

Branch: refs/heads/master
Commit: b5734eadd4835df2f116b45a052a1adeb2dabc59
Parents: c06a3bc
Author: Todd Lipcon <to...@apache.org>
Authored: Thu Feb 16 17:55:48 2017 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Feb 22 01:18:49 2017 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/AsyncKuduClient.java |  19 +-
 .../java/org/apache/kudu/client/Negotiator.java | 171 +++++++++++++---
 .../org/apache/kudu/client/SecurityContext.java | 203 +++++++++++++++++++
 .../org/apache/kudu/client/TabletClient.java    |  15 +-
 .../java/org/apache/kudu/util/SecurityUtil.java |  44 ----
 .../org/apache/kudu/client/TestNegotiator.java  | 190 +++++++++++++----
 src/kudu/rpc/client_negotiation.cc              |   3 +
 7 files changed, 518 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/b5734ead/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 680773e..4c0d41b 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -193,12 +193,13 @@ public class AsyncKuduClient implements AutoCloseable {
 
   private final RequestTracker requestTracker;
 
-  private final Subject subject;
+  private final SecurityContext securityContext;
 
   private volatile boolean closed;
 
   private AsyncKuduClient(AsyncKuduClientBuilder b) {
     this.channelFactory = b.createChannelFactory();
+    this.securityContext = new SecurityContext(b.subject);
     this.masterAddresses = b.masterAddresses;
     this.masterTable = new KuduTable(this, MASTER_TABLE_NAME_PLACEHOLDER,
         MASTER_TABLE_NAME_PLACEHOLDER, null, null);
@@ -210,7 +211,6 @@ public class AsyncKuduClient implements AutoCloseable {
     this.timer = b.timer;
     String clientId = UUID.randomUUID().toString().replace("-", "");
     this.requestTracker = new RequestTracker(clientId);
-    this.subject = b.subject;
     this.connectionCache = new ConnectionCache(this);
   }
 
@@ -567,6 +567,10 @@ public class AsyncKuduClient implements AutoCloseable {
     return channelFactory;
   }
 
+  SecurityContext getSecurityContext() {
+    return securityContext;
+  }
+
   /**
    * Creates a new {@link AsyncKuduScanner.AsyncKuduScannerBuilder} for a particular table.
    * @param table the name of the table you intend to scan.
@@ -955,17 +959,6 @@ public class AsyncKuduClient implements AutoCloseable {
   }
 
   /**
-   * Gets the subject who created the Kudu client.
-   *
-   * The subject contains credentials necessary to authenticate to Kerberized Kudu clusters.
-   *
-   * @return the subject who created the Kudu client, or null if no login context was active.
-   */
-  Subject getSubject() {
-    return subject;
-  }
-
-  /**
    * Clears {@link #tableLocations} of the table's entries.
    *
    * This method makes the maps momentarily inconsistent, and should only be

http://git-wip-us.apache.org/repos/asf/kudu/blob/b5734ead/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
index a1e2400..856a2bc 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
@@ -70,9 +70,11 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.kudu.annotations.InterfaceAudience;
 import org.apache.kudu.rpc.RpcHeader;
+import org.apache.kudu.rpc.RpcHeader.AuthenticationTypePB;
 import org.apache.kudu.rpc.RpcHeader.NegotiatePB;
 import org.apache.kudu.rpc.RpcHeader.NegotiatePB.NegotiateStep;
 import org.apache.kudu.rpc.RpcHeader.RpcFeatureFlag;
+import org.apache.kudu.security.Token.SignedTokenPB;
 import org.apache.kudu.util.SecurityUtil;
 
 /**
@@ -107,14 +109,21 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
     INITIAL,
     AWAIT_NEGOTIATE,
     AWAIT_TLS_HANDSHAKE,
+    AWAIT_TOKEN_EXCHANGE,
     AWAIT_SASL,
     FINISHED
   }
 
-  /** Subject to authenticate as, when using Kerberos/GSSAPI */
-  private final Subject subject;
   /** The remote hostname we're connecting to, used by TLS and GSSAPI */
   private final String remoteHostname;
+  /** The security context holding the client credentials */
+  private final SecurityContext securityContext;
+  /**
+   * The authentication token we'll try to connect with; may be null.
+   * This is fetched from {@link #securityContext} in the constructor to
+   * ensure that it doesn't change over the course of a negotiation attempt.
+   */
+  private final SignedTokenPB authnToken;
 
   private State state = State.INITIAL;
   private SaslClient saslClient;
@@ -122,6 +131,9 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
   /** The negotiated mechanism, set after NEGOTIATE stage. */
   private String chosenMech;
 
+  /** The negotiated authentication type, set after NEGOTIATE stage. */
+  private AuthenticationTypePB.TypeCase chosenAuthnType;
+
   /** The features supported by the server, set after NEGOTIATE stage. */
   private Set<RpcHeader.RpcFeatureFlag> serverFeatures;
 
@@ -134,6 +146,9 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
    */
   private DecoderEmbedder<ChannelBuffer> sslEmbedder;
 
+  /** True if we have negotiated TLS with the server */
+  private boolean negotiatedTls;
+
   /**
    * Future indicating whether the embedded handshake has completed.
    * Only non-null once TLS is initiated.
@@ -142,9 +157,10 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
 
   private Certificate peerCert;
 
-  public Negotiator(Subject subject, String remoteHostname) {
-    this.subject = subject;
+  public Negotiator(String remoteHostname, SecurityContext securityContext) {
     this.remoteHostname = remoteHostname;
+    this.securityContext = securityContext;
+    this.authnToken = securityContext.getAuthenticationToken();
   }
 
   public void sendHello(Channel channel) {
@@ -152,16 +168,33 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
   }
 
   private void sendNegotiateMessage(Channel channel) {
-    RpcHeader.NegotiatePB.Builder builder = RpcHeader.NegotiatePB.newBuilder();
+    RpcHeader.NegotiatePB.Builder builder = RpcHeader.NegotiatePB.newBuilder()
+        .setStep(RpcHeader.NegotiatePB.NegotiateStep.NEGOTIATE);
 
     // Advertise our supported features
+    // ----------------------------------
     for (RpcHeader.RpcFeatureFlag flag : SUPPORTED_RPC_FEATURES) {
       builder.addSupportedFeatures(flag);
     }
     // TODO(todd): if we are on a loopback connection, advertise and support
     // TLS_AUTHENTICATION_ONLY.
 
-    builder.setStep(RpcHeader.NegotiatePB.NegotiateStep.NEGOTIATE);
+    // Advertise our authentication types.
+    // ----------------------------------
+    // We always advertise SASL.
+    builder.addAuthnTypesBuilder().setSasl(
+        AuthenticationTypePB.Sasl.getDefaultInstance());
+
+    // We may also have a token. But, we can only use the token
+    // if we are able to use authenticated TLS to authenticate the server.
+    if (authnToken != null && securityContext.hasTrustedCerts()) {
+      builder.addAuthnTypesBuilder().setToken(
+          AuthenticationTypePB.Token.getDefaultInstance());
+    }
+
+    // We currently don't support client-certificate authentication from the
+    // Java client.
+
     state = State.AWAIT_NEGOTIATE;
     sendSaslMessage(channel, builder.build());
   }
@@ -198,6 +231,9 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
       case AWAIT_SASL:
         handleSaslMessage(chan, response);
         break;
+      case AWAIT_TOKEN_EXCHANGE:
+        handleTokenExchangeResponse(chan, response);
+        break;
       case AWAIT_TLS_HANDSHAKE:
         handleTlsMessage(chan, response);
         break;
@@ -237,17 +273,31 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
 
   private void handleNegotiateResponse(Channel chan, RpcHeader.NegotiatePB response) throws
       SaslException, SSLException {
+    Preconditions.checkState(response.getStep() == NegotiateStep.NEGOTIATE,
+        "Expected NEGOTIATE message, got {}", response.getStep());
+
     // Store the supported features advertised by the server.
-    ImmutableSet.Builder<RpcHeader.RpcFeatureFlag> features = ImmutableSet.builder();
-    for (RpcHeader.RpcFeatureFlag feature : response.getSupportedFeaturesList()) {
-      if (SUPPORTED_RPC_FEATURES.contains(feature)) {
-        features.add(feature);
-      }
+    serverFeatures = getFeatureFlags(response);
+    // If the server supports TLS, we will always speak TLS to it.
+    negotiatedTls = serverFeatures.contains(RpcFeatureFlag.TLS);
+
+    // Check the negotiated authentication type sent by the server.
+    chosenAuthnType = chooseAuthenticationType(response);
+
+    if (chosenAuthnType == AuthenticationTypePB.TypeCase.SASL) {
+      chooseAndInitializeSaslMech(response);
     }
-    serverFeatures = features.build();
 
-    boolean willUseTls = serverFeatures.contains(RpcFeatureFlag.TLS);
+    // If we negotiated TLS, then we want to start the TLS handshake; otherwise,
+    // we can move directly to the authentication phase.
+    if (negotiatedTls) {
+      startTlsHandshake(chan);
+    } else {
+      startAuthentication(chan);
+    }
+  }
 
+  private void chooseAndInitializeSaslMech(NegotiatePB response) throws SaslException {
     // Gather the set of server-supported mechanisms.
     Set<String> serverMechs = Sets.newHashSet();
     for (RpcHeader.NegotiatePB.SaslMechanism mech : response.getSaslMechanismsList()) {
@@ -267,7 +317,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
       Map<String, String> props = Maps.newHashMap();
       // If we are using GSSAPI with TLS, enable integrity protection, which we use
       // to securely transmit the channel bindings.
-      if ("GSSAPI".equals(clientMech) && willUseTls) {
+      if ("GSSAPI".equals(clientMech) && negotiatedTls) {
         props.put(Sasl.QOP, "auth-int");
       }
       try {
@@ -284,26 +334,65 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
         saslClient = null;
       }
     }
-
     if (chosenMech == null) {
       throw new SaslException("unable to negotiate a matching mechanism. Errors: [" +
                               Joiner.on(",").withKeyValueSeparator(": ").join(errorsByMech) +
                               "]");
     }
+  }
 
-    // If we negotiated TLS, then we want to start the TLS handshake.
-    if (willUseTls) {
-      startTlsHandshake(chan);
-    } else {
-      sendSaslInitiate(chan);
+  private AuthenticationTypePB.TypeCase chooseAuthenticationType(NegotiatePB response) {
+    Preconditions.checkArgument(response.getAuthnTypesCount() <= 1,
+        "Expected server to reply with at most one authn type");
+
+    if (response.getAuthnTypesCount() == 0) {
+      // Default to SASL for compatibility with old servers.
+      return AuthenticationTypePB.TypeCase.SASL;
+    }
+
+    AuthenticationTypePB.TypeCase type = response.getAuthnTypes(0).getTypeCase();
+    switch (type) {
+      case SASL:
+        break;
+      case TOKEN:
+        if (authnToken == null) {
+          // TODO(todd): should we also check whether we have a CA cert?
+          // it seems like this should have the same logic as whether we advertised it
+          throw new IllegalArgumentException("server chose token authentication " +
+              "but client had no valid token");
+        }
+        break;
+      default:
+        throw new IllegalArgumentException("server chose bad authn type " + type);
+    }
+    return type;
+  }
+
+  private Set<RpcFeatureFlag> getFeatureFlags(NegotiatePB response) {
+    ImmutableSet.Builder<RpcHeader.RpcFeatureFlag> features = ImmutableSet.builder();
+    for (RpcHeader.RpcFeatureFlag feature : response.getSupportedFeaturesList()) {
+      if (SUPPORTED_RPC_FEATURES.contains(feature)) {
+        features.add(feature);
+      }
     }
+    return features.build();
   }
 
   /**
    * Send the initial TLS "ClientHello" message.
    */
   private void startTlsHandshake(Channel chan) throws SSLException {
-    SSLEngine engine = SecurityUtil.createSslEngine();
+    SSLEngine engine;
+    switch (chosenAuthnType) {
+      case SASL:
+        engine = securityContext.createSSLEngineTrustAll();
+        break;
+      case TOKEN:
+        engine = securityContext.createSSLEngine();
+        break;
+      default:
+        throw new AssertionError("unreachable");
+    }
     engine.setUseClientMode(true);
     SslHandler handler = new SslHandler(engine);
     handler.setEnableRenegotiation(false);
@@ -348,7 +437,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
     }
 
     chan.getPipeline().addFirst("tls", handler);
-    sendSaslInitiate(chan);
+    startAuthentication(chan);
   }
 
   /**
@@ -391,6 +480,41 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
         .build());
   }
 
+  private void startAuthentication(Channel chan) throws SaslException {
+    switch (chosenAuthnType) {
+      case SASL:
+        sendSaslInitiate(chan);
+        break;
+      case TOKEN:
+        sendTokenExchange(chan);
+        break;
+      default:
+        throw new AssertionError("unreachable");
+    }
+  }
+
+  private void sendTokenExchange(Channel chan) {
+    // We must not send a token unless we have successfully finished
+    // authenticating via TLS.
+    Preconditions.checkNotNull(authnToken);
+    Preconditions.checkNotNull(sslHandshakeFuture);
+    Preconditions.checkState(sslHandshakeFuture.isSuccess());
+
+    RpcHeader.NegotiatePB.Builder builder = RpcHeader.NegotiatePB.newBuilder()
+        .setStep(NegotiateStep.TOKEN_EXCHANGE)
+        .setAuthnToken(authnToken);
+    state = State.AWAIT_TOKEN_EXCHANGE;
+    sendSaslMessage(chan, builder.build());
+  }
+
+  private void handleTokenExchangeResponse(Channel chan, NegotiatePB response) {
+    Preconditions.checkArgument(response.getStep() == NegotiateStep.TOKEN_EXCHANGE,
+        "expected TOKEN_EXCHANGE, got step: %s", response.getStep());
+
+    // The token response doesn't have any actual data in it, so we can just move on.
+    handleSuccessResponse(chan, response);
+  }
+
   private void sendSaslInitiate(Channel chan) throws SaslException {
     RpcHeader.NegotiatePB.Builder builder = RpcHeader.NegotiatePB.newBuilder();
     if (saslClient.hasInitialResponse()) {
@@ -470,7 +594,8 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
 
   private byte[] evaluateChallenge(final byte[] challenge) throws SaslException {
     try {
-      return Subject.doAs(subject, new PrivilegedExceptionAction<byte[]>() {
+      return Subject.doAs(securityContext.getSubject(),
+          new PrivilegedExceptionAction<byte[]>() {
           @Override
           public byte[] run() throws Exception {
             return saslClient.evaluateChallenge(challenge);

http://git-wip-us.apache.org/repos/asf/kudu/blob/b5734ead/java/kudu-client/src/main/java/org/apache/kudu/client/SecurityContext.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/SecurityContext.java b/java/kudu-client/src/main/java/org/apache/kudu/client/SecurityContext.java
new file mode 100644
index 0000000..51195da
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/SecurityContext.java
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import java.security.KeyStore;
+import java.security.cert.CertificateException;
+import java.security.cert.CertificateFactory;
+import java.security.cert.X509Certificate;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509TrustManager;
+import javax.security.auth.Subject;
+
+import com.google.common.base.Throwables;
+import com.google.protobuf.ByteString;
+
+import org.apache.kudu.security.Token.SignedTokenPB;
+
+/**
+ * Class associated with a single AsyncKuduClient which stores security-related
+ * infrastructure, credentials, and trusted certificates.
+ *
+ * Each client has a single instance of this class. This class is threadsafe.
+ */
+class SecurityContext {
+  @GuardedBy("this")
+  private SignedTokenPB authnToken;
+
+  private final DelegatedTrustManager trustManager = new DelegatedTrustManager();
+
+  /**
+   * SSLContext which trusts only the configured certificate.
+   */
+  private final SSLContext sslContextWithCert;
+
+  /**
+   * SSLContext which trusts any certificate.
+   */
+  private final SSLContext sslContextTrustAny;
+
+  /**
+   * The JAAS Subject that the client's credentials are stored in.
+   */
+  @Nullable
+  private final Subject subject;
+
+  public SecurityContext(@Nullable Subject subject) {
+    try {
+      this.subject = subject;
+
+      this.sslContextWithCert = SSLContext.getInstance("TLS");
+      sslContextWithCert.init(null, new TrustManager[] { trustManager }, null);
+
+      this.sslContextTrustAny = SSLContext.getInstance("TLS");
+      sslContextTrustAny.init(null, new TrustManager[] { new TrustAnyCert() }, null);
+
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  public Subject getSubject() {
+    return subject;
+  }
+
+  /**
+   * @return the current authentication token, or null if we have no valid token
+   */
+  public synchronized SignedTokenPB getAuthenticationToken() {
+    return authnToken;
+  }
+
+  /**
+   * Set the token that we will use to authenticate to servers. Replaces any
+   * prior token.
+   */
+  public synchronized void setAuthenticationToken(SignedTokenPB token) {
+    authnToken = token;
+  }
+
+  /**
+   * Create an SSLEngine which will trust all certificates without verification.
+   */
+  public SSLEngine createSSLEngineTrustAll() {
+    return sslContextTrustAny.createSSLEngine();
+  }
+
+  /**
+   * Create an SSLEngine which will trust only certificates that have a valid chain
+   * of trust.
+   */
+  public SSLEngine createSSLEngine() {
+    return sslContextWithCert.createSSLEngine();
+  }
+
+  /**
+   * @return true if any cert has been marked as trusted
+   */
+  public boolean hasTrustedCerts() {
+    return trustManager.delegate.get() != null;
+  }
+
+  /**
+   * Mark the given CA cert (provided in DER form) as the trusted CA cert for the
+   * client. Replaces any previously trusted cert.
+   * @throws CertificateException if the cert was invalid
+   */
+  public void trustCertificate(ByteString certDer) throws CertificateException {
+    CertificateFactory certFactory = CertificateFactory.getInstance("X.509");
+    X509Certificate cert = (X509Certificate)certFactory.generateCertificate(
+        certDer.newInput());
+
+    // This is implemented by making a new TrustManager and swapping it out under
+    // our delegating trust manager. It might seem more straight-forward to instead
+    // just keep one keystore around and load new certs into it, but apparently the
+    // TrustManager loads the certs from the KeyStore upon creation, so adding new
+    // ones to an existing KeyStore doesn't have any effect.
+    try {
+      KeyStore certKeyStore = KeyStore.getInstance(KeyStore.getDefaultType());
+      certKeyStore.load(null);
+      certKeyStore.setCertificateEntry("my-ca", cert);
+
+      TrustManagerFactory tmf = TrustManagerFactory.getInstance(
+          TrustManagerFactory.getDefaultAlgorithm());
+      tmf.init(certKeyStore);
+      TrustManager[] managers = tmf.getTrustManagers();
+      if (managers.length != 1) {
+        throw new RuntimeException("expected exactly 1 TrustManager, got " + managers.length);
+      }
+      trustManager.delegate.set((X509TrustManager) managers[0]);
+    } catch (Exception e) {
+      Throwables.propagateIfInstanceOf(e, CertificateException.class);
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /**
+   * TrustManager implementation which trusts any certificate without verification.
+   */
+  private static class TrustAnyCert implements X509TrustManager {
+    @Override
+    public void checkClientTrusted(X509Certificate[] arg0, String arg1)
+        throws CertificateException {
+    }
+
+    @Override
+    public void checkServerTrusted(X509Certificate[] arg0, String arg1)
+        throws CertificateException {
+    }
+
+    @Override
+    public X509Certificate[] getAcceptedIssuers() {
+      return new X509Certificate[0];
+    }
+  }
+
+  /**
+   * Trust manager that delegates to an underlying trust manager which
+   * can be swapped out atomically.
+   */
+  private static class DelegatedTrustManager implements X509TrustManager {
+    AtomicReference<X509TrustManager> delegate = new AtomicReference<>();
+
+    @Override
+    public void checkClientTrusted(X509Certificate[] chain, String authType)
+        throws CertificateException {
+      delegate.get().checkClientTrusted(chain, authType);
+    }
+
+    @Override
+    public void checkServerTrusted(X509Certificate[] chain, String authType)
+        throws CertificateException {
+      delegate.get().checkServerTrusted(chain, authType);
+    }
+
+    @Override
+    public X509Certificate[] getAcceptedIssuers() {
+      return delegate.get().getAcceptedIssuers();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/b5734ead/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
index 370d45e..938af00 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
@@ -34,7 +34,6 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.locks.ReentrantLock;
 
 import javax.annotation.concurrent.GuardedBy;
-import javax.security.auth.Subject;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -571,9 +570,10 @@ public class TabletClient extends SimpleChannelUpstreamHandler {
                                final ChannelStateEvent e) {
     assert chan != null;
     Channels.write(chan, ChannelBuffers.wrappedBuffer(CONNECTION_HEADER));
-    Negotiator secureRpcHelper = new Negotiator(getSubject(), serverInfo.getHostname());
-    ctx.getPipeline().addBefore(ctx.getName(), "negotiation", secureRpcHelper);
-    secureRpcHelper.sendHello(chan);
+    Negotiator negotiator = new Negotiator(serverInfo.getHostname(),
+        kuduClient.getSecurityContext());
+    ctx.getPipeline().addBefore(ctx.getName(), "negotiation", negotiator);
+    negotiator.sendHello(chan);
   }
 
   @Override
@@ -722,13 +722,6 @@ public class TabletClient extends SimpleChannelUpstreamHandler {
     return serverInfo;
   }
 
-  /**
-   * @return the subject containing security credentials, or null if no subject is available.
-   */
-  Subject getSubject() {
-    return kuduClient.getSubject();
-  }
-
   public String toString() {
     final StringBuilder buf = new StringBuilder();
     buf.append("TabletClient@")

http://git-wip-us.apache.org/repos/asf/kudu/blob/b5734ead/java/kudu-client/src/main/java/org/apache/kudu/util/SecurityUtil.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/util/SecurityUtil.java b/java/kudu-client/src/main/java/org/apache/kudu/util/SecurityUtil.java
index 654dc5e..963e53d 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/util/SecurityUtil.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/util/SecurityUtil.java
@@ -21,16 +21,10 @@ import java.security.AccessControlContext;
 import java.security.AccessController;
 import java.security.MessageDigest;
 import java.security.cert.Certificate;
-import java.security.cert.CertificateException;
 import java.security.cert.X509Certificate;
 import java.util.HashMap;
 import java.util.Map;
 
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLException;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.X509TrustManager;
 import javax.security.auth.Subject;
 import javax.security.auth.kerberos.KerberosPrincipal;
 import javax.security.auth.login.AppConfigurationEntry;
@@ -166,42 +160,4 @@ public abstract class SecurityUtil {
       throw Throwables.propagate(e);
     }
   }
-
-  /**
-   * TrustManager implementation which will trust any certificate.
-   * TODO(PKI): this needs to change so that it can be configured with
-   * the cluster's CA cert.
-   */
-  static class TrustAnyCert implements X509TrustManager {
-    @Override
-    public void checkClientTrusted(X509Certificate[] arg0, String arg1)
-        throws CertificateException {
-    }
-
-    @Override
-    public void checkServerTrusted(X509Certificate[] arg0, String arg1)
-        throws CertificateException {
-    }
-
-    @Override
-    public X509Certificate[] getAcceptedIssuers() {
-      return null;
-    }
-  }
-
-  /**
-   * Create an SSL engine configured to trust any certificate.
-   * @return
-   * @throws SSLException
-   */
-  public static SSLEngine createSslEngine() throws SSLException {
-    try {
-      SSLContext ctx = SSLContext.getInstance("TLS");
-      ctx.init(null, new TrustManager[] { new TrustAnyCert() }, null);
-      return ctx.createSSLEngine();
-    } catch (Exception e) {
-      Throwables.propagateIfInstanceOf(e, SSLException.class);
-      throw Throwables.propagate(e);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/b5734ead/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
index 8300fdf..6df45de 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
@@ -20,7 +20,6 @@ package org.apache.kudu.client;
 import static org.junit.Assert.*;
 
 import java.nio.ByteBuffer;
-import java.security.AccessControlContext;
 import java.security.AccessController;
 import java.security.KeyStore;
 import java.security.cert.Certificate;
@@ -35,12 +34,14 @@ import javax.net.ssl.SSLException;
 import javax.security.auth.Subject;
 
 import org.apache.kudu.client.Negotiator.Result;
+import org.apache.kudu.rpc.RpcHeader.AuthenticationTypePB;
 import org.apache.kudu.rpc.RpcHeader.ConnectionContextPB;
 import org.apache.kudu.rpc.RpcHeader.NegotiatePB;
 import org.apache.kudu.rpc.RpcHeader.RpcFeatureFlag;
 import org.apache.kudu.util.SecurityUtil;
 import org.apache.kudu.rpc.RpcHeader.NegotiatePB.NegotiateStep;
 import org.apache.kudu.rpc.RpcHeader.NegotiatePB.SaslMechanism;
+import org.apache.kudu.security.Token.SignedTokenPB;
 import org.apache.kudu.rpc.RpcHeader.ResponseHeader;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.handler.codec.embedder.DecoderEmbedder;
@@ -52,19 +53,53 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
+import com.google.protobuf.TextFormat;
 
 public class TestNegotiator {
-  private Negotiator negotiator;
   private DecoderEmbedder<Object> embedder;
+  private SecurityContext secContext;
+  private SSLEngine serverEngine;
 
-  private static final char[] password = "password".toCharArray();
+  private static final char[] KEYSTORE_PASSWORD = "password".toCharArray();
+
+  /**
+   * The cert stored in the keystore, in base64ed DER format.
+   * The real certs we'll get from the server will not be in Base64,
+   * but the CertificateFactory also supports binary DER.
+   */
+  private static final String CA_CERT_DER =
+      "-----BEGIN CERTIFICATE-----\n" +
+      "MIIDXTCCAkWgAwIBAgIJAOOmFHYkBz4rMA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNVBAYTAkFVMRMw" +
+      "EQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwHhcN" +
+      "MTYxMTAyMjI0OTQ5WhcNMTcwMjEwMjI0OTQ5WjBFMQswCQYDVQQGEwJBVTETMBEGA1UECAwKU29t" +
+      "ZS1TdGF0ZTEhMB8GA1UECgwYSW50ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIIBIjANBgkqhkiG9w0B" +
+      "AQEFAAOCAQ8AMIIBCgKCAQEAppo9GwiDisQVYAF9NXl8ykqo0MIi5rfNwiE9kUWbZ2ejzxs+1Cf7" +
+      "WCn4mzbkJx5ZscRjhnNb6dJxtZJeid/qgiNVBcNzh35H8J+ao0tEbHjCs7rKOX0etsFUp4GQwYkd" +
+      "fpvVBsU8ciXvkxhvt1XjSU3/YJJRAvCyGVxUQlKiVKGCD4OnFNBwMdNw7qI8ryiRv++7I9udfSuM" +
+      "713yMeBtkkV7hWUfxrTgQOLsV/CS+TsSoOJ7JJqHozeZ+VYom85UqSfpIFJVzM6S7BTb6SX/vwYI" +
+      "oS70gubT3HbHgDRcMvpCye1npHL9fL7B87XZn7wnnUem0eeCqWyUjJ82Uj9mQQIDAQABo1AwTjAd" +
+      "BgNVHQ4EFgQUOY7rpWGoZMrmyRZ9RohPWVwyPBowHwYDVR0jBBgwFoAUOY7rpWGoZMrmyRZ9RohP" +
+      "WVwyPBowDAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEATKh3io8ruqbhmopY3xQWA2pE" +
+      "hs4ZSu3H+AfULMruVsXKEZjWp27nTsFaxLZYUlzeZr0EcWwZ79qkcA8Dyj+mVHhrCAPpcjsDACh1" +
+      "ZdUQAgASkVS4VQvkukct3DFa3y0lz5VwQIxjoQR5y6dCvxxXT9NpRo/Z7pd4MRhEbz3NT6PScQ9f" +
+      "2MTrR0NOikLdB98JlpKQbEKxzbMhWDw4J3mrmK6zdemjdCcRDsBVPswKnyAjkibXaZkpNRzjvDNA" +
+      "gO88MKlArCYoyRZqIfkcSXAwwTdGQ+5GQLsY9zS49Rrhk9R7eOmDhaHybdRBDqW1JiCSmzURZAxl" +
+      "nrjox4GmC3JJaA==\n" +
+      "-----END CERTIFICATE-----";
 
   @Before
   public void setup() {
-    AccessControlContext context = AccessController.getContext();
-    Subject subject = Subject.getSubject(context);
-    negotiator = new Negotiator(subject, "127.0.0.1");
+    serverEngine = createServerEngine();
+    serverEngine.setUseClientMode(false);
+    secContext = new SecurityContext(Subject.getSubject(
+        AccessController.getContext()));
+  }
+
+  private Negotiator startNegotiation() {
+    Negotiator negotiator = new Negotiator("127.0.0.1", secContext);
     embedder = new DecoderEmbedder<Object>(negotiator);
+    negotiator.sendHello(embedder.getPipeline().getChannel());
+    return negotiator;
   }
 
   static CallResponse fakeResponse(ResponseHeader header, Message body) {
@@ -76,14 +111,14 @@ public class TestNegotiator {
   KeyStore loadTestKeystore() throws Exception {
     KeyStore ks = KeyStore.getInstance("JKS");
     ks.load(TestNegotiator.class.getResourceAsStream("/test-key-and-cert.jks"),
-        password);
+        KEYSTORE_PASSWORD);
     return ks;
   }
 
   SSLEngine createServerEngine() {
     try {
       KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
-      kmf.init(loadTestKeystore(), password);
+      kmf.init(loadTestKeystore(), KEYSTORE_PASSWORD);
       SSLContext ctx = SSLContext.getInstance("TLS");
       ctx.init(kmf.getKeyManagers(), null, null);
       return ctx.createSSLEngine();
@@ -92,6 +127,23 @@ public class TestNegotiator {
     }
   }
 
+  /**
+   * Checks that the client sends a connection context and then yields
+   * a Negotiation.Result to the pipeline.
+   * @return the result
+   */
+  private Result assertComplete() {
+    RpcOutboundMessage msg = (RpcOutboundMessage)embedder.poll();
+    ConnectionContextPB connCtx = (ConnectionContextPB)msg.getBody();
+    assertEquals(Negotiator.CONNECTION_CTX_CALL_ID, msg.getHeaderBuilder().getCallId());
+    assertEquals("java_client", connCtx.getDEPRECATEDUserInfo().getRealUser());
+
+    // Expect the client to also emit a negotiation Result.
+    Result result = (Result)embedder.poll();
+    assertNotNull(result);
+    return result;
+  }
+
   @Test
   public void testChannelBinding() throws Exception {
     KeyStore ks = loadTestKeystore();
@@ -105,7 +157,7 @@ public class TestNegotiator {
    */
   @Test
   public void testNegotiation() {
-    negotiator.sendHello(embedder.getPipeline().getChannel());
+    startNegotiation();
 
     // Expect client->server: NEGOTIATE.
     RpcOutboundMessage msg = (RpcOutboundMessage) embedder.poll();
@@ -139,14 +191,7 @@ public class TestNegotiator {
           .build()));
 
     // Expect client->server: ConnectionContext
-    msg = (RpcOutboundMessage)embedder.poll();
-    ConnectionContextPB connCtx = (ConnectionContextPB)msg.getBody();
-    assertEquals(Negotiator.CONNECTION_CTX_CALL_ID, msg.getHeaderBuilder().getCallId());
-    assertEquals("java_client", connCtx.getDEPRECATEDUserInfo().getRealUser());
-
-    // Expect the client to also emit a negotiation Result.
-    Result result = (Result)embedder.poll();
-    assertNotNull(result);
+    assertComplete();
   }
 
   private static void runTasks(SSLEngineResult result,
@@ -181,7 +226,7 @@ public class TestNegotiator {
         bufs.add(ByteString.copyFrom(dst));
       }
       return fakeResponse(
-          ResponseHeader.newBuilder().setCallId(-33).build(),
+          ResponseHeader.newBuilder().setCallId(Negotiator.SASL_CALL_ID).build(),
           NegotiatePB.newBuilder()
             .setTlsHandshake(ByteString.copyFrom(bufs))
             .setStep(NegotiateStep.TLS_HANDSHAKE)
@@ -194,11 +239,30 @@ public class TestNegotiator {
     }
   }
 
+  /**
+   * Completes the 3-step TLS handshake, assuming that the client is
+   * about to generate the first of the messages.
+   */
+  private void runTlsHandshake() throws SSLException {
+    RpcOutboundMessage msg = (RpcOutboundMessage) embedder.poll();
+    NegotiatePB body = (NegotiatePB) msg.getBody();
+    assertEquals(NegotiateStep.TLS_HANDSHAKE, body.getStep());
+
+    // Consume the ClientHello in our fake server, which should generate ServerHello.
+    embedder.offer(runServerStep(serverEngine, body.getTlsHandshake()));
+
+    // Expect client to generate ClientKeyExchange, ChangeCipherSpec, Finished.
+    msg = (RpcOutboundMessage) embedder.poll();
+    body = (NegotiatePB) msg.getBody();
+    assertEquals(NegotiateStep.TLS_HANDSHAKE, body.getStep());
+
+    // Server consumes the above. Should send the TLS "Finished" message.
+    embedder.offer(runServerStep(serverEngine, body.getTlsHandshake()));
+  }
+
   @Test
   public void testTlsNegotiation() throws Exception {
-    SSLEngine serverEngine = createServerEngine();
-    serverEngine.setUseClientMode(false);
-    negotiator.sendHello(embedder.getPipeline().getChannel());
+    startNegotiation();
 
     // Expect client->server: NEGOTIATE, TLS included.
     RpcOutboundMessage msg = (RpcOutboundMessage) embedder.poll();
@@ -208,7 +272,7 @@ public class TestNegotiator {
 
     // Fake a server response with TLS enabled.
     embedder.offer(fakeResponse(
-        ResponseHeader.newBuilder().setCallId(-33).build(),
+        ResponseHeader.newBuilder().setCallId(Negotiator.SASL_CALL_ID).build(),
         NegotiatePB.newBuilder()
           .addSaslMechanisms(NegotiatePB.SaslMechanism.newBuilder().setMechanism("PLAIN"))
           .addSupportedFeatures(RpcFeatureFlag.TLS)
@@ -216,20 +280,7 @@ public class TestNegotiator {
           .build()));
 
     // Expect client->server: TLS_HANDSHAKE.
-    msg = (RpcOutboundMessage) embedder.poll();
-    body = (NegotiatePB) msg.getBody();
-    assertEquals(NegotiateStep.TLS_HANDSHAKE, body.getStep());
-
-    // Consume the ClientHello in our fake server, which should generate ServerHello.
-    embedder.offer(runServerStep(serverEngine, body.getTlsHandshake()));
-
-    // Expect client to generate ClientKeyExchange, ChangeCipherSpec, Finished.
-    msg = (RpcOutboundMessage) embedder.poll();
-    body = (NegotiatePB) msg.getBody();
-    assertEquals(NegotiateStep.TLS_HANDSHAKE, body.getStep());
-
-    // Server consumes the above. Should send the TLS "Finished" message.
-    embedder.offer(runServerStep(serverEngine, body.getTlsHandshake()));
+    runTlsHandshake();
 
     // The pipeline should now have an SSL handler as the first handler.
     assertTrue(embedder.getPipeline().getFirst() instanceof SslHandler);
@@ -243,4 +294,71 @@ public class TestNegotiator {
     body = (NegotiatePB) msg.getBody();
     assertEquals(NegotiateStep.SASL_INITIATE, body.getStep());
   }
+
+  /**
+   * Test that, if we don't have any trusted certs, we don't expose
+   * token authentication as an option.
+   */
+  @Test
+  public void testNoTokenAuthWhenNoTrustedCerts() throws Exception {
+    secContext.setAuthenticationToken(SignedTokenPB.getDefaultInstance());
+    startNegotiation();
+
+    // Expect client->server: NEGOTIATE, TLS included, Token not included.
+    RpcOutboundMessage msg = (RpcOutboundMessage) embedder.poll();
+    NegotiatePB body = (NegotiatePB) msg.getBody();
+    assertEquals("supported_features: APPLICATION_FEATURE_FLAGS " +
+        "supported_features: TLS " +
+        "step: NEGOTIATE " +
+        "authn_types { sasl { } }", TextFormat.shortDebugString(body));
+  }
+
+  /**
+   * Test that, if we have a trusted CA cert, we expose token authentication
+   * as an option during negotiation, and run it to completion.
+   */
+  @Test
+  public void testTokenAuthWithTrustedCerts() throws Exception {
+    secContext.trustCertificate(ByteString.copyFromUtf8(CA_CERT_DER));
+    secContext.setAuthenticationToken(SignedTokenPB.getDefaultInstance());
+    startNegotiation();
+
+    // Expect client->server: NEGOTIATE, TLS included, Token included.
+    RpcOutboundMessage msg = (RpcOutboundMessage) embedder.poll();
+    NegotiatePB body = (NegotiatePB) msg.getBody();
+    assertEquals("supported_features: APPLICATION_FEATURE_FLAGS " +
+        "supported_features: TLS " +
+        "step: NEGOTIATE " +
+        "authn_types { sasl { } } " +
+        "authn_types { token { } }", TextFormat.shortDebugString(body));
+
+    // Fake a server response with TLS enabled and TOKEN chosen.
+    embedder.offer(fakeResponse(
+        ResponseHeader.newBuilder().setCallId(Negotiator.SASL_CALL_ID).build(),
+        NegotiatePB.newBuilder()
+          .addSupportedFeatures(RpcFeatureFlag.TLS)
+          .addAuthnTypes(AuthenticationTypePB.newBuilder().setToken(
+              AuthenticationTypePB.Token.getDefaultInstance()))
+          .setStep(NegotiateStep.NEGOTIATE)
+          .build()));
+
+    // Expect to now run the TLS handshake
+    runTlsHandshake();
+
+    // Expect the client to send the token.
+    msg = (RpcOutboundMessage) embedder.poll();
+    body = (NegotiatePB) msg.getBody();
+    assertEquals("step: TOKEN_EXCHANGE authn_token { }",
+        TextFormat.shortDebugString(body));
+
+    // Fake a response indicating success.
+    embedder.offer(fakeResponse(
+        ResponseHeader.newBuilder().setCallId(Negotiator.SASL_CALL_ID).build(),
+        NegotiatePB.newBuilder()
+        .setStep(NegotiateStep.TOKEN_EXCHANGE)
+          .build()));
+
+    // Should be complete now.
+    assertComplete();
+  }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/b5734ead/src/kudu/rpc/client_negotiation.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/client_negotiation.cc b/src/kudu/rpc/client_negotiation.cc
index a58348c..1f3f648 100644
--- a/src/kudu/rpc/client_negotiation.cc
+++ b/src/kudu/rpc/client_negotiation.cc
@@ -349,6 +349,9 @@ Status ClientNegotiation::HandleNegotiate(const NegotiatePB& response) {
         negotiated_authn_ = AuthenticationType::SASL;
         break;
       case AuthenticationTypePB::kToken:
+        // TODO(todd): we should also be checking tls_context_->has_trusted_cert()
+        // here to match the original logic we used to advertise TOKEN support,
+        // or perhaps just check explicitly whether we advertised TOKEN.
         if (!authn_token_) {
           return Status::RuntimeError(
               "server chose token authentication, but client has no token");


[5/6] kudu git commit: tserver: fix an error message when failing to trust master CA

Posted by to...@apache.org.
tserver: fix an error message when failing to trust master CA

Change-Id: Ic8deebc3cfc82687bc6a4eea94cf6223a484cedf
Reviewed-on: http://gerrit.cloudera.org:8080/6080
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/11ba0ccf
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/11ba0ccf
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/11ba0ccf

Branch: refs/heads/master
Commit: 11ba0ccfa196dd44db1dff1f803275cd2aea3421
Parents: 6ec8312
Author: Todd Lipcon <to...@apache.org>
Authored: Sun Feb 19 17:09:13 2017 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Feb 22 01:46:40 2017 +0000

----------------------------------------------------------------------
 src/kudu/tserver/heartbeater.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/11ba0ccf/src/kudu/tserver/heartbeater.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/heartbeater.cc b/src/kudu/tserver/heartbeater.cc
index 5a292b4..892cf84 100644
--- a/src/kudu/tserver/heartbeater.cc
+++ b/src/kudu/tserver/heartbeater.cc
@@ -428,7 +428,7 @@ Status Heartbeater::Thread::DoHeartbeat() {
         "failed to parse CA certificate from master");
     RETURN_NOT_OK_PREPEND(
         server_->mutable_tls_context()->AddTrustedCertificate(ca_cert),
-        "failed to adopt master-signed X509 cert");
+        "failed to trust master CA cert");
   }
 
   // If we have a new signed certificate from the master, adopt it.


[2/6] kudu git commit: [security] load/store public TSK in the system table

Posted by to...@apache.org.
[security] load/store public TSK in the system table

Non-expired, already existing TSK records are read from the system
catalog table upon the 'elected-as-leader' callback and fed into the
TokenSigner.  New token signing keys are generated and written
into the system catalog table by a periodic task.

The expired entries are removed from the system table
upon the 'elected-as-leader' callback.

Added integration tests to token_signer-itest.cc covering cluster
start/restart and changes of master leadership.

Change-Id: Ie91d4129bda0ca49e81988c28385895a2abcd201
Reviewed-on: http://gerrit.cloudera.org:8080/5935
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/c06a3bc6
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/c06a3bc6
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/c06a3bc6

Branch: refs/heads/master
Commit: c06a3bc66e59e9467b599e85714825347aacf7ec
Parents: 48cc975
Author: Alexey Serbin <as...@cloudera.com>
Authored: Fri Feb 3 21:04:07 2017 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Feb 21 23:00:55 2017 +0000

----------------------------------------------------------------------
 src/kudu/client/client-test.cc                  |   2 +-
 src/kudu/integration-tests/CMakeLists.txt       |   1 +
 src/kudu/integration-tests/registration-test.cc |  20 +-
 .../integration-tests/token_signer-itest.cc     | 207 +++++++++++++++++
 src/kudu/integration-tests/ts_itest-base.h      |  21 +-
 src/kudu/master/catalog_manager.cc              | 222 ++++++++++++++++++-
 src/kudu/master/catalog_manager.h               |  34 +++
 src/kudu/master/master.cc                       |  13 +-
 src/kudu/master/master.proto                    |   8 +
 src/kudu/master/sys_catalog.cc                  |  72 +++++-
 src/kudu/master/sys_catalog.h                   |  29 ++-
 11 files changed, 587 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/c06a3bc6/src/kudu/client/client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 6f652c9..2b0f339 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -674,7 +674,7 @@ TEST_F(ClientTest, TestMasterDown) {
   shared_ptr<KuduTable> t;
   client_->data_->default_admin_operation_timeout_ = MonoDelta::FromSeconds(1);
   Status s = client_->OpenTable("other-tablet", &t);
-  ASSERT_TRUE(s.IsNetworkError());
+  ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
 }
 
 TEST_F(ClientTest, TestScan) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/c06a3bc6/src/kudu/integration-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index 8c6da5e..5b6a2e8 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -81,6 +81,7 @@ ADD_KUDU_TEST(tablet_copy-itest)
 ADD_KUDU_TEST(tablet_copy_client_session-itest)
 ADD_KUDU_TEST(tablet_history_gc-itest)
 ADD_KUDU_TEST(tablet_replacement-itest)
+ADD_KUDU_TEST(token_signer-itest RESOURCE_LOCK "master-rpc-ports")
 ADD_KUDU_TEST(ts_recovery-itest)
 ADD_KUDU_TEST(ts_tablet_manager-itest)
 ADD_KUDU_TEST(webserver-stress-itest)

http://git-wip-us.apache.org/repos/asf/kudu/blob/c06a3bc6/src/kudu/integration-tests/registration-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/registration-test.cc b/src/kudu/integration-tests/registration-test.cc
index 0b82ae6..168ce9e 100644
--- a/src/kudu/integration-tests/registration-test.cc
+++ b/src/kudu/integration-tests/registration-test.cc
@@ -97,21 +97,27 @@ class RegistrationTest : public KuduTest {
     }
   }
 
-
-  Status WaitForReplicaCount(const string& tablet_id, int expected_count,
+  Status WaitForReplicaCount(const string& tablet_id,
+                             int expected_count,
                              TabletLocationsPB* locations) {
     while (true) {
-      master::CatalogManager* catalog = cluster_->mini_master()->master()->catalog_manager();
+      master::CatalogManager* catalog =
+          cluster_->mini_master()->master()->catalog_manager();
       Status s;
-      {
+      do {
         master::CatalogManager::ScopedLeaderSharedLock l(catalog);
-        RETURN_NOT_OK(l.first_failed_status());
+        const Status& ls = l.first_failed_status();
+        if (ls.IsServiceUnavailable()) {
+          // ServiceUnavailable means catalog manager is not yet ready
+          // to serve requests -- try again later.
+          break;  // exiting out of the 'do {...} while (false)' scope
+        }
+        RETURN_NOT_OK(ls);
         s = catalog->GetTabletLocations(tablet_id, locations);
-      }
+      } while (false);
       if (s.ok() && locations->replicas_size() == expected_count) {
         return Status::OK();
       }
-
       SleepFor(MonoDelta::FromMilliseconds(1));
     }
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/c06a3bc6/src/kudu/integration-tests/token_signer-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/token_signer-itest.cc b/src/kudu/integration-tests/token_signer-itest.cc
new file mode 100644
index 0000000..121f4d1
--- /dev/null
+++ b/src/kudu/integration-tests/token_signer-itest.cc
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+#include <string>
+
+#include <gflags/gflags_declare.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/mini_cluster.h"
+#include "kudu/master/catalog_manager.h"
+#include "kudu/master/master.h"
+#include "kudu/master/master.proxy.h"
+#include "kudu/master/master_cert_authority.h"
+#include "kudu/master/mini_master.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/security/crypto.h"
+#include "kudu/security/token.pb.h"
+#include "kudu/security/token_signer.h"
+#include "kudu/security/token_verifier.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_int64(tsk_validity_seconds);
+DECLARE_int64(tsk_rotation_seconds);
+
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using kudu::security::TokenSigningPublicKeyPB;
+using kudu::security::SignedTokenPB;
+using kudu::security::TokenPB;
+
+namespace kudu {
+namespace master {
+
+class TokenSignerITest : public KuduTest {
+ public:
+  TokenSignerITest() {
+    FLAGS_tsk_validity_seconds = 60;
+    FLAGS_tsk_rotation_seconds = 20;
+
+    // Hard-coded ports for the masters. This is safe, as this unit test
+    // runs under a resource lock (see CMakeLists.txt in this directory).
+    // TODO(aserbin): we should have a generic method to obtain n free ports.
+    opts_.master_rpc_ports = { 11010, 11011, 11012 };
+
+    opts_.num_masters = num_masters_ = opts_.master_rpc_ports.size();
+  }
+
+  void SetUp() override {
+    KuduTest::SetUp();
+    cluster_.reset(new MiniCluster(env_, opts_));
+    ASSERT_OK(cluster_->Start());
+  }
+
+  Status RestartCluster() {
+    cluster_->Shutdown();
+    return cluster_->Start();
+  }
+
+  Status ShutdownLeader() {
+    int leader_idx;
+    RETURN_NOT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
+    MiniMaster* leader_master = cluster_->mini_master(leader_idx);
+    leader_master->Shutdown();
+    return Status::OK();
+  }
+
+  // Check the leader is found on the cluster.
+  Status WaitForNewLeader() {
+    return cluster_->GetLeaderMasterIndex(nullptr);
+  }
+
+  static SignedTokenPB MakeToken() {
+    SignedTokenPB ret;
+    TokenPB token;
+    token.set_expire_unix_epoch_seconds(WallTime_Now() + 600);
+    CHECK(token.SerializeToString(ret.mutable_token_data()));
+    return ret;
+  }
+
+  Status SignToken(SignedTokenPB* token) {
+    int leader_idx;
+    RETURN_NOT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
+    MiniMaster* leader = cluster_->mini_master(leader_idx);
+    Master* master = leader->master();
+    RETURN_NOT_OK(master->token_signer()->SignToken(token));
+    return Status::OK();
+  }
+
+  Status GetPublicKeys(int idx, vector<TokenSigningPublicKeyPB>* public_keys) {
+    CHECK_GE(idx, 0);
+    CHECK_LT(idx, num_masters_);
+    MiniMaster* mm = cluster_->mini_master(idx);
+    vector<TokenSigningPublicKeyPB> keys =
+        mm->master()->token_signer()->verifier().ExportKeys();
+    public_keys->swap(keys);
+    return Status::OK();
+  }
+
+  Status GetLeaderPublicKeys(vector<TokenSigningPublicKeyPB>* public_keys) {
+    int leader_idx;
+    RETURN_NOT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
+    return GetPublicKeys(leader_idx, public_keys);
+  }
+
+ protected:
+  int num_masters_;
+  MiniClusterOptions opts_;
+  unique_ptr<MiniCluster> cluster_;
+};
+
+// Check that once cluster has started, the TSK for signing is available at the
+// leader master and nothing is available at master-followers. The follower
+// masters do not poll the system table for TSK entries nor TSK information
+// is transferred to them from the leader master in any other way.
+TEST_F(TokenSignerITest, TskAtLeaderMaster) {
+  // Check the leader can sign tokens: this guarantees at least one TSK has been
+  // generated and is available for token signing.
+  SignedTokenPB t(MakeToken());
+  ASSERT_OK(SignToken(&t));
+
+  // Get the public part of the signing key from the leader.
+  vector<TokenSigningPublicKeyPB> leader_public_keys;
+  ASSERT_OK(GetLeaderPublicKeys(&leader_public_keys));
+  ASSERT_EQ(1, leader_public_keys.size());
+  EXPECT_EQ(t.signing_key_seq_num(), leader_public_keys[0].key_seq_num());
+}
+
+// New TSK is generated upon start of the leader master and persisted in
+// the system catalog table. So, check that appropriate TSK
+// information is available upon start of the cluster and it's persistent
+// for some time after restart (until the expiration time, actually;
+// the removal of expired TSK info is not covered by this scenario).
+TEST_F(TokenSignerITest, TskClusterRestart) {
+  // Check the leader can sign tokens just after start.
+  SignedTokenPB t_pre(MakeToken());
+  ASSERT_OK(SignToken(&t_pre));
+
+  vector<TokenSigningPublicKeyPB> public_keys_before;
+  ASSERT_OK(GetLeaderPublicKeys(&public_keys_before));
+  ASSERT_EQ(1, public_keys_before.size());
+
+  ASSERT_OK(RestartCluster());
+
+  // Check the leader can sign tokens after the restart.
+  SignedTokenPB t_post(MakeToken());
+  ASSERT_OK(SignToken(&t_post));
+  EXPECT_EQ(t_post.signing_key_seq_num(), t_pre.signing_key_seq_num());
+
+  vector<TokenSigningPublicKeyPB> public_keys_after;
+  ASSERT_OK(GetLeaderPublicKeys(&public_keys_after));
+  ASSERT_EQ(1, public_keys_after.size());
+  EXPECT_EQ(public_keys_before[0].SerializeAsString(),
+            public_keys_after[0].SerializeAsString());
+}
+
+// Test that if leadership changes, the new leader has the same TSK information
+// for token verification as the former leader
+// (it's assumed no new TSK generation happened in between).
+TEST_F(TokenSignerITest, TskMasterLeadershipChange) {
+  SignedTokenPB t_former_leader(MakeToken());
+  ASSERT_OK(SignToken(&t_former_leader));
+
+  vector<TokenSigningPublicKeyPB> public_keys;
+  ASSERT_OK(GetLeaderPublicKeys(&public_keys));
+  ASSERT_EQ(1, public_keys.size());
+
+  ASSERT_OK(ShutdownLeader());
+  ASSERT_OK(WaitForNewLeader());
+
+  // The new leader should use the same signing key.
+  SignedTokenPB t_new_leader(MakeToken());
+  ASSERT_OK(SignToken(&t_new_leader));
+  EXPECT_EQ(t_new_leader.signing_key_seq_num(),
+            t_former_leader.signing_key_seq_num());
+
+  vector<TokenSigningPublicKeyPB> public_keys_new_leader;
+  ASSERT_OK(GetLeaderPublicKeys(&public_keys_new_leader));
+  ASSERT_EQ(1, public_keys_new_leader.size());
+  EXPECT_EQ(public_keys[0].SerializeAsString(),
+            public_keys_new_leader[0].SerializeAsString());
+}
+
+// TODO(aserbin): add a test case which corresponds to multiple signing keys
+// right after cluster start-up.
+
+} // namespace master
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/c06a3bc6/src/kudu/integration-tests/ts_itest-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/ts_itest-base.h b/src/kudu/integration-tests/ts_itest-base.h
index 844369c..3655977 100644
--- a/src/kudu/integration-tests/ts_itest-base.h
+++ b/src/kudu/integration-tests/ts_itest-base.h
@@ -147,12 +147,23 @@ class TabletServerIntegrationTestBase : public TabletServerTestBase {
       CHECK_OK(cluster_->master_proxy()->GetTableLocations(req, &resp, &controller));
       CHECK_OK(controller.status());
       if (resp.has_error()) {
-        if (resp.error().code() == master::MasterErrorPB::TABLET_NOT_RUNNING) {
-          LOG(WARNING)<< "At least one tablet is not yet running";
-          SleepFor(MonoDelta::FromSeconds(1));
-          continue;
+        switch (resp.error().code()) {
+          case master::MasterErrorPB::TABLET_NOT_RUNNING:
+            LOG(WARNING)<< "At least one tablet is not yet running";
+            break;
+
+          case master::MasterErrorPB::NOT_THE_LEADER:   // fallthrough
+          case master::MasterErrorPB::CATALOG_MANAGER_NOT_INITIALIZED:
+            LOG(WARNING)<< "CatalogManager is not yet ready to serve requests";
+            break;
+
+          default:
+            FAIL() << "Response had a fatal error: "
+                   << SecureShortDebugString(resp.error());
+            break;  // unreachable
         }
-        FAIL() << "Response had a fatal error: " << SecureShortDebugString(resp.error());
+        SleepFor(MonoDelta::FromSeconds(1));
+        continue;
       }
 
       for (const master::TabletLocationsPB& location : resp.tablet_locations()) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/c06a3bc6/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index a2a77e2..59b990f 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -83,6 +83,9 @@
 #include "kudu/rpc/rpc_context.h"
 #include "kudu/security/cert.h"
 #include "kudu/security/crypto.h"
+#include "kudu/security/token.pb.h"
+#include "kudu/security/token_signer.h"
+#include "kudu/security/token_signing_key.h"
 #include "kudu/tserver/tserver_admin.proxy.h"
 #include "kudu/util/debug/trace_event.h"
 #include "kudu/util/flag_tags.h"
@@ -197,6 +200,7 @@ DEFINE_bool(catalog_manager_delete_orphaned_tablets, false,
 TAG_FLAG(catalog_manager_delete_orphaned_tablets, advanced);
 
 using std::pair;
+using std::set;
 using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
@@ -205,19 +209,24 @@ using std::vector;
 namespace kudu {
 namespace master {
 
-using base::subtle::NoBarrier_Load;
 using base::subtle::NoBarrier_CompareAndSwap;
+using base::subtle::NoBarrier_Load;
 using cfile::TypeEncodingInfo;
-using consensus::kMinimumTerm;
 using consensus::CONSENSUS_CONFIG_COMMITTED;
 using consensus::Consensus;
 using consensus::ConsensusServiceProxy;
 using consensus::ConsensusStatePB;
 using consensus::GetConsensusRole;
-using consensus::OpId;
 using consensus::RaftPeerPB;
 using consensus::StartTabletCopyRequestPB;
+using consensus::kMinimumTerm;
 using rpc::RpcContext;
+using security::Cert;
+using security::DataFormat;
+using security::PrivateKey;
+using security::TokenSigner;
+using security::TokenSigningPrivateKey;
+using security::TokenSigningPrivateKeyPB;
 using strings::Substitute;
 using tablet::TABLET_DATA_DELETED;
 using tablet::TABLET_DATA_TOMBSTONED;
@@ -317,6 +326,48 @@ class TabletLoader : public TabletVisitor {
 };
 
 ////////////////////////////////////////////////////////////
+// TSK (Token Signing Key) Entry Loader
+////////////////////////////////////////////////////////////
+
+class TskEntryLoader : public TskEntryVisitor {
+ public:
+  // Entries expiring at or before the construction time count as expired.
+  TskEntryLoader() : entry_expiration_seconds_(WallTime_Now()) {}
+
+  Status Visit(const string& entry_id,
+               const SysTskEntryPB& metadata) override {
+    TokenSigningPrivateKeyPB tsk(metadata.tsk());
+    CHECK(tsk.has_key_seq_num());
+    CHECK(tsk.has_expire_unix_epoch_seconds());
+    CHECK(tsk.has_rsa_key_der());
+
+    // Check expiration before 'tsk' is moved from: it must not be read after.
+    if (tsk.expire_unix_epoch_seconds() <= entry_expiration_seconds_) {
+      expired_entry_ids_.insert(entry_id);
+    }
+    // Expired entries are useful as well: they are needed for correct tracking
+    // of TSK sequence numbers.
+    entries_.emplace_back(std::move(tsk));
+    return Status::OK();
+  }
+
+  // All the loaded TSK entries, including the expired ones.
+  const vector<TokenSigningPrivateKeyPB>& entries() const {
+    return entries_;
+  }
+
+  const set<string>& expired_entry_ids() const {
+    return expired_entry_ids_;
+  }
+
+ private:
+  const int64_t entry_expiration_seconds_;
+  vector<TokenSigningPrivateKeyPB> entries_;
+  set<string> expired_entry_ids_;
+  DISALLOW_COPY_AND_ASSIGN(TskEntryLoader);
+};
+
+////////////////////////////////////////////////////////////
 // Background Tasks
 ////////////////////////////////////////////////////////////
 
@@ -400,9 +451,16 @@ void CatalogManagerBgTasks::Run() {
                        << l.catalog_status().ToString();
         }
       } else if (l.leader_status().ok()) {
-        std::vector<scoped_refptr<TabletInfo>> to_process;
+        // If this is the leader master, check if it's time to generate
+        // and store a new TSK (Token Signing Key).
+        Status s = catalog_manager_->CheckGenerateNewTskUnlocked();
+        if (!s.ok()) {
+          LOG(ERROR) << "Error processing TSK entry (will try next time): "
+                     << s.ToString();
+        }
 
         // Get list of tablets not yet running.
+        std::vector<scoped_refptr<TabletInfo>> to_process;
         catalog_manager_->ExtractTabletsToProcess(&to_process);
 
         if (!to_process.empty()) {
@@ -413,15 +471,14 @@ void CatalogManagerBgTasks::Run() {
             // If there is an error (e.g., we are not the leader) abort this task
             // and wait until we're woken up again.
             //
-            // TODO Add tests for this in the revision that makes
+            // TODO(unknown): Add tests for this in the revision that makes
             // create/alter fault tolerant.
-            LOG(ERROR) << "Error processing pending assignments, aborting the current task: "
-                       << s.ToString();
+            LOG(ERROR) << "Error processing pending assignments, "
+                "aborting the current task: " << s.ToString();
           }
         }
       }
     }
-
     // Wait for a notification or a timeout expiration.
     //  - CreateTable will call Wake() to notify about the tablets to add
     //  - HandleReportedTablet/ProcessPendingAssignments will call WakeIfHasPendingUpdates()
@@ -675,9 +732,54 @@ Status CatalogManager::WaitUntilCaughtUpAsLeader(const MonoDelta& timeout) {
   return Status::OK();
 }
 
+Status CatalogManager::LoadCertAuthorityInfo(unique_ptr<PrivateKey>* key,
+                                             unique_ptr<Cert>* cert) {
+  leader_lock_.AssertAcquiredForWriting();
+
+  SysCertAuthorityEntryPB info;
+  RETURN_NOT_OK(sys_catalog_->GetCertAuthorityEntry(&info));
+
+  unique_ptr<PrivateKey> ca_private_key(new PrivateKey);
+  unique_ptr<Cert> ca_cert(new Cert);
+  RETURN_NOT_OK(ca_private_key->FromString(
+      info.private_key(), DataFormat::DER));
+  RETURN_NOT_OK(ca_cert->FromString(
+      info.certificate(), DataFormat::DER));
+  // Extra sanity check.
+  RETURN_NOT_OK(ca_cert->CheckKeyMatch(*ca_private_key));
+
+  key->swap(ca_private_key);
+  cert->swap(ca_cert);
+
+  return Status::OK();
+}
+
+// Initialize the master's certificate authority component with the specified
+// private key and certificate.
+Status CatalogManager::InitCertAuthority(unique_ptr<PrivateKey> key,
+                                         unique_ptr<Cert> cert) {
+  leader_lock_.AssertAcquiredForWriting();
+  return master_->cert_authority()->Init(std::move(key), std::move(cert));
+}
+
+// Store internal Kudu CA cert authority information into the system table.
+Status CatalogManager::StoreCertAuthorityInfo(const PrivateKey& key,
+                                              const Cert& cert) {
+  leader_lock_.AssertAcquiredForWriting();
+
+  SysCertAuthorityEntryPB info;
+  RETURN_NOT_OK(key.ToString(info.mutable_private_key(), DataFormat::DER));
+  RETURN_NOT_OK(cert.ToString(info.mutable_certificate(), DataFormat::DER));
+  RETURN_NOT_OK(sys_catalog_->AddCertAuthorityEntry(info));
+  LOG(INFO) << "Successfully stored the newly generated cert authority "
+               "information into the system table.";
+
+  return Status::OK();
+}
+
 // Check if CA private key and the certificate were loaded into the
-// memory. If not, generate new ones and store them into the system table.
-// Use the CA information to initialize the certificate authority.
+// memory. If not, generate new ones and store them into the system table
+// and then load into the memory.
 Status CatalogManager::CheckInitCertAuthority() {
   using security::Cert;
   using security::DataFormat;
@@ -777,7 +879,64 @@ void CatalogManager::VisitTablesAndTabletsTask() {
     LOG(INFO) << "Loading CA info into memory...";
     LOG_SLOW_EXECUTION(WARNING, 1000, LogPrefix() +
                        "Loading CA info into memory") {
-      CHECK_OK(CheckInitCertAuthority());
+      unique_ptr<PrivateKey> key;
+      unique_ptr<Cert> cert;
+      const Status& s = LoadCertAuthorityInfo(&key, &cert);
+      if (s.ok()) {
+        // Once successfully loaded, the CA information is supposed to be valid:
+        // the leader master should not be run without CA certificate.
+        CHECK_OK(InitCertAuthority(std::move(key), std::move(cert)));
+      } else if (s.IsNotFound()) {
+        LOG(WARNING) << "Did not find CA certificate and key for Kudu IPKI, "
+                        "will generate new ones";
+        // No CA information record has been found in the table -- generate
+        // a new one. The subtlety here is that first it's necessary to store
+        // the newly generated information into the system table and only after
+        // that initialize master certificate authority. The reason is:
+        // if the master server loses its leadership role by that time, there
+        // will be an error on an attempt to write into the system table.
+        // If that happens, skip the rest of the sequence: when this callback
+        // is invoked next time, the system table should already contain
+        // CA certificate information written by other master.
+        unique_ptr<PrivateKey> private_key(new PrivateKey);
+        unique_ptr<Cert> cert(new Cert);
+
+        // Generate new private key and corresponding CA certificate.
+        CHECK_OK(master_->cert_authority()->Generate(private_key.get(),
+                                                     cert.get()));
+        // If the leadership role is lost at this moment, writing into the
+        // system table will fail.
+        const Status& s = StoreCertAuthorityInfo(*private_key, *cert);
+        if (s.ok()) {
+          // The leader master should not run without CA certificate.
+          CHECK_OK(InitCertAuthority(std::move(private_key), std::move(cert)));
+        } else {
+          LOG(WARNING) << "Failed to write newly generated CA information into "
+                          "the system table, assuming change of leadership: "
+                       << s.ToString();
+        }
+      } else {
+        CHECK_OK(s);
+      }
+    }
+
+    LOG(INFO) << "Loading token signing keys...";
+    LOG_SLOW_EXECUTION(WARNING, 1000, LogPrefix() +
+                       "Loading token signing keys...") {
+      set<string> expired_tsk_entry_ids;
+      CHECK_OK(LoadTskEntries(&expired_tsk_entry_ids));
+      Status s = CheckGenerateNewTskUnlocked();
+      if (!s.ok()) {
+        LOG(WARNING) << "Failed to generate and persist new TSK, "
+                        "assuming change of leadership: " << s.ToString();
+        return;
+      }
+      s = DeleteTskEntries(expired_tsk_entry_ids);
+      if (!s.ok()) {
+        LOG(WARNING) << "Failed to purge expired TSK entries from system table, "
+                        "assuming change of leadership: " << s.ToString();
+        return;
+      }
     }
   }
 
@@ -3177,6 +3336,47 @@ void CatalogManager::ExtractTabletsToProcess(
   }
 }
 
+// Check if it's time to roll TokenSigner's key. There's a bit of subtlety here:
+// we shouldn't start exporting a key until it is properly persisted.
+// So, the protocol is:
+//   1) Generate a new TSK.
+//   2) Try to write it to the system table.
+//   3) Pass it back to the TokenSigner on success.
+//   4) Check and switch TokenSigner to the new key if it's time to do so.
+Status CatalogManager::CheckGenerateNewTskUnlocked() {
+  TokenSigner* signer = master_->token_signer();
+  unique_ptr<security::TokenSigningPrivateKey> tsk;
+  RETURN_NOT_OK(signer->CheckNeedKey(&tsk));
+  if (tsk) {
+    // First save the new TSK into the system table.
+    TokenSigningPrivateKeyPB tsk_pb;
+    tsk->ExportPB(&tsk_pb);
+    SysTskEntryPB sys_entry;
+    sys_entry.mutable_tsk()->Swap(&tsk_pb);
+    RETURN_NOT_OK(sys_catalog_->AddTskEntry(sys_entry));
+    LOG(INFO) << "Saved newly generated TSK " << tsk->key_seq_num()
+              << " into the system table.";
+    // Then add the new TSK into the signer.
+    RETURN_NOT_OK(signer->AddKey(std::move(tsk)));
+  }
+  return signer->TryRotateKey();
+}
+
+Status CatalogManager::LoadTskEntries(set<string>* expired_entry_ids) {
+  TskEntryLoader loader;
+  RETURN_NOT_OK(sys_catalog_->VisitTskEntries(&loader));
+  if (expired_entry_ids != nullptr) {
+    // Hand back the ids of expired entries, e.g. for the caller to purge them.
+    *expired_entry_ids = loader.expired_entry_ids();
+  }
+  return master_->token_signer()->ImportKeys(loader.entries());
+}
+
+Status CatalogManager::DeleteTskEntries(const set<string>& entry_ids) {
+  leader_lock_.AssertAcquiredForWriting();
+  return sys_catalog_->RemoveTskEntries(entry_ids);
+}
+
 struct DeferredAssignmentActions {
   vector<TabletInfo*> tablets_to_add;
   vector<TabletInfo*> tablets_to_update;

http://git-wip-us.apache.org/repos/asf/kudu/blob/c06a3bc6/src/kudu/master/catalog_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index 65614d6..cf83363 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -59,6 +59,13 @@ namespace rpc {
 class RpcContext;
 } // namespace rpc
 
+namespace security {
+class Cert;
+class PrivateKey;
+class TokenSigner;
+class TokenSigningPrivateKey;
+} // namespace security
+
 namespace master {
 
 class CatalogManagerBgTasks;
@@ -569,6 +576,21 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
   // This method is thread-safe.
   Status InitSysCatalogAsync(bool is_first_run);
 
+  // Load the internal Kudu certificate authority information (the private key
+  // and the certificate, checked to match each other) from the system table.
+  // If the CA info entry is not found in the table, return Status::NotFound.
+  Status LoadCertAuthorityInfo(std::unique_ptr<security::PrivateKey>* key,
+                               std::unique_ptr<security::Cert>* cert);
+
+  // Initialize master's certificate authority with the specified private key
+  // and certificate.
+  Status InitCertAuthority(std::unique_ptr<security::PrivateKey> key,
+                           std::unique_ptr<security::Cert> cert);
+
+  // Store CA certificate information into the system table.
+  Status StoreCertAuthorityInfo(const security::PrivateKey& key,
+                                const security::Cert& cert);
+
   // Load existing root CA information from the system table, if present.
   // If not, generate new ones, store them into the system table.
   // After that, use the CA information to initialize the certificate authority.
@@ -610,6 +632,18 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
   // Extract the set of tablets that must be processed because not running yet.
   void ExtractTabletsToProcess(std::vector<scoped_refptr<TabletInfo>>* tablets_to_process);
 
+  // If TokenSigner needs a new Token Signing Key, generate one, persist it into
+  // the system table and only then add it to TokenSigner. Then try key rotation.
+  Status CheckGenerateNewTskUnlocked();
+
+  // Load all TSK entries (expired ones too) and import them into TokenSigner.
+  // If 'expired_entry_ids' is non-null, it's set to the expired entries' ids.
+  Status LoadTskEntries(std::set<std::string>* expired_entry_ids);
+
+  // Delete TSK entries with the specified entry identifiers
+  // (identifiers correspond to the 'entry_id' column).
+  Status DeleteTskEntries(const std::set<std::string>& entry_ids);
+
   Status ApplyAlterSchemaSteps(const SysTablesEntryPB& current_pb,
                                std::vector<AlterTableRequestPB::Step> steps,
                                Schema* new_schema,

http://git-wip-us.apache.org/repos/asf/kudu/blob/c06a3bc6/src/kudu/master/master.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master.cc b/src/kudu/master/master.cc
index dfe28a4..0f04c22 100644
--- a/src/kudu/master/master.cc
+++ b/src/kudu/master/master.cc
@@ -18,7 +18,6 @@
 #include "kudu/master/master.h"
 
 #include <algorithm>
-#include <list>
 #include <memory>
 #include <vector>
 
@@ -119,19 +118,9 @@ Status Master::Init() {
   // becomes a leader.
   cert_authority_.reset(new MasterCertAuthority(fs_manager_->uuid()));
 
+  // The TokenSigner loads its keys during catalog manager initialization.
   token_signer_.reset(new TokenSigner(FLAGS_tsk_validity_seconds,
                                       FLAGS_tsk_rotation_seconds));
-  // TODO(PKI): this also will need to be wired together with CatalogManager
-  // soon, including initializing the token manager with the proper next
-  // sequence number.
-  {
-    unique_ptr<TokenSigningPrivateKey> key;
-    RETURN_NOT_OK(token_signer_->CheckNeedKey(&key));
-    if (key) {
-      RETURN_NOT_OK(token_signer_->AddKey(std::move(key)));
-    }
-  }
-
   state_ = kInitialized;
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/c06a3bc6/src/kudu/master/master.proto
----------------------------------------------------------------------
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index 02ddd1d..37deba4 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -186,6 +186,14 @@ message SysCertAuthorityEntryPB {
   required bytes certificate = 2 [(kudu.REDACT) = true];
 }
 
+// The on-disk entry in the sys.catalog table ("metadata" column) to represent
+// a Token Signing Key (TSK) object. Multiple entries of this type
+// can simultaneously co-exist in the sys.catalog table.
+message SysTskEntryPB {
+  // TokenSigningPrivateKeyPB message representing a TSK.
+  required security.TokenSigningPrivateKeyPB tsk = 1;
+}
+
 ////////////////////////////////////////////////////////////
 // RPCs
 ////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/kudu/blob/c06a3bc6/src/kudu/master/sys_catalog.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index 5d8181a..1d698ff 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -19,6 +19,7 @@
 
 #include <algorithm>
 #include <functional>
+#include <iomanip>
 #include <iterator>
 #include <memory>
 #include <set>
@@ -27,6 +28,7 @@
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 
+#include "kudu/common/key_encoder.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/common/partition.h"
 #include "kudu/common/row_operations.h"
@@ -45,6 +47,7 @@
 #include "kudu/master/master.h"
 #include "kudu/master/master.pb.h"
 #include "kudu/rpc/rpc_context.h"
+#include "kudu/tablet/mvcc.h"
 #include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet_bootstrap.h"
 #include "kudu/tablet/transactions/write_transaction.h"
@@ -66,7 +69,6 @@ using kudu::consensus::ConsensusStatePB;
 using kudu::consensus::RaftConfigPB;
 using kudu::consensus::RaftPeerPB;
 using kudu::log::Log;
-using kudu::log::LogAnchorRegistry;
 using kudu::tablet::LatchTransactionCompletionCallback;
 using kudu::tablet::Tablet;
 using kudu::tablet::TabletPeer;
@@ -472,6 +474,14 @@ void SysCatalogTable::ReqDeleteTable(WriteRequestPB* req, const TableInfo* table
   enc.Add(RowOperationsPB::DELETE, row);
 }
 
+// Convert the sequence number into the entry id (the key of the TSK record)
+// with the INT64 key encoder -- NOTE(review): not a zero-padded decimal string.
+string SysCatalogTable::TskSeqNumberToEntryId(int64_t seq_number) {
+  string entry_id;
+  KeyEncoderTraits<DataType::INT64, string>::Encode(seq_number, &entry_id);
+  return entry_id;
+}
+
 Status SysCatalogTable::VisitTables(TableVisitor* visitor) {
   TRACE_EVENT0("master", "SysCatalogTable::VisitTables");
   auto processor = [&](
@@ -508,8 +518,9 @@ Status SysCatalogTable::ProcessRows(
       << "cannot find sys catalog table column " << kSysCatalogTableColType
       << " in schema: " << schema_.ToString();
 
-  const int8_t et = entry_type;
-  auto pred = ColumnPredicate::Equality(schema_.column(type_col_idx), &et);
+  static const int8_t kEntryType = entry_type;
+  auto pred = ColumnPredicate::Equality(schema_.column(type_col_idx),
+                                        &kEntryType);
   ScanSpec spec;
   spec.AddPredicate(pred);
 
@@ -521,7 +532,8 @@ Status SysCatalogTable::ProcessRows(
   RowBlock block(iter->schema(), 512, &arena);
   while (iter->HasNext()) {
     RETURN_NOT_OK(iter->NextBlock(&block));
-    for (size_t i = 0; i < block.nrows(); ++i) {
+    const size_t nrows = block.nrows();
+    for (size_t i = 0; i < nrows; ++i) {
       if (!block.selection_vector()->IsRowSelected(i)) {
         continue;
       }
@@ -534,6 +546,16 @@ Status SysCatalogTable::ProcessRows(
   return Status::OK();
 }
 
+Status SysCatalogTable::VisitTskEntries(TskEntryVisitor* visitor) {
+  TRACE_EVENT0("master", "SysCatalogTable::VisitTskEntries");
+  auto processor = [&](
+      const string& entry_id,
+      const SysTskEntryPB& entry_data) {
+    return visitor->Visit(entry_id, entry_data);
+  };
+  return ProcessRows<SysTskEntryPB, TSK_ENTRY>(processor);
+}
+
 Status SysCatalogTable::GetCertAuthorityEntry(SysCertAuthorityEntryPB* entry) {
   CHECK(entry);
   vector<SysCertAuthorityEntryPB> entries;
@@ -577,6 +599,48 @@ Status SysCatalogTable::AddCertAuthorityEntry(
   return Status::OK();
 }
 
+// Persist one TSK entry as a new row of the system table. The row key is
+// derived from the key's sequence number; the whole SysTskEntryPB is stored
+// serialized in the metadata column and written with a single INSERT.
+Status SysCatalogTable::AddTskEntry(const SysTskEntryPB& entry) {
+  // Every persisted TSK must carry its sequence number, expiration time and
+  // key material; a missing field is a programming error.
+  CHECK(entry.tsk().has_key_seq_num());
+  CHECK(entry.tsk().has_expire_unix_epoch_seconds());
+  CHECK(entry.tsk().has_rsa_key_der());
+
+  WriteRequestPB req;
+  req.set_tablet_id(kSysCatalogTabletId);
+  CHECK_OK(SchemaToPB(schema_, req.mutable_schema()));
+
+  faststring serialized_entry;
+  pb_util::SerializeToString(entry, &serialized_entry);
+  const string entry_id = TskSeqNumberToEntryId(entry.tsk().key_seq_num());
+
+  KuduPartialRow row(&schema_);
+  CHECK_OK(row.SetInt8(kSysCatalogTableColType, TSK_ENTRY));
+  CHECK_OK(row.SetString(kSysCatalogTableColId, entry_id));
+  CHECK_OK(row.SetString(kSysCatalogTableColMetadata, serialized_entry));
+  RowOperationsPBEncoder encoder(req.mutable_row_operations());
+  encoder.Add(RowOperationsPB::INSERT, row);
+
+  WriteResponsePB resp;
+  return SyncWrite(&req, &resp);
+}
+
+// Delete the TSK rows whose 'entry_id' keys are listed in 'entry_ids'.
+// All DELETE operations are batched into one write to the system tablet.
+Status SysCatalogTable::RemoveTskEntries(const set<string>& entry_ids) {
+  WriteRequestPB req;
+  WriteResponsePB resp;
+
+  req.set_tablet_id(kSysCatalogTabletId);
+  CHECK_OK(SchemaToPB(schema_, req.mutable_schema()));
+
+  // Build every DELETE through one encoder bound to the request's row
+  // operations instead of constructing a fresh encoder for each entry.
+  RowOperationsPBEncoder enc(req.mutable_row_operations());
+  for (const auto& id : entry_ids) {
+    KuduPartialRow row(&schema_);
+    CHECK_OK(row.SetInt8(kSysCatalogTableColType, TSK_ENTRY));
+    CHECK_OK(row.SetString(kSysCatalogTableColId, id));
+    enc.Add(RowOperationsPB::DELETE, row);
+  }
+
+  return SyncWrite(&req, &resp);
+}
+
 // ==================================================================
 // Tablet related methods
 // ==================================================================

http://git-wip-us.apache.org/repos/asf/kudu/blob/c06a3bc6/src/kudu/master/sys_catalog.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.h b/src/kudu/master/sys_catalog.h
index 4620f9d..11e92cd 100644
--- a/src/kudu/master/sys_catalog.h
+++ b/src/kudu/master/sys_catalog.h
@@ -18,6 +18,7 @@
 #define KUDU_MASTER_SYS_CATALOG_H_
 
 #include <functional>
+#include <set>
 #include <string>
 #include <vector>
 
@@ -58,12 +59,23 @@ class TabletVisitor {
                              const SysTabletsEntryPB& metadata) = 0;
 };
 
+// Visitor for TSK (Token Signing Key) entries stored in the system catalog
+// table. That information is preserved to allow any master to verify tokens
+// which might have been signed by the current or a former leader master.
+// NOTE(review): whether only the public part of each key is stored here
+// should be confirmed -- AddTskEntry() requires 'rsa_key_der' to be set.
+class TskEntryVisitor {
+ public:
+  // Virtual so implementations can be safely destroyed through a
+  // TskEntryVisitor pointer.
+  virtual ~TskEntryVisitor() = default;
+
+  // Called by SysCatalogTable::VisitTskEntries() with the row's entry id
+  // and its SysTskEntryPB payload.
+  virtual Status Visit(const std::string& entry_id,
+                       const SysTskEntryPB& metadata) = 0;
+};
+
 // SysCatalogTable is a Kudu table that keeps track of the following
 // system information:
 //   * table metadata
 //   * tablet metadata
-//   * root CA (certificate authority) certificates
-//   * corresponding CA private keys
+//   * root CA (certificate authority) certificate of the Kudu IPKI
+//   * Kudu IPKI root CA cert's private key
+//   * TSK (Token Signing Key) entries
 //
 // The essential properties of the SysCatalogTable are:
 //   * SysCatalogTable has only one tablet.
@@ -84,6 +96,7 @@ class SysCatalogTable {
     TABLES_ENTRY = 1,
     TABLETS_ENTRY = 2,
     CERT_AUTHORITY_INFO = 3,  // Kudu's root certificate authority entry.
+    TSK_ENTRY = 4,            // Token Signing Key entry.
   };
 
   // 'leader_cb_' is invoked whenever this node is elected as a leader
@@ -128,6 +141,9 @@ class SysCatalogTable {
   // Scan of the tablet-related entries.
   Status VisitTablets(TabletVisitor* visitor);
 
+  // Scan the system table for TSK (Token Signing Key) entries, passing each
+  // entry found to 'visitor->Visit()'.
+  Status VisitTskEntries(TskEntryVisitor* visitor);
+
   // Retrive the CA entry (private key and certificate) from the system table.
   Status GetCertAuthorityEntry(SysCertAuthorityEntryPB* entry);
 
@@ -135,6 +151,13 @@ class SysCatalogTable {
   // There should be no more than one CA entry in the system table.
   Status AddCertAuthorityEntry(const SysCertAuthorityEntryPB& entry);
 
+  // Add TSK (Token Signing Key) entry into the system table. The entry's
+  // 'tsk' must have 'key_seq_num', 'expire_unix_epoch_seconds' and
+  // 'rsa_key_der' set (enforced by CHECK); the row's entry identifier is
+  // derived from 'key_seq_num'.
+  Status AddTskEntry(const SysTskEntryPB& entry);
+
+  // Remove TSK (Token Signing Key) entries with the specified entry identifiers
+  // (as in 'entry_id' column) from the system table. All deletions are sent
+  // in a single write request.
+  Status RemoveTskEntries(const std::set<std::string>& entry_ids);
+
  private:
   FRIEND_TEST(MasterTest, TestMasterMetadataConsistentDespiteFailures);
   DISALLOW_COPY_AND_ASSIGN(SysCatalogTable);
@@ -209,6 +232,8 @@ class SysCatalogTable {
   void ReqDeleteTablets(tserver::WriteRequestPB* req,
                         const std::vector<TabletInfo*>& tablets);
 
+  // Encode a TSK sequence number into the 'entry_id' key of its row.
+  // Fully qualified: headers must not rely on a leaked 'using std::string'.
+  static std::string TskSeqNumberToEntryId(int64_t seq_number);
+
   // Special string injected into SyncWrite() random failures (if enabled).
   //
   // Only useful for tests.