You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2017/02/22 02:00:47 UTC
[2/6] kudu git commit: [security] load/store public TSK in the system
table
[security] load/store public TSK in the system table
Non-expired, already existing TSK records are read from the system
catalog table upon the 'elected-as-leader' callback and fed into the
TokenSigner. New token signing keys are generated and written
into the system catalog table by a periodic task.
The expired entries are removed from the system table
upon the 'elected-as-leader' callback.
Added integration tests for cluster start/restart and switching
master leadership role into token_signer-itest.cc.
Change-Id: Ie91d4129bda0ca49e81988c28385895a2abcd201
Reviewed-on: http://gerrit.cloudera.org:8080/5935
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Todd Lipcon <to...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/c06a3bc6
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/c06a3bc6
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/c06a3bc6
Branch: refs/heads/master
Commit: c06a3bc66e59e9467b599e85714825347aacf7ec
Parents: 48cc975
Author: Alexey Serbin <as...@cloudera.com>
Authored: Fri Feb 3 21:04:07 2017 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Feb 21 23:00:55 2017 +0000
----------------------------------------------------------------------
src/kudu/client/client-test.cc | 2 +-
src/kudu/integration-tests/CMakeLists.txt | 1 +
src/kudu/integration-tests/registration-test.cc | 20 +-
.../integration-tests/token_signer-itest.cc | 207 +++++++++++++++++
src/kudu/integration-tests/ts_itest-base.h | 21 +-
src/kudu/master/catalog_manager.cc | 222 ++++++++++++++++++-
src/kudu/master/catalog_manager.h | 34 +++
src/kudu/master/master.cc | 13 +-
src/kudu/master/master.proto | 8 +
src/kudu/master/sys_catalog.cc | 72 +++++-
src/kudu/master/sys_catalog.h | 29 ++-
11 files changed, 587 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/c06a3bc6/src/kudu/client/client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 6f652c9..2b0f339 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -674,7 +674,7 @@ TEST_F(ClientTest, TestMasterDown) {
shared_ptr<KuduTable> t;
client_->data_->default_admin_operation_timeout_ = MonoDelta::FromSeconds(1);
Status s = client_->OpenTable("other-tablet", &t);
- ASSERT_TRUE(s.IsNetworkError());
+ ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
}
TEST_F(ClientTest, TestScan) {
http://git-wip-us.apache.org/repos/asf/kudu/blob/c06a3bc6/src/kudu/integration-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index 8c6da5e..5b6a2e8 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -81,6 +81,7 @@ ADD_KUDU_TEST(tablet_copy-itest)
ADD_KUDU_TEST(tablet_copy_client_session-itest)
ADD_KUDU_TEST(tablet_history_gc-itest)
ADD_KUDU_TEST(tablet_replacement-itest)
+ADD_KUDU_TEST(token_signer-itest RESOURCE_LOCK "master-rpc-ports")
ADD_KUDU_TEST(ts_recovery-itest)
ADD_KUDU_TEST(ts_tablet_manager-itest)
ADD_KUDU_TEST(webserver-stress-itest)
http://git-wip-us.apache.org/repos/asf/kudu/blob/c06a3bc6/src/kudu/integration-tests/registration-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/registration-test.cc b/src/kudu/integration-tests/registration-test.cc
index 0b82ae6..168ce9e 100644
--- a/src/kudu/integration-tests/registration-test.cc
+++ b/src/kudu/integration-tests/registration-test.cc
@@ -97,21 +97,27 @@ class RegistrationTest : public KuduTest {
}
}
-
- Status WaitForReplicaCount(const string& tablet_id, int expected_count,
+ Status WaitForReplicaCount(const string& tablet_id,
+ int expected_count,
TabletLocationsPB* locations) {
while (true) {
- master::CatalogManager* catalog = cluster_->mini_master()->master()->catalog_manager();
+ master::CatalogManager* catalog =
+ cluster_->mini_master()->master()->catalog_manager();
Status s;
- {
+ do {
master::CatalogManager::ScopedLeaderSharedLock l(catalog);
- RETURN_NOT_OK(l.first_failed_status());
+ const Status& ls = l.first_failed_status();
+ if (ls.IsServiceUnavailable()) {
+ // ServiceUnavailable means catalog manager is not yet ready
+ // to serve requests -- try again later.
+ break; // exiting out of the 'do {...} while (false)' scope
+ }
+ RETURN_NOT_OK(ls);
s = catalog->GetTabletLocations(tablet_id, locations);
- }
+ } while (false);
if (s.ok() && locations->replicas_size() == expected_count) {
return Status::OK();
}
-
SleepFor(MonoDelta::FromMilliseconds(1));
}
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/c06a3bc6/src/kudu/integration-tests/token_signer-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/token_signer-itest.cc b/src/kudu/integration-tests/token_signer-itest.cc
new file mode 100644
index 0000000..121f4d1
--- /dev/null
+++ b/src/kudu/integration-tests/token_signer-itest.cc
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+#include <string>
+
+#include <gflags/gflags_declare.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/mini_cluster.h"
+#include "kudu/master/catalog_manager.h"
+#include "kudu/master/master.h"
+#include "kudu/master/master.proxy.h"
+#include "kudu/master/master_cert_authority.h"
+#include "kudu/master/mini_master.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/security/crypto.h"
+#include "kudu/security/token.pb.h"
+#include "kudu/security/token_signer.h"
+#include "kudu/security/token_verifier.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_int64(tsk_validity_seconds);
+DECLARE_int64(tsk_rotation_seconds);
+
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using kudu::security::TokenSigningPublicKeyPB;
+using kudu::security::SignedTokenPB;
+using kudu::security::TokenPB;
+
+namespace kudu {
+namespace master {
+
+// Test fixture which brings up a multi-master MiniCluster with shortened TSK
+// validity and rotation intervals, and offers helpers to sign tokens and to
+// fetch the public TSKs exported by the masters' token verifiers.
+class TokenSignerITest : public KuduTest {
+ public:
+  TokenSignerITest() {
+    FLAGS_tsk_validity_seconds = 60;
+    FLAGS_tsk_rotation_seconds = 20;
+
+    // The master RPC ports are hard-coded. That is safe because this test
+    // runs under a resource lock (see CMakeLists.txt in this directory).
+    // TODO(aserbin): we should have a generic method to obtain n free ports.
+    opts_.master_rpc_ports = { 11010, 11011, 11012 };
+
+    // One master per configured RPC port.
+    num_masters_ = opts_.master_rpc_ports.size();
+    opts_.num_masters = num_masters_;
+  }
+
+  void SetUp() override {
+    KuduTest::SetUp();
+    cluster_.reset(new MiniCluster(env_, opts_));
+    ASSERT_OK(cluster_->Start());
+  }
+
+  // Shut the whole cluster down and start it again.
+  Status RestartCluster() {
+    cluster_->Shutdown();
+    return cluster_->Start();
+  }
+
+  // Find the current leader master and shut it down.
+  Status ShutdownLeader() {
+    int leader_idx;
+    RETURN_NOT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
+    cluster_->mini_master(leader_idx)->Shutdown();
+    return Status::OK();
+  }
+
+  // Succeeds once a leader master can be found in the cluster.
+  Status WaitForNewLeader() {
+    return cluster_->GetLeaderMasterIndex(nullptr);
+  }
+
+  // Build an unsigned token whose payload expires 600 seconds from now.
+  static SignedTokenPB MakeToken() {
+    TokenPB payload;
+    payload.set_expire_unix_epoch_seconds(WallTime_Now() + 600);
+    SignedTokenPB signed_token;
+    CHECK(payload.SerializeToString(signed_token.mutable_token_data()));
+    return signed_token;
+  }
+
+  // Sign 'token' using the TokenSigner of the current leader master.
+  Status SignToken(SignedTokenPB* token) {
+    int leader_idx;
+    RETURN_NOT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
+    Master* leader = cluster_->mini_master(leader_idx)->master();
+    return leader->token_signer()->SignToken(token);
+  }
+
+  // Replace the contents of 'public_keys' with the public TSKs exported by
+  // the token verifier of the master with index 'idx'.
+  Status GetPublicKeys(int idx, vector<TokenSigningPublicKeyPB>* public_keys) {
+    CHECK_GE(idx, 0);
+    CHECK_LT(idx, num_masters_);
+    Master* master = cluster_->mini_master(idx)->master();
+    *public_keys = master->token_signer()->verifier().ExportKeys();
+    return Status::OK();
+  }
+
+  // Same as GetPublicKeys(), but for whichever master is currently the leader.
+  Status GetLeaderPublicKeys(vector<TokenSigningPublicKeyPB>* public_keys) {
+    int leader_idx;
+    RETURN_NOT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
+    return GetPublicKeys(leader_idx, public_keys);
+  }
+
+ protected:
+  int num_masters_;
+  MiniClusterOptions opts_;
+  unique_ptr<MiniCluster> cluster_;
+};
+
+// Verify that once the cluster has started, the leader master is able to sign
+// tokens and exports exactly one public TSK whose sequence number matches the
+// one recorded in the signed token. Follower masters are expected to hold no
+// TSK information (they neither poll the system table for TSK entries nor
+// receive it from the leader in any other way), but that expectation is not
+// asserted by this scenario.
+TEST_F(TokenSignerITest, TskAtLeaderMaster) {
+  // A successful signature proves that at least one TSK has been generated
+  // and is available for token signing at the leader.
+  SignedTokenPB token = MakeToken();
+  ASSERT_OK(SignToken(&token));
+
+  // Fetch the public part of the leader's signing key.
+  vector<TokenSigningPublicKeyPB> leader_keys;
+  ASSERT_OK(GetLeaderPublicKeys(&leader_keys));
+  ASSERT_EQ(1, leader_keys.size());
+  EXPECT_EQ(token.signing_key_seq_num(), leader_keys[0].key_seq_num());
+}
+
+// The leader master generates a new TSK at start-up and persists it in the
+// system catalog table. Verify that after a full cluster restart the leader
+// signs with the very same key and exports an identical public key, i.e. the
+// TSK information survives the restart. (It is kept until the key expires;
+// the removal of expired TSK info is not covered by this scenario.)
+TEST_F(TokenSignerITest, TskClusterRestart) {
+  // The leader must be able to sign tokens right after the initial start.
+  SignedTokenPB token_before = MakeToken();
+  ASSERT_OK(SignToken(&token_before));
+
+  vector<TokenSigningPublicKeyPB> keys_before;
+  ASSERT_OK(GetLeaderPublicKeys(&keys_before));
+  ASSERT_EQ(1, keys_before.size());
+
+  ASSERT_OK(RestartCluster());
+
+  // The leader must be able to sign tokens after the restart as well,
+  // and it must use the same signing key for that.
+  SignedTokenPB token_after = MakeToken();
+  ASSERT_OK(SignToken(&token_after));
+  EXPECT_EQ(token_after.signing_key_seq_num(),
+            token_before.signing_key_seq_num());
+
+  vector<TokenSigningPublicKeyPB> keys_after;
+  ASSERT_OK(GetLeaderPublicKeys(&keys_after));
+  ASSERT_EQ(1, keys_after.size());
+  EXPECT_EQ(keys_before[0].SerializeAsString(),
+            keys_after[0].SerializeAsString());
+}
+
+// Verify that when the leadership changes, the new leader ends up with the
+// same TSK information for token signing and verification as the former
+// leader had (assuming no new TSK has been generated in between).
+TEST_F(TokenSignerITest, TskMasterLeadershipChange) {
+  SignedTokenPB token_old_leader = MakeToken();
+  ASSERT_OK(SignToken(&token_old_leader));
+
+  vector<TokenSigningPublicKeyPB> keys_old_leader;
+  ASSERT_OK(GetLeaderPublicKeys(&keys_old_leader));
+  ASSERT_EQ(1, keys_old_leader.size());
+
+  // Force a change of leadership by taking the current leader down.
+  ASSERT_OK(ShutdownLeader());
+  ASSERT_OK(WaitForNewLeader());
+
+  // The new leader should sign with the same key.
+  SignedTokenPB token_new_leader = MakeToken();
+  ASSERT_OK(SignToken(&token_new_leader));
+  EXPECT_EQ(token_new_leader.signing_key_seq_num(),
+            token_old_leader.signing_key_seq_num());
+
+  vector<TokenSigningPublicKeyPB> keys_new_leader;
+  ASSERT_OK(GetLeaderPublicKeys(&keys_new_leader));
+  ASSERT_EQ(1, keys_new_leader.size());
+  EXPECT_EQ(keys_old_leader[0].SerializeAsString(),
+            keys_new_leader[0].SerializeAsString());
+}
+
+// TODO(aserbin): add a test case which corresponds to multiple signing keys
+// right after cluster start-up.
+
+} // namespace master
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/c06a3bc6/src/kudu/integration-tests/ts_itest-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/ts_itest-base.h b/src/kudu/integration-tests/ts_itest-base.h
index 844369c..3655977 100644
--- a/src/kudu/integration-tests/ts_itest-base.h
+++ b/src/kudu/integration-tests/ts_itest-base.h
@@ -147,12 +147,23 @@ class TabletServerIntegrationTestBase : public TabletServerTestBase {
CHECK_OK(cluster_->master_proxy()->GetTableLocations(req, &resp, &controller));
CHECK_OK(controller.status());
if (resp.has_error()) {
- if (resp.error().code() == master::MasterErrorPB::TABLET_NOT_RUNNING) {
- LOG(WARNING)<< "At least one tablet is not yet running";
- SleepFor(MonoDelta::FromSeconds(1));
- continue;
+ switch (resp.error().code()) {
+ case master::MasterErrorPB::TABLET_NOT_RUNNING:
+ LOG(WARNING)<< "At least one tablet is not yet running";
+ break;
+
+ case master::MasterErrorPB::NOT_THE_LEADER: // fallthrough
+ case master::MasterErrorPB::CATALOG_MANAGER_NOT_INITIALIZED:
+ LOG(WARNING)<< "CatalogManager is not yet ready to serve requests";
+ break;
+
+ default:
+ FAIL() << "Response had a fatal error: "
+ << SecureShortDebugString(resp.error());
+ break; // unreachable
}
- FAIL() << "Response had a fatal error: " << SecureShortDebugString(resp.error());
+ SleepFor(MonoDelta::FromSeconds(1));
+ continue;
}
for (const master::TabletLocationsPB& location : resp.tablet_locations()) {
http://git-wip-us.apache.org/repos/asf/kudu/blob/c06a3bc6/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index a2a77e2..59b990f 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -83,6 +83,9 @@
#include "kudu/rpc/rpc_context.h"
#include "kudu/security/cert.h"
#include "kudu/security/crypto.h"
+#include "kudu/security/token.pb.h"
+#include "kudu/security/token_signer.h"
+#include "kudu/security/token_signing_key.h"
#include "kudu/tserver/tserver_admin.proxy.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/flag_tags.h"
@@ -197,6 +200,7 @@ DEFINE_bool(catalog_manager_delete_orphaned_tablets, false,
TAG_FLAG(catalog_manager_delete_orphaned_tablets, advanced);
using std::pair;
+using std::set;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
@@ -205,19 +209,24 @@ using std::vector;
namespace kudu {
namespace master {
-using base::subtle::NoBarrier_Load;
using base::subtle::NoBarrier_CompareAndSwap;
+using base::subtle::NoBarrier_Load;
using cfile::TypeEncodingInfo;
-using consensus::kMinimumTerm;
using consensus::CONSENSUS_CONFIG_COMMITTED;
using consensus::Consensus;
using consensus::ConsensusServiceProxy;
using consensus::ConsensusStatePB;
using consensus::GetConsensusRole;
-using consensus::OpId;
using consensus::RaftPeerPB;
using consensus::StartTabletCopyRequestPB;
+using consensus::kMinimumTerm;
using rpc::RpcContext;
+using security::Cert;
+using security::DataFormat;
+using security::PrivateKey;
+using security::TokenSigner;
+using security::TokenSigningPrivateKey;
+using security::TokenSigningPrivateKeyPB;
using strings::Substitute;
using tablet::TABLET_DATA_DELETED;
using tablet::TABLET_DATA_TOMBSTONED;
@@ -317,6 +326,48 @@ class TabletLoader : public TabletVisitor {
};
////////////////////////////////////////////////////////////
+// TSK (Token Signing Key) Entry Loader
+////////////////////////////////////////////////////////////
+
+// Visitor which collects the TSK records stored in the system table.
+// Every visited entry is accumulated (expired ones included). In addition,
+// the identifiers of the entries which had already expired by the time the
+// loader was constructed are tracked separately, so the caller can purge
+// them from the table afterwards.
+class TskEntryLoader : public TskEntryVisitor {
+ public:
+  // Capture the current wall-clock time: it is the cut-off used to decide
+  // whether a visited entry is expired.
+  TskEntryLoader()
+      : entry_expiration_seconds_(WallTime_Now()) {
+  }
+
+  // Record the TSK contained in 'metadata'. If the key has expired, also
+  // remember its 'entry_id'. Entries lacking the sequence number, the
+  // expiration time or the key material trip a CHECK; otherwise the method
+  // always returns Status::OK().
+  Status Visit(const string& entry_id,
+               const SysTskEntryPB& metadata) override {
+    TokenSigningPrivateKeyPB tsk(metadata.tsk());
+    CHECK(tsk.has_key_seq_num());
+    CHECK(tsk.has_expire_unix_epoch_seconds());
+    CHECK(tsk.has_rsa_key_der());
+
+    // The expiration check must come before 'tsk' is moved into 'entries_':
+    // reading the fields of a moved-from message is not reliable and could
+    // make every entry look expired (and hence eligible for removal).
+    if (tsk.expire_unix_epoch_seconds() <= entry_expiration_seconds_) {
+      expired_entry_ids_.insert(entry_id);
+    }
+
+    // Expired entries are useful as well: they are needed for correct tracking
+    // of TSK sequence numbers.
+    entries_.emplace_back(std::move(tsk));
+    return Status::OK();
+  }
+
+  // All the TSK entries visited so far, expired ones included.
+  const vector<TokenSigningPrivateKeyPB>& entries() const {
+    return entries_;
+  }
+
+  // Identifiers of the visited entries which were found to be expired.
+  const set<string>& expired_entry_ids() const {
+    return expired_entry_ids_;
+  }
+
+ private:
+  // Wall-clock time (seconds) captured at construction; entries expiring at
+  // or before this moment are considered expired.
+  const int64_t entry_expiration_seconds_;
+  vector<TokenSigningPrivateKeyPB> entries_;
+  set<string> expired_entry_ids_;
+
+  DISALLOW_COPY_AND_ASSIGN(TskEntryLoader);
+};
+
+////////////////////////////////////////////////////////////
// Background Tasks
////////////////////////////////////////////////////////////
@@ -400,9 +451,16 @@ void CatalogManagerBgTasks::Run() {
<< l.catalog_status().ToString();
}
} else if (l.leader_status().ok()) {
- std::vector<scoped_refptr<TabletInfo>> to_process;
+ // If this is the leader master, check if it's time to generate
+ // and store a new TSK (Token Signing Key).
+ Status s = catalog_manager_->CheckGenerateNewTskUnlocked();
+ if (!s.ok()) {
+ LOG(ERROR) << "Error processing TSK entry (will try next time): "
+ << s.ToString();
+ }
// Get list of tablets not yet running.
+ std::vector<scoped_refptr<TabletInfo>> to_process;
catalog_manager_->ExtractTabletsToProcess(&to_process);
if (!to_process.empty()) {
@@ -413,15 +471,14 @@ void CatalogManagerBgTasks::Run() {
// If there is an error (e.g., we are not the leader) abort this task
// and wait until we're woken up again.
//
- // TODO Add tests for this in the revision that makes
+ // TODO(unknown): Add tests for this in the revision that makes
// create/alter fault tolerant.
- LOG(ERROR) << "Error processing pending assignments, aborting the current task: "
- << s.ToString();
+ LOG(ERROR) << "Error processing pending assignments, "
+ "aborting the current task: " << s.ToString();
}
}
}
}
-
// Wait for a notification or a timeout expiration.
// - CreateTable will call Wake() to notify about the tablets to add
// - HandleReportedTablet/ProcessPendingAssignments will call WakeIfHasPendingUpdates()
@@ -675,9 +732,54 @@ Status CatalogManager::WaitUntilCaughtUpAsLeader(const MonoDelta& timeout) {
return Status::OK();
}
+// Read the CA entry from the system table and deserialize the DER-encoded
+// private key and certificate it contains. The output parameters are updated
+// only if every step succeeds, including the check that the certificate
+// matches the private key; otherwise the first error is returned.
+Status CatalogManager::LoadCertAuthorityInfo(unique_ptr<PrivateKey>* key,
+                                             unique_ptr<Cert>* cert) {
+  leader_lock_.AssertAcquiredForWriting();
+
+  SysCertAuthorityEntryPB ca_entry;
+  RETURN_NOT_OK(sys_catalog_->GetCertAuthorityEntry(&ca_entry));
+
+  unique_ptr<PrivateKey> loaded_key(new PrivateKey);
+  unique_ptr<Cert> loaded_cert(new Cert);
+  RETURN_NOT_OK(loaded_key->FromString(ca_entry.private_key(),
+                                       DataFormat::DER));
+  RETURN_NOT_OK(loaded_cert->FromString(ca_entry.certificate(),
+                                        DataFormat::DER));
+  // Sanity check: the certificate must correspond to the private key.
+  RETURN_NOT_OK(loaded_cert->CheckKeyMatch(*loaded_key));
+
+  // Everything is fine: hand the results over to the caller.
+  *key = std::move(loaded_key);
+  *cert = std::move(loaded_cert);
+  return Status::OK();
+}
+
+// Hand the specified private key and certificate over to the master's
+// certificate authority component and return the result of its
+// initialization.
+Status CatalogManager::InitCertAuthority(unique_ptr<PrivateKey> key,
+                                         unique_ptr<Cert> cert) {
+  leader_lock_.AssertAcquiredForWriting();
+  auto* ca = master_->cert_authority();
+  return ca->Init(std::move(key), std::move(cert));
+}
+
+// Serialize the private key and the certificate of the internal Kudu CA
+// into DER format and persist them as a CA entry in the system table.
+Status CatalogManager::StoreCertAuthorityInfo(const PrivateKey& key,
+                                              const Cert& cert) {
+  leader_lock_.AssertAcquiredForWriting();
+
+  SysCertAuthorityEntryPB ca_entry;
+  RETURN_NOT_OK(key.ToString(ca_entry.mutable_private_key(),
+                             DataFormat::DER));
+  RETURN_NOT_OK(cert.ToString(ca_entry.mutable_certificate(),
+                              DataFormat::DER));
+  RETURN_NOT_OK(sys_catalog_->AddCertAuthorityEntry(ca_entry));
+
+  LOG(INFO) << "Successfully stored the newly generated cert authority "
+               "information into the system table.";
+  return Status::OK();
+}
+
// Check if CA private key and the certificate were loaded into the
-// memory. If not, generate new ones and store them into the system table.
-// Use the CA information to initialize the certificate authority.
+// memory. If not, generate new ones and store them into the system table
+// and then load into the memory.
Status CatalogManager::CheckInitCertAuthority() {
using security::Cert;
using security::DataFormat;
@@ -777,7 +879,64 @@ void CatalogManager::VisitTablesAndTabletsTask() {
LOG(INFO) << "Loading CA info into memory...";
LOG_SLOW_EXECUTION(WARNING, 1000, LogPrefix() +
"Loading CA info into memory") {
- CHECK_OK(CheckInitCertAuthority());
+ unique_ptr<PrivateKey> key;
+ unique_ptr<Cert> cert;
+ const Status& s = LoadCertAuthorityInfo(&key, &cert);
+ if (s.ok()) {
+ // Once successfully loaded, the CA information is supposed to be valid:
+ // the leader master should not be run without CA certificate.
+ CHECK_OK(InitCertAuthority(std::move(key), std::move(cert)));
+ } else if (s.IsNotFound()) {
+ LOG(WARNING) << "Did not find CA certificate and key for Kudu IPKI, "
+ "will generate new ones";
+ // No CA information record has been found in the table -- generate
+ // a new one. The subtlety here is that first it's necessary to store
+ // the newly generated information into the system table and only after
+ // that initialize master certificate authority. The reason is:
+ // if the master server loses its leadership role by that time, there
+ // will be an error on an attempt to write into the system table.
+ // If that happens, skip the rest of the sequence: when this callback
+ // is invoked next time, the system table should already contain
+ // CA certificate information written by other master.
+ unique_ptr<PrivateKey> private_key(new PrivateKey);
+ unique_ptr<Cert> cert(new Cert);
+
+ // Generate new private key and corresponding CA certificate.
+ CHECK_OK(master_->cert_authority()->Generate(private_key.get(),
+ cert.get()));
+ // If the leadership role is lost at this moment, writing into the
+ // system table will fail.
+ const Status& s = StoreCertAuthorityInfo(*private_key, *cert);
+ if (s.ok()) {
+ // The leader master should not run without CA certificate.
+ CHECK_OK(InitCertAuthority(std::move(private_key), std::move(cert)));
+ } else {
+ LOG(WARNING) << "Failed to write newly generated CA information into "
+ "the system table, assuming change of leadership: "
+ << s.ToString();
+ }
+ } else {
+ CHECK_OK(s);
+ }
+ }
+
+ LOG(INFO) << "Loading token signing keys...";
+ LOG_SLOW_EXECUTION(WARNING, 1000, LogPrefix() +
+ "Loading token signing keys...") {
+ set<string> expired_tsk_entry_ids;
+ CHECK_OK(LoadTskEntries(&expired_tsk_entry_ids));
+ Status s = CheckGenerateNewTskUnlocked();
+ if (!s.ok()) {
+ LOG(WARNING) << "Failed to generate and persist new TSK, "
+ "assuming change of leadership: " << s.ToString();
+ return;
+ }
+ s = DeleteTskEntries(expired_tsk_entry_ids);
+ if (!s.ok()) {
+ LOG(WARNING) << "Failed to purge expired TSK entries from system table, "
+ "assuming change of leadership: " << s.ToString();
+ return;
+ }
}
}
@@ -3177,6 +3336,47 @@ void CatalogManager::ExtractTabletsToProcess(
}
}
+// Roll the TokenSigner's key if it is time to do so. The subtlety is that
+// a key must not be exported before it has been properly persisted, hence
+// the following protocol:
+//   1) Ask the TokenSigner whether a new TSK is needed (it generates one).
+//   2) Try to write the new TSK to the system table.
+//   3) On success, pass the TSK back to the TokenSigner.
+//   4) Let the TokenSigner switch to the new key if it's time to do so.
+Status CatalogManager::CheckGenerateNewTskUnlocked() {
+  TokenSigner* signer = master_->token_signer();
+  unique_ptr<security::TokenSigningPrivateKey> new_key;
+  RETURN_NOT_OK(signer->CheckNeedKey(&new_key));
+  if (!new_key) {
+    // No new key is needed at this time: only the rotation check is left.
+    return signer->TryRotateKey();
+  }
+
+  // Persist the new TSK in the system table first.
+  SysTskEntryPB sys_entry;
+  new_key->ExportPB(sys_entry.mutable_tsk());
+  RETURN_NOT_OK(sys_catalog_->AddTskEntry(sys_entry));
+  LOG(INFO) << "Saved newly generated TSK " << new_key->key_seq_num()
+            << " into the system table.";
+
+  // Only then make the new TSK known to the signer.
+  RETURN_NOT_OK(signer->AddKey(std::move(new_key)));
+  return signer->TryRotateKey();
+}
+
+// Read all TSK entries from the system table and import them into the
+// master's TokenSigner. If 'expired_entry_ids' is not null, its contents are
+// replaced with the identifiers of the entries found to be expired.
+Status CatalogManager::LoadTskEntries(set<string>* expired_entry_ids) {
+  TskEntryLoader tsk_loader;
+  RETURN_NOT_OK(sys_catalog_->VisitTskEntries(&tsk_loader));
+  if (expired_entry_ids != nullptr) {
+    *expired_entry_ids = tsk_loader.expired_entry_ids();
+  }
+  return master_->token_signer()->ImportKeys(tsk_loader.entries());
+}
+
+// Remove the TSK records with the specified identifiers from the system
+// table, returning the status of the removal.
+Status CatalogManager::DeleteTskEntries(const set<string>& entry_ids) {
+  leader_lock_.AssertAcquiredForWriting();
+  Status removal_status = sys_catalog_->RemoveTskEntries(entry_ids);
+  return removal_status;
+}
+
struct DeferredAssignmentActions {
vector<TabletInfo*> tablets_to_add;
vector<TabletInfo*> tablets_to_update;
http://git-wip-us.apache.org/repos/asf/kudu/blob/c06a3bc6/src/kudu/master/catalog_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index 65614d6..cf83363 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -59,6 +59,13 @@ namespace rpc {
class RpcContext;
} // namespace rpc
+namespace security {
+class Cert;
+class PrivateKey;
+class TokenSigner;
+class TokenSigningPrivateKey;
+} // namespace security
+
namespace master {
class CatalogManagerBgTasks;
@@ -569,6 +576,21 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
// This method is thread-safe.
Status InitSysCatalogAsync(bool is_first_run);
+ // Load the internal Kudu certificate authority information from the system
+ // table: the private key and the certificate. If the CA info entry is not
+ // found in the table, return Status::NotFound.
+ Status LoadCertAuthorityInfo(std::unique_ptr<security::PrivateKey>* key,
+ std::unique_ptr<security::Cert>* cert);
+
+ // Initialize master's certificate authority with the specified private key
+ // and certificate.
+ Status InitCertAuthority(std::unique_ptr<security::PrivateKey> key,
+ std::unique_ptr<security::Cert> cert);
+
+ // Store CA certificate information into the system table.
+ Status StoreCertAuthorityInfo(const security::PrivateKey& key,
+ const security::Cert& cert);
+
// Load existing root CA information from the system table, if present.
// If not, generate new ones, store them into the system table.
// After that, use the CA information to initialize the certificate authority.
@@ -610,6 +632,18 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
// Extract the set of tablets that must be processed because not running yet.
void ExtractTabletsToProcess(std::vector<scoped_refptr<TabletInfo>>* tablets_to_process);
+ // Check if it's time to generate new Token Signing Key for TokenSigner.
+ // If so, generate one and persist it into the system table.
+ Status CheckGenerateNewTskUnlocked();
+
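+ // Load TSK entries from the system table (expired ones included, since
+ // they are needed for correct tracking of TSK sequence numbers) and import
+ // them into the TokenSigner. If 'expired_entry_ids' is not null, it is
+ // populated with the identifiers of the expired entries.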
+ Status LoadTskEntries(std::set<std::string>* expired_entry_ids);
+
+ // Delete TSK entries with the specified entry identifiers
+ // (identifiers correspond to the 'entry_id' column).
+ Status DeleteTskEntries(const std::set<std::string>& entry_ids);
+
Status ApplyAlterSchemaSteps(const SysTablesEntryPB& current_pb,
std::vector<AlterTableRequestPB::Step> steps,
Schema* new_schema,
http://git-wip-us.apache.org/repos/asf/kudu/blob/c06a3bc6/src/kudu/master/master.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master.cc b/src/kudu/master/master.cc
index dfe28a4..0f04c22 100644
--- a/src/kudu/master/master.cc
+++ b/src/kudu/master/master.cc
@@ -18,7 +18,6 @@
#include "kudu/master/master.h"
#include <algorithm>
-#include <list>
#include <memory>
#include <vector>
@@ -119,19 +118,9 @@ Status Master::Init() {
// becomes a leader.
cert_authority_.reset(new MasterCertAuthority(fs_manager_->uuid()));
+ // The TokenSigner loads its keys during catalog manager initialization.
token_signer_.reset(new TokenSigner(FLAGS_tsk_validity_seconds,
FLAGS_tsk_rotation_seconds));
- // TODO(PKI): this also will need to be wired together with CatalogManager
- // soon, including initializing the token manager with the proper next
- // sequence number.
- {
- unique_ptr<TokenSigningPrivateKey> key;
- RETURN_NOT_OK(token_signer_->CheckNeedKey(&key));
- if (key) {
- RETURN_NOT_OK(token_signer_->AddKey(std::move(key)));
- }
- }
-
state_ = kInitialized;
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/c06a3bc6/src/kudu/master/master.proto
----------------------------------------------------------------------
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index 02ddd1d..37deba4 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -186,6 +186,14 @@ message SysCertAuthorityEntryPB {
required bytes certificate = 2 [(kudu.REDACT) = true];
}
+// The on-disk entry in the sys.catalog table ("metadata" column) to represent
+// a Token Signing Key (TSK) object. Multiple entries of this type
+// can simultaneously co-exist in the sys.catalog table.
+//
+// NOTE(review): the entry wraps the whole TokenSigningPrivateKeyPB, so
+// presumably the private key material is persisted in the table as well --
+// confirm this is covered by the same protection as the CA entry above.
+message SysTskEntryPB {
+ // TokenSigningPrivateKeyPB message representing a TSK. The field is
+ // mandatory: an entry without a key is not valid.
+ required security.TokenSigningPrivateKeyPB tsk = 1;
+}
+
////////////////////////////////////////////////////////////
// RPCs
////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/kudu/blob/c06a3bc6/src/kudu/master/sys_catalog.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index 5d8181a..1d698ff 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -19,6 +19,7 @@
#include <algorithm>
#include <functional>
+#include <iomanip>
#include <iterator>
#include <memory>
#include <set>
@@ -27,6 +28,7 @@
#include <gflags/gflags.h>
#include <glog/logging.h>
+#include "kudu/common/key_encoder.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/partition.h"
#include "kudu/common/row_operations.h"
@@ -45,6 +47,7 @@
#include "kudu/master/master.h"
#include "kudu/master/master.pb.h"
#include "kudu/rpc/rpc_context.h"
+#include "kudu/tablet/mvcc.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_bootstrap.h"
#include "kudu/tablet/transactions/write_transaction.h"
@@ -66,7 +69,6 @@ using kudu::consensus::ConsensusStatePB;
using kudu::consensus::RaftConfigPB;
using kudu::consensus::RaftPeerPB;
using kudu::log::Log;
-using kudu::log::LogAnchorRegistry;
using kudu::tablet::LatchTransactionCompletionCallback;
using kudu::tablet::Tablet;
using kudu::tablet::TabletPeer;
@@ -472,6 +474,14 @@ void SysCatalogTable::ReqDeleteTable(WriteRequestPB* req, const TableInfo* table
enc.Add(RowOperationsPB::DELETE, row);
}
+// Build the identifier (the key of the TSK record in the system table) for
+// the TSK with the specified sequence number. The identifier is whatever the
+// INT64 key encoder produces for 'seq_number'.
+// NOTE(review): this was described as a zero-padded string; the exact output
+// format of the key encoder is not visible here -- confirm.
+string SysCatalogTable::TskSeqNumberToEntryId(int64_t seq_number) {
+  string encoded_id;
+  KeyEncoderTraits<DataType::INT64, string>::Encode(seq_number, &encoded_id);
+  return encoded_id;
+}
+
Status SysCatalogTable::VisitTables(TableVisitor* visitor) {
TRACE_EVENT0("master", "SysCatalogTable::VisitTables");
auto processor = [&](
@@ -508,8 +518,9 @@ Status SysCatalogTable::ProcessRows(
<< "cannot find sys catalog table column " << kSysCatalogTableColType
<< " in schema: " << schema_.ToString();
- const int8_t et = entry_type;
- auto pred = ColumnPredicate::Equality(schema_.column(type_col_idx), &et);
+ static const int8_t kEntryType = entry_type;
+ auto pred = ColumnPredicate::Equality(schema_.column(type_col_idx),
+ &kEntryType);
ScanSpec spec;
spec.AddPredicate(pred);
@@ -521,7 +532,8 @@ Status SysCatalogTable::ProcessRows(
RowBlock block(iter->schema(), 512, &arena);
while (iter->HasNext()) {
RETURN_NOT_OK(iter->NextBlock(&block));
- for (size_t i = 0; i < block.nrows(); ++i) {
+ const size_t nrows = block.nrows();
+ for (size_t i = 0; i < nrows; ++i) {
if (!block.selection_vector()->IsRowSelected(i)) {
continue;
}
@@ -534,6 +546,16 @@ Status SysCatalogTable::ProcessRows(
return Status::OK();
}
+// Run ProcessRows() for the TSK_ENTRY entry type, handing the identifier and
+// the SysTskEntryPB data of every processed entry to visitor->Visit().
+// Returns the status reported by ProcessRows().
+Status SysCatalogTable::VisitTskEntries(TskEntryVisitor* visitor) {
+ TRACE_EVENT0("master", "SysCatalogTable::VisitTskEntries");
+ // Adapter that forwards each entry to the visitor and propagates its result.
+ auto processor = [&](
+ const string& entry_id,
+ const SysTskEntryPB& entry_data) {
+ return visitor->Visit(entry_id, entry_data);
+ };
+ return ProcessRows<SysTskEntryPB, TSK_ENTRY>(processor);
+}
+
Status SysCatalogTable::GetCertAuthorityEntry(SysCertAuthorityEntryPB* entry) {
CHECK(entry);
vector<SysCertAuthorityEntryPB> entries;
@@ -577,6 +599,48 @@ Status SysCatalogTable::AddCertAuthorityEntry(
return Status::OK();
}
+// Build a single-row INSERT write request for the specified TSK entry and
+// submit it via SyncWrite(), returning the resulting status.
+//
+// The entry's 'tsk' field must have the key sequence number, the expiration
+// time and the DER-encoded RSA key set: a missing field trips a CHECK.
+Status SysCatalogTable::AddTskEntry(const SysTskEntryPB& entry) {
+ WriteRequestPB req;
+ WriteResponsePB resp;
+
+ req.set_tablet_id(kSysCatalogTabletId);
+ CHECK_OK(SchemaToPB(schema_, req.mutable_schema()));
+
+ // Refuse to persist an incomplete key record.
+ CHECK(entry.tsk().has_key_seq_num());
+ CHECK(entry.tsk().has_expire_unix_epoch_seconds());
+ CHECK(entry.tsk().has_rsa_key_der());
+
+ // The whole protobuf message goes into the metadata column in serialized form.
+ faststring metadata_buf;
+ pb_util::SerializeToString(entry, &metadata_buf);
+
+ // Row layout: entry type TSK_ENTRY, entry id derived from the key sequence
+ // number, metadata holding the serialized entry.
+ KuduPartialRow row(&schema_);
+ CHECK_OK(row.SetInt8(kSysCatalogTableColType, TSK_ENTRY));
+ CHECK_OK(row.SetString(kSysCatalogTableColId,
+ TskSeqNumberToEntryId(entry.tsk().key_seq_num())));
+ CHECK_OK(row.SetString(kSysCatalogTableColMetadata, metadata_buf));
+ RowOperationsPBEncoder enc(req.mutable_row_operations());
+ enc.Add(RowOperationsPB::INSERT, row);
+
+ return SyncWrite(&req, &resp);
+}
+
+// Build one write request containing a DELETE row operation for every
+// identifier in 'entry_ids' (each paired with the TSK_ENTRY entry type) and
+// submit it via SyncWrite(), returning the resulting status.
+//
+// NOTE(review): an empty 'entry_ids' set still reaches SyncWrite() with no row
+// operations added here -- confirm callers never pass an empty set or that
+// such a write is harmless.
+Status SysCatalogTable::RemoveTskEntries(const set<string>& entry_ids) {
+ WriteRequestPB req;
+ WriteResponsePB resp;
+
+ req.set_tablet_id(kSysCatalogTabletId);
+ CHECK_OK(SchemaToPB(schema_, req.mutable_schema()));
+ for (const auto& id : entry_ids) {
+ // The row to delete is identified by the entry type and the entry id.
+ KuduPartialRow row(&schema_);
+ CHECK_OK(row.SetInt8(kSysCatalogTableColType, TSK_ENTRY));
+ CHECK_OK(row.SetString(kSysCatalogTableColId, id));
+ // All the encoders append to the same row operations field of the request.
+ RowOperationsPBEncoder enc(req.mutable_row_operations());
+ enc.Add(RowOperationsPB::DELETE, row);
+ }
+
+ return SyncWrite(&req, &resp);
+}
+
// ==================================================================
// Tablet related methods
// ==================================================================
http://git-wip-us.apache.org/repos/asf/kudu/blob/c06a3bc6/src/kudu/master/sys_catalog.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.h b/src/kudu/master/sys_catalog.h
index 4620f9d..11e92cd 100644
--- a/src/kudu/master/sys_catalog.h
+++ b/src/kudu/master/sys_catalog.h
@@ -18,6 +18,7 @@
#define KUDU_MASTER_SYS_CATALOG_H_
#include <functional>
+#include <set>
#include <string>
#include <vector>
@@ -58,12 +59,23 @@ class TabletVisitor {
const SysTabletsEntryPB& metadata) = 0;
};
+// Visitor for TSK-related (Token Signing Key) entries. Actually, only the
+// public part of each key is stored in the system catalog table. That
+// information is preserved to allow any master to verify a token which might
+// have been signed by the current or a former master leader.
+// NOTE(review): SysTskEntryPB wraps a TokenSigningPrivateKeyPB -- confirm which
+// parts of the key are actually persisted.
+// NOTE(review): the interface declares no virtual destructor, so visitors must
+// not be deleted through a TskEntryVisitor pointer.
+class TskEntryVisitor {
+ public:
+ // Invoked with the identifier of a TSK entry and the data stored for it.
+ // The returned status tells the caller whether the entry was handled OK.
+ virtual Status Visit(const std::string& entry_id,
+ const SysTskEntryPB& metadata) = 0;
+};
+
// SysCatalogTable is a Kudu table that keeps track of the following
// system information:
// * table metadata
// * tablet metadata
-// * root CA (certificate authority) certificates
-// * corresponding CA private keys
+// * root CA (certificate authority) certificate of the Kudu IPKI
+// * Kudu IPKI root CA cert's private key
+// * TSK (Token Signing Key) entries
//
// The essential properties of the SysCatalogTable are:
// * SysCatalogTable has only one tablet.
@@ -84,6 +96,7 @@ class SysCatalogTable {
TABLES_ENTRY = 1,
TABLETS_ENTRY = 2,
CERT_AUTHORITY_INFO = 3, // Kudu's root certificate authority entry.
+ TSK_ENTRY = 4, // Token Signing Key entry.
};
// 'leader_cb_' is invoked whenever this node is elected as a leader
@@ -128,6 +141,9 @@ class SysCatalogTable {
// Scan of the tablet-related entries.
Status VisitTablets(TabletVisitor* visitor);
+ // Scan for TSK-related entries in the system table.
+ Status VisitTskEntries(TskEntryVisitor* visitor);
+
// Retrieve the CA entry (private key and certificate) from the system table.
Status GetCertAuthorityEntry(SysCertAuthorityEntryPB* entry);
@@ -135,6 +151,13 @@ class SysCatalogTable {
// There should be no more than one CA entry in the system table.
Status AddCertAuthorityEntry(const SysCertAuthorityEntryPB& entry);
+ // Add TSK (Token Signing Key) entry into the system table.
+ Status AddTskEntry(const SysTskEntryPB& entry);
+
+ // Remove TSK (Token Signing Key) entries with the specified entry identifiers
+ // (as in 'entry_id' column) from the system table.
+ Status RemoveTskEntries(const std::set<std::string>& entry_ids);
+
private:
FRIEND_TEST(MasterTest, TestMasterMetadataConsistentDespiteFailures);
DISALLOW_COPY_AND_ASSIGN(SysCatalogTable);
@@ -209,6 +232,8 @@ class SysCatalogTable {
void ReqDeleteTablets(tserver::WriteRequestPB* req,
const std::vector<TabletInfo*>& tablets);
+ // Convert the TSK sequence number into the identifier used for the
+ // corresponding entry in the system table.
+ // NOTE(review): unqualified 'string' in a header; the TSK-related
+ // declarations in this header otherwise spell it 'std::string'.
+ static string TskSeqNumberToEntryId(int64_t seq_number);
+
// Special string injected into SyncWrite() random failures (if enabled).
//
// Only useful for tests.