Posted to commits@kudu.apache.org by gr...@apache.org on 2020/08/26 21:20:07 UTC
[kudu] 01/05: KUDU-2844 (3/3): avoid copying plain/dict strings to RowBlock Arena
This is an automated email from the ASF dual-hosted git repository.
granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 4b3e884c09f0b4e8871273a29590e8c56f3c5df7
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Thu Apr 23 23:52:57 2020 -0700
KUDU-2844 (3/3): avoid copying plain/dict strings to RowBlock Arena
This changes Dictionary and Plain binary blocks to no longer copy string
values into the destination RowBlock's Arena. Instead, the dictionary
block (or plain block) is attached to the RowBlockMemory's list of
reference-counted block handles, and each cell points directly at the
string data inside the underlying block.
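The switch from copying to referencing can be sketched as follows. This is a hypothetical, self-contained model and not the Kudu code. `RefCounted` stands in for gutil's `RefCountedThreadSafe`, `Block` stands in for `BlockHandle`, and `RowMemory` stands in for `RowBlockMemory`. `RowMemory` is modeled on the `RetainReference()`/`Reset()` pair in the diff, which records one type-erased release callback per retained reference.

```cpp
#include <atomic>
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

// Hypothetical intrusive ref-count; a new object starts with one reference
// owned by its creator.
class RefCounted {
 public:
  void AddRef() { refs_.fetch_add(1, std::memory_order_relaxed); }
  void Release() {
    // Delete the object when the last reference goes away.
    if (refs_.fetch_sub(1, std::memory_order_acq_rel) == 1) delete this;
  }

 protected:
  virtual ~RefCounted() = default;

 private:
  std::atomic<int> refs_{1};
};

// Hypothetical decoded block holding non-encoded string bytes.
struct Block : RefCounted {
  explicit Block(std::string d) : data(std::move(d)) { ++live; }
  ~Block() override { --live; }
  std::string data;
  static inline int live = 0;  // number of Blocks not yet destroyed
};

// Hypothetical RowBlockMemory-like holder.
class RowMemory {
 public:
  ~RowMemory() { Reset(); }

  // Old approach: copy the bytes so the cell no longer depends on the block.
  std::string_view Copy(std::string_view s) {
    arena_.emplace_back(s);  // std::deque never moves existing elements
    return arena_.back();
  }

  // New approach: hold a reference so the block outlives the cells that
  // point into it. Templated so this class needs no block-specific header.
  template <class T>
  void RetainReference(T* raw) {
    raw->AddRef();
    to_release_.emplace_back([raw]() { raw->Release(); });
  }

  // Drop copied strings and release every retained reference.
  void Reset() {
    arena_.clear();
    for (auto& f : to_release_) f();
    to_release_.clear();
  }

 private:
  std::deque<std::string> arena_;
  std::vector<std::function<void()>> to_release_;
};
```

The trade-off is one `RetainReference()` call per batch instead of one memcpy per cell, at the cost of pinning whole blocks for as long as the row block's memory lives. As the TODO in the diff notes, repeated retains of the same block each append another entry.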
I modified full_stack-insert-scan-test so that it also runs some scans with
fault tolerance enabled and block caching disabled. If I comment out the code
path that reference-counts the blocks during a scan, the newly added test fails.
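Here is a plausible explanation of why skipping the reference count breaks the uncached scans. It is a hypothetical model, not Kudu code. `std::shared_ptr` and `std::weak_ptr` stand in for `scoped_refptr<BlockHandle>`, and the `retained` vector stands in for `RowBlockMemory`. Presumably, with block caching disabled nothing else keeps a block alive once the decoder lets go of it, so a cell that points into an unretained block dangles.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <string_view>
#include <vector>

// Hypothetical block that is freed once its last owner drops it.
struct Block {
  std::string data;
};

// "Decodes" one cell by pointing it into a freshly loaded block (no copy).
// If `retain` is true, a strong reference is stashed in `retained`,
// modeling RowBlockMemory::RetainReference(). The returned weak_ptr lets
// the caller observe whether the block is still alive afterwards.
std::weak_ptr<Block> DecodeCell(bool retain,
                                std::vector<std::shared_ptr<Block>>* retained,
                                std::string_view* cell) {
  auto block = std::make_shared<Block>(Block{"some string value"});
  *cell = block->data;
  if (retain) retained->push_back(block);
  return block;  // the local strong reference ends here
}
```

Without retention the weak handle is already expired when `DecodeCell` returns, so reading `*cell` would be a use-after-free. That is the kind of bug the new no-cache test exercises.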
I performance-tested this by loading a lineitem table using
tpch_real_world:
$ tpch_real_world -tpch-path-to-dbgen-dir /data/2/todd/dbgen/ \
-tpch_num_inserters=8 -tpch_scaling_factor=100 \
--tpch_mini_cluster_base_dir /data/1/todd/tpch-kudu-data \
For the numbers reported below, I accidentally cancelled the load after
222M rows were present, but I used the same dataset for both "before"
and "after" so the relative comparison is still valid.
I started a tserver and master with the same data directories, with
the tserver running inside perf stat (or perf record, to collect a
profile):
$ kudu-master \
-fs-wal-dir /data/1/todd/tpch-kudu-data/master-0/wal/ \
-fs-data-dirs /data/1/todd/tpch-kudu-data/master-0/data/
$ perf stat kudu-tserver \
-fs-wal-dir /data/1/todd/tpch-kudu-data/ts-0/wal/ \
-fs-data-dirs /data/1/todd/tpch-kudu-data/ts-0/data/
I waited until the data had been fully flushed from the MRS (MemRowSet)
and compacted before running the read workloads. To test the reads I ran
the following 10 times:
$ kudu perf table_scan localhost tpch_real_world \
--columns l_shipdate,l_shipmode,l_comment --num_threads=16
The results of the first test were noisy due to NUMA placement issues:
some runs were 30-40% faster than others, even on the same build. That
made the results hard to compare, although the optimized version clearly
used fewer cycles on average. So I ran both the tserver and the client
under 'numactl -m 0 -N 0' to bind everything to a single NUMA node, which
made the results much more consistent.
Before:
255870.36 msec task-clock # 3.058 CPUs utilized
244847 context-switches # 0.957 K/sec
3322 cpu-migrations # 0.013 K/sec
245814 page-faults # 0.961 K/sec
1066864136000 cycles # 4.170 GHz (83.46%)
84410991344 stalled-cycles-frontend # 7.91% frontend cycles idle (83.37%)
340913242391 stalled-cycles-backend # 31.95% backend cycles idle (83.25%)
1131564485394 instructions # 1.06 insn per cycle
# 0.30 stalled cycles per insn (83.34%)
187879069908 branches # 734.274 M/sec (83.32%)
8550168935 branch-misses # 4.55% of all branches (83.26%)
191.262870000 seconds user
64.765755000 seconds sys
After:
214131.49 msec task-clock # 2.750 CPUs utilized
245357 context-switches # 0.001 M/sec
2734 cpu-migrations # 0.013 K/sec
248108 page-faults # 0.001 M/sec
893270854012 cycles # 4.172 GHz (83.45%)
83805641687 stalled-cycles-frontend # 9.38% frontend cycles idle (83.25%)
345166097238 stalled-cycles-backend # 38.64% backend cycles idle (83.29%)
913435059189 instructions # 1.02 insn per cycle
# 0.38 stalled cycles per insn (83.36%)
142198832288 branches # 664.072 M/sec (83.36%)
4819907752 branch-misses # 3.39% of all branches (83.29%)
77.876854360 seconds time elapsed
146.195821000 seconds user
68.113598000 seconds sys
To summarize, the change reduces CPU cycles on the tserver by about 1.19x
(roughly 16% fewer cycles) and user CPU time by about 1.3x. The wall-clock
time reported by the perf scan tool dropped by 15-20%. Scanning only a
dictionary-encoded column shows even better results.
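The summary ratios can be checked against the perf stat output above with a little arithmetic. The sketch below copies the counter values from the "Before" and "After" runs. It defines "reduction" as before/after, so 2.0 would mean half the work. The struct and function names are illustrative only.

```cpp
// Counter values copied from the "Before"/"After" perf stat output above.
struct PerfStat {
  double task_clock_ms;
  double cycles;
  double instructions;
  double branch_misses;
  double user_seconds;
};

constexpr PerfStat kBefore{255870.36, 1066864136000.0, 1131564485394.0,
                           8550168935.0, 191.262870000};
constexpr PerfStat kAfter{214131.49, 893270854012.0, 913435059189.0,
                          4819907752.0, 146.195821000};

// Reduction factor: before / after.
constexpr double Reduction(double before, double after) {
  return before / after;
}

// Fraction of the original cost that was eliminated.
constexpr double FractionSaved(double before, double after) {
  return 1.0 - after / before;
}

// Checked at compile time: cycles dropped by a factor between 1.19 and 1.20.
static_assert(Reduction(kBefore.cycles, kAfter.cycles) > 1.19 &&
              Reduction(kBefore.cycles, kAfter.cycles) < 1.20);
```

These inputs give roughly 1.19x for cycles and task-clock, 1.24x for instructions, 1.31x for user time, and 1.77x for branch misses.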
Looking at 'perf diff -c ratio -o 0 --sort=sym' between two profiles collected
by perf record, we can see that the BinaryDictBlockDecoder code path is much
cheaper, most of the memcpy calls are removed, and slightly more CPU is spent
serializing the result (probably due to reduced locality of reference when
copying the string data to the wire).
Orig % CPU  Ratio     Symbol
-------------------------------------------------------------------------------------
26.16%      0.437272  [.] kudu::cfile::BinaryDictBlockDecoder::CopyNextDecodeStrings(unsigned long*, kudu::ColumnDataView*)
19.63%      1.135304  [.] kudu::SerializeRowBlock(kudu::RowBlock const&, kudu::Schema const*, kudu::faststring*, kudu::faststring*, bool)
11.49%      1.002845  [k] copy_user_enhanced_fast_string
10.29%      0.068955  [.] __memcpy_ssse3_back
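One way to read the Ratio column is to assume perf-diff's documented definition for `-c ratio`: r = sample period in the new profile / sample period in the baseline. Under that assumption, multiplying a row's baseline share by its ratio gives its post-change cost as a percentage of the baseline total. That puts all rows on one scale. The sketch below does this arithmetic with the row values copied from the table (symbol names shortened). Since these are sampled profiles, the results are approximate.

```cpp
#include <string_view>

// One row of the perf diff table above.
struct DiffRow {
  std::string_view symbol;
  double orig_pct;  // share of the baseline profile, in percent
  double ratio;     // assumed: new period / baseline period
};

constexpr DiffRow kRows[] = {
    {"BinaryDictBlockDecoder::CopyNextDecodeStrings", 26.16, 0.437272},
    {"SerializeRowBlock", 19.63, 1.135304},
    {"copy_user_enhanced_fast_string", 11.49, 1.002845},
    {"__memcpy_ssse3_back", 10.29, 0.068955},
};

// Post-change cost of one symbol, as a percentage of the baseline total.
constexpr double NewPctOfBaseline(const DiffRow& r) {
  return r.orig_pct * r.ratio;
}

// Combined baseline share of the listed symbols.
constexpr double SumOrigPct() {
  double sum = 0.0;
  for (const DiffRow& r : kRows) sum += r.orig_pct;
  return sum;
}

// Combined post-change cost of the listed symbols, on the baseline scale.
constexpr double SumNewPct() {
  double sum = 0.0;
  for (const DiffRow& r : kRows) sum += NewPctOfBaseline(r);
  return sum;
}
```

On this reading, the four symbols go from about 67.6% to about 46.0% of the baseline's samples. CopyNextDecodeStrings falls from 26.2% to about 11.4%, and SerializeRowBlock rises from 19.6% to about 22.3%.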
Change-Id: I93fa1f9fd401814a42dc5a1f3fd2ffb1286ac441
Reviewed-on: http://gerrit.cloudera.org:8080/15802
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
src/kudu/cfile/binary_dict_block.cc | 23 +++++++++-----
src/kudu/cfile/binary_plain_block.cc | 28 +++++++++++------
src/kudu/cfile/binary_plain_block.h | 6 +++-
src/kudu/cfile/block_handle.h | 4 +--
src/kudu/common/rowblock_memory.h | 36 +++++++++++++++++++++-
.../full_stack-insert-scan-test.cc | 28 +++++++++++++++--
6 files changed, 101 insertions(+), 24 deletions(-)
diff --git a/src/kudu/cfile/binary_dict_block.cc b/src/kudu/cfile/binary_dict_block.cc
index 4000e61..194915a 100644
--- a/src/kudu/cfile/binary_dict_block.cc
+++ b/src/kudu/cfile/binary_dict_block.cc
@@ -17,6 +17,7 @@
#include "kudu/cfile/binary_dict_block.h"
+#include <functional>
#include <limits>
#include <ostream>
#include <utility>
@@ -35,6 +36,7 @@
#include "kudu/common/columnblock.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
#include "kudu/common/types.h"
#include "kudu/gutil/casts.h"
#include "kudu/gutil/map-util.h"
@@ -293,12 +295,13 @@ Status BinaryDictBlockDecoder::CopyNextAndEval(size_t* n,
return CopyNextDecodeStrings(n, dst);
}
+ bool retain_dict = false;
+
// Load the rows' codeword values into a buffer for scanning.
BShufBlockDecoder<UINT32>* d_bptr = down_cast<BShufBlockDecoder<UINT32>*>(data_decoder_.get());
codeword_buf_.resize(*n * sizeof(uint32_t));
d_bptr->CopyNextValuesToArray(n, codeword_buf_.data());
Slice* out = reinterpret_cast<Slice*>(dst->data());
- Arena* out_arena = dst->arena();
for (size_t i = 0; i < *n; i++, out++) {
// Check with the SelectionVectorView to see whether the data has already
// been cleared, in which case we can skip evaluation.
@@ -307,13 +310,18 @@ Status BinaryDictBlockDecoder::CopyNextAndEval(size_t* n,
}
uint32_t codeword = *reinterpret_cast<uint32_t*>(&codeword_buf_[i*sizeof(uint32_t)]);
if (BitmapTest(codewords_matching_pred->bitmap(), codeword)) {
- // Row is included in predicate, copy data to block.
- CHECK(out_arena->RelocateSlice(dict_decoder_->string_at_index(codeword), out));
+ // Row is included in predicate: point the cell in the block
+ // to the entry in the dictionary.
+ *out = dict_decoder_->string_at_index(codeword);
+ retain_dict = true;
} else {
// Mark that the row will not be returned.
sel->ClearBit(i);
}
}
+ if (retain_dict) {
+ dst->memory()->RetainReference(dict_decoder_->block_handle());
+ }
return Status::OK();
}
@@ -323,22 +331,21 @@ Status BinaryDictBlockDecoder::CopyNextDecodeStrings(size_t* n, ColumnDataView*
DCHECK_LE(*n, dst->nrows());
DCHECK_EQ(dst->stride(), sizeof(Slice));
- Arena* out_arena = dst->arena();
Slice* out = reinterpret_cast<Slice*>(dst->data());
codeword_buf_.resize((*n)*sizeof(uint32_t));
// Copy the codewords into a temporary buffer first.
- // And then Copy the strings corresponding to the codewords to the destination buffer.
BShufBlockDecoder<UINT32>* d_bptr = down_cast<BShufBlockDecoder<UINT32>*>(data_decoder_.get());
RETURN_NOT_OK(d_bptr->CopyNextValuesToArray(n, codeword_buf_.data()));
+ // Now point the cells in the destination block to the string data in the dictionary
+ // block.
for (int i = 0; i < *n; i++) {
uint32_t codeword = *reinterpret_cast<uint32_t*>(&codeword_buf_[i*sizeof(uint32_t)]);
- Slice elem = dict_decoder_->string_at_index(codeword);
- CHECK(out_arena->RelocateSlice(elem, out));
- out++;
+ *out++ = dict_decoder_->string_at_index(codeword);
}
+ dst->memory()->RetainReference(dict_decoder_->block_handle());
return Status::OK();
}
diff --git a/src/kudu/cfile/binary_plain_block.cc b/src/kudu/cfile/binary_plain_block.cc
index 9ab00ed..9782d24 100644
--- a/src/kudu/cfile/binary_plain_block.cc
+++ b/src/kudu/cfile/binary_plain_block.cc
@@ -19,6 +19,7 @@
#include <algorithm>
#include <cstdint>
+#include <functional>
#include <ostream>
#include <vector>
@@ -31,6 +32,7 @@
#include "kudu/common/columnblock.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
#include "kudu/common/schema.h"
#include "kudu/common/types.h"
#include "kudu/gutil/stringprintf.h"
@@ -39,7 +41,6 @@
#include "kudu/util/coding-inl.h"
#include "kudu/util/group_varint-inl.h"
#include "kudu/util/hexdump.h"
-#include "kudu/util/memory/arena.h"
using std::vector;
@@ -163,8 +164,8 @@ Status BinaryPlainBlockBuilder::GetLastKey(void *key_void) const {
////////////////////////////////////////////////////////////
BinaryPlainBlockDecoder::BinaryPlainBlockDecoder(scoped_refptr<BlockHandle> block)
- : block_handle_(std::move(block)),
- data_(block_handle_->data()),
+ : block_(std::move(block)),
+ data_(block_->data()),
parsed_(false),
num_elems_(0),
ordinal_pos_base_(0),
@@ -301,7 +302,6 @@ Status BinaryPlainBlockDecoder::HandleBatch(size_t* n, ColumnDataView* dst, Cell
CHECK_EQ(dst->type_info()->physical_type(), BINARY);
DCHECK_LE(*n, dst->nrows());
DCHECK_EQ(dst->stride(), sizeof(Slice));
- Arena *out_arena = dst->arena();
if (PREDICT_FALSE(*n == 0 || cur_idx_ >= num_elems_)) {
*n = 0;
return Status::OK();
@@ -311,15 +311,16 @@ Status BinaryPlainBlockDecoder::HandleBatch(size_t* n, ColumnDataView* dst, Cell
Slice *out = reinterpret_cast<Slice*>(dst->data());
for (size_t i = 0; i < max_fetch; i++, out++, cur_idx_++) {
Slice elem(string_at_index(cur_idx_));
- c(i, elem, out, out_arena);
+ c(i, elem, out);
}
*n = max_fetch;
return Status::OK();
}
Status BinaryPlainBlockDecoder::CopyNextValues(size_t* n, ColumnDataView* dst) {
- return HandleBatch(n, dst, [&](size_t i, Slice elem, Slice* out, Arena* out_arena) {
- CHECK(out_arena->RelocateSlice(elem, out));
+ dst->memory()->RetainReference(block_);
+ return HandleBatch(n, dst, [&](size_t /*i*/, Slice elem, Slice* out) {
+ *out = elem;
});
}
@@ -327,16 +328,23 @@ Status BinaryPlainBlockDecoder::CopyNextAndEval(size_t* n,
ColumnMaterializationContext* ctx,
SelectionVectorView* sel,
ColumnDataView* dst) {
+ bool retain_block = false;
ctx->SetDecoderEvalSupported();
- return HandleBatch(n, dst, [&](size_t i, Slice elem, Slice* out, Arena* out_arena) {
+ Status s = HandleBatch(n, dst, [&](size_t i, Slice elem, Slice* out) {
if (!sel->TestBit(i)) {
return;
- } else if (ctx->pred()->EvaluateCell<BINARY>(static_cast<const void*>(&elem))) {
- CHECK(out_arena->RelocateSlice(elem, out));
+ }
+ if (ctx->pred()->EvaluateCell<BINARY>(static_cast<const void*>(&elem))) {
+ *out = elem;
+ retain_block = true;
} else {
sel->ClearBit(i);
}
});
+ if (PREDICT_TRUE(s.ok() && retain_block)) {
+ dst->memory()->RetainReference(block_);
+ }
+ return s;
}
diff --git a/src/kudu/cfile/binary_plain_block.h b/src/kudu/cfile/binary_plain_block.h
index 953c4ae..edd4784 100644
--- a/src/kudu/cfile/binary_plain_block.h
+++ b/src/kudu/cfile/binary_plain_block.h
@@ -142,6 +142,10 @@ class BinaryPlainBlockDecoder final : public BlockDecoder {
return Slice(&data_[str_offset], len);
}
+ const scoped_refptr<BlockHandle>& block_handle() {
+ return block_;
+ }
+
// Minimum length of a header.
static const size_t kMinHeaderSize = sizeof(uint32_t) * 3;
@@ -163,7 +167,7 @@ class BinaryPlainBlockDecoder final : public BlockDecoder {
return ret;
}
- scoped_refptr<BlockHandle> block_handle_;
+ scoped_refptr<BlockHandle> block_;
Slice data_;
bool parsed_;
diff --git a/src/kudu/cfile/block_handle.h b/src/kudu/cfile/block_handle.h
index d1bb62e..b88d844 100644
--- a/src/kudu/cfile/block_handle.h
+++ b/src/kudu/cfile/block_handle.h
@@ -24,8 +24,8 @@
#include <boost/variant/variant.hpp>
#include "kudu/cfile/block_cache.h"
-#include "kudu/gutil/ref_counted.h"
#include "kudu/common/rowblock_memory.h"
+#include "kudu/gutil/ref_counted.h"
namespace kudu {
namespace cfile {
@@ -40,7 +40,7 @@ namespace cfile {
// Note that the BlockHandle itself may refer to a BlockCacheHandle, which itself is
// reference-counted. When all of the references to a BlockHandle go out of scope, it
// results in decrementing the BlockCacheHandle's reference count.
-class BlockHandle : public RefCountedThreadSafe<BlockHandle> {
+class BlockHandle final : public RefCountedThreadSafe<BlockHandle> {
public:
static scoped_refptr<BlockHandle> WithOwnedData(const Slice& data) {
return { new BlockHandle(data) };
diff --git a/src/kudu/common/rowblock_memory.h b/src/kudu/common/rowblock_memory.h
index 9117ebb..5a11b4b 100644
--- a/src/kudu/common/rowblock_memory.h
+++ b/src/kudu/common/rowblock_memory.h
@@ -16,10 +16,16 @@
// under the License.
#pragma once
+#include <functional>
+#include <vector>
+
+#include "kudu/gutil/ref_counted.h"
#include "kudu/util/memory/arena.h"
namespace kudu {
+class RowBlockRefCounted;
+
// Handles the memory allocated alongside a RowBlock for variable-length
// cells.
//
@@ -27,11 +33,39 @@ namespace kudu {
// data (eg BINARY columns). In this case, the data cannot be inlined directly
// into the columnar data arrays that are part of the RowBlock and instead need
// to be allocated out of a separate Arena. This class wraps that Arena.
+//
+// In some cases (eg "plain" or "dictionary" encodings), the underlying blocks may contain
+// string data in a non-encoded form. In that case, instead of copying strings, we can
+// refer to the strings within those data blocks themselves, and hold a reference to
+// the underlying block. This class holds those reference counts as well.
struct RowBlockMemory {
Arena arena;
explicit RowBlockMemory(int arena_size = 32 * 1024) : arena(arena_size) {}
- void Reset() { arena.Reset(); }
+ ~RowBlockMemory() { Reset(); }
+
+ void Reset() {
+ arena.Reset();
+ for (auto& f : to_release_) {
+ f();
+ }
+ to_release_.clear();
+ }
+
+ // Retain a reference, typically to a BlockHandle. This is templatized to avoid
+ // a circular dependency between kudu/common/ and kudu/cfile/
+ template<class T>
+ void RetainReference(const scoped_refptr<T>& item) {
+ // TODO(todd) if this ever ends up being a hot code path, we could
+ // probably optimize by having a small hashset of pointers. If an
+ // element is already in the set, we don't need to add a second copy.
+ T* raw = item.get();
+ raw->AddRef();
+ to_release_.emplace_back([=]() { raw->Release(); });
+ }
+
+ private:
+ std::vector<std::function<void()>> to_release_;
};
} // namespace kudu
diff --git a/src/kudu/integration-tests/full_stack-insert-scan-test.cc b/src/kudu/integration-tests/full_stack-insert-scan-test.cc
index b91949e..7d3134e 100644
--- a/src/kudu/integration-tests/full_stack-insert-scan-test.cc
+++ b/src/kudu/integration-tests/full_stack-insert-scan-test.cc
@@ -21,6 +21,7 @@
#include <cstdint>
#include <memory>
#include <ostream>
+#include <set>
#include <string>
#include <thread>
#include <utility>
@@ -40,6 +41,7 @@
#include "kudu/client/write_op.h"
#include "kudu/codegen/compilation_manager.h"
#include "kudu/common/partial_row.h"
+#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/split.h"
@@ -98,6 +100,7 @@ using kudu::client::KuduTable;
using kudu::client::KuduTableCreator;
using kudu::cluster::InternalMiniCluster;
using kudu::cluster::InternalMiniClusterOptions;
+using std::set;
using std::string;
using std::thread;
using std::unique_ptr;
@@ -196,9 +199,17 @@ class FullStackInsertScanTest : public KuduTest {
// Insert the rows that are associated with that ID.
void InsertRows(CountDownLatch* start_latch, int id, uint32_t seed);
+ enum class ScanFlag {
+ // Disable the block cache for the scan.
+ kDontCacheBlocks,
+ // Enable fault tolerance. This triggers different iterator code paths.
+ kFaultTolerant,
+ };
+
// Run a scan from the reader_client_ with the projection schema schema
// and LOG_TIMING message msg.
- void ScanProjection(const vector<string>& cols, const string& msg);
+ void ScanProjection(const vector<string>& cols, const string& msg,
+ const set<ScanFlag>& flags = {});
vector<string> AllColumnNames() const;
vector<string> StringColumnNames() const;
@@ -325,6 +336,11 @@ void FullStackInsertScanTest::DoTestScans() {
NO_FATALS(ScanProjection({}, "empty projection, 0 col"));
NO_FATALS(ScanProjection({ "key" }, "key scan, 1 col"));
+ NO_FATALS(ScanProjection(AllColumnNames(), "full schema scan, no cache, 10 col",
+ { ScanFlag::kDontCacheBlocks }));
+ NO_FATALS(ScanProjection(AllColumnNames(),
+ "fault-tolerant full schema scan, no cache, 10 col",
+ { ScanFlag::kDontCacheBlocks, ScanFlag::kFaultTolerant }));
NO_FATALS(ScanProjection(AllColumnNames(), "full schema scan, 10 col"));
NO_FATALS(ScanProjection(StringColumnNames(), "String projection, 1 col"));
NO_FATALS(ScanProjection(Int32ColumnNames(), "Int32 projection, 4 col"));
@@ -394,8 +410,10 @@ void FullStackInsertScanTest::InsertRows(CountDownLatch* start_latch, int id,
FlushSessionOrDie(session);
}
+
void FullStackInsertScanTest::ScanProjection(const vector<string>& cols,
- const string& msg) {
+ const string& msg,
+ const set<ScanFlag>& flags) {
{
// Warmup codegen cache
KuduScanner scanner(reader_table_.get());
@@ -404,6 +422,12 @@ void FullStackInsertScanTest::ScanProjection(const vector<string>& cols,
codegen::CompilationManager::GetSingleton()->Wait();
}
KuduScanner scanner(reader_table_.get());
+ if (ContainsKey(flags, ScanFlag::kDontCacheBlocks)) {
+ CHECK_OK(scanner.SetCacheBlocks(false));
+ }
+ if (ContainsKey(flags, ScanFlag::kFaultTolerant)) {
+ CHECK_OK(scanner.SetFaultTolerant());
+ }
ASSERT_OK(scanner.SetProjectedColumnNames(cols));
uint64_t nrows = 0;
LOG_TIMING(INFO, msg) {