You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2020/04/16 17:45:20 UTC

[kudu] 02/04: columnar_serialization: avoid preallocating 8MB per column

This is an automated email from the ASF dual-hosted git repository.

todd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit adf5d9f5e847b4aadb82c540c98ac9ac707bfa1c
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Tue Apr 7 15:53:49 2020 -0700

    columnar_serialization: avoid preallocating 8MB per column
    
    Previously we would reserve an 8MB buffer for every column of data to be
    scanned. This wouldn't scale well for a high number of concurrent queries
    with lots of columns.
    
    The new approach is to use the configured batch size and apportion that
    memory budget across the columns based on the size of those columns.
    It's not 100% accurate but at least shouldn't overshoot by hundreds of
    MB like the prior approach.
    
    Change-Id: I9b7ff78547792acbd975a606a02ec388dba3a8e8
    Reviewed-on: http://gerrit.cloudera.org:8080/15679
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/common/columnar_serialization.cc | 11 +++++++----
 src/kudu/common/columnar_serialization.h  |  8 +++++++-
 src/kudu/common/wire_protocol-test.cc     |  6 ++++--
 src/kudu/tserver/tablet_service.cc        | 11 +++++++----
 4 files changed, 25 insertions(+), 11 deletions(-)

diff --git a/src/kudu/common/columnar_serialization.cc b/src/kudu/common/columnar_serialization.cc
index 52a3425..6c481d9 100644
--- a/src/kudu/common/columnar_serialization.cc
+++ b/src/kudu/common/columnar_serialization.cc
@@ -588,10 +588,10 @@ void CopySelectedVarlenCellsFromColumn(const ColumnBlock& cblock,
 } // namespace internal
 
 ColumnarSerializedBatch::ColumnarSerializedBatch(const Schema& rowblock_schema,
-                                                 const Schema& client_schema) {
+                                                 const Schema& client_schema,
+                                                 int expected_batch_size_bytes) {
   // Initialize buffers for the columns.
-  // TODO(todd) don't pre-size these to 1MB per column -- quite
-  // expensive if there are a lot of columns!
+  int64_t row_bytes = client_schema.byte_size();
   columns_.reserve(client_schema.num_columns());
   for (const auto& schema_col : client_schema.columns()) {
     columns_.emplace_back();
@@ -600,7 +600,10 @@ ColumnarSerializedBatch::ColumnarSerializedBatch(const Schema& rowblock_schema,
     col.rowblock_schema_col_idx = rowblock_schema.find_column(schema_col.name());
     CHECK_NE(col.rowblock_schema_col_idx, -1);
 
-    col.data.reserve(1024 * 1024);
+    // Size the initial buffer based on the percentage of the total row that this column
+    // takes up. This isn't fully accurate because of costs like the null bitmap or varlen
+    // data, but tries to reasonably apportion the memory budget across the columns.
+    col.data.reserve(schema_col.type_info()->size() * expected_batch_size_bytes / row_bytes);
     if (schema_col.type_info()->physical_type() == BINARY) {
       col.varlen_data.emplace();
     }
diff --git a/src/kudu/common/columnar_serialization.h b/src/kudu/common/columnar_serialization.h
index b4862c7..f5c3d56 100644
--- a/src/kudu/common/columnar_serialization.h
+++ b/src/kudu/common/columnar_serialization.h
@@ -35,10 +35,16 @@ class ColumnarSerializedBatch {
  public:
   // 'rowblock_schema': the schema of the RowBlocks that will be passed to
   //                    AddRowBlock().
+  //
   // 'client_schema': the schema to be returned to the client, which may
   //                  contain a subset of columns
+  //
+  // 'expected_batch_size_bytes':
+  //      the batch size at which the caller expects to stop adding new rows to
+  //      this batch. This is only a hint and does not affect correctness.
   ColumnarSerializedBatch(const Schema& rowblock_schema,
-                          const Schema& client_schema);
+                          const Schema& client_schema,
+                          int expected_batch_size_bytes);
 
   // Append the data in 'block' into this columnar batch.
   //
diff --git a/src/kudu/common/wire_protocol-test.cc b/src/kudu/common/wire_protocol-test.cc
index 9ab534d..fb08521 100644
--- a/src/kudu/common/wire_protocol-test.cc
+++ b/src/kudu/common/wire_protocol-test.cc
@@ -298,6 +298,7 @@ TEST_F(WireProtocolTest, TestRowBlockToRowwisePB) {
 TEST_F(WireProtocolTest, TestRowBlockToColumnarPB) {
   // Generate several blocks of random data.
   static constexpr int kNumBlocks = 3;
+  static constexpr int kBatchSizeBytes = 8192 * 1024;
   Arena arena(1024);
   std::list<RowBlock> blocks;
   for (int i = 0; i < kNumBlocks; i++) {
@@ -306,7 +307,7 @@ TEST_F(WireProtocolTest, TestRowBlockToColumnarPB) {
   }
 
   // Convert all of the RowBlocks to a single serialized (concatenated) columnar format.
-  ColumnarSerializedBatch batch(schema_, schema_);
+  ColumnarSerializedBatch batch(schema_, schema_, kBatchSizeBytes);
   for (const auto& block : blocks) {
     batch.AddRowBlock(block);
   }
@@ -464,7 +465,8 @@ struct RowwiseConverter {
 
 struct ColumnarConverter {
   static void Run(const RowBlock& block) {
-    ColumnarSerializedBatch batch(*block.schema(), *block.schema());
+    constexpr int kBatchSizeBytes = 8192 * 1024;
+    ColumnarSerializedBatch batch(*block.schema(), *block.schema(), kBatchSizeBytes);
     batch.AddRowBlock(block);
   }
 
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index edfc1fd..927182c 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -801,13 +801,15 @@ class RowwiseResultSerializer : public ResultSerializer {
 class ColumnarResultSerializer : public ResultSerializer {
  public:
   static Status Create(uint64_t flags,
+                       int batch_size_bytes,
                        const Schema& scanner_schema,
                        const Schema& client_schema,
                        unique_ptr<ResultSerializer>* serializer) {
     if (flags & ~RowFormatFlags::COLUMNAR_LAYOUT) {
       return Status::InvalidArgument("Row format flags not supported with columnar layout");
     }
-    serializer->reset(new ColumnarResultSerializer(scanner_schema, client_schema));
+    serializer->reset(new ColumnarResultSerializer(
+        scanner_schema, client_schema, batch_size_bytes));
     return Status::OK();
   }
 
@@ -864,8 +866,9 @@ class ColumnarResultSerializer : public ResultSerializer {
 
  private:
   ColumnarResultSerializer(const Schema& scanner_schema,
-                           const Schema& client_schema)
-      : results_(scanner_schema, client_schema) {
+                           const Schema& client_schema,
+                           int batch_size_bytes)
+      : results_(scanner_schema, client_schema, batch_size_bytes) {
   }
 
   int64_t num_rows_ = 0;
@@ -920,7 +923,7 @@ class ScanResultCopier : public ScanResultCollector {
     }
     if (row_format_flags & COLUMNAR_LAYOUT) {
       return ColumnarResultSerializer::Create(
-          row_format_flags, scanner_schema, client_schema, &serializer_);
+          row_format_flags, batch_size_bytes_, scanner_schema, client_schema, &serializer_);
     }
     serializer_.reset(new RowwiseResultSerializer(batch_size_bytes_, row_format_flags));
     return Status::OK();