You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this rendering.
Posted to commits@kudu.apache.org by to...@apache.org on 2020/03/25 18:10:18 UTC
[kudu] branch master updated: wire_protocol: some simplification and optimization for rowwise encoding
This is an automated email from the ASF dual-hosted git repository.
todd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 3ae02a9 wire_protocol: some simplification and optimization for rowwise encoding
3ae02a9 is described below
commit 3ae02a92ccb99aa1a6b0c3d05deea75756aaa6e3
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Tue Mar 24 14:23:34 2020 -0700
wire_protocol: some simplification and optimization for rowwise encoding
* Re-implement GetSelectedRows() on top of the new ForEachSetBit(...)
utility, which operates word-by-word instead of byte-by-byte, and
change its output type from vector<int> to vector<uint16_t>.
* Use a boolean return value to signal the common case in which all
rows are selected: GetSelectedRows() returns false and leaves the
output vector empty. Currently the rowwise serialization code path
doesn't use this fast path (it just reproduces the old std::iota()
call to generate the sequence of all indexes), but the columnar code
path will special-case it as a memcpy.
* Avoid one call to CountSelected() in SerializeRowBlock() by calculating
num_rows from the size of the row index vector.
* Change SerializeRowBlock() to return an int indicating the number of
rows selected, instead of accumulating it into the protobuf. This
value can then be reused to eliminate a redundant call to
CountSelected() in ScanResultCopier::HandleRowBlock().
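After this change, the protobuf is no longer used by
SerializeRowBlock(), so I removed the parameter; callers such as
ScanResultCopier::HandleRowBlock() now update num_rows on the protobuf
themselves. Removing the parameter also required a bit of fixup in
the tests.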
Change-Id: I24dfb1bd036fde514ca6494bae0ddc171dd225dd
Reviewed-on: http://gerrit.cloudera.org:8080/15550
Reviewed-by: Grant Henke <gr...@apache.org>
Tested-by: Todd Lipcon <to...@apache.org>
---
src/kudu/common/rowblock-test.cc | 14 ++++++------
src/kudu/common/rowblock.cc | 42 +++++++++++++++++++----------------
src/kudu/common/rowblock.h | 8 ++++++-
src/kudu/common/wire_protocol-test.cc | 26 ++++++++++------------
src/kudu/common/wire_protocol.cc | 35 ++++++++++++++++++++---------
src/kudu/common/wire_protocol.h | 12 +++++-----
src/kudu/tserver/tablet_service.cc | 21 +++++++++---------
7 files changed, 90 insertions(+), 68 deletions(-)
diff --git a/src/kudu/common/rowblock-test.cc b/src/kudu/common/rowblock-test.cc
index c2d5a84..f9e9aca 100644
--- a/src/kudu/common/rowblock-test.cc
+++ b/src/kudu/common/rowblock-test.cc
@@ -18,6 +18,7 @@
#include "kudu/common/rowblock.h"
#include <cstddef>
+#include <cstdint>
#include <vector>
#include <gtest/gtest.h>
@@ -59,28 +60,27 @@ TEST(TestSelectionVector, TestNonByteAligned) {
ASSERT_EQ(sv.nrows(), sv.CountSelected());
ASSERT_TRUE(sv.AnySelected());
- vector<int> sel;
- sv.GetSelectedRows(&sel);
- ASSERT_EQ(sv.nrows(), sel.size());
+ vector<uint16_t> sel;
+ ASSERT_FALSE(sv.GetSelectedRows(&sel));
for (size_t i = 0; i < sv.nrows(); i++) {
sv.SetRowUnselected(i);
}
ASSERT_EQ(0, sv.CountSelected());
ASSERT_FALSE(sv.AnySelected());
- sv.GetSelectedRows(&sel);
+ ASSERT_TRUE(sv.GetSelectedRows(&sel));
ASSERT_EQ(0, sel.size());
}
TEST(TestSelectionVector, TestGetSelectedRows) {
- vector<int> expected = {1, 4, 9, 10, 18};
+ vector<uint16_t> expected = {1, 4, 9, 10, 18};
SelectionVector sv(20);
sv.SetAllFalse();
for (int i : expected) {
sv.SetRowSelected(i);
}
- vector<int> selected;
- sv.GetSelectedRows(&selected);
+ vector<uint16_t> selected;
+ ASSERT_TRUE(sv.GetSelectedRows(&selected));
ASSERT_EQ(expected, selected);
}
diff --git a/src/kudu/common/rowblock.cc b/src/kudu/common/rowblock.cc
index 7e8b22b..74d1964 100644
--- a/src/kudu/common/rowblock.cc
+++ b/src/kudu/common/rowblock.cc
@@ -16,7 +16,7 @@
// under the License.
#include "kudu/common/rowblock.h"
-#include <numeric>
+#include <limits>
#include <vector>
#include <glog/logging.h>
@@ -74,31 +74,35 @@ void SelectionVector::ClearToSelectAtMost(size_t max_rows) {
}
}
-void SelectionVector::GetSelectedRows(vector<int>* selected) const {
+
+// TODO(todd) this is a bit faster when implemented with target "bmi" enabled.
+// Consider duplicating it and doing runtime switching.
+static void GetSelectedRowsInternal(const uint8_t* __restrict__ bitmap,
+ int n_bytes,
+ uint16_t* __restrict__ dst) {
+ ForEachSetBit(bitmap, n_bytes * 8,
+ [&](int bit) {
+ *dst++ = bit;
+ });
+}
+
+bool SelectionVector::GetSelectedRows(vector<uint16_t>* selected) const {
+ CHECK_LE(n_rows_, std::numeric_limits<uint16_t>::max());
int n_selected = CountSelected();
- selected->resize(n_selected);
- if (n_selected == 0) {
- return;
- }
if (n_selected == n_rows_) {
- std::iota(selected->begin(), selected->end(), 0);
- return;
+ selected->clear();
+ return false;
}
- const uint8_t* bitmap = &bitmap_[0];
- int* dst = selected->data();
- // Within each byte, keep flipping the least significant non-zero bit and adding
- // the bit index to the output until none are set.
- for (int i = 0; i < n_bytes_; i++) {
- uint8_t bm = *bitmap++;
- while (bm != 0) {
- int bit = Bits::FindLSBSetNonZero(bm);
- *dst++ = (i * 8) + bit;
- bm ^= (1 << bit);
- }
+ selected->resize(n_selected);
+ if (n_selected == 0) {
+ return true;
}
+ GetSelectedRowsInternal(&bitmap_[0], n_bytes_, selected->data());
+ return true;
}
+
size_t SelectionVector::CountSelected() const {
return Bits::Count(&bitmap_[0], n_bytes_);
}
diff --git a/src/kudu/common/rowblock.h b/src/kudu/common/rowblock.h
index 04fa419..cedc4a7 100644
--- a/src/kudu/common/rowblock.h
+++ b/src/kudu/common/rowblock.h
@@ -18,6 +18,7 @@
#include <cstdint>
#include <cstring>
+#include <memory>
#include <string>
#include <vector>
@@ -27,6 +28,7 @@
#include "kudu/common/schema.h"
#include "kudu/common/types.h"
#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/stringpiece.h"
#include "kudu/util/bitmap.h"
#include "kudu/util/status.h"
@@ -102,7 +104,11 @@ class SelectionVector {
// Sets '*selected' to the indices of all rows marked as selected
// in this selection vector.
- void GetSelectedRows(std::vector<int>* selected) const;
+ //
+ // NOTE: in the case that all rows are selected, a fast path is triggered
+ // in which false is returned with an empty 'selected'. Otherwise, returns
+ // true.
+ bool GetSelectedRows(std::vector<uint16_t>* selected) const WARN_UNUSED_RESULT;
uint8_t *mutable_bitmap() {
return &bitmap_[0];
diff --git a/src/kudu/common/wire_protocol-test.cc b/src/kudu/common/wire_protocol-test.cc
index d85e947..4c9ac0d 100644
--- a/src/kudu/common/wire_protocol-test.cc
+++ b/src/kudu/common/wire_protocol-test.cc
@@ -44,7 +44,6 @@
#include "kudu/util/hash.pb.h"
#include "kudu/util/hexdump.h"
#include "kudu/util/memory/arena.h"
-#include "kudu/util/pb_util.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/stopwatch.h" // IWYU pragma: keep
@@ -157,14 +156,12 @@ class WireProtocolTest : public KuduTest,
FillRowBlockForBenchmark(&block);
SelectRandomRowsWithRate(&block, select_rate);
- RowwiseRowBlockPB pb;
faststring direct, indirect;
int64_t cycle_start = CycleClock::Now();
for (int i = 0; i < kNumTrials; ++i) {
- pb.Clear();
direct.clear();
indirect.clear();
- SerializeRowBlock(block, &pb, nullptr, &direct, &indirect);
+ SerializeRowBlock(block, nullptr, &direct, &indirect);
}
int64_t cycle_end = CycleClock::Now();
LOG(INFO) << Substitute(
@@ -315,19 +312,20 @@ TEST_F(WireProtocolTest, TestColumnarRowBlockToPB) {
FillRowBlockWithTestRows(&block);
// Convert to PB.
- RowwiseRowBlockPB pb;
faststring direct, indirect;
- SerializeRowBlock(block, &pb, nullptr, &direct, &indirect);
- SCOPED_TRACE(pb_util::SecureDebugString(pb));
+ int num_rows = SerializeRowBlock(block, nullptr, &direct, &indirect);
SCOPED_TRACE("Row data: " + direct.ToString());
SCOPED_TRACE("Indirect data: " + indirect.ToString());
// Convert back to a row, ensure that the resulting row is the same
// as the one we put in.
+ RowwiseRowBlockPB pb;
+ pb.set_num_rows(num_rows);
+
vector<const uint8_t*> row_ptrs;
Slice direct_sidecar = direct;
ASSERT_OK(ExtractRowsFromRowBlockPB(schema_, pb, indirect,
- &direct_sidecar, &row_ptrs));
+ &direct_sidecar, &row_ptrs));
ASSERT_EQ(block.nrows(), row_ptrs.size());
for (int i = 0; i < block.nrows(); ++i) {
ConstContiguousRow row_roundtripped(&schema_, row_ptrs[i]);
@@ -375,16 +373,17 @@ TEST_F(WireProtocolTest, TestColumnarRowBlockToPBWithPadding) {
ColumnSchema("col3", INT32, true /* nullable */)}, 0);
// Convert to PB.
- RowwiseRowBlockPB pb;
faststring direct, indirect;
- SerializeRowBlock(block, &pb, &proj_schema, &direct, &indirect, true /* pad timestamps */);
- SCOPED_TRACE(pb_util::SecureDebugString(pb));
+ int num_rows = SerializeRowBlock(block, &proj_schema, &direct, &indirect,
+ true /* pad timestamps */);
SCOPED_TRACE("Row data: " + HexDump(direct));
SCOPED_TRACE("Indirect data: " + HexDump(indirect));
// Convert back to a row, ensure that the resulting row is the same
// as the one we put in. Can't reuse the decoding methods since we
// won't support decoding padded rows within Kudu.
+ RowwiseRowBlockPB pb;
+ pb.set_num_rows(num_rows);
vector<const uint8_t*> row_ptrs;
Slice direct_sidecar = direct;
Slice indirect_sidecar = indirect;
@@ -476,10 +475,9 @@ TEST_F(WireProtocolTest, TestBlockWithNoColumns) {
ASSERT_EQ(900, block.selection_vector()->CountSelected());
// Convert it to protobuf, ensure that the results look right.
- RowwiseRowBlockPB pb;
faststring direct, indirect;
- SerializeRowBlock(block, &pb, nullptr, &direct, &indirect);
- ASSERT_EQ(900, pb.num_rows());
+ int num_rows = SerializeRowBlock(block, nullptr, &direct, &indirect);
+ ASSERT_EQ(900, num_rows);
}
TEST_F(WireProtocolTest, TestColumnDefaultValue) {
diff --git a/src/kudu/common/wire_protocol.cc b/src/kudu/common/wire_protocol.cc
index 533de57..5b2bc3f 100644
--- a/src/kudu/common/wire_protocol.cc
+++ b/src/kudu/common/wire_protocol.cc
@@ -22,6 +22,7 @@
#include <algorithm>
#include <cstdint>
#include <cstring>
+#include <numeric>
#include <ostream>
#include <string>
#include <vector>
@@ -891,7 +892,7 @@ static void CopyColumn(
const ColumnBlock& column_block, int dst_col_idx, uint8_t* __restrict__ dst_base,
faststring* indirect_data, const Schema* dst_schema, size_t row_stride,
size_t schema_byte_size, size_t column_offset,
- const vector<int>& row_idx_select) {
+ const vector<uint16_t>& row_idx_select) {
DCHECK(dst_schema);
uint8_t* dst = dst_base + column_offset;
size_t offset_to_non_null_bitmap = schema_byte_size - column_offset;
@@ -927,13 +928,28 @@ static void CopyColumn(
// Because we use a faststring here, ASAN tests become unbearably slow
// with the extra verifications.
ATTRIBUTE_NO_ADDRESS_SAFETY_ANALYSIS
-void SerializeRowBlock(const RowBlock& block,
- RowwiseRowBlockPB* rowblock_pb,
- const Schema* projection_schema,
- faststring* data_buf,
- faststring* indirect_data,
- bool pad_unixtime_micros_to_16_bytes) {
+int SerializeRowBlock(const RowBlock& block,
+ const Schema* projection_schema,
+ faststring* data_buf,
+ faststring* indirect_data,
+ bool pad_unixtime_micros_to_16_bytes) {
DCHECK_GT(block.nrows(), 0);
+
+ vector<uint16_t> selected_row_indexes;
+ bool all_selected = !block.selection_vector()->GetSelectedRows(
+ &selected_row_indexes);
+ if (all_selected) {
+ // TODO(todd): add a fast-path for this in the 'Copy' functions.
+ selected_row_indexes.resize(block.nrows());
+ std::iota(selected_row_indexes.begin(),
+ selected_row_indexes.end(), 0);
+ }
+ size_t num_rows = selected_row_indexes.size();
+
+ // Fast-path empty blocks (eg because the predicate didn't match any rows or
+ // all rows in the block were deleted)
+ if (num_rows == 0) return 0;
+
const Schema* tablet_schema = block.schema();
if (projection_schema == nullptr) {
@@ -958,7 +974,6 @@ void SerializeRowBlock(const RowBlock& block,
size_t old_size = data_buf->size();
size_t row_stride = ContiguousRowHelper::row_size(*projection_schema) + total_padding;
- size_t num_rows = block.selection_vector()->CountSelected();
size_t schema_byte_size = projection_schema->byte_size() + total_padding;
size_t additional_size = row_stride * num_rows;
@@ -971,8 +986,6 @@ void SerializeRowBlock(const RowBlock& block,
memset(base, 0, additional_size);
}
- vector<int> selected_row_indexes;
- block.selection_vector()->GetSelectedRows(&selected_row_indexes);
size_t t_schema_idx = 0;
size_t padding_so_far = 0;
for (int p_schema_idx = 0; p_schema_idx < projection_schema->num_columns(); p_schema_idx++) {
@@ -1009,7 +1022,7 @@ void SerializeRowBlock(const RowBlock& block,
padding_so_far += 8;
}
}
- rowblock_pb->set_num_rows(rowblock_pb->num_rows() + num_rows);
+ return num_rows;
}
string StartTimeToString(const ServerRegistrationPB& reg) {
diff --git a/src/kudu/common/wire_protocol.h b/src/kudu/common/wire_protocol.h
index 6a88b3d..7ac76fd 100644
--- a/src/kudu/common/wire_protocol.h
+++ b/src/kudu/common/wire_protocol.h
@@ -158,7 +158,7 @@ Status ParseInt32Config(const std::string& name, const std::string& value, int32
Status ExtraConfigPBToPBMap(const TableExtraConfigPB& pb,
google::protobuf::Map<std::string, std::string>* configs);
-// Encode the given row block into the provided protobuf and data buffers.
+// Encode the given row block into the provided data buffers.
//
// All data (both direct and indirect) for each selected row in the RowBlock is
// copied into the protobuf and faststrings.
@@ -172,10 +172,12 @@ Status ExtraConfigPBToPBMap(const TableExtraConfigPB& pb,
// schema will be padded to the right by 8 (zero'd) bytes for a total of 16 bytes.
//
// Requires that block.nrows() > 0
-void SerializeRowBlock(const RowBlock& block, RowwiseRowBlockPB* rowblock_pb,
- const Schema* projection_schema,
- faststring* data_buf, faststring* indirect_data,
- bool pad_unixtime_micros_to_16_bytes = false);
+//
+// Returns the number of rows serialized.
+int SerializeRowBlock(const RowBlock& block,
+ const Schema* projection_schema,
+ faststring* data_buf, faststring* indirect_data,
+ bool pad_unixtime_micros_to_16_bytes = false);
// Rewrites the data pointed-to by row data slice 'row_data_slice' by replacing
// relative indirect data pointers with absolute ones in 'indirect_data_slice'.
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index e3f861b..c69e0b3 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -91,7 +91,6 @@
#include "kudu/tserver/tserver_admin.pb.h"
#include "kudu/tserver/tserver_service.pb.h"
#include "kudu/util/auto_release_pool.h"
-#include "kudu/util/bitset.h"
#include "kudu/util/crc.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/faststring.h"
@@ -758,16 +757,16 @@ class ScanResultCopier : public ScanResultCollector {
pad_unixtime_micros_to_16_bytes_(false) {}
void HandleRowBlock(Scanner* scanner, const RowBlock& row_block) override {
- int64_t num_selected = row_block.selection_vector()->CountSelected();
- // Fast-path empty blocks (eg because the predicate didn't match any rows or
- // all rows in the block were deleted)
- if (num_selected == 0) return;
-
- num_rows_returned_ += num_selected;
- scanner->add_num_rows_returned(num_selected);
- SerializeRowBlock(row_block, rowblock_pb_, scanner->client_projection_schema(),
- rows_data_, indirect_data_, pad_unixtime_micros_to_16_bytes_);
- SetLastRow(row_block, &last_primary_key_);
+ int num_selected = SerializeRowBlock(
+ row_block, scanner->client_projection_schema(),
+ rows_data_, indirect_data_, pad_unixtime_micros_to_16_bytes_);
+
+ if (num_selected > 0) {
+ rowblock_pb_->set_num_rows(rowblock_pb_->num_rows() + num_selected);
+ num_rows_returned_ += num_selected;
+ scanner->add_num_rows_returned(num_selected);
+ SetLastRow(row_block, &last_primary_key_);
+ }
}
// Returns number of bytes buffered to return.