You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2017/05/11 00:44:21 UTC

[1/5] kudu git commit: [catalog_manager] categorization of rw operation failures

Repository: kudu
Updated Branches:
  refs/heads/master b927e80a8 -> e4e59bb5a


[catalog_manager] categorization of rw operation failures

This changelist introduces the categorization of the system catalog's
read and write operation failures which happen on leader post-election
callback. There are two categories of errors: fatal and non-fatal.

If an operation against the system catalog fails in between terms of
the catalog leadership, the error is considered non-fatal. In case of
a non-fatal error the leader post-election task bails out: the catalog
is no longer the leader at the original term and the task should be
executed by the new leader upon execution of the ElectedAsLeaderCb.

If an operation against the system catalog fails within the same term
of catalog leadership, the error is considered fatal and that causes the
master process to crash (with an exception of writing a newly generated
TSK when the TokenSigner still has a TSK to use). This is to avoid a
possible inconsistency when working with the tables/tablets metadata,
the IPKI certificate authority information and the Token Signing Keys.

Any failure of a read or write operation against the system catalog
happened during the catalog's shutdown is ignored and the leader
post-election task bails out once detecting such failure.

The same policy applies to other (i.e. not specific to read and write
operations against the system catalog) errors which might happen while
working with the IPKI certificate authority information and TokenSigner.
The rationale is the same as for handling the system catalog operation
failures: in case of an error, the leader has no consistent information
to work with, meanwhile a non-leader does not use the information
affected by the failure at all and can safely ignore the error.

Added a test to verify that the master server does not crash if change
of leadership detected while trying to persist a newly generated TSK
(Token Signing Key).

Change-Id: I826826049e3c08a6c8345949690cbbedaea32ff8
Reviewed-on: http://gerrit.cloudera.org:8080/6170
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <da...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/6b6593a0
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/6b6593a0
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/6b6593a0

Branch: refs/heads/master
Commit: 6b6593a0ce9dff6d4adf35084d3e4aba24816a26
Parents: b927e80
Author: Alexey Serbin <as...@cloudera.com>
Authored: Mon Feb 27 14:40:41 2017 -0800
Committer: David Ribeiro Alves <da...@gmail.com>
Committed: Wed May 10 22:46:27 2017 +0000

----------------------------------------------------------------------
 src/kudu/integration-tests/CMakeLists.txt       |   1 +
 .../catalog_manager_tsk-itest.cc                | 166 ++++++++
 src/kudu/master/catalog_manager.cc              | 395 +++++++++++++------
 src/kudu/master/catalog_manager.h               |  54 ++-
 src/kudu/master/master-test.cc                  |   2 +-
 src/kudu/master/master_service.cc               |  15 +-
 src/kudu/master/sys_catalog-test.cc             |   4 +-
 7 files changed, 497 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/6b6593a0/src/kudu/integration-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index e7430b2..5cb1500 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -53,6 +53,7 @@ set(KUDU_TEST_LINK_LIBS integration-tests ${KUDU_MIN_TEST_LIBS})
 ADD_KUDU_TEST(all_types-itest RESOURCE_LOCK "master-rpc-ports")
 ADD_KUDU_TEST(alter_table-randomized-test)
 ADD_KUDU_TEST(alter_table-test)
+ADD_KUDU_TEST(catalog_manager_tsk-itest)
 ADD_KUDU_TEST(client_failover-itest)
 ADD_KUDU_TEST(client-stress-test
   RESOURCE_LOCK "master-rpc-ports"

http://git-wip-us.apache.org/repos/asf/kudu/blob/6b6593a0/src/kudu/integration-tests/catalog_manager_tsk-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/catalog_manager_tsk-itest.cc b/src/kudu/integration-tests/catalog_manager_tsk-itest.cc
new file mode 100644
index 0000000..60bc55d
--- /dev/null
+++ b/src/kudu/integration-tests/catalog_manager_tsk-itest.cc
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <iterator>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "kudu/client/client.h"
+#include "kudu/client/client-test-util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/external_mini_cluster.h"
+#include "kudu/tablet/key_value_test_schema.h"
+#include "kudu/util/test_util.h"
+
+using kudu::client::KuduClient;
+using kudu::client::KuduClientBuilder;
+using kudu::client::KuduInsert;
+using kudu::client::KuduSchema;
+using kudu::client::KuduSession;
+using kudu::client::KuduTable;
+using kudu::client::KuduTableCreator;
+using std::back_inserter;
+using std::copy;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace master {
+
+class CatalogManagerTskITest : public KuduTest {
+ public:
+  CatalogManagerTskITest()
+      : num_masters_(3),
+        num_tservers_(1),
+#if defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER)
+        hb_interval_ms_(32),
+#else
+        hb_interval_ms_(16),
+#endif
+        run_time_seconds_(AllowSlowTests() ? 300 : 10) {
+
+    cluster_opts_.num_masters = num_masters_;
+    cluster_opts_.master_rpc_ports = { 11030, 11031, 11032 };
+    cluster_opts_.num_tablet_servers = num_tservers_;
+
+    // Add common flags for both masters and tservers.
+    const vector<string> common_flags = {
+      Substitute("--raft_heartbeat_interval_ms=$0", hb_interval_ms_),
+    };
+    copy(common_flags.begin(), common_flags.end(),
+        back_inserter(cluster_opts_.extra_master_flags));
+    copy(common_flags.begin(), common_flags.end(),
+        back_inserter(cluster_opts_.extra_tserver_flags));
+
+    // Add master-only flags.
+    const vector<string> master_flags = {
+      "--catalog_manager_inject_latency_prior_tsk_write_ms=1000",
+      "--raft_enable_pre_election=false",
+      Substitute("--leader_failure_exp_backoff_max_delta_ms=$0",
+          hb_interval_ms_ * 4),
+      "--leader_failure_max_missed_heartbeat_periods=1.0",
+      "--master_non_leader_masters_propagate_tsk",
+      "--tsk_rotation_seconds=2",
+      Substitute("--authn_token_validity_seconds=$0", run_time_seconds_),
+    };
+    copy(master_flags.begin(), master_flags.end(),
+        back_inserter(cluster_opts_.extra_master_flags));
+
+    // Add tserver-only flags.
+    const vector<string> tserver_flags = {
+      Substitute("--heartbeat_interval_ms=$0", hb_interval_ms_),
+    };
+    copy(tserver_flags.begin(), tserver_flags.end(),
+        back_inserter(cluster_opts_.extra_tserver_flags));
+  }
+
+  void StartCluster() {
+    cluster_.reset(new ExternalMiniCluster(cluster_opts_));
+    ASSERT_OK(cluster_->Start());
+  }
+
+  void SmokeTestCluster() {
+    using ::kudu::client::sp::shared_ptr;
+    static const char* kTableName = "test-table";
+    // Using the setting for both RPC and admin operation timeout.
+    const MonoDelta timeout = MonoDelta::FromSeconds(run_time_seconds_);
+    KuduClientBuilder builder;
+    builder.default_admin_operation_timeout(timeout).default_rpc_timeout(timeout);
+    shared_ptr<KuduClient> client;
+    ASSERT_OK(cluster_->CreateClient(&builder, &client));
+
+    // Create a table.
+    KuduSchema schema = client::KuduSchemaFromSchema(CreateKeyValueTestSchema());
+    gscoped_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
+
+    ASSERT_OK(table_creator->table_name(kTableName)
+              .set_range_partition_columns({ "key" })
+              .schema(&schema)
+              .num_replicas(num_tservers_)
+              .Create());
+
+    // Insert a row.
+    shared_ptr<KuduTable> table;
+    ASSERT_OK(client->OpenTable(kTableName, &table));
+    unique_ptr<KuduInsert> ins(table->NewInsert());
+    ASSERT_OK(ins->mutable_row()->SetInt32(0, 12345));
+    ASSERT_OK(ins->mutable_row()->SetInt32(1, 54321));
+    shared_ptr<KuduSession> session = client->NewSession();
+    ASSERT_OK(session->Apply(ins.release()));
+    FlushSessionOrDie(session);
+
+    // Read it back.
+    ASSERT_EQ(1, CountTableRows(table.get()));
+
+    // Delete the table.
+    ASSERT_OK(client->DeleteTable(kTableName));
+  }
+
+ protected:
+  const int num_masters_;
+  const int num_tservers_;
+  const int hb_interval_ms_;
+  const int64_t run_time_seconds_;
+  ExternalMiniClusterOptions cluster_opts_;
+  std::shared_ptr<ExternalMiniCluster> cluster_;
+};
+
+// Check that master servers do not crash on change of leadership while
+// writing newly generated TSKs. The leadership changes are provoked
+// by the injected latency just after generating a TSK but prior to writing it
+// into the system table: setting --leader_failure_max_missed_heartbeat_periods
+// flag to just one heartbeat period and unsetting --raft_enable_pre_election
+// gives high chances of re-election to happen while current leader has blocked
+// its leadership-related activity.
+TEST_F(CatalogManagerTskITest, LeadershipChangeOnTskGeneration) {
+  NO_FATALS(StartCluster());
+
+  const MonoTime t_stop = MonoTime::Now() +
+      MonoDelta::FromSeconds(run_time_seconds_);
+  while (MonoTime::Now() < t_stop) {
+    NO_FATALS(SmokeTestCluster());
+  }
+
+  NO_FATALS(cluster_->AssertNoCrashes());
+}
+
+} // namespace master
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/6b6593a0/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 56904ff..a6763a7 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -43,6 +43,7 @@
 
 #include <algorithm>
 #include <condition_variable>
+#include <functional>
 #include <memory>
 #include <mutex>
 #include <set>
@@ -89,6 +90,7 @@
 #include "kudu/security/token_signing_key.h"
 #include "kudu/tserver/tserver_admin.proxy.h"
 #include "kudu/util/debug/trace_event.h"
+#include "kudu/util/fault_injection.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/monotime.h"
@@ -200,6 +202,13 @@ DEFINE_bool(catalog_manager_delete_orphaned_tablets, false,
             "master failures!");
 TAG_FLAG(catalog_manager_delete_orphaned_tablets, advanced);
 
+DEFINE_int32(catalog_manager_inject_latency_prior_tsk_write_ms, 0,
+             "Injects a random sleep between 0 and this many milliseconds "
+             "prior to writing newly generated TSK into the system table. "
+             "This is a test-only flag, do not use in production.");
+TAG_FLAG(catalog_manager_inject_latency_prior_tsk_write_ms, hidden);
+TAG_FLAG(catalog_manager_inject_latency_prior_tsk_write_ms, unsafe);
+
 using std::pair;
 using std::set;
 using std::shared_ptr;
@@ -452,14 +461,6 @@ void CatalogManagerBgTasks::Run() {
                        << l.catalog_status().ToString();
         }
       } else if (l.leader_status().ok()) {
-        // If this is the leader master, check if it's time to generate
-        // and store a new TSK (Token Signing Key).
-        Status s = catalog_manager_->CheckGenerateNewTskUnlocked();
-        if (!s.ok()) {
-          LOG(ERROR) << "Error processing TSK entry (will try next time): "
-                     << s.ToString();
-        }
-
         // Get list of tablets not yet running.
         std::vector<scoped_refptr<TabletInfo>> to_process;
         catalog_manager_->ExtractTabletsToProcess(&to_process);
@@ -474,8 +475,32 @@ void CatalogManagerBgTasks::Run() {
             //
             // TODO(unknown): Add tests for this in the revision that makes
             // create/alter fault tolerant.
-            LOG(ERROR) << "Error processing pending assignments, "
-                "aborting the current task: " << s.ToString();
+            LOG(ERROR) << "Error processing pending assignments: " << s.ToString();
+          }
+        }
+
+        // If this is the leader master, check if it's time to generate
+        // and store a new TSK (Token Signing Key).
+        Status s = catalog_manager_->TryGenerateNewTskUnlocked();
+        if (!s.ok()) {
+          const TokenSigner* signer = catalog_manager_->master_->token_signer();
+          const string err_msg = "failed to refresh TSK: " + s.ToString() + ": ";
+          if (l.has_term_changed()) {
+            LOG(INFO) << err_msg
+                      << "ignoring the error since not the leader anymore";
+          } else if (signer->IsCurrentKeyValid()) {
+            LOG(WARNING) << err_msg << "will try again next cycle";
+          } else {
+            // The TokenSigner ended up with no valid key to use. If the catalog
+            // manager is still the leader, it would not be able to create valid
+            // authn token signatures. It's not clear how to properly resolve
+            // this situation and keep the process running. To avoid possible
+            // inconsistency, let's crash the process.
+            //
+            // NOTE: This can only happen in a multi-master Kudu cluster. In
+            //       that case, after this particular master crashes, another
+            //       master will take over as leader.
+            LOG(FATAL) << err_msg;
           }
         }
       }
@@ -715,7 +740,7 @@ Status CatalogManager::Init(bool is_first_run) {
 
 Status CatalogManager::ElectedAsLeaderCb() {
   return leader_election_pool_->SubmitClosure(
-      Bind(&CatalogManager::VisitTablesAndTabletsTask, Unretained(this)));
+      Bind(&CatalogManager::PrepareForLeadershipTask, Unretained(this)));
 }
 
 Status CatalogManager::WaitUntilCaughtUpAsLeader(const MonoDelta& timeout) {
@@ -733,32 +758,104 @@ Status CatalogManager::WaitUntilCaughtUpAsLeader(const MonoDelta& timeout) {
   return Status::OK();
 }
 
-Status CatalogManager::LoadCertAuthorityInfo(unique_ptr<PrivateKey>* key,
-                                             unique_ptr<Cert>* cert) {
+Status CatalogManager::InitCertAuthority() {
   leader_lock_.AssertAcquiredForWriting();
 
-  SysCertAuthorityEntryPB info;
-  RETURN_NOT_OK(sys_catalog_->GetCertAuthorityEntry(&info));
-
-  unique_ptr<PrivateKey> ca_private_key(new PrivateKey);
-  unique_ptr<Cert> ca_cert(new Cert);
-  RETURN_NOT_OK(ca_private_key->FromString(
-      info.private_key(), DataFormat::DER));
-  RETURN_NOT_OK(ca_cert->FromString(
-      info.certificate(), DataFormat::DER));
-  // Extra sanity check.
-  RETURN_NOT_OK(ca_cert->CheckKeyMatch(*ca_private_key));
-
-  key->swap(ca_private_key);
-  cert->swap(ca_cert);
+  unique_ptr<PrivateKey> key;
+  unique_ptr<Cert> cert;
+  const Status s = LoadCertAuthorityInfo(&key, &cert);
+  if (s.ok()) {
+    return InitCertAuthorityWith(std::move(key), std::move(cert));
+  }
+  if (s.IsNotFound()) {
+    // Status::NotFound is returned if no IPKI certificate authority record is
+    // found in the system catalog table. It can happen on the very first run
+    // of a secured Kudu cluster. If so, it's necessary to create and persist
+    // a new CA record which, if persisted, will be used for this and next runs.
+    //
+    // The subtlety here is that first it's necessary to store the newly
+    // generated IPKI CA information (the private key and the certificate) into
+    // the system table and only after that initialize the master certificate
+    // authority. This protects against a leadership change between the
+    // generation and the usage of the newly generated IPKI CA information
+    // by the master.
+    //
+    // An example of such 'leadership change in the middle' scenario:
+    //
+    // 1. The catalog manager starts generating Kudu  IPKI CA private key and
+    //    corresponding certificate. This takes some time since generating
+    //    a cryptographically strong private key requires many CPU cycles.
+    //
+    // 2. While the catalog manager is busy with generating the CA info, a new
+    //    election happens in the background and the catalog manager loses its
+    //    leadership role.
+    //
+    // 3. The catalog manager tries to write the newly generated information
+    //    into the system table. There are two possible cases at the time when
+    //    applying the write operation:
+    //
+    //      a. The catalog manager is not the system tablet's leader.
+    //
+    //      b. The catalog manager is the system tablet's leader.
+    //         It regained its leadership role by the time the write operation
+    //         is applied. That can happen if another election occurs before
+    //         the write operation is applied.
+    //
+    // 4. Essentially, the following responses are possible for the write
+    //    operation, enumerated in accordance with 3.{a,b} items above:
+    //
+    //      a. A failure happens and corresponding error message is logged;
+    //         the failure is ignored.
+    //
+    //      b. In the case when the catalog manager becomes the leader again,
+    //         there are two possible outcomes for the write operation:
+    //
+    //           i.  Success. The master completes the initialization process
+    //               and proceeds to serve client requests.
+    //
+    //           ii. Failure. This is when the former in-the-middle leader has
+    //               succeeded in writing its CA info into the system table.
+    //               That could happen if the former in-the-middle leader was
+    //               very fast because there were plenty of CPU resources
+    //               available for CA info generation. Since the CA info record
+    //               has pre-defined identifier, it's impossible to have more
+    //               than one CA info record in the system table. This is due to
+    //               the {record_id, record_type} uniqueness constraint.
+    //
+    // In case of the write operation's success (4.b.i), it's safe to proceed
+    // with loading the persisted CA information into the CertAuthority run-time
+    // object.
+    //
+    // In case of the write operation's failure (4.a, 4.b.ii), the generated
+    // CA information is no longer relevant and can be safely discarded. The
+    // crucial point is to not initialize the CertAuthority with non-persisted
+    // information. Otherwise that information could get into the run-time
+    // structures of some system components, cutting them off from communicating
+    // with the rest of the system which uses the genuine CA information.
+    //
+    // Once the CA information is persisted in the system table, a catalog
+    // manager reads and loads it into the CertAuthority every time it becomes
+    // an elected leader.
+    unique_ptr<PrivateKey> key(new PrivateKey);
+    unique_ptr<Cert> cert(new Cert);
+
+    // Generate new private key and corresponding CA certificate.
+    RETURN_NOT_OK(master_->cert_authority()->Generate(key.get(), cert.get()));
+    // If the leadership was lost, writing into the system table fails.
+    RETURN_NOT_OK(StoreCertAuthorityInfo(*key, *cert));
+    // Once the CA information is persisted, it's necessary to initialize
+    // the certificate authority sub-component with it. The leader master
+    // should not run without a CA certificate.
+    return InitCertAuthorityWith(std::move(key), std::move(cert));
+  }
 
-  return Status::OK();
+  return s;
 }
 
 // Initialize the master's certificate authority component with the specified
 // private key and certificate.
-Status CatalogManager::InitCertAuthority(unique_ptr<PrivateKey> key,
-                                         unique_ptr<Cert> cert) {
+Status CatalogManager::InitCertAuthorityWith(
+    unique_ptr<PrivateKey> key, unique_ptr<Cert> cert) {
   leader_lock_.AssertAcquiredForWriting();
   auto* ca = master_->cert_authority();
   RETURN_NOT_OK_PREPEND(ca->Init(std::move(key), std::move(cert)),
@@ -780,6 +877,28 @@ Status CatalogManager::InitCertAuthority(unique_ptr<PrivateKey> key,
   return Status::OK();
 }
 
+Status CatalogManager::LoadCertAuthorityInfo(unique_ptr<PrivateKey>* key,
+                                             unique_ptr<Cert>* cert) {
+  leader_lock_.AssertAcquiredForWriting();
+
+  SysCertAuthorityEntryPB info;
+  RETURN_NOT_OK(sys_catalog_->GetCertAuthorityEntry(&info));
+
+  unique_ptr<PrivateKey> ca_private_key(new PrivateKey);
+  unique_ptr<Cert> ca_cert(new Cert);
+  RETURN_NOT_OK(ca_private_key->FromString(
+      info.private_key(), DataFormat::DER));
+  RETURN_NOT_OK(ca_cert->FromString(
+      info.certificate(), DataFormat::DER));
+  // Extra sanity check.
+  RETURN_NOT_OK(ca_cert->CheckKeyMatch(*ca_private_key));
+
+  key->swap(ca_private_key);
+  cert->swap(ca_cert);
+
+  return Status::OK();
+}
+
 // Store internal Kudu CA cert authority information into the system table.
 Status CatalogManager::StoreCertAuthorityInfo(const PrivateKey& key,
                                               const Cert& cert) {
@@ -789,28 +908,37 @@ Status CatalogManager::StoreCertAuthorityInfo(const PrivateKey& key,
   RETURN_NOT_OK(key.ToString(info.mutable_private_key(), DataFormat::DER));
   RETURN_NOT_OK(cert.ToString(info.mutable_certificate(), DataFormat::DER));
   RETURN_NOT_OK(sys_catalog_->AddCertAuthorityEntry(info));
-  LOG(INFO) << "Successfully stored the newly generated cert authority "
-               "information into the system table.";
+  LOG(INFO) << "Generated new certificate authority record";
 
   return Status::OK();
 }
 
-void CatalogManager::VisitTablesAndTabletsTask() {
+Status CatalogManager::InitTokenSigner() {
+  leader_lock_.AssertAcquiredForWriting();
+
+  set<string> expired_tsk_entry_ids;
+  RETURN_NOT_OK(LoadTskEntries(&expired_tsk_entry_ids));
+  RETURN_NOT_OK(TryGenerateNewTskUnlocked());
+  return DeleteTskEntries(expired_tsk_entry_ids);
+}
+
+void CatalogManager::PrepareForLeadershipTask() {
   {
     // Hack to block this function until InitSysCatalogAsync() is finished.
     shared_lock<LockType> l(lock_);
   }
   const Consensus* consensus = sys_catalog_->tablet_replica()->consensus();
-  int64_t term = consensus->ConsensusState(CONSENSUS_CONFIG_COMMITTED).current_term();
+  const int64_t term_before_wait =
+      consensus->ConsensusState(CONSENSUS_CONFIG_COMMITTED).current_term();
   {
     std::lock_guard<simple_spinlock> l(state_lock_);
-    if (leader_ready_term_ == term) {
+    if (leader_ready_term_ == term_before_wait) {
       // The term hasn't changed since the last time this master was the
       // leader. It's not possible for another master to be leader for the same
       // term, so there hasn't been any actual leadership change and thus
       // there's no reason to reload the on-disk metadata.
       VLOG(2) << Substitute("Term $0 hasn't changed, ignoring dirty callback",
-                            term);
+                            term_before_wait);
       return;
     }
   }
@@ -826,86 +954,91 @@ void CatalogManager::VisitTablesAndTabletsTask() {
     return;
   }
 
-  int64_t term_after_wait = consensus->ConsensusState(CONSENSUS_CONFIG_COMMITTED).current_term();
-  if (term_after_wait != term) {
+  const int64_t term =
+      consensus->ConsensusState(CONSENSUS_CONFIG_COMMITTED).current_term();
+  if (term_before_wait != term) {
     // If we got elected leader again while waiting to catch up then we will
     // get another callback to visit the tables and tablets, so bail.
-    LOG(INFO) << "Term change from " << term << " to " << term_after_wait
+    LOG(INFO) << "Term changed from " << term_before_wait << " to " << term
         << " while waiting for master leader catchup. Not loading sys catalog metadata";
     return;
   }
 
   {
+    // This lambda returns the result of calling the 'func', checking whether
+    // the error, if any, is fatal for the leader catalog. If the returned
+    // status is non-OK, the caller should bail on the leadership preparation
+    // task. If the error is considered fatal, LOG(FATAL) is called.
+    const auto check = [this](
+        std::function<Status()> func,
+        const Consensus& consensus,
+        int64_t start_term,
+        const char* op_description) {
+
+      leader_lock_.AssertAcquiredForWriting();
+      const Status s = func();
+      if (s.ok()) {
+        // Not an error at all.
+        return s;
+      }
+
+      {
+        std::lock_guard<simple_spinlock> l(state_lock_);
+        if (state_ == kClosing) {
+          // Errors on shutdown are not considered fatal.
+          LOG(INFO) << op_description
+                    << " failed due to the shutdown of the catalog: "
+                    << s.ToString();
+          return s;
+        }
+      }
+
+      const int64_t term =
+          consensus.ConsensusState(CONSENSUS_CONFIG_COMMITTED).current_term();
+      if (term != start_term) {
+        // If the term has changed we assume the new leader catalog is about
+        // to do the necessary work in its leadership preparation task.
+        LOG(INFO) << op_description << " failed; "
+                  << Substitute("change in term detected: $0 vs $1: ",
+                                start_term, term)
+                  << s.ToString();
+        return s;
+      }
+
+      // In all other cases non-OK status is considered fatal.
+      LOG(FATAL) << op_description << " failed: " << s.ToString();
+      return s; // unreachable
+    };
+
     // Block new catalog operations, and wait for existing operations to finish.
     std::lock_guard<RWMutex> leader_lock_guard(leader_lock_);
 
-    LOG(INFO) << "Loading table and tablet metadata into memory...";
-    LOG_SLOW_EXECUTION(WARNING, 1000, LogPrefix() +
-                       "Loading metadata into memory") {
-      CHECK_OK(VisitTablesAndTabletsUnlocked());
-    }
-
-    // TODO(KUDU-1920): this should not be done in case of external PKI.
-    // TODO(KUDU-1919): some kind of tool to rotate the IPKI CA
-    LOG(INFO) << "Loading CA info into memory...";
-    LOG_SLOW_EXECUTION(WARNING, 1000, LogPrefix() +
-                       "Loading CA info into memory") {
-      unique_ptr<PrivateKey> key;
-      unique_ptr<Cert> cert;
-      const Status& s = LoadCertAuthorityInfo(&key, &cert);
-      if (s.ok()) {
-        // Once succesfully loaded, the CA information is supposed to be valid:
-        // the leader master should not be run without CA certificate.
-        CHECK_OK(InitCertAuthority(std::move(key), std::move(cert)));
-      } else if (s.IsNotFound()) {
-        LOG(INFO) << "Did not find CA certificate and key for Kudu IPKI, "
-                     "will generate new ones";
-        // No CA information record has been found in the table -- generate
-        // a new one. The subtlety here is that first it's necessary to store
-        // the newly generated information into the system table and only after
-        // that initialize master certificate authority. The reason is:
-        // if the master server loses its leadership role by that time, there
-        // will be an error on an attempt to write into the system table.
-        // If that happens, skip the rest of the sequence: when this callback
-        // is invoked next time, the system table should already contain
-        // CA certificate information written by other master.
-        unique_ptr<PrivateKey> private_key(new PrivateKey);
-        unique_ptr<Cert> cert(new Cert);
-
-        // Generate new private key and corresponding CA certificate.
-        CHECK_OK(master_->cert_authority()->Generate(private_key.get(),
-                                                     cert.get()));
-        // It the leadership role is lost at this moment, writing into the
-        // system table will fail.
-        const Status& s = StoreCertAuthorityInfo(*private_key, *cert);
-        if (s.ok()) {
-          // The leader master should not run without CA certificate.
-          CHECK_OK(InitCertAuthority(std::move(private_key), std::move(cert)));
-        } else {
-          LOG(WARNING) << "Failed to write newly generated CA information into "
-                          "the system table, assuming change of leadership: "
-                       << s.ToString();
-        }
-      } else {
-        CHECK_OK(s);
+    static const char* const kLoadMetaOpDescription =
+        "Loading table and tablet metadata into memory";
+    LOG(INFO) << kLoadMetaOpDescription << "...";
+    LOG_SLOW_EXECUTION(WARNING, 1000, LogPrefix() + kLoadMetaOpDescription) {
+      if (!check(std::bind(&CatalogManager::VisitTablesAndTabletsUnlocked, this),
+                 *consensus, term, kLoadMetaOpDescription).ok()) {
+        return;
       }
     }
 
-    LOG(INFO) << "Loading token signing keys...";
-    LOG_SLOW_EXECUTION(WARNING, 1000, LogPrefix() +
-                       "Loading token signing keys...") {
-      set<string> expired_tsk_entry_ids;
-      CHECK_OK(LoadTskEntries(&expired_tsk_entry_ids));
-      Status s = CheckGenerateNewTskUnlocked();
-      if (!s.ok()) {
-        LOG(WARNING) << "Failed to generate and persist new TSK, "
-                        "assuming change of leadership: " << s.ToString();
+    // TODO(KUDU-1920): update this once "BYO PKI" feature is supported.
+    static const char* const kCaInitOpDescription =
+        "Initializing Kudu internal certificate authority";
+    LOG(INFO) << kCaInitOpDescription << "...";
+    LOG_SLOW_EXECUTION(WARNING, 1000, LogPrefix() + kCaInitOpDescription) {
+      if (!check(std::bind(&CatalogManager::InitCertAuthority, this),
+                 *consensus, term, kCaInitOpDescription).ok()) {
         return;
       }
-      s = DeleteTskEntries(expired_tsk_entry_ids);
-      if (!s.ok()) {
-        LOG(WARNING) << "Failed to purge expired TSK entries from system table, "
-                        "assuming change of leadership: " << s.ToString();
+    }
+
+    static const char* const kTskOpDescription = "Loading token signing keys";
+    LOG(INFO) << kTskOpDescription << "...";
+    LOG_SLOW_EXECUTION(WARNING, 1000, LogPrefix() + kTskOpDescription) {
+      if (!check(std::bind(&CatalogManager::InitTokenSigner, this),
+                 *consensus, term, kTskOpDescription).ok()) {
         return;
       }
     }
@@ -3279,8 +3412,9 @@ void CatalogManager::SendDeleteTabletRequest(const scoped_refptr<TabletInfo>& ta
     return;
   }
   const ConsensusStatePB& cstate = tablet_lock.data().pb.committed_consensus_state();
-  LOG(INFO) << "Sending DeleteTablet for " << cstate.config().peers().size()
-            << " replicas of tablet " << tablet->tablet_id();
+  LOG_WITH_PREFIX(INFO)
+      << "Sending DeleteTablet for " << cstate.config().peers().size()
+      << " replicas of tablet " << tablet->tablet_id();
   for (const auto& peer : cstate.config().peers()) {
     SendDeleteReplicaRequest(tablet->tablet_id(), TABLET_DATA_DELETED,
                              boost::none, tablet->table(),
@@ -3318,6 +3452,8 @@ void CatalogManager::SendAddServerRequest(const scoped_refptr<TabletInfo>& table
               Substitute("Failed to send AddServer request for tablet $0", tablet->tablet_id()));
   // We can't access 'task' after calling Run() because it may delete itself
   // inside Run() in the case that the tablet has no known leader.
+  LOG_WITH_PREFIX(INFO)
+      << "Started AddServer task for tablet " << tablet->tablet_id();
 }
 
 void CatalogManager::ExtractTabletsToProcess(
@@ -3359,7 +3495,7 @@ void CatalogManager::ExtractTabletsToProcess(
 //   2) Try to write it to the system table.
 //   3) Pass it back to the TokenSigner on success.
 //   4) Check and switch TokenSigner to the new key if it's time to do so.
-Status CatalogManager::CheckGenerateNewTskUnlocked() {
+Status CatalogManager::TryGenerateNewTskUnlocked() {
   TokenSigner* signer = master_->token_signer();
   unique_ptr<security::TokenSigningPrivateKey> tsk;
   RETURN_NOT_OK(signer->CheckNeedKey(&tsk));
@@ -3369,9 +3505,10 @@ Status CatalogManager::CheckGenerateNewTskUnlocked() {
     tsk->ExportPB(&tsk_pb);
     SysTskEntryPB sys_entry;
     sys_entry.mutable_tsk()->Swap(&tsk_pb);
+    MAYBE_INJECT_RANDOM_LATENCY(
+        FLAGS_catalog_manager_inject_latency_prior_tsk_write_ms);
     RETURN_NOT_OK(sys_catalog_->AddTskEntry(sys_entry));
-    LOG(INFO) << "Saved newly generated TSK " << tsk->key_seq_num()
-              << " into the system table.";
+    LOG_WITH_PREFIX(INFO) << "Generated new TSK " << tsk->key_seq_num();
     // Then add the new TSK into the signer.
     RETURN_NOT_OK(signer->AddKey(std::move(tsk)));
   }
@@ -3381,6 +3518,9 @@ Status CatalogManager::CheckGenerateNewTskUnlocked() {
 Status CatalogManager::LoadTskEntries(set<string>* expired_entry_ids) {
   TskEntryLoader loader;
   RETURN_NOT_OK(sys_catalog_->VisitTskEntries(&loader));
+  for (const auto& key : loader.entries()) {
+    LOG_WITH_PREFIX(INFO) << "Loaded TSK: " << key.key_seq_num();
+  }
   if (expired_entry_ids) {
     set<string> ref(loader.expired_entry_ids());
     expired_entry_ids->swap(ref);
@@ -3431,9 +3571,10 @@ void CatalogManager::HandleAssignCreatingTablet(TabletInfo* tablet,
   // within the timeout. So the tablet will be replaced by a new one.
   scoped_refptr<TabletInfo> replacement = CreateTabletInfo(tablet->table().get(),
                                                            old_info.pb.partition());
-  LOG(WARNING) << "Tablet " << tablet->ToString() << " was not created within "
-               << "the allowed timeout. Replacing with a new tablet "
-               << replacement->tablet_id();
+  LOG_WITH_PREFIX(WARNING)
+      << "Tablet " << tablet->ToString() << " was not created within "
+      << "the allowed timeout. Replacing with a new tablet "
+      << replacement->tablet_id();
 
   // Mark old tablet as replaced.
   tablet->mutable_metadata()->mutable_dirty()->set_state(
@@ -3484,12 +3625,15 @@ Status CatalogManager::HandleTabletSchemaVersionReport(TabletInfo *tablet, uint3
   actions.table_to_update = table;
   Status s = sys_catalog_->Write(actions);
   if (!s.ok()) {
-    LOG(WARNING) << "An error occurred while updating sys-tables: " << s.ToString();
+    LOG_WITH_PREFIX(WARNING)
+        << "An error occurred while updating sys-tables: " << s.ToString();
     return s;
   }
 
   l.Commit();
-  LOG(INFO) << table->ToString() << " - Alter table completed version=" << current_version;
+  LOG_WITH_PREFIX(INFO)
+      << table->ToString() << " - Alter table completed version="
+      << current_version;
   return Status::OK();
 }
 
@@ -3572,7 +3716,8 @@ Status CatalogManager::ProcessPendingAssignments(
   }
 
   if (!s.ok()) {
-    LOG(WARNING) << "Aborting the current task due to error: " << s.ToString();
+    LOG_WITH_PREFIX(WARNING)
+        << "Aborting the current task due to error: " << s.ToString();
     // If there was an error, abort any mutations started by the
     // current task.
     unlocker_out.Abort();
@@ -3882,7 +4027,9 @@ Status CatalogManager::GetTableLocations(const GetTableLocationsRequestPB* req,
       StatusToPB(s, resp->mutable_error()->mutable_status());
       break;
     } else {
-      LOG(FATAL) << "Unexpected error while building tablet locations: " << s.ToString();
+      LOG_WITH_PREFIX(FATAL)
+          << "Unexpected error while building tablet locations: "
+          << s.ToString();
     }
   }
   resp->set_ttl_millis(FLAGS_table_locations_ttl_ms);
@@ -3968,7 +4115,6 @@ void CatalogManager::AbortAndWaitForAllTasks(
     t->WaitTasksCompletion();
   }
 }
-
 ////////////////////////////////////////////////////////////
 // CatalogManager::ScopedLeaderSharedLock
 ////////////////////////////////////////////////////////////
@@ -3976,7 +4122,10 @@ void CatalogManager::AbortAndWaitForAllTasks(
 CatalogManager::ScopedLeaderSharedLock::ScopedLeaderSharedLock(
     CatalogManager* catalog)
     : catalog_(DCHECK_NOTNULL(catalog)),
-      leader_shared_lock_(catalog->leader_lock_, std::try_to_lock) {
+      leader_shared_lock_(catalog->leader_lock_, std::try_to_lock),
+      catalog_status_(Status::Uninitialized("")),
+      leader_status_(Status::Uninitialized("")),
+      initial_term_(-1) {
 
   // Check if the catalog manager is running.
   std::lock_guard<simple_spinlock> l(catalog_->state_lock_);
@@ -3986,11 +4135,13 @@ CatalogManager::ScopedLeaderSharedLock::ScopedLeaderSharedLock(
                    catalog_->state_));
     return;
   }
+  catalog_status_ = Status::OK();
 
   // Check if the catalog manager is the leader.
-  ConsensusStatePB cstate = catalog_->sys_catalog_->tablet_replica()->consensus()->
-      ConsensusState(CONSENSUS_CONFIG_COMMITTED);
-  string uuid = catalog_->master_->fs_manager()->uuid();
+  const ConsensusStatePB cstate = catalog_->sys_catalog_->tablet_replica()->
+      consensus()->ConsensusState(CONSENSUS_CONFIG_COMMITTED);
+  initial_term_ = cstate.current_term();
+  const string& uuid = catalog_->master_->fs_manager()->uuid();
   if (PREDICT_FALSE(!cstate.has_leader_uuid() || cstate.leader_uuid() != uuid)) {
     leader_status_ = Status::IllegalState(
         Substitute("Not the leader. Local UUID: $0, Consensus state: $1",
@@ -4003,6 +4154,14 @@ CatalogManager::ScopedLeaderSharedLock::ScopedLeaderSharedLock(
         "Leader not yet ready to serve requests");
     return;
   }
+  leader_status_ = Status::OK();
+}
+
+bool CatalogManager::ScopedLeaderSharedLock::has_term_changed() const {
+  DCHECK(leader_status().ok());
+  const ConsensusStatePB cstate = catalog_->sys_catalog_->tablet_replica()->
+      consensus()->ConsensusState(CONSENSUS_CONFIG_COMMITTED);
+  return cstate.current_term() != initial_term_;
 }
 
 template<typename RespClass>

http://git-wip-us.apache.org/repos/asf/kudu/blob/6b6593a0/src/kudu/master/catalog_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index 59ffe16..c28f15b 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -17,6 +17,7 @@
 #ifndef KUDU_MASTER_CATALOG_MANAGER_H
 #define KUDU_MASTER_CATALOG_MANAGER_H
 
+#include <functional>
 #include <map>
 #include <memory>
 #include <set>
@@ -354,6 +355,11 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
       return leader_status_;
     }
 
+    // Check whether the consensus configuration term has changed from the term
+    // captured at object construction (initial_term_).
+    // Requires: leader_status() returns OK().
+    bool has_term_changed() const;
+
     // Check that the catalog manager is initialized. It may or may not be the
     // leader of its Raft configuration.
     //
@@ -376,6 +382,7 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
     shared_lock<RWMutex> leader_shared_lock_;
     Status catalog_status_;
     Status leader_status_;
+    int64_t initial_term_;
 
     DISALLOW_COPY_AND_ASSIGN(ScopedLeaderSharedLock);
   };
@@ -541,8 +548,8 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   typedef std::unordered_map<std::string, scoped_refptr<TabletInfo>> TabletInfoMap;
 
   // Called by SysCatalog::SysCatalogStateChanged when this node
-  // becomes the leader of a consensus configuration. Executes VisitTablesAndTabletsTask
-  // via 'worker_pool_'.
+  // becomes the leader of a consensus configuration. Executes
+  // PrepareForLeadershipTask() via 'worker_pool_'.
   Status ElectedAsLeaderCb();
 
   // Loops and sleeps until one of the following conditions occurs:
@@ -557,9 +564,10 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   // reading that data, to ensure consistency across failovers.
   Status WaitUntilCaughtUpAsLeader(const MonoDelta& timeout);
 
-  // Performs several checks before calling VisitTablesAndTablets to actually
-  // reload table/tablet metadata into memory.
-  void VisitTablesAndTabletsTask();
+  // Performs several checks before calling VisitTablesAndTablets() to actually
+  // reload table/tablet metadata into memory and do other work to update the
+  // internal state of this object upon becoming the leader.
+  void PrepareForLeadershipTask();
 
   // Clears out the existing metadata ('table_names_map_', 'table_ids_map_',
   // and 'tablet_map_'), loads tables metadata into memory and if successful
@@ -576,21 +584,36 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   // This method is thread-safe.
   Status InitSysCatalogAsync(bool is_first_run);
 
-  // Load the internal Kudu certficate authority information from the system
+  // Initialize the IPKI certificate authority: load the CA information record
+  // from the system table. If the CA information record is not present in the
+  // table, generate and store a new one.
+  Status InitCertAuthority();
+
+  // Initialize the IPKI certificate authority with the specified private key
+  // and certificate.
+  Status InitCertAuthorityWith(std::unique_ptr<security::PrivateKey> key,
+                               std::unique_ptr<security::Cert> cert);
+
+  // Load the IPKI certficate authority information from the system
   // table: the private key and the certificate. If the CA info entry is not
   // found in the table, return Status::NotFound.
   Status LoadCertAuthorityInfo(std::unique_ptr<security::PrivateKey>* key,
                                std::unique_ptr<security::Cert>* cert);
 
-  // Initialize master's certificate authority with the specified private key
-  // and certificate.
-  Status InitCertAuthority(std::unique_ptr<security::PrivateKey> key,
-                           std::unique_ptr<security::Cert> cert);
-
-  // Store CA certificate information into the system table.
+  // Store the IPKI certificate authority information into the system table.
   Status StoreCertAuthorityInfo(const security::PrivateKey& key,
                                 const security::Cert& cert);
 
+  // 1. Initialize the TokenSigner (the component which signs authn tokens):
+  //      a. Load TSK records from the system table.
+  //      b. Import the newly loaded TSK records into the TokenSigner.
+  // 2. Check whether it's time to generate a new token signing key.
+  //    If yes, then:
+  //      a. Generate a new TSK.
+  //      b. Store the new TSK one into the system catalog table.
+  // 3. Purge expired TSKs from the system table.
+  Status InitTokenSigner();
+
   // Helper for creating the initial TableInfo state
   // Leaves the table "write locked" with the new info in the
   // "dirty" state field.
@@ -627,9 +650,10 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   // Extract the set of tablets that must be processed because not running yet.
   void ExtractTabletsToProcess(std::vector<scoped_refptr<TabletInfo>>* tablets_to_process);
 
-  // Check if it's time to generate new Token Signing Key for TokenSigner.
-  // If so, generate one and persist it into the system table.
-  Status CheckGenerateNewTskUnlocked();
+  // Check if it's time to generate a new Token Signing Key for TokenSigner.
+  // If so, generate one and persist it into the system table. After that,
+  // push it into the TokenSigner's key queue.
+  Status TryGenerateNewTskUnlocked();
 
   // Load non-expired TSK entries from the system table.
   // Once done, initialize TokenSigner with the loaded entries.

http://git-wip-us.apache.org/repos/asf/kudu/blob/6b6593a0/src/kudu/master/master-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master-test.cc b/src/kudu/master/master-test.cc
index 143797e..bc3aa99 100644
--- a/src/kudu/master/master-test.cc
+++ b/src/kudu/master/master-test.cc
@@ -702,7 +702,7 @@ TEST_F(MasterTest, TestShutdownDuringTableVisit) {
   ASSERT_OK(master_->catalog_manager()->ElectedAsLeaderCb());
 
   // Master will now shut down, potentially racing with
-  // CatalogManager::VisitTablesAndTabletsTask.
+  // CatalogManager::PrepareForLeadershipTask().
 }
 
 // Tests that the catalog manager handles spurious calls to ElectedAsLeaderCb()

http://git-wip-us.apache.org/repos/asf/kudu/blob/6b6593a0/src/kudu/master/master_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master_service.cc b/src/kudu/master/master_service.cc
index 523f803..6476516 100644
--- a/src/kudu/master/master_service.cc
+++ b/src/kudu/master/master_service.cc
@@ -50,6 +50,12 @@ DEFINE_bool(master_support_connect_to_master_rpc, true,
 TAG_FLAG(master_support_connect_to_master_rpc, unsafe);
 TAG_FLAG(master_support_connect_to_master_rpc, hidden);
 
+DEFINE_bool(master_non_leader_masters_propagate_tsk, false,
+            "Whether a non-leader master sends information about its TSKs in "
+            "response to a tablet server's heartbeat. This is intended for "
+            "tests scenarios only and should not be used elsewhere.");
+TAG_FLAG(master_non_leader_masters_propagate_tsk, hidden);
+
 using kudu::security::SignedTokenPB;
 using google::protobuf::Message;
 using std::string;
@@ -191,9 +197,12 @@ void MasterServiceImpl::TSHeartbeat(const TSHeartbeatRequestPB* req,
     resp->add_ca_cert_der(server_->cert_authority()->ca_cert_der());
   }
 
-  // 7. Only leaders send public parts of non-expired TSK
-  // which the TS doesn't have.
-  if (is_leader_master && req->has_latest_tsk_seq_num()) {
+  // 7. Only leaders send public parts of non-expired TSK which the TS doesn't
+  //    have, except if the '--master_non_leader_masters_propagate_tsk'
+  //    test-only flag is set.
+  if ((is_leader_master ||
+       PREDICT_FALSE(FLAGS_master_non_leader_masters_propagate_tsk)) &&
+      req->has_latest_tsk_seq_num()) {
     auto tsk_public_keys = server_->token_signer()->verifier().ExportKeys(
         req->latest_tsk_seq_num());
     for (auto& key : tsk_public_keys) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/6b6593a0/src/kudu/master/sys_catalog-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog-test.cc b/src/kudu/master/sys_catalog-test.cc
index 7d8f361..12d9c69 100644
--- a/src/kudu/master/sys_catalog-test.cc
+++ b/src/kudu/master/sys_catalog-test.cc
@@ -411,8 +411,6 @@ TEST_F(SysCatalogTest, LoadCertAuthorityInfo) {
 
 // Check that if the certificate authority information is already present,
 // it cannot be overwritten using SysCatalogTable::AddCertAuthorityInfo().
-// Basically, this is to verify that SysCatalogTable::AddCertAuthorityInfo()
-// can be called just once to store CA information on first cluster startup.
 TEST_F(SysCatalogTest, AttemptOverwriteCertAuthorityInfo) {
   // The system catalog should already contain newly generated CA private key
   // and certificate: the SetUp() method awaits for the catalog manager
@@ -422,7 +420,7 @@ TEST_F(SysCatalogTest, AttemptOverwriteCertAuthorityInfo) {
   ASSERT_OK(master_->catalog_manager()->sys_catalog()->
             GetCertAuthorityEntry(&ca_entry));
   const Status s = master_->catalog_manager()->sys_catalog()->
-            AddCertAuthorityEntry(ca_entry);
+      AddCertAuthorityEntry(ca_entry);
   ASSERT_TRUE(s.IsCorruption()) << s.ToString();
   ASSERT_EQ("Corruption: One or more rows failed to write", s.ToString());
 }


[3/5] kudu git commit: Fix flaky test TestRecoverFromOpIdOverflow

Posted by ad...@apache.org.
Fix flaky test TestRecoverFromOpIdOverflow

This test is flaky because we race against the COMMIT message for the
first NO_OP in the WAL being written. It is currently hard to know when
the actual COMMIT message is written to the WAL so we use a workaround
to delete the first log segment before restarting the EMC in this test.

If the COMMIT doesn't get written in time then the tablet bootstrap
process doesn't prune that entry at startup time and it ends up in the
new log prior to the much higher-numbered log entries. The flaky test
failure was due to an out of order log index being detected, and looked
like the following error in the log:

F0504 21:28:21.128690 13908 raft_consensus_state.cc:502] Check failed: _s.ok() Bad status: Corruption: New operation's index does not follow the previous op's index. Current: 2147483648.2147483648. Previous: 1.1
*** Check failure stack trace: ***
    @     0x7fe0adef915d  google::LogMessage::Fail() at ??:0
    @     0x7fe0adefb05d  google::LogMessage::SendToLog() at ??:0
    @     0x7fe0adef8c99  google::LogMessage::Flush() at ??:0
    @     0x7fe0adefbaff  google::LogMessageFatal::~LogMessageFatal() at ??:0
    @     0x7fe0b57a8018  kudu::consensus::PendingRounds::AdvanceCommittedIndex() at ??:0
    @     0x7fe0b57848a5  kudu::consensus::RaftConsensus::NotifyCommitIndex() at ??:0
    @     0x7fe0b5749ccf  kudu::consensus::PeerMessageQueue::NotifyObserversOfCommitIndexChangeTask() at ??:0
    @     0x7fe0b575bdc1  kudu::internal::RunnableAdapter<>::Run() at ??:0

Change-Id: Ib382819307da04bb76d68d2c015dc0edd9f60267
Reviewed-on: http://gerrit.cloudera.org:8080/6808
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <da...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/f0580499
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/f0580499
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/f0580499

Branch: refs/heads/master
Commit: f0580499dc50e8a47ff6251301cdc15b9b79edcb
Parents: ca64220
Author: Mike Percy <mp...@apache.org>
Authored: Thu May 4 18:17:34 2017 -0700
Committer: David Ribeiro Alves <da...@gmail.com>
Committed: Wed May 10 23:44:39 2017 +0000

----------------------------------------------------------------------
 src/kudu/integration-tests/ts_recovery-itest.cc | 28 ++++++++++++++++++++
 1 file changed, 28 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/f0580499/src/kudu/integration-tests/ts_recovery-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/ts_recovery-itest.cc b/src/kudu/integration-tests/ts_recovery-itest.cc
index dd85464..0a2582a 100644
--- a/src/kudu/integration-tests/ts_recovery-itest.cc
+++ b/src/kudu/integration-tests/ts_recovery-itest.cc
@@ -18,6 +18,8 @@
 #include <memory>
 #include <string>
 
+#include <glog/stl_logging.h>
+
 #include "kudu/client/client.h"
 #include "kudu/consensus/log-test-base.h"
 #include "kudu/consensus/consensus.pb.h"
@@ -363,6 +365,32 @@ TEST_F(TsRecoveryITestDeathTest, TestRecoverFromOpIdOverflow) {
       ASSERT_OK(AppendNoOpsToLogSync(clock, log.get(), &opid, kNumOverflowedEntriesToWrite));
     }, "Check failed: log_index > 0");
 
+    // Before restarting the tablet server, delete the initial log segment from
+    // disk (the original leader election NO_OP) if it exists since it will
+    // contain OpId 1.1; If the COMMIT message for this NO_OP (OpId 1.1) was
+    // not written to disk yet, then it might get written _after_ the ops with
+    // the overflowed ids above, triggering a CHECK about non sequential OpIds.
+    // If we remove the first segment then the tablet will just assume that
+    // commit messages for all replicates in previous segments have already
+    // been written, thus avoiding the check.
+    string wal_dir = fs_manager->GetTabletWalDir(tablet_id);
+    vector<string> wal_children;
+    ASSERT_OK(fs_manager->env()->GetChildren(wal_dir, &wal_children));
+    // Skip '.', '..', and index files.
+    unordered_set<string> wal_segments;
+    for (const auto& filename : wal_children) {
+      if (HasPrefixString(filename, FsManager::kWalFileNamePrefix)) {
+        wal_segments.insert(filename);
+      }
+    }
+    ASSERT_GE(wal_segments.size(), 2) << "Too few WAL segments. Files in dir (" << wal_dir << "): "
+                                      << wal_children;
+    int64_t kLogSegmentIndex = 1;
+    string first_segment = fs_manager->GetWalSegmentFileName(tablet_id, kLogSegmentIndex);
+    if (ContainsKey(wal_segments, first_segment)) {
+      ASSERT_OK(fs_manager->env()->DeleteFile(JoinPathSegments(wal_dir, first_segment)));
+    }
+
     // We also need to update the ConsensusMetadata to match with the term we
     // want to end up with.
     unique_ptr<ConsensusMetadata> cmeta;


[5/5] kudu git commit: KUDU-1549: delete dead LBM containers at startup

Posted by ad...@apache.org.
KUDU-1549: delete dead LBM containers at startup

Full containers with no live blocks are "dead" in that they will never be
used for any block operations. These containers are safe to delete, and
though we lose some forensic information (e.g. block X was created at time Y
and deleted at time Z) in doing so, we'll wind up processing less container
metadata in the next startup.

This patch adds a quick and dirty implementation of dead container deletion
at startup. It would be nice to also do it in real time (perhaps as a
maintenance manager operation), but that requires containers to have shared
ownership, a lifecycle change I'm not keen on making right now.

Change-Id: I73a903092cf89508b7ba97fde3a2d6e691161b4c
Reviewed-on: http://gerrit.cloudera.org:8080/6824
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <da...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/e4e59bb5
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/e4e59bb5
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/e4e59bb5

Branch: refs/heads/master
Commit: e4e59bb5a349514236778926bcc77c8f033da04d
Parents: 501e4cd
Author: Adar Dembo <ad...@cloudera.com>
Authored: Fri May 5 14:10:13 2017 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Thu May 11 00:43:52 2017 +0000

----------------------------------------------------------------------
 src/kudu/fs/log_block_manager-test.cc |  27 ++++++
 src/kudu/fs/log_block_manager.cc      | 131 +++++++++++++++++++++++++----
 src/kudu/fs/log_block_manager.h       |  19 ++++-
 3 files changed, 157 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/e4e59bb5/src/kudu/fs/log_block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager-test.cc b/src/kudu/fs/log_block_manager-test.cc
index 1118fb2..6afa50d 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -1097,5 +1097,32 @@ TEST_F(LogBlockManagerTest, TestRepairPartialRecords) {
   NO_FATALS(AssertEmptyReport(report));
 }
 
+TEST_F(LogBlockManagerTest, TestDeleteDeadContainersAtStartup) {
+  // Force our single container to become full once created.
+  FLAGS_log_container_max_size = 0;
+
+  // Create one container.
+  unique_ptr<WritableBlock> block;
+  ASSERT_OK(bm_->CreateBlock(&block));
+  ASSERT_OK(block->Append("a"));
+  ASSERT_OK(block->Close());
+  string data_file_name;
+  string metadata_file_name;
+  NO_FATALS(GetOnlyContainerDataFile(&data_file_name));
+  NO_FATALS(GetOnlyContainerDataFile(&metadata_file_name));
+
+  // Reopen the block manager. The container files should still be there.
+  ASSERT_OK(ReopenBlockManager());
+  ASSERT_TRUE(env_->FileExists(data_file_name));
+  ASSERT_TRUE(env_->FileExists(metadata_file_name));
+
+  // Delete the one block and reopen it again. The container files should have
+  // been deleted.
+  ASSERT_OK(bm_->DeleteBlock(block->id()));
+  ASSERT_OK(ReopenBlockManager());
+  ASSERT_FALSE(env_->FileExists(data_file_name));
+  ASSERT_FALSE(env_->FileExists(metadata_file_name));
+}
+
 } // namespace fs
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/e4e59bb5/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index d804e70..2efdc09 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -1668,6 +1668,19 @@ void LogBlockManager::AddNewContainerUnlocked(LogBlockContainer* container) {
   }
 }
 
+void LogBlockManager::RemoveFullContainerUnlocked(const string& container_name) {
+  DCHECK(lock_.is_locked());
+  unique_ptr<LogBlockContainer> to_delete(EraseKeyReturnValuePtr(
+      &all_containers_by_name_, container_name));
+  CHECK(to_delete);
+  CHECK(to_delete->full())
+      << Substitute("Container $0 is not full", container_name);
+  if (metrics()) {
+    metrics()->containers->Decrement();
+    metrics()->full_containers->Decrement();
+  }
+}
+
 Status LogBlockManager::GetOrCreateContainer(LogBlockContainer** container) {
   DataDir* dir;
   RETURN_NOT_OK(dd_manager_.GetNextDataDir(&dir));
@@ -1818,9 +1831,14 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
   local_report.misaligned_block_check.emplace();
   local_report.partial_record_check.emplace();
 
-  // Keep track of deleted blocks that we may need to repunch.
+  // Keep track of deleted blocks whose space hasn't been punched; they will
+  // be repunched during repair.
   vector<scoped_refptr<internal::LogBlock>> need_repunching;
 
+  // Keep track of containers that have nothing but dead blocks; they will be
+  // deleted during repair.
+  vector<string> dead_containers;
+
   // Find all containers and open them.
   unordered_set<string> containers_seen;
   vector<string> children;
@@ -1897,10 +1915,21 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
       }
     }
 
-    // Having processed the block records, let's check whether any full
-    // containers have any extra space (left behind after a crash or from an
-    // older version of Kudu).
     if (container->full()) {
+      // Full containers without any live blocks can be deleted outright.
+      //
+      // TODO(adar): this should be reported as an inconsistency once dead
+      // container deletion is also done in real time. Until then, it would be
+      // confusing to report it as such since it'll be a natural event at startup.
+      if (container->live_blocks() == 0) {
+        DCHECK(live_blocks.empty());
+        dead_containers.emplace_back(container->ToString());
+      }
+
+      // Having processed the block records, let's check whether any full
+      // containers have any extra space (left behind after a crash or from an
+      // older version of Kudu).
+      //
       // Filesystems are unpredictable beasts and may misreport the amount of
       // space allocated to a file in various interesting ways. Some examples:
       // - XFS's speculative preallocation feature may artificially enlarge the
@@ -1936,8 +1965,14 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
       if (reported_size > cleanup_threshold_size) {
         local_report.full_container_space_check->entries.emplace_back(
             container->ToString(), reported_size - container->live_bytes_aligned());
-        need_repunching.insert(need_repunching.end(),
-                               dead_blocks.begin(), dead_blocks.end());
+
+        // If the container is to be deleted outright, don't bother repunching
+        // its blocks. The report entry remains, however, so it's clear that
+        // there was a space discrepancy.
+        if (container->live_blocks()) {
+          need_repunching.insert(need_repunching.end(),
+                                 dead_blocks.begin(), dead_blocks.end());
+        }
       }
 
       local_report.stats.lbm_full_container_count++;
@@ -1975,7 +2010,10 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
 
   // Like the rest of Open(), repairs are performed per data directory to take
   // advantage of parallelism.
-  s = Repair(&local_report, std::move(need_repunching));
+  s = Repair(dir,
+             &local_report,
+             std::move(need_repunching),
+             std::move(dead_containers));
   if (!s.ok()) {
     *result_status = s.CloneAndPrepend(Substitute(
         "fatal error while repairing inconsistencies in data directory $0",
@@ -1988,8 +2026,10 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
 }
 
 Status LogBlockManager::Repair(
+    DataDir* dir,
     FsReport* report,
-    vector<scoped_refptr<internal::LogBlock>> need_repunching) {
+    vector<scoped_refptr<internal::LogBlock>> need_repunching,
+    vector<string> dead_containers) {
   if (read_only_) {
     LOG(INFO) << "Read-only block manager, skipping repair";
     return Status::OK();
@@ -2005,20 +2045,68 @@ Status LogBlockManager::Repair(
   unordered_map<std::string, internal::LogBlockContainer*> containers_by_name;
   {
     std::lock_guard<simple_spinlock> l(lock_);
+
+    // Remove all of the dead containers from the block manager. They will be
+    // deleted from disk shortly thereafter, outside of the lock.
+    for (const auto& d : dead_containers) {
+      RemoveFullContainerUnlocked(d);
+    }
+
+    // Fetch all the containers we're going to need.
     if (report->partial_record_check) {
       for (const auto& pr : report->partial_record_check->entries) {
-        containers_by_name[pr.container] =
-            FindOrDie(all_containers_by_name_, pr.container);
+        LogBlockContainer* c = FindPtrOrNull(all_containers_by_name_,
+                                             pr.container);
+        if (c) {
+          containers_by_name[pr.container] = c;
+        }
       }
     }
     if (report->full_container_space_check) {
       for (const auto& fcp : report->full_container_space_check->entries) {
-        containers_by_name[fcp.container] =
-            FindOrDie(all_containers_by_name_, fcp.container);
+        LogBlockContainer* c = FindPtrOrNull(all_containers_by_name_,
+                                             fcp.container);
+        if (c) {
+          containers_by_name[fcp.container] = c;
+        }
       }
     }
   }
 
+
+  // Delete all dead containers.
+  //
+  // After the deletions, the data directory is sync'ed to reduce the chance
+  // of a data file existing without its corresponding metadata file (or vice
+  // versa) in the event of a crash. The block manager would treat such a case
+  // as corruption and require manual intervention.
+  //
+  // TODO(adar) the above is not fool-proof; a crash could manifest in between
+  // any pair of deletions. That said, the odds of it happening are incredibly
+  // rare, and manual resolution isn't hard (just delete the existing file).
+  for (const auto& d : dead_containers) {
+    string data_file_name = StrCat(d, kContainerDataFileSuffix);
+    string metadata_file_name = StrCat(d, kContainerMetadataFileSuffix);
+
+    Status data_file_status;
+    Status metadata_file_status;
+    if (file_cache_) {
+      data_file_status = file_cache_->DeleteFile(data_file_name);
+      metadata_file_status = file_cache_->DeleteFile(metadata_file_name);
+    } else {
+      data_file_status = env_->DeleteFile(data_file_name);
+      metadata_file_status = env_->DeleteFile(metadata_file_name);
+    }
+
+    WARN_NOT_OK(data_file_status,
+                "Could not delete dead container data file " + data_file_name);
+    WARN_NOT_OK(metadata_file_status,
+                "Could not delete dead container metadata file " + metadata_file_name);
+  }
+  if (!dead_containers.empty()) {
+    WARN_NOT_OK(env_->SyncDir(dir->dir()), "Could not sync data directory");
+  }
+
   // Truncate partial metadata records.
   //
   // This is a fatal inconsistency; if the repair fails, we cannot proceed.
@@ -2027,8 +2115,13 @@ Status LogBlockManager::Repair(
       unique_ptr<RWFile> file;
       RWFileOptions opts;
       opts.mode = Env::OPEN_EXISTING;
-      internal::LogBlockContainer* container = FindOrDie(containers_by_name,
-                                                         pr.container);
+      internal::LogBlockContainer* container = FindPtrOrNull(containers_by_name,
+                                                             pr.container);
+      if (!container) {
+        // The container was deleted outright.
+        pr.repaired = true;
+        continue;
+      }
       RETURN_NOT_OK_PREPEND(
           env_->NewRWFile(opts,
                           StrCat(pr.container, kContainerMetadataFileSuffix),
@@ -2080,8 +2173,14 @@ Status LogBlockManager::Repair(
   // disk space consumption.
   if (report->full_container_space_check) {
     for (auto& fcp : report->full_container_space_check->entries) {
-      internal::LogBlockContainer* container = FindOrDie(containers_by_name,
-                                                         fcp.container);
+      internal::LogBlockContainer* container = FindPtrOrNull(containers_by_name,
+                                                             fcp.container);
+      if (!container) {
+        // The container was deleted outright.
+        fcp.repaired = true;
+        continue;
+      }
+
       Status s = container->TruncateDataToNextBlockOffset();
       if (s.ok()) {
         fcp.repaired = true;

http://git-wip-us.apache.org/repos/asf/kudu/blob/e4e59bb5/src/kudu/fs/log_block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index e401446..5a23f37 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -211,8 +211,16 @@ class LogBlockManager : public BlockManager {
       BlockAllocator> BlockMap;
 
   // Adds an as of yet unseen container to this block manager.
+  //
+  // Must be called with 'lock_' held.
   void AddNewContainerUnlocked(internal::LogBlockContainer* container);
 
+  // Removes a previously added container from this block manager. The
+  // container must be full.
+  //
+  // Must be called with 'lock_' held.
+  void RemoveFullContainerUnlocked(const std::string& container_name);
+
   // Returns the next container available for writing using a round-robin
   // selection policy, creating a new one if necessary.
   //
@@ -258,12 +266,15 @@ class LogBlockManager : public BlockManager {
   // already gone.
   scoped_refptr<internal::LogBlock> RemoveLogBlock(const BlockId& block_id);
 
-  // Repairs any inconsistencies described in 'report'. Any blocks in
-  // 'need_repunching' will be punched out again.
+  // Repairs any inconsistencies for 'dir' described in 'report'. Any blocks in
+  // 'need_repunching' will be punched out again. Any containers in
+  // 'dead_containers' will be deleted from disk.
   //
   // Returns an error if repairing a fatal inconsistency failed.
-  Status Repair(FsReport* report,
-                std::vector<scoped_refptr<internal::LogBlock>> need_repunching);
+  Status Repair(DataDir* dir,
+                FsReport* report,
+                std::vector<scoped_refptr<internal::LogBlock>> need_repunching,
+                std::vector<std::string> dead_containers);
 
   // Opens a particular data directory belonging to the block manager. The
   // results of consistency checking (and repair, if applicable) are written to


[4/5] kudu git commit: log block manager: convert container metrics from counters to gauges

Posted by ad...@apache.org.
log block manager: convert container metrics from counters to gauges

Container deletion is just around the corner, so these metrics need to
become gauges so that they can be decremented.

As an aside, it's possible that we've already been mishandling these
counters: they should never drop in value, even across restarts, and we were
incrementing them every time a container was reopened at startup. However,
we may have been OK because the block manager loads before the web UI, so by
the time the metrics are fetchable, the counter values should be the same as
what they were pre-restart.

Change-Id: I343d99d42da7f4fbc0facb476c97f2ca8785031d
Reviewed-on: http://gerrit.cloudera.org:8080/6823
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/501e4cd7
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/501e4cd7
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/501e4cd7

Branch: refs/heads/master
Commit: 501e4cd71d9f861a6fd05cadd1717ec4c220b324
Parents: f058049
Author: Adar Dembo <ad...@cloudera.com>
Authored: Fri May 5 14:03:37 2017 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Thu May 11 00:43:42 2017 +0000

----------------------------------------------------------------------
 src/kudu/fs/log_block_manager-test.cc |  8 +++----
 src/kudu/fs/log_block_manager.cc      | 38 ++++++++++++------------------
 2 files changed, 19 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/501e4cd7/src/kudu/fs/log_block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager-test.cc b/src/kudu/fs/log_block_manager-test.cc
index a193ff6..1118fb2 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -52,8 +52,8 @@ DECLARE_uint64(log_container_max_size);
 // Log block manager metrics.
 METRIC_DECLARE_gauge_uint64(log_block_manager_bytes_under_management);
 METRIC_DECLARE_gauge_uint64(log_block_manager_blocks_under_management);
-METRIC_DECLARE_counter(log_block_manager_containers);
-METRIC_DECLARE_counter(log_block_manager_full_containers);
+METRIC_DECLARE_gauge_uint64(log_block_manager_containers);
+METRIC_DECLARE_gauge_uint64(log_block_manager_full_containers);
 
 namespace kudu {
 namespace fs {
@@ -185,10 +185,10 @@ static void CheckLogMetrics(const scoped_refptr<MetricEntity>& entity,
   ASSERT_EQ(blocks_under_management, down_cast<AtomicGauge<uint64_t>*>(
                 entity->FindOrNull(METRIC_log_block_manager_blocks_under_management)
                 .get())->value());
-  ASSERT_EQ(containers, down_cast<Counter*>(
+  ASSERT_EQ(containers, down_cast<AtomicGauge<uint64_t>*>(
                 entity->FindOrNull(METRIC_log_block_manager_containers)
                 .get())->value());
-  ASSERT_EQ(full_containers, down_cast<Counter*>(
+  ASSERT_EQ(full_containers, down_cast<AtomicGauge<uint64_t>*>(
                 entity->FindOrNull(METRIC_log_block_manager_full_containers)
                 .get())->value());
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/501e4cd7/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index dd181c9..d804e70 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -99,21 +99,15 @@ METRIC_DEFINE_gauge_uint64(server, log_block_manager_blocks_under_management,
                            kudu::MetricUnit::kBlocks,
                            "Number of data blocks currently under management");
 
-METRIC_DEFINE_counter(server, log_block_manager_containers,
-                      "Number of Block Containers",
-                      kudu::MetricUnit::kLogBlockContainers,
-                      "Number of log block containers");
-
-METRIC_DEFINE_counter(server, log_block_manager_full_containers,
-                      "Number of Full Block Counters",
-                      kudu::MetricUnit::kLogBlockContainers,
-                      "Number of full log block containers");
-
-METRIC_DEFINE_counter(server, log_block_manager_unavailable_containers,
-                      "Number of Unavailable Log Block Containers",
-                      kudu::MetricUnit::kLogBlockContainers,
-                      "Number of non-full log block containers that are under root paths "
-                      "whose disks are full");
+METRIC_DEFINE_gauge_uint64(server, log_block_manager_containers,
+                           "Number of Block Containers",
+                           kudu::MetricUnit::kLogBlockContainers,
+                           "Number of log block containers");
+
+METRIC_DEFINE_gauge_uint64(server, log_block_manager_full_containers,
+                           "Number of Full Block Containers",
+                           kudu::MetricUnit::kLogBlockContainers,
+                           "Number of full log block containers");
 
 namespace kudu {
 
@@ -148,24 +142,22 @@ struct LogBlockManagerMetrics {
   // Implementation-agnostic metrics.
   BlockManagerMetrics generic_metrics;
 
-  scoped_refptr<AtomicGauge<uint64_t> > bytes_under_management;
-  scoped_refptr<AtomicGauge<uint64_t> > blocks_under_management;
+  scoped_refptr<AtomicGauge<uint64_t>> bytes_under_management;
+  scoped_refptr<AtomicGauge<uint64_t>> blocks_under_management;
 
-  scoped_refptr<Counter> containers;
-  scoped_refptr<Counter> full_containers;
+  scoped_refptr<AtomicGauge<uint64_t>> containers;
+  scoped_refptr<AtomicGauge<uint64_t>> full_containers;
 };
 
-#define MINIT(x) x(METRIC_log_block_manager_##x.Instantiate(metric_entity))
 #define GINIT(x) x(METRIC_log_block_manager_##x.Instantiate(metric_entity, 0))
 LogBlockManagerMetrics::LogBlockManagerMetrics(const scoped_refptr<MetricEntity>& metric_entity)
   : generic_metrics(metric_entity),
     GINIT(bytes_under_management),
     GINIT(blocks_under_management),
-    MINIT(containers),
-    MINIT(full_containers) {
+    GINIT(containers),
+    GINIT(full_containers) {
 }
 #undef GINIT
-#undef MINIT
 
 ////////////////////////////////////////////////////////////
 // LogBlock (declaration)


[2/5] kudu git commit: benchmarks: make ensure_cpu_scaling more resilient

Posted by ad...@apache.org.
benchmarks: make ensure_cpu_scaling more resilient

It's called out of a trap, and since the script uses pushd/popd a lot, it's
not always reachable:

  + python write-jobs-stats-to-mysql.py kudu-benchmarks 1039 DenseNodeItest_time_bootstrapping_tablets 8
  usage: write-jobs-stats-to-mysql.py <job_name> <build_number> <workload> <iteration> <runtime>
  + restore_governor
  + ensure_cpu_scaling performance
  ++ dirname ./src/kudu/scripts/benchmarks.sh
  + ./src/kudu/scripts/ensure_cpu_scaling.sh performance
  ./src/kudu/scripts/benchmarks.sh: line 93: ./src/kudu/scripts/ensure_cpu_scaling.sh: No such file or directory

Change-Id: I3452b48577dd8f5f72c57d41ccf41848c682d616
Reviewed-on: http://gerrit.cloudera.org:8080/6847
Tested-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <da...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/ca642204
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/ca642204
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/ca642204

Branch: refs/heads/master
Commit: ca642204fa88ad26e7855af34efae331ea50c74a
Parents: 6b6593a
Author: Adar Dembo <ad...@cloudera.com>
Authored: Wed May 10 15:19:40 2017 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Wed May 10 23:26:35 2017 +0000

----------------------------------------------------------------------
 src/kudu/scripts/benchmarks.sh | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/ca642204/src/kudu/scripts/benchmarks.sh
----------------------------------------------------------------------
diff --git a/src/kudu/scripts/benchmarks.sh b/src/kudu/scripts/benchmarks.sh
index 4652afb..949099c 100755
--- a/src/kudu/scripts/benchmarks.sh
+++ b/src/kudu/scripts/benchmarks.sh
@@ -71,7 +71,7 @@ NUM_SAMPLES=${NUM_SAMPLES:-10}
 ################################################################
 
 BENCHMARK_MODE=$MODE_JENKINS # we default to "jenkins mode"
-BASE_DIR=""
+BASE_DIR=$(pwd)
 LOGDIR=""
 OUTDIR=""
 
@@ -90,7 +90,7 @@ usage_and_die() {
 }
 
 ensure_cpu_scaling() {
-  $(dirname $BASH_SOURCE)/ensure_cpu_scaling.sh "$@"
+  $BASE_DIR/src/kudu/scripts/ensure_cpu_scaling.sh "$@"
 }
 
 record_result() {
@@ -615,7 +615,6 @@ run() {
 ################################################################
 
 # Figure out where we are, store in global variables.
-BASE_DIR=$(pwd)
 LOGDIR="$BASE_DIR/$LOG_DIR_NAME"
 OUTDIR="$BASE_DIR/$OUT_DIR_NAME"