You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2020/04/16 17:45:19 UTC

[kudu] 01/04: Avoid calling Schema::find_column() once per RowBlock in columnar serialization

This is an automated email from the ASF dual-hosted git repository.

todd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit c7c4d47ecca0ea0d90435dac736d22f4063d5507
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Tue Mar 31 12:03:09 2020 -0700

    Avoid calling Schema::find_column() once per RowBlock in columnar serialization
    
    Prior to this patch, each row block being serialized in the columnar
    format would result in a call to Schema::find_column(name) for each
    projected column. Each such lookup was relatively expensive, involving a
    hash computation, a string equality check, etc.
    
    This changes the projection calculation to happen "up front", once per
    Scan RPC, rather than on every per-rowblock call.
    
    This optimization could also apply to the rowwise serialization, but I
    found that the other overheads inherent in that code path are so high
    that the find_column calls aren't particularly noticeable. Nonetheless,
    I left a TODO in RowwiseResultSerializer::SerializeRowBlock().
    
    Change-Id: I1b683c7d6d6fe1026ee06c8b5ebfe2a5f1ee6cb1
    Reviewed-on: http://gerrit.cloudera.org:8080/15678
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Todd Lipcon <to...@apache.org>
---
 src/kudu/common/columnar_serialization.cc | 54 +++++++++-----------
 src/kudu/common/columnar_serialization.h  | 41 +++++++++++----
 src/kudu/common/wire_protocol-test.cc     | 12 ++---
 src/kudu/tserver/tablet_service.cc        | 84 +++++++++++++++++++------------
 4 files changed, 112 insertions(+), 79 deletions(-)

diff --git a/src/kudu/common/columnar_serialization.cc b/src/kudu/common/columnar_serialization.cc
index f6b289e..52a3425 100644
--- a/src/kudu/common/columnar_serialization.cc
+++ b/src/kudu/common/columnar_serialization.cc
@@ -587,34 +587,31 @@ void CopySelectedVarlenCellsFromColumn(const ColumnBlock& cblock,
 } // anonymous namespace
 } // namespace internal
 
-int SerializeRowBlockColumnar(
-    const RowBlock& block,
-    const Schema* projection_schema,
-    ColumnarSerializedBatch* out) {
-  DCHECK_GT(block.nrows(), 0);
-  const Schema* tablet_schema = block.schema();
-
-  if (projection_schema == nullptr) {
-    projection_schema = tablet_schema;
-  }
-
+ColumnarSerializedBatch::ColumnarSerializedBatch(const Schema& rowblock_schema,
+                                                 const Schema& client_schema) {
   // Initialize buffers for the columns.
   // TODO(todd) don't pre-size these to 1MB per column -- quite
   // expensive if there are a lot of columns!
-  if (out->columns.size() != projection_schema->num_columns()) {
-    CHECK_EQ(out->columns.size(), 0);
-    out->columns.reserve(projection_schema->num_columns());
-    for (const auto& col : projection_schema->columns()) {
-      out->columns.emplace_back();
-      out->columns.back().data.reserve(1024 * 1024);
-      if (col.type_info()->physical_type() == BINARY) {
-        out->columns.back().varlen_data.emplace();
-      }
-      if (col.is_nullable()) {
-        out->columns.back().non_null_bitmap.emplace();
-      }
+  columns_.reserve(client_schema.num_columns());
+  for (const auto& schema_col : client_schema.columns()) {
+    columns_.emplace_back();
+    auto& col = columns_.back();
+
+    col.rowblock_schema_col_idx = rowblock_schema.find_column(schema_col.name());
+    CHECK_NE(col.rowblock_schema_col_idx, -1);
+
+    col.data.reserve(1024 * 1024);
+    if (schema_col.type_info()->physical_type() == BINARY) {
+      col.varlen_data.emplace();
+    }
+    if (schema_col.is_nullable()) {
+      col.non_null_bitmap.emplace();
     }
   }
+}
+
+int ColumnarSerializedBatch::AddRowBlock(const RowBlock& block) {
+  DCHECK_GT(block.nrows(), 0);
 
   SelectedRows sel = block.selection_vector()->GetSelectedRows();
   if (sel.num_selected() == 0) {
@@ -622,21 +619,18 @@ int SerializeRowBlockColumnar(
   }
 
   int col_idx = 0;
-  for (const auto& col : projection_schema->columns()) {
-    int t_schema_idx = tablet_schema->find_column(col.name());
-    CHECK_NE(t_schema_idx, -1);
-    const ColumnBlock& column_block = block.column_block(t_schema_idx);
-
+  for (const auto& col : columns_) {
+    const ColumnBlock& column_block = block.column_block(col.rowblock_schema_col_idx);
     if (column_block.type_info()->physical_type() == BINARY) {
       internal::CopySelectedVarlenCellsFromColumn(
           column_block,
           sel,
-          &out->columns[col_idx]);
+          &columns_[col_idx]);
     } else {
       internal::CopySelectedCellsFromColumn(
           column_block,
           sel,
-          &out->columns[col_idx]);
+          &columns_[col_idx]);
     }
     col_idx++;
   }
diff --git a/src/kudu/common/columnar_serialization.h b/src/kudu/common/columnar_serialization.h
index bc74f08..b4862c7 100644
--- a/src/kudu/common/columnar_serialization.h
+++ b/src/kudu/common/columnar_serialization.h
@@ -17,6 +17,7 @@
 #pragma once
 
 #include <cstdint>
+#include <utility>
 #include <vector>
 
 #include <boost/optional/optional.hpp>
@@ -30,8 +31,24 @@ class Schema;
 
 // A pending batch of serialized rows, suitable for easy conversion
 // into the protobuf representation and a set of sidecars.
-struct ColumnarSerializedBatch {
+class ColumnarSerializedBatch {
+ public:
+  // 'rowblock_schema': the schema of the RowBlocks that will be passed to
+  //                    AddRowBlock().
+  // 'client_schema': the schema to be returned to the client, which may
+  //                  contain a subset of columns
+  ColumnarSerializedBatch(const Schema& rowblock_schema,
+                          const Schema& client_schema);
+
+  // Append the data in 'block' into this columnar batch.
+  //
+  // Returns the number of selected rows serialized.
+  int AddRowBlock(const RowBlock& block);
+
   struct Column {
+    // The index of the column in the schema of the RowBlocks to be appended.
+    int rowblock_schema_col_idx;
+
     // Underlying column data.
     faststring data;
 
@@ -41,17 +58,19 @@ struct ColumnarSerializedBatch {
     // Each bit is set when a value is non-null
     boost::optional<faststring> non_null_bitmap;
   };
-  std::vector<Column> columns;
-};
 
-// Serialize the data in 'block' into the columnar batch 'out', appending to
-// any data already serialized to the same batch.
-//
-// Returns the number of selected rows serialized.
-int SerializeRowBlockColumnar(
-    const RowBlock& block,
-    const Schema* projection_schema,
-    ColumnarSerializedBatch* out);
+  const std::vector<Column>& columns() const {
+    return columns_;
+  }
+
+  std::vector<Column> TakeColumns() && {
+    return std::move(columns_);
+  }
+
+ private:
+  friend class WireProtocolTest;
+  std::vector<Column> columns_;
+};
 
 
 ////////////////////////////////////////////////////////////
diff --git a/src/kudu/common/wire_protocol-test.cc b/src/kudu/common/wire_protocol-test.cc
index 4a1c791..9ab534d 100644
--- a/src/kudu/common/wire_protocol-test.cc
+++ b/src/kudu/common/wire_protocol-test.cc
@@ -306,13 +306,13 @@ TEST_F(WireProtocolTest, TestRowBlockToColumnarPB) {
   }
 
   // Convert all of the RowBlocks to a single serialized (concatenated) columnar format.
-  ColumnarSerializedBatch batch;
+  ColumnarSerializedBatch batch(schema_, schema_);
   for (const auto& block : blocks) {
-    SerializeRowBlockColumnar(block, nullptr, &batch);
+    batch.AddRowBlock(block);
   }
 
   // Verify that the resulting serialized data matches the concatenated original data blocks.
-  ASSERT_EQ(5, batch.columns.size());
+  ASSERT_EQ(5, batch.columns().size());
   int dst_row_idx = 0;
   for (const auto& block : blocks) {
     for (int src_row_idx = 0; src_row_idx < block.nrows(); src_row_idx++) {
@@ -325,7 +325,7 @@ TEST_F(WireProtocolTest, TestRowBlockToColumnarPB) {
       for (int c = 0; c < schema_.num_columns(); c++) {
         SCOPED_TRACE(c);
         const auto& col = schema_.column(c);
-        const auto& serialized_col = batch.columns[c];
+        const auto& serialized_col = batch.columns()[c];
         if (col.is_nullable()) {
           bool expect_null = row.is_null(c);;
           EXPECT_EQ(!BitmapTest(serialized_col.non_null_bitmap->data(), dst_row_idx),
@@ -464,8 +464,8 @@ struct RowwiseConverter {
 
 struct ColumnarConverter {
   static void Run(const RowBlock& block) {
-    ColumnarSerializedBatch batch;
-    SerializeRowBlockColumnar(block, nullptr, &batch);
+    ColumnarSerializedBatch batch(*block.schema(), *block.schema());
+    batch.AddRowBlock(block);
   }
 
   static constexpr const char* kName = "columnar";
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 1bb93ff..edfc1fd 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -26,6 +26,7 @@
 #include <numeric>
 #include <ostream>
 #include <string>
+#include <type_traits>
 #include <unordered_set>
 #include <vector>
 
@@ -691,7 +692,9 @@ class ScanResultCollector {
   // request is decoded and checked for 'row_format_flags'.
   //
   // Does nothing by default.
-  virtual Status InitSerializer(uint64_t /* row_format_flags */) {
+  virtual Status InitSerializer(uint64_t /* row_format_flags */,
+                                const Schema& /* scanner_schema */,
+                                const Schema& /* client_schema */) {
     return Status::OK();
   }
 
@@ -754,6 +757,8 @@ class RowwiseResultSerializer : public ResultSerializer {
 
   int SerializeRowBlock(const RowBlock& row_block,
                         const Schema* client_projection_schema) override {
+    // TODO(todd) create some kind of serializer object that caches the projection
+    // information to avoid recalculating it on every SerializeRowBlock call.
     int num_selected = kudu::SerializeRowBlock(
         row_block, client_projection_schema,
         &rows_data_, &indirect_data_, pad_unixtime_micros_to_16_bytes_);
@@ -795,18 +800,21 @@ class RowwiseResultSerializer : public ResultSerializer {
 
 class ColumnarResultSerializer : public ResultSerializer {
  public:
-  static Status Create(uint64_t flags, unique_ptr<ResultSerializer>* serializer) {
+  static Status Create(uint64_t flags,
+                       const Schema& scanner_schema,
+                       const Schema& client_schema,
+                       unique_ptr<ResultSerializer>* serializer) {
     if (flags & ~RowFormatFlags::COLUMNAR_LAYOUT) {
       return Status::InvalidArgument("Row format flags not supported with columnar layout");
     }
-    serializer->reset(new ColumnarResultSerializer());
+    serializer->reset(new ColumnarResultSerializer(scanner_schema, client_schema));
     return Status::OK();
   }
 
   int SerializeRowBlock(const RowBlock& row_block,
-                        const Schema* client_projection_schema) override {
+                        const Schema* /* unused */) override {
     CHECK(!done_);
-    int n_sel = SerializeRowBlockColumnar(row_block, client_projection_schema, &results_);
+    int n_sel = results_.AddRowBlock(row_block);
     num_rows_ += n_sel;
     return n_sel;
   }
@@ -815,7 +823,7 @@ class ColumnarResultSerializer : public ResultSerializer {
     CHECK(!done_);
 
     int total = 0;
-    for (const auto& col : results_.columns) {
+    for (const auto& col : results_.columns()) {
       total += col.data.size();
       if (col.varlen_data) {
         total += col.varlen_data->size();
@@ -831,7 +839,8 @@ class ColumnarResultSerializer : public ResultSerializer {
     CHECK(!done_);
     done_ = true;
     ColumnarRowBlockPB* data = resp->mutable_columnar_data();
-    for (auto& col : results_.columns) {
+    auto cols = std::move(results_).TakeColumns();
+    for (auto& col : cols) {
       auto* col_pb = data->add_columns();
       int sidecar_idx;
       CHECK_OK(context->AddOutboundSidecar(
@@ -854,7 +863,10 @@ class ColumnarResultSerializer : public ResultSerializer {
   }
 
  private:
-  ColumnarResultSerializer() {}
+  ColumnarResultSerializer(const Schema& scanner_schema,
+                           const Schema& client_schema)
+      : results_(scanner_schema, client_schema) {
+  }
 
   int64_t num_rows_ = 0;
   ColumnarSerializedBatch results_;
@@ -898,14 +910,17 @@ class ScanResultCopier : public ScanResultCollector {
     return num_rows_returned_;
   }
 
-  Status InitSerializer(uint64_t row_format_flags) override {
+  Status InitSerializer(uint64_t row_format_flags,
+                        const Schema& scanner_schema,
+                        const Schema& client_schema) override {
     if (serializer_) {
       // TODO(todd) for the NewScanner case, this gets called twice
       // which is a bit ugly. Refactor to avoid!
       return Status::OK();
     }
     if (row_format_flags & COLUMNAR_LAYOUT) {
-      return ColumnarResultSerializer::Create(row_format_flags, &serializer_);
+      return ColumnarResultSerializer::Create(
+          row_format_flags, scanner_schema, client_schema, &serializer_);
     }
     serializer_.reset(new RowwiseResultSerializer(batch_size_bytes_, row_format_flags));
     return Status::OK();
@@ -2432,14 +2447,6 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
   TRACE_EVENT1("tserver", "TabletServiceImpl::HandleNewScanRequest",
                "tablet_id", scan_pb.tablet_id());
 
-  Status s = result_collector->InitSerializer(scan_pb.row_format_flags());
-  if (!s.ok()) {
-    *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
-    return s;
-  }
-
-  const Schema& tablet_schema = replica->tablet_metadata()->schema();
-
   SharedScanner scanner;
   server_->scanner_manager()->NewScanner(replica,
                                          rpc_context->remote_user(),
@@ -2455,7 +2462,7 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
   // Create the user's requested projection.
   // TODO(todd): Add test cases for bad projections including 0 columns.
   Schema projection;
-  s = ColumnPBsToSchema(scan_pb.projected_columns(), &projection);
+  Status s = ColumnPBsToSchema(scan_pb.projected_columns(), &projection);
   if (PREDICT_FALSE(!s.ok())) {
     *error_code = TabletServerErrorPB::INVALID_SCHEMA;
     return s;
@@ -2480,6 +2487,8 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
     }
   }
 
+  const Schema& tablet_schema = replica->tablet_metadata()->schema();
+
   ScanSpec spec;
   s = SetupScanSpec(scan_pb, tablet_schema, scanner, &spec);
   if (PREDICT_FALSE(!s.ok())) {
@@ -2498,11 +2507,6 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
   // NOTE: We should build the missing column after optimizing scan which will
   // remove unnecessary predicates.
   vector<ColumnSchema> missing_cols = spec.GetMissingColumns(projection);
-  if (spec.CanShortCircuit()) {
-    VLOG(1) << "short-circuiting without creating a server-side scanner.";
-    *has_more_results = false;
-    return Status::OK();
-  }
 
   // Store the original projection.
   {
@@ -2551,6 +2555,20 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
   projection = projection_builder.BuildWithoutIds();
   VLOG(3) << "Scan projection: " << projection.ToString(Schema::BASE_INFO);
 
+  s = result_collector->InitSerializer(scan_pb.row_format_flags(),
+                                       projection,
+                                       *scanner->client_projection_schema());
+  if (!s.ok()) {
+    *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
+    return s;
+  }
+
+  if (spec.CanShortCircuit()) {
+    VLOG(1) << "short-circuiting without creating a server-side scanner.";
+    *has_more_results = false;
+    return Status::OK();
+  }
+
   // It's important to keep the reference to the tablet for the case when the
   // tablet replica's shutdown is run concurrently with the code below.
   shared_ptr<Tablet> tablet;
@@ -2741,13 +2759,6 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
           << SecureShortDebugString(*req);
   TRACE("Found scanner $0 for tablet $1", scanner->id(), scanner->tablet_id());
 
-  // Set the row format flags on the ScanResultCollector.
-  s = result_collector->InitSerializer(scanner->row_format_flags());
-  if (!s.ok()) {
-    *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
-    return s;
-  }
-
   if (batch_size_bytes == 0 && req->close_scanner()) {
     *has_more_results = false;
     return Status::OK();
@@ -2761,11 +2772,20 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
 
   RowwiseIterator* iter = scanner->iter();
 
+  // Set the row format flags on the ScanResultCollector.
+  s = result_collector->InitSerializer(scanner->row_format_flags(),
+                                       iter->schema(),
+                                       *scanner->client_projection_schema());
+  if (!s.ok()) {
+    *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
+    return s;
+  }
+
   // TODO(todd): could size the RowBlock based on the user's requested batch size?
   // If people had really large indirect objects, we would currently overshoot
   // their requested batch size by a lot.
   Arena arena(32 * 1024);
-  RowBlock block(&scanner->iter()->schema(),
+  RowBlock block(&iter->schema(),
                  FLAGS_scanner_batch_size_rows, &arena);
 
   // TODO(todd): in the future, use the client timeout to set a budget. For now,