You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2019/08/24 00:59:12 UTC

[kudu] 03/05: KUDU-2847: Optimize iteration over selection vector in SerializeRowBlock

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 7116a0841ca4dcd62d3e6338cfbd6672df86770a
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Tue Jul 23 22:52:51 2019 -0700

    KUDU-2847: Optimize iteration over selection vector in SerializeRowBlock
    
    This improves the performance of serializing RowBlocks to the wire by
    amortizing the cost of iterating over the set bits of the selection
    bitmap. Instead of using the bitmap once per column, we convert the
    bitmap to a list of set indices up front, and then use those indices for
    conversion.
    
    This changes the benchmarks to report cycles/cell instead of raw times,
    making it easier to see the effects of column count or sparsity.
    
    Benchmark results:
      column count 3 and row select rate 1:     5.12520529   ->  5.44280228 cycles/cell
      column count 3 and row select rate 0.8:   12.74473127  ->  7.04588262 cycles/cell
      column count 3 and row select rate 0.5:   23.98607461  ->  7.51201477 cycles/cell
      column count 3 and row select rate 0.2:   40.66053179  ->  8.30233998 cycles/cell
      column count 30 and row select rate 1:    15.43040511  ->  15.97765642 cycles/cell
      column count 30 and row select rate 0.8:  23.7480557   ->  17.84433817 cycles/cell
      column count 30 and row select rate 0.5:  40.08323337  ->  17.67888749 cycles/cell
      column count 30 and row select rate 0.2:  48.62210244  ->  16.56884988 cycles/cell
      column count 300 and row select rate 1:   18.9223316   ->  20.90426976 cycles/cell
      column count 300 and row select rate 0.8: 27.50793008  ->  21.92481189 cycles/cell
      column count 300 and row select rate 0.5: 40.34367716  ->  21.32180024 cycles/cell
      column count 300 and row select rate 0.2: 52.7446843   ->  20.92634437 cycles/cell
    
    Patch co-authored by Zhang Yao.
    
    Change-Id: I19917d1875c46fd4cf98ef8a471b0340a76161e7
    Reviewed-on: http://gerrit.cloudera.org:8080/13721
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/common/rowblock-test.cc      | 21 +++++++++
 src/kudu/common/rowblock.cc           | 30 +++++++++++++
 src/kudu/common/rowblock.h            |  4 ++
 src/kudu/common/wire_protocol-test.cc | 27 +++++++-----
 src/kudu/common/wire_protocol.cc      | 80 ++++++++++++++++-------------------
 5 files changed, 108 insertions(+), 54 deletions(-)

diff --git a/src/kudu/common/rowblock-test.cc b/src/kudu/common/rowblock-test.cc
index 1dd85b8..c2d5a84 100644
--- a/src/kudu/common/rowblock-test.cc
+++ b/src/kudu/common/rowblock-test.cc
@@ -18,9 +18,12 @@
 #include "kudu/common/rowblock.h"
 
 #include <cstddef>
+#include <vector>
 
 #include <gtest/gtest.h>
 
+using std::vector;
+
 namespace kudu {
 
 TEST(TestSelectionVector, TestEquals) {
@@ -56,11 +59,29 @@ TEST(TestSelectionVector, TestNonByteAligned) {
   ASSERT_EQ(sv.nrows(), sv.CountSelected());
   ASSERT_TRUE(sv.AnySelected());
 
+  vector<int> sel;
+  sv.GetSelectedRows(&sel);
+  ASSERT_EQ(sv.nrows(), sel.size());
+
   for (size_t i = 0; i < sv.nrows(); i++) {
     sv.SetRowUnselected(i);
   }
   ASSERT_EQ(0, sv.CountSelected());
   ASSERT_FALSE(sv.AnySelected());
+  sv.GetSelectedRows(&sel);
+  ASSERT_EQ(0, sel.size());
+}
+
+TEST(TestSelectionVector, TestGetSelectedRows) {
+  vector<int> expected = {1, 4, 9, 10, 18};
+  SelectionVector sv(20);
+  sv.SetAllFalse();
+  for (int i : expected) {
+    sv.SetRowSelected(i);
+  }
+  vector<int> selected;
+  sv.GetSelectedRows(&selected);
+  ASSERT_EQ(expected, selected);
 }
 
 } // namespace kudu
diff --git a/src/kudu/common/rowblock.cc b/src/kudu/common/rowblock.cc
index 4996973..f89b379 100644
--- a/src/kudu/common/rowblock.cc
+++ b/src/kudu/common/rowblock.cc
@@ -16,12 +16,17 @@
 // under the License.
 #include "kudu/common/rowblock.h"
 
+#include <numeric>
+#include <vector>
+
 #include <glog/logging.h>
 
 #include "kudu/gutil/bits.h"
 #include "kudu/gutil/port.h"
 #include "kudu/util/bitmap.h"
 
+using std::vector;
+
 namespace kudu {
 
 SelectionVector::SelectionVector(size_t row_capacity)
@@ -69,6 +74,31 @@ void SelectionVector::ClearToSelectAtMost(size_t max_rows) {
   }
 }
 
+void SelectionVector::GetSelectedRows(vector<int>* selected) const {
+  int n_selected = CountSelected();
+  selected->resize(n_selected);
+  if (n_selected == 0) {
+    return;
+  }
+  if (n_selected == n_rows_) {
+    std::iota(selected->begin(), selected->end(), 0);
+    return;
+  }
+
+  const uint8_t* bitmap = &bitmap_[0];
+  int* dst = selected->data();
+  // Within each byte, keep flipping the least significant non-zero bit and adding
+  // the bit index to the output until none are set.
+  for (int i = 0; i < n_bytes_; i++) {
+    uint8_t bm = *bitmap++;
+    while (bm != 0) {
+      int bit = Bits::FindLSBSetNonZero(bm);
+      *dst++ = (i * 8) + bit;
+      bm ^= (1 << bit);
+    }
+  }
+}
+
 size_t SelectionVector::CountSelected() const {
   return Bits::Count(&bitmap_[0], n_bytes_);
 }
diff --git a/src/kudu/common/rowblock.h b/src/kudu/common/rowblock.h
index 678b355..d65ee4e 100644
--- a/src/kudu/common/rowblock.h
+++ b/src/kudu/common/rowblock.h
@@ -103,6 +103,10 @@ class SelectionVector {
     return BitmapFindFirstSet(&bitmap_[0], row_offset, n_rows_, row);
   }
 
+  // Sets '*selected' to the indices of all rows marked as selected
+  // in this selection vector.
+  void GetSelectedRows(std::vector<int>* selected) const;
+
   uint8_t *mutable_bitmap() {
     return &bitmap_[0];
   }
diff --git a/src/kudu/common/wire_protocol-test.cc b/src/kudu/common/wire_protocol-test.cc
index 49dc500..cb5294e 100644
--- a/src/kudu/common/wire_protocol-test.cc
+++ b/src/kudu/common/wire_protocol-test.cc
@@ -39,6 +39,7 @@
 #include "kudu/common/wire_protocol.pb.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/walltime.h"
 #include "kudu/util/bitmap.h"
 #include "kudu/util/bloom_filter.h"
 #include "kudu/util/faststring.h"
@@ -151,22 +152,28 @@ class WireProtocolTest : public KuduTest,
   void RunBenchmark(int column_count, double select_rate) {
     ResetBenchmarkSchema(column_count);
     Arena arena(1024);
-    const int kNumTrials = AllowSlowTests() ? 100 : 10;
-    RowBlock block(&benchmark_schema_, 10000, &arena);
+    RowBlock block(&benchmark_schema_, 1000, &arena);
+    // Regardless of the config, use a constant number of cells for the test by
+    // looping the conversion an appropriate number of times.
+    const int64_t kNumCellsToConvert = AllowSlowTests() ? 100000000 : 1000000;
+    const int kNumTrials = kNumCellsToConvert / select_rate / column_count / block.nrows();
     FillRowBlockForBenchmark(&block);
     SelectRandomRowsWithRate(&block, select_rate);
 
     RowwiseRowBlockPB pb;
     faststring direct, indirect;
-    LOG_TIMING(INFO, Substitute("Converting to PB with column count $0 and row select rate $1 ",
-                                column_count, select_rate)) {
-      for (int i = 0; i < kNumTrials; ++i) {
-        pb.Clear();
-        direct.clear();
-        indirect.clear();
-        SerializeRowBlock(block, &pb, nullptr, &direct, &indirect);
-      }
+    int64_t cycle_start = CycleClock::Now();
+    for (int i = 0; i < kNumTrials; ++i) {
+      pb.Clear();
+      direct.clear();
+      indirect.clear();
+      SerializeRowBlock(block, &pb, nullptr, &direct, &indirect);
     }
+    int64_t cycle_end = CycleClock::Now();
+    LOG(INFO) << Substitute(
+        "Converting to PB with column count $0 and row select rate $1: $2 cycles/cell",
+        column_count, select_rate,
+        static_cast<double>(cycle_end - cycle_start) / kNumCellsToConvert);
   }
  protected:
   Schema schema_;
diff --git a/src/kudu/common/wire_protocol.cc b/src/kudu/common/wire_protocol.cc
index 9350231..82e67cd 100644
--- a/src/kudu/common/wire_protocol.cc
+++ b/src/kudu/common/wire_protocol.cc
@@ -907,51 +907,40 @@ void AppendRowToString<RowBlockRow>(const RowBlockRow& row, string* buf) {
 // be copied to column 'dst_col_idx' in the output protobuf; otherwise,
 // dst_col_idx must be equal to col_idx.
 template<bool IS_NULLABLE, bool IS_VARLEN>
-static void CopyColumn(const RowBlock& block, int col_idx, int dst_col_idx, uint8_t* dst_base,
-                       faststring* indirect_data, const Schema* dst_schema, size_t row_stride,
-                       size_t schema_byte_size, size_t column_offset) {
+static void CopyColumn(
+    const ColumnBlock& column_block, int dst_col_idx, uint8_t* __restrict__ dst_base,
+    faststring* indirect_data, const Schema* dst_schema, size_t row_stride,
+    size_t schema_byte_size, size_t column_offset,
+    const vector<int>& row_idx_select) {
   DCHECK(dst_schema);
-  ColumnBlock column_block = block.column_block(col_idx);
   uint8_t* dst = dst_base + column_offset;
   size_t offset_to_null_bitmap = schema_byte_size - column_offset;
 
   size_t cell_size = column_block.stride();
   const uint8_t* src = column_block.cell_ptr(0);
 
-  BitmapIterator selected_row_iter(block.selection_vector()->bitmap(), block.nrows());
-  int run_size;
-  bool selected;
-  int row_idx = 0;
-  while ((run_size = selected_row_iter.Next(&selected))) {
-    if (!selected) {
-      src += run_size * cell_size;
-      row_idx += run_size;
-      continue;
-    }
-    for (int i = 0; i < run_size; i++) {
-      if (IS_NULLABLE && column_block.is_null(row_idx)) {
-        BitmapChange(dst + offset_to_null_bitmap, dst_col_idx, true);
-      } else if (IS_VARLEN) {
-        const Slice *slice = reinterpret_cast<const Slice *>(src);
-        size_t offset_in_indirect = indirect_data->size();
-        indirect_data->append(reinterpret_cast<const char*>(slice->data()), slice->size());
-
-        Slice *dst_slice = reinterpret_cast<Slice *>(dst);
-        *dst_slice = Slice(reinterpret_cast<const uint8_t*>(offset_in_indirect),
-                           slice->size());
-        if (IS_NULLABLE) {
-          BitmapChange(dst + offset_to_null_bitmap, dst_col_idx, false);
-        }
-      } else { // non-string, non-null
-        strings::memcpy_inlined(dst, src, cell_size);
-        if (IS_NULLABLE) {
-          BitmapChange(dst + offset_to_null_bitmap, dst_col_idx, false);
-        }
+  for (auto index : row_idx_select) {
+    src = column_block.cell_ptr(index);
+    if (IS_NULLABLE && column_block.is_null(index)) {
+      BitmapChange(dst + offset_to_null_bitmap, dst_col_idx, true);
+    } else if (IS_VARLEN) {
+      const Slice* slice = reinterpret_cast<const Slice *>(src);
+      size_t offset_in_indirect = indirect_data->size();
+      indirect_data->append(reinterpret_cast<const char*>(slice->data()), slice->size());
+
+      Slice* dst_slice = reinterpret_cast<Slice *>(dst);
+      *dst_slice = Slice(reinterpret_cast<const uint8_t*>(offset_in_indirect),
+                         slice->size());
+      if (IS_NULLABLE) {
+        BitmapChange(dst + offset_to_null_bitmap, dst_col_idx, false);
+      }
+    } else { // non-string, non-null
+      strings::memcpy_inlined(dst, src, cell_size);
+      if (IS_NULLABLE) {
+        BitmapChange(dst + offset_to_null_bitmap, dst_col_idx, false);
       }
-      dst += row_stride;
-      src += cell_size;
-      row_idx++;
     }
+    dst += row_stride;
   }
 }
 
@@ -1002,6 +991,8 @@ void SerializeRowBlock(const RowBlock& block,
     memset(base, 0, additional_size);
   }
 
+  vector<int> selected_row_indexes;
+  block.selection_vector()->GetSelectedRows(&selected_row_indexes);
   size_t t_schema_idx = 0;
   size_t padding_so_far = 0;
   for (int p_schema_idx = 0; p_schema_idx < projection_schema->num_columns(); p_schema_idx++) {
@@ -1010,6 +1001,7 @@ void SerializeRowBlock(const RowBlock& block,
     DCHECK_NE(t_schema_idx, -1);
 
     size_t column_offset = projection_schema->column_offset(p_schema_idx) + padding_so_far;
+    const ColumnBlock& column_block = block.column_block(t_schema_idx);
 
     // Generating different functions for each of these cases makes them much less
     // branch-heavy -- we do the branch once outside the loop, and then have a
@@ -1018,17 +1010,17 @@ void SerializeRowBlock(const RowBlock& block,
     // even bigger gains, since we could inline the constant cell sizes and column
     // offsets.
     if (col.is_nullable() && col.type_info()->physical_type() == BINARY) {
-      CopyColumn<true, true>(block, t_schema_idx, p_schema_idx, base, indirect_data,
-                             projection_schema, row_stride, schema_byte_size, column_offset);
+      CopyColumn<true, true>(column_block, p_schema_idx, base, indirect_data, projection_schema,
+                             row_stride, schema_byte_size, column_offset, selected_row_indexes);
     } else if (col.is_nullable() && col.type_info()->physical_type() != BINARY) {
-      CopyColumn<true, false>(block, t_schema_idx, p_schema_idx, base, indirect_data,
-                              projection_schema, row_stride, schema_byte_size, column_offset);
+      CopyColumn<true, false>(column_block, p_schema_idx, base, indirect_data, projection_schema,
+                              row_stride, schema_byte_size, column_offset, selected_row_indexes);
     } else if (!col.is_nullable() && col.type_info()->physical_type() == BINARY) {
-      CopyColumn<false, true>(block, t_schema_idx, p_schema_idx, base, indirect_data,
-                              projection_schema, row_stride, schema_byte_size, column_offset);
+      CopyColumn<false, true>(column_block, p_schema_idx, base, indirect_data, projection_schema,
+                              row_stride, schema_byte_size, column_offset, selected_row_indexes);
     } else if (!col.is_nullable() && col.type_info()->physical_type() != BINARY) {
-      CopyColumn<false, false>(block, t_schema_idx, p_schema_idx, base, indirect_data,
-                               projection_schema, row_stride, schema_byte_size, column_offset);
+      CopyColumn<false, false>(column_block, p_schema_idx, base, indirect_data, projection_schema,
+                               row_stride, schema_byte_size, column_offset, selected_row_indexes);
     } else {
       LOG(FATAL) << "cannot reach here";
     }