Posted to commits@kudu.apache.org by gr...@apache.org on 2020/08/26 21:20:07 UTC

[kudu] 01/05: KUDU-2844 (3/3): avoid copying plain/dict strings to RowBlock Arena

This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 4b3e884c09f0b4e8871273a29590e8c56f3c5df7
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Thu Apr 23 23:52:57 2020 -0700

    KUDU-2844 (3/3): avoid copying plain/dict strings to RowBlock Arena
    
    This changes Dictionary and Plain binary blocks to no longer copy string
    values into the destination RowBlock's Arena. Instead, the dictionary
    block (or plain block) is attached to the RowBlockMemory's list of
    reference-counted block handles, and the cell directly refers to the
    underlying block handle.
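    
    The ownership idea can be illustrated with a minimal, standalone sketch.
    It is not the Kudu code: Slice, Block and BlockMemory are hypothetical,
    simplified stand-ins, and std::shared_ptr is assumed in place of
    scoped_refptr. Cells point into the block's bytes, and the memory holder
    keeps the block alive until Reset().

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for kudu::Slice: a non-owning pointer + length.
struct Slice {
  const char* data;
  size_t size;
};

// Hypothetical stand-in for a decoded block that owns the string bytes.
struct Block {
  std::vector<std::string> values;
};

// Hypothetical stand-in for RowBlockMemory: holds references until Reset().
class BlockMemory {
 public:
  template <class T>
  void RetainReference(const std::shared_ptr<T>& item) {
    retained_.push_back(item);  // type-erased; keeps 'item' alive
  }
  void Reset() { retained_.clear(); }

 private:
  std::vector<std::shared_ptr<const void>> retained_;
};

// Point each cell at the block's bytes instead of copying them, then retain
// the block so that the cells stay valid.
void CopyNextValues(const std::shared_ptr<Block>& block,
                    std::vector<Slice>* cells, BlockMemory* memory) {
  for (const std::string& v : block->values) {
    cells->push_back(Slice{v.data(), v.size()});
  }
  memory->RetainReference(block);
}

// Returns true if the cells stay readable after the decoder's own reference
// is gone, and the block is freed once the memory holder is reset.
bool CellsOutliveDecoder() {
  BlockMemory memory;
  std::vector<Slice> cells;
  std::weak_ptr<Block> watcher;
  {
    auto block = std::make_shared<Block>();
    block->values = {"MAIL", "SHIP", "TRUCK"};
    watcher = block;
    CopyNextValues(block, &cells, &memory);
  }  // the decoder's reference is dropped here
  bool readable = !watcher.expired() &&
                  std::string(cells[1].data, cells[1].size) == "SHIP";
  memory.Reset();  // the last reference is released here
  return readable && watcher.expired();
}
```

    If the RetainReference() call in the sketch is removed, the cells dangle
    as soon as the inner scope ends, which is the kind of failure the new
    no-cache scans in the test are meant to catch.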
    
    I modified full_stack-insert-scan-test to have it do some scans with
    fault tolerance and no caching. If I comment out the code path that
    reference-counts the blocks during a scan, the newly added test fails.
    
    I performance-tested this by loading a lineitem table using
    tpch_real_world:
    
    $ tpch_real_world -tpch-path-to-dbgen-dir /data/2/todd/dbgen/ \
      -tpch_num_inserters=8 -tpch_scaling_factor=100 \
      --tpch_mini_cluster_base_dir /data/1/todd/tpch-kudu-data
    
    For the numbers reported below, I accidentally cancelled the load after
    222M rows were present, but I used the same dataset for both "before"
    and "after" so the relative comparison is still valid.
    
    I started a tserver and master with the same data directories, with
    the tserver running inside perf stat (or perf record, to look at a
    profile):
    
    $ kudu-master \
      -fs-wal-dir /data/1/todd/tpch-kudu-data/master-0/wal/ \
      -fs-data-dirs /data/1/todd/tpch-kudu-data/master-0/data/
    
    $ perf stat kudu-tserver \
      -fs-wal-dir /data/1/todd/tpch-kudu-data/ts-0/wal/ \
      -fs-data-dirs /data/1/todd/tpch-kudu-data/ts-0/data/
    
    I waited until the data had been fully flushed from MRS and compacted
    before running the read workloads. To test the reads I ran the following
    10 times:
    
    $ kudu perf table_scan localhost tpch_real_world  \
      --columns l_shipdate,l_shipmode,l_comment --num_threads=16
    
    The results of the first test were a bit noisy due to NUMA placement
    issues -- some runs were 30-40% faster than other runs, even on the same
    build, which made it hard to compare results, even though it was clear
    that the optimized version used fewer cycles on average. So, I ran both
    the tserver and the client using 'numactl -m 0 -N 0' to force everything
    to a single NUMA node. This made results much more consistent.
    
    Before:
    
             255870.36 msec task-clock                #    3.058 CPUs utilized
                244847      context-switches          #    0.957 K/sec
                  3322      cpu-migrations            #    0.013 K/sec
                245814      page-faults               #    0.961 K/sec
         1066864136000      cycles                    #    4.170 GHz                      (83.46%)
           84410991344      stalled-cycles-frontend   #    7.91% frontend cycles idle     (83.37%)
          340913242391      stalled-cycles-backend    #   31.95% backend cycles idle      (83.25%)
         1131564485394      instructions              #    1.06  insn per cycle
                                                      #    0.30  stalled cycles per insn  (83.34%)
          187879069908      branches                  #  734.274 M/sec                    (83.32%)
            8550168935      branch-misses             #    4.55% of all branches          (83.26%)
    
         191.262870000 seconds user
          64.765755000 seconds sys
    
    After:
    
             214131.49 msec task-clock                #    2.750 CPUs utilized
                245357      context-switches          #    0.001 M/sec
                  2734      cpu-migrations            #    0.013 K/sec
                248108      page-faults               #    0.001 M/sec
          893270854012      cycles                    #    4.172 GHz                      (83.45%)
           83805641687      stalled-cycles-frontend   #    9.38% frontend cycles idle     (83.25%)
          345166097238      stalled-cycles-backend    #   38.64% backend cycles idle      (83.29%)
          913435059189      instructions              #    1.02  insn per cycle
                                                      #    0.38  stalled cycles per insn  (83.36%)
          142198832288      branches                  #  664.072 M/sec                    (83.36%)
            4819907752      branch-misses             #    3.39% of all branches          (83.29%)
    
          77.876854360 seconds time elapsed
    
         146.195821000 seconds user
          68.113598000 seconds sys
    
    To summarize, the change gives about a 1.19x reduction in CPU cycles on the
    tserver (about 1.31x in user-mode CPU time). The perf scan tool reported a
    15-20% reduction in wall-clock time. Scanning just a dictionary-encoded
    column shows even better results.
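    
    The reduction factors can be recomputed from the perf stat counters
    above. A small sketch follows; the helper name ReductionFactor and the
    constant names are hypothetical, and the values are copied from the
    "Before" and "After" output:

```cpp
#include <cassert>
#include <cstdio>

// Counter values copied from the perf stat output above.
const double kCyclesBefore = 1066864136000.0;
const double kCyclesAfter = 893270854012.0;
const double kInsnsBefore = 1131564485394.0;
const double kInsnsAfter = 913435059189.0;
const double kUserSecBefore = 191.262870;
const double kUserSecAfter = 146.195821;

// "Nx reduction" here means before / after.
double ReductionFactor(double before, double after) {
  return before / after;
}

// Print the reduction factor for each counter.
void PrintReductions() {
  std::printf("cycles:       %.2fx\n",
              ReductionFactor(kCyclesBefore, kCyclesAfter));
  std::printf("instructions: %.2fx\n",
              ReductionFactor(kInsnsBefore, kInsnsAfter));
  std::printf("user seconds: %.2fx\n",
              ReductionFactor(kUserSecBefore, kUserSecAfter));
}
```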
    
    Looking at 'perf diff -c ratio -o 0 --sort=sym' between two profiles collected
    by perf-record, we can see that the BinaryDictBlockDecoder code path is much
    cheaper, most of the memcpy calls are removed, and slightly more CPU is spent
    serializing the result (probably due to reduced locality of reference copying
    the string data to the wire).
    
        Orig % CPU     Ratio      Symbol
    -------------------------------------------------------------------------------------
        26.16%        0.437272  [.] kudu::cfile::BinaryDictBlockDecoder::CopyNextDecodeStrings(unsigned long*, kudu::ColumnDataView*)
        19.63%        1.135304  [.] kudu::SerializeRowBlock(kudu::RowBlock const&, kudu::Schema const*, kudu::faststring*, kudu::faststring*, bool)
        11.49%        1.002845  [k] copy_user_enhanced_fast_string
        10.29%        0.068955  [.] __memcpy_ssse3_back
    
    Change-Id: I93fa1f9fd401814a42dc5a1f3fd2ffb1286ac441
    Reviewed-on: http://gerrit.cloudera.org:8080/15802
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/cfile/binary_dict_block.cc                | 23 +++++++++-----
 src/kudu/cfile/binary_plain_block.cc               | 28 +++++++++++------
 src/kudu/cfile/binary_plain_block.h                |  6 +++-
 src/kudu/cfile/block_handle.h                      |  4 +--
 src/kudu/common/rowblock_memory.h                  | 36 +++++++++++++++++++++-
 .../full_stack-insert-scan-test.cc                 | 28 +++++++++++++++--
 6 files changed, 101 insertions(+), 24 deletions(-)

diff --git a/src/kudu/cfile/binary_dict_block.cc b/src/kudu/cfile/binary_dict_block.cc
index 4000e61..194915a 100644
--- a/src/kudu/cfile/binary_dict_block.cc
+++ b/src/kudu/cfile/binary_dict_block.cc
@@ -17,6 +17,7 @@
 
 #include "kudu/cfile/binary_dict_block.h"
 
+#include <functional>
 #include <limits>
 #include <ostream>
 #include <utility>
@@ -35,6 +36,7 @@
 #include "kudu/common/columnblock.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
 #include "kudu/common/types.h"
 #include "kudu/gutil/casts.h"
 #include "kudu/gutil/map-util.h"
@@ -293,12 +295,13 @@ Status BinaryDictBlockDecoder::CopyNextAndEval(size_t* n,
     return CopyNextDecodeStrings(n, dst);
   }
 
+  bool retain_dict = false;
+
   // Load the rows' codeword values into a buffer for scanning.
   BShufBlockDecoder<UINT32>* d_bptr = down_cast<BShufBlockDecoder<UINT32>*>(data_decoder_.get());
   codeword_buf_.resize(*n * sizeof(uint32_t));
   d_bptr->CopyNextValuesToArray(n, codeword_buf_.data());
   Slice* out = reinterpret_cast<Slice*>(dst->data());
-  Arena* out_arena = dst->arena();
   for (size_t i = 0; i < *n; i++, out++) {
     // Check with the SelectionVectorView to see whether the data has already
     // been cleared, in which case we can skip evaluation.
@@ -307,13 +310,18 @@ Status BinaryDictBlockDecoder::CopyNextAndEval(size_t* n,
     }
     uint32_t codeword = *reinterpret_cast<uint32_t*>(&codeword_buf_[i*sizeof(uint32_t)]);
     if (BitmapTest(codewords_matching_pred->bitmap(), codeword)) {
-      // Row is included in predicate, copy data to block.
-      CHECK(out_arena->RelocateSlice(dict_decoder_->string_at_index(codeword), out));
+      // Row is included in predicate: point the cell in the block
+      // to the entry in the dictionary.
+      *out = dict_decoder_->string_at_index(codeword);
+      retain_dict = true;
     } else {
       // Mark that the row will not be returned.
       sel->ClearBit(i);
     }
   }
+  if (retain_dict) {
+    dst->memory()->RetainReference(dict_decoder_->block_handle());
+  }
   return Status::OK();
 }
 
@@ -323,22 +331,21 @@ Status BinaryDictBlockDecoder::CopyNextDecodeStrings(size_t* n, ColumnDataView*
   DCHECK_LE(*n, dst->nrows());
   DCHECK_EQ(dst->stride(), sizeof(Slice));
 
-  Arena* out_arena = dst->arena();
   Slice* out = reinterpret_cast<Slice*>(dst->data());
 
   codeword_buf_.resize((*n)*sizeof(uint32_t));
 
   // Copy the codewords into a temporary buffer first.
-  // And then Copy the strings corresponding to the codewords to the destination buffer.
   BShufBlockDecoder<UINT32>* d_bptr = down_cast<BShufBlockDecoder<UINT32>*>(data_decoder_.get());
   RETURN_NOT_OK(d_bptr->CopyNextValuesToArray(n, codeword_buf_.data()));
 
+  // Now point the cells in the destination block to the string data in the dictionary
+  // block.
   for (int i = 0; i < *n; i++) {
     uint32_t codeword = *reinterpret_cast<uint32_t*>(&codeword_buf_[i*sizeof(uint32_t)]);
-    Slice elem = dict_decoder_->string_at_index(codeword);
-    CHECK(out_arena->RelocateSlice(elem, out));
-    out++;
+    *out++ = dict_decoder_->string_at_index(codeword);
   }
+  dst->memory()->RetainReference(dict_decoder_->block_handle());
   return Status::OK();
 }
 
diff --git a/src/kudu/cfile/binary_plain_block.cc b/src/kudu/cfile/binary_plain_block.cc
index 9ab00ed..9782d24 100644
--- a/src/kudu/cfile/binary_plain_block.cc
+++ b/src/kudu/cfile/binary_plain_block.cc
@@ -19,6 +19,7 @@
 
 #include <algorithm>
 #include <cstdint>
+#include <functional>
 #include <ostream>
 #include <vector>
 
@@ -31,6 +32,7 @@
 #include "kudu/common/columnblock.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/types.h"
 #include "kudu/gutil/stringprintf.h"
@@ -39,7 +41,6 @@
 #include "kudu/util/coding-inl.h"
 #include "kudu/util/group_varint-inl.h"
 #include "kudu/util/hexdump.h"
-#include "kudu/util/memory/arena.h"
 
 using std::vector;
 
@@ -163,8 +164,8 @@ Status BinaryPlainBlockBuilder::GetLastKey(void *key_void) const {
 ////////////////////////////////////////////////////////////
 
 BinaryPlainBlockDecoder::BinaryPlainBlockDecoder(scoped_refptr<BlockHandle> block)
-    : block_handle_(std::move(block)),
-      data_(block_handle_->data()),
+    : block_(std::move(block)),
+      data_(block_->data()),
       parsed_(false),
       num_elems_(0),
       ordinal_pos_base_(0),
@@ -301,7 +302,6 @@ Status BinaryPlainBlockDecoder::HandleBatch(size_t* n, ColumnDataView* dst, Cell
   CHECK_EQ(dst->type_info()->physical_type(), BINARY);
   DCHECK_LE(*n, dst->nrows());
   DCHECK_EQ(dst->stride(), sizeof(Slice));
-  Arena *out_arena = dst->arena();
   if (PREDICT_FALSE(*n == 0 || cur_idx_ >= num_elems_)) {
     *n = 0;
     return Status::OK();
@@ -311,15 +311,16 @@ Status BinaryPlainBlockDecoder::HandleBatch(size_t* n, ColumnDataView* dst, Cell
   Slice *out = reinterpret_cast<Slice*>(dst->data());
   for (size_t i = 0; i < max_fetch; i++, out++, cur_idx_++) {
     Slice elem(string_at_index(cur_idx_));
-    c(i, elem, out, out_arena);
+    c(i, elem, out);
   }
   *n = max_fetch;
   return Status::OK();
 }
 
 Status BinaryPlainBlockDecoder::CopyNextValues(size_t* n, ColumnDataView* dst) {
-  return HandleBatch(n, dst, [&](size_t i, Slice elem, Slice* out, Arena* out_arena) {
-    CHECK(out_arena->RelocateSlice(elem, out));
+  dst->memory()->RetainReference(block_);
+  return HandleBatch(n, dst, [&](size_t /*i*/, Slice elem, Slice* out) {
+                               *out = elem;
   });
 }
 
@@ -327,16 +328,23 @@ Status BinaryPlainBlockDecoder::CopyNextAndEval(size_t* n,
                                                 ColumnMaterializationContext* ctx,
                                                 SelectionVectorView* sel,
                                                 ColumnDataView* dst) {
+  bool retain_block = false;
   ctx->SetDecoderEvalSupported();
-  return HandleBatch(n, dst, [&](size_t i, Slice elem, Slice* out, Arena* out_arena) {
+  Status s = HandleBatch(n, dst, [&](size_t i, Slice elem, Slice* out) {
     if (!sel->TestBit(i)) {
       return;
-    } else if (ctx->pred()->EvaluateCell<BINARY>(static_cast<const void*>(&elem))) {
-      CHECK(out_arena->RelocateSlice(elem, out));
+    }
+    if (ctx->pred()->EvaluateCell<BINARY>(static_cast<const void*>(&elem))) {
+      *out = elem;
+      retain_block = true;
     } else {
       sel->ClearBit(i);
     }
   });
+  if (PREDICT_TRUE(s.ok() && retain_block)) {
+    dst->memory()->RetainReference(block_);
+  }
+  return s;
 }
 
 
diff --git a/src/kudu/cfile/binary_plain_block.h b/src/kudu/cfile/binary_plain_block.h
index 953c4ae..edd4784 100644
--- a/src/kudu/cfile/binary_plain_block.h
+++ b/src/kudu/cfile/binary_plain_block.h
@@ -142,6 +142,10 @@ class BinaryPlainBlockDecoder final : public BlockDecoder {
     return Slice(&data_[str_offset], len);
   }
 
+  const scoped_refptr<BlockHandle>& block_handle() {
+    return block_;
+  }
+
   // Minimum length of a header.
   static const size_t kMinHeaderSize = sizeof(uint32_t) * 3;
 
@@ -163,7 +167,7 @@ class BinaryPlainBlockDecoder final : public BlockDecoder {
     return ret;
   }
 
-  scoped_refptr<BlockHandle> block_handle_;
+  scoped_refptr<BlockHandle> block_;
   Slice data_;
   bool parsed_;
 
diff --git a/src/kudu/cfile/block_handle.h b/src/kudu/cfile/block_handle.h
index d1bb62e..b88d844 100644
--- a/src/kudu/cfile/block_handle.h
+++ b/src/kudu/cfile/block_handle.h
@@ -24,8 +24,8 @@
 #include <boost/variant/variant.hpp>
 
 #include "kudu/cfile/block_cache.h"
-#include "kudu/gutil/ref_counted.h"
 #include "kudu/common/rowblock_memory.h"
+#include "kudu/gutil/ref_counted.h"
 
 namespace kudu {
 namespace cfile {
@@ -40,7 +40,7 @@ namespace cfile {
 // Note that the BlockHandle itself may refer to a BlockCacheHandle, which itself is
 // reference-counted. When all of the references to a BlockHandle go out of scope, it
 // results in decrementing the BlockCacheHandle's reference count.
-class BlockHandle : public RefCountedThreadSafe<BlockHandle> {
+class BlockHandle final : public RefCountedThreadSafe<BlockHandle> {
  public:
   static scoped_refptr<BlockHandle> WithOwnedData(const Slice& data) {
     return { new BlockHandle(data) };
diff --git a/src/kudu/common/rowblock_memory.h b/src/kudu/common/rowblock_memory.h
index 9117ebb..5a11b4b 100644
--- a/src/kudu/common/rowblock_memory.h
+++ b/src/kudu/common/rowblock_memory.h
@@ -16,10 +16,16 @@
 // under the License.
 #pragma once
 
+#include <functional>
+#include <vector>
+
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/util/memory/arena.h"
 
 namespace kudu {
 
+class RowBlockRefCounted;
+
 // Handles the memory allocated alongside a RowBlock for variable-length
 // cells.
 //
@@ -27,11 +33,39 @@ namespace kudu {
 // data (eg BINARY columns). In this case, the data cannot be inlined directly
 // into the columnar data arrays that are part of the RowBlock and instead need
 // to be allocated out of a separate Arena. This class wraps that Arena.
+//
+// In some cases (eg "plain" or "dictionary" encodings), the underlying blocks may contain
+// string data in a non-encoded form. In that case, instead of copying strings, we can
+// refer to the strings within those data blocks themselves, and hold a reference to
+// the underlying block. This class holds those reference counts as well.
 struct RowBlockMemory {
   Arena arena;
 
   explicit RowBlockMemory(int arena_size = 32 * 1024) : arena(arena_size) {}
-  void Reset() { arena.Reset(); }
+  ~RowBlockMemory() { Reset(); }
+
+  void Reset() {
+    arena.Reset();
+    for (auto& f : to_release_) {
+      f();
+    }
+    to_release_.clear();
+  }
+
+  // Retain a reference, typically to a BlockHandle. This is templatized to avoid
+  // a circular dependency between kudu/common/ and kudu/cfile/
+  template<class T>
+  void RetainReference(const scoped_refptr<T>& item) {
+    // TODO(todd) if this ever ends up being a hot code path, we could
+    // probably optimize by having a small hashset of pointers. If an
+    // element is already in the set, we don't need to add a second copy.
+    T* raw = item.get();
+    raw->AddRef();
+    to_release_.emplace_back([=]() { raw->Release(); });
+  }
+
+ private:
+  std::vector<std::function<void()>> to_release_;
 };
 
 }  // namespace kudu
diff --git a/src/kudu/integration-tests/full_stack-insert-scan-test.cc b/src/kudu/integration-tests/full_stack-insert-scan-test.cc
index b91949e..7d3134e 100644
--- a/src/kudu/integration-tests/full_stack-insert-scan-test.cc
+++ b/src/kudu/integration-tests/full_stack-insert-scan-test.cc
@@ -21,6 +21,7 @@
 #include <cstdint>
 #include <memory>
 #include <ostream>
+#include <set>
 #include <string>
 #include <thread>
 #include <utility>
@@ -40,6 +41,7 @@
 #include "kudu/client/write_op.h"
 #include "kudu/codegen/compilation_manager.h"
 #include "kudu/common/partial_row.h"
+#include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/split.h"
@@ -98,6 +100,7 @@ using kudu::client::KuduTable;
 using kudu::client::KuduTableCreator;
 using kudu::cluster::InternalMiniCluster;
 using kudu::cluster::InternalMiniClusterOptions;
+using std::set;
 using std::string;
 using std::thread;
 using std::unique_ptr;
@@ -196,9 +199,17 @@ class FullStackInsertScanTest : public KuduTest {
   // Insert the rows that are associated with that ID.
   void InsertRows(CountDownLatch* start_latch, int id, uint32_t seed);
 
+  enum class ScanFlag {
+    // Disable the block cache for the scan.
+    kDontCacheBlocks,
+    // Enable fault tolerance. This triggers different iterator code paths.
+    kFaultTolerant,
+  };
+
   // Run a scan from the reader_client_ with the projection schema schema
   // and LOG_TIMING message msg.
-  void ScanProjection(const vector<string>& cols, const string& msg);
+  void ScanProjection(const vector<string>& cols, const string& msg,
+                      const set<ScanFlag>& flags = {});
 
   vector<string> AllColumnNames() const;
   vector<string> StringColumnNames() const;
@@ -325,6 +336,11 @@ void FullStackInsertScanTest::DoTestScans() {
 
   NO_FATALS(ScanProjection({}, "empty projection, 0 col"));
   NO_FATALS(ScanProjection({ "key" }, "key scan, 1 col"));
+  NO_FATALS(ScanProjection(AllColumnNames(), "full schema scan, no cache, 10 col",
+                           { ScanFlag::kDontCacheBlocks }));
+  NO_FATALS(ScanProjection(AllColumnNames(),
+                           "fault-tolerant full schema scan, no cache, 10 col",
+                           { ScanFlag::kDontCacheBlocks, ScanFlag::kFaultTolerant }));
   NO_FATALS(ScanProjection(AllColumnNames(), "full schema scan, 10 col"));
   NO_FATALS(ScanProjection(StringColumnNames(), "String projection, 1 col"));
   NO_FATALS(ScanProjection(Int32ColumnNames(), "Int32 projection, 4 col"));
@@ -394,8 +410,10 @@ void FullStackInsertScanTest::InsertRows(CountDownLatch* start_latch, int id,
   FlushSessionOrDie(session);
 }
 
+
 void FullStackInsertScanTest::ScanProjection(const vector<string>& cols,
-                                             const string& msg) {
+                                             const string& msg,
+                                             const set<ScanFlag>& flags) {
   {
     // Warmup codegen cache
     KuduScanner scanner(reader_table_.get());
@@ -404,6 +422,12 @@ void FullStackInsertScanTest::ScanProjection(const vector<string>& cols,
     codegen::CompilationManager::GetSingleton()->Wait();
   }
   KuduScanner scanner(reader_table_.get());
+  if (ContainsKey(flags, ScanFlag::kDontCacheBlocks)) {
+    CHECK_OK(scanner.SetCacheBlocks(false));
+  }
+  if (ContainsKey(flags, ScanFlag::kFaultTolerant)) {
+    CHECK_OK(scanner.SetFaultTolerant());
+  }
   ASSERT_OK(scanner.SetProjectedColumnNames(cols));
   uint64_t nrows = 0;
   LOG_TIMING(INFO, msg) {