You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2020/04/16 17:45:19 UTC
[kudu] 01/04: Avoid calling Schema::find_column() once per RowBlock
in columnar serialization
This is an automated email from the ASF dual-hosted git repository.
todd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit c7c4d47ecca0ea0d90435dac736d22f4063d5507
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Tue Mar 31 12:03:09 2020 -0700
Avoid calling Schema::find_column() once per RowBlock in columnar serialization
Prior to this patch, each row block being serialized in the columnar
format would result in a call to Schema::find_column(name) for each
projected column. That was relatively expensive, involving a hash
computation and string equality check, etc.
This changes the projection calculation to happen "up front" once per
Scan RPC instead of being repeated on every per-rowblock call.
This optimization could also apply to the rowwise serialization, but I
found that the other overheads inherent in that code path are so high
that the find_column calls aren't particularly noticeable. Nonetheless
I left a TODO.
Change-Id: I1b683c7d6d6fe1026ee06c8b5ebfe2a5f1ee6cb1
Reviewed-on: http://gerrit.cloudera.org:8080/15678
Reviewed-by: Andrew Wong <aw...@cloudera.com>
Tested-by: Todd Lipcon <to...@apache.org>
---
src/kudu/common/columnar_serialization.cc | 54 +++++++++-----------
src/kudu/common/columnar_serialization.h | 41 +++++++++++----
src/kudu/common/wire_protocol-test.cc | 12 ++---
src/kudu/tserver/tablet_service.cc | 84 +++++++++++++++++++------------
4 files changed, 112 insertions(+), 79 deletions(-)
diff --git a/src/kudu/common/columnar_serialization.cc b/src/kudu/common/columnar_serialization.cc
index f6b289e..52a3425 100644
--- a/src/kudu/common/columnar_serialization.cc
+++ b/src/kudu/common/columnar_serialization.cc
@@ -587,34 +587,31 @@ void CopySelectedVarlenCellsFromColumn(const ColumnBlock& cblock,
} // anonymous namespace
} // namespace internal
-int SerializeRowBlockColumnar(
- const RowBlock& block,
- const Schema* projection_schema,
- ColumnarSerializedBatch* out) {
- DCHECK_GT(block.nrows(), 0);
- const Schema* tablet_schema = block.schema();
-
- if (projection_schema == nullptr) {
- projection_schema = tablet_schema;
- }
-
+ColumnarSerializedBatch::ColumnarSerializedBatch(const Schema& rowblock_schema,
+ const Schema& client_schema) {
// Initialize buffers for the columns.
// TODO(todd) don't pre-size these to 1MB per column -- quite
// expensive if there are a lot of columns!
- if (out->columns.size() != projection_schema->num_columns()) {
- CHECK_EQ(out->columns.size(), 0);
- out->columns.reserve(projection_schema->num_columns());
- for (const auto& col : projection_schema->columns()) {
- out->columns.emplace_back();
- out->columns.back().data.reserve(1024 * 1024);
- if (col.type_info()->physical_type() == BINARY) {
- out->columns.back().varlen_data.emplace();
- }
- if (col.is_nullable()) {
- out->columns.back().non_null_bitmap.emplace();
- }
+ columns_.reserve(client_schema.num_columns());
+ for (const auto& schema_col : client_schema.columns()) {
+ columns_.emplace_back();
+ auto& col = columns_.back();
+
+ col.rowblock_schema_col_idx = rowblock_schema.find_column(schema_col.name());
+ CHECK_NE(col.rowblock_schema_col_idx, -1);
+
+ col.data.reserve(1024 * 1024);
+ if (schema_col.type_info()->physical_type() == BINARY) {
+ col.varlen_data.emplace();
+ }
+ if (schema_col.is_nullable()) {
+ col.non_null_bitmap.emplace();
}
}
+}
+
+int ColumnarSerializedBatch::AddRowBlock(const RowBlock& block) {
+ DCHECK_GT(block.nrows(), 0);
SelectedRows sel = block.selection_vector()->GetSelectedRows();
if (sel.num_selected() == 0) {
@@ -622,21 +619,18 @@ int SerializeRowBlockColumnar(
}
int col_idx = 0;
- for (const auto& col : projection_schema->columns()) {
- int t_schema_idx = tablet_schema->find_column(col.name());
- CHECK_NE(t_schema_idx, -1);
- const ColumnBlock& column_block = block.column_block(t_schema_idx);
-
+ for (const auto& col : columns_) {
+ const ColumnBlock& column_block = block.column_block(col.rowblock_schema_col_idx);
if (column_block.type_info()->physical_type() == BINARY) {
internal::CopySelectedVarlenCellsFromColumn(
column_block,
sel,
- &out->columns[col_idx]);
+ &columns_[col_idx]);
} else {
internal::CopySelectedCellsFromColumn(
column_block,
sel,
- &out->columns[col_idx]);
+ &columns_[col_idx]);
}
col_idx++;
}
diff --git a/src/kudu/common/columnar_serialization.h b/src/kudu/common/columnar_serialization.h
index bc74f08..b4862c7 100644
--- a/src/kudu/common/columnar_serialization.h
+++ b/src/kudu/common/columnar_serialization.h
@@ -17,6 +17,7 @@
#pragma once
#include <cstdint>
+#include <utility>
#include <vector>
#include <boost/optional/optional.hpp>
@@ -30,8 +31,24 @@ class Schema;
// A pending batch of serialized rows, suitable for easy conversion
// into the protobuf representation and a set of sidecars.
-struct ColumnarSerializedBatch {
+class ColumnarSerializedBatch {
+ public:
+ // 'rowblock_schema': the schema of the RowBlocks that will be passed to
+ // AddRowBlock().
+ // 'client_schema': the schema to be returned to the client, which may
+ // contain a subset of columns
+ ColumnarSerializedBatch(const Schema& rowblock_schema,
+ const Schema& client_schema);
+
+ // Append the data in 'block' into this columnar batch.
+ //
+ // Returns the number of selected rows serialized.
+ int AddRowBlock(const RowBlock& block);
+
struct Column {
+ // The index of the column in the schema of the RowBlocks to be appended.
+ int rowblock_schema_col_idx;
+
// Underlying column data.
faststring data;
@@ -41,17 +58,19 @@ struct ColumnarSerializedBatch {
// Each bit is set when a value is non-null
boost::optional<faststring> non_null_bitmap;
};
- std::vector<Column> columns;
-};
-// Serialize the data in 'block' into the columnar batch 'out', appending to
-// any data already serialized to the same batch.
-//
-// Returns the number of selected rows serialized.
-int SerializeRowBlockColumnar(
- const RowBlock& block,
- const Schema* projection_schema,
- ColumnarSerializedBatch* out);
+ const std::vector<Column>& columns() const {
+ return columns_;
+ }
+
+ std::vector<Column> TakeColumns() && {
+ return std::move(columns_);
+ }
+
+ private:
+ friend class WireProtocolTest;
+ std::vector<Column> columns_;
+};
////////////////////////////////////////////////////////////
diff --git a/src/kudu/common/wire_protocol-test.cc b/src/kudu/common/wire_protocol-test.cc
index 4a1c791..9ab534d 100644
--- a/src/kudu/common/wire_protocol-test.cc
+++ b/src/kudu/common/wire_protocol-test.cc
@@ -306,13 +306,13 @@ TEST_F(WireProtocolTest, TestRowBlockToColumnarPB) {
}
// Convert all of the RowBlocks to a single serialized (concatenated) columnar format.
- ColumnarSerializedBatch batch;
+ ColumnarSerializedBatch batch(schema_, schema_);
for (const auto& block : blocks) {
- SerializeRowBlockColumnar(block, nullptr, &batch);
+ batch.AddRowBlock(block);
}
// Verify that the resulting serialized data matches the concatenated original data blocks.
- ASSERT_EQ(5, batch.columns.size());
+ ASSERT_EQ(5, batch.columns().size());
int dst_row_idx = 0;
for (const auto& block : blocks) {
for (int src_row_idx = 0; src_row_idx < block.nrows(); src_row_idx++) {
@@ -325,7 +325,7 @@ TEST_F(WireProtocolTest, TestRowBlockToColumnarPB) {
for (int c = 0; c < schema_.num_columns(); c++) {
SCOPED_TRACE(c);
const auto& col = schema_.column(c);
- const auto& serialized_col = batch.columns[c];
+ const auto& serialized_col = batch.columns()[c];
if (col.is_nullable()) {
bool expect_null = row.is_null(c);;
EXPECT_EQ(!BitmapTest(serialized_col.non_null_bitmap->data(), dst_row_idx),
@@ -464,8 +464,8 @@ struct RowwiseConverter {
struct ColumnarConverter {
static void Run(const RowBlock& block) {
- ColumnarSerializedBatch batch;
- SerializeRowBlockColumnar(block, nullptr, &batch);
+ ColumnarSerializedBatch batch(*block.schema(), *block.schema());
+ batch.AddRowBlock(block);
}
static constexpr const char* kName = "columnar";
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 1bb93ff..edfc1fd 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -26,6 +26,7 @@
#include <numeric>
#include <ostream>
#include <string>
+#include <type_traits>
#include <unordered_set>
#include <vector>
@@ -691,7 +692,9 @@ class ScanResultCollector {
// request is decoded and checked for 'row_format_flags'.
//
// Does nothing by default.
- virtual Status InitSerializer(uint64_t /* row_format_flags */) {
+ virtual Status InitSerializer(uint64_t /* row_format_flags */,
+ const Schema& /* scanner_schema */,
+ const Schema& /* client_schema */) {
return Status::OK();
}
@@ -754,6 +757,8 @@ class RowwiseResultSerializer : public ResultSerializer {
int SerializeRowBlock(const RowBlock& row_block,
const Schema* client_projection_schema) override {
+ // TODO(todd) create some kind of serializer object that caches the projection
+ // information to avoid recalculating it on every SerializeRowBlock call.
int num_selected = kudu::SerializeRowBlock(
row_block, client_projection_schema,
&rows_data_, &indirect_data_, pad_unixtime_micros_to_16_bytes_);
@@ -795,18 +800,21 @@ class RowwiseResultSerializer : public ResultSerializer {
class ColumnarResultSerializer : public ResultSerializer {
public:
- static Status Create(uint64_t flags, unique_ptr<ResultSerializer>* serializer) {
+ static Status Create(uint64_t flags,
+ const Schema& scanner_schema,
+ const Schema& client_schema,
+ unique_ptr<ResultSerializer>* serializer) {
if (flags & ~RowFormatFlags::COLUMNAR_LAYOUT) {
return Status::InvalidArgument("Row format flags not supported with columnar layout");
}
- serializer->reset(new ColumnarResultSerializer());
+ serializer->reset(new ColumnarResultSerializer(scanner_schema, client_schema));
return Status::OK();
}
int SerializeRowBlock(const RowBlock& row_block,
- const Schema* client_projection_schema) override {
+ const Schema* /* unused */) override {
CHECK(!done_);
- int n_sel = SerializeRowBlockColumnar(row_block, client_projection_schema, &results_);
+ int n_sel = results_.AddRowBlock(row_block);
num_rows_ += n_sel;
return n_sel;
}
@@ -815,7 +823,7 @@ class ColumnarResultSerializer : public ResultSerializer {
CHECK(!done_);
int total = 0;
- for (const auto& col : results_.columns) {
+ for (const auto& col : results_.columns()) {
total += col.data.size();
if (col.varlen_data) {
total += col.varlen_data->size();
@@ -831,7 +839,8 @@ class ColumnarResultSerializer : public ResultSerializer {
CHECK(!done_);
done_ = true;
ColumnarRowBlockPB* data = resp->mutable_columnar_data();
- for (auto& col : results_.columns) {
+ auto cols = std::move(results_).TakeColumns();
+ for (auto& col : cols) {
auto* col_pb = data->add_columns();
int sidecar_idx;
CHECK_OK(context->AddOutboundSidecar(
@@ -854,7 +863,10 @@ class ColumnarResultSerializer : public ResultSerializer {
}
private:
- ColumnarResultSerializer() {}
+ ColumnarResultSerializer(const Schema& scanner_schema,
+ const Schema& client_schema)
+ : results_(scanner_schema, client_schema) {
+ }
int64_t num_rows_ = 0;
ColumnarSerializedBatch results_;
@@ -898,14 +910,17 @@ class ScanResultCopier : public ScanResultCollector {
return num_rows_returned_;
}
- Status InitSerializer(uint64_t row_format_flags) override {
+ Status InitSerializer(uint64_t row_format_flags,
+ const Schema& scanner_schema,
+ const Schema& client_schema) override {
if (serializer_) {
// TODO(todd) for the NewScanner case, this gets called twice
// which is a bit ugly. Refactor to avoid!
return Status::OK();
}
if (row_format_flags & COLUMNAR_LAYOUT) {
- return ColumnarResultSerializer::Create(row_format_flags, &serializer_);
+ return ColumnarResultSerializer::Create(
+ row_format_flags, scanner_schema, client_schema, &serializer_);
}
serializer_.reset(new RowwiseResultSerializer(batch_size_bytes_, row_format_flags));
return Status::OK();
@@ -2432,14 +2447,6 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
TRACE_EVENT1("tserver", "TabletServiceImpl::HandleNewScanRequest",
"tablet_id", scan_pb.tablet_id());
- Status s = result_collector->InitSerializer(scan_pb.row_format_flags());
- if (!s.ok()) {
- *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
- return s;
- }
-
- const Schema& tablet_schema = replica->tablet_metadata()->schema();
-
SharedScanner scanner;
server_->scanner_manager()->NewScanner(replica,
rpc_context->remote_user(),
@@ -2455,7 +2462,7 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
// Create the user's requested projection.
// TODO(todd): Add test cases for bad projections including 0 columns.
Schema projection;
- s = ColumnPBsToSchema(scan_pb.projected_columns(), &projection);
+ Status s = ColumnPBsToSchema(scan_pb.projected_columns(), &projection);
if (PREDICT_FALSE(!s.ok())) {
*error_code = TabletServerErrorPB::INVALID_SCHEMA;
return s;
@@ -2480,6 +2487,8 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
}
}
+ const Schema& tablet_schema = replica->tablet_metadata()->schema();
+
ScanSpec spec;
s = SetupScanSpec(scan_pb, tablet_schema, scanner, &spec);
if (PREDICT_FALSE(!s.ok())) {
@@ -2498,11 +2507,6 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
// NOTE: We should build the missing column after optimizing scan which will
// remove unnecessary predicates.
vector<ColumnSchema> missing_cols = spec.GetMissingColumns(projection);
- if (spec.CanShortCircuit()) {
- VLOG(1) << "short-circuiting without creating a server-side scanner.";
- *has_more_results = false;
- return Status::OK();
- }
// Store the original projection.
{
@@ -2551,6 +2555,20 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
projection = projection_builder.BuildWithoutIds();
VLOG(3) << "Scan projection: " << projection.ToString(Schema::BASE_INFO);
+ s = result_collector->InitSerializer(scan_pb.row_format_flags(),
+ projection,
+ *scanner->client_projection_schema());
+ if (!s.ok()) {
+ *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
+ return s;
+ }
+
+ if (spec.CanShortCircuit()) {
+ VLOG(1) << "short-circuiting without creating a server-side scanner.";
+ *has_more_results = false;
+ return Status::OK();
+ }
+
// It's important to keep the reference to the tablet for the case when the
// tablet replica's shutdown is run concurrently with the code below.
shared_ptr<Tablet> tablet;
@@ -2741,13 +2759,6 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
<< SecureShortDebugString(*req);
TRACE("Found scanner $0 for tablet $1", scanner->id(), scanner->tablet_id());
- // Set the row format flags on the ScanResultCollector.
- s = result_collector->InitSerializer(scanner->row_format_flags());
- if (!s.ok()) {
- *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
- return s;
- }
-
if (batch_size_bytes == 0 && req->close_scanner()) {
*has_more_results = false;
return Status::OK();
@@ -2761,11 +2772,20 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
RowwiseIterator* iter = scanner->iter();
+ // Set the row format flags on the ScanResultCollector.
+ s = result_collector->InitSerializer(scanner->row_format_flags(),
+ iter->schema(),
+ *scanner->client_projection_schema());
+ if (!s.ok()) {
+ *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
+ return s;
+ }
+
// TODO(todd): could size the RowBlock based on the user's requested batch size?
// If people had really large indirect objects, we would currently overshoot
// their requested batch size by a lot.
Arena arena(32 * 1024);
- RowBlock block(&scanner->iter()->schema(),
+ RowBlock block(&iter->schema(),
FLAGS_scanner_batch_size_rows, &arena);
// TODO(todd): in the future, use the client timeout to set a budget. For now,