You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2021/12/07 17:51:48 UTC
[kudu] branch master updated: [bootstrap] Speedup tablet bootstrap
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 3779690 [bootstrap] Speedup tablet bootstrap
3779690 is described below
commit 3779690f2eabe91da7f3c3abf5119579f2294ecf
Author: Yingchun Lai <ac...@gmail.com>
AuthorDate: Mon Nov 22 21:39:45 2021 +0800
[bootstrap] Speedup tablet bootstrap
Speed up tablet bootstrap by:
- using multiple threads to load tablet metadata
- returning early (short-circuiting) in TabletBootstrap::AnalyzeActiveStores()
  as soon as an active memory store is found
- improving branch prediction by adding more PREDICT_FALSE/PREDICT_TRUE
  hints on the hot path
A simple startup benchmark has been added; it shows that this patch reduces
tablet metadata load time by about 85% when
--num_tablets_to_open_simultaneously is raised from the default of 1 to 8.
Change-Id: I816417d2d4c24014edb6b2a40c060f29e37ae219
Reviewed-on: http://gerrit.cloudera.org:8080/18053
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Reviewed-by: Attila Bukor <ab...@apache.org>
---
src/kudu/fs/fs_manager.cc | 16 ++--
src/kudu/fs/log_block_manager.cc | 4 +-
src/kudu/tablet/tablet_bootstrap.cc | 73 ++++++++----------
src/kudu/tablet/tablet_metadata.cc | 7 ++
src/kudu/tserver/ts_tablet_manager-test.cc | 52 +++++++++++--
src/kudu/tserver/ts_tablet_manager.cc | 120 +++++++++++++++++++++--------
src/kudu/tserver/ts_tablet_manager.h | 2 +-
7 files changed, 183 insertions(+), 91 deletions(-)
diff --git a/src/kudu/fs/fs_manager.cc b/src/kudu/fs/fs_manager.cc
index d58a195..03fd2cd 100644
--- a/src/kudu/fs/fs_manager.cc
+++ b/src/kudu/fs/fs_manager.cc
@@ -681,21 +681,20 @@ string FsManager::GetTabletMetadataPath(const string& tablet_id) const {
bool FsManager::IsValidTabletId(const string& fname) {
// Prevent warning logs for hidden files or ./..
- if (HasPrefixString(fname, ".")) {
+ if (PREDICT_FALSE(HasPrefixString(fname, "."))) {
VLOG(1) << "Ignoring hidden file in tablet metadata dir: " << fname;
return false;
}
string canonicalized_uuid;
Status s = oid_generator_.Canonicalize(fname, &canonicalized_uuid);
-
- if (!s.ok()) {
+ if (PREDICT_FALSE(!s.ok())) {
LOG(WARNING) << "Ignoring file in tablet metadata dir: " << fname << ": " <<
s.message().ToString();
return false;
}
- if (fname != canonicalized_uuid) {
+ if (PREDICT_FALSE(fname != canonicalized_uuid)) {
LOG(WARNING) << "Ignoring file in tablet metadata dir: " << fname << ": " <<
Substitute("canonicalized uuid $0 does not match file name",
canonicalized_uuid);
@@ -711,12 +710,13 @@ Status FsManager::ListTabletIds(vector<string>* tablet_ids) {
RETURN_NOT_OK_PREPEND(ListDir(dir, &children),
Substitute("Couldn't list tablets in metadata directory $0", dir));
- vector<string> tablets;
- for (const string& child : children) {
- if (!IsValidTabletId(child)) {
+ // Suppose all children are valid tablet metadata.
+ tablet_ids->reserve(children.size());
+ for (auto& child : children) {
+ if (PREDICT_FALSE(!IsValidTabletId(child))) {
continue;
}
- tablet_ids->push_back(child);
+ tablet_ids->emplace_back(std::move(child));
}
return Status::OK();
}
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index 4cb27f5..a9973dd 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -2990,7 +2990,7 @@ Status LogBlockManager::Repair(
Status first_fatal_error;
SCOPED_LOG_TIMING(INFO, "loading block containers with low live blocks");
for (const auto& e : low_live_block_containers) {
- if (seen_fatal_error.load()) {
+ if (seen_fatal_error) {
break;
}
LogBlockContainerRefPtr container = FindPtrOrNull(containers_by_name, e.first);
@@ -3031,7 +3031,7 @@ Status LogBlockManager::Repair(
}
dir->WaitOnClosures();
- if (seen_fatal_error.load()) {
+ if (seen_fatal_error) {
LOG_AND_RETURN(WARNING, first_fatal_error);
}
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc
index 790fd53..7ccedfe 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -178,7 +178,7 @@ class FlushedStoresSnapshot {
bool IsMemStoreActive(const MemStoreTargetPB& target) const;
private:
- int64_t last_durable_mrs_id_;
+ int64_t last_durable_mrs_id_ = 0;
unordered_map<int64_t, int64_t> flushed_dms_by_drs_id_;
DISALLOW_COPY_AND_ASSIGN(FlushedStoresSnapshot);
@@ -221,7 +221,7 @@ class FlushedStoresSnapshot {
// original WALs, following a crash during bootstrapping, subsequent bootstraps
// should attempt to replay segments out of the recovery directory.
//
-// TODO(dralves): Because the table that is being rebuilt is never
+// TODO(dralves): Because the tablet that is being rebuilt is never
// flushed/compacted, consensus is only set on the tablet after bootstrap, when
// we get to flushes/compactions though we need to set it before replay or we
// won't be able to re-rebuild.
@@ -651,7 +651,7 @@ Status TabletBootstrap::RunBootstrap(shared_ptr<Tablet>* rebuilt_tablet,
// This is a new tablet, nothing left to do.
if (!has_blocks && !needs_recovery) {
- LOG_WITH_PREFIX(INFO) << "No blocks or log segments found. Creating new log.";
+ LOG_WITH_PREFIX(INFO) << "Neither blocks nor log segments found. Creating new log.";
RETURN_NOT_OK_PREPEND(OpenNewLog(), "Failed to open new log");
RETURN_NOT_OK(FinishBootstrap("No bootstrap required, opened a new log",
rebuilt_log, rebuilt_tablet));
@@ -816,7 +816,7 @@ struct ReplayState {
// Return true if 'b' is allowed to immediately follow 'a' in the log.
static bool IsValidSequence(const OpId& a, const OpId& b) {
- if (a.term() == 0 && a.index() == 0) {
+ if (PREDICT_FALSE(a.term() == 0 && a.index() == 0)) {
// Not initialized - can start with any opid.
return true;
}
@@ -831,7 +831,7 @@ struct ReplayState {
return true;
}
- // Return a Corruption status if 'id' seems to be out-of-sequence in the log.
+ // Return a Corruption status if 'msg' seems to be out-of-sequence in the log.
Status CheckSequentialReplicateId(const ReplicateMsg& msg) {
DCHECK(msg.has_id());
if (PREDICT_FALSE(!IsValidSequence(prev_op_id, msg.id()))) {
@@ -861,7 +861,7 @@ struct ReplayState {
}
}
- void DumpReplayStateToStrings(vector<string>* strings) const {
+ void DumpReplayStateToStrings(vector<string>* strings) const {
strings->push_back(Substitute("ReplayState: Previous OpId: $0, Committed OpId: $1, "
"Pending Replicates: $2, Pending Commits: $3", OpIdToString(prev_op_id),
OpIdToString(committed_op_id), pending_replicates.size(), pending_commits.size()));
@@ -952,6 +952,7 @@ Status TabletBootstrap::HandleReplicateMessage(ReplayState* state,
const ReplicateMsg& replicate = entry->replicate();
RETURN_NOT_OK(state->CheckSequentialReplicateId(replicate));
DCHECK(replicate.has_timestamp());
+ // TODO(yingchun): Should we try to update clock by batch?
CHECK_OK(UpdateClock(replicate.timestamp()));
// Append the replicate message to the log as is
@@ -959,7 +960,7 @@ Status TabletBootstrap::HandleReplicateMessage(ReplayState* state,
const int64_t index = replicate.id().index();
const auto existing_entry_iter = state->pending_replicates.find(index);
- if (existing_entry_iter != state->pending_replicates.end()) {
+ if (PREDICT_FALSE(existing_entry_iter != state->pending_replicates.end())) {
// If there was a entry with the same index we're overwriting then we need
// to delete that entry and all entries with higher indexes.
const auto& existing_entry = existing_entry_iter->second;
@@ -1005,8 +1006,8 @@ Status TabletBootstrap::HandleCommitMessage(const IOContext* io_context, ReplayS
// If there are no pending replicates, or if this commit's index is lower than the
// the first pending replicate on record this is likely an orphaned commit.
- if (state->pending_replicates.empty() ||
- (*state->pending_replicates.begin()).first > committed_op_id.index()) {
+ if (PREDICT_FALSE(state->pending_replicates.empty() ||
+ (*state->pending_replicates.begin()).first > committed_op_id.index())) {
VLOG_WITH_PREFIX(2) << "Found orphaned commit for " << committed_op_id;
RETURN_NOT_OK(CheckOrphanedCommitDoesntNeedReplay(entry->commit()));
stats_.orphaned_commits++;
@@ -1049,21 +1050,17 @@ Status TabletBootstrap::HandleCommitMessage(const IOContext* io_context, ReplayS
TabletBootstrap::ActiveStores TabletBootstrap::AnalyzeActiveStores(const CommitMsg& commit) {
bool has_mutated_stores = false;
- bool has_active_stores = false;
for (const OperationResultPB& op_result : commit.result().ops()) {
for (const MemStoreTargetPB& mutated_store : op_result.mutated_stores()) {
has_mutated_stores = true;
if (flushed_stores_.IsMemStoreActive(mutated_store)) {
- has_active_stores = true;
+ return SOME_STORES_ACTIVE;
}
}
}
- if (!has_mutated_stores) {
- return NO_MUTATED_STORES;
- }
- return has_active_stores ? SOME_STORES_ACTIVE : NO_STORES_ACTIVE;
+ return has_mutated_stores ? NO_STORES_ACTIVE : NO_MUTATED_STORES;
}
Status TabletBootstrap::CheckOrphanedCommitDoesntNeedReplay(const CommitMsg& commit) {
@@ -1089,11 +1086,10 @@ Status TabletBootstrap::ApplyCommitMessage(const IOContext* io_context,
// deleted log segment though).
unique_ptr<LogEntryPB> pending_replicate_entry(EraseKeyReturnValuePtr(
&state->pending_replicates, committed_op_id.index()));
- if (pending_replicate_entry) {
- // We found a replicate with the same index, make sure it also has the same
- // term.
+ if (PREDICT_TRUE(pending_replicate_entry)) {
+ // We found a replicate with the same index, make sure it also has the same term.
const auto& replicate = pending_replicate_entry->replicate();
- if (!OpIdEquals(committed_op_id, replicate.id())) {
+ if (PREDICT_FALSE(!OpIdEquals(committed_op_id, replicate.id()))) {
string error_msg = Substitute("Committed operation's OpId: $0 didn't match the"
"commit message's committed OpId: $1. Pending operation: $2, Commit message: $3",
SecureShortDebugString(replicate.id()),
@@ -1120,8 +1116,8 @@ Status TabletBootstrap::HandleEntryPair(const IOContext* io_context, LogEntryPB*
#define RETURN_NOT_OK_REPLAY(ReplayMethodName, io_context, replicate, commit) \
RETURN_NOT_OK_PREPEND(ReplayMethodName(io_context, replicate, commit), \
- Substitute(error_fmt, OperationType_Name(op_type), \
- SecureShortDebugString(*(replicate)), \
+ Substitute(error_fmt, OperationType_Name(op_type), \
+ SecureShortDebugString(*(replicate)), \
SecureShortDebugString(commit)))
ReplicateMsg* replicate = replicate_entry->mutable_replicate();
@@ -1156,6 +1152,7 @@ Status TabletBootstrap::HandleEntryPair(const IOContext* io_context, LogEntryPB*
#undef RETURN_NOT_OK_REPLAY
+ // TODO(yingchun): We should try to avoid update MVCC's safe time for every entry?
// We should only advance MVCC's safe time based on a specific set of
// operations: those whose timestamps are guaranteed to be monotonically
// increasing with respect to their entries in the write-ahead log.
@@ -1272,7 +1269,7 @@ Status TabletBootstrap::PlaySegments(const IOContext* io_context,
string entry_debug_info;
s = HandleEntry(io_context, &state, std::move(entry), &entry_debug_info);
- if (!s.ok()) {
+ if (PREDICT_FALSE(!s.ok())) {
DumpReplayStateToLog(state);
RETURN_NOT_OK_PREPEND(s, DebugInfo(tablet_->tablet_id(),
segment->header().sequence_number(),
@@ -1303,22 +1300,20 @@ Status TabletBootstrap::PlaySegments(const IOContext* io_context,
// If we have non-applied commits they all must belong to pending operations and
// they should only pertain to stores which are still active.
- if (!state.pending_commits.empty()) {
- for (const OpIndexToEntryMap::value_type& entry : state.pending_commits) {
- if (!ContainsKey(state.pending_replicates, entry.first)) {
- DumpReplayStateToLog(state);
- return Status::Corruption("Had orphaned commits at the end of replay.");
- }
+ for (const auto& entry : state.pending_commits) {
+ if (!ContainsKey(state.pending_replicates, entry.first)) {
+ DumpReplayStateToLog(state);
+ return Status::Corruption("Had orphaned commits at the end of replay.");
+ }
- if (entry.second->commit().op_type() == WRITE_OP &&
- AnalyzeActiveStores(entry.second->commit()) == NO_STORES_ACTIVE) {
- DumpReplayStateToLog(state);
- TabletSuperBlockPB super;
- WARN_NOT_OK(tablet_meta_->ToSuperBlock(&super), "Couldn't build TabletSuperBlockPB.");
- return Status::Corruption(Substitute("CommitMsg was pending but it did not refer "
- "to any active memory stores. Commit: $0. TabletMetadata: $1",
- SecureShortDebugString(entry.second->commit()), SecureShortDebugString(super)));
- }
+ if (entry.second->commit().op_type() == WRITE_OP &&
+ AnalyzeActiveStores(entry.second->commit()) == NO_STORES_ACTIVE) {
+ DumpReplayStateToLog(state);
+ TabletSuperBlockPB super;
+ WARN_NOT_OK(tablet_meta_->ToSuperBlock(&super), "Couldn't build TabletSuperBlockPB.");
+ return Status::Corruption(Substitute("CommitMsg was pending but it did not refer "
+ "to any active memory stores. Commit: $0. TabletMetadata: $1",
+ SecureShortDebugString(entry.second->commit()), SecureShortDebugString(super)));
}
}
@@ -1366,8 +1361,8 @@ Status TabletBootstrap::PlaySegments(const IOContext* io_context,
}
// Set up the ConsensusBootstrapInfo structure for the caller.
- for (OpIndexToEntryMap::value_type& e : state.pending_replicates) {
- consensus_info->orphaned_replicates.push_back(e.second->release_replicate());
+ for (auto& entry : state.pending_replicates) {
+ consensus_info->orphaned_replicates.push_back(entry.second->release_replicate());
}
consensus_info->last_id = state.prev_op_id;
consensus_info->last_committed_id = state.committed_op_id;
diff --git a/src/kudu/tablet/tablet_metadata.cc b/src/kudu/tablet/tablet_metadata.cc
index 4e4e244..00b5de1 100644
--- a/src/kudu/tablet/tablet_metadata.cc
+++ b/src/kudu/tablet/tablet_metadata.cc
@@ -51,6 +51,7 @@
#include "kudu/tablet/txn_participant.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/env.h"
+#include "kudu/util/fault_injection.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/logging.h"
#include "kudu/util/pb_util.h"
@@ -65,6 +66,11 @@ TAG_FLAG(enable_tablet_orphaned_block_deletion, advanced);
TAG_FLAG(enable_tablet_orphaned_block_deletion, hidden);
TAG_FLAG(enable_tablet_orphaned_block_deletion, runtime);
+DEFINE_int32(tablet_metadata_load_inject_latency_ms, 0,
+ "Amount of latency in ms to inject when load tablet metadata file. "
+ "Only for testing.");
+TAG_FLAG(tablet_metadata_load_inject_latency_ms, hidden);
+
using base::subtle::Barrier_AtomicIncrement;
using kudu::consensus::MinimumOpId;
using kudu::consensus::OpId;
@@ -139,6 +145,7 @@ Status TabletMetadata::CreateNew(FsManager* fs_manager,
Status TabletMetadata::Load(FsManager* fs_manager,
const string& tablet_id,
scoped_refptr<TabletMetadata>* metadata) {
+ MAYBE_INJECT_FIXED_LATENCY(FLAGS_tablet_metadata_load_inject_latency_ms);
scoped_refptr<TabletMetadata> ret(new TabletMetadata(fs_manager, tablet_id));
RETURN_NOT_OK(ret->LoadFromDisk());
metadata->swap(ret);
diff --git a/src/kudu/tserver/ts_tablet_manager-test.cc b/src/kudu/tserver/ts_tablet_manager-test.cc
index 8ce9bc9..b72b2de 100644
--- a/src/kudu/tserver/ts_tablet_manager-test.cc
+++ b/src/kudu/tserver/ts_tablet_manager-test.cc
@@ -26,7 +26,7 @@
#include <vector>
#include <boost/optional/optional.hpp>
-#include <gflags/gflags_declare.h>
+#include <gflags/gflags.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
@@ -39,6 +39,7 @@
#include "kudu/consensus/raft_consensus.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
#include "kudu/master/master.pb.h"
#include "kudu/tablet/local_tablet_writer.h"
#include "kudu/tablet/metadata.pb.h"
@@ -49,13 +50,22 @@
#include "kudu/tserver/heartbeater.h"
#include "kudu/tserver/mini_tablet_server.h"
#include "kudu/tserver/tablet_server.h"
+#include "kudu/util/logging.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
+#include "kudu/util/oid_generator.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
+DEFINE_int32(startup_benchmark_tablet_count_for_testing, 100,
+ "Tablet count to do startup benchmark.");
+
+DECLARE_bool(enable_leader_failure_detection);
+DECLARE_int32(num_tablets_to_open_simultaneously);
+DECLARE_bool(tablet_bootstrap_skip_opening_tablet_for_testing);
+DECLARE_int32(tablet_metadata_load_inject_latency_ms);
DECLARE_int32(update_tablet_metrics_interval_ms);
#define ASSERT_REPORT_HAS_UPDATED_TABLET(report, tablet_id) \
@@ -75,6 +85,7 @@ using kudu::tablet::TabletReplica;
using std::string;
using std::unique_ptr;
using std::vector;
+using strings::Substitute;
namespace kudu {
@@ -108,6 +119,7 @@ class TsTabletManagerTest : public KuduTest {
Status CreateNewTablet(const std::string& tablet_id,
const Schema& schema,
+ bool wait_leader,
boost::optional<TableExtraConfigPB> extra_config,
boost::optional<std::string> dimension_label,
scoped_refptr<tablet::TabletReplica>* out_tablet_replica) {
@@ -128,7 +140,8 @@ class TsTabletManagerTest : public KuduTest {
}
RETURN_NOT_OK(tablet_replica->WaitUntilConsensusRunning(MonoDelta::FromMilliseconds(2000)));
- return tablet_replica->consensus()->WaitUntilLeader(MonoDelta::FromSeconds(10));
+ return wait_leader ? tablet_replica->consensus()->WaitUntilLeader(MonoDelta::FromSeconds(10)) :
+ Status::OK();
}
void GenerateFullTabletReport(TabletReportPB* report) {
@@ -177,9 +190,9 @@ TEST_F(TsTabletManagerTest, TestCreateTablet) {
extra_config.set_history_max_age_sec(7200);
// Create a new tablet.
- ASSERT_OK(CreateNewTablet(tablet1, schema_, boost::none, boost::none, &replica1));
+ ASSERT_OK(CreateNewTablet(tablet1, schema_, true, boost::none, boost::none, &replica1));
// Create a new tablet with extra config.
- ASSERT_OK(CreateNewTablet(tablet2, schema_, extra_config, boost::none, &replica2));
+ ASSERT_OK(CreateNewTablet(tablet2, schema_, true, extra_config, boost::none, &replica2));
ASSERT_EQ(tablet1, replica1->tablet()->tablet_id());
ASSERT_EQ(tablet2, replica2->tablet()->tablet_id());
ASSERT_EQ(boost::none, replica1->tablet()->metadata()->extra_config());
@@ -259,7 +272,7 @@ TEST_F(TsTabletManagerTest, TestTabletReports) {
MarkTabletReportAcknowledged(report);
// Create a tablet and do another incremental report - should include the tablet.
- ASSERT_OK(CreateNewTablet("tablet-1", schema_, boost::none, boost::none, nullptr));
+ ASSERT_OK(CreateNewTablet("tablet-1", schema_, true, boost::none, boost::none, nullptr));
int updated_tablets = 0;
while (updated_tablets != 1) {
GenerateIncrementalTabletReport(&report);
@@ -286,7 +299,7 @@ TEST_F(TsTabletManagerTest, TestTabletReports) {
MarkTabletReportAcknowledged(report);
// Create a second tablet, and ensure the incremental report shows it.
- ASSERT_OK(CreateNewTablet("tablet-2", schema_, boost::none, boost::none, nullptr));
+ ASSERT_OK(CreateNewTablet("tablet-2", schema_, true, boost::none, boost::none, nullptr));
// Wait up to 10 seconds to get a tablet report from tablet-2.
// TabletReplica does not mark tablets dirty until after it commits the
@@ -333,8 +346,8 @@ TEST_F(TsTabletManagerTest, TestTabletStatsReports) {
// 1. Create two tablets.
scoped_refptr<tablet::TabletReplica> replica1;
- ASSERT_OK(CreateNewTablet("tablet-1", schema_, boost::none, boost::none, &replica1));
- ASSERT_OK(CreateNewTablet("tablet-2", schema_, boost::none, boost::none, nullptr));
+ ASSERT_OK(CreateNewTablet("tablet-1", schema_, true, boost::none, boost::none, &replica1));
+ ASSERT_OK(CreateNewTablet("tablet-2", schema_, true, boost::none, boost::none, nullptr));
// 2. Do a full report - should include two tablets and statistics are uninitialized.
NO_FATALS(GenerateFullTabletReport(&report));
@@ -399,5 +412,28 @@ TEST_F(TsTabletManagerTest, TestTabletStatsReports) {
MarkTabletReportAcknowledged(report);
}
+TEST_F(TsTabletManagerTest, StartupBenchmark) {
+ const int64_t kTabletCount = FLAGS_startup_benchmark_tablet_count_for_testing;
+
+ FLAGS_enable_leader_failure_detection = false;
+
+ // Mute logs, cause there are too many tablets to be created.
+ FLAGS_minloglevel = 2;
+ ObjectIdGenerator generator;
+ for (int i = 0; i < kTabletCount; i++) {
+ KLOG_EVERY_N_SECS(ERROR, 1) << Substitute("Created tablet ($0/$1 complete)", i, kTabletCount);
+ ASSERT_OK(CreateNewTablet(generator.Next(), schema_, false, boost::none, boost::none, nullptr));
+ }
+
+ mini_server_->Shutdown();
+ // Revert log level to see how much time cost when load tablet metadata.
+ FLAGS_minloglevel = 0;
+ FLAGS_tablet_bootstrap_skip_opening_tablet_for_testing = true;
+ FLAGS_tablet_metadata_load_inject_latency_ms = 2;
+ ASSERT_OK(mini_server_->Start());
+ // Mute logs, cause there are too many tablets to be shutdown.
+ FLAGS_minloglevel = 2;
+}
+
} // namespace tserver
} // namespace kudu
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index ab6c7eb..3d16de4 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -184,6 +184,11 @@ DEFINE_int32(txn_participant_registration_inject_latency_ms, 0,
TAG_FLAG(txn_participant_registration_inject_latency_ms, runtime);
TAG_FLAG(txn_participant_registration_inject_latency_ms, unsafe);
+DEFINE_bool(tablet_bootstrap_skip_opening_tablet_for_testing, false,
+ "Whether to skip opening tablet when bootstrap."
+ "Only for testing.");
+TAG_FLAG(tablet_bootstrap_skip_opening_tablet_for_testing, hidden);
+
DECLARE_bool(raft_prepare_replacement_before_eviction);
DECLARE_uint32(txn_staleness_tracker_interval_ms);
@@ -435,6 +440,7 @@ Status TSTabletManager::Init(Timer* start_tablets,
RETURN_NOT_OK(ThreadPoolBuilder("tablet-open")
.set_max_threads(max_open_threads)
.Build(&open_tablet_pool_));
+
int max_delete_threads = FLAGS_num_tablets_to_delete_simultaneously;
if (max_delete_threads == 0) {
// Default to the number of disks.
@@ -479,51 +485,99 @@ Status TSTabletManager::Init(Timer* start_tablets,
InitLocalRaftPeerPB();
- vector<scoped_refptr<TabletMetadata>> metas;
+ vector<scoped_refptr<TabletMetadata>> metas(tablet_ids.size());
// First, load all of the tablet metadata. We do this before we start
// submitting the actual OpenTablet() tasks so that we don't have to compete
// for disk resources, etc, with bootstrap processes and running tablets.
- int loaded_count = 0;
- for (const string& tablet_id : tablet_ids) {
- KLOG_EVERY_N_SECS(INFO, 1) << Substitute("Loading tablet metadata ($0/$1 complete)",
- loaded_count, tablet_ids.size());
- scoped_refptr<TabletMetadata> meta;
- RETURN_NOT_OK_PREPEND(OpenTabletMeta(tablet_id, &meta),
- "Failed to open tablet metadata for tablet: " + tablet_id);
- loaded_count++;
- if (meta->tablet_data_state() != TABLET_DATA_READY) {
- RETURN_NOT_OK(HandleNonReadyTabletOnStartup(meta));
- continue;
+ {
+ SCOPED_LOG_TIMING(INFO, Substitute("load tablet metadata"));
+ std::atomic<int> total_loaded_count = 0;
+ std::atomic<int> success_loaded_count = 0;
+ std::atomic<bool> seen_error = false;
+ Status first_error;
+ for (int i = 0; i < tablet_ids.size(); i++) {
+ if (seen_error) {
+ // If seen any error, we should abort loading tablet metadata.
+ break;
+ }
+
+ RETURN_NOT_OK(open_tablet_pool_->Submit([this, i, tablet_ids, &total_loaded_count,
+ &success_loaded_count, &metas,
+ &seen_error, &first_error]() {
+ const string& tablet_id = tablet_ids[i];
+ Status s;
+ do {
+ KLOG_EVERY_N_SECS(INFO, 1) << Substitute("Loading tablet metadata ($0/$1 complete)",
+ total_loaded_count.load(), tablet_ids.size());
+
+ scoped_refptr<TabletMetadata> meta;
+ s = OpenTabletMeta(tablet_id, &meta);
+ if (!s.ok()) {
+ s = s.CloneAndPrepend(Substitute("could not open tablet metadata: $0", tablet_id));
+ break;
+ }
+
+ total_loaded_count++;
+
+ if (meta->tablet_data_state() != TABLET_DATA_READY) {
+ s = HandleNonReadyTabletOnStartup(meta);
+ if (!s.ok()) {
+ s = s.CloneAndPrepend(Substitute("could not handle non-ready tablet: $0", tablet_id));
+ }
+ break;
+ }
+
+ success_loaded_count++;
+ metas[i] = meta;
+ } while (false);
+
+ if (!s.ok()) {
+ bool current_seen_error = false;
+ if (seen_error.compare_exchange_strong(current_seen_error, true)) {
+ first_error = s;
+ }
+ }
+ }));
}
- metas.push_back(meta);
+ open_tablet_pool_->Wait();
+ if (seen_error) {
+ LOG_AND_RETURN(WARNING, first_error);
+ }
+
+ LOG(INFO) << Substitute("Loaded tablet metadata ($0 total tablets, $1 live tablets)",
+ total_loaded_count.load(), success_loaded_count.load());
+ *tablets_total = success_loaded_count.load();
}
- LOG(INFO) << Substitute("Loaded tablet metadata ($0 total tablets, $1 live tablets)",
- loaded_count, metas.size());
// Now submit the "Open" task for each.
- *tablets_total = metas.size();
METRIC_tablets_num_total_startup.Instantiate(server_->metric_entity(), *tablets_total);
*tablets_processed = 0;
int registered_count = 0;
- for (const auto& meta : metas) {
- KLOG_EVERY_N_SECS(INFO, 1) << Substitute("Registering tablets ($0/$1 complete)",
- registered_count, metas.size());
- scoped_refptr<TransitionInProgressDeleter> deleter;
- {
- std::lock_guard<RWMutex> lock(lock_);
- CHECK_OK(StartTabletStateTransitionUnlocked(meta->tablet_id(), "opening tablet", &deleter));
- }
+ if (PREDICT_TRUE(!FLAGS_tablet_bootstrap_skip_opening_tablet_for_testing)) {
+ SCOPED_LOG_TIMING(INFO, Substitute("register tablets"));
+ for (const auto& meta : metas) {
+ if (!meta.get()) {
+ continue;
+ }
+ KLOG_EVERY_N_SECS(INFO, 1) << Substitute("Registering tablets ($0/$1 complete)",
+ registered_count, metas.size());
+ scoped_refptr<TransitionInProgressDeleter> deleter;
+ {
+ std::lock_guard<RWMutex> lock(lock_);
+ CHECK_OK(StartTabletStateTransitionUnlocked(meta->tablet_id(), "opening tablet", &deleter));
+ }
- scoped_refptr<TabletReplica> replica;
- RETURN_NOT_OK(CreateAndRegisterTabletReplica(meta, NEW_REPLICA, &replica));
- RETURN_NOT_OK(open_tablet_pool_->Submit([this, replica, deleter, tablets_processed,
- tablets_total, start_tablets]() {
- this->OpenTablet(replica, deleter, tablets_processed, tablets_total, start_tablets);
- }));
- registered_count++;
+ scoped_refptr<TabletReplica> replica;
+ RETURN_NOT_OK(CreateAndRegisterTabletReplica(meta, NEW_REPLICA, &replica));
+ RETURN_NOT_OK(open_tablet_pool_->Submit(
+ [this, replica, deleter, tablets_processed, tablets_total, start_tablets]() {
+ this->OpenTablet(replica, deleter, tablets_processed, tablets_total, start_tablets);
+ }));
+ registered_count++;
+ }
+ LOG(INFO) << Substitute("Registered $0 tablets", registered_count);
}
- LOG(INFO) << Substitute("Registered $0 tablets", registered_count);
if (registered_count == 0) {
start_tablets->Stop();
@@ -1277,7 +1331,7 @@ void TSTabletManager::OpenTablet(const scoped_refptr<tablet::TabletReplica>& rep
VLOG(1) << LogPrefix(tablet_id) << "Bootstrapping tablet";
TRACE("Bootstrapping tablet");
- if (FLAGS_tablet_bootstrap_inject_latency_ms > 0) {
+ if (PREDICT_FALSE(FLAGS_tablet_bootstrap_inject_latency_ms > 0)) {
LOG(WARNING) << "Injecting " << FLAGS_tablet_bootstrap_inject_latency_ms
<< "ms delay in tablet bootstrapping";
SleepFor(MonoDelta::FromMilliseconds(FLAGS_tablet_bootstrap_inject_latency_ms));
diff --git a/src/kudu/tserver/ts_tablet_manager.h b/src/kudu/tserver/ts_tablet_manager.h
index ed45c8d..11ea8ad 100644
--- a/src/kudu/tserver/ts_tablet_manager.h
+++ b/src/kudu/tserver/ts_tablet_manager.h
@@ -354,7 +354,7 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
// Create and register a new TabletReplica, given tablet metadata.
// Calls RegisterTablet() with the given 'mode' parameter after constructing
- // the TablerPeer object. See RegisterTablet() for details about the
+ // the TabletReplica object. See RegisterTablet() for details about the
// semantics of 'mode' and the locking requirements.
Status CreateAndRegisterTabletReplica(scoped_refptr<tablet::TabletMetadata> meta,
RegisterTabletReplicaMode mode,