You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2017/05/02 21:13:54 UTC
[2/3] kudu git commit: log_block_manager: refactor container record processing
log_block_manager: refactor container record processing
While working on other log block manager (LBM) changes, I noticed that
LogBlockManager::ProcessBlockRecord() was effectively static (it used no
LogBlockManager member state); this patch moves it into LogBlockContainer.
Besides narrowing the function's scope, we can now process records as they
are read rather than storing them in a deque to be processed later.
To make this refactor work, I had to move the LogBlock class declaration
ahead of LogBlockContainer so that LogBlockContainer could use it. This also
means various LogBlockContainer methods can now take LogBlock-typed arguments,
and GetAlignedBlockLength() can move over to LogBlock (as
fs_aligned_length()), where it belongs.
Change-Id: I5b8da74dc699f4afbeda41ee154eb1ba89dac292
Reviewed-on: http://gerrit.cloudera.org:8080/6769
Tested-by: Adar Dembo <ad...@cloudera.com>
Reviewed-by: David Ribeiro Alves <da...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/72e58eca
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/72e58eca
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/72e58eca
Branch: refs/heads/master
Commit: 72e58eca93c36ed2dcce23032503b3d964a34236
Parents: 6a11859
Author: Adar Dembo <ad...@cloudera.com>
Authored: Sun Apr 30 20:51:13 2017 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Tue May 2 21:11:41 2017 +0000
----------------------------------------------------------------------
src/kudu/fs/log_block_manager.cc | 453 +++++++++++++++++-----------------
src/kudu/fs/log_block_manager.h | 26 +-
2 files changed, 238 insertions(+), 241 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/72e58eca/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index c866fc0..7e18c33 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -167,6 +167,59 @@ LogBlockManagerMetrics::LogBlockManagerMetrics(const scoped_refptr<MetricEntity>
#undef MINIT
////////////////////////////////////////////////////////////
+// LogBlock (declaration)
+////////////////////////////////////////////////////////////
+
+// The persistent metadata that describes a logical block.
+//
+// A block grows a LogBlock when its data has been synchronized with
+// the disk. That's when it's fully immutable (i.e. none of its metadata
+// can change), and when it becomes readable and persistent.
+//
+// LogBlocks are reference counted to simplify support for deletion with
+// outstanding readers. All refcount increments are performed with the
+// block manager lock held, as are deletion-based decrements. However,
+// no lock is held when ~LogReadableBlock decrements the refcount, thus it
+// must be made thread safe (by extending RefCountedThreadSafe instead of
+// the simpler RefCounted).
+class LogBlock : public RefCountedThreadSafe<LogBlock> {
+ public:
+ LogBlock(LogBlockContainer* container, BlockId block_id, int64_t offset,
+ int64_t length);
+ ~LogBlock();
+
+ const BlockId& block_id() const { return block_id_; }
+ LogBlockContainer* container() const { return container_; }
+ int64_t offset() const { return offset_; }
+ int64_t length() const { return length_; }
+
+ // Returns a block's length aligned to the nearest filesystem block size.
+ int64_t fs_aligned_length() const;
+
+ // Delete the block. Actual deletion takes place when the
+ // block is destructed.
+ void Delete();
+
+ private:
+ // The owning container. Must outlive the LogBlock.
+ LogBlockContainer* container_;
+
+ // The block identifier.
+ const BlockId block_id_;
+
+ // The block's offset in the container.
+ const int64_t offset_;
+
+ // The block's length.
+ const int64_t length_;
+
+ // Whether the block has been marked for deletion.
+ bool deleted_;
+
+ DISALLOW_COPY_AND_ASSIGN(LogBlock);
+};
+
+////////////////////////////////////////////////////////////
// LogBlockContainer
////////////////////////////////////////////////////////////
@@ -281,18 +334,28 @@ class LogBlockContainer {
// This function is thread unsafe.
Status TruncateDataToNextBlockOffset();
- // Reads the container's metadata from disk, sanity checking and
- // returning the records in 'records'. Any on-disk inconsistencies are
- // recorded in 'report'.
- Status ReadContainerRecords(FsReport* report,
- deque<BlockRecordPB>* records) const;
+ // Reads the container's metadata from disk, sanity checking and processing
+ // records along the way.
+ //
+ // Malformed records and other container inconsistencies are written to
+ // 'report'. Healthy blocks are written either to 'live_blocks' or
+ // 'dead_blocks'. The greatest block ID seen thus far in the container is
+ // written to 'max_block_id'.
+ //
+ // Returns an error only if there was a problem accessing the container from
+ // disk; such errors are fatal and effectively halt processing immediately.
+ Status ProcessRecords(
+ FsReport* report,
+ LogBlockManager::UntrackedBlockMap* live_blocks,
+ std::vector<scoped_refptr<internal::LogBlock>>* dead_blocks,
+ uint64_t* max_block_id);
// Updates internal bookkeeping state to reflect the creation of a block,
// marking this container as full if needed. Should only be called when a
// block is fully written, as it will round up the container data file's position.
//
// This function is thread unsafe.
- void BlockCreated(int64_t block_offset, int64_t block_length);
+ void BlockCreated(const scoped_refptr<LogBlock>& block);
// Updates internal bookkeeping state to reflect the deletion of a block.
//
@@ -300,7 +363,7 @@ class LogBlockContainer {
// deletions can happen concurrently with creations.
//
// Note: the container is not made "unfull"; containers remain sparse until deleted.
- void BlockDeleted(int64_t block_offset, int64_t block_length);
+ void BlockDeleted(const scoped_refptr<LogBlock>& block);
// Run a task on this container's data directory thread pool.
//
@@ -311,9 +374,6 @@ class LogBlockContainer {
// Produces a debug-friendly string representation of this container.
string ToString() const;
- // Returns a block's length aligned to the nearest filesystem block size;
- int64_t GetAlignedBlockLength(int64_t block_offset, int64_t block_length) const;
-
// Simple accessors.
LogBlockManager* block_manager() const { return block_manager_; }
int64_t next_block_offset() const { return next_block_offset_; }
@@ -339,19 +399,23 @@ class LogBlockContainer {
unique_ptr<WritablePBContainerFile> metadata_file,
shared_ptr<RWFile> data_file);
- // Performs sanity checks on a block record.
+ // Processes a single block record, performing sanity checks on it and adding
+ // it either to 'live_blocks' or 'dead_blocks'.
//
// Returns an error only if there was a problem accessing the container from
// disk; such errors are fatal and effectively halt processing immediately.
//
// On success, 'report' is updated with any inconsistencies found in the
// record, 'data_file_size' may be updated with the latest size of the
- // container's data file, and 'is_malformed_record' reflects whether or not
- // the record was malformed.
- Status CheckBlockRecord(const BlockRecordPB& record,
- FsReport* report,
- uint64_t* data_file_size,
- bool* is_malformed_record) const;
+ // container's data file, and 'max_block_id' reflects the largest block ID
+ // seen thus far in the container.
+ Status ProcessRecord(
+ const BlockRecordPB& record,
+ FsReport* report,
+ LogBlockManager::UntrackedBlockMap* live_blocks,
+ std::vector<scoped_refptr<internal::LogBlock>>* dead_blocks,
+ uint64_t* data_file_size,
+ uint64_t* max_block_id);
// The owning block manager. Must outlive the container itself.
LogBlockManager* const block_manager_;
@@ -576,8 +640,11 @@ Status LogBlockContainer::TruncateDataToNextBlockOffset() {
return Status::OK();
}
-Status LogBlockContainer::ReadContainerRecords(FsReport* report,
- deque<BlockRecordPB>* records) const {
+Status LogBlockContainer::ProcessRecords(
+ FsReport* report,
+ LogBlockManager::UntrackedBlockMap* live_blocks,
+ vector<scoped_refptr<internal::LogBlock>>* dead_blocks,
+ uint64_t* max_block_id) {
string metadata_path = metadata_file_->filename();
unique_ptr<RandomAccessFile> metadata_reader;
RETURN_NOT_OK(block_manager()->env()->NewRandomAccessFile(metadata_path, &metadata_reader));
@@ -585,7 +652,6 @@ Status LogBlockContainer::ReadContainerRecords(FsReport* report,
RETURN_NOT_OK(pb_reader.Open());
uint64_t data_file_size = 0;
- deque<BlockRecordPB> local_records;
Status read_status;
while (true) {
BlockRecordPB record;
@@ -593,19 +659,14 @@ Status LogBlockContainer::ReadContainerRecords(FsReport* report,
if (!read_status.ok()) {
break;
}
-
- bool is_malformed_record;
- RETURN_NOT_OK(CheckBlockRecord(record, report,
- &data_file_size, &is_malformed_record));
- if (PREDICT_FALSE(!is_malformed_record)) {
- local_records.emplace_back(std::move(record));
- }
+ RETURN_NOT_OK(ProcessRecord(record, report,
+ live_blocks, dead_blocks,
+ &data_file_size, max_block_id));
}
// NOTE: 'read_status' will never be OK here.
if (PREDICT_TRUE(read_status.IsEndOfFile())) {
// We've reached the end of the file without any problems.
- records->swap(local_records);
return Status::OK();
}
if (read_status.IsIncomplete()) {
@@ -614,49 +675,95 @@ Status LogBlockContainer::ReadContainerRecords(FsReport* report,
// write and truncate the metadata file to remove this partial record.
report->partial_record_check->entries.emplace_back(ToString(),
pb_reader.offset());
- records->swap(local_records);
return Status::OK();
}
// If we've made it here, we've found (and are returning) an unrecoverable error.
return read_status;
}
-Status LogBlockContainer::CheckBlockRecord(const BlockRecordPB& record,
- FsReport* report,
- uint64_t* data_file_size,
- bool* is_malformed_record) const {
- if (record.op_type() == CREATE) {
- // First verify that the record's offset/length aren't wildly incorrect.
- if (PREDICT_FALSE(!record.has_offset() ||
- !record.has_length() ||
- record.offset() < 0 ||
- record.length() < 0)) {
- report->malformed_record_check->entries.emplace_back(ToString(), record);
- *is_malformed_record = true;
- return Status::OK();
- }
+Status LogBlockContainer::ProcessRecord(
+ const BlockRecordPB& record,
+ FsReport* report,
+ LogBlockManager::UntrackedBlockMap* live_blocks,
+ vector<scoped_refptr<internal::LogBlock>>* dead_blocks,
+ uint64_t* data_file_size,
+ uint64_t* max_block_id) {
+ BlockId block_id(BlockId::FromPB(record.block_id()));
+ scoped_refptr<LogBlock> lb;
+ switch (record.op_type()) {
+ case CREATE:
+ // First verify that the record's offset/length aren't wildly incorrect.
+ if (PREDICT_FALSE(!record.has_offset() ||
+ !record.has_length() ||
+ record.offset() < 0 ||
+ record.length() < 0)) {
+ report->malformed_record_check->entries.emplace_back(ToString(), record);
+ break;
+ }
- // Now it should be safe to access the record's offset/length.
- //
- // KUDU-1657: When opening a container in read-only mode which is actively
- // being written to by another lbm, we must reinspect the data file's size
- // frequently in order to account for the latest appends. Inspecting the
- // file size is expensive, so we only do it when the metadata indicates
- // that additional data has been written to the file.
- if (PREDICT_FALSE(record.offset() + record.length() > *data_file_size)) {
- RETURN_NOT_OK(data_file_->Size(data_file_size));
- }
+ // Now it should be safe to access the record's offset/length.
+ //
+ // KUDU-1657: When opening a container in read-only mode which is actively
+ // being written to by another lbm, we must reinspect the data file's size
+ // frequently in order to account for the latest appends. Inspecting the
+ // file size is expensive, so we only do it when the metadata indicates
+ // that additional data has been written to the file.
+ if (PREDICT_FALSE(record.offset() + record.length() > *data_file_size)) {
+ RETURN_NOT_OK(data_file_->Size(data_file_size));
+ }
+
+ // If the record still extends beyond the end of the file, it is malformed.
+ if (PREDICT_FALSE(record.offset() + record.length() > *data_file_size)) {
+ // TODO(adar): treat as a different kind of inconsistency?
+ report->malformed_record_check->entries.emplace_back(ToString(), record);
+ break;
+ }
+
+ lb = new LogBlock(this, block_id, record.offset(), record.length());
+ if (!InsertIfNotPresent(live_blocks, block_id, lb)) {
+ // We found a record whose ID matches that of an already created block.
+ //
+ // TODO(adar): treat as a different kind of inconsistency?
+ report->malformed_record_check->entries.emplace_back(
+ ToString(), record);
+ break;
+ }
+
+ VLOG(2) << Substitute("Found CREATE block $0 at offset $1 with length $2",
+ block_id.ToString(),
+ record.offset(), record.length());
+
+ // This block must be included in the container's logical size, even if
+ // it has since been deleted. This helps satisfy one of our invariants:
+ // once a container byte range has been used, it may never be reused in
+ // the future.
+ //
+ // If we ignored deleted blocks, we would end up reusing the space
+ // belonging to the last deleted block in the container.
+ BlockCreated(lb);
- // If the record still extends beyond the end of the file, it is malformed.
- if (PREDICT_FALSE(record.offset() + record.length() > *data_file_size)) {
+ *max_block_id = std::max(*max_block_id, block_id.id());
+ break;
+ case DELETE:
+ lb = EraseKeyReturnValuePtr(live_blocks, block_id);
+ if (!lb) {
+ // We found a record for which there is no already created block.
+ //
+ // TODO(adar): treat as a different kind of inconsistency?
+ report->malformed_record_check->entries.emplace_back(ToString(), record);
+ break;
+ }
+ VLOG(2) << Substitute("Found DELETE block $0", block_id.ToString());
+ BlockDeleted(lb);
+ dead_blocks->emplace_back(std::move(lb));
+ break;
+ default:
+ // We found a record with an unknown type.
+ //
// TODO(adar): treat as a different kind of inconsistency?
report->malformed_record_check->entries.emplace_back(ToString(), record);
- *is_malformed_record = true;
- return Status::OK();
- }
+ break;
}
-
- *is_malformed_record = false;
return Status::OK();
}
@@ -678,10 +785,10 @@ Status LogBlockContainer::FinishBlock(const Status& s, WritableBlock* block) {
// will have written some garbage that can be expunged during a GC.
RETURN_NOT_OK(block_manager()->SyncContainer(*this));
- CHECK(block_manager()->AddLogBlock(this, block->id(),
- next_block_offset_,
- block->BytesAppended()));
- BlockCreated(next_block_offset_, block->BytesAppended());
+ scoped_refptr<LogBlock> lb = block_manager()->AddLogBlock(
+ this, block->id(), next_block_offset_, block->BytesAppended());
+ CHECK(lb);
+ BlockCreated(lb);
// Truncate the container if it's now full; any left over preallocated space
// is no longer needed.
@@ -790,8 +897,8 @@ Status LogBlockContainer::EnsurePreallocated(int64_t block_start_offset,
return Status::OK();
}
-void LogBlockContainer::BlockCreated(int64_t block_offset, int64_t block_length) {
- DCHECK_GE(block_offset, 0);
+void LogBlockContainer::BlockCreated(const scoped_refptr<LogBlock>& block) {
+ DCHECK_GE(block->offset(), 0);
// The log block manager maintains block contiguity as an invariant, which
// means accounting for the new block should be as simple as adding its
@@ -804,7 +911,8 @@ void LogBlockContainer::BlockCreated(int64_t block_offset, int64_t block_length)
// boundary. This guarantees that the disk space can be reclaimed when
// the block is deleted.
int64_t new_next_block_offset = KUDU_ALIGN_UP(
- block_offset + block_length, instance()->filesystem_block_size_bytes());
+ block->offset() + block->length(),
+ instance()->filesystem_block_size_bytes());
if (PREDICT_FALSE(new_next_block_offset < next_block_offset_)) {
LOG(WARNING) << Substitute(
"Container $0 unexpectedly tried to lower its next block offset "
@@ -813,7 +921,7 @@ void LogBlockContainer::BlockCreated(int64_t block_offset, int64_t block_length)
} else {
int64_t aligned_block_length = new_next_block_offset - next_block_offset_;
total_bytes_+= aligned_block_length;
- live_bytes_.IncrementBy(block_length);
+ live_bytes_.IncrementBy(block->length());
live_bytes_aligned_.IncrementBy(aligned_block_length);
next_block_offset_ = new_next_block_offset;
}
@@ -827,11 +935,11 @@ void LogBlockContainer::BlockCreated(int64_t block_offset, int64_t block_length)
}
}
-void LogBlockContainer::BlockDeleted(int64_t block_offset, int64_t block_length) {
- DCHECK_GE(block_offset, 0);
+void LogBlockContainer::BlockDeleted(const scoped_refptr<LogBlock>& block) {
+ DCHECK_GE(block->offset(), 0);
- live_bytes_.IncrementBy(-block_length);
- live_bytes_aligned_.IncrementBy(-GetAlignedBlockLength(block_offset, block_length));
+ live_bytes_.IncrementBy(-block->length());
+ live_bytes_aligned_.IncrementBy(-block->fs_aligned_length());
live_blocks_.IncrementBy(-1);
}
@@ -846,77 +954,10 @@ string LogBlockContainer::ToString() const {
return s;
}
-
-int64_t LogBlockContainer::GetAlignedBlockLength(int64_t block_offset,
- int64_t block_length) const {
- uint64_t fs_block_size =
- data_dir_->instance()->metadata()->filesystem_block_size_bytes();
-
- // Nearly all blocks are placed on a filesystem block boundary, which means
- // their length post-alignment is simply their length aligned up to the
- // nearest fs block size.
- //
- // However, due to KUDU-1793, some blocks may start or end at misaligned
- // offsets. We don't maintain enough state to precisely pinpoint such a
- // block's (aligned) end offset in this case, so we'll just undercount it.
- // This should be safe, although it may mean unreclaimed disk space (i.e.
- // when GetAlignedBlockLength() is used in hole punching).
- if (PREDICT_TRUE(block_offset % fs_block_size == 0)) {
- return KUDU_ALIGN_UP(block_length, fs_block_size);
- }
- return block_length;
-}
-
////////////////////////////////////////////////////////////
-// LogBlock
+// LogBlock (definition)
////////////////////////////////////////////////////////////
-// The persistent metadata that describes a logical block.
-//
-// A block grows a LogBlock when its data has been synchronized with
-// the disk. That's when it's fully immutable (i.e. none of its metadata
-// can change), and when it becomes readable and persistent.
-//
-// LogBlocks are reference counted to simplify support for deletion with
-// outstanding readers. All refcount increments are performed with the
-// block manager lock held, as are deletion-based decrements. However,
-// no lock is held when ~LogReadableBlock decrements the refcount, thus it
-// must be made thread safe (by extending RefCountedThreadSafe instead of
-// the simpler RefCounted).
-class LogBlock : public RefCountedThreadSafe<LogBlock> {
- public:
- LogBlock(LogBlockContainer* container, BlockId block_id, int64_t offset,
- int64_t length);
- ~LogBlock();
-
- const BlockId& block_id() const { return block_id_; }
- LogBlockContainer* container() const { return container_; }
- int64_t offset() const { return offset_; }
- int64_t length() const { return length_; }
-
- // Delete the block. Actual deletion takes place when the
- // block is destructed.
- void Delete();
-
- private:
- // The owning container. Must outlive the LogBlock.
- LogBlockContainer* container_;
-
- // The block identifier.
- const BlockId block_id_;
-
- // The block's offset in the container.
- const int64_t offset_;
-
- // The block's length.
- const int64_t length_;
-
- // Whether the block has been marked for deletion.
- bool deleted_;
-
- DISALLOW_COPY_AND_ASSIGN(LogBlock);
-};
-
LogBlock::LogBlock(LogBlockContainer* container, BlockId block_id,
int64_t offset, int64_t length)
: container_(container),
@@ -931,24 +972,40 @@ LogBlock::LogBlock(LogBlockContainer* container, BlockId block_id,
static void DeleteBlockAsync(LogBlockContainer* container,
BlockId block_id,
int64_t offset,
- int64_t length) {
- // Use the block's aligned length so that the filesystem can reclaim maximal
- // disk space.
- //
+ int64_t fs_aligned_length) {
// We don't call SyncData() to synchronize the deletion because it's
// expensive, and in the worst case, we'll just leave orphaned data
// behind to be cleaned up in the next GC.
VLOG(3) << "Freeing space belonging to block " << block_id;
- WARN_NOT_OK(container->PunchHole(
- offset, container->GetAlignedBlockLength(offset, length)),
+ WARN_NOT_OK(container->PunchHole(offset, fs_aligned_length),
Substitute("Could not delete block $0", block_id.ToString()));
}
LogBlock::~LogBlock() {
if (deleted_) {
+ // Use the block's aligned length so that the filesystem can reclaim
+ // maximal disk space.
container_->ExecClosure(Bind(&DeleteBlockAsync, container_, block_id_,
- offset_, length_));
+ offset_, fs_aligned_length()));
+ }
+}
+
+int64_t LogBlock::fs_aligned_length() const {
+ uint64_t fs_block_size = container_->instance()->filesystem_block_size_bytes();
+
+ // Nearly all blocks are placed on a filesystem block boundary, which means
+ // their length post-alignment is simply their length aligned up to the
+ // nearest fs block size.
+ //
+ // However, due to KUDU-1793, some blocks may start or end at misaligned
+ // offsets. We don't maintain enough state to precisely pinpoint such a
+ // block's (aligned) end offset in this case, so we'll just undercount it.
+ // This should be safe, although it may mean unreclaimed disk space (i.e.
+ // when fs_aligned_length() is used in hole punching).
+ if (PREDICT_TRUE(offset_ % fs_block_size == 0)) {
+ return KUDU_ALIGN_UP(length_, fs_block_size);
}
+ return length_;
}
void LogBlock::Delete() {
@@ -1522,7 +1579,7 @@ Status LogBlockManager::DeleteBlock(const BlockId& block_id) {
}
VLOG(3) << "Deleting block " << block_id;
lb->Delete();
- lb->container()->BlockDeleted(lb->offset(), lb->length());
+ lb->container()->BlockDeleted(lb);
// Record the on-disk deletion.
//
@@ -1660,15 +1717,19 @@ bool LogBlockManager::TryUseBlockId(const BlockId& block_id) {
return InsertIfNotPresent(&open_block_ids_, block_id);
}
-bool LogBlockManager::AddLogBlock(LogBlockContainer* container,
- const BlockId& block_id,
- int64_t offset,
- int64_t length) {
+scoped_refptr<LogBlock> LogBlockManager::AddLogBlock(
+ LogBlockContainer* container,
+ const BlockId& block_id,
+ int64_t offset,
+ int64_t length) {
std::lock_guard<simple_spinlock> l(lock_);
scoped_refptr<LogBlock> lb(new LogBlock(container, block_id, offset, length));
mem_tracker_->Consume(kudu_malloc_usable_size(lb.get()));
- return AddLogBlockUnlocked(lb);
+ if (AddLogBlockUnlocked(lb)) {
+ return lb;
+ }
+ return nullptr;
}
bool LogBlockManager::AddLogBlockUnlocked(const scoped_refptr<LogBlock>& lb) {
@@ -1763,16 +1824,8 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
return;
}
- // Populate the in-memory block maps using each container's records.
- deque<BlockRecordPB> records;
- s = container->ReadContainerRecords(&local_report, &records);
- if (!s.ok()) {
- *result_status = s.CloneAndPrepend(Substitute(
- "Could not read records from container $0", container->ToString()));
- return;
- }
-
- // Process the records, building a container-local map.
+ // Process the records, building a container-local map for live blocks and
+ // a list of dead blocks.
//
// It's important that we don't try to add these blocks to the global map
// incrementally as we see each record, since it's possible that one container
@@ -1785,13 +1838,17 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
// NOTE: Since KUDU-1538, we allocate sequential block IDs, which makes reuse
// exceedingly unlikely. However, we might have old data which still exhibits
// the above issue.
- UntrackedBlockMap blocks_in_container;
- vector<scoped_refptr<internal::LogBlock>> deleted;
+ UntrackedBlockMap live_blocks;
+ vector<scoped_refptr<internal::LogBlock>> dead_blocks;
uint64_t max_block_id = 0;
- for (const BlockRecordPB& r : records) {
- ProcessBlockRecord(r, &local_report, container.get(),
- &blocks_in_container, &deleted);
- max_block_id = std::max(max_block_id, r.block_id().id());
+ s = container->ProcessRecords(&local_report,
+ &live_blocks,
+ &dead_blocks,
+ &max_block_id);
+ if (!s.ok()) {
+ *result_status = s.CloneAndPrepend(Substitute(
+ "Could not process records in container $0", container->ToString()));
+ return;
}
// With deleted blocks out of the way, check for misaligned blocks.
@@ -1800,7 +1857,7 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
// underlying filesystem's block size, an invariant maintained by the log
// block manager. However, due to KUDU-1793, that invariant may have been
// broken, so we'll note but otherwise allow it.
- for (const auto& e : blocks_in_container) {
+ for (const auto& e : live_blocks) {
if (PREDICT_FALSE(e.second->offset() %
container->instance()->filesystem_block_size_bytes() != 0)) {
local_report.misaligned_block_check->entries.emplace_back(
@@ -1849,7 +1906,7 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
local_report.full_container_space_check->entries.emplace_back(
container->ToString(), reported_size - container->live_bytes_aligned());
need_repunching.insert(need_repunching.end(),
- deleted.begin(), deleted.end());
+ dead_blocks.begin(), dead_blocks.end());
}
local_report.stats.lbm_full_container_count++;
@@ -1869,7 +1926,7 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
// memory in a local and add it to the mem-tracker in a single increment
// at the end of this loop.
int64_t mem_usage = 0;
- for (const UntrackedBlockMap::value_type& e : blocks_in_container) {
+ for (const UntrackedBlockMap::value_type& e : live_blocks) {
if (!AddLogBlockUnlocked(e.second)) {
// TODO(adar): track as an inconsistency?
LOG(FATAL) << "Found duplicate CREATE record for block " << e.first
@@ -1899,58 +1956,6 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
*result_status = Status::OK();
}
-void LogBlockManager::ProcessBlockRecord(
- const BlockRecordPB& record,
- FsReport* report,
- LogBlockContainer* container,
- UntrackedBlockMap* block_map,
- vector<scoped_refptr<internal::LogBlock>>* deleted) {
- BlockId block_id(BlockId::FromPB(record.block_id()));
- scoped_refptr<LogBlock> lb;
- switch (record.op_type()) {
- case CREATE: {
- lb = new LogBlock(container, block_id, record.offset(), record.length());
- if (!InsertIfNotPresent(block_map, block_id, lb)) {
- // TODO(adar): treat as a different kind of inconsistency?
- report->malformed_record_check->entries.emplace_back(
- container->ToString(), record);
- return;
- }
-
- VLOG(2) << Substitute("Found CREATE block $0 at offset $1 with length $2",
- block_id.ToString(),
- record.offset(), record.length());
-
- // This block must be included in the container's logical size, even if
- // it has since been deleted. This helps satisfy one of our invariants:
- // once a container byte range has been used, it may never be reused in
- // the future.
- //
- // If we ignored deleted blocks, we would end up reusing the space
- // belonging to the last deleted block in the container.
- container->BlockCreated(record.offset(), record.length());
- break;
- }
- case DELETE:
- lb = EraseKeyReturnValuePtr(block_map, block_id);
- if (!lb) {
- // TODO(adar): treat as a different kind of inconsistency?
- report->malformed_record_check->entries.emplace_back(
- container->ToString(), record);
- return;
- }
- VLOG(2) << Substitute("Found DELETE block $0", block_id.ToString());
- container->BlockDeleted(lb->offset(), lb->length());
- deleted->emplace_back(std::move(lb));
- break;
- default:
- // TODO(adar): treat as a different kind of inconsistency?
- report->malformed_record_check->entries.emplace_back(
- container->ToString(), record);
- return;
- }
-}
-
Status LogBlockManager::Repair(
FsReport* report,
vector<scoped_refptr<internal::LogBlock>> need_repunching) {
http://git-wip-us.apache.org/repos/asf/kudu/blob/72e58eca/src/kudu/fs/log_block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index d69aeaa..e401446 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -238,15 +238,18 @@ class LogBlockManager : public BlockManager {
// Adds a LogBlock to in-memory data structures.
//
- // Returns success if the LogBlock was successfully added, failure if it
- // was already present.
- bool AddLogBlock(internal::LogBlockContainer* container,
- const BlockId& block_id,
- int64_t offset,
- int64_t length);
+ // Returns the created LogBlock if it was successfully added or nullptr if a
+ // block with that ID was already present.
+ scoped_refptr<internal::LogBlock> AddLogBlock(
+ internal::LogBlockContainer* container,
+ const BlockId& block_id,
+ int64_t offset,
+ int64_t length);
// Unlocked variant of AddLogBlock() for an already-constructed LogBlock object.
// Must hold 'lock_'.
+ //
+ // Returns true if the LogBlock was successfully added, false if it was already present.
bool AddLogBlockUnlocked(const scoped_refptr<internal::LogBlock>& lb);
// Removes a LogBlock from in-memory data structures.
@@ -255,17 +258,6 @@ class LogBlockManager : public BlockManager {
// already gone.
scoped_refptr<internal::LogBlock> RemoveLogBlock(const BlockId& block_id);
- // Parses a block record, adding or removing it in 'block_map', and
- // accounting for it in the metadata for 'container'. Deleted blocks are
- // written to 'deleted'.
- //
- // If any record is malformed, it is written to 'report'.
- void ProcessBlockRecord(const BlockRecordPB& record,
- FsReport* report,
- internal::LogBlockContainer* container,
- UntrackedBlockMap* block_map,
- std::vector<scoped_refptr<internal::LogBlock>>* deleted);
-
// Repairs any inconsistencies described in 'report'. Any blocks in
// 'need_repunching' will be punched out again.
//