You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2017/02/04 07:20:56 UTC

[1/2] kudu git commit: [sys_catalog] make visitor's code more generic

Repository: kudu
Updated Branches:
  refs/heads/master f3ec97bd1 -> 8329bdc88


[sys_catalog] make visitor's code more generic

Minor refactoring on sys_catalog.{cc,h}: the visitor-related code
is made more generic to avoid code duplication.

There are no functional changes in this patch.

Change-Id: I7aa1e81ad7e61af8376ce4b52cd4020f5ce69a63
Reviewed-on: http://gerrit.cloudera.org:8080/5901
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/27e05ee8
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/27e05ee8
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/27e05ee8

Branch: refs/heads/master
Commit: 27e05ee84cb5f3abae0d36d1a2425f5d7701d1f8
Parents: f3ec97b
Author: Alexey Serbin <as...@cloudera.com>
Authored: Fri Feb 3 16:04:13 2017 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Sat Feb 4 04:39:57 2017 +0000

----------------------------------------------------------------------
 src/kudu/master/sys_catalog.cc | 138 +++++++++++++++++-------------------
 src/kudu/master/sys_catalog.h  |  13 ++--
 2 files changed, 74 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/27e05ee8/src/kudu/master/sys_catalog.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index 3f2e093..59c4bc3 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -18,11 +18,14 @@
 #include "kudu/master/sys_catalog.h"
 
 #include <algorithm>
-#include <gflags/gflags.h>
-#include <glog/logging.h>
+#include <functional>
 #include <iterator>
 #include <memory>
 #include <set>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
 
 #include "kudu/common/partial_row.h"
 #include "kudu/common/partition.h"
@@ -70,8 +73,10 @@ using kudu::tablet::TabletPeer;
 using kudu::tablet::TabletStatusListener;
 using kudu::tserver::WriteRequestPB;
 using kudu::tserver::WriteResponsePB;
+using std::function;
 using std::shared_ptr;
 using std::unique_ptr;
+using std::vector;
 using strings::Substitute;
 
 namespace kudu {
@@ -352,7 +357,7 @@ Status SysCatalogTable::SyncWrite(const WriteRequestPB *req, WriteResponsePB *re
 
   CountDownLatch latch(1);
   gscoped_ptr<tablet::TransactionCompletionCallback> txn_callback(
-    new LatchTransactionCompletionCallback<WriteResponsePB>(&latch, resp));
+      new LatchTransactionCompletionCallback<WriteResponsePB>(&latch, resp));
   unique_ptr<tablet::WriteTransactionState> tx_state(
       new tablet::WriteTransactionState(tablet_peer_.get(),
                                         req,
@@ -467,16 +472,44 @@ void SysCatalogTable::ReqDeleteTable(WriteRequestPB* req, const TableInfo* table
 
 Status SysCatalogTable::VisitTables(TableVisitor* visitor) {
   TRACE_EVENT0("master", "SysCatalogTable::VisitTables");
+  auto processor = [&](
+      const string& entry_id,
+      const SysTablesEntryPB& entry_data) {
+    return visitor->VisitTable(entry_id, entry_data);
+  };
+  return ProcessRows<SysTablesEntryPB, TABLES_ENTRY>(processor);
+}
 
-  const int8_t tables_entry = TABLES_ENTRY;
+template<typename T>
+Status SysCatalogTable::GetEntryFromRow(
+    const RowBlockRow& row, string* entry_id, T* entry_data) const {
+  const Slice* id = schema_.ExtractColumnFromRow<STRING>(
+      row, schema_.find_column(kSysCatalogTableColId));
+  const Slice* data = schema_.ExtractColumnFromRow<STRING>(
+      row, schema_.find_column(kSysCatalogTableColMetadata));
+  const string& str_id = id->ToString();
+  RETURN_NOT_OK_PREPEND(
+      pb_util::ParseFromArray(entry_data, data->data(), data->size()),
+      "unable to parse metadata field for row " + str_id);
+  *entry_id = str_id;
+
+  return Status::OK();
+}
+
+// Scan for entries of the specified type and run the specified function
+// with each entry found.
+template<typename T, SysCatalogTable::CatalogEntryType entry_type>
+Status SysCatalogTable::ProcessRows(
+    function<Status(const string&, const T&)> processor) const {
   const int type_col_idx = schema_.find_column(kSysCatalogTableColType);
   CHECK(type_col_idx != Schema::kColumnNotFound)
-      << "Cannot find sys catalog table column " << kSysCatalogTableColType << " in schema: "
-      << schema_.ToString();
+      << "cannot find sys catalog table column " << kSysCatalogTableColType
+      << " in schema: " << schema_.ToString();
 
-  auto pred_tables = ColumnPredicate::Equality(schema_.column(type_col_idx), &tables_entry);
+  const int8_t et = entry_type;
+  auto pred = ColumnPredicate::Equality(schema_.column(type_col_idx), &et);
   ScanSpec spec;
-  spec.AddPredicate(pred_tables);
+  spec.AddPredicate(pred);
 
   gscoped_ptr<RowwiseIterator> iter;
   RETURN_NOT_OK(tablet_peer_->tablet()->NewRowIterator(schema_, &iter));
@@ -486,30 +519,19 @@ Status SysCatalogTable::VisitTables(TableVisitor* visitor) {
   RowBlock block(iter->schema(), 512, &arena);
   while (iter->HasNext()) {
     RETURN_NOT_OK(iter->NextBlock(&block));
-    for (size_t i = 0; i < block.nrows(); i++) {
-      if (!block.selection_vector()->IsRowSelected(i)) continue;
-
-      RETURN_NOT_OK(VisitTableFromRow(block.row(i), visitor));
+    for (size_t i = 0; i < block.nrows(); ++i) {
+      if (!block.selection_vector()->IsRowSelected(i)) {
+        continue;
+      }
+      string entry_id;
+      T entry_data;
+      RETURN_NOT_OK(GetEntryFromRow(block.row(i), &entry_id, &entry_data));
+      RETURN_NOT_OK(processor(entry_id, entry_data));
     }
   }
   return Status::OK();
 }
 
-Status SysCatalogTable::VisitTableFromRow(const RowBlockRow& row,
-                                          TableVisitor* visitor) {
-  const Slice* table_id =
-      schema_.ExtractColumnFromRow<STRING>(row, schema_.find_column(kSysCatalogTableColId));
-  const Slice* data =
-      schema_.ExtractColumnFromRow<STRING>(row, schema_.find_column(kSysCatalogTableColMetadata));
-
-  SysTablesEntryPB metadata;
-  RETURN_NOT_OK_PREPEND(pb_util::ParseFromArray(&metadata, data->data(), data->size()),
-                        "Unable to parse metadata field for table " + table_id->ToString());
-
-  RETURN_NOT_OK(visitor->VisitTable(table_id->ToString(), metadata));
-  return Status::OK();
-}
-
 // ==================================================================
 // Tablet related methods
 // ==================================================================
@@ -553,55 +575,25 @@ void SysCatalogTable::ReqDeleteTablets(WriteRequestPB* req,
   }
 }
 
-Status SysCatalogTable::VisitTabletFromRow(const RowBlockRow& row, TabletVisitor *visitor) {
-  const Slice *tablet_id =
-    schema_.ExtractColumnFromRow<STRING>(row, schema_.find_column(kSysCatalogTableColId));
-  const Slice *data =
-    schema_.ExtractColumnFromRow<STRING>(row, schema_.find_column(kSysCatalogTableColMetadata));
-
-  SysTabletsEntryPB metadata;
-  RETURN_NOT_OK_PREPEND(pb_util::ParseFromArray(&metadata, data->data(), data->size()),
-                        "Unable to parse metadata field for tablet " + tablet_id->ToString());
-
-  // Upgrade from the deprecated start/end-key fields to the 'partition' field.
-  if (!metadata.has_partition()) {
-    metadata.mutable_partition()->set_partition_key_start(
-        metadata.deprecated_start_key());
-    metadata.mutable_partition()->set_partition_key_end(
-        metadata.deprecated_end_key());
-    metadata.clear_deprecated_start_key();
-    metadata.clear_deprecated_end_key();
-  }
-
-  RETURN_NOT_OK(visitor->VisitTablet(metadata.table_id(), tablet_id->ToString(), metadata));
-  return Status::OK();
-}
-
 Status SysCatalogTable::VisitTablets(TabletVisitor* visitor) {
   TRACE_EVENT0("master", "SysCatalogTable::VisitTablets");
-  const int8_t tablets_entry = TABLETS_ENTRY;
-  const int type_col_idx = schema_.find_column(kSysCatalogTableColType);
-  CHECK(type_col_idx != Schema::kColumnNotFound);
-
-  auto pred_tablets = ColumnPredicate::Equality(schema_.column(type_col_idx), &tablets_entry);
-  ScanSpec spec;
-  spec.AddPredicate(pred_tablets);
-
-  gscoped_ptr<RowwiseIterator> iter;
-  RETURN_NOT_OK(tablet_peer_->tablet()->NewRowIterator(schema_, &iter));
-  RETURN_NOT_OK(iter->Init(&spec));
-
-  Arena arena(32 * 1024, 256 * 1024);
-  RowBlock block(iter->schema(), 512, &arena);
-  while (iter->HasNext()) {
-    RETURN_NOT_OK(iter->NextBlock(&block));
-    for (size_t i = 0; i < block.nrows(); i++) {
-      if (!block.selection_vector()->IsRowSelected(i)) continue;
-
-      RETURN_NOT_OK(VisitTabletFromRow(block.row(i), visitor));
+  auto processor = [&](
+      const string& entry_id,
+      const SysTabletsEntryPB& entry_data) {
+    if (entry_data.has_partition()) {
+      return visitor->VisitTablet(entry_data.table_id(), entry_id, entry_data);
     }
-  }
-  return Status::OK();
+    // Upgrade from the deprecated start/end-key fields to the 'partition' field.
+    SysTabletsEntryPB metadata = entry_data;
+    metadata.mutable_partition()->set_partition_key_start(
+        entry_data.deprecated_start_key());
+    metadata.clear_deprecated_start_key();
+    metadata.mutable_partition()->set_partition_key_end(
+        entry_data.deprecated_end_key());
+    metadata.clear_deprecated_end_key();
+    return visitor->VisitTablet(metadata.table_id(), entry_id, metadata);
+  };
+  return ProcessRows<SysTabletsEntryPB, TABLETS_ENTRY>(processor);
 }
 
 void SysCatalogTable::InitLocalRaftPeerPB() {

http://git-wip-us.apache.org/repos/asf/kudu/blob/27e05ee8/src/kudu/master/sys_catalog.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.h b/src/kudu/master/sys_catalog.h
index e4702ed..1916636 100644
--- a/src/kudu/master/sys_catalog.h
+++ b/src/kudu/master/sys_catalog.h
@@ -17,6 +17,7 @@
 #ifndef KUDU_MASTER_SYS_CATALOG_H_
 #define KUDU_MASTER_SYS_CATALOG_H_
 
+#include <functional>
 #include <string>
 #include <vector>
 
@@ -36,10 +37,11 @@ class WriteResponsePB;
 }
 
 namespace master {
+
 class Master;
-struct MasterOptions;
 class TableInfo;
 class TabletInfo;
+struct MasterOptions;
 
 // The SysCatalogTable has two separate visitors because the tables
 // data must be loaded into memory before the tablets data.
@@ -160,8 +162,12 @@ class SysCatalogTable {
   // tablets.
   Status WaitUntilRunning();
 
-  // Table related private methods.
-  Status VisitTableFromRow(const RowBlockRow& row, TableVisitor* visitor);
+  template<typename T>
+  Status GetEntryFromRow(const RowBlockRow& row,
+                         std::string* entry_id, T* entry_data) const;
+
+  template<typename T, CatalogEntryType entry_type>
+  Status ProcessRows(std::function<Status(const std::string&, const T&)>) const;
 
   // Tablet related private methods.
 
@@ -169,7 +175,6 @@ class SysCatalogTable {
   Status AddTabletsToPB(const std::vector<TabletInfo*>& tablets,
                         RowOperationsPB::Type op_type,
                         RowOperationsPB* ops) const;
-  Status VisitTabletFromRow(const RowBlockRow& row, TabletVisitor* visitor);
 
   // Initializes the RaftPeerPB for the local peer.
   // Crashes due to an invariant check if the rpc server is not running.


[2/2] kudu git commit: [master] store CA information in the system table

Posted by al...@apache.org.
[master] store CA information in the system table

The certificate authority information (private key and corresponding
CA certificate) is generated upon first start of Kudu cluster and
persistently stored in the system table.

Added simple unit tests for the corresponding entries in the
catalog table.  Also, added integration tests to verify the operation
in case of multi-master clusters.

The next step would be adding a way to re-generate CA information.

Change-Id: I16fa077e39f5d75f682cca7220371acdef4f0630
Reviewed-on: http://gerrit.cloudera.org:8080/5793
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Alexey Serbin <as...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/8329bdc8
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/8329bdc8
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/8329bdc8

Branch: refs/heads/master
Commit: 8329bdc8887fc72c07a792922c13f25521e8af21
Parents: 27e05ee
Author: Alexey Serbin <as...@cloudera.com>
Authored: Fri Jan 20 14:52:47 2017 -0800
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Sat Feb 4 06:26:22 2017 +0000

----------------------------------------------------------------------
 src/kudu/integration-tests/CMakeLists.txt       |   1 +
 .../master_cert_authority-itest.cc              | 278 +++++++++++++++++++
 src/kudu/master/catalog_manager.cc              |  84 +++++-
 src/kudu/master/catalog_manager.h               |  11 +-
 src/kudu/master/master.cc                       |  13 +-
 src/kudu/master/master.proto                    |  13 +
 src/kudu/master/master_cert_authority.cc        |  27 +-
 src/kudu/master/master_cert_authority.h         |  32 ++-
 src/kudu/master/master_service.cc               |   7 +-
 src/kudu/master/sys_catalog-test.cc             |  51 +++-
 src/kudu/master/sys_catalog.cc                  |  49 +++-
 src/kudu/master/sys_catalog.h                   |  30 +-
 12 files changed, 547 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/8329bdc8/src/kudu/integration-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index e119bbe..7e7d16e 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -65,6 +65,7 @@ ADD_KUDU_TEST(external_mini_cluster-test RESOURCE_LOCK "master-rpc-ports")
 ADD_KUDU_TEST(fuzz-itest RUN_SERIAL true)
 ADD_KUDU_TEST(linked_list-test RESOURCE_LOCK "master-rpc-ports")
 ADD_KUDU_TEST(log-rolling-itest)
+ADD_KUDU_TEST(master_cert_authority-itest RESOURCE_LOCK "master-rpc-ports")
 ADD_KUDU_TEST(master_failover-itest RESOURCE_LOCK "master-rpc-ports")
 ADD_KUDU_TEST(master_migration-itest RESOURCE_LOCK "master-rpc-ports")
 ADD_KUDU_TEST_DEPENDENCIES(master_migration-itest

http://git-wip-us.apache.org/repos/asf/kudu/blob/8329bdc8/src/kudu/integration-tests/master_cert_authority-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/master_cert_authority-itest.cc b/src/kudu/integration-tests/master_cert_authority-itest.cc
new file mode 100644
index 0000000..9e4dbfe
--- /dev/null
+++ b/src/kudu/integration-tests/master_cert_authority-itest.cc
@@ -0,0 +1,278 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+#include <string>
+
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/mini_cluster.h"
+#include "kudu/master/catalog_manager.h"
+#include "kudu/master/master.h"
+#include "kudu/master/master.proxy.h"
+#include "kudu/master/master_cert_authority.h"
+#include "kudu/master/mini_master.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/security/ca/cert_management.h"
+#include "kudu/security/cert.h"
+#include "kudu/security/crypto.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/test_util.h"
+
+using std::string;
+using std::shared_ptr;
+
+namespace kudu {
+
+using security::ca::CertRequestGenerator;
+using security::Cert;
+using security::CertSignRequest;
+using security::DataFormat;
+using security::PrivateKey;
+
+namespace master {
+
+class MasterCertAuthorityTest : public KuduTest {
+ public:
+  MasterCertAuthorityTest() {
+    // Hard-coded ports for the masters. This is safe, as this unit test
+    // runs under a resource lock (see CMakeLists.txt in this directory).
+    // TODO(aserbin): we should have a generic method to obtain n free ports.
+    opts_.master_rpc_ports = { 11010, 11011, 11012 };
+
+    opts_.num_masters = num_masters_ = opts_.master_rpc_ports.size();
+  }
+
+  virtual void SetUp() OVERRIDE {
+    KuduTest::SetUp();
+    cluster_.reset(new MiniCluster(env_, opts_));
+    ASSERT_OK(cluster_->Start());
+  }
+
+  Status RestartCluster() {
+    cluster_->Shutdown();
+    return cluster_->Start();
+  }
+
+  // Check the leader is found on the cluster.
+  Status WaitForLeader() {
+    return cluster_->GetLeaderMasterIndex(nullptr);
+  }
+
+  void GetCurrentCertAuthInfo(string* ca_pkey_str, string* ca_cert_str) {
+    int leader_idx;
+    ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
+    MiniMaster* leader = cluster_->mini_master(leader_idx);
+    Master* master = leader->master();
+
+    auto ca_pkey = master->cert_authority()->ca_private_key_.get();
+    ASSERT_NE(nullptr, ca_pkey);
+    ASSERT_OK(ca_pkey->ToString(ca_pkey_str, DataFormat::PEM));
+
+    auto ca_cert = master->cert_authority()->ca_cert_.get();
+    ASSERT_NE(nullptr, ca_cert);
+    ASSERT_OK(ca_cert->ToString(ca_cert_str, DataFormat::PEM));
+  }
+
+  void SendRegistrationHBs(const shared_ptr<rpc::Messenger>& messenger) {
+    TSToMasterCommonPB common;
+    common.mutable_ts_instance()->set_permanent_uuid(kFakeTsUUID);
+    common.mutable_ts_instance()->set_instance_seqno(1);
+    ServerRegistrationPB fake_reg;
+    HostPortPB* pb = fake_reg.add_rpc_addresses();
+    pb->set_host("localhost");
+    pb->set_port(1000);
+    pb = fake_reg.add_http_addresses();
+    pb->set_host("localhost");
+    pb->set_port(2000);
+
+    for (int i = 0; i < cluster_->num_masters(); ++i) {
+      TSHeartbeatRequestPB req;
+      TSHeartbeatResponsePB resp;
+      rpc::RpcController rpc;
+
+      req.mutable_common()->CopyFrom(common);
+      req.mutable_registration()->CopyFrom(fake_reg);
+
+      MiniMaster* m = cluster_->mini_master(i);
+      if (!m->is_running()) {
+        continue;
+      }
+      MasterServiceProxy proxy(messenger, m->bound_rpc_addr());
+
+      // All masters (including followers) should accept the heartbeat.
+      ASSERT_OK(proxy.TSHeartbeat(req, &resp, &rpc));
+      SCOPED_TRACE(SecureDebugString(resp));
+      ASSERT_FALSE(resp.has_error());
+    }
+  }
+
+  void SendCertSignRequestHBs(const shared_ptr<rpc::Messenger>& messenger,
+                              const string& csr_str,
+                              bool* has_signed_certificate,
+                              string* signed_certificate) {
+    TSToMasterCommonPB common;
+    common.mutable_ts_instance()->set_permanent_uuid(kFakeTsUUID);
+    common.mutable_ts_instance()->set_instance_seqno(1);
+
+    string ts_cert_str;
+    bool has_leader_master_response = false;
+    for (int i = 0; i < cluster_->num_masters(); ++i) {
+      TSHeartbeatRequestPB req;
+      TSHeartbeatResponsePB resp;
+      rpc::RpcController rpc;
+
+      req.mutable_common()->CopyFrom(common);
+      req.set_csr_der(csr_str);
+
+      MiniMaster* m = cluster_->mini_master(i);
+      if (!m->is_running()) {
+        continue;
+      }
+      MasterServiceProxy proxy(messenger, m->bound_rpc_addr());
+
+      // All masters (including followers) should accept the heartbeat.
+      ASSERT_OK(proxy.TSHeartbeat(req, &resp, &rpc));
+      SCOPED_TRACE(SecureDebugString(resp));
+      ASSERT_FALSE(resp.has_error());
+
+      // Only the leader sends back the signed server certificate.
+      if (resp.leader_master()) {
+        has_leader_master_response = true;
+        ASSERT_TRUE(resp.has_signed_cert_der());
+        ts_cert_str = resp.signed_cert_der();
+      } else {
+        ASSERT_FALSE(resp.has_signed_cert_der());
+      }
+    }
+    if (has_leader_master_response) {
+      *has_signed_certificate = true;
+      *signed_certificate = ts_cert_str;
+    }
+  }
+
+ protected:
+  static const char kFakeTsUUID[];
+
+  int num_masters_;
+  MiniClusterOptions opts_;
+  gscoped_ptr<MiniCluster> cluster_;
+};
+const char MasterCertAuthorityTest::kFakeTsUUID[] = "fake-ts-uuid";
+
+// If no certificate authority information is present, a new one is generated
+// upon start of the cluster. So, check that the CA information is available
+// upon start of the cluster and it stays the same after the cluster restarts.
+TEST_F(MasterCertAuthorityTest, CertAuthorityPersistsUponRestart) {
+  string ref_ca_pkey_str;
+  string ref_ca_cert_str;
+  NO_FATALS(GetCurrentCertAuthInfo(&ref_ca_pkey_str, &ref_ca_cert_str));
+
+  ASSERT_OK(RestartCluster());
+
+  string ca_pkey_str;
+  string ca_cert_str;
+  NO_FATALS(GetCurrentCertAuthInfo(&ca_pkey_str, &ca_cert_str));
+
+  EXPECT_EQ(ref_ca_pkey_str, ca_pkey_str);
+  EXPECT_EQ(ref_ca_cert_str, ca_cert_str);
+}
+
+// Check that current master leader posesses the CA private key and certificate
+// and if the leader role switches to some other master server, the new leader
+// provides the same information as the former one.
+TEST_F(MasterCertAuthorityTest, CertAuthorityOnLeaderRoleSwitch) {
+  string ref_pkey_str;
+  string ref_cert_str;
+  NO_FATALS(GetCurrentCertAuthInfo(&ref_pkey_str, &ref_cert_str));
+
+  int leader_idx;
+  ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
+  MiniMaster* leader_master = cluster_->mini_master(leader_idx);
+  leader_master->Shutdown();
+
+  string new_leader_pkey_str;
+  string new_leader_cert_str;
+  NO_FATALS(GetCurrentCertAuthInfo(&new_leader_pkey_str, &new_leader_cert_str));
+
+  EXPECT_EQ(ref_pkey_str, new_leader_pkey_str);
+  EXPECT_EQ(ref_cert_str, new_leader_cert_str);
+}
+
+// Test that every master accepts heartbeats, but only the leader master
+// responds with with signed certificate if a heartbeat contains the CSR field.
+TEST_F(MasterCertAuthorityTest, MasterLeaderSignsCSR) {
+  shared_ptr<rpc::Messenger> messenger;
+  rpc::MessengerBuilder bld("Client");
+  ASSERT_OK(bld.Build(&messenger));
+  NO_FATALS(SendRegistrationHBs(messenger));
+
+  string csr_str;
+  {
+    CertRequestGenerator::Config gen_config;
+    gen_config.uuid = kFakeTsUUID;
+    gen_config.hostnames =  {"localhost"};
+    PrivateKey key;
+    ASSERT_OK(security::GeneratePrivateKey(512, &key));
+    CertRequestGenerator gen(gen_config);
+    ASSERT_OK(gen.Init());
+    CertSignRequest csr;
+    ASSERT_OK(gen.GenerateRequest(key, &csr));
+    ASSERT_OK(csr.ToString(&csr_str, DataFormat::DER));
+  }
+
+  // Make sure a tablet server receives signed certificate from
+  // the leader master.
+  ASSERT_OK(WaitForLeader());
+  {
+    string ts_cert_str;
+    bool has_ts_cert = false;
+    NO_FATALS(SendCertSignRequestHBs(messenger, csr_str,
+                                     &has_ts_cert, &ts_cert_str));
+    ASSERT_TRUE(has_ts_cert);
+
+    // Try to load the certificate to check that the data is not corrupted.
+    Cert ts_cert;
+    ASSERT_OK(ts_cert.FromString(ts_cert_str, DataFormat::DER));
+  }
+
+  // Shutdown the leader master and check the new leader signs
+  // certificate request sent in a tablet server hearbeat.
+  int leader_idx;
+  ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
+  MiniMaster* leader_master = cluster_->mini_master(leader_idx);
+  leader_master->Shutdown();
+
+  // Re-register with the new leader.
+  ASSERT_OK(WaitForLeader());
+  NO_FATALS(SendRegistrationHBs(messenger));
+  {
+    string ts_cert_str;
+    bool has_ts_cert = false;
+    NO_FATALS(SendCertSignRequestHBs(messenger, csr_str,
+                                     &has_ts_cert, &ts_cert_str));
+    ASSERT_TRUE(has_ts_cert);
+    // Try to load the certificate to check that the data is not corrupted.
+    Cert ts_cert;
+    ASSERT_OK(ts_cert.FromString(ts_cert_str, DataFormat::DER));
+  }
+}
+
+} // namespace master
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/8329bdc8/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index e29aca1..02acb38 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -75,11 +75,14 @@
 #include "kudu/gutil/walltime.h"
 #include "kudu/master/master.h"
 #include "kudu/master/master.pb.h"
+#include "kudu/master/master_cert_authority.h"
 #include "kudu/master/sys_catalog.h"
 #include "kudu/master/ts_descriptor.h"
 #include "kudu/master/ts_manager.h"
 #include "kudu/rpc/messenger.h"
 #include "kudu/rpc/rpc_context.h"
+#include "kudu/security/cert.h"
+#include "kudu/security/crypto.h"
 #include "kudu/tserver/tserver_admin.proxy.h"
 #include "kudu/util/debug/trace_event.h"
 #include "kudu/util/flag_tags.h"
@@ -90,8 +93,8 @@
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/thread.h"
-#include "kudu/util/threadpool.h"
 #include "kudu/util/thread_restrictions.h"
+#include "kudu/util/threadpool.h"
 #include "kudu/util/trace.h"
 
 DEFINE_int32(master_ts_rpc_timeout_ms, 30 * 1000, // 30 sec
@@ -672,6 +675,51 @@ Status CatalogManager::WaitUntilCaughtUpAsLeader(const MonoDelta& timeout) {
   return Status::OK();
 }
 
+// Check if CA private key and the certificate were loaded into the
+// memory. If not, generate new ones and store them into the system table.
+// Use the CA information to initialize the certificate authority.
+Status CatalogManager::CheckInitCertAuthority() {
+  using security::Cert;
+  using security::DataFormat;
+  using security::PrivateKey;
+
+  leader_lock_.AssertAcquiredForWriting();
+
+  MasterCertAuthority* ca = master_->cert_authority();
+  unique_ptr<PrivateKey> ca_private_key(new PrivateKey);
+  unique_ptr<Cert> ca_cert(new Cert);
+
+  SysCertAuthorityEntryPB info;
+  const Status s = sys_catalog_->GetCertAuthorityEntry(&info);
+  if (!(s.ok() || s.IsNotFound())) {
+    return s;
+  }
+  if (PREDICT_TRUE(s.ok())) {
+    // The loaded data is necessary to initialize master's cert authority.
+    RETURN_NOT_OK(ca_private_key->FromString(
+        info.private_key(), DataFormat::DER));
+    RETURN_NOT_OK(ca_cert->FromString(
+        info.certificate(), DataFormat::DER));
+  } else {
+    // Generate new private key and corresponding CA certificate.
+    RETURN_NOT_OK(ca->Generate(ca_private_key.get(), ca_cert.get()));
+    RETURN_NOT_OK(ca_private_key->ToString(
+        info.mutable_private_key(), DataFormat::DER));
+    RETURN_NOT_OK(ca_cert->ToString(
+        info.mutable_certificate(), DataFormat::DER));
+    // Store the newly generated private key and certificate
+    // in the system table.
+    RETURN_NOT_OK(sys_catalog_->AddCertAuthorityEntry(info));
+    LOG(INFO) << "Wrote cert authority information into the system table.";
+  }
+
+  // Initialize/re-initialize the master's certificate authority component
+  // with the new private key and certificate.
+  RETURN_NOT_OK(ca->Init(std::move(ca_private_key), std::move(ca_cert)));
+
+  return Status::OK();
+}
+
 void CatalogManager::VisitTablesAndTabletsTask() {
   {
     // Hack to block this function until InitSysCatalogAsync() is finished.
@@ -712,17 +760,31 @@ void CatalogManager::VisitTablesAndTabletsTask() {
     return;
   }
 
-  LOG(INFO) << "Loading table and tablet metadata into memory...";
-  LOG_SLOW_EXECUTION(WARNING, 1000, LogPrefix() + "Loading metadata into memory") {
-    CHECK_OK(VisitTablesAndTablets());
+  {
+    // Block new catalog operations, and wait for existing operations to finish.
+    std::lock_guard<RWMutex> leader_lock_guard(leader_lock_);
+
+    LOG(INFO) << "Loading table and tablet metadata into memory...";
+    LOG_SLOW_EXECUTION(WARNING, 1000, LogPrefix() +
+                       "Loading metadata into memory") {
+      CHECK_OK(VisitTablesAndTabletsUnlocked());
+    }
+
+    // TODO(PKI): this should not be done in case of external PKI.
+    // TODO(PKI): should be there a flag to reset already existing CA info?
+    LOG(INFO) << "Loading CA info into memory...";
+    LOG_SLOW_EXECUTION(WARNING, 1000, LogPrefix() +
+                       "Loading CA info into memory") {
+      CHECK_OK(CheckInitCertAuthority());
+    }
   }
+
   std::lock_guard<simple_spinlock> l(state_lock_);
   leader_ready_term_ = term;
 }
 
-Status CatalogManager::VisitTablesAndTablets() {
-  // Block new catalog operations, and wait for existing operations to finish.
-  std::lock_guard<RWMutex> leader_lock_guard(leader_lock_);
+Status CatalogManager::VisitTablesAndTabletsUnlocked() {
+  leader_lock_.AssertAcquiredForWriting();
 
   // This lock is held for the entirety of the function because the calls to
   // VisitTables and VisitTablets mutate global maps.
@@ -750,6 +812,13 @@ Status CatalogManager::VisitTablesAndTablets() {
   return Status::OK();
 }
 
+// This method is called by tests only.
+Status CatalogManager::VisitTablesAndTablets() {
+  // Block new catalog operations, and wait for existing operations to finish.
+  std::lock_guard<RWMutex> leader_lock_guard(leader_lock_);
+  return VisitTablesAndTabletsUnlocked();
+}
+
 Status CatalogManager::InitSysCatalogAsync(bool is_first_run) {
   std::lock_guard<LockType> l(lock_);
   unique_ptr<SysCatalogTable> new_catalog(
@@ -3613,6 +3682,7 @@ void CatalogManager::DumpState(std::ostream* out) const {
     ids_copy = table_ids_map_;
     names_copy = table_names_map_;
     tablets_copy = tablet_map_;
+    // TODO(aserbin): add information about root CA certs, if any
   }
 
   *out << "Tables:\n";

http://git-wip-us.apache.org/repos/asf/kudu/blob/8329bdc8/src/kudu/master/catalog_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index 3820e86..336cca3 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -17,14 +17,16 @@
 #ifndef KUDU_MASTER_CATALOG_MANAGER_H
 #define KUDU_MASTER_CATALOG_MANAGER_H
 
-#include <boost/optional/optional_fwd.hpp>
 #include <map>
+#include <memory>
 #include <set>
 #include <string>
 #include <unordered_map>
 #include <unordered_set>
 #include <vector>
 
+#include <boost/optional/optional_fwd.hpp>
+
 #include "kudu/common/partition.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
@@ -546,6 +548,8 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
   // Clears out the existing metadata ('table_names_map_', 'table_ids_map_',
   // and 'tablet_map_'), loads tables metadata into memory and if successful
   // loads the tablets metadata.
+  Status VisitTablesAndTabletsUnlocked();
+  // This is called by tests only.
   Status VisitTablesAndTablets();
 
   // Helper for initializing 'sys_catalog_'. After calling this
@@ -556,6 +560,11 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
   // This method is thread-safe.
   Status InitSysCatalogAsync(bool is_first_run);
 
+  // Load existing root CA information from the system table, if present.
+  // If not, generate new ones, store them into the system table.
+  // After that, use the CA information to initialize the certificate authority.
+  Status CheckInitCertAuthority();
+
   // Helper for creating the initial TableInfo state
   // Leaves the table "write locked" with the new info in the
   // "dirty" state field.

http://git-wip-us.apache.org/repos/asf/kudu/blob/8329bdc8/src/kudu/master/master.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master.cc b/src/kudu/master/master.cc
index 93873fc..5fb45c2 100644
--- a/src/kudu/master/master.cc
+++ b/src/kudu/master/master.cc
@@ -95,16 +95,13 @@ Status Master::Init() {
 
   RETURN_NOT_OK(ServerBase::Init());
 
-  // TODO(PKI): the CA should not be initialized if built-in PKI is disabled
-  // by some flag.
-  cert_authority_.reset(new MasterCertAuthority(fs_manager_->uuid()));
-  // TODO(PKI): master should sign its own cert, but it has to be done
-  // after the catalog manager is loaded. Not clear the right place to
-  // put it, so punting on it at the moment.
-  RETURN_NOT_OK(cert_authority_->Init());
-
   RETURN_NOT_OK(path_handlers_->Register(web_server_.get()));
 
+  // The certificate authority object is initialized upon loading
+  // CA private key and certificate from the system table when the server
+  // becomes a leader.
+  cert_authority_.reset(new MasterCertAuthority(fs_manager_->uuid()));
+
   state_ = kInitialized;
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/8329bdc8/src/kudu/master/master.proto
----------------------------------------------------------------------
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index bc23b91..3ab3146 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -172,6 +172,19 @@ message SysTablesEntryPB {
   optional bytes state_msg = 7;
 }
 
+// The on-disk entry in the sys.catalog table ("metadata" column) to represent
+// certificate authority (CA) information. Not more than one entry of this type
+// should exist in the sys.catalog table at any time.
+message SysCertAuthorityEntryPB {
+  // Private key body in DER format. This information is sensitive,
+  // that's why the REDACT attribute is present.
+  required bytes private_key = 1 [(kudu.REDACT) = true];
+  // Certificate body in DER format. This information is not sensitive,
+  // but we are redacting most of security-related info anyway
+  // (handshake/SASL tokens, etc.).
+  required bytes certificate = 2 [(kudu.REDACT) = true];
+}
+
 ////////////////////////////////////////////////////////////
 // RPCs
 ////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/kudu/blob/8329bdc8/src/kudu/master/master_cert_authority.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master_cert_authority.cc b/src/kudu/master/master_cert_authority.cc
index fd24f28..36fe1c7 100644
--- a/src/kudu/master/master_cert_authority.cc
+++ b/src/kudu/master/master_cert_authority.cc
@@ -17,15 +17,18 @@
 
 #include "kudu/master/master_cert_authority.h"
 
-#include <gflags/gflags.h>
 #include <memory>
 #include <string>
+#include <utility>
+
+#include <gflags/gflags.h>
 
 #include "kudu/security/ca/cert_management.h"
 #include "kudu/security/cert.h"
 #include "kudu/security/crypto.h"
 #include "kudu/security/openssl_util.h"
 #include "kudu/util/flag_tags.h"
+#include "kudu/util/status.h"
 
 using std::string;
 using std::unique_ptr;
@@ -71,18 +74,22 @@ MasterCertAuthority::MasterCertAuthority(string server_uuid)
 MasterCertAuthority::~MasterCertAuthority() {
 }
 
-Status MasterCertAuthority::Init() {
-  CHECK(!ca_private_key_);
-
+// Generate
+Status MasterCertAuthority::Generate(security::PrivateKey* key,
+                                     security::Cert* cert) const {
+  CHECK(key);
+  CHECK(cert);
   // Create a key and cert for the self-signed CA.
-  unique_ptr<PrivateKey> key(new PrivateKey());
-  unique_ptr<Cert> ca_cert(new Cert());
-  RETURN_NOT_OK(GeneratePrivateKey(FLAGS_master_ca_rsa_key_length_bits, key.get()));
-  RETURN_NOT_OK(CertSigner::SelfSignCA(*key, PrepareCaConfig(server_uuid_), ca_cert.get()));
+  RETURN_NOT_OK(GeneratePrivateKey(FLAGS_master_ca_rsa_key_length_bits, key));
+  return CertSigner::SelfSignCA(*key, PrepareCaConfig(server_uuid_), cert);
+}
 
-  // Initialize our signer with the new CA.
-  ca_cert_ = std::move(ca_cert);
+Status MasterCertAuthority::Init(unique_ptr<PrivateKey> key,
+                                 unique_ptr<Cert> cert) {
+  CHECK(key);
+  CHECK(cert);
   ca_private_key_ = std::move(key);
+  ca_cert_ = std::move(cert);
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/8329bdc8/src/kudu/master/master_cert_authority.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/master_cert_authority.h b/src/kudu/master/master_cert_authority.h
index 3ffed73..ca70163 100644
--- a/src/kudu/master/master_cert_authority.h
+++ b/src/kudu/master/master_cert_authority.h
@@ -16,14 +16,15 @@
 // under the License.
 #pragma once
 
-#include "kudu/gutil/macros.h"
-#include "kudu/util/status.h"
-
 #include <memory>
 #include <string>
 
+#include "kudu/gutil/macros.h"
+
 namespace kudu {
 
+class Status;
+
 namespace security {
 
 class Cert;
@@ -36,6 +37,8 @@ class CertSigner;
 
 namespace master {
 
+class MasterCertAuthorityTest;
+
 // Implements the X509 certificate-authority functionality of the Master.
 //
 // This is used in the "built-in PKI" mode of operation. The master generates
@@ -48,11 +51,19 @@ class MasterCertAuthority {
   explicit MasterCertAuthority(std::string server_uuid);
   virtual ~MasterCertAuthority();
 
-  // Initializes the MasterCertAuthority by generating a new private key
-  // and self-signed CA certificate.
-  //
-  // Must be called exactly once before any other functions.
-  Status Init();
+  // Generate a private key and corresponding self-signed root CA certificate
+  // bound to the aggregated server UUID. Does not require Init() to be called.
+  // Calling this method does not have side-effects on the instance, i.e.
+  // even this method has been called on object, it's still necessary
+  // to call Init() prior to calling SignServerCSR() method.
+  Status Generate(security::PrivateKey* key, security::Cert* cert) const;
+
+  // Initializes the MasterCertAuthority with the given private key
+  // and CA certificate. This method is called when the master server
+  // is elected as a leader -- upon that event it initializes the certificate
+  // authority with the information read from the system table.
+  Status Init(std::unique_ptr<security::PrivateKey> key,
+              std::unique_ptr<security::Cert> cert);
 
   // Sign the given CSR 'csr_der' provided by a server in the cluster.
   //
@@ -60,9 +71,14 @@ class MasterCertAuthority {
   // The resulting certificate, also in DER format, is returned in 'cert_der'.
   //
   // REQUIRES: Init() must be called first.
+  //
+  // NOTE:  This method is not going to be called in parallel with Init()
+  //        due to the current design, so there is no internal synchronization
+  //        to keep the internal state consistent.
   Status SignServerCSR(const std::string& csr_der, std::string* cert_der);
 
  private:
+  friend class ::kudu::master::MasterCertAuthorityTest;
   // The UUID of the master. This is used as a field in the certificate.
   const std::string server_uuid_;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/8329bdc8/src/kudu/master/master_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master_service.cc b/src/kudu/master/master_service.cc
index 13c319c..c153d76 100644
--- a/src/kudu/master/master_service.cc
+++ b/src/kudu/master/master_service.cc
@@ -147,9 +147,8 @@ void MasterServiceImpl::TSHeartbeat(const TSHeartbeatRequestPB* req,
     }
   }
 
-  // 6. If the heartbeat has a CSR, sign their cert.
-  // TODO(PKI): should this be done only by leaders or all masters?
-  if (req->has_csr_der()) {
+  // 6. Only leaders sign CSR from tablet servers (if present).
+  if (is_leader_master && req->has_csr_der()) {
     string cert;
     Status s = server_->cert_authority()->SignServerCSR(req->csr_der(), &cert);
     if (!s.ok()) {
@@ -160,7 +159,7 @@ void MasterServiceImpl::TSHeartbeat(const TSHeartbeatRequestPB* req,
     resp->mutable_signed_cert_der()->swap(cert);
   }
 
-  // 7. Send any active CA certs which the TS doesn't have.
+  // TODO(aserbin): 7. Send any active CA certs which the TS doesn't have.
 
   rpc->RespondSuccess();
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/8329bdc8/src/kudu/master/sys_catalog-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog-test.cc b/src/kudu/master/sys_catalog-test.cc
index abaef6a..7d8f361 100644
--- a/src/kudu/master/sys_catalog-test.cc
+++ b/src/kudu/master/sys_catalog-test.cc
@@ -15,12 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <gtest/gtest.h>
 
 #include <algorithm>
 #include <memory>
+#include <string>
 #include <vector>
 
+#include <gtest/gtest.h>
+
 #include "kudu/common/wire_protocol.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/master/catalog_manager.h"
@@ -28,8 +30,11 @@
 #include "kudu/master/master.proxy.h"
 #include "kudu/master/mini_master.h"
 #include "kudu/master/sys_catalog.h"
-#include "kudu/server/rpc_server.h"
 #include "kudu/rpc/messenger.h"
+#include "kudu/security/cert.h"
+#include "kudu/security/crypto.h"
+#include "kudu/security/openssl_util.h"
+#include "kudu/server/rpc_server.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/status.h"
@@ -39,7 +44,9 @@ using std::string;
 using std::shared_ptr;
 using kudu::rpc::Messenger;
 using kudu::rpc::MessengerBuilder;
-using kudu::rpc::RpcController;
+using kudu::security::Cert;
+using kudu::security::DataFormat;
+using kudu::security::PrivateKey;
 
 namespace kudu {
 namespace master {
@@ -382,5 +389,43 @@ TEST_F(SysCatalogTest, TestTabletInfoCommit) {
   }
 }
 
+// Check loading the auto-generated certificate authority information
+// upon startup.
+TEST_F(SysCatalogTest, LoadCertAuthorityInfo) {
+  // The system catalog should already contain newly generated CA private key
+  // and certificate: the SetUp() method awaits for the catalog manager
+  // becoming leader master, and by that time the certificate authority
+  // information should be loaded.
+  SysCertAuthorityEntryPB ca_entry;
+  ASSERT_OK(master_->catalog_manager()->sys_catalog()->
+            GetCertAuthorityEntry(&ca_entry));
+
+  // The CA private key data should be valid (DER format).
+  PrivateKey pkey;
+  EXPECT_OK(pkey.FromString(ca_entry.private_key(), DataFormat::DER));
+
+  // The data should be valid CA certificate in (DER format).
+  Cert cert;
+  EXPECT_OK(cert.FromString(ca_entry.certificate(), DataFormat::DER));
+}
+
+// Check that if the certificate authority information is already present,
+// it cannot be overwritten using SysCatalogTable::AddCertAuthorityInfo().
+// Basically, this is to verify that SysCatalogTable::AddCertAuthorityInfo()
+// can be called just once to store CA information on first cluster startup.
+TEST_F(SysCatalogTest, AttemptOverwriteCertAuthorityInfo) {
+  // The system catalog should already contain newly generated CA private key
+  // and certificate: the SetUp() method awaits for the catalog manager
+  // becoming leader master, and by that time the certificate authority
+  // information should be loaded.
+  SysCertAuthorityEntryPB ca_entry;
+  ASSERT_OK(master_->catalog_manager()->sys_catalog()->
+            GetCertAuthorityEntry(&ca_entry));
+  const Status s = master_->catalog_manager()->sys_catalog()->
+            AddCertAuthorityEntry(ca_entry);
+  ASSERT_TRUE(s.IsCorruption()) << s.ToString();
+  ASSERT_EQ("Corruption: One or more rows failed to write", s.ToString());
+}
+
 } // namespace master
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/8329bdc8/src/kudu/master/sys_catalog.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index 59c4bc3..5d8181a 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -88,6 +88,8 @@ static const char* const kSysCatalogTableColMetadata = "metadata";
 
 const char* const SysCatalogTable::kSysCatalogTabletId =
     "00000000000000000000000000000000";
+const char* const SysCatalogTable::kSysCertAuthorityEntryId =
+    "root-ca-info";
 const char* const SysCatalogTable::kInjectedFailureStatusMsg =
     "INJECTED FAILURE";
 
@@ -487,11 +489,11 @@ Status SysCatalogTable::GetEntryFromRow(
       row, schema_.find_column(kSysCatalogTableColId));
   const Slice* data = schema_.ExtractColumnFromRow<STRING>(
       row, schema_.find_column(kSysCatalogTableColMetadata));
-  const string& str_id = id->ToString();
+  string str_id = id->ToString();
   RETURN_NOT_OK_PREPEND(
       pb_util::ParseFromArray(entry_data, data->data(), data->size()),
       "unable to parse metadata field for row " + str_id);
-  *entry_id = str_id;
+  *entry_id = std::move(str_id);
 
   return Status::OK();
 }
@@ -532,6 +534,49 @@ Status SysCatalogTable::ProcessRows(
   return Status::OK();
 }
 
+Status SysCatalogTable::GetCertAuthorityEntry(SysCertAuthorityEntryPB* entry) {
+  CHECK(entry);
+  vector<SysCertAuthorityEntryPB> entries;
+  auto processor = [&](
+      const string& entry_id,
+      const SysCertAuthorityEntryPB& entry_data) {
+    CHECK_EQ(entry_id, kSysCertAuthorityEntryId);
+    entries.push_back(entry_data);
+    return Status::OK();
+  };
+  RETURN_NOT_OK((
+      ProcessRows<SysCertAuthorityEntryPB, CERT_AUTHORITY_INFO>(processor)));
+  // There should be no more than one root CA entry in the system table.
+  CHECK_LE(entries.size(), 1);
+  if (entries.empty()) {
+    return Status::NotFound("root CA entry not found");
+  }
+  entries.front().Swap(entry);
+
+  return Status::OK();
+}
+
+Status SysCatalogTable::AddCertAuthorityEntry(
+    const SysCertAuthorityEntryPB& entry) {
+  WriteRequestPB req;
+  WriteResponsePB resp;
+  req.set_tablet_id(kSysCatalogTabletId);
+  RETURN_NOT_OK(SchemaToPB(schema_, req.mutable_schema()));
+
+  faststring metadata_buf;
+  pb_util::SerializeToString(entry, &metadata_buf);
+
+  KuduPartialRow row(&schema_);
+  CHECK_OK(row.SetInt8(kSysCatalogTableColType, CERT_AUTHORITY_INFO));
+  CHECK_OK(row.SetString(kSysCatalogTableColId, kSysCertAuthorityEntryId));
+  CHECK_OK(row.SetString(kSysCatalogTableColMetadata, metadata_buf));
+  RowOperationsPBEncoder enc(req.mutable_row_operations());
+  enc.Add(RowOperationsPB::INSERT, row);
+  RETURN_NOT_OK(SyncWrite(&req, &resp));
+
+  return Status::OK();
+}
+
 // ==================================================================
 // Tablet related methods
 // ==================================================================

http://git-wip-us.apache.org/repos/asf/kudu/blob/8329bdc8/src/kudu/master/sys_catalog.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.h b/src/kudu/master/sys_catalog.h
index 1916636..4620f9d 100644
--- a/src/kudu/master/sys_catalog.h
+++ b/src/kudu/master/sys_catalog.h
@@ -58,21 +58,32 @@ class TabletVisitor {
                              const SysTabletsEntryPB& metadata) = 0;
 };
 
-// SysCatalogTable is a Kudu table that keeps track of table and
-// tablet metadata.
-// - SysCatalogTable has only one tablet.
-// - SysCatalogTable is managed by the master and not exposed to the user
-//   as a "normal table", instead we have Master APIs to query the table.
+// SysCatalogTable is a Kudu table that keeps track of the following
+// system information:
+//   * table metadata
+//   * tablet metadata
+//   * root CA (certificate authority) certificates
+//   * corresponding CA private keys
+//
+// The essential properties of the SysCatalogTable are:
+//   * SysCatalogTable has only one tablet.
+//   * SysCatalogTable is managed by the master and not exposed to the user
+//     as a "normal table", instead we have Master APIs to query the table.
 class SysCatalogTable {
  public:
   // Magic ID of the system tablet.
   static const char* const kSysCatalogTabletId;
 
+  // Root certificate authority (CA) entry identifier in the system table.
+  // There should be no more than one entry of this type in the system table.
+  static const char* const kSysCertAuthorityEntryId;
+
   typedef Callback<Status()> ElectedLeaderCallback;
 
   enum CatalogEntryType {
     TABLES_ENTRY = 1,
-    TABLETS_ENTRY = 2
+    TABLETS_ENTRY = 2,
+    CERT_AUTHORITY_INFO = 3,  // Kudu's root certificate authority entry.
   };
 
   // 'leader_cb_' is invoked whenever this node is elected as a leader
@@ -117,6 +128,13 @@ class SysCatalogTable {
   // Scan of the tablet-related entries.
   Status VisitTablets(TabletVisitor* visitor);
 
+  // Retrive the CA entry (private key and certificate) from the system table.
+  Status GetCertAuthorityEntry(SysCertAuthorityEntryPB* entry);
+
+  // Add root CA entry (private key and certificate) into the system table.
+  // There should be no more than one CA entry in the system table.
+  Status AddCertAuthorityEntry(const SysCertAuthorityEntryPB& entry);
+
  private:
   FRIEND_TEST(MasterTest, TestMasterMetadataConsistentDespiteFailures);
   DISALLOW_COPY_AND_ASSIGN(SysCatalogTable);