You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/01/09 06:22:08 UTC
[4/4] kudu git commit: [fs]: wrapping up containers in scoped_refptr
[fs]: wrapping up containers in scoped_refptr
Containers must be wrapped in scoped_refptr so that full containers
that have become dead can be deleted at runtime. This change is a
prerequisite for fixing KUDU-2636.
Change-Id: I3c5c620014782b3d57dcbe047d0df73c949590c7
Reviewed-on: http://gerrit.cloudera.org:8080/12121
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/b63d9219
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/b63d9219
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/b63d9219
Branch: refs/heads/master
Commit: b63d9219eb6c20d76f61f072af958940891b5c48
Parents: 1147921
Author: helifu <hz...@corp.netease.com>
Authored: Sat Dec 22 22:03:20 2018 +0800
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Wed Jan 9 06:21:36 2019 +0000
----------------------------------------------------------------------
src/kudu/fs/log_block_manager-test.cc | 6 +-
src/kudu/fs/log_block_manager.cc | 224 ++++++++++++++---------------
src/kudu/fs/log_block_manager.h | 40 +++---
3 files changed, 129 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/b63d9219/src/kudu/fs/log_block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager-test.cc b/src/kudu/fs/log_block_manager-test.cc
index 8c755f0..be997c3 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -615,7 +615,7 @@ TEST_F(LogBlockManagerTest, TestMetadataTruncation) {
// Start corrupting the metadata file in different ways.
string path = LogBlockManager::ContainerPathForTests(
- bm_->all_containers_by_name_.begin()->second);
+ bm_->all_containers_by_name_.begin()->second.get());
string metadata_path = path + LogBlockManager::kContainerMetadataFileSuffix;
string data_path = path + LogBlockManager::kContainerDataFileSuffix;
@@ -1704,7 +1704,7 @@ TEST_F(LogBlockManagerTest, TestLIFOContainerSelection) {
// Create some other blocks, and finalize each block after write.
// The first available container in the queue will be reused every time.
internal::LogBlockContainer* container =
- bm_->available_containers_by_data_dir_.begin()->second.front();
+ bm_->available_containers_by_data_dir_.begin()->second.front().get();
for (int i = 0; i < 4; i++) {
unique_ptr<WritableBlock> writer;
ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer));
@@ -1713,7 +1713,7 @@ TEST_F(LogBlockManagerTest, TestLIFOContainerSelection) {
// After finalizing the written block, the used container will be
// available again and can be reused for the following created block.
ASSERT_EQ(container,
- bm_->available_containers_by_data_dir_.begin()->second.front());
+ bm_->available_containers_by_data_dir_.begin()->second.front().get());
blocks.emplace_back(std::move(writer));
}
for (const auto& block : blocks) {
http://git-wip-us.apache.org/repos/asf/kudu/blob/b63d9219/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index f1c66f5..7f1669f 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -49,7 +49,6 @@
#include "kudu/gutil/casts.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
-#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/numbers.h"
#include "kudu/gutil/strings/strcat.h"
#include "kudu/gutil/strings/strip.h"
@@ -213,12 +212,12 @@ LogBlockManagerMetrics::LogBlockManagerMetrics(const scoped_refptr<MetricEntity>
// the simpler RefCounted).
class LogBlock : public RefCountedThreadSafe<LogBlock> {
public:
- LogBlock(LogBlockContainer* container, BlockId block_id, int64_t offset,
+ LogBlock(LogBlockContainerRefPtr container, BlockId block_id, int64_t offset,
int64_t length);
~LogBlock() = default;
const BlockId& block_id() const { return block_id_; }
- LogBlockContainer* container() const { return container_; }
+ LogBlockContainer* container() const { return container_.get(); }
int64_t offset() const { return offset_; }
int64_t length() const { return length_; }
@@ -230,8 +229,8 @@ class LogBlock : public RefCountedThreadSafe<LogBlock> {
void RegisterDeletion(const shared_ptr<LogBlockDeletionTransaction>& transaction);
private:
- // The owning container. Must outlive the LogBlock.
- LogBlockContainer* container_;
+ // The owning container.
+ LogBlockContainerRefPtr container_;
// The block identifier.
const BlockId block_id_;
@@ -258,7 +257,7 @@ class LogBlock : public RefCountedThreadSafe<LogBlock> {
// persisted.
class LogWritableBlock : public WritableBlock {
public:
- LogWritableBlock(LogBlockContainer* container, BlockId block_id,
+ LogWritableBlock(LogBlockContainerRefPtr container, BlockId block_id,
int64_t block_offset);
virtual ~LogWritableBlock();
@@ -296,11 +295,11 @@ class LogWritableBlock : public WritableBlock {
// Does not synchronize the written data; that takes place in Close().
Status AppendMetadata();
- LogBlockContainer* container() const { return container_; }
+ LogBlockContainer* container() const { return container_.get(); }
private:
- // The owning container. Must outlive the block.
- LogBlockContainer* container_;
+ // The owning container.
+ LogBlockContainerRefPtr container_;
// The block's identifier.
const BlockId block_id_;
@@ -331,7 +330,10 @@ class LogWritableBlock : public WritableBlock {
// is logically thread unsafe (i.e. multiple clients calling WriteData()
// concurrently will produce nonsensical container data). Thread unsafe
// functions are marked explicitly.
-class LogBlockContainer {
+//
+// Block containers are reference counted so that they can be safely removed
+// despite concurrent access.
+class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> {
public:
enum SyncMode {
SYNC,
@@ -343,7 +345,7 @@ class LogBlockContainer {
// Creates a new block container in 'dir'.
static Status Create(LogBlockManager* block_manager,
DataDir* dir,
- unique_ptr<LogBlockContainer>* container);
+ LogBlockContainerRefPtr* container);
// Opens an existing block container in 'dir'.
//
@@ -357,8 +359,8 @@ class LogBlockContainer {
static Status Open(LogBlockManager* block_manager,
DataDir* dir,
FsReport* report,
- const std::string& id,
- unique_ptr<LogBlockContainer>* container);
+ const string& id,
+ LogBlockContainerRefPtr* container);
// Closes a set of blocks belonging to this container, possibly synchronizing
// the dirty data and metadata to disk.
@@ -455,11 +457,11 @@ class LogBlockContainer {
FsReport* report,
LogBlockManager::UntrackedBlockMap* live_blocks,
LogBlockManager::BlockRecordMap* live_block_records,
- std::vector<scoped_refptr<internal::LogBlock>>* dead_blocks,
+ vector<LogBlockRefPtr>* dead_blocks,
uint64_t* max_block_id);
// Updates internal bookkeeping state to reflect the creation of a block.
- void BlockCreated(const scoped_refptr<LogBlock>& block);
+ void BlockCreated(const LogBlockRefPtr& block);
// Updates internal bookkeeping state to reflect the deletion of a block.
//
@@ -467,7 +469,7 @@ class LogBlockContainer {
// with creations.
//
// Note: the container is not made "unfull"; containers remain sparse until deleted.
- void BlockDeleted(const scoped_refptr<LogBlock>& block);
+ void BlockDeleted(const LogBlockRefPtr& block);
// Finalizes a fully written block. It updates the container data file's position,
// truncates the container if full and marks the container as available.
@@ -540,7 +542,7 @@ class LogBlockContainer {
FsReport* report,
LogBlockManager::UntrackedBlockMap* live_blocks,
LogBlockManager::BlockRecordMap* live_block_records,
- std::vector<scoped_refptr<internal::LogBlock>>* dead_blocks,
+ vector<LogBlockRefPtr>* dead_blocks,
uint64_t* data_file_size,
uint64_t* max_block_id);
@@ -632,7 +634,7 @@ void LogBlockContainer::HandleError(const Status& s) const {
Status LogBlockContainer::Create(LogBlockManager* block_manager,
DataDir* dir,
- unique_ptr<LogBlockContainer>* container) {
+ LogBlockContainerRefPtr* container) {
string common_path;
string metadata_path;
string data_path;
@@ -702,7 +704,7 @@ Status LogBlockContainer::Open(LogBlockManager* block_manager,
DataDir* dir,
FsReport* report,
const string& id,
- unique_ptr<LogBlockContainer>* container) {
+ LogBlockContainerRefPtr* container) {
Env* env = block_manager->env();
string common_path = JoinPathSegments(dir->dir(), id);
string metadata_path = StrCat(common_path, LogBlockManager::kContainerMetadataFileSuffix);
@@ -751,10 +753,10 @@ Status LogBlockContainer::Open(LogBlockManager* block_manager,
RETURN_NOT_OK_CONTAINER_DISK_FAILURE(data_file->Size(&data_file_size));
// Create the in-memory container and populate it.
- unique_ptr<LogBlockContainer> open_container(new LogBlockContainer(block_manager,
- dir,
- std::move(metadata_pb_writer),
- std::move(data_file)));
+ LogBlockContainerRefPtr open_container(new LogBlockContainer(block_manager,
+ dir,
+ std::move(metadata_pb_writer),
+ std::move(data_file)));
open_container->preallocated_offset_ = data_file_size;
VLOG(1) << "Opened log block container " << open_container->ToString();
container->swap(open_container);
@@ -776,7 +778,7 @@ Status LogBlockContainer::ProcessRecords(
FsReport* report,
LogBlockManager::UntrackedBlockMap* live_blocks,
LogBlockManager::BlockRecordMap* live_block_records,
- vector<scoped_refptr<internal::LogBlock>>* dead_blocks,
+ vector<LogBlockRefPtr>* dead_blocks,
uint64_t* max_block_id) {
string metadata_path = metadata_file_->filename();
unique_ptr<RandomAccessFile> metadata_reader;
@@ -822,11 +824,11 @@ Status LogBlockContainer::ProcessRecord(
FsReport* report,
LogBlockManager::UntrackedBlockMap* live_blocks,
LogBlockManager::BlockRecordMap* live_block_records,
- vector<scoped_refptr<internal::LogBlock>>* dead_blocks,
+ vector<LogBlockRefPtr>* dead_blocks,
uint64_t* data_file_size,
uint64_t* max_block_id) {
const BlockId block_id(BlockId::FromPB(record->block_id()));
- scoped_refptr<LogBlock> lb;
+ LogBlockRefPtr lb;
switch (record->op_type()) {
case CREATE:
// First verify that the record's offset/length aren't wildly incorrect.
@@ -1119,7 +1121,7 @@ void LogBlockContainer::UpdateNextBlockOffset(int64_t block_offset, int64_t bloc
}
}
-void LogBlockContainer::BlockCreated(const scoped_refptr<LogBlock>& block) {
+void LogBlockContainer::BlockCreated(const LogBlockRefPtr& block) {
DCHECK_GE(block->offset(), 0);
total_bytes_.IncrementBy(block->fs_aligned_length());
@@ -1129,7 +1131,7 @@ void LogBlockContainer::BlockCreated(const scoped_refptr<LogBlock>& block) {
live_blocks_.Increment();
}
-void LogBlockContainer::BlockDeleted(const scoped_refptr<LogBlock>& block) {
+void LogBlockContainer::BlockDeleted(const LogBlockRefPtr& block) {
DCHECK_GE(block->offset(), 0);
live_bytes_.IncrementBy(-block->length());
@@ -1174,16 +1176,16 @@ class LogBlockCreationTransaction : public BlockCreationTransaction {
virtual ~LogBlockCreationTransaction() = default;
- virtual void AddCreatedBlock(std::unique_ptr<WritableBlock> block) override;
+ virtual void AddCreatedBlock(unique_ptr<WritableBlock> block) override;
virtual Status CommitCreatedBlocks() override;
private:
- std::vector<std::unique_ptr<LogWritableBlock>> created_blocks_;
+ vector<unique_ptr<LogWritableBlock>> created_blocks_;
};
void LogBlockCreationTransaction::AddCreatedBlock(
- std::unique_ptr<WritableBlock> block) {
+ unique_ptr<WritableBlock> block) {
LogWritableBlock* lwb = down_cast<LogWritableBlock*>(block.release());
created_blocks_.emplace_back(unique_ptr<LogWritableBlock>(lwb));
}
@@ -1234,23 +1236,25 @@ class LogBlockDeletionTransaction : public BlockDeletionTransaction,
virtual void AddDeletedBlock(BlockId block) override;
- virtual Status CommitDeletedBlocks(std::vector<BlockId>* deleted) override;
+ virtual Status CommitDeletedBlocks(vector<BlockId>* deleted) override;
// Add the given block that needs to be deleted to 'deleted_interval_map_',
// which keeps track of container and the range to be hole punched.
- void AddBlock(const scoped_refptr<internal::LogBlock>& lb);
+ void AddBlock(const LogBlockRefPtr& lb);
private:
// Block <offset, offset + length> pair.
typedef std::pair<int64_t, int64_t> BlockInterval;
// Map used to aggregate BlockInterval instances across containers.
- std::unordered_map<internal::LogBlockContainer*,
- std::vector<BlockInterval>> deleted_interval_map_;
+ unordered_map<LogBlockContainerRefPtr,
+ vector<BlockInterval>,
+ ScopedRefPtrHashFunctor<LogBlockContainer>,
+ ScopedRefPtrEqualToFunctor<LogBlockContainer>> deleted_interval_map_;
// The owning LogBlockManager. Must outlive the LogBlockDeletionTransaction.
LogBlockManager* lbm_;
- std::vector<BlockId> deleted_blocks_;
+ vector<BlockId> deleted_blocks_;
DISALLOW_COPY_AND_ASSIGN(LogBlockDeletionTransaction);
};
@@ -1260,25 +1264,25 @@ void LogBlockDeletionTransaction::AddDeletedBlock(BlockId block) {
LogBlockDeletionTransaction::~LogBlockDeletionTransaction() {
for (auto& entry : deleted_interval_map_) {
- LogBlockContainer* container = entry.first;
+ LogBlockContainer* container = entry.first.get();
CHECK_OK_PREPEND(CoalesceIntervals<int64_t>(&entry.second),
Substitute("could not coalesce hole punching for container: $0",
container->ToString()));
for (const auto& interval : entry.second) {
container->ExecClosure(Bind(&LogBlockContainer::ContainerDeletionAsync,
- Unretained(container),
+ container,
interval.first,
interval.second - interval.first));
}
}
}
-Status LogBlockDeletionTransaction::CommitDeletedBlocks(std::vector<BlockId>* deleted) {
+Status LogBlockDeletionTransaction::CommitDeletedBlocks(vector<BlockId>* deleted) {
deleted->clear();
shared_ptr<LogBlockDeletionTransaction> transaction = shared_from_this();
- vector<scoped_refptr<LogBlock>> log_blocks;
+ vector<LogBlockRefPtr> log_blocks;
Status first_failure = lbm_->RemoveLogBlocks(deleted_blocks_, &log_blocks, deleted);
for (const auto& lb : log_blocks) {
// Register the block to be hole punched if metadata recording
@@ -1299,7 +1303,7 @@ Status LogBlockDeletionTransaction::CommitDeletedBlocks(std::vector<BlockId>* de
return first_failure;
}
-void LogBlockDeletionTransaction::AddBlock(const scoped_refptr<internal::LogBlock>& lb) {
+void LogBlockDeletionTransaction::AddBlock(const LogBlockRefPtr& lb) {
DCHECK_GE(lb->fs_aligned_length(), 0);
BlockInterval block_interval(lb->offset(),
@@ -1311,9 +1315,9 @@ void LogBlockDeletionTransaction::AddBlock(const scoped_refptr<internal::LogBloc
// LogBlock (definition)
////////////////////////////////////////////////////////////
-LogBlock::LogBlock(LogBlockContainer* container, BlockId block_id,
+LogBlock::LogBlock(LogBlockContainerRefPtr container, BlockId block_id,
int64_t offset, int64_t length)
- : container_(container),
+ : container_(std::move(container)),
block_id_(block_id),
offset_(offset),
length_(length) {
@@ -1351,18 +1355,18 @@ void LogBlock::RegisterDeletion(
// LogWritableBlock (definition)
////////////////////////////////////////////////////////////
-LogWritableBlock::LogWritableBlock(LogBlockContainer* container,
+LogWritableBlock::LogWritableBlock(LogBlockContainerRefPtr container,
BlockId block_id, int64_t block_offset)
- : container_(container),
+ : container_(std::move(container)),
block_id_(block_id),
block_offset_(block_offset),
block_length_(0),
state_(CLEAN) {
DCHECK_GE(block_offset, 0);
- DCHECK_EQ(0, block_offset % container->instance()->filesystem_block_size_bytes());
- if (container->metrics()) {
- container->metrics()->generic_metrics.blocks_open_writing->Increment();
- container->metrics()->generic_metrics.total_writable_blocks->Increment();
+ DCHECK_EQ(0, block_offset % container_->instance()->filesystem_block_size_bytes());
+ if (container_->metrics()) {
+ container_->metrics()->generic_metrics.blocks_open_writing->Increment();
+ container_->metrics()->generic_metrics.total_writable_blocks->Increment();
}
}
@@ -1524,7 +1528,7 @@ void LogWritableBlock::DoClose() {
container_->FinalizeBlock(block_offset_, block_length_);
}
- scoped_refptr<LogBlock> lb = container_->block_manager()->AddLogBlock(
+ LogBlockRefPtr lb = container_->block_manager()->AddLogBlock(
container_, block_id_, block_offset_, block_length_);
CHECK(lb);
container_->BlockCreated(lb);
@@ -1550,8 +1554,7 @@ Status LogWritableBlock::AppendMetadata() {
// Refers to a LogBlock representing the block's persisted metadata.
class LogReadableBlock : public ReadableBlock {
public:
- LogReadableBlock(LogBlockContainer* container,
- scoped_refptr<LogBlock> log_block);
+ explicit LogReadableBlock(LogBlockRefPtr log_block);
virtual ~LogReadableBlock();
@@ -1570,11 +1573,8 @@ class LogReadableBlock : public ReadableBlock {
virtual size_t memory_footprint() const OVERRIDE;
private:
- // The owning container. Must outlive this block.
- LogBlockContainer* container_;
-
// A reference to this block's metadata.
- scoped_refptr<internal::LogBlock> log_block_;
+ LogBlockRefPtr log_block_;
// Whether or not this block has been closed. Close() is thread-safe, so
// this must be an atomic primitive.
@@ -1583,14 +1583,12 @@ class LogReadableBlock : public ReadableBlock {
DISALLOW_COPY_AND_ASSIGN(LogReadableBlock);
};
-LogReadableBlock::LogReadableBlock(LogBlockContainer* container,
- scoped_refptr<LogBlock> log_block)
- : container_(container),
- log_block_(std::move(log_block)),
+LogReadableBlock::LogReadableBlock(LogBlockRefPtr log_block)
+ : log_block_(std::move(log_block)),
closed_(false) {
- if (container_->metrics()) {
- container_->metrics()->generic_metrics.blocks_open_reading->Increment();
- container_->metrics()->generic_metrics.total_readable_blocks->Increment();
+ if (log_block_->container()->metrics()) {
+ log_block_->container()->metrics()->generic_metrics.blocks_open_reading->Increment();
+ log_block_->container()->metrics()->generic_metrics.total_readable_blocks->Increment();
}
}
@@ -1601,17 +1599,17 @@ LogReadableBlock::~LogReadableBlock() {
Status LogReadableBlock::Close() {
if (closed_.CompareAndSet(false, true)) {
- log_block_.reset();
- if (container_->metrics()) {
- container_->metrics()->generic_metrics.blocks_open_reading->Decrement();
+ if (log_block_->container()->metrics()) {
+ log_block_->container()->metrics()->generic_metrics.blocks_open_reading->Decrement();
}
+ log_block_.reset();
}
return Status::OK();
}
BlockManager* LogReadableBlock::block_manager() const {
- return container_->block_manager();
+ return log_block_->container()->block_manager();
}
const BlockId& LogReadableBlock::id() const {
@@ -1648,7 +1646,7 @@ Status LogReadableBlock::ReadV(uint64_t offset, ArrayView<Slice> results) const
}
MicrosecondsInt64 start_time = GetMonoTimeMicros();
- RETURN_NOT_OK(container_->ReadVData(read_offset, results));
+ RETURN_NOT_OK(log_block_->container()->ReadVData(read_offset, results));
MicrosecondsInt64 end_time = GetMonoTimeMicros();
int64_t dur = end_time - start_time;
@@ -1657,8 +1655,8 @@ Status LogReadableBlock::ReadV(uint64_t offset, ArrayView<Slice> results) const
const char* counter = BUCKETED_COUNTER_NAME("lbm_reads", dur);
TRACE_COUNTER_INCREMENT(counter, 1);
- if (container_->metrics()) {
- container_->metrics()->generic_metrics.total_bytes_read->IncrementBy(read_length);
+ if (log_block_->container()->metrics()) {
+ log_block_->container()->metrics()->generic_metrics.total_bytes_read->IncrementBy(read_length);
}
return Status::OK();
}
@@ -1736,8 +1734,6 @@ LogBlockManager::~LogBlockManager() {
// Containers may have outstanding tasks running on data directories; wait
// for them to complete before destroying the containers.
dd_manager_->WaitOnClosures();
-
- STLDeleteValues(&all_containers_by_name_);
}
Status LogBlockManager::Open(FsReport* report) {
@@ -1848,7 +1844,7 @@ Status LogBlockManager::CreateBlock(const CreateBlockOptions& opts,
//
// TODO(unknown): should we cap the number of outstanding containers and
// force callers to block if we've reached it?
- LogBlockContainer* container;
+ LogBlockContainerRefPtr container;
RETURN_NOT_OK(GetOrCreateContainer(opts, &container));
// Generate a free block ID.
@@ -1869,7 +1865,7 @@ Status LogBlockManager::CreateBlock(const CreateBlockOptions& opts,
Status LogBlockManager::OpenBlock(const BlockId& block_id,
unique_ptr<ReadableBlock>* block) {
- scoped_refptr<LogBlock> lb;
+ LogBlockRefPtr lb;
{
std::lock_guard<simple_spinlock> l(lock_);
lb = FindPtrOrNull(blocks_by_block_id_, block_id);
@@ -1878,10 +1874,9 @@ Status LogBlockManager::OpenBlock(const BlockId& block_id,
return Status::NotFound("Can't find block", block_id.ToString());
}
- block->reset(new internal::LogReadableBlock(lb->container(),
- lb.get()));
VLOG(3) << "Opened block " << (*block)->id()
<< " from container " << lb->container()->ToString();
+ block->reset(new internal::LogReadableBlock(std::move(lb)));
return Status::OK();
}
@@ -1907,7 +1902,7 @@ void LogBlockManager::NotifyBlockId(BlockId block_id) {
next_block_id_.StoreMax(block_id.id() + 1);
}
-void LogBlockManager::AddNewContainerUnlocked(LogBlockContainer* container) {
+void LogBlockManager::AddNewContainerUnlocked(const LogBlockContainerRefPtr& container) {
DCHECK(lock_.is_locked());
InsertOrDie(&all_containers_by_name_, container->ToString(), container);
if (metrics()) {
@@ -1920,7 +1915,7 @@ void LogBlockManager::AddNewContainerUnlocked(LogBlockContainer* container) {
void LogBlockManager::RemoveFullContainerUnlocked(const string& container_name) {
DCHECK(lock_.is_locked());
- unique_ptr<LogBlockContainer> to_delete(EraseKeyReturnValuePtr(
+ LogBlockContainerRefPtr to_delete(EraseKeyReturnValuePtr(
&all_containers_by_name_, container_name));
CHECK(to_delete);
CHECK(to_delete->full())
@@ -1932,7 +1927,7 @@ void LogBlockManager::RemoveFullContainerUnlocked(const string& container_name)
}
Status LogBlockManager::GetOrCreateContainer(const CreateBlockOptions& opts,
- LogBlockContainer** container) {
+ LogBlockContainerRefPtr* container) {
DataDir* dir;
RETURN_NOT_OK_EVAL(dd_manager_->GetNextDataDir(opts, &dir),
error_manager_->RunErrorNotificationCb(ErrorHandlerType::NO_AVAILABLE_DISKS, opts.tablet_id));
@@ -1948,7 +1943,7 @@ Status LogBlockManager::GetOrCreateContainer(const CreateBlockOptions& opts,
}
// All containers are in use; create a new one.
- unique_ptr<LogBlockContainer> new_container;
+ LogBlockContainerRefPtr new_container;
Status s = LogBlockContainer::Create(this, dir, &new_container);
// We could create a container in a different directory, but there's
@@ -1959,24 +1954,24 @@ Status LogBlockManager::GetOrCreateContainer(const CreateBlockOptions& opts,
{
std::lock_guard<simple_spinlock> l(lock_);
dirty_dirs_.insert(dir->dir());
- AddNewContainerUnlocked(new_container.get());
+ AddNewContainerUnlocked(new_container);
}
- *container = new_container.release();
+ *container = std::move(new_container);
return Status::OK();
}
-void LogBlockManager::MakeContainerAvailable(LogBlockContainer* container) {
+void LogBlockManager::MakeContainerAvailable(LogBlockContainerRefPtr container) {
std::lock_guard<simple_spinlock> l(lock_);
- MakeContainerAvailableUnlocked(container);
+ MakeContainerAvailableUnlocked(std::move(container));
}
-void LogBlockManager::MakeContainerAvailableUnlocked(LogBlockContainer* container) {
+void LogBlockManager::MakeContainerAvailableUnlocked(LogBlockContainerRefPtr container) {
DCHECK(lock_.is_locked());
if (container->full() || container->read_only()) {
return;
}
VLOG(3) << Substitute("container $0 being made available", container->ToString());
- available_containers_by_data_dir_[container->data_dir()].push_front(container);
+ available_containers_by_data_dir_[container->data_dir()].push_front(std::move(container));
}
Status LogBlockManager::SyncContainer(const LogBlockContainer& container) {
@@ -2019,13 +2014,13 @@ bool LogBlockManager::TryUseBlockId(const BlockId& block_id) {
return InsertIfNotPresent(&open_block_ids_, block_id);
}
-scoped_refptr<LogBlock> LogBlockManager::AddLogBlock(
- LogBlockContainer* container,
+LogBlockRefPtr LogBlockManager::AddLogBlock(
+ LogBlockContainerRefPtr container,
const BlockId& block_id,
int64_t offset,
int64_t length) {
std::lock_guard<simple_spinlock> l(lock_);
- scoped_refptr<LogBlock> lb(new LogBlock(container, block_id, offset, length));
+ LogBlockRefPtr lb(new LogBlock(std::move(container), block_id, offset, length));
mem_tracker_->Consume(kudu_malloc_usable_size(lb.get()));
if (AddLogBlockUnlocked(lb)) {
@@ -2034,13 +2029,13 @@ scoped_refptr<LogBlock> LogBlockManager::AddLogBlock(
return nullptr;
}
-bool LogBlockManager::AddLogBlockUnlocked(scoped_refptr<LogBlock> lb) {
+bool LogBlockManager::AddLogBlockUnlocked(LogBlockRefPtr lb) {
DCHECK(lock_.is_locked());
// InsertIfNotPresent doesn't use move semantics, so instead we just
// insert an empty scoped_refptr and assign into it down below rather
// than using the utility function.
- scoped_refptr<LogBlock>* entry_ptr = &blocks_by_block_id_[lb->block_id()];
+ LogBlockRefPtr* entry_ptr = &blocks_by_block_id_[lb->block_id()];
if (*entry_ptr) {
// Already have an entry for this block ID.
return false;
@@ -2062,15 +2057,15 @@ bool LogBlockManager::AddLogBlockUnlocked(scoped_refptr<LogBlock> lb) {
}
Status LogBlockManager::RemoveLogBlocks(vector<BlockId> block_ids,
- vector<scoped_refptr<LogBlock>>* log_blocks,
+ vector<LogBlockRefPtr>* log_blocks,
vector<BlockId>* deleted) {
Status first_failure;
- vector<scoped_refptr<LogBlock>> lbs;
+ vector<LogBlockRefPtr> lbs;
int64_t malloc_space = 0, blocks_length = 0;
{
std::lock_guard<simple_spinlock> l(lock_);
for (const auto& block_id : block_ids) {
- scoped_refptr<LogBlock> lb;
+ LogBlockRefPtr lb;
Status s = RemoveLogBlockUnlocked(block_id, &lb);
// If we get NotFound, then the block was already deleted.
if (!s.ok() && !s.IsNotFound()) {
@@ -2127,7 +2122,7 @@ Status LogBlockManager::RemoveLogBlocks(vector<BlockId> block_ids,
}
Status LogBlockManager::RemoveLogBlockUnlocked(const BlockId& block_id,
- scoped_refptr<internal::LogBlock>* lb) {
+ LogBlockRefPtr* lb) {
auto it = blocks_by_block_id_.find(block_id);
if (it == blocks_by_block_id_.end()) {
return Status::NotFound("Can't find block", block_id.ToString());
@@ -2174,7 +2169,7 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
// Keep track of deleted blocks whose space hasn't been punched; they will
// be repunched during repair.
- vector<scoped_refptr<internal::LogBlock>> need_repunching;
+ vector<LogBlockRefPtr> need_repunching;
// Keep track of containers that have nothing but dead blocks; they will be
// deleted during repair.
@@ -2208,7 +2203,7 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
continue;
}
- unique_ptr<LogBlockContainer> container;
+ LogBlockContainerRefPtr container;
s = LogBlockContainer::Open(
this, dir, &local_report, container_name, &container);
if (s.IsAborted()) {
@@ -2237,7 +2232,7 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
// the above issue.
UntrackedBlockMap live_blocks;
BlockRecordMap live_block_records;
- vector<scoped_refptr<internal::LogBlock>> dead_blocks;
+ vector<LogBlockRefPtr> dead_blocks;
uint64_t max_block_id = 0;
s = container->ProcessRecords(&local_report,
&live_blocks,
@@ -2401,8 +2396,8 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
}
mem_tracker_->Consume(mem_usage);
- AddNewContainerUnlocked(container.get());
- MakeContainerAvailableUnlocked(container.release());
+ AddNewContainerUnlocked(container);
+ MakeContainerAvailableUnlocked(std::move(container));
}
}
@@ -2441,7 +2436,7 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
Status LogBlockManager::Repair(
DataDir* dir,
FsReport* report,
- vector<scoped_refptr<internal::LogBlock>> need_repunching,
+ vector<LogBlockRefPtr> need_repunching,
vector<string> dead_containers,
unordered_map<string, vector<BlockRecordPB>> low_live_block_containers) {
if (opts_.read_only) {
@@ -2456,7 +2451,7 @@ Status LogBlockManager::Repair(
// From here on out we're committed to repairing.
// Fetch all the containers we're going to need.
- unordered_map<std::string, internal::LogBlockContainer*> containers_by_name;
+ unordered_map<string, LogBlockContainerRefPtr> containers_by_name;
{
std::lock_guard<simple_spinlock> l(lock_);
@@ -2469,8 +2464,8 @@ Status LogBlockManager::Repair(
// Fetch all the containers we're going to need.
if (report->partial_record_check) {
for (const auto& pr : report->partial_record_check->entries) {
- LogBlockContainer* c = FindPtrOrNull(all_containers_by_name_,
- pr.container);
+ LogBlockContainerRefPtr c = FindPtrOrNull(all_containers_by_name_,
+ pr.container);
if (c) {
containers_by_name[pr.container] = c;
}
@@ -2478,16 +2473,16 @@ Status LogBlockManager::Repair(
}
if (report->full_container_space_check) {
for (const auto& fcp : report->full_container_space_check->entries) {
- LogBlockContainer* c = FindPtrOrNull(all_containers_by_name_,
- fcp.container);
+ LogBlockContainerRefPtr c = FindPtrOrNull(all_containers_by_name_,
+ fcp.container);
if (c) {
containers_by_name[fcp.container] = c;
}
}
}
for (const auto& e : low_live_block_containers) {
- LogBlockContainer* c = FindPtrOrNull(all_containers_by_name_,
- e.first);
+ LogBlockContainerRefPtr c = FindPtrOrNull(all_containers_by_name_,
+ e.first);
if (c) {
containers_by_name[e.first] = c;
}
@@ -2538,8 +2533,7 @@ Status LogBlockManager::Repair(
unique_ptr<RWFile> file;
RWFileOptions opts;
opts.mode = Env::OPEN_EXISTING;
- internal::LogBlockContainer* container = FindPtrOrNull(containers_by_name,
- pr.container);
+ LogBlockContainerRefPtr container = FindPtrOrNull(containers_by_name, pr.container);
if (!container) {
// The container was deleted outright.
pr.repaired = true;
@@ -2594,8 +2588,7 @@ Status LogBlockManager::Repair(
// disk space consumption.
if (report->full_container_space_check) {
for (auto& fcp : report->full_container_space_check->entries) {
- internal::LogBlockContainer* container = FindPtrOrNull(containers_by_name,
- fcp.container);
+ LogBlockContainerRefPtr container = FindPtrOrNull(containers_by_name, fcp.container);
if (!container) {
// The container was deleted outright.
fcp.repaired = true;
@@ -2631,8 +2624,7 @@ Status LogBlockManager::Repair(
int64_t metadata_files_compacted = 0;
int64_t metadata_bytes_delta = 0;
for (const auto& e : low_live_block_containers) {
- internal::LogBlockContainer* container = FindPtrOrNull(containers_by_name,
- e.first);
+ LogBlockContainerRefPtr container = FindPtrOrNull(containers_by_name, e.first);
if (!container) {
// The container was deleted outright.
continue;
@@ -2641,7 +2633,7 @@ Status LogBlockManager::Repair(
// Rewrite this metadata file. Failures are non-fatal.
int64_t file_bytes_delta;
const auto& meta_path = StrCat(e.first, kContainerMetadataFileSuffix);
- Status s = RewriteMetadataFile(*container, e.second, &file_bytes_delta);
+ Status s = RewriteMetadataFile(*(container.get()), e.second, &file_bytes_delta);
if (!s.ok()) {
WARN_NOT_OK(s, "could not rewrite metadata file");
continue;
@@ -2731,7 +2723,7 @@ Status LogBlockManager::RewriteMetadataFile(const LogBlockContainer& container,
return Status::OK();
}
-std::string LogBlockManager::ContainerPathForTests(internal::LogBlockContainer* container) {
+string LogBlockManager::ContainerPathForTests(internal::LogBlockContainer* container) {
return container->ToString();
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/b63d9219/src/kudu/fs/log_block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index edcec5c..4ca3ecc 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -63,6 +63,9 @@ class LogWritableBlock;
struct LogBlockManagerMetrics;
} // namespace internal
+typedef scoped_refptr<internal::LogBlock> LogBlockRefPtr;
+typedef scoped_refptr<internal::LogBlockContainer> LogBlockContainerRefPtr;
+
// A log-backed (i.e. sequentially allocated file) block storage
// implementation.
//
@@ -229,10 +232,10 @@ class LogBlockManager : public BlockManager {
// Type for the actual block map used to store all live blocks.
// We use sparse_hash_map<> here to reduce memory overhead.
typedef MemTrackerAllocator<
- std::pair<const BlockId, scoped_refptr<internal::LogBlock>>> BlockAllocator;
+ std::pair<const BlockId, LogBlockRefPtr>> BlockAllocator;
typedef spp::sparse_hash_map<
BlockId,
- scoped_refptr<internal::LogBlock>,
+ LogBlockRefPtr,
BlockIdHash,
BlockIdEqual,
BlockAllocator> BlockMap;
@@ -242,7 +245,7 @@ class LogBlockManager : public BlockManager {
// Only used during startup.
typedef std::unordered_map<
const BlockId,
- scoped_refptr<internal::LogBlock>,
+ LogBlockRefPtr,
BlockIdHash,
BlockIdEqual> UntrackedBlockMap;
@@ -255,17 +258,10 @@ class LogBlockManager : public BlockManager {
BlockIdHash,
BlockIdEqual> BlockRecordMap;
- // Map used to aggregate BlockRecordMap instances across containers.
- //
- // Only used during startup.
- typedef std::unordered_map<
- std::string,
- std::vector<BlockRecordPB>> BlockRecordsByContainerMap;
-
// Adds an as of yet unseen container to this block manager.
//
// Must be called with 'lock_' held.
- void AddNewContainerUnlocked(internal::LogBlockContainer* container);
+ void AddNewContainerUnlocked(const LogBlockContainerRefPtr& container);
// Removes a previously added container from this block manager. The
// container must be full.
@@ -280,12 +276,12 @@ class LogBlockManager : public BlockManager {
// writing is finished, call MakeContainerAvailable() to make it
// available to other writers.
Status GetOrCreateContainer(const CreateBlockOptions& opts,
- internal::LogBlockContainer** container);
+ LogBlockContainerRefPtr* container);
// Indicate that this container is no longer in use and can be handed out
// to other writers.
- void MakeContainerAvailable(internal::LogBlockContainer* container);
- void MakeContainerAvailableUnlocked(internal::LogBlockContainer* container);
+ void MakeContainerAvailable(LogBlockContainerRefPtr container);
+ void MakeContainerAvailableUnlocked(LogBlockContainerRefPtr container);
// Synchronizes a container's dirty metadata to disk, taking care not to
// sync more than is necessary (using 'dirty_dirs_').
@@ -301,8 +297,8 @@ class LogBlockManager : public BlockManager {
//
// Returns the created LogBlock if it was successfully added or nullptr if a
// block with that ID was already present.
- scoped_refptr<internal::LogBlock> AddLogBlock(
- internal::LogBlockContainer* container,
+ LogBlockRefPtr AddLogBlock(
+ LogBlockContainerRefPtr container,
const BlockId& block_id,
int64_t offset,
int64_t length);
@@ -311,7 +307,7 @@ class LogBlockManager : public BlockManager {
// Must hold 'lock_'.
//
// Returns true if the LogBlock was successfully added, false if it was already present.
- bool AddLogBlockUnlocked(scoped_refptr<internal::LogBlock> lb);
+ bool AddLogBlockUnlocked(LogBlockRefPtr lb);
// Removes the given set of LogBlocks from in-memory data structures, and
// appends the block deletion metadata to record the on-disk deletion.
@@ -321,7 +317,7 @@ class LogBlockManager : public BlockManager {
//
// Returns the first deletion failure that was seen, if any.
Status RemoveLogBlocks(std::vector<BlockId> block_ids,
- std::vector<scoped_refptr<internal::LogBlock>>* log_blocks,
+ std::vector<LogBlockRefPtr>* log_blocks,
std::vector<BlockId>* deleted);
// Removes a LogBlock from in-memory data structures. Must hold 'lock_'.
@@ -329,7 +325,7 @@ class LogBlockManager : public BlockManager {
//
// Returns an error if the LogBlock cannot be successfully removed.
Status RemoveLogBlockUnlocked(const BlockId& block_id,
- scoped_refptr<internal::LogBlock>* lb);
+ LogBlockRefPtr* lb);
// Repairs any inconsistencies for 'dir' described in 'report'.
//
@@ -342,7 +338,7 @@ class LogBlockManager : public BlockManager {
// Returns an error if repairing a fatal inconsistency failed.
Status Repair(DataDir* dir,
FsReport* report,
- std::vector<scoped_refptr<internal::LogBlock>> need_repunching,
+ std::vector<LogBlockRefPtr> need_repunching,
std::vector<std::string> dead_containers,
std::unordered_map<
std::string,
@@ -434,14 +430,14 @@ class LogBlockManager : public BlockManager {
// Holds (and owns) all containers loaded from disk.
std::unordered_map<std::string,
- internal::LogBlockContainer*> all_containers_by_name_;
+ LogBlockContainerRefPtr> all_containers_by_name_;
// Holds only those containers that are currently available for writing,
// excluding containers that are either in use or full.
//
// Does not own the containers.
std::unordered_map<const DataDir*,
- std::deque<internal::LogBlockContainer*>> available_containers_by_data_dir_;
+ std::deque<LogBlockContainerRefPtr>> available_containers_by_data_dir_;
// Tracks dirty container directories.
//