You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2017/02/22 02:00:47 UTC

[2/6] kudu git commit: [security] load/store public TSK in the system table

[security] load/store public TSK in the system table

Existing TSK records (including expired ones, which are needed to track
TSK sequence numbers) are read from the system catalog table upon the
'elected-as-leader' callback and fed into the TokenSigner. New token
signing keys are generated and written into the system catalog table
by a periodic task.

The expired entries are removed from the system table
upon the 'elected-as-leader' callback.

Added integration tests to token_signer-itest.cc covering cluster
start/restart and a change of the master leadership role.

Change-Id: Ie91d4129bda0ca49e81988c28385895a2abcd201
Reviewed-on: http://gerrit.cloudera.org:8080/5935
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/c06a3bc6
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/c06a3bc6
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/c06a3bc6

Branch: refs/heads/master
Commit: c06a3bc66e59e9467b599e85714825347aacf7ec
Parents: 48cc975
Author: Alexey Serbin <as...@cloudera.com>
Authored: Fri Feb 3 21:04:07 2017 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Feb 21 23:00:55 2017 +0000

----------------------------------------------------------------------
 src/kudu/client/client-test.cc                  |   2 +-
 src/kudu/integration-tests/CMakeLists.txt       |   1 +
 src/kudu/integration-tests/registration-test.cc |  20 +-
 .../integration-tests/token_signer-itest.cc     | 207 +++++++++++++++++
 src/kudu/integration-tests/ts_itest-base.h      |  21 +-
 src/kudu/master/catalog_manager.cc              | 222 ++++++++++++++++++-
 src/kudu/master/catalog_manager.h               |  34 +++
 src/kudu/master/master.cc                       |  13 +-
 src/kudu/master/master.proto                    |   8 +
 src/kudu/master/sys_catalog.cc                  |  72 +++++-
 src/kudu/master/sys_catalog.h                   |  29 ++-
 11 files changed, 587 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/c06a3bc6/src/kudu/client/client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 6f652c9..2b0f339 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -674,7 +674,7 @@ TEST_F(ClientTest, TestMasterDown) {
   shared_ptr<KuduTable> t;
   client_->data_->default_admin_operation_timeout_ = MonoDelta::FromSeconds(1);
   Status s = client_->OpenTable("other-tablet", &t);
-  ASSERT_TRUE(s.IsNetworkError());
+  ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
 }
 
 TEST_F(ClientTest, TestScan) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/c06a3bc6/src/kudu/integration-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index 8c6da5e..5b6a2e8 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -81,6 +81,7 @@ ADD_KUDU_TEST(tablet_copy-itest)
 ADD_KUDU_TEST(tablet_copy_client_session-itest)
 ADD_KUDU_TEST(tablet_history_gc-itest)
 ADD_KUDU_TEST(tablet_replacement-itest)
+ADD_KUDU_TEST(token_signer-itest RESOURCE_LOCK "master-rpc-ports")
 ADD_KUDU_TEST(ts_recovery-itest)
 ADD_KUDU_TEST(ts_tablet_manager-itest)
 ADD_KUDU_TEST(webserver-stress-itest)

http://git-wip-us.apache.org/repos/asf/kudu/blob/c06a3bc6/src/kudu/integration-tests/registration-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/registration-test.cc b/src/kudu/integration-tests/registration-test.cc
index 0b82ae6..168ce9e 100644
--- a/src/kudu/integration-tests/registration-test.cc
+++ b/src/kudu/integration-tests/registration-test.cc
@@ -97,21 +97,27 @@ class RegistrationTest : public KuduTest {
     }
   }
 
-
-  Status WaitForReplicaCount(const string& tablet_id, int expected_count,
+  Status WaitForReplicaCount(const string& tablet_id,
+                             int expected_count,
                              TabletLocationsPB* locations) {
     while (true) {
-      master::CatalogManager* catalog = cluster_->mini_master()->master()->catalog_manager();
+      master::CatalogManager* catalog =
+          cluster_->mini_master()->master()->catalog_manager();
       Status s;
-      {
+      do {
         master::CatalogManager::ScopedLeaderSharedLock l(catalog);
-        RETURN_NOT_OK(l.first_failed_status());
+        const Status& ls = l.first_failed_status();
+        if (ls.IsServiceUnavailable()) {
+          // Not ready yet: keep 's' non-OK so stale 'locations' is not checked.
+          s = ls;
+          break;  // exiting out of the 'do {...} while (false)' scope
+        }
+        RETURN_NOT_OK(ls);
         s = catalog->GetTabletLocations(tablet_id, locations);
-      }
+      } while (false);
       if (s.ok() && locations->replicas_size() == expected_count) {
         return Status::OK();
       }
-
       SleepFor(MonoDelta::FromMilliseconds(1));
     }
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/c06a3bc6/src/kudu/integration-tests/token_signer-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/token_signer-itest.cc b/src/kudu/integration-tests/token_signer-itest.cc
new file mode 100644
index 0000000..121f4d1
--- /dev/null
+++ b/src/kudu/integration-tests/token_signer-itest.cc
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+#include <string>
+
+#include <gflags/gflags_declare.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/mini_cluster.h"
+#include "kudu/master/catalog_manager.h"
+#include "kudu/master/master.h"
+#include "kudu/master/master.proxy.h"
+#include "kudu/master/master_cert_authority.h"
+#include "kudu/master/mini_master.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/security/crypto.h"
+#include "kudu/security/token.pb.h"
+#include "kudu/security/token_signer.h"
+#include "kudu/security/token_verifier.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_int64(tsk_validity_seconds);
+DECLARE_int64(tsk_rotation_seconds);
+
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using kudu::security::TokenSigningPublicKeyPB;
+using kudu::security::SignedTokenPB;
+using kudu::security::TokenPB;
+
+namespace kudu {
+namespace master {
+
+class TokenSignerITest : public KuduTest {
+ public:
+  TokenSignerITest() {
+    FLAGS_tsk_validity_seconds = 60;
+    FLAGS_tsk_rotation_seconds = 20;
+
+    // Hard-coded ports for the masters. This is safe, as this unit test
+    // runs under a resource lock (see CMakeLists.txt in this directory).
+    // TODO(aserbin): we should have a generic method to obtain n free ports.
+    opts_.master_rpc_ports = { 11010, 11011, 11012 };
+
+    opts_.num_masters = num_masters_ = opts_.master_rpc_ports.size();
+  }
+
+  void SetUp() override {
+    KuduTest::SetUp();
+    cluster_.reset(new MiniCluster(env_, opts_));
+    ASSERT_OK(cluster_->Start());
+  }
+
+  Status RestartCluster() {
+    cluster_->Shutdown();
+    return cluster_->Start();
+  }
+
+  Status ShutdownLeader() {
+    int leader_idx;
+    RETURN_NOT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
+    MiniMaster* leader_master = cluster_->mini_master(leader_idx);
+    leader_master->Shutdown();
+    return Status::OK();
+  }
+
+  // Check that a leader master can be found in the cluster.
+  Status WaitForNewLeader() {
+    return cluster_->GetLeaderMasterIndex(nullptr);
+  }
+
+  static SignedTokenPB MakeToken() {
+    SignedTokenPB ret;
+    TokenPB token;
+    token.set_expire_unix_epoch_seconds(WallTime_Now() + 600);  // +10 minutes
+    CHECK(token.SerializeToString(ret.mutable_token_data()));
+    return ret;
+  }
+
+  Status SignToken(SignedTokenPB* token) {
+    int leader_idx;
+    RETURN_NOT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
+    MiniMaster* leader = cluster_->mini_master(leader_idx);
+    Master* master = leader->master();
+    RETURN_NOT_OK(master->token_signer()->SignToken(token));
+    return Status::OK();
+  }
+
+  Status GetPublicKeys(int idx, vector<TokenSigningPublicKeyPB>* public_keys) {
+    CHECK_GE(idx, 0);
+    CHECK_LT(idx, num_masters_);
+    MiniMaster* mm = cluster_->mini_master(idx);
+    vector<TokenSigningPublicKeyPB> keys =
+        mm->master()->token_signer()->verifier().ExportKeys();
+    public_keys->swap(keys);
+    return Status::OK();
+  }
+
+  Status GetLeaderPublicKeys(vector<TokenSigningPublicKeyPB>* public_keys) {
+    int leader_idx;
+    RETURN_NOT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
+    return GetPublicKeys(leader_idx, public_keys);
+  }
+
+ protected:
+  int num_masters_;
+  MiniClusterOptions opts_;
+  unique_ptr<MiniCluster> cluster_;
+};
+
+// Check that once the cluster has started, the TSK for signing is available at
+// the leader master. (Followers are not checked here: they neither poll the
+// system table for TSK entries nor receive TSK information from the leader
+// master in any other way.)
+TEST_F(TokenSignerITest, TskAtLeaderMaster) {
+  // Check the leader can sign tokens: this guarantees at least one TSK has been
+  // generated and is available for token signing.
+  SignedTokenPB t(MakeToken());
+  ASSERT_OK(SignToken(&t));
+
+  // Get the public part of the signing key from the leader.
+  vector<TokenSigningPublicKeyPB> leader_public_keys;
+  ASSERT_OK(GetLeaderPublicKeys(&leader_public_keys));
+  ASSERT_EQ(1, leader_public_keys.size());
+  EXPECT_EQ(t.signing_key_seq_num(), leader_public_keys[0].key_seq_num());
+}
+
+// New TSK is generated upon start of the leader master and persisted in
+// the system catalog table. So, check that appropriate TSK
+// information is available upon start of the cluster and it's persistent
+// for some time after restart (until the expiration time, actually;
+// the removal of expired TSK info is not covered by this scenario).
+TEST_F(TokenSignerITest, TskClusterRestart) {
+  // Check the leader can sign tokens just after start.
+  SignedTokenPB t_pre(MakeToken());
+  ASSERT_OK(SignToken(&t_pre));
+
+  vector<TokenSigningPublicKeyPB> public_keys_before;
+  ASSERT_OK(GetLeaderPublicKeys(&public_keys_before));
+  ASSERT_EQ(1, public_keys_before.size());
+
+  ASSERT_OK(RestartCluster());
+
+  // Check the leader can sign tokens after the restart.
+  SignedTokenPB t_post(MakeToken());
+  ASSERT_OK(SignToken(&t_post));
+  EXPECT_EQ(t_post.signing_key_seq_num(), t_pre.signing_key_seq_num());
+
+  vector<TokenSigningPublicKeyPB> public_keys_after;
+  ASSERT_OK(GetLeaderPublicKeys(&public_keys_after));
+  ASSERT_EQ(1, public_keys_after.size());
+  EXPECT_EQ(public_keys_before[0].SerializeAsString(),
+            public_keys_after[0].SerializeAsString());
+}
+
+// Test that if leadership changes, the new leader has the same TSK information
+// for token verification as the former leader
+// (it's assumed no new TSK generation happened in between).
+TEST_F(TokenSignerITest, TskMasterLeadershipChange) {
+  SignedTokenPB t_former_leader(MakeToken());
+  ASSERT_OK(SignToken(&t_former_leader));
+
+  vector<TokenSigningPublicKeyPB> public_keys;
+  ASSERT_OK(GetLeaderPublicKeys(&public_keys));
+  ASSERT_EQ(1, public_keys.size());
+
+  ASSERT_OK(ShutdownLeader());
+  ASSERT_OK(WaitForNewLeader());
+
+  // The new leader should use the same signing key.
+  SignedTokenPB t_new_leader(MakeToken());
+  ASSERT_OK(SignToken(&t_new_leader));
+  EXPECT_EQ(t_new_leader.signing_key_seq_num(),
+            t_former_leader.signing_key_seq_num());
+
+  vector<TokenSigningPublicKeyPB> public_keys_new_leader;
+  ASSERT_OK(GetLeaderPublicKeys(&public_keys_new_leader));
+  ASSERT_EQ(1, public_keys_new_leader.size());
+  EXPECT_EQ(public_keys[0].SerializeAsString(),
+            public_keys_new_leader[0].SerializeAsString());
+}
+
+// TODO(aserbin): add a test case which corresponds to multiple signing keys
+// right after cluster start-up.
+
+} // namespace master
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/c06a3bc6/src/kudu/integration-tests/ts_itest-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/ts_itest-base.h b/src/kudu/integration-tests/ts_itest-base.h
index 844369c..3655977 100644
--- a/src/kudu/integration-tests/ts_itest-base.h
+++ b/src/kudu/integration-tests/ts_itest-base.h
@@ -147,12 +147,23 @@ class TabletServerIntegrationTestBase : public TabletServerTestBase {
       CHECK_OK(cluster_->master_proxy()->GetTableLocations(req, &resp, &controller));
       CHECK_OK(controller.status());
       if (resp.has_error()) {
-        if (resp.error().code() == master::MasterErrorPB::TABLET_NOT_RUNNING) {
-          LOG(WARNING)<< "At least one tablet is not yet running";
-          SleepFor(MonoDelta::FromSeconds(1));
-          continue;
+        switch (resp.error().code()) {
+          case master::MasterErrorPB::TABLET_NOT_RUNNING:
+            LOG(WARNING)<< "At least one tablet is not yet running";
+            break;
+
+          case master::MasterErrorPB::NOT_THE_LEADER:   // fallthrough
+          case master::MasterErrorPB::CATALOG_MANAGER_NOT_INITIALIZED:
+            LOG(WARNING)<< "CatalogManager is not yet ready to serve requests";
+            break;
+
+          default:
+            FAIL() << "Response had a fatal error: "
+                   << SecureShortDebugString(resp.error());
+            break;  // unreachable
         }
-        FAIL() << "Response had a fatal error: " << SecureShortDebugString(resp.error());
+        SleepFor(MonoDelta::FromSeconds(1));
+        continue;
       }
 
       for (const master::TabletLocationsPB& location : resp.tablet_locations()) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/c06a3bc6/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index a2a77e2..59b990f 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -83,6 +83,9 @@
 #include "kudu/rpc/rpc_context.h"
 #include "kudu/security/cert.h"
 #include "kudu/security/crypto.h"
+#include "kudu/security/token.pb.h"
+#include "kudu/security/token_signer.h"
+#include "kudu/security/token_signing_key.h"
 #include "kudu/tserver/tserver_admin.proxy.h"
 #include "kudu/util/debug/trace_event.h"
 #include "kudu/util/flag_tags.h"
@@ -197,6 +200,7 @@ DEFINE_bool(catalog_manager_delete_orphaned_tablets, false,
 TAG_FLAG(catalog_manager_delete_orphaned_tablets, advanced);
 
 using std::pair;
+using std::set;
 using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
@@ -205,19 +209,24 @@ using std::vector;
 namespace kudu {
 namespace master {
 
-using base::subtle::NoBarrier_Load;
 using base::subtle::NoBarrier_CompareAndSwap;
+using base::subtle::NoBarrier_Load;
 using cfile::TypeEncodingInfo;
-using consensus::kMinimumTerm;
 using consensus::CONSENSUS_CONFIG_COMMITTED;
 using consensus::Consensus;
 using consensus::ConsensusServiceProxy;
 using consensus::ConsensusStatePB;
 using consensus::GetConsensusRole;
-using consensus::OpId;
 using consensus::RaftPeerPB;
 using consensus::StartTabletCopyRequestPB;
+using consensus::kMinimumTerm;
 using rpc::RpcContext;
+using security::Cert;
+using security::DataFormat;
+using security::PrivateKey;
+using security::TokenSigner;
+using security::TokenSigningPrivateKey;
+using security::TokenSigningPrivateKeyPB;
 using strings::Substitute;
 using tablet::TABLET_DATA_DELETED;
 using tablet::TABLET_DATA_TOMBSTONED;
@@ -317,6 +326,48 @@ class TabletLoader : public TabletVisitor {
 };
 
 ////////////////////////////////////////////////////////////
+// TSK (Token Signing Key) Entry Loader
+////////////////////////////////////////////////////////////
+
+class TskEntryLoader : public TskEntryVisitor {
+ public:
+  TskEntryLoader()
+      : entry_expiration_seconds_(WallTime_Now()) {
+  }
+
+  Status Visit(const string& entry_id,
+               const SysTskEntryPB& metadata) override {
+    TokenSigningPrivateKeyPB tsk(metadata.tsk());
+    CHECK(tsk.has_key_seq_num());
+    CHECK(tsk.has_expire_unix_epoch_seconds());
+    CHECK(tsk.has_rsa_key_der());
+
+    // Expired entries are kept as well (needed to track TSK sequence numbers);
+    // check expiration first: 'tsk' must not be read after it's moved from.
+    if (tsk.expire_unix_epoch_seconds() <= entry_expiration_seconds_) {
+      expired_entry_ids_.insert(entry_id);
+    }
+    entries_.emplace_back(std::move(tsk));
+    return Status::OK();
+  }
+
+  const vector<TokenSigningPrivateKeyPB>& entries() const {
+    return entries_;
+  }
+
+  const set<string>& expired_entry_ids() const {
+    return expired_entry_ids_;
+  }
+
+ private:
+  const int64_t entry_expiration_seconds_;
+  vector<TokenSigningPrivateKeyPB> entries_;
+  set<string> expired_entry_ids_;
+
+  DISALLOW_COPY_AND_ASSIGN(TskEntryLoader);
+};
+
+////////////////////////////////////////////////////////////
 // Background Tasks
 ////////////////////////////////////////////////////////////
 
@@ -400,9 +451,16 @@ void CatalogManagerBgTasks::Run() {
                        << l.catalog_status().ToString();
         }
       } else if (l.leader_status().ok()) {
-        std::vector<scoped_refptr<TabletInfo>> to_process;
+        // If this is the leader master, check if it's time to generate
+        // and store a new TSK (Token Signing Key).
+        Status s = catalog_manager_->CheckGenerateNewTskUnlocked();
+        if (!s.ok()) {
+          LOG(ERROR) << "Error processing TSK entry (will try next time): "
+                     << s.ToString();
+        }
 
         // Get list of tablets not yet running.
+        std::vector<scoped_refptr<TabletInfo>> to_process;
         catalog_manager_->ExtractTabletsToProcess(&to_process);
 
         if (!to_process.empty()) {
@@ -413,15 +471,14 @@ void CatalogManagerBgTasks::Run() {
             // If there is an error (e.g., we are not the leader) abort this task
             // and wait until we're woken up again.
             //
-            // TODO Add tests for this in the revision that makes
+            // TODO(unknown): Add tests for this in the revision that makes
             // create/alter fault tolerant.
-            LOG(ERROR) << "Error processing pending assignments, aborting the current task: "
-                       << s.ToString();
+            LOG(ERROR) << "Error processing pending assignments, "
+                "aborting the current task: " << s.ToString();
           }
         }
       }
     }
-
     // Wait for a notification or a timeout expiration.
     //  - CreateTable will call Wake() to notify about the tablets to add
     //  - HandleReportedTablet/ProcessPendingAssignments will call WakeIfHasPendingUpdates()
@@ -675,9 +732,54 @@ Status CatalogManager::WaitUntilCaughtUpAsLeader(const MonoDelta& timeout) {
   return Status::OK();
 }
 
+Status CatalogManager::LoadCertAuthorityInfo(unique_ptr<PrivateKey>* key,
+                                             unique_ptr<Cert>* cert) {
+  leader_lock_.AssertAcquiredForWriting();
+
+  SysCertAuthorityEntryPB info;
+  RETURN_NOT_OK(sys_catalog_->GetCertAuthorityEntry(&info));
+
+  unique_ptr<PrivateKey> ca_private_key(new PrivateKey);
+  unique_ptr<Cert> ca_cert(new Cert);
+  RETURN_NOT_OK(ca_private_key->FromString(
+      info.private_key(), DataFormat::DER));
+  RETURN_NOT_OK(ca_cert->FromString(
+      info.certificate(), DataFormat::DER));
+  // Extra sanity check: the certificate must match the private key.
+  RETURN_NOT_OK(ca_cert->CheckKeyMatch(*ca_private_key));
+
+  key->swap(ca_private_key);  // outputs are set only if every step succeeded
+  cert->swap(ca_cert);
+
+  return Status::OK();
+}
+
+// Initialize the master's certificate authority component with the specified
+// private key and certificate. Requires 'leader_lock_' held for writing.
+Status CatalogManager::InitCertAuthority(unique_ptr<PrivateKey> key,
+                                         unique_ptr<Cert> cert) {
+  leader_lock_.AssertAcquiredForWriting();
+  return master_->cert_authority()->Init(std::move(key), std::move(cert));
+}
+
+// Store the internal Kudu CA key and certificate (DER) into the system table.
+Status CatalogManager::StoreCertAuthorityInfo(const PrivateKey& key,
+                                              const Cert& cert) {
+  leader_lock_.AssertAcquiredForWriting();
+
+  SysCertAuthorityEntryPB info;
+  RETURN_NOT_OK(key.ToString(info.mutable_private_key(), DataFormat::DER));
+  RETURN_NOT_OK(cert.ToString(info.mutable_certificate(), DataFormat::DER));
+  RETURN_NOT_OK(sys_catalog_->AddCertAuthorityEntry(info));
+  LOG(INFO) << "Successfully stored the newly generated cert authority "
+               "information into the system table.";
+
+  return Status::OK();
+}
+
 // Check if CA private key and the certificate were loaded into the
-// memory. If not, generate new ones and store them into the system table.
-// Use the CA information to initialize the certificate authority.
+// memory. If not, generate new ones and store them into the system table
+// and then load into the memory.
 Status CatalogManager::CheckInitCertAuthority() {
   using security::Cert;
   using security::DataFormat;
@@ -777,7 +879,64 @@ void CatalogManager::VisitTablesAndTabletsTask() {
     LOG(INFO) << "Loading CA info into memory...";
     LOG_SLOW_EXECUTION(WARNING, 1000, LogPrefix() +
                        "Loading CA info into memory") {
-      CHECK_OK(CheckInitCertAuthority());
+      unique_ptr<PrivateKey> key;
+      unique_ptr<Cert> cert;
+      const Status& s = LoadCertAuthorityInfo(&key, &cert);
+      if (s.ok()) {
+        // Once successfully loaded, the CA information is supposed to be valid:
+        // the leader master should not be run without CA certificate.
+        CHECK_OK(InitCertAuthority(std::move(key), std::move(cert)));
+      } else if (s.IsNotFound()) {
+        LOG(WARNING) << "Did not find CA certificate and key for Kudu IPKI, "
+                        "will generate new ones";
+        // No CA information record has been found in the table -- generate
+        // a new one. The subtlety here is that first it's necessary to store
+        // the newly generated information into the system table and only after
+        // that initialize master certificate authority. The reason is:
+        // if the master server loses its leadership role by that time, there
+        // will be an error on an attempt to write into the system table.
+        // If that happens, skip the rest of the sequence: when this callback
+        // is invoked next time, the system table should already contain
+        // CA certificate information written by other master.
+        unique_ptr<PrivateKey> private_key(new PrivateKey);
+        unique_ptr<Cert> new_cert(new Cert);
+
+        // Generate new private key and corresponding CA certificate.
+        CHECK_OK(master_->cert_authority()->Generate(private_key.get(),
+                                                     new_cert.get()));
+        // If the leadership role is lost at this moment, writing into the
+        // system table will fail.
+        const Status& store_s = StoreCertAuthorityInfo(*private_key, *new_cert);
+        if (store_s.ok()) {
+          // The leader master should not run without CA certificate.
+          CHECK_OK(InitCertAuthority(std::move(private_key), std::move(new_cert)));
+        } else {
+          LOG(WARNING) << "Failed to write newly generated CA information into "
+                          "the system table, assuming change of leadership: "
+                       << store_s.ToString();
+        }
+      } else {
+        CHECK_OK(s);
+      }
+    }
+
+    LOG(INFO) << "Loading token signing keys...";
+    LOG_SLOW_EXECUTION(WARNING, 1000, LogPrefix() +
+                       "Loading token signing keys...") {
+      set<string> expired_tsk_entry_ids;
+      CHECK_OK(LoadTskEntries(&expired_tsk_entry_ids));  // expired keys too
+      Status s = CheckGenerateNewTskUnlocked();
+      if (!s.ok()) {
+        LOG(WARNING) << "Failed to generate and persist new TSK, "
+                        "assuming change of leadership: " << s.ToString();
+        return;  // NOTE(review): exits the whole task early; confirm intended
+      }
+      s = DeleteTskEntries(expired_tsk_entry_ids);
+      if (!s.ok()) {
+        LOG(WARNING) << "Failed to purge expired TSK entries from system table, "
+                        "assuming change of leadership: " << s.ToString();
+        return;  // NOTE(review): exits the whole task early; confirm intended
+      }
     }
   }
 
@@ -3177,6 +3336,47 @@ void CatalogManager::ExtractTabletsToProcess(
   }
 }
 
+// Check if it's time to roll TokenSigner's key. There's a bit of subtlety here:
+// we shouldn't start exporting a key until it is properly persisted.
+// So, the protocol is:
+//   1) Generate a new TSK.
+//   2) Try to write it to the system table.
+//   3) Pass it back to the TokenSigner on success.
+//   4) Check and switch TokenSigner to the new key if it's time to do so.
+Status CatalogManager::CheckGenerateNewTskUnlocked() {
+  TokenSigner* signer = master_->token_signer();
+  unique_ptr<security::TokenSigningPrivateKey> tsk;
+  RETURN_NOT_OK(signer->CheckNeedKey(&tsk));
+  if (tsk) {  // presumably null when no new key is needed -- TODO confirm
+    // First save the new TSK into the system table.
+    TokenSigningPrivateKeyPB tsk_pb;
+    tsk->ExportPB(&tsk_pb);
+    SysTskEntryPB sys_entry;
+    sys_entry.mutable_tsk()->Swap(&tsk_pb);
+    RETURN_NOT_OK(sys_catalog_->AddTskEntry(sys_entry));
+    LOG(INFO) << "Saved newly generated TSK " << tsk->key_seq_num()
+              << " into the system table.";
+    // Then add the new TSK into the signer.
+    RETURN_NOT_OK(signer->AddKey(std::move(tsk)));
+  }
+  return signer->TryRotateKey();
+}
+
+Status CatalogManager::LoadTskEntries(set<string>* expired_entry_ids) {
+  TskEntryLoader loader;
+  RETURN_NOT_OK(sys_catalog_->VisitTskEntries(&loader));
+  if (expired_entry_ids != nullptr) {
+    // Report the IDs of the expired entries back to the caller.
+    *expired_entry_ids = loader.expired_entry_ids();
+  }
+  return master_->token_signer()->ImportKeys(loader.entries());
+}
+
+Status CatalogManager::DeleteTskEntries(const set<string>& entry_ids) {
+  leader_lock_.AssertAcquiredForWriting();
+  return sys_catalog_->RemoveTskEntries(entry_ids);
+}
+
 struct DeferredAssignmentActions {
   vector<TabletInfo*> tablets_to_add;
   vector<TabletInfo*> tablets_to_update;

http://git-wip-us.apache.org/repos/asf/kudu/blob/c06a3bc6/src/kudu/master/catalog_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index 65614d6..cf83363 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -59,6 +59,13 @@ namespace rpc {
 class RpcContext;
 } // namespace rpc
 
+namespace security {
+class Cert;
+class PrivateKey;
+class TokenSigner;
+class TokenSigningPrivateKey;
+} // namespace security
+
 namespace master {
 
 class CatalogManagerBgTasks;
@@ -569,6 +576,21 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
   // This method is thread-safe.
   Status InitSysCatalogAsync(bool is_first_run);
 
+  // Load the internal Kudu certificate authority information from the system
+  // table: the private key and the certificate. If the CA info entry is not
+  // found in the table, return Status::NotFound.
+  Status LoadCertAuthorityInfo(std::unique_ptr<security::PrivateKey>* key,
+                               std::unique_ptr<security::Cert>* cert);
+
+  // Initialize master's certificate authority with the specified private key
+  // and certificate.
+  Status InitCertAuthority(std::unique_ptr<security::PrivateKey> key,
+                           std::unique_ptr<security::Cert> cert);
+
+  // Store the CA private key and certificate into the system table.
+  Status StoreCertAuthorityInfo(const security::PrivateKey& key,
+                                const security::Cert& cert);
+
   // Load existing root CA information from the system table, if present.
   // If not, generate new ones, store them into the system table.
   // After that, use the CA information to initialize the certificate authority.
@@ -610,6 +632,18 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
   // Extract the set of tablets that must be processed because not running yet.
   void ExtractTabletsToProcess(std::vector<scoped_refptr<TabletInfo>>* tablets_to_process);
 
+  // Generate a new Token Signing Key if TokenSigner needs one, persist it into
+  // the system table and add it to TokenSigner; then try to rotate keys.
+  Status CheckGenerateNewTskUnlocked();
+
+  // Load all TSK entries (including expired ones) and import them into
+  // TokenSigner; output IDs of expired entries if 'expired_entry_ids' != null.
+  Status LoadTskEntries(std::set<std::string>* expired_entry_ids);
+
+  // Delete TSK entries with the specified entry identifiers
+  // (identifiers correspond to the 'entry_id' column).
+  Status DeleteTskEntries(const std::set<std::string>& entry_ids);
+
   Status ApplyAlterSchemaSteps(const SysTablesEntryPB& current_pb,
                                std::vector<AlterTableRequestPB::Step> steps,
                                Schema* new_schema,

http://git-wip-us.apache.org/repos/asf/kudu/blob/c06a3bc6/src/kudu/master/master.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master.cc b/src/kudu/master/master.cc
index dfe28a4..0f04c22 100644
--- a/src/kudu/master/master.cc
+++ b/src/kudu/master/master.cc
@@ -18,7 +18,6 @@
 #include "kudu/master/master.h"
 
 #include <algorithm>
-#include <list>
 #include <memory>
 #include <vector>
 
@@ -119,19 +118,9 @@ Status Master::Init() {
   // becomes a leader.
   cert_authority_.reset(new MasterCertAuthority(fs_manager_->uuid()));
 
+  // The TokenSigner loads its keys during catalog manager initialization.
   token_signer_.reset(new TokenSigner(FLAGS_tsk_validity_seconds,
                                       FLAGS_tsk_rotation_seconds));
-  // TODO(PKI): this also will need to be wired together with CatalogManager
-  // soon, including initializing the token manager with the proper next
-  // sequence number.
-  {
-    unique_ptr<TokenSigningPrivateKey> key;
-    RETURN_NOT_OK(token_signer_->CheckNeedKey(&key));
-    if (key) {
-      RETURN_NOT_OK(token_signer_->AddKey(std::move(key)));
-    }
-  }
-
   state_ = kInitialized;
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/c06a3bc6/src/kudu/master/master.proto
----------------------------------------------------------------------
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index 02ddd1d..37deba4 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -186,6 +186,14 @@ message SysCertAuthorityEntryPB {
   required bytes certificate = 2 [(kudu.REDACT) = true];
 }
 
+// On-disk entry in the sys.catalog table ("metadata" column) representing a
+// Token Signing Key (TSK). Several TSK entries can co-exist in the table, one
+// per key sequence number (the row's 'entry_id' is derived from it).
+message SysTskEntryPB {
+  // The complete TSK, including its private RSA key material (rsa_key_der).
+  required security.TokenSigningPrivateKeyPB tsk = 1;
+}
+
 ////////////////////////////////////////////////////////////
 // RPCs
 ////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/kudu/blob/c06a3bc6/src/kudu/master/sys_catalog.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index 5d8181a..1d698ff 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -19,6 +19,7 @@
 
 #include <algorithm>
 #include <functional>
+#include <iomanip>
 #include <iterator>
 #include <memory>
 #include <set>
@@ -27,6 +28,7 @@
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 
+#include "kudu/common/key_encoder.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/common/partition.h"
 #include "kudu/common/row_operations.h"
@@ -45,6 +47,7 @@
 #include "kudu/master/master.h"
 #include "kudu/master/master.pb.h"
 #include "kudu/rpc/rpc_context.h"
+#include "kudu/tablet/mvcc.h"
 #include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet_bootstrap.h"
 #include "kudu/tablet/transactions/write_transaction.h"
@@ -66,7 +69,6 @@ using kudu::consensus::ConsensusStatePB;
 using kudu::consensus::RaftConfigPB;
 using kudu::consensus::RaftPeerPB;
 using kudu::log::Log;
-using kudu::log::LogAnchorRegistry;
 using kudu::tablet::LatchTransactionCompletionCallback;
 using kudu::tablet::Tablet;
 using kudu::tablet::TabletPeer;
@@ -472,6 +474,14 @@ void SysCatalogTable::ReqDeleteTable(WriteRequestPB* req, const TableInfo* table
   enc.Add(RowOperationsPB::DELETE, row);
 }
 
+// Build the 'entry_id' of a TSK row: 'seq_number' in Kudu's order-preserving
+// INT64 key encoding (fixed-width binary bytes, not zero-padded decimal text).
+string SysCatalogTable::TskSeqNumberToEntryId(int64_t seq_number) {
+  string encoded_key;
+  KeyEncoderTraits<DataType::INT64, string>::Encode(seq_number, &encoded_key);
+  return encoded_key;
+}
+
 Status SysCatalogTable::VisitTables(TableVisitor* visitor) {
   TRACE_EVENT0("master", "SysCatalogTable::VisitTables");
   auto processor = [&](
@@ -508,8 +518,9 @@ Status SysCatalogTable::ProcessRows(
       << "cannot find sys catalog table column " << kSysCatalogTableColType
       << " in schema: " << schema_.ToString();
 
-  const int8_t et = entry_type;
-  auto pred = ColumnPredicate::Equality(schema_.column(type_col_idx), &et);
+  static const int8_t kEntryType = entry_type;
+  auto pred = ColumnPredicate::Equality(schema_.column(type_col_idx),
+                                        &kEntryType);
   ScanSpec spec;
   spec.AddPredicate(pred);
 
@@ -521,7 +532,8 @@ Status SysCatalogTable::ProcessRows(
   RowBlock block(iter->schema(), 512, &arena);
   while (iter->HasNext()) {
     RETURN_NOT_OK(iter->NextBlock(&block));
-    for (size_t i = 0; i < block.nrows(); ++i) {
+    const size_t nrows = block.nrows();
+    for (size_t i = 0; i < nrows; ++i) {
       if (!block.selection_vector()->IsRowSelected(i)) {
         continue;
       }
@@ -534,6 +546,16 @@ Status SysCatalogTable::ProcessRows(
   return Status::OK();
 }
 
+Status SysCatalogTable::VisitTskEntries(TskEntryVisitor* visitor) {
+  TRACE_EVENT0("master", "SysCatalogTable::VisitTskEntries");
+  // Hand the id and the deserialized SysTskEntryPB of every TSK_ENTRY row
+  // over to the caller-supplied visitor.
+  return ProcessRows<SysTskEntryPB, TSK_ENTRY>(
+      [visitor](const string& entry_id, const SysTskEntryPB& entry_data) {
+        return visitor->Visit(entry_id, entry_data);
+      });
+}
+
 Status SysCatalogTable::GetCertAuthorityEntry(SysCertAuthorityEntryPB* entry) {
   CHECK(entry);
   vector<SysCertAuthorityEntryPB> entries;
@@ -577,6 +599,48 @@ Status SysCatalogTable::AddCertAuthorityEntry(
   return Status::OK();
 }
 
+Status SysCatalogTable::AddTskEntry(const SysTskEntryPB& entry) {
+  WriteRequestPB req;
+  req.set_tablet_id(kSysCatalogTabletId);
+  CHECK_OK(SchemaToPB(schema_, req.mutable_schema()));
+
+  // Mandatory TSK fields; the row key is derived from the sequence number.
+  const auto& tsk = entry.tsk();
+  CHECK(tsk.has_key_seq_num());
+  CHECK(tsk.has_expire_unix_epoch_seconds());
+  CHECK(tsk.has_rsa_key_der());
+
+  // The serialized SysTskEntryPB goes into the "metadata" column.
+  faststring serialized_entry;
+  pb_util::SerializeToString(entry, &serialized_entry);
+  const string entry_id = TskSeqNumberToEntryId(tsk.key_seq_num());
+  KuduPartialRow row(&schema_);
+  CHECK_OK(row.SetInt8(kSysCatalogTableColType, TSK_ENTRY));
+  CHECK_OK(row.SetString(kSysCatalogTableColId, entry_id));
+  CHECK_OK(row.SetString(kSysCatalogTableColMetadata, serialized_entry));
+  RowOperationsPBEncoder encoder(req.mutable_row_operations());
+  encoder.Add(RowOperationsPB::INSERT, row);
+  WriteResponsePB resp;
+  return SyncWrite(&req, &resp);
+}
+
+Status SysCatalogTable::RemoveTskEntries(const set<string>& entry_ids) {
+  if (entry_ids.empty()) return Status::OK();  // Avoid an empty SyncWrite().
+  WriteRequestPB req;
+  WriteResponsePB resp;
+  req.set_tablet_id(kSysCatalogTabletId);
+  CHECK_OK(SchemaToPB(schema_, req.mutable_schema()));
+  // One encoder for the whole request; it appends a DELETE per entry id.
+  RowOperationsPBEncoder enc(req.mutable_row_operations());
+  for (const auto& id : entry_ids) {
+    KuduPartialRow row(&schema_);
+    CHECK_OK(row.SetInt8(kSysCatalogTableColType, TSK_ENTRY));
+    CHECK_OK(row.SetString(kSysCatalogTableColId, id));
+    enc.Add(RowOperationsPB::DELETE, row);
+  }
+  return SyncWrite(&req, &resp);
+}
+
 // ==================================================================
 // Tablet related methods
 // ==================================================================

http://git-wip-us.apache.org/repos/asf/kudu/blob/c06a3bc6/src/kudu/master/sys_catalog.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.h b/src/kudu/master/sys_catalog.h
index 4620f9d..11e92cd 100644
--- a/src/kudu/master/sys_catalog.h
+++ b/src/kudu/master/sys_catalog.h
@@ -18,6 +18,7 @@
 #define KUDU_MASTER_SYS_CATALOG_H_
 
 #include <functional>
+#include <set>
 #include <string>
 #include <vector>
 
@@ -58,12 +59,23 @@ class TabletVisitor {
                              const SysTabletsEntryPB& metadata) = 0;
 };
 
+// Visitor for TSK (Token Signing Key) entries of the system catalog table.
+// Each entry holds a full TokenSigningPrivateKeyPB (the RSA key included).
+class TskEntryVisitor {
+ public:
+  virtual ~TskEntryVisitor() = default;
+  // Invoked per TSK row with its 'entry_id' and the parsed SysTskEntryPB.
+  virtual Status Visit(const std::string& entry_id,
+                       const SysTskEntryPB& metadata) = 0;
+};
+
 // SysCatalogTable is a Kudu table that keeps track of the following
 // system information:
 //   * table metadata
 //   * tablet metadata
-//   * root CA (certificate authority) certificates
-//   * corresponding CA private keys
+//   * root CA (certificate authority) certificate of the Kudu IPKI
+//   * Kudu IPKI root CA cert's private key
+//   * TSK (Token Signing Key) entries
 //
 // The essential properties of the SysCatalogTable are:
 //   * SysCatalogTable has only one tablet.
@@ -84,6 +96,7 @@ class SysCatalogTable {
     TABLES_ENTRY = 1,
     TABLETS_ENTRY = 2,
     CERT_AUTHORITY_INFO = 3,  // Kudu's root certificate authority entry.
+    TSK_ENTRY = 4,            // Token Signing Key entry.
   };
 
   // 'leader_cb_' is invoked whenever this node is elected as a leader
@@ -128,6 +141,9 @@ class SysCatalogTable {
   // Scan of the tablet-related entries.
   Status VisitTablets(TabletVisitor* visitor);
 
+  // Scan for TSK-related entries in the system table.
+  Status VisitTskEntries(TskEntryVisitor* visitor);
+
   // Retrive the CA entry (private key and certificate) from the system table.
   Status GetCertAuthorityEntry(SysCertAuthorityEntryPB* entry);
 
@@ -135,6 +151,13 @@ class SysCatalogTable {
   // There should be no more than one CA entry in the system table.
   Status AddCertAuthorityEntry(const SysCertAuthorityEntryPB& entry);
 
+  // Insert a TSK entry whose 'entry_id' is derived from tsk().key_seq_num().
+  // CHECK-fails unless key_seq_num, expiry and rsa_key_der are all set.
+  Status AddTskEntry(const SysTskEntryPB& entry);
+
+  // Delete the TSK entries whose 'entry_id' column value is in 'entry_ids'.
+  Status RemoveTskEntries(const std::set<std::string>& entry_ids);
+
  private:
   FRIEND_TEST(MasterTest, TestMasterMetadataConsistentDespiteFailures);
   DISALLOW_COPY_AND_ASSIGN(SysCatalogTable);
@@ -209,6 +232,8 @@ class SysCatalogTable {
   void ReqDeleteTablets(tserver::WriteRequestPB* req,
                         const std::vector<TabletInfo*>& tablets);
 
+  // Returns the 'entry_id' column value for the TSK with 'seq_number'.
+  static std::string TskSeqNumberToEntryId(int64_t seq_number);
+
   // Special string injected into SyncWrite() random failures (if enabled).
   //
   // Only useful for tests.