You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2017/09/01 00:30:43 UTC
kudu git commit: master: always use smart pointers when accessing
TableInfo and TabletInfo
Repository: kudu
Updated Branches:
refs/heads/master 1cfdf36fc -> 22835062f
master: always use smart pointers when accessing TableInfo and TabletInfo
Mixing scoped_refptr and raw access is error-prone and complicates the code by
forcing the use of both in certain places. It's fair to be concerned about
the overhead; as far as I can tell it's minimal, because it's either in
functions that are already "slow" (like {Create,Alter,Delete}Table and the
equivalent in CatalogManagerBgTasks), or the call sites pass the object by
cref and thus avoid an extra ref/deref in the first place.
The one exception is the TableInfo::tablet_map_ member, which would create a
reference cycle if it held strong refs to its TabletInfos, because every
TabletInfo has a strong backpointer to its TableInfo. I tried to prevent this
exception from leaking outside of TableInfo.
I also reduced the number of ways to add TabletInfo objects to, or remove them
from, a TableInfo down to one (TableInfo::AddRemoveTablets).
Finally, I couldn't help myself and did some more modernization:
- Removed a couple of std:: prefixes.
- Added "auto" to some loops.
- Replaced some vector::push_back calls with emplace_back.
- Reformatted '> >' as '>>' in nested template types (C++11 allows them to be adjacent).
Change-Id: I3c74f2aa048ee85cc3a5f863ce8f38147e78c5ae
Reviewed-on: http://gerrit.cloudera.org:8080/7909
Tested-by: Adar Dembo <ad...@cloudera.com>
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/22835062
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/22835062
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/22835062
Branch: refs/heads/master
Commit: 22835062f0c9b77a5a5786411a9f22f1d6a58a06
Parents: 1cfdf36
Author: Adar Dembo <ad...@cloudera.com>
Authored: Wed Aug 30 17:08:19 2017 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Fri Sep 1 00:25:52 2017 +0000
----------------------------------------------------------------------
src/kudu/master/catalog_manager-test.cc | 18 +--
src/kudu/master/catalog_manager.cc | 205 ++++++++++++---------------
src/kudu/master/catalog_manager.h | 50 +++----
src/kudu/master/sys_catalog-test.cc | 111 ++++++---------
src/kudu/master/sys_catalog.cc | 22 +--
src/kudu/master/sys_catalog.h | 36 +++--
6 files changed, 191 insertions(+), 251 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/22835062/src/kudu/master/catalog_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager-test.cc b/src/kudu/master/catalog_manager-test.cc
index 3b52869..8a5ce2e 100644
--- a/src/kudu/master/catalog_manager-test.cc
+++ b/src/kudu/master/catalog_manager-test.cc
@@ -28,7 +28,6 @@
#include "kudu/master/catalog_manager.h"
#include "kudu/master/master.pb.h"
#include "kudu/master/ts_descriptor.h"
-#include "kudu/util/cow_object.h"
#include "kudu/util/monotime.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
@@ -47,7 +46,7 @@ namespace master {
TEST(TableInfoTest, TestAssignmentRanges) {
const string table_id = CURRENT_TEST_NAME();
scoped_refptr<TableInfo> table(new TableInfo(table_id));
- vector<scoped_refptr<TabletInfo> > tablets;
+ vector<scoped_refptr<TabletInfo>> tablets;
// Define & create the splits.
const int kNumSplits = 3;
@@ -57,16 +56,16 @@ TEST(TableInfoTest, TestAssignmentRanges) {
const string& end_key = (i == kNumSplits) ? "" : split_keys[i];
string tablet_id = Substitute("tablet-$0-$1", start_key, end_key);
- TabletInfo* tablet = new TabletInfo(table, tablet_id);
- TabletMetadataLock meta_lock(tablet, TabletMetadataLock::WRITE);
+ scoped_refptr<TabletInfo> tablet = new TabletInfo(table, tablet_id);
+ TabletMetadataLock meta_lock(tablet.get(), TabletMetadataLock::WRITE);
PartitionPB* partition = meta_lock.mutable_data()->pb.mutable_partition();
partition->set_partition_key_start(start_key);
partition->set_partition_key_end(end_key);
meta_lock.mutable_data()->pb.set_state(SysTabletsEntryPB::RUNNING);
meta_lock.Commit();
- table->AddTablet(tablet);
- tablets.push_back(make_scoped_refptr(tablet));
+ table->AddRemoveTablets({ tablet }, {});
+ tablets.push_back(tablet);
}
// Ensure they give us what we are expecting.
@@ -81,7 +80,7 @@ TEST(TableInfoTest, TestAssignmentRanges) {
req.set_max_returned_locations(1);
req.mutable_table()->mutable_table_name()->assign(table_id);
req.mutable_partition_key_start()->assign(start_key);
- vector<scoped_refptr<TabletInfo> > tablets_in_range;
+ vector<scoped_refptr<TabletInfo>> tablets_in_range;
table->GetTabletsInRange(&req, &tablets_in_range);
// Only one tablet should own this key.
@@ -91,10 +90,7 @@ TEST(TableInfoTest, TestAssignmentRanges) {
LOG(INFO) << "Key " << start_key << " found in tablet " << tablet_id;
}
- for (const scoped_refptr<TabletInfo>& tablet : tablets) {
- ASSERT_TRUE(table->RemoveTablet(
- tablet->metadata().state().pb.partition().partition_key_start()));
- }
+ table->AddRemoveTablets({}, tablets);
}
TEST(TestTSDescriptor, TestReplicaCreationsDecay) {
http://git-wip-us.apache.org/repos/asf/kudu/blob/22835062/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 517082c..1a7f7b7 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -288,8 +288,8 @@ class TableLoader : public TableVisitor {
<< "Table already exists: " << table_id;
// Set up the table info.
- TableInfo *table = new TableInfo(table_id);
- TableMetadataLock l(table, TableMetadataLock::WRITE);
+ scoped_refptr<TableInfo> table = new TableInfo(table_id);
+ TableMetadataLock l(table.get(), TableMetadataLock::WRITE);
l.mutable_data()->pb.CopyFrom(metadata);
// Add the tablet to the IDs map and to the name map (if the table is not deleted).
@@ -340,8 +340,8 @@ class TabletLoader : public TabletVisitor {
}
// Set up the tablet info.
- TabletInfo* tablet = new TabletInfo(table, tablet_id);
- TabletMetadataLock l(tablet, TabletMetadataLock::WRITE);
+ scoped_refptr<TabletInfo> tablet = new TabletInfo(table, tablet_id);
+ TabletMetadataLock l(tablet.get(), TabletMetadataLock::WRITE);
l.mutable_data()->pb.CopyFrom(metadata);
// Add the tablet to the tablet manager.
@@ -351,7 +351,7 @@ class TabletLoader : public TabletVisitor {
bool is_deleted = l.mutable_data()->is_deleted();
l.Commit();
if (!is_deleted) {
- table->AddTablet(tablet);
+ table->AddRemoveTablets({ tablet }, {});
LOG(INFO) << Substitute("Loaded metadata for tablet $0 (table $1)",
tablet_id, table->ToString());
}
@@ -495,7 +495,7 @@ void CatalogManagerBgTasks::Run() {
}
} else if (l.leader_status().ok()) {
// Get list of tablets not yet running.
- std::vector<scoped_refptr<TabletInfo>> to_process;
+ vector<scoped_refptr<TabletInfo>> to_process;
catalog_manager_->ExtractTabletsToProcess(&to_process);
if (!to_process.empty()) {
@@ -1473,14 +1473,11 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
// d. Create the in-memory representation of the new table and its tablets.
// It's not yet in any global maps; that will happen in step g below.
table = CreateTableInfo(req, schema, partition_schema);
- vector<TabletInfo*> tablets;
- vector<scoped_refptr<TabletInfo>> tablet_refs;
+ vector<scoped_refptr<TabletInfo>> tablets;
for (const Partition& partition : partitions) {
PartitionPB partition_pb;
partition.ToPB(&partition_pb);
- scoped_refptr<TabletInfo> t = CreateTabletInfo(table.get(), partition_pb);
- tablets.push_back(t.get());
- tablet_refs.emplace_back(std::move(t));
+ tablets.emplace_back(CreateTabletInfo(table, partition_pb));
}
TRACE("Created new table and tablet info");
@@ -1489,14 +1486,14 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
// They will get committed at the end of this function.
// Sanity check: the tables and tablets should all be in "preparing" state.
CHECK_EQ(SysTablesEntryPB::PREPARING, table->metadata().dirty().pb.state());
- for (const TabletInfo *tablet : tablets) {
+ for (const auto& tablet : tablets) {
CHECK_EQ(SysTabletsEntryPB::PREPARING, tablet->metadata().dirty().pb.state());
}
table->mutable_metadata()->mutable_dirty()->pb.set_state(SysTablesEntryPB::RUNNING);
// e. Write table and tablets to sys-catalog.
SysCatalogTable::Actions actions;
- actions.table_to_add = table.get();
+ actions.table_to_add = table;
actions.tablets_to_add = tablets;
s = sys_catalog_->Write(actions);
if (!s.ok()) {
@@ -1511,10 +1508,10 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
// f. Commit the in-memory state.
table->mutable_metadata()->CommitMutation();
- for (TabletInfo *tablet : tablets) {
+ for (const auto& tablet : tablets) {
tablet->mutable_metadata()->CommitMutation();
}
- table->AddTablets(tablets);
+ table->AddRemoveTablets(tablets, {});
// g. Make the new table and tablets visible in the catalog.
{
@@ -1522,7 +1519,7 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
table_ids_map_[table->id()] = table;
table_names_map_[req.name()] = table;
- for (const auto& tablet : tablet_refs) {
+ for (const auto& tablet : tablets) {
InsertOrDie(&tablet_map_, tablet->tablet_id(), tablet);
}
}
@@ -1561,11 +1558,11 @@ Status CatalogManager::IsCreateTableDone(const IsCreateTableDoneRequestPB* req,
return Status::OK();
}
-TableInfo *CatalogManager::CreateTableInfo(const CreateTableRequestPB& req,
- const Schema& schema,
- const PartitionSchema& partition_schema) {
+scoped_refptr<TableInfo> CatalogManager::CreateTableInfo(const CreateTableRequestPB& req,
+ const Schema& schema,
+ const PartitionSchema& partition_schema) {
DCHECK(schema.has_column_ids());
- TableInfo* table = new TableInfo(GenerateId());
+ scoped_refptr<TableInfo> table = new TableInfo(GenerateId());
table->mutable_metadata()->StartMutation();
SysTablesEntryPB *metadata = &table->mutable_metadata()->mutable_dirty()->pb;
metadata->set_state(SysTablesEntryPB::PREPARING);
@@ -1580,7 +1577,7 @@ TableInfo *CatalogManager::CreateTableInfo(const CreateTableRequestPB& req,
return table;
}
-scoped_refptr<TabletInfo> CatalogManager::CreateTabletInfo(TableInfo* table,
+scoped_refptr<TabletInfo> CatalogManager::CreateTabletInfo(const scoped_refptr<TableInfo>& table,
const PartitionPB& partition) {
scoped_refptr<TabletInfo> tablet(new TabletInfo(table, GenerateId()));
tablet->mutable_metadata()->StartMutation();
@@ -1645,18 +1642,16 @@ Status CatalogManager::DeleteTable(const DeleteTableRequestPB* req,
committer.AddTablets(tablets);
committer.LockTabletsForWriting();
- vector<TabletInfo*> tablets_raw;
for (const auto& t : committer) {
t->mutable_metadata()->mutable_dirty()->set_state(
SysTabletsEntryPB::DELETED, deletion_msg);
- tablets_raw.push_back(t.get());
}
// 3. Update sys-catalog with the removed table and tablet state.
TRACE("Removing table and tablets from system table");
SysCatalogTable::Actions actions;
- actions.table_to_update = table.get();
- actions.tablets_to_update = tablets_raw;
+ actions.table_to_update = table;
+ actions.tablets_to_update.assign(committer.begin(), committer.end());
Status s = sys_catalog_->Write(actions);
if (!s.ok()) {
s = s.CloneAndPrepend(Substitute("An error occurred while updating sys tables: $0",
@@ -1779,7 +1774,7 @@ Status CatalogManager::ApplyAlterSchemaSteps(const SysTablesEntryPB& current_pb,
Status CatalogManager::ApplyAlterPartitioningSteps(
const TableMetadataLock& l,
- TableInfo* table,
+ const scoped_refptr<TableInfo>& table,
const Schema& client_schema,
vector<AlterTableRequestPB::Step> steps,
vector<scoped_refptr<TabletInfo>>* tablets_to_add,
@@ -1790,8 +1785,8 @@ Status CatalogManager::ApplyAlterPartitioningSteps(
PartitionSchema partition_schema;
RETURN_NOT_OK(PartitionSchema::FromPB(l.data().pb.partition_schema(), schema, &partition_schema));
- map<string, TabletInfo*> existing_tablets = table->tablet_map();
- map<string, scoped_refptr<TabletInfo>> new_tablets;
+ TableInfo::TabletInfoMap existing_tablets = table->tablet_map();
+ TableInfo::TabletInfoMap new_tablets;
for (const auto& step : steps) {
vector<DecodedRowOperation> ops;
@@ -1844,7 +1839,7 @@ Status CatalogManager::ApplyAlterPartitioningSteps(
// existing_tablets.end(), if such a tablet does not exist).
auto existing_iter = existing_tablets.upper_bound(lower_bound);
if (existing_iter != existing_tablets.end()) {
- TabletMetadataLock metadata(existing_iter->second, TabletMetadataLock::READ);
+ TabletMetadataLock metadata(existing_iter->second.get(), TabletMetadataLock::READ);
if (upper_bound.empty() ||
metadata.data().pb.partition().partition_key_start() < upper_bound) {
return Status::InvalidArgument(
@@ -1853,7 +1848,8 @@ Status CatalogManager::ApplyAlterPartitioningSteps(
}
}
if (existing_iter != existing_tablets.begin()) {
- TabletMetadataLock metadata(std::prev(existing_iter)->second, TabletMetadataLock::READ);
+ TabletMetadataLock metadata(std::prev(existing_iter)->second.get(),
+ TabletMetadataLock::READ);
if (metadata.data().pb.partition().partition_key_end().empty() ||
metadata.data().pb.partition().partition_key_end() > lower_bound) {
return Status::InvalidArgument(
@@ -1903,7 +1899,7 @@ Status CatalogManager::ApplyAlterPartitioningSteps(
bool found_new = false;
if (existing_iter != existing_tablets.end()) {
- TabletMetadataLock metadata(existing_iter->second, TabletMetadataLock::READ);
+ TabletMetadataLock metadata(existing_iter->second.get(), TabletMetadataLock::READ);
const auto& partition = metadata.data().pb.partition();
found_existing = partition.partition_key_start() == lower_bound &&
partition.partition_key_end() == upper_bound;
@@ -2077,7 +2073,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
TRACE("Apply alter partitioning");
Schema client_schema;
RETURN_NOT_OK(SchemaFromPB(req->schema(), &client_schema));
- Status s = ApplyAlterPartitioningSteps(l, table.get(), client_schema, alter_partitioning_steps,
+ Status s = ApplyAlterPartitioningSteps(l, table, client_schema, alter_partitioning_steps,
&tablets_to_add, &tablets_to_drop);
if (!s.ok()) {
SetupError(resp->mutable_error(), MasterErrorPB::UNKNOWN_ERROR, s);
@@ -2127,11 +2123,9 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
SysCatalogTable::Actions actions;
if (!tablets_to_add.empty() || has_metadata_changes) {
// If anything modified the table's persistent metadata, then sync it to the sys catalog.
- actions.table_to_update = table.get();
- }
- for (const auto& tablet : tablets_to_add) {
- actions.tablets_to_add.push_back(tablet.get());
+ actions.table_to_update = table;
}
+ actions.tablets_to_add = tablets_to_add;
ScopedTabletInfoCommitter tablets_to_add_committer(ScopedTabletInfoCommitter::LOCKED);
ScopedTabletInfoCommitter tablets_to_drop_committer(ScopedTabletInfoCommitter::UNLOCKED);
@@ -2141,8 +2135,8 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
for (auto& tablet : tablets_to_drop) {
tablet->mutable_metadata()->mutable_dirty()->set_state(SysTabletsEntryPB::DELETED,
deletion_msg);
- actions.tablets_to_update.push_back(tablet.get());
}
+ actions.tablets_to_update = tablets_to_drop;
Status s = sys_catalog_->Write(actions);
if (!s.ok()) {
@@ -2315,15 +2309,13 @@ Status CatalogManager::GetTableInfo(const string& table_id, scoped_refptr<TableI
return Status::OK();
}
-Status CatalogManager::GetAllTables(std::vector<scoped_refptr<TableInfo>>* tables) {
+Status CatalogManager::GetAllTables(vector<scoped_refptr<TableInfo>>* tables) {
leader_lock_.AssertAcquiredForReading();
RETURN_NOT_OK(CheckOnline());
tables->clear();
shared_lock<LockType> l(lock_);
- for (const TableInfoMap::value_type& e : table_ids_map_) {
- tables->push_back(e.second);
- }
+ AppendValuesFromMap(table_ids_map_, tables);
return Status::OK();
}
@@ -2609,7 +2601,7 @@ Status CatalogManager::HandleReportedTablet(TSDescriptor* ts_desc,
// has not in fact changed since the previous version and avoid any
// unnecessary operations.
SysCatalogTable::Actions actions;
- actions.tablets_to_update.push_back(tablet.get());
+ actions.tablets_to_update.emplace_back(tablet);
Status s = sys_catalog_->Write(actions);
if (!s.ok()) {
LOG(ERROR) << Substitute("Error updating tablets: $0. Tablet report was: $1",
@@ -2624,7 +2616,7 @@ Status CatalogManager::HandleReportedTablet(TSDescriptor* ts_desc,
if (tablet_needs_alter) {
SendAlterTabletRequest(tablet);
} else if (report.has_schema_version()) {
- HandleTabletSchemaVersionReport(tablet.get(), report.schema_version());
+ HandleTabletSchemaVersionReport(tablet, report.schema_version());
}
return Status::OK();
@@ -3584,7 +3576,7 @@ void CatalogManager::ExtractTabletsToProcess(
tablet_lock.data().is_running()) {
continue;
}
- tablets_to_process->push_back(tablet);
+ tablets_to_process->emplace_back(tablet);
}
}
}
@@ -3635,23 +3627,23 @@ Status CatalogManager::DeleteTskEntries(const set<string>& entry_ids) {
}
struct DeferredAssignmentActions {
- vector<TabletInfo*> tablets_to_add;
- vector<TabletInfo*> tablets_to_update;
- vector<TabletInfo*> needs_create_rpc;
+ vector<scoped_refptr<TabletInfo>> tablets_to_add;
+ vector<scoped_refptr<TabletInfo>> tablets_to_update;
+ vector<scoped_refptr<TabletInfo>> needs_create_rpc;
};
-void CatalogManager::HandleAssignPreparingTablet(TabletInfo* tablet,
+void CatalogManager::HandleAssignPreparingTablet(const scoped_refptr<TabletInfo>& tablet,
DeferredAssignmentActions* deferred) {
// The tablet was just created (probably by a CreateTable RPC).
// Update the state to "creating" to be ready for the creation request.
tablet->mutable_metadata()->mutable_dirty()->set_state(
SysTabletsEntryPB::CREATING, "Sending initial creation of tablet");
- deferred->tablets_to_update.push_back(tablet);
- deferred->needs_create_rpc.push_back(tablet);
+ deferred->tablets_to_update.emplace_back(tablet);
+ deferred->needs_create_rpc.emplace_back(tablet);
VLOG(1) << "Assign new tablet " << tablet->ToString();
}
-void CatalogManager::HandleAssignCreatingTablet(TabletInfo* tablet,
+void CatalogManager::HandleAssignCreatingTablet(const scoped_refptr<TabletInfo>& tablet,
DeferredAssignmentActions* deferred,
vector<scoped_refptr<TabletInfo> >* new_tablets) {
MonoDelta time_since_updated =
@@ -3670,7 +3662,7 @@ void CatalogManager::HandleAssignCreatingTablet(TabletInfo* tablet,
// The "tablet creation" was already sent, but we didn't receive an answer
// within the timeout. So the tablet will be replaced by a new one.
- scoped_refptr<TabletInfo> replacement = CreateTabletInfo(tablet->table().get(),
+ scoped_refptr<TabletInfo> replacement = CreateTabletInfo(tablet->table(),
old_info.pb.partition());
LOG_WITH_PREFIX(WARNING) << Substitute("Tablet $0 was not created within the "
"allowed timeout. Replacing with a new tablet $1",
@@ -3687,9 +3679,9 @@ void CatalogManager::HandleAssignCreatingTablet(TabletInfo* tablet,
SysTabletsEntryPB::CREATING,
Substitute("Replacement for $0", tablet->tablet_id()));
- deferred->tablets_to_update.push_back(tablet);
- deferred->tablets_to_add.push_back(replacement.get());
- deferred->needs_create_rpc.push_back(replacement.get());
+ deferred->tablets_to_update.emplace_back(tablet);
+ deferred->tablets_to_add.emplace_back(replacement);
+ deferred->needs_create_rpc.emplace_back(replacement);
VLOG(1) << Substitute("Replaced tablet $0 with $1 (table $2)",
tablet->tablet_id(), replacement->tablet_id(),
tablet->table()->ToString());
@@ -3697,15 +3689,17 @@ void CatalogManager::HandleAssignCreatingTablet(TabletInfo* tablet,
new_tablets->emplace_back(std::move(replacement));
}
-// TODO: we could batch the IO onto a background thread.
-// but this is following the current HandleReportedTablet()
-Status CatalogManager::HandleTabletSchemaVersionReport(TabletInfo *tablet, uint32_t version) {
+// TODO(unknown): we could batch the IO onto a background thread.
+// but this is following the current HandleReportedTablet()
+Status CatalogManager::HandleTabletSchemaVersionReport(
+ const scoped_refptr<TabletInfo>& tablet,
+ uint32_t version) {
// Update the schema version if it's the latest
tablet->set_reported_schema_version(version);
// Verify if it's the last tablet report, and the alter completed.
- TableInfo *table = tablet->table().get();
- TableMetadataLock l(table, TableMetadataLock::WRITE);
+ const scoped_refptr<TableInfo>& table = tablet->table();
+ TableMetadataLock l(table.get(), TableMetadataLock::WRITE);
if (l.data().is_deleted() || l.data().pb.state() != SysTablesEntryPB::ALTERING) {
return Status::OK();
}
@@ -3756,18 +3750,18 @@ Status CatalogManager::ProcessPendingAssignments(
// Iterate over each of the tablets and handle it, whatever state
// it may be in. The actions required for the tablet are collected
// into 'deferred'.
- for (const scoped_refptr<TabletInfo>& tablet : tablets) {
+ for (const auto& tablet : tablets) {
SysTabletsEntryPB::State t_state = tablet->metadata().state().pb.state();
switch (t_state) {
case SysTabletsEntryPB::PREPARING:
- HandleAssignPreparingTablet(tablet.get(), &deferred);
+ HandleAssignPreparingTablet(tablet, &deferred);
break;
case SysTabletsEntryPB::CREATING:
{
vector<scoped_refptr<TabletInfo>> new_tablets;
- HandleAssignCreatingTablet(tablet.get(), &deferred, &new_tablets);
+ HandleAssignCreatingTablet(tablet, &deferred, &new_tablets);
unlocker_out.AddTablets(new_tablets);
break;
}
@@ -3790,7 +3784,7 @@ Status CatalogManager::ProcessPendingAssignments(
master_->ts_manager()->GetAllLiveDescriptors(&ts_descs);
Status s;
- for (TabletInfo *tablet : deferred.needs_create_rpc) {
+ for (const auto& tablet : deferred.needs_create_rpc) {
// NOTE: if we fail to select replicas on the first pass (due to
// insufficient Tablet Servers being online), we will still try
// again unless the tablet/table creation is cancelled.
@@ -3830,29 +3824,29 @@ Status CatalogManager::ProcessPendingAssignments(
{
std::lock_guard<LockType> l(lock_);
for (const auto& new_tablet : unlocker_out) {
- new_tablet->table()->AddTablet(new_tablet.get());
+ new_tablet->table()->AddRemoveTablets({ new_tablet }, {});
tablet_map_[new_tablet->tablet_id()] = new_tablet;
}
}
// Send DeleteTablet requests to tablet servers serving deleted tablets.
// This is asynchronous / non-blocking.
- for (TabletInfo* tablet : deferred.tablets_to_update) {
- TabletMetadataLock l(tablet, TabletMetadataLock::READ);
+ for (const auto& tablet : deferred.tablets_to_update) {
+ TabletMetadataLock l(tablet.get(), TabletMetadataLock::READ);
if (l.data().is_deleted()) {
SendDeleteTabletRequest(tablet, l, l.data().pb.state_msg());
}
}
// Send the CreateTablet() requests to the servers. This is asynchronous / non-blocking.
- for (TabletInfo* tablet : deferred.needs_create_rpc) {
- TabletMetadataLock l(tablet, TabletMetadataLock::READ);
+ for (const auto& tablet : deferred.needs_create_rpc) {
+ TabletMetadataLock l(tablet.get(), TabletMetadataLock::READ);
SendCreateTabletRequest(tablet, l);
}
return Status::OK();
}
Status CatalogManager::SelectReplicasForTablet(const TSDescriptorVector& ts_descs,
- TabletInfo* tablet) {
+ const scoped_refptr<TabletInfo>& tablet) {
TableMetadataLock table_guard(tablet->table().get(), TableMetadataLock::READ);
if (!table_guard.data().pb.IsInitialized()) {
@@ -4029,10 +4023,10 @@ Status CatalogManager::GetTableLocations(const GetTableLocationsRequestPB* req,
TableMetadataLock l(table.get(), TableMetadataLock::READ);
RETURN_NOT_OK(CheckIfTableDeletedOrNotRunning(&l, resp));
- vector<scoped_refptr<TabletInfo> > tablets_in_range;
+ vector<scoped_refptr<TabletInfo>> tablets_in_range;
table->GetTabletsInRange(req, &tablets_in_range);
- for (const scoped_refptr<TabletInfo>& tablet : tablets_in_range) {
+ for (const auto& tablet : tablets_in_range) {
Status s = BuildLocationsForTablet(tablet, resp->add_tablet_locations());
if (s.ok()) {
continue;
@@ -4076,11 +4070,11 @@ void CatalogManager::DumpState(std::ostream* out) const {
*out << "Tables:\n";
for (const TableInfoMap::value_type& e : ids_copy) {
- TableInfo* t = e.second.get();
- TableMetadataLock l(t, TableMetadataLock::READ);
+ const scoped_refptr<TableInfo>& table = e.second;
+ TableMetadataLock l(table.get(), TableMetadataLock::READ);
const string& name = l.data().name();
- *out << t->id() << ":\n";
+ *out << table->id() << ":\n";
*out << " name: \"" << strings::CHexEscape(name) << "\"\n";
// Erase from the map, so later we can check that we don't have
// any orphaned tables in the by-name map that aren't in the
@@ -4092,9 +4086,9 @@ void CatalogManager::DumpState(std::ostream* out) const {
*out << " tablets:\n";
- vector<scoped_refptr<TabletInfo> > table_tablets;
- t->GetAllTablets(&table_tablets);
- for (const scoped_refptr<TabletInfo>& tablet : table_tablets) {
+ vector<scoped_refptr<TabletInfo>> tablets;
+ table->GetAllTablets(&tablets);
+ for (const auto& tablet : tablets) {
TabletMetadataLock l_tablet(tablet.get(), TabletMetadataLock::READ);
*out << " " << tablet->tablet_id() << ": "
<< SecureShortDebugString(l_tablet.data().pb) << "\n";
@@ -4107,7 +4101,7 @@ void CatalogManager::DumpState(std::ostream* out) const {
if (!tablets_copy.empty()) {
*out << "Orphaned tablets (not referenced by any table):\n";
- for (const TabletInfoMap::value_type& entry : tablets_copy) {
+ for (const auto& entry : tablets_copy) {
const scoped_refptr<TabletInfo>& tablet = entry.second;
TabletMetadataLock l_tablet(tablet.get(), TabletMetadataLock::READ);
*out << " " << tablet->tablet_id() << ": "
@@ -4302,23 +4296,6 @@ string TableInfo::ToString() const {
return Substitute("$0 [id=$1]", l.data().pb.name(), table_id_);
}
-bool TableInfo::RemoveTablet(const string& partition_key_start) {
- std::lock_guard<rw_spinlock> l(lock_);
- return EraseKeyReturnValuePtr(&tablet_map_, partition_key_start) != nullptr;
-}
-
-void TableInfo::AddTablet(TabletInfo *tablet) {
- std::lock_guard<rw_spinlock> l(lock_);
- AddTabletUnlocked(tablet);
-}
-
-void TableInfo::AddTablets(const vector<TabletInfo*>& tablets) {
- std::lock_guard<rw_spinlock> l(lock_);
- for (TabletInfo *tablet : tablets) {
- AddTabletUnlocked(tablet);
- }
-}
-
void TableInfo::AddRemoveTablets(const vector<scoped_refptr<TabletInfo>>& tablets_to_add,
const vector<scoped_refptr<TabletInfo>>& tablets_to_drop) {
std::lock_guard<rw_spinlock> l(lock_);
@@ -4327,29 +4304,25 @@ void TableInfo::AddRemoveTablets(const vector<scoped_refptr<TabletInfo>>& tablet
CHECK(EraseKeyReturnValuePtr(&tablet_map_, lower_bound) != nullptr);
}
for (const auto& tablet : tablets_to_add) {
- AddTabletUnlocked(tablet.get());
- }
-}
-
-void TableInfo::AddTabletUnlocked(TabletInfo* tablet) {
- TabletInfo* old = nullptr;
- if (UpdateReturnCopy(&tablet_map_,
- tablet->metadata().state().pb.partition().partition_key_start(),
- tablet, &old)) {
- VLOG(1) << Substitute("Replaced tablet $0 with $1",
- old->tablet_id(), tablet->tablet_id());
- // TODO: can we assert that the replaced tablet is not in Running state?
- // May be a little tricky since we don't know whether to look at its committed or
- // uncommitted state.
+ TabletInfo* old = nullptr;
+ if (UpdateReturnCopy(&tablet_map_,
+ tablet->metadata().state().pb.partition().partition_key_start(),
+ tablet.get(), &old)) {
+ VLOG(1) << Substitute("Replaced tablet $0 with $1",
+ old->tablet_id(), tablet->tablet_id());
+ // TODO(unknown): can we assert that the replaced tablet is not in Running state?
+ // May be a little tricky since we don't know whether to look at its committed or
+ // uncommitted state.
+ }
}
}
void TableInfo::GetTabletsInRange(const GetTableLocationsRequestPB* req,
- vector<scoped_refptr<TabletInfo> > *ret) const {
+ vector<scoped_refptr<TabletInfo>>* ret) const {
shared_lock<rw_spinlock> l(lock_);
int max_returned_locations = req->max_returned_locations();
- TableInfo::TabletInfoMap::const_iterator it, it_end;
+ RawTabletInfoMap::const_iterator it, it_end;
if (req->has_partition_key_start()) {
it = tablet_map_.upper_bound(req->partition_key_start());
if (it != tablet_map_.begin()) {
@@ -4367,14 +4340,14 @@ void TableInfo::GetTabletsInRange(const GetTableLocationsRequestPB* req,
int count = 0;
for (; it != it_end && count < max_returned_locations; ++it) {
- ret->push_back(make_scoped_refptr(it->second));
+ ret->emplace_back(make_scoped_refptr(it->second));
count++;
}
}
bool TableInfo::IsAlterInProgress(uint32_t version) const {
shared_lock<rw_spinlock> l(lock_);
- for (const TableInfo::TabletInfoMap::value_type& e : tablet_map_) {
+ for (const auto& e : tablet_map_) {
if (e.second->reported_schema_version() < version) {
VLOG(3) << Substitute("Table $0 ALTER in progress due to tablet $1 "
"because reported schema $2 < expected $3", table_id_,
@@ -4387,7 +4360,7 @@ bool TableInfo::IsAlterInProgress(uint32_t version) const {
bool TableInfo::IsCreateInProgress() const {
shared_lock<rw_spinlock> l(lock_);
- for (const TableInfo::TabletInfoMap::value_type& e : tablet_map_) {
+ for (const auto& e : tablet_map_) {
TabletMetadataLock tablet_lock(e.second, TabletMetadataLock::READ);
if (!tablet_lock.data().is_running()) {
return true;
@@ -4447,7 +4420,7 @@ void TableInfo::GetAllTablets(vector<scoped_refptr<TabletInfo>>* ret) const {
ret->clear();
shared_lock<rw_spinlock> l(lock_);
for (const auto& e : tablet_map_) {
- ret->push_back(make_scoped_refptr(e.second));
+ ret->emplace_back(make_scoped_refptr(e.second));
}
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/22835062/src/kudu/master/catalog_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index 14f65e9..24fd30a 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -26,6 +26,7 @@
#include <string>
#include <unordered_map>
#include <unordered_set>
+#include <utility>
#include <vector>
#include <glog/logging.h>
@@ -214,7 +215,7 @@ struct PersistentTableInfo {
class TableInfo : public RefCountedThreadSafe<TableInfo> {
public:
typedef PersistentTableInfo cow_state;
- typedef std::map<std::string, TabletInfo*> TabletInfoMap;
+ typedef std::map<std::string, scoped_refptr<TabletInfo>> TabletInfoMap;
explicit TableInfo(std::string table_id);
@@ -223,25 +224,16 @@ class TableInfo : public RefCountedThreadSafe<TableInfo> {
// Return the table's ID. Does not require synchronization.
const std::string& id() const { return table_id_; }
- // Add a tablet to this table.
- void AddTablet(TabletInfo *tablet);
- // Add multiple tablets to this table.
- void AddTablets(const std::vector<TabletInfo*>& tablets);
-
// Atomically add and remove multiple tablets from this table.
void AddRemoveTablets(const std::vector<scoped_refptr<TabletInfo>>& tablets_to_add,
const std::vector<scoped_refptr<TabletInfo>>& tablets_to_drop);
- // Return true if tablet with 'partition_key_start' has been
- // removed from 'tablet_map_' below.
- bool RemoveTablet(const std::string& partition_key_start);
-
// This only returns tablets which are in RUNNING state.
void GetTabletsInRange(const GetTableLocationsRequestPB* req,
- std::vector<scoped_refptr<TabletInfo> > *ret) const;
+ std::vector<scoped_refptr<TabletInfo>>* ret) const;
// Adds all tablets to the vector in partition key sorted order.
- void GetAllTablets(std::vector<scoped_refptr<TabletInfo> > *ret) const;
+ void GetAllTablets(std::vector<scoped_refptr<TabletInfo>>* ret) const;
// Access the persistent metadata. Typically you should use
// TableMetadataLock to gain access to this data.
@@ -265,7 +257,11 @@ class TableInfo : public RefCountedThreadSafe<TableInfo> {
// Returns a snapshot copy of the table info's tablet map.
TabletInfoMap tablet_map() const {
shared_lock<rw_spinlock> l(lock_);
- return tablet_map_;
+ TabletInfoMap ret;
+ for (const auto& e : tablet_map_) {
+ ret.emplace(e.first, make_scoped_refptr(e.second));
+ }
+ return ret;
}
// Returns the number of tablets.
@@ -283,8 +279,11 @@ class TableInfo : public RefCountedThreadSafe<TableInfo> {
const std::string table_id_;
// Sorted index of tablet start partition-keys to TabletInfo.
- // The TabletInfo objects are owned by the CatalogManager.
- TabletInfoMap tablet_map_;
+ //
+ // Every TabletInfo has a strong backpointer to its TableInfo, so these
+ // pointers must be raw.
+ typedef std::map<std::string, TabletInfo*> RawTabletInfoMap;
+ RawTabletInfoMap tablet_map_;
// Protects tablet_map_ and pending_tasks_
mutable rw_spinlock lock_;
@@ -639,14 +638,14 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
// Helper for creating the initial TableInfo state
// Leaves the table "write locked" with the new info in the
// "dirty" state field.
- TableInfo* CreateTableInfo(const CreateTableRequestPB& req,
- const Schema& schema,
- const PartitionSchema& partition_schema);
+ scoped_refptr<TableInfo> CreateTableInfo(const CreateTableRequestPB& req,
+ const Schema& schema,
+ const PartitionSchema& partition_schema);
// Helper for creating the initial TabletInfo state.
// Leaves the tablet "write locked" with the new info in the
// "dirty" state field.
- scoped_refptr<TabletInfo> CreateTabletInfo(TableInfo* table,
+ scoped_refptr<TabletInfo> CreateTabletInfo(const scoped_refptr<TableInfo>& table,
const PartitionPB& partition);
// Builds the TabletLocationsPB for a tablet based on the provided TabletInfo.
@@ -691,7 +690,7 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
ColumnId* next_col_id);
Status ApplyAlterPartitioningSteps(const TableMetadataLock& l,
- TableInfo* table,
+ const scoped_refptr<TableInfo>& table,
const Schema& client_schema,
std::vector<AlterTableRequestPB::Step> steps,
std::vector<scoped_refptr<TabletInfo>>* tablets_to_add,
@@ -707,7 +706,8 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
// servers to select the N replicas, return Status::InvalidArgument.
//
// This method is called by "ProcessPendingAssignments()".
- Status SelectReplicasForTablet(const TSDescriptorVector& ts_descs, TabletInfo* tablet);
+ Status SelectReplicasForTablet(const TSDescriptorVector& ts_descs,
+ const scoped_refptr<TabletInfo>& tablet);
// Select N Replicas from the online tablet servers
// and populate the consensus configuration object.
@@ -717,17 +717,17 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
int nreplicas,
consensus::RaftConfigPB *config);
- void HandleAssignPreparingTablet(TabletInfo* tablet,
+ void HandleAssignPreparingTablet(const scoped_refptr<TabletInfo>& tablet,
DeferredAssignmentActions* deferred);
// Assign tablets and send CreateTablet RPCs to tablet servers.
// The out param 'new_tablets' should have any newly-created TabletInfo
// objects appended to it.
- void HandleAssignCreatingTablet(TabletInfo* tablet,
+ void HandleAssignCreatingTablet(const scoped_refptr<TabletInfo>& tablet,
DeferredAssignmentActions* deferred,
- std::vector<scoped_refptr<TabletInfo> >* new_tablets);
+ std::vector<scoped_refptr<TabletInfo>>* new_tablets);
- Status HandleTabletSchemaVersionReport(TabletInfo *tablet,
+ Status HandleTabletSchemaVersionReport(const scoped_refptr<TabletInfo>& tablet,
uint32_t version);
// Send the "create tablet request" to all peers of a particular tablet.
http://git-wip-us.apache.org/repos/asf/kudu/blob/22835062/src/kudu/master/sys_catalog-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog-test.cc b/src/kudu/master/sys_catalog-test.cc
index 32ab82f..3bc959b 100644
--- a/src/kudu/master/sys_catalog-test.cc
+++ b/src/kudu/master/sys_catalog-test.cc
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+#include <algorithm>
#include <memory>
#include <string>
#include <vector>
@@ -95,29 +96,22 @@ class SysCatalogTest : public KuduTest {
class TestTableLoader : public TableVisitor {
public:
- TestTableLoader() {}
- ~TestTableLoader() { Reset(); }
-
void Reset() {
- for (TableInfo* ti : tables) {
- ti->Release();
- }
tables.clear();
}
- virtual Status VisitTable(const std::string& table_id,
+ virtual Status VisitTable(const string& table_id,
const SysTablesEntryPB& metadata) OVERRIDE {
// Setup the table info
- TableInfo *table = new TableInfo(table_id);
- TableMetadataLock l(table, TableMetadataLock::WRITE);
+ scoped_refptr<TableInfo> table = new TableInfo(table_id);
+ TableMetadataLock l(table.get(), TableMetadataLock::WRITE);
l.mutable_data()->pb.CopyFrom(metadata);
l.Commit();
- table->AddRef();
- tables.push_back(table);
+ tables.emplace_back(std::move(table));
return Status::OK();
}
- vector<TableInfo* > tables;
+ vector<scoped_refptr<TableInfo>> tables;
};
static bool PbEquals(const google::protobuf::Message& a, const google::protobuf::Message& b) {
@@ -125,9 +119,10 @@ static bool PbEquals(const google::protobuf::Message& a, const google::protobuf:
}
template<class C>
-static bool MetadatasEqual(C* ti_a, C* ti_b) {
- MetadataLock<C> l_a(ti_a, MetadataLock<C>::READ);
- MetadataLock<C> l_b(ti_a, MetadataLock<C>::READ);
+static bool MetadatasEqual(const scoped_refptr<C>& ti_a,
+ const scoped_refptr<C>& ti_b) {
+ MetadataLock<C> l_a(ti_a.get(), MetadataLock<C>::READ);
+ MetadataLock<C> l_b(ti_b.get(), MetadataLock<C>::READ);
return PbEquals(l_a.data().pb, l_b.data().pb);
}
@@ -158,7 +153,7 @@ TEST_F(SysCatalogTest, TestSysCatalogTablesOperations) {
loader.Reset();
ASSERT_OK(master_->catalog_manager()->sys_catalog()->VisitTables(&loader));
ASSERT_EQ(1, loader.tables.size());
- ASSERT_TRUE(MetadatasEqual(table.get(), loader.tables[0]));
+ ASSERT_TRUE(MetadatasEqual(table, loader.tables[0]));
// Update the table
{
@@ -174,7 +169,7 @@ TEST_F(SysCatalogTest, TestSysCatalogTablesOperations) {
loader.Reset();
ASSERT_OK(master_->catalog_manager()->sys_catalog()->VisitTables(&loader));
ASSERT_EQ(1, loader.tables.size());
- ASSERT_TRUE(MetadatasEqual(table.get(), loader.tables[0]));
+ ASSERT_TRUE(MetadatasEqual(table, loader.tables[0]));
// Delete the table
loader.Reset();
@@ -224,40 +219,34 @@ TEST_F(SysCatalogTest, TestTableInfoCommit) {
class TestTabletLoader : public TabletVisitor {
public:
- TestTabletLoader() {}
- ~TestTabletLoader() { Reset(); }
-
void Reset() {
- for (TabletInfo* ti : tablets) {
- ti->Release();
- }
tablets.clear();
}
- virtual Status VisitTablet(const std::string& table_id,
- const std::string& tablet_id,
+ virtual Status VisitTablet(const string& /*table_id*/,
+ const string& tablet_id,
const SysTabletsEntryPB& metadata) OVERRIDE {
// Setup the tablet info
- TabletInfo *tablet = new TabletInfo(nullptr, tablet_id);
- TabletMetadataLock l(tablet, TabletMetadataLock::WRITE);
+ scoped_refptr<TabletInfo> tablet = new TabletInfo(nullptr, tablet_id);
+ TabletMetadataLock l(tablet.get(), TabletMetadataLock::WRITE);
l.mutable_data()->pb.CopyFrom(metadata);
l.Commit();
- tablet->AddRef();
- tablets.push_back(tablet);
+ tablets.emplace_back(std::move(tablet));
return Status::OK();
}
- vector<TabletInfo *> tablets;
+ vector<scoped_refptr<TabletInfo>> tablets;
};
// Create a new TabletInfo. The object is in uncommitted
// state.
-static TabletInfo *CreateTablet(TableInfo *table,
- const string& tablet_id,
- const string& start_key,
- const string& end_key) {
- TabletInfo *tablet = new TabletInfo(table, tablet_id);
- TabletMetadataLock l(tablet, TabletMetadataLock::WRITE);
+static scoped_refptr<TabletInfo> CreateTablet(
+ const scoped_refptr<TableInfo>& table,
+ const string& tablet_id,
+ const string& start_key,
+ const string& end_key) {
+ scoped_refptr<TabletInfo> tablet = new TabletInfo(table, tablet_id);
+ TabletMetadataLock l(tablet.get(), TabletMetadataLock::WRITE);
l.mutable_data()->pb.set_state(SysTabletsEntryPB::PREPARING);
l.mutable_data()->pb.mutable_partition()->set_partition_key_start(start_key);
l.mutable_data()->pb.mutable_partition()->set_partition_key_end(end_key);
@@ -270,9 +259,9 @@ static TabletInfo *CreateTablet(TableInfo *table,
// visit)
TEST_F(SysCatalogTest, TestSysCatalogTabletsOperations) {
scoped_refptr<TableInfo> table(new TableInfo("abc"));
- scoped_refptr<TabletInfo> tablet1(CreateTablet(table.get(), "123", "a", "b"));
- scoped_refptr<TabletInfo> tablet2(CreateTablet(table.get(), "456", "b", "c"));
- scoped_refptr<TabletInfo> tablet3(CreateTablet(table.get(), "789", "c", "d"));
+ scoped_refptr<TabletInfo> tablet1(CreateTablet(table, "123", "a", "b"));
+ scoped_refptr<TabletInfo> tablet2(CreateTablet(table, "456", "b", "c"));
+ scoped_refptr<TabletInfo> tablet3(CreateTablet(table, "789", "c", "d"));
SysCatalogTable* sys_catalog = master_->catalog_manager()->sys_catalog();
@@ -282,54 +271,40 @@ TEST_F(SysCatalogTest, TestSysCatalogTabletsOperations) {
// Add tablet1 and tablet2
{
- std::vector<TabletInfo*> tablets;
- tablets.push_back(tablet1.get());
- tablets.push_back(tablet2.get());
-
loader.Reset();
TabletMetadataLock l1(tablet1.get(), TabletMetadataLock::WRITE);
TabletMetadataLock l2(tablet2.get(), TabletMetadataLock::WRITE);
SysCatalogTable::Actions actions;
- actions.tablets_to_add = tablets;
+ actions.tablets_to_add = { tablet1, tablet2 };
ASSERT_OK(sys_catalog->Write(actions));
l1.Commit();
l2.Commit();
ASSERT_OK(sys_catalog->VisitTablets(&loader));
ASSERT_EQ(2, loader.tablets.size());
- ASSERT_TRUE(MetadatasEqual(tablet1.get(), loader.tablets[0]));
- ASSERT_TRUE(MetadatasEqual(tablet2.get(), loader.tablets[1]));
+ ASSERT_TRUE(MetadatasEqual(tablet1, loader.tablets[0]));
+ ASSERT_TRUE(MetadatasEqual(tablet2, loader.tablets[1]));
}
// Update tablet1
{
- std::vector<TabletInfo*> tablets;
- tablets.push_back(tablet1.get());
-
TabletMetadataLock l1(tablet1.get(), TabletMetadataLock::WRITE);
l1.mutable_data()->pb.set_state(SysTabletsEntryPB::RUNNING);
SysCatalogTable::Actions actions;
- actions.tablets_to_update = tablets;
+ actions.tablets_to_update = { tablet1 };
ASSERT_OK(sys_catalog->Write(actions));
l1.Commit();
loader.Reset();
ASSERT_OK(sys_catalog->VisitTablets(&loader));
ASSERT_EQ(2, loader.tablets.size());
- ASSERT_TRUE(MetadatasEqual(tablet1.get(), loader.tablets[0]));
- ASSERT_TRUE(MetadatasEqual(tablet2.get(), loader.tablets[1]));
+ ASSERT_TRUE(MetadatasEqual(tablet1, loader.tablets[0]));
+ ASSERT_TRUE(MetadatasEqual(tablet2, loader.tablets[1]));
}
// Add tablet3 and Update tablet1 and tablet2
{
- std::vector<TabletInfo *> to_add;
- std::vector<TabletInfo *> to_update;
-
TabletMetadataLock l3(tablet3.get(), TabletMetadataLock::WRITE);
- to_add.push_back(tablet3.get());
- to_update.push_back(tablet1.get());
- to_update.push_back(tablet2.get());
-
TabletMetadataLock l1(tablet1.get(), TabletMetadataLock::WRITE);
l1.mutable_data()->pb.set_state(SysTabletsEntryPB::REPLACED);
TabletMetadataLock l2(tablet2.get(), TabletMetadataLock::WRITE);
@@ -337,8 +312,8 @@ TEST_F(SysCatalogTest, TestSysCatalogTabletsOperations) {
loader.Reset();
SysCatalogTable::Actions actions;
- actions.tablets_to_add = to_add;
- actions.tablets_to_update = to_update;
+ actions.tablets_to_add = { tablet3 };
+ actions.tablets_to_update = { tablet1, tablet2 };
ASSERT_OK(sys_catalog->Write(actions));
l1.Commit();
@@ -347,24 +322,20 @@ TEST_F(SysCatalogTest, TestSysCatalogTabletsOperations) {
ASSERT_OK(sys_catalog->VisitTablets(&loader));
ASSERT_EQ(3, loader.tablets.size());
- ASSERT_TRUE(MetadatasEqual(tablet1.get(), loader.tablets[0]));
- ASSERT_TRUE(MetadatasEqual(tablet2.get(), loader.tablets[1]));
- ASSERT_TRUE(MetadatasEqual(tablet3.get(), loader.tablets[2]));
+ ASSERT_TRUE(MetadatasEqual(tablet1, loader.tablets[0]));
+ ASSERT_TRUE(MetadatasEqual(tablet2, loader.tablets[1]));
+ ASSERT_TRUE(MetadatasEqual(tablet3, loader.tablets[2]));
}
// Delete tablet1 and tablet3 tablets
{
- std::vector<TabletInfo*> tablets;
- tablets.push_back(tablet1.get());
- tablets.push_back(tablet3.get());
-
loader.Reset();
SysCatalogTable::Actions actions;
- actions.tablets_to_delete = tablets;
+ actions.tablets_to_delete = { tablet1, tablet3 };
ASSERT_OK(master_->catalog_manager()->sys_catalog()->Write(actions));
ASSERT_OK(master_->catalog_manager()->sys_catalog()->VisitTablets(&loader));
ASSERT_EQ(1, loader.tablets.size());
- ASSERT_TRUE(MetadatasEqual(tablet2.get(), loader.tablets[0]));
+ ASSERT_TRUE(MetadatasEqual(tablet2, loader.tablets[0]));
}
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/22835062/src/kudu/master/sys_catalog.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index 0e3a9f1..b6593b8 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -44,6 +44,7 @@
#include "kudu/common/scan_spec.h"
#include "kudu/common/schema.h"
#include "kudu/common/wire_protocol.h"
+#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/consensus_meta.h"
#include "kudu/consensus/consensus_meta_manager.h"
#include "kudu/consensus/consensus_peers.h"
@@ -508,7 +509,8 @@ Status SysCatalogTable::Write(const Actions& actions) {
// Table related methods
// ==================================================================
-void SysCatalogTable::ReqAddTable(WriteRequestPB* req, const TableInfo* table) {
+void SysCatalogTable::ReqAddTable(WriteRequestPB* req,
+ const scoped_refptr<TableInfo>& table) {
VLOG(2) << "Adding table " << table->id() << " in catalog: " <<
SecureShortDebugString(table->metadata().dirty().pb);
@@ -523,7 +525,8 @@ void SysCatalogTable::ReqAddTable(WriteRequestPB* req, const TableInfo* table) {
enc.Add(RowOperationsPB::INSERT, row);
}
-void SysCatalogTable::ReqUpdateTable(WriteRequestPB* req, const TableInfo* table) {
+void SysCatalogTable::ReqUpdateTable(WriteRequestPB* req,
+ const scoped_refptr<TableInfo>& table) {
string diff;
if (ArePBsEqual(table->metadata().state().pb,
table->metadata().dirty().pb,
@@ -544,7 +547,8 @@ void SysCatalogTable::ReqUpdateTable(WriteRequestPB* req, const TableInfo* table
enc.Add(RowOperationsPB::UPDATE, row);
}
-void SysCatalogTable::ReqDeleteTable(WriteRequestPB* req, const TableInfo* table) {
+void SysCatalogTable::ReqDeleteTable(WriteRequestPB* req,
+ const scoped_refptr<TableInfo>& table) {
KuduPartialRow row(&schema_);
CHECK_OK(row.SetInt8(kSysCatalogTableColType, TABLES_ENTRY));
CHECK_OK(row.SetStringNoCopy(kSysCatalogTableColId, table->id()));
@@ -726,11 +730,11 @@ Status SysCatalogTable::RemoveTskEntries(const set<string>& entry_ids) {
// ==================================================================
void SysCatalogTable::ReqAddTablets(WriteRequestPB* req,
- const vector<TabletInfo*>& tablets) {
+ const vector<scoped_refptr<TabletInfo>>& tablets) {
faststring metadata_buf;
KuduPartialRow row(&schema_);
RowOperationsPBEncoder enc(req->mutable_row_operations());
- for (auto tablet : tablets) {
+ for (const auto& tablet : tablets) {
VLOG(2) << "Adding tablet " << tablet->tablet_id() << " in catalog: "
<< SecureShortDebugString(tablet->metadata().dirty().pb);
pb_util::SerializeToString(tablet->metadata().dirty().pb, &metadata_buf);
@@ -742,11 +746,11 @@ void SysCatalogTable::ReqAddTablets(WriteRequestPB* req,
}
void SysCatalogTable::ReqUpdateTablets(WriteRequestPB* req,
- const vector<TabletInfo*>& tablets) {
+ const vector<scoped_refptr<TabletInfo>>& tablets) {
faststring metadata_buf;
KuduPartialRow row(&schema_);
RowOperationsPBEncoder enc(req->mutable_row_operations());
- for (auto tablet : tablets) {
+ for (const auto& tablet : tablets) {
string diff;
if (ArePBsEqual(tablet->metadata().state().pb,
tablet->metadata().dirty().pb,
@@ -765,10 +769,10 @@ void SysCatalogTable::ReqUpdateTablets(WriteRequestPB* req,
}
void SysCatalogTable::ReqDeleteTablets(WriteRequestPB* req,
- const vector<TabletInfo*>& tablets) {
+ const vector<scoped_refptr<TabletInfo>>& tablets) {
KuduPartialRow row(&schema_);
RowOperationsPBEncoder enc(req->mutable_row_operations());
- for (auto tablet : tablets) {
+ for (const auto& tablet : tablets) {
CHECK_OK(row.SetInt8(kSysCatalogTableColType, TABLETS_ENTRY));
CHECK_OK(row.SetStringNoCopy(kSysCatalogTableColId, tablet->tablet_id()));
enc.Add(RowOperationsPB::DELETE, row);
http://git-wip-us.apache.org/repos/asf/kudu/blob/22835062/src/kudu/master/sys_catalog.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.h b/src/kudu/master/sys_catalog.h
index efdef38..5ea5493 100644
--- a/src/kudu/master/sys_catalog.h
+++ b/src/kudu/master/sys_catalog.h
@@ -26,11 +26,11 @@
#include <gtest/gtest_prod.h>
#include "kudu/common/schema.h"
-#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/gutil/callback.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
+#include "kudu/master/catalog_manager.h"
#include "kudu/tablet/tablet_replica.h"
#include "kudu/util/status.h"
@@ -60,8 +60,6 @@ class SysCertAuthorityEntryPB;
class SysTablesEntryPB;
class SysTabletsEntryPB;
class SysTskEntryPB;
-class TableInfo;
-class TabletInfo;
struct MasterOptions;
// The SysCatalogTable has two separate visitors because the tables
@@ -148,12 +146,12 @@ class SysCatalogTable {
struct Actions {
Actions();
- TableInfo* table_to_add;
- TableInfo* table_to_update;
- TableInfo* table_to_delete;
- std::vector<TabletInfo*> tablets_to_add;
- std::vector<TabletInfo*> tablets_to_update;
- std::vector<TabletInfo*> tablets_to_delete;
+ scoped_refptr<TableInfo> table_to_add;
+ scoped_refptr<TableInfo> table_to_update;
+ scoped_refptr<TableInfo> table_to_delete;
+ std::vector<scoped_refptr<TabletInfo>> tablets_to_add;
+ std::vector<scoped_refptr<TabletInfo>> tablets_to_update;
+ std::vector<scoped_refptr<TabletInfo>> tablets_to_delete;
};
Status Write(const Actions& actions);
@@ -237,25 +235,23 @@ class SysCatalogTable {
// Tablet related private methods.
- // Add dirty tablet data to the given row operations.
- Status AddTabletsToPB(const std::vector<TabletInfo*>& tablets,
- RowOperationsPB::Type op_type,
- RowOperationsPB* ops) const;
-
// Initializes the RaftPeerPB for the local peer.
// Crashes due to an invariant check if the rpc server is not running.
void InitLocalRaftPeerPB();
// Add an operation to a write adding/updating/deleting a table or tablet.
- void ReqAddTable(tserver::WriteRequestPB* req, const TableInfo* table);
- void ReqUpdateTable(tserver::WriteRequestPB* req, const TableInfo* table);
- void ReqDeleteTable(tserver::WriteRequestPB* req, const TableInfo* table);
+ void ReqAddTable(tserver::WriteRequestPB* req,
+ const scoped_refptr<TableInfo>& table);
+ void ReqUpdateTable(tserver::WriteRequestPB* req,
+ const scoped_refptr<TableInfo>& table);
+ void ReqDeleteTable(tserver::WriteRequestPB* req,
+ const scoped_refptr<TableInfo>& table);
void ReqAddTablets(tserver::WriteRequestPB* req,
- const std::vector<TabletInfo*>& tablets);
+ const std::vector<scoped_refptr<TabletInfo>>& tablets);
void ReqUpdateTablets(tserver::WriteRequestPB* req,
- const std::vector<TabletInfo*>& tablets);
+ const std::vector<scoped_refptr<TabletInfo>>& tablets);
void ReqDeleteTablets(tserver::WriteRequestPB* req,
- const std::vector<TabletInfo*>& tablets);
+ const std::vector<scoped_refptr<TabletInfo>>& tablets);
static std::string TskSeqNumberToEntryId(int64_t seq_number);