You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2022/09/14 17:36:57 UTC
[kudu] 02/02: [cfile] unsorted updates on cfile_reader.{h,cc}
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 1ad8c1f18a35e06d4aafd7813de71ee335c68ea1
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Fri Sep 9 19:14:02 2022 -0700
[cfile] unsorted updates on cfile_reader.{h,cc}
This patch makes the code in cfile_reader.{h,cc} a bit more robust:
* check invariants more rigorously to protect against unsigned
integer underflow, etc.
* return Status::Corruption() instead of crashing where possible
* optimize the check on whether to verify checksums
* add 'final' to the CFileIterator and DefaultColumnValueIterator
classes to give the compiler more de-virtualisation opportunities
* add PREDICT_{FALSE,TRUE} where appropriate
* add the 'runtime' tag to the --cfile_verify_checksums flag
* other minor updates
Change-Id: I25507963f87e08a6c3a8ba2ff1ca58836b713e18
Reviewed-on: http://gerrit.cloudera.org:8080/18965
Tested-by: Alexey Serbin <al...@apache.org>
Reviewed-by: Abhishek Chennaka <ac...@cloudera.com>
Reviewed-by: Yifan Zhang <ch...@163.com>
---
src/kudu/cfile/cfile_reader.cc | 144 ++++++++++++++++++++++++-----------------
src/kudu/cfile/cfile_reader.h | 28 +++++---
2 files changed, 101 insertions(+), 71 deletions(-)
diff --git a/src/kudu/cfile/cfile_reader.cc b/src/kudu/cfile/cfile_reader.cc
index bbd8779dd..5f1968da5 100644
--- a/src/kudu/cfile/cfile_reader.cc
+++ b/src/kudu/cfile/cfile_reader.cc
@@ -78,6 +78,7 @@ TAG_FLAG(cfile_lazy_open, hidden);
DEFINE_bool(cfile_verify_checksums, true,
"Verify the checksum for each block on read if one exists");
TAG_FLAG(cfile_verify_checksums, evolving);
+TAG_FLAG(cfile_verify_checksums, runtime);
DEFINE_double(cfile_inject_corruption, 0,
"Fraction of the time that read operations on CFiles will fail "
@@ -108,8 +109,8 @@ static const size_t kMaxHeaderFooterPBSize = 64*1024;
static Status ParseMagicAndLength(const Slice& data,
uint8_t* cfile_version,
uint32_t* parsed_len) {
- if (data.size() != kMagicAndLengthSize) {
- return Status::Corruption("Bad size data");
+ if (PREDICT_FALSE(data.size() != kMagicAndLengthSize)) {
+ return Status::Corruption("invalid data size", to_string(data.size()));
}
uint8_t version;
@@ -122,7 +123,7 @@ static Status ParseMagicAndLength(const Slice& data,
}
uint32_t len = DecodeFixed32(data.data() + kMagicLength);
- if (len > kMaxHeaderFooterPBSize) {
+ if (PREDICT_FALSE(len > kMaxHeaderFooterPBSize)) {
return Status::Corruption("invalid data size for header", to_string(len));
}
@@ -138,6 +139,7 @@ CFileReader::CFileReader(ReaderOptions options,
block_(std::move(block)),
file_size_(file_size),
codec_(nullptr),
+ do_verify_checksum_(false),
mem_consumption_(std::move(options.parent_mem_tracker),
memory_footprint()) {
}
@@ -228,12 +230,11 @@ Status CFileReader::ReadAndParseHeader() {
"failed to parse CFile pre-header");
// Quick check to ensure the header size is reasonable.
- if (header_size >= file_size_ - kMagicAndLengthSize) {
+ if (PREDICT_FALSE(file_size_ <= header_size + kMagicAndLengthSize)) {
return Status::Corruption("invalid CFile header size", to_string(header_size));
}
// Setup the data slices.
- uint64_t off = kMagicAndLengthSize;
uint8_t header_scratch[header_size];
Slice header(header_scratch, header_size);
uint8_t checksum_scratch[kChecksumSize];
@@ -241,23 +242,22 @@ Status CFileReader::ReadAndParseHeader() {
// Read the header and checksum if needed.
vector<Slice> results = { header };
- if (has_checksums() && FLAGS_cfile_verify_checksums) {
+ if (do_verify_checksum()) {
results.push_back(checksum);
}
- RETURN_NOT_OK(block_->ReadV(off, results));
+ RETURN_NOT_OK(block_->ReadV(kMagicAndLengthSize, results));
- if (has_checksums() && FLAGS_cfile_verify_checksums) {
- Slice slices[] = { mal, header };
+ if (do_verify_checksum()) {
+ Slice slices[]{ mal, header };
RETURN_NOT_OK(VerifyChecksum(slices, checksum));
}
// Parse the protobuf header.
header_.reset(new CFileHeaderPB());
- if (!header_->ParseFromArray(header.data(), header.size())) {
+ if (PREDICT_FALSE(!header_->ParseFromArray(header.data(), header.size()))) {
return Status::Corruption("invalid CFile pb header",
header.ToDebugString());
}
-
VLOG(2) << "Read header: " << SecureDebugString(*header_);
return Status::OK();
@@ -267,7 +267,10 @@ Status CFileReader::ReadAndParseFooter() {
TRACE_EVENT1("io", "CFileReader::ReadAndParseFooter",
"cfile", ToString());
DCHECK(!init_once_.init_succeeded());
- CHECK_GT(file_size_, kMagicAndLengthSize) << "file too short: " << file_size_;
+
+ if (PREDICT_FALSE(file_size_ <= kMagicAndLengthSize)) {
+ return Status::Corruption(Substitute("file too short: $0", file_size_));
+ }
// First read and parse the "post-footer", which has magic
// and the length of the actual protobuf footer.
@@ -278,7 +281,7 @@ Status CFileReader::ReadAndParseFooter() {
RETURN_NOT_OK(ParseMagicAndLength(mal, &cfile_version_, &footer_size));
// Quick check to ensure the footer size is reasonable.
- if (footer_size >= file_size_ - kMagicAndLengthSize) {
+ if (PREDICT_FALSE(file_size_ <= footer_size + kMagicAndLengthSize)) {
return Status::Corruption(Substitute(
"invalid CFile footer size $0 in block of size $1",
footer_size, file_size_));
@@ -294,21 +297,31 @@ Status CFileReader::ReadAndParseFooter() {
// We read the checksum position in case one exists.
// This is done to avoid the need for a follow up read call.
Slice results[2] = {checksum, footer};
- uint64_t off = file_size_ - kMagicAndLengthSize - footer_size - kChecksumSize;
- RETURN_NOT_OK(block_->ReadV(off, results));
+ if (PREDICT_FALSE(file_size_ <
+ kMagicAndLengthSize + footer_size + kChecksumSize)) {
+ return Status::Corruption(Substitute(
+ "unexpected CFile contents: total size $0, footer size $1",
+ file_size_, footer_size));
+ }
+ RETURN_NOT_OK(block_->ReadV(
+ file_size_ - kMagicAndLengthSize - footer_size - kChecksumSize, results));
// Parse the protobuf footer.
// This needs to be done before validating the checksum since the
// incompatible_features flag tells us if a checksum exists at all.
footer_.reset(new CFileFooterPB());
- if (!footer_->ParseFromArray(footer.data(), footer.size())) {
+ if (PREDICT_FALSE(!footer_->ParseFromArray(footer.data(), footer.size()))) {
return Status::Corruption("invalid CFile pb footer", footer.ToDebugString());
}
+ // Remember the checksum verification choice.
+ do_verify_checksum_ =
+ PREDICT_TRUE(FLAGS_cfile_verify_checksums) && has_checksum();
+
// Verify the footer checksum if needed.
- if (has_checksums() && FLAGS_cfile_verify_checksums) {
+ if (do_verify_checksum()) {
// If a checksum exists it was pre-read.
- Slice slices[2] = {footer, mal};
+ Slice slices[]{ footer, mal };
RETURN_NOT_OK(VerifyChecksum(slices, checksum));
}
@@ -317,26 +330,21 @@ Status CFileReader::ReadAndParseFooter() {
RETURN_NOT_OK_PREPEND(GetCompressionCodec(footer_->compression(), &codec_),
"failed to load CFile compression codec");
}
-
VLOG(2) << "Read footer: " << SecureDebugString(*footer_);
return Status::OK();
}
-bool CFileReader::has_checksums() const {
- return footer_->incompatible_features() & IncompatibleFeatures::CHECKSUM;
-}
-
-Status CFileReader::VerifyChecksum(ArrayView<const Slice> data, const Slice& checksum) const {
+Status CFileReader::VerifyChecksum(ArrayView<const Slice> data, const Slice& checksum) {
uint32_t expected_checksum = DecodeFixed32(checksum.data());
uint32_t checksum_value = 0;
- for (auto& d : data) {
+ for (const auto& d : data) {
checksum_value = crc::Crc32c(d.data(), d.size(), checksum_value);
}
if (PREDICT_FALSE(checksum_value != expected_checksum ||
MaybeTrue(FLAGS_cfile_inject_corruption))) {
return Status::Corruption(
- Substitute("Checksum does not match: $0 vs expected $1",
+ Substitute("checksum does not match: $0 vs expected $1",
checksum_value, expected_checksum));
}
return Status::OK();
@@ -361,7 +369,9 @@ class ScratchMemory {
public:
ScratchMemory() : ptr_(nullptr), size_(-1) {}
~ScratchMemory() {
- if (!ptr_) return;
+ if (!ptr_) {
+ return;
+ }
if (!from_cache_.valid()) {
delete[] ptr_;
}
@@ -372,14 +382,13 @@ class ScratchMemory {
// to allocating from the heap. In that case, IsFromCache() will
// return false.
void TryAllocateFromCache(BlockCache* cache, const BlockCache::CacheKey& key, int size) {
+ DCHECK(!from_cache_.valid());
DCHECK(!ptr_);
from_cache_ = cache->Allocate(key, size);
if (!from_cache_.valid()) {
- AllocateFromHeap(size);
- return;
- } else {
- ptr_ = from_cache_.val_ptr();
+ return AllocateFromHeap(size);
}
+ ptr_ = from_cache_.val_ptr();
size_ = size;
}
@@ -434,8 +443,13 @@ Status CFileReader::ReadBlock(const IOContext* io_context,
CacheControl cache_control,
scoped_refptr<BlockHandle>* ret) const {
DCHECK(init_once_.init_succeeded());
- CHECK(ptr.offset() > 0 && ptr.offset() + ptr.size() < file_size_)
- << Substitute("bad offset $0 in file of size $1", ptr.ToString(), file_size_);
+
+ if (PREDICT_FALSE(ptr.offset() == 0 ||
+ file_size_ <= ptr.offset() + ptr.size())) {
+ return Status::Corruption(Substitute(
+ "bad offset $0 in file of size $1", ptr.ToString(), file_size_));
+ }
+
BlockCacheHandle bc_handle;
Cache::CacheBehavior cache_behavior = cache_control == CACHE_BLOCK ?
Cache::EXPECT_IN_CACHE : Cache::NO_EXPECT_IN_CACHE;
@@ -459,8 +473,8 @@ Status CFileReader::ReadBlock(const IOContext* io_context,
TRACE_COUNTER_INCREMENT(CFILE_CACHE_MISS_BYTES_METRIC_NAME, ptr.size());
uint32_t data_size = ptr.size();
- if (has_checksums()) {
- if (PREDICT_FALSE(kChecksumSize > data_size)) {
+ if (has_checksum()) {
+ if (PREDICT_FALSE(data_size < kChecksumSize)) {
return Status::Corruption("invalid data size for block pointer",
ptr.ToString());
}
@@ -483,15 +497,14 @@ Status CFileReader::ReadBlock(const IOContext* io_context,
// Read the data and checksum if needed.
Slice results_backing[] = { block, checksum };
- bool read_checksum = has_checksums() && FLAGS_cfile_verify_checksums;
- ArrayView<Slice> results(results_backing, read_checksum ? 2 : 1);
+ ArrayView<Slice> results(results_backing, do_verify_checksum() ? 2 : 1);
RETURN_NOT_OK_PREPEND(block_->ReadV(ptr.offset(), results),
Substitute("failed to read CFile block $0 at $1",
block_id().ToString(), ptr.ToString()));
- if (has_checksums() && FLAGS_cfile_verify_checksums) {
- Status s = VerifyChecksum(ArrayView<const Slice>(&block, 1), checksum);
- if (!s.ok()) {
+ if (do_verify_checksum()) {
+ if (auto s = VerifyChecksum(ArrayView<const Slice>(&block, 1), checksum);
+ PREDICT_FALSE(!s.ok())) {
RETURN_NOT_OK_HANDLE_CORRUPTION(
s.CloneAndPrepend(Substitute("checksum error on CFile block $0 at $1",
block_id().ToString(), ptr.ToString())),
@@ -503,11 +516,10 @@ Status CFileReader::ReadBlock(const IOContext* io_context,
if (codec_ != nullptr) {
// Init the decompressor and get the size required for the uncompressed buffer.
CompressedBlockDecoder uncompressor(codec_, cfile_version_, block);
- Status s = uncompressor.Init();
- if (!s.ok()) {
- LOG(WARNING) << "Unable to validate compressed block " << block_id().ToString()
- << " at " << ptr.offset() << " of size " << block.size() << ": "
- << s.ToString();
+ if (auto s = uncompressor.Init(); PREDICT_FALSE(!s.ok())) {
+ LOG(WARNING) << Substitute(
+ "unable to validate compressed block $0 of size $1 at offset $2: $3",
+ block_id().ToString(), block.size(), ptr.offset(), s.ToString());
return s;
}
int uncompressed_size = uncompressor.uncompressed_size();
@@ -520,11 +532,11 @@ Status CFileReader::ReadBlock(const IOContext* io_context,
} else {
decompressed_scratch.AllocateFromHeap(uncompressed_size);
}
- s = uncompressor.UncompressIntoBuffer(decompressed_scratch.get());
- if (!s.ok()) {
- LOG(WARNING) << "Unable to uncompress block " << block_id().ToString()
- << " at " << ptr.offset()
- << " of size " << block.size() << ": " << s.ToString();
+ if (auto s = uncompressor.UncompressIntoBuffer(decompressed_scratch.get());
+ PREDICT_FALSE(!s.ok())) {
+ LOG(WARNING) << Substitute(
+ "unable to uncompress block $0 of size $1 at offset $2: $3",
+ block_id().ToString(), block.size(), ptr.offset(), s.ToString());
return s;
}
@@ -582,13 +594,6 @@ bool CFileReader::GetMetadataEntry(const string& key, string* val) const {
return false;
}
-void CFileReader::HandleCorruption(const fs::IOContext* io_context) const {
- DCHECK(io_context);
- LOG(ERROR) << "Encountered corrupted CFile in filesystem block: " << block_->id().ToString();
- block_->block_manager()->error_manager()->RunErrorNotificationCb(
- ErrorHandlerType::CFILE_CORRUPTION, io_context->tablet_id);
-}
-
Status CFileReader::NewIterator(unique_ptr<CFileIterator>* iter,
CacheControl cache_control,
const IOContext* io_context) {
@@ -596,6 +601,16 @@ Status CFileReader::NewIterator(unique_ptr<CFileIterator>* iter,
return Status::OK();
}
+bool CFileReader::has_checksum() const {
+ DCHECK(footer_);
+ return footer_->incompatible_features() & IncompatibleFeatures::CHECKSUM;
+}
+
+bool CFileReader::do_verify_checksum() const {
+ DCHECK(footer_);
+ return do_verify_checksum_;
+}
+
size_t CFileReader::memory_footprint() const {
size_t size = kudu_malloc_usable_size(this);
size += block_->memory_footprint();
@@ -613,6 +628,13 @@ size_t CFileReader::memory_footprint() const {
return size;
}
+void CFileReader::HandleCorruption(const fs::IOContext* io_context) const {
+ DCHECK(io_context);
+ LOG(ERROR) << "Encountered corrupted CFile in filesystem block: " << block_->id().ToString();
+ block_->block_manager()->error_manager()->RunErrorNotificationCb(
+ ErrorHandlerType::CFILE_CORRUPTION, io_context->tablet_id);
+}
+
////////////////////////////////////////////////////////////
// Default Column Value Iterator
////////////////////////////////////////////////////////////
@@ -924,12 +946,12 @@ Status DecodeNullInfo(scoped_refptr<BlockHandle>* data_block_handle,
uint32_t* num_rows_in_block,
Slice* non_null_bitmap) {
Slice data_block = (*data_block_handle)->data();
- if (!GetVarint32(&data_block, num_rows_in_block)) {
+ if (PREDICT_FALSE(!GetVarint32(&data_block, num_rows_in_block))) {
return Status::Corruption("bad null header, num elements in block");
}
uint32_t non_null_bitmap_size;
- if (!GetVarint32(&data_block, &non_null_bitmap_size)) {
+ if (PREDICT_FALSE(!GetVarint32(&data_block, &non_null_bitmap_size))) {
return Status::Corruption("bad null header, bitmap size");
}
@@ -1012,10 +1034,10 @@ Status CFileIterator::PrepareBatch(size_t* n) {
// prepared_blocks_ queue.
while (prepared_blocks_.back()->last_row_idx() < end_idx) {
Status s = seeked_->Next();
- if (PREDICT_FALSE(s.IsNotFound())) {
+ if (s.IsNotFound()) {
VLOG(1) << "Reached EOF";
break;
- } else if (!s.ok()) {
+ } else if (PREDICT_FALSE(!s.ok())) {
return s;
}
RETURN_NOT_OK(QueueCurrentDataBlock(*seeked_));
diff --git a/src/kudu/cfile/cfile_reader.h b/src/kudu/cfile/cfile_reader.h
index f0f406c58..2f05ca4b1 100644
--- a/src/kudu/cfile/cfile_reader.h
+++ b/src/kudu/cfile/cfile_reader.h
@@ -176,19 +176,15 @@ class CFileReader {
return BlockPointer(footer().validx_info().root_block());
}
- // Returns true if the file has checksums on the header, footer, and data blocks.
- bool has_checksums() const;
-
// Can be called before Init().
std::string ToString() const { return block_->id().ToString(); }
- // Handles a corruption error. Functions that may return due to a CFile
- // corruption should call this method before returning.
- void HandleCorruption(const fs::IOContext* io_context) const;
-
private:
DISALLOW_COPY_AND_ASSIGN(CFileReader);
+ static Status VerifyChecksum(ArrayView<const Slice> data,
+ const Slice& checksum);
+
CFileReader(ReaderOptions options,
uint64_t file_size,
std::unique_ptr<fs::ReadableBlock> block);
@@ -198,11 +194,21 @@ class CFileReader {
Status ReadAndParseHeader();
Status ReadAndParseFooter();
- Status VerifyChecksum(ArrayView<const Slice> data, const Slice& checksum) const;
+
+ // Return true if the file has checksum on the header, footer, and data blocks.
+ bool has_checksum() const;
+
+ // Return true if has_checksum() returns true and the checksum verification
+ // is requested.
+ bool do_verify_checksum() const;
// Returns the memory usage of the object including the object itself.
size_t memory_footprint() const;
+ // Handles a corruption error. Functions that may return due to a CFile
+ // corruption should call this method before returning.
+ void HandleCorruption(const fs::IOContext* io_context) const;
+
const std::unique_ptr<fs::ReadableBlock> block_;
const uint64_t file_size_;
@@ -214,6 +220,8 @@ class CFileReader {
const TypeInfo* type_info_;
const TypeEncodingInfo* type_encoding_info_;
+ bool do_verify_checksum_;
+
KuduOnceLambda init_once_;
ScopedTrackedConsumption mem_consumption_;
@@ -282,7 +290,7 @@ class ColumnIterator {
// Example:
// DefaultColumnValueIterator iter;
// iter.Scan(&column_block);
-class DefaultColumnValueIterator : public ColumnIterator {
+class DefaultColumnValueIterator final : public ColumnIterator {
public:
DefaultColumnValueIterator(const TypeInfo* typeinfo, const void* value)
: typeinfo_(typeinfo),
@@ -312,7 +320,7 @@ class DefaultColumnValueIterator : public ColumnIterator {
};
-class CFileIterator : public ColumnIterator {
+class CFileIterator final : public ColumnIterator {
public:
CFileIterator(CFileReader* reader,
CFileReader::CacheControl cache_control,