You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2022/12/02 08:25:12 UTC

[kudu] 02/02: [tablet] minor optimization on DeltaFileIterator

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit f413f09b295e825a75371ac928c6a28323f6d45c
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Wed Nov 23 00:51:18 2022 -0800

    [tablet] minor optimization on DeltaFileIterator
    
    This patch reduces the number of objects allocated on the heap by
    DeltaFileIterator<Type>::ReadCurrentBlockOntoQueue(): PreparedDeltaBlock
    and its decoder are no longer allocated on the heap. This helps reduce
    contention in tcmalloc under high-intensity workloads.
    
    Also, switched DeltaPreparer::prepared_deltas_ from std::vector to
    std::deque to better fit the existing pattern of adding prepared deltas:
    the number of deltas might be huge, but the exact number isn't known in
    advance at the upper level, so the container can't be pre-sized.
    Since std::deque grows by allocating additional fixed-size blocks
    instead of reallocating a single contiguous buffer, this avoids
    reallocating huge chunks of memory when there are many deltas to process.
    
    I tested this patch against a scenario with a few rowsets having a very
    high number of deltas (about 10M), comparing the timing of
    CompactRowSetsOp() as reported upon completion of the compaction
    operation:
      Before:
        Timing: real 131.803s     user 83.797s    sys 41.467s
      After:
        Timing: real 121.764s     user 85.979s    sys 32.401s
    
    I also monitored the total amount of memory allocated by capturing
    tcmalloc's stats from the embedded webserver's /memz page every second.
    The new code allocated slightly less memory at peak (~1.3% less):
      Before:
        MALLOC:    28882367400 (27544.4 MiB) Bytes in use by application
      After:
        MALLOC:    28492412984 (27172.5 MiB) Bytes in use by application
    
    Change-Id: Ia5edd08ab060074d123d1d05ec4b656be3bfc3c8
    Reviewed-on: http://gerrit.cloudera.org:8080/19277
    Tested-by: Alexey Serbin <al...@apache.org>
    Reviewed-by: Yifan Zhang <ch...@163.com>
---
 src/kudu/cfile/binary_plain_block.h |  15 +++++
 src/kudu/tablet/delta_store.h       |   2 +-
 src/kudu/tablet/delta_tracker.cc    |  19 +++---
 src/kudu/tablet/deltafile.cc        | 120 ++++++++++++------------------------
 src/kudu/tablet/deltafile.h         |  50 +++++++++++++--
 5 files changed, 114 insertions(+), 92 deletions(-)

diff --git a/src/kudu/cfile/binary_plain_block.h b/src/kudu/cfile/binary_plain_block.h
index 25cfc8417..82ad6b091 100644
--- a/src/kudu/cfile/binary_plain_block.h
+++ b/src/kudu/cfile/binary_plain_block.h
@@ -105,8 +105,23 @@ class BinaryPlainBlockBuilder final : public BlockBuilder {
 class BinaryPlainBlockDecoder final : public BlockDecoder {
  public:
   explicit BinaryPlainBlockDecoder(scoped_refptr<BlockHandle> block);
+  BinaryPlainBlockDecoder(BinaryPlainBlockDecoder&& other) noexcept {
+    *this = std::move(other);
+  }
   virtual ~BinaryPlainBlockDecoder();
 
+  BinaryPlainBlockDecoder& operator=(BinaryPlainBlockDecoder&& other) noexcept {
+    block_ = std::move(other.block_);
+    data_ = other.data_;
+    parsed_ = other.parsed_;
+    offsets_buf_ = std::move(other.offsets_buf_);
+    num_elems_ = other.num_elems_;
+    ordinal_pos_base_ = other.ordinal_pos_base_;
+    cur_idx_ = other.cur_idx_;
+
+    return *this;
+  }
+
   virtual Status ParseHeader() OVERRIDE;
   virtual void SeekToPositionInBlock(uint pos) OVERRIDE;
   virtual Status SeekAtOrAfterValue(const void *value,
diff --git a/src/kudu/tablet/delta_store.h b/src/kudu/tablet/delta_store.h
index 3b30ea0db..18f72b46b 100644
--- a/src/kudu/tablet/delta_store.h
+++ b/src/kudu/tablet/delta_store.h
@@ -546,7 +546,7 @@ class DeltaPreparer : public PreparedDeltas {
     DeltaKey key;
     Slice val;
   };
-  std::vector<PreparedDelta> prepared_deltas_;
+  std::deque<PreparedDelta> prepared_deltas_;
 
   // State when prepared_for_ & PREPARED_FOR_SELECT
   // ------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc
index 5a1e86bef..71310ff35 100644
--- a/src/kudu/tablet/delta_tracker.cc
+++ b/src/kudu/tablet/delta_tracker.cc
@@ -105,7 +105,7 @@ DeltaTracker::DeltaTracker(shared_ptr<RowSetMetadata> rowset_metadata,
 
 Status DeltaTracker::OpenDeltaReaders(vector<DeltaBlockIdAndStats> blocks,
                                       const IOContext* io_context,
-                                      vector<shared_ptr<DeltaStore> >* stores,
+                                      vector<shared_ptr<DeltaStore>>* stores,
                                       DeltaType type) {
   FsManager* fs = rowset_metadata_->fs_manager();
   for (auto& block_and_stats : blocks) {
@@ -138,7 +138,7 @@ Status DeltaTracker::OpenDeltaReaders(vector<DeltaBlockIdAndStats> blocks,
 
     VLOG_WITH_PREFIX(1) << "Successfully opened " << DeltaType_Name(type)
                         << " delta file " << block_id.ToString();
-    stores->push_back(dfr);
+    stores->emplace_back(std::move(dfr));
   }
   return Status::OK();
 }
@@ -150,17 +150,22 @@ Status DeltaTracker::DoOpen(const IOContext* io_context) {
   CHECK(undo_delta_stores_.empty()) << "should call before opening any readers";
   CHECK(!open_);
 
+  auto rdb = rowset_metadata_->redo_delta_blocks();
   vector<DeltaBlockIdAndStats> redos;
-  for (auto block_id : rowset_metadata_->redo_delta_blocks()) {
-    redos.emplace_back(std::make_pair(block_id, nullptr));
+  redos.reserve(rdb.size());
+  for (const auto& block_id : rdb) {
+    redos.emplace_back(block_id, nullptr);
   }
   RETURN_NOT_OK(OpenDeltaReaders(std::move(redos),
                                  io_context,
                                  &redo_delta_stores_,
                                  REDO));
+
+  auto udb = rowset_metadata_->undo_delta_blocks();
   vector<DeltaBlockIdAndStats> undos;
-  for (auto block_id : rowset_metadata_->undo_delta_blocks()) {
-    undos.emplace_back(std::make_pair(block_id, nullptr));
+  undos.reserve(udb.size());
+  for (const auto& block_id : udb) {
+    undos.emplace_back(block_id, nullptr);
   }
   RETURN_NOT_OK(OpenDeltaReaders(std::move(undos),
                                  io_context,
@@ -457,7 +462,7 @@ Status DeltaTracker::CompactStores(const IOContext* io_context, int start_idx, i
                                     BlockId::JoinStrings(compacted_blocks),
                                     new_block_id.ToString());
   vector<DeltaBlockIdAndStats> new_block_and_stats;
-  new_block_and_stats.emplace_back(std::make_pair(new_block_id, std::move(stats)));
+  new_block_and_stats.emplace_back(new_block_id, std::move(stats));
   RETURN_NOT_OK_PREPEND(CommitDeltaStoreMetadataUpdate(update, compacted_stores,
                                                        std::move(new_block_and_stats),
                                                        io_context, REDO, FLUSH_METADATA),
diff --git a/src/kudu/tablet/deltafile.cc b/src/kudu/tablet/deltafile.cc
index cdc87f86f..6e3ef38a5 100644
--- a/src/kudu/tablet/deltafile.cc
+++ b/src/kudu/tablet/deltafile.cc
@@ -27,7 +27,6 @@
 
 #include "kudu/cfile/binary_plain_block.h"
 #include "kudu/cfile/block_handle.h"
-#include "kudu/cfile/block_pointer.h"
 #include "kudu/cfile/cfile_reader.h"
 #include "kudu/cfile/cfile_util.h"
 #include "kudu/cfile/cfile_writer.h"
@@ -44,7 +43,6 @@
 #include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
-#include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/delta_relevancy.h"
 #include "kudu/tablet/mvcc.h"
@@ -65,6 +63,17 @@ DEFINE_string(deltafile_default_compression_codec, "lz4",
               "The compression codec used when writing deltafiles.");
 TAG_FLAG(deltafile_default_compression_codec, experimental);
 
+
+using kudu::cfile::BinaryPlainBlockDecoder;
+using kudu::cfile::BlockPointer;
+using kudu::cfile::CFileReader;
+using kudu::cfile::IndexTreeIterator;
+using kudu::cfile::ReaderOptions;
+using kudu::fs::BlockCreationTransaction;
+using kudu::fs::BlockManager;
+using kudu::fs::IOContext;
+using kudu::fs::ReadableBlock;
+using kudu::fs::WritableBlock;
 using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
@@ -75,24 +84,13 @@ namespace kudu {
 
 class MemTracker;
 
-using cfile::BinaryPlainBlockDecoder;
-using cfile::BlockPointer;
-using cfile::CFileReader;
-using cfile::IndexTreeIterator;
-using cfile::ReaderOptions;
-using fs::BlockCreationTransaction;
-using fs::BlockManager;
-using fs::IOContext;
-using fs::ReadableBlock;
-using fs::WritableBlock;
-
 namespace tablet {
 
-const char * const DeltaFileReader::kDeltaStatsEntryName = "deltafilestats";
+const char* const DeltaFileReader::kDeltaStatsEntryName = "deltafilestats";
 
 DeltaFileWriter::DeltaFileWriter(unique_ptr<WritableBlock> block)
 #ifndef NDEBUG
- : has_appended_(false)
+   : has_appended_(false)
 #endif
 { // NOLINT(*)
   cfile::WriterOptions opts;
@@ -488,71 +486,33 @@ Status DeltaFileIterator<Type>::SeekToOrdinal(rowid_t idx) {
   return Status::OK();
 }
 
-struct PreparedDeltaBlock {
-  // The pointer from which this block was read. This is only used for
-  // logging, etc.
-  cfile::BlockPointer block_ptr_;
-
-  // Handle to the block, so it doesn't get freed from underneath us.
-  scoped_refptr<cfile::BlockHandle> block_;
-
-  // The block decoder, to avoid having to re-parse the block header
-  // on every ApplyUpdates() call
-  std::unique_ptr<cfile::BinaryPlainBlockDecoder> decoder_;
-
-  // The first row index for which there is an update in this delta block.
-  rowid_t first_updated_idx_;
-
-  // The last row index for which there is an update in this delta block.
-  rowid_t last_updated_idx_;
-
-  // Within this block, the index of the update which is the first one that
-  // needs to be consulted. This allows deltas to be skipped at the beginning
-  // of the block when the row block starts towards the end of the delta block.
-  // For example:
-  // <-- delta block ---->
-  //                   <--- prepared row block --->
-  // Here, we can skip a bunch of deltas at the beginning of the delta block
-  // which we know don't apply to the prepared row block.
-  rowid_t prepared_block_start_idx_;
-
-  // Return a string description of this prepared block, for logging.
-  std::string ToString() const {
-    return StringPrintf("%d-%d (%s)", first_updated_idx_, last_updated_idx_,
-                        block_ptr_.ToString().c_str());
-  }
-};
-
-
 template<DeltaType Type>
 Status DeltaFileIterator<Type>::ReadCurrentBlockOntoQueue() {
   DCHECK(initted_) << "Must call Init()";
   DCHECK(index_iter_) << "Must call SeekToOrdinal()";
 
-  unique_ptr<PreparedDeltaBlock> pdb(new PreparedDeltaBlock());
-  BlockPointer dblk_ptr = index_iter_->GetCurrentBlockPointer();
+  const BlockPointer& dblk_ptr = index_iter_->GetCurrentBlockPointer();
   shared_ptr<CFileReader> reader = dfr_->cfile_reader();
+  scoped_refptr<cfile::BlockHandle> block;
   RETURN_NOT_OK(reader->ReadBlock(preparer_.opts().io_context,
-                                  dblk_ptr, cache_blocks_, &pdb->block_));
+                                  dblk_ptr, cache_blocks_, &block));
 
   // The data has been successfully read. Finish creating the decoder.
-  pdb->prepared_block_start_idx_ = 0;
-  pdb->block_ptr_ = dblk_ptr;
+  PreparedDeltaBlock pdb(dblk_ptr, std::move(block),  0);
 
   // Decode the block.
-  pdb->decoder_.reset(new BinaryPlainBlockDecoder(pdb->block_));
-  RETURN_NOT_OK_PREPEND(pdb->decoder_->ParseHeader(),
+  RETURN_NOT_OK_PREPEND(pdb.decoder.ParseHeader(),
                         Substitute("unable to decode data block header in delta block $0 ($1)",
                                    dfr_->cfile_reader()->block_id().ToString(),
-                                   dblk_ptr.ToString()));
+                                   pdb.block_ptr.ToString()));
 
-  RETURN_NOT_OK(GetFirstRowIndexInCurrentBlock(&pdb->first_updated_idx_));
-  RETURN_NOT_OK(GetLastRowIndexInDecodedBlock(*pdb->decoder_, &pdb->last_updated_idx_));
+  RETURN_NOT_OK(GetFirstRowIndexInCurrentBlock(&pdb.first_updated_idx));
+  RETURN_NOT_OK(GetLastRowIndexInDecodedBlock(pdb.decoder, &pdb.last_updated_idx));
 
   #ifndef NDEBUG
   VLOG(2) << "Read delta block which updates " <<
-    pdb->first_updated_idx_ << " through " <<
-    pdb->last_updated_idx_;
+    pdb.first_updated_idx << " through " <<
+    pdb.last_updated_idx;
   #endif
 
   delta_blocks_.emplace_back(std::move(pdb));
@@ -571,8 +531,9 @@ Status DeltaFileIterator<Type>::GetFirstRowIndexInCurrentBlock(rowid_t *idx) {
 }
 
 template<DeltaType Type>
-Status DeltaFileIterator<Type>::GetLastRowIndexInDecodedBlock(const BinaryPlainBlockDecoder &dec,
-                                                              rowid_t *idx) {
+Status DeltaFileIterator<Type>::GetLastRowIndexInDecodedBlock(
+    const BinaryPlainBlockDecoder& dec,
+    rowid_t* idx) {
   DCHECK_GT(dec.Count(), 0);
   Slice s(dec.string_at_index(dec.Count() - 1));
   DeltaKey k;
@@ -595,7 +556,7 @@ Status DeltaFileIterator<Type>::PrepareBatch(size_t nrows, int prepare_flags) {
   // Remove blocks from our list which are no longer relevant to the range
   // being prepared.
   while (!delta_blocks_.empty() &&
-         delta_blocks_.front()->last_updated_idx_ < start_row) {
+         delta_blocks_.front().last_updated_idx < start_row) {
     delta_blocks_.pop_front();
   }
 
@@ -619,17 +580,18 @@ Status DeltaFileIterator<Type>::PrepareBatch(size_t nrows, int prepare_flags) {
   }
 
   if (!delta_blocks_.empty()) {
-    PreparedDeltaBlock& block = *delta_blocks_.front();
-    int i = 0;
-    for (i = block.prepared_block_start_idx_;
-         i < block.decoder_->Count();
-         i++) {
-      Slice s(block.decoder_->string_at_index(i));
+    PreparedDeltaBlock& block = delta_blocks_.front();
+    rowid_t idx = block.prepared_block_start_idx;
+    const size_t end_idx = block.decoder.Count();
+    for (; idx < end_idx; ++idx) {
+      Slice s(block.decoder.string_at_index(idx));
       DeltaKey key;
       RETURN_NOT_OK(key.DecodeFrom(&s));
-      if (key.row_idx() >= start_row) break;
+      if (key.row_idx() >= start_row) {
+        break;
+      }
     }
-    block.prepared_block_start_idx_ = i;
+    block.prepared_block_start_idx = idx;
   }
 
   #ifndef NDEBUG
@@ -649,11 +611,11 @@ Status DeltaFileIterator<Type>::AddDeltas(rowid_t start_row, rowid_t stop_row) {
   DCHECK(prepared_) << "must Prepare";
 
   for (auto& block : delta_blocks_) {
-    const BinaryPlainBlockDecoder& bpd = *block->decoder_;
-    DVLOG(2) << "Adding deltas from delta block " << block->first_updated_idx_ << "-"
-             << block->last_updated_idx_ << " for row block starting at " << start_row;
+    const BinaryPlainBlockDecoder& bpd = block.decoder;
+    DVLOG(2) << "Adding deltas from delta block " << block.first_updated_idx << "-"
+             << block.last_updated_idx << " for row block starting at " << start_row;
 
-    if (PREDICT_FALSE(start_row > block->last_updated_idx_)) {
+    if (PREDICT_FALSE(start_row > block.last_updated_idx)) {
       // The block to be updated completely falls after this delta block:
       //  <-- delta block -->      <-- delta block -->
       //                      <-- block to update     -->
@@ -664,7 +626,7 @@ Status DeltaFileIterator<Type>::AddDeltas(rowid_t start_row, rowid_t stop_row) {
     }
 
     bool finished_row = false;
-    for (int i = block->prepared_block_start_idx_; i < bpd.Count(); i++) {
+    for (int i = block.prepared_block_start_idx; i < bpd.Count(); i++) {
       Slice slice = bpd.string_at_index(i);
 
       // Decode and check the ID of the row we're going to update.
diff --git a/src/kudu/tablet/deltafile.h b/src/kudu/tablet/deltafile.h
index 57156557a..ef97fe591 100644
--- a/src/kudu/tablet/deltafile.h
+++ b/src/kudu/tablet/deltafile.h
@@ -23,15 +23,20 @@
 #include <mutex>
 #include <optional>
 #include <string>
+#include <type_traits>
 #include <utility>
 #include <vector>
 
 #include <glog/logging.h>
 
+#include "kudu/cfile/binary_plain_block.h"
+#include "kudu/cfile/block_handle.h"
+#include "kudu/cfile/block_pointer.h"
 #include "kudu/cfile/cfile_reader.h"
 #include "kudu/cfile/cfile_writer.h"
 #include "kudu/common/rowid.h"
 #include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/delta_key.h"
 #include "kudu/tablet/delta_stats.h"
@@ -55,7 +60,6 @@ class SelectionVector;
 struct ColumnId;
 
 namespace cfile {
-class BinaryPlainBlockDecoder;
 class IndexTreeIterator;
 struct ReaderOptions;
 } // namespace cfile
@@ -234,8 +238,6 @@ class DeltaFileReader : public DeltaStore,
   KuduOnceLambda init_once_;
 };
 
-struct PreparedDeltaBlock;
-
 // Iterator over the deltas contained in a delta file.
 //
 // See DeltaIterator for details.
@@ -282,7 +284,43 @@ class DeltaFileIterator : public DeltaIterator {
  private:
   friend class DeltaFileReader;
 
-  DISALLOW_COPY_AND_ASSIGN(DeltaFileIterator);
+  struct PreparedDeltaBlock {
+    PreparedDeltaBlock(const cfile::BlockPointer& block_ptr_in,
+                       scoped_refptr<cfile::BlockHandle> block_in,
+                       rowid_t start_idx_in)
+        : block_ptr(block_ptr_in),
+          block(std::move(block_in)),
+          decoder(block),
+          prepared_block_start_idx(start_idx_in) {
+    }
+
+    // The pointer from which this block was read. This is only used for
+    // logging, etc.
+    cfile::BlockPointer block_ptr;
+
+    // Handle to the block, so it doesn't get freed from underneath us.
+    scoped_refptr<cfile::BlockHandle> block;
+
+    // The block decoder, to avoid having to re-parse the block header
+    // on every ApplyUpdates() call
+    cfile::BinaryPlainBlockDecoder decoder;
+
+    // The first row index for which there is an update in this delta block.
+    rowid_t first_updated_idx;
+
+    // The last row index for which there is an update in this delta block.
+    rowid_t last_updated_idx;
+
+    // Within this block, the index of the update which is the first one that
+    // needs to be consulted. This allows deltas to be skipped at the beginning
+    // of the block when the row block starts towards the end of the delta block.
+    // For example:
+    // <-- delta block ---->
+    //                   <--- prepared row block --->
+    // Here, we can skip a bunch of deltas at the beginning of the delta block
+    // which we know don't apply to the prepared row block.
+    rowid_t prepared_block_start_idx;
+  };
 
   // The pointers in 'opts' and 'dfr' must remain valid for the lifetime of the iterator.
   DeltaFileIterator(std::shared_ptr<DeltaFileReader> dfr,
@@ -318,12 +356,14 @@ class DeltaFileIterator : public DeltaIterator {
 
   // After PrepareBatch(), the set of delta blocks in the delta file
   // which correspond to prepared_block_.
-  std::deque<std::unique_ptr<PreparedDeltaBlock>> delta_blocks_;
+  std::deque<PreparedDeltaBlock> delta_blocks_;
 
   // Temporary buffer used in seeking.
   faststring tmp_buf_;
 
   cfile::CFileReader::CacheControl cache_blocks_;
+
+  DISALLOW_COPY_AND_ASSIGN(DeltaFileIterator);
 };