You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2020/03/25 18:10:18 UTC

[kudu] branch master updated: wire_protocol: some simplification and optimization for rowwise encoding

This is an automated email from the ASF dual-hosted git repository.

todd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ae02a9  wire_protocol: some simplification and optimization for rowwise encoding
3ae02a9 is described below

commit 3ae02a92ccb99aa1a6b0c3d05deea75756aaa6e3
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Tue Mar 24 14:23:34 2020 -0700

    wire_protocol: some simplification and optimization for rowwise encoding
    
    * Re-implement GetSelectedRows() based on the new ForEachSetBit(...)
      utility, which operates word-by-word instead of byte-by-byte. The
      selected row indices are now returned as uint16_t instead of int.
    
    * Use a boolean return value to indicate the common case in which all
      rows are selected (false is returned and the output vector is left
      empty). Currently the rowwise serialization code path doesn't use
      this special value (it just reproduces the old std::iota() call to
      generate the sequence of all indexes), but the columnar code path
      will special-case it with a memcpy.
    
    * Avoid one call to CountSelected() in SerializeRowBlock() by calculating
      num_rows from the size of the row index vector.
    
    * Change SerializeRowBlock() to return an int indicating the number of
      rows selected, instead of accumulating it into the protobuf. This
      value can then be re-used to eliminate one extra call to
      CountSelected() in ScanResultCopier::HandleRowBlock().
    
      After this change, the protobuf is no longer used by
      SerializeRowBlock(), so I removed the parameter, which required a bit
      of fixup in the tests.
    
    Change-Id: I24dfb1bd036fde514ca6494bae0ddc171dd225dd
    Reviewed-on: http://gerrit.cloudera.org:8080/15550
    Reviewed-by: Grant Henke <gr...@apache.org>
    Tested-by: Todd Lipcon <to...@apache.org>
---
 src/kudu/common/rowblock-test.cc      | 14 ++++++------
 src/kudu/common/rowblock.cc           | 42 +++++++++++++++++++----------------
 src/kudu/common/rowblock.h            |  8 ++++++-
 src/kudu/common/wire_protocol-test.cc | 26 ++++++++++------------
 src/kudu/common/wire_protocol.cc      | 35 ++++++++++++++++++++---------
 src/kudu/common/wire_protocol.h       | 12 +++++-----
 src/kudu/tserver/tablet_service.cc    | 21 +++++++++---------
 7 files changed, 90 insertions(+), 68 deletions(-)

diff --git a/src/kudu/common/rowblock-test.cc b/src/kudu/common/rowblock-test.cc
index c2d5a84..f9e9aca 100644
--- a/src/kudu/common/rowblock-test.cc
+++ b/src/kudu/common/rowblock-test.cc
@@ -18,6 +18,7 @@
 #include "kudu/common/rowblock.h"
 
 #include <cstddef>
+#include <cstdint>
 #include <vector>
 
 #include <gtest/gtest.h>
@@ -59,28 +60,27 @@ TEST(TestSelectionVector, TestNonByteAligned) {
   ASSERT_EQ(sv.nrows(), sv.CountSelected());
   ASSERT_TRUE(sv.AnySelected());
 
-  vector<int> sel;
-  sv.GetSelectedRows(&sel);
-  ASSERT_EQ(sv.nrows(), sel.size());
+  vector<uint16_t> sel;
+  ASSERT_FALSE(sv.GetSelectedRows(&sel));
 
   for (size_t i = 0; i < sv.nrows(); i++) {
     sv.SetRowUnselected(i);
   }
   ASSERT_EQ(0, sv.CountSelected());
   ASSERT_FALSE(sv.AnySelected());
-  sv.GetSelectedRows(&sel);
+  ASSERT_TRUE(sv.GetSelectedRows(&sel));
   ASSERT_EQ(0, sel.size());
 }
 
 TEST(TestSelectionVector, TestGetSelectedRows) {
-  vector<int> expected = {1, 4, 9, 10, 18};
+  vector<uint16_t> expected = {1, 4, 9, 10, 18};
   SelectionVector sv(20);
   sv.SetAllFalse();
   for (int i : expected) {
     sv.SetRowSelected(i);
   }
-  vector<int> selected;
-  sv.GetSelectedRows(&selected);
+  vector<uint16_t> selected;
+  ASSERT_TRUE(sv.GetSelectedRows(&selected));
   ASSERT_EQ(expected, selected);
 }
 
diff --git a/src/kudu/common/rowblock.cc b/src/kudu/common/rowblock.cc
index 7e8b22b..74d1964 100644
--- a/src/kudu/common/rowblock.cc
+++ b/src/kudu/common/rowblock.cc
@@ -16,7 +16,7 @@
 // under the License.
 #include "kudu/common/rowblock.h"
 
-#include <numeric>
+#include <limits>
 #include <vector>
 
 #include <glog/logging.h>
@@ -74,31 +74,35 @@ void SelectionVector::ClearToSelectAtMost(size_t max_rows) {
   }
 }
 
-void SelectionVector::GetSelectedRows(vector<int>* selected) const {
+
+// TODO(todd) this is a bit faster when implemented with target "bmi" enabled.
+// Consider duplicating it and doing runtime switching.
+static void GetSelectedRowsInternal(const uint8_t* __restrict__ bitmap,
+                                    int n_bytes,
+                                    uint16_t* __restrict__ dst) {
+  ForEachSetBit(bitmap, n_bytes * 8,
+                [&](int bit) {
+                  *dst++ = bit;
+                });
+}
+
+bool SelectionVector::GetSelectedRows(vector<uint16_t>* selected) const {
+  CHECK_LE(n_rows_, std::numeric_limits<uint16_t>::max());
   int n_selected = CountSelected();
-  selected->resize(n_selected);
-  if (n_selected == 0) {
-    return;
-  }
   if (n_selected == n_rows_) {
-    std::iota(selected->begin(), selected->end(), 0);
-    return;
+    selected->clear();
+    return false;
   }
 
-  const uint8_t* bitmap = &bitmap_[0];
-  int* dst = selected->data();
-  // Within each byte, keep flipping the least significant non-zero bit and adding
-  // the bit index to the output until none are set.
-  for (int i = 0; i < n_bytes_; i++) {
-    uint8_t bm = *bitmap++;
-    while (bm != 0) {
-      int bit = Bits::FindLSBSetNonZero(bm);
-      *dst++ = (i * 8) + bit;
-      bm ^= (1 << bit);
-    }
+  selected->resize(n_selected);
+  if (n_selected == 0) {
+    return true;
   }
+  GetSelectedRowsInternal(&bitmap_[0], n_bytes_, selected->data());
+  return true;
 }
 
+
 size_t SelectionVector::CountSelected() const {
   return Bits::Count(&bitmap_[0], n_bytes_);
 }
diff --git a/src/kudu/common/rowblock.h b/src/kudu/common/rowblock.h
index 04fa419..cedc4a7 100644
--- a/src/kudu/common/rowblock.h
+++ b/src/kudu/common/rowblock.h
@@ -18,6 +18,7 @@
 
 #include <cstdint>
 #include <cstring>
+#include <memory>
 #include <string>
 #include <vector>
 
@@ -27,6 +28,7 @@
 #include "kudu/common/schema.h"
 #include "kudu/common/types.h"
 #include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/stringpiece.h"
 #include "kudu/util/bitmap.h"
 #include "kudu/util/status.h"
@@ -102,7 +104,11 @@ class SelectionVector {
 
   // Sets '*selected' to the indices of all rows marked as selected
   // in this selection vector.
-  void GetSelectedRows(std::vector<int>* selected) const;
+  //
+  // NOTE: in the case that all rows are selected, a fast path is triggered
+  // in which false is returned with an empty 'selected'. Otherwise, returns
+  // true.
+  bool GetSelectedRows(std::vector<uint16_t>* selected) const WARN_UNUSED_RESULT;
 
   uint8_t *mutable_bitmap() {
     return &bitmap_[0];
diff --git a/src/kudu/common/wire_protocol-test.cc b/src/kudu/common/wire_protocol-test.cc
index d85e947..4c9ac0d 100644
--- a/src/kudu/common/wire_protocol-test.cc
+++ b/src/kudu/common/wire_protocol-test.cc
@@ -44,7 +44,6 @@
 #include "kudu/util/hash.pb.h"
 #include "kudu/util/hexdump.h"
 #include "kudu/util/memory/arena.h"
-#include "kudu/util/pb_util.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 #include "kudu/util/stopwatch.h"  // IWYU pragma: keep
@@ -157,14 +156,12 @@ class WireProtocolTest : public KuduTest,
     FillRowBlockForBenchmark(&block);
     SelectRandomRowsWithRate(&block, select_rate);
 
-    RowwiseRowBlockPB pb;
     faststring direct, indirect;
     int64_t cycle_start = CycleClock::Now();
     for (int i = 0; i < kNumTrials; ++i) {
-      pb.Clear();
       direct.clear();
       indirect.clear();
-      SerializeRowBlock(block, &pb, nullptr, &direct, &indirect);
+      SerializeRowBlock(block, nullptr, &direct, &indirect);
     }
     int64_t cycle_end = CycleClock::Now();
     LOG(INFO) << Substitute(
@@ -315,19 +312,20 @@ TEST_F(WireProtocolTest, TestColumnarRowBlockToPB) {
   FillRowBlockWithTestRows(&block);
 
   // Convert to PB.
-  RowwiseRowBlockPB pb;
   faststring direct, indirect;
-  SerializeRowBlock(block, &pb, nullptr, &direct, &indirect);
-  SCOPED_TRACE(pb_util::SecureDebugString(pb));
+  int num_rows = SerializeRowBlock(block, nullptr, &direct, &indirect);
   SCOPED_TRACE("Row data: " + direct.ToString());
   SCOPED_TRACE("Indirect data: " + indirect.ToString());
 
   // Convert back to a row, ensure that the resulting row is the same
   // as the one we put in.
+  RowwiseRowBlockPB pb;
+  pb.set_num_rows(num_rows);
+
   vector<const uint8_t*> row_ptrs;
   Slice direct_sidecar = direct;
   ASSERT_OK(ExtractRowsFromRowBlockPB(schema_, pb, indirect,
-                                             &direct_sidecar, &row_ptrs));
+                                      &direct_sidecar, &row_ptrs));
   ASSERT_EQ(block.nrows(), row_ptrs.size());
   for (int i = 0; i < block.nrows(); ++i) {
     ConstContiguousRow row_roundtripped(&schema_, row_ptrs[i]);
@@ -375,16 +373,17 @@ TEST_F(WireProtocolTest, TestColumnarRowBlockToPBWithPadding) {
                        ColumnSchema("col3", INT32, true /* nullable */)}, 0);
 
   // Convert to PB.
-  RowwiseRowBlockPB pb;
   faststring direct, indirect;
-  SerializeRowBlock(block, &pb, &proj_schema, &direct, &indirect, true /* pad timestamps */);
-  SCOPED_TRACE(pb_util::SecureDebugString(pb));
+  int num_rows = SerializeRowBlock(block, &proj_schema, &direct, &indirect,
+                                   true /* pad timestamps */);
   SCOPED_TRACE("Row data: " + HexDump(direct));
   SCOPED_TRACE("Indirect data: " + HexDump(indirect));
 
   // Convert back to a row, ensure that the resulting row is the same
   // as the one we put in. Can't reuse the decoding methods since we
   // won't support decoding padded rows within Kudu.
+  RowwiseRowBlockPB pb;
+  pb.set_num_rows(num_rows);
   vector<const uint8_t*> row_ptrs;
   Slice direct_sidecar = direct;
   Slice indirect_sidecar = indirect;
@@ -476,10 +475,9 @@ TEST_F(WireProtocolTest, TestBlockWithNoColumns) {
   ASSERT_EQ(900, block.selection_vector()->CountSelected());
 
   // Convert it to protobuf, ensure that the results look right.
-  RowwiseRowBlockPB pb;
   faststring direct, indirect;
-  SerializeRowBlock(block, &pb, nullptr, &direct, &indirect);
-  ASSERT_EQ(900, pb.num_rows());
+  int num_rows = SerializeRowBlock(block, nullptr, &direct, &indirect);
+  ASSERT_EQ(900, num_rows);
 }
 
 TEST_F(WireProtocolTest, TestColumnDefaultValue) {
diff --git a/src/kudu/common/wire_protocol.cc b/src/kudu/common/wire_protocol.cc
index 533de57..5b2bc3f 100644
--- a/src/kudu/common/wire_protocol.cc
+++ b/src/kudu/common/wire_protocol.cc
@@ -22,6 +22,7 @@
 #include <algorithm>
 #include <cstdint>
 #include <cstring>
+#include <numeric>
 #include <ostream>
 #include <string>
 #include <vector>
@@ -891,7 +892,7 @@ static void CopyColumn(
     const ColumnBlock& column_block, int dst_col_idx, uint8_t* __restrict__ dst_base,
     faststring* indirect_data, const Schema* dst_schema, size_t row_stride,
     size_t schema_byte_size, size_t column_offset,
-    const vector<int>& row_idx_select) {
+    const vector<uint16_t>& row_idx_select) {
   DCHECK(dst_schema);
   uint8_t* dst = dst_base + column_offset;
   size_t offset_to_non_null_bitmap = schema_byte_size - column_offset;
@@ -927,13 +928,28 @@ static void CopyColumn(
 // Because we use a faststring here, ASAN tests become unbearably slow
 // with the extra verifications.
 ATTRIBUTE_NO_ADDRESS_SAFETY_ANALYSIS
-void SerializeRowBlock(const RowBlock& block,
-                       RowwiseRowBlockPB* rowblock_pb,
-                       const Schema* projection_schema,
-                       faststring* data_buf,
-                       faststring* indirect_data,
-                       bool pad_unixtime_micros_to_16_bytes) {
+int SerializeRowBlock(const RowBlock& block,
+                      const Schema* projection_schema,
+                      faststring* data_buf,
+                      faststring* indirect_data,
+                      bool pad_unixtime_micros_to_16_bytes) {
   DCHECK_GT(block.nrows(), 0);
+
+  vector<uint16_t> selected_row_indexes;
+  bool all_selected = !block.selection_vector()->GetSelectedRows(
+      &selected_row_indexes);
+  if (all_selected) {
+    // TODO(todd): add a fast-path for this in the 'Copy' functions.
+    selected_row_indexes.resize(block.nrows());
+    std::iota(selected_row_indexes.begin(),
+              selected_row_indexes.end(), 0);
+  }
+  size_t num_rows = selected_row_indexes.size();
+
+  // Fast-path empty blocks (eg because the predicate didn't match any rows or
+  // all rows in the block were deleted)
+  if (num_rows == 0) return 0;
+
   const Schema* tablet_schema = block.schema();
 
   if (projection_schema == nullptr) {
@@ -958,7 +974,6 @@ void SerializeRowBlock(const RowBlock& block,
 
   size_t old_size = data_buf->size();
   size_t row_stride = ContiguousRowHelper::row_size(*projection_schema) + total_padding;
-  size_t num_rows = block.selection_vector()->CountSelected();
   size_t schema_byte_size = projection_schema->byte_size() + total_padding;
   size_t additional_size = row_stride * num_rows;
 
@@ -971,8 +986,6 @@ void SerializeRowBlock(const RowBlock& block,
     memset(base, 0, additional_size);
   }
 
-  vector<int> selected_row_indexes;
-  block.selection_vector()->GetSelectedRows(&selected_row_indexes);
   size_t t_schema_idx = 0;
   size_t padding_so_far = 0;
   for (int p_schema_idx = 0; p_schema_idx < projection_schema->num_columns(); p_schema_idx++) {
@@ -1009,7 +1022,7 @@ void SerializeRowBlock(const RowBlock& block,
       padding_so_far += 8;
     }
   }
-  rowblock_pb->set_num_rows(rowblock_pb->num_rows() + num_rows);
+  return num_rows;
 }
 
 string StartTimeToString(const ServerRegistrationPB& reg) {
diff --git a/src/kudu/common/wire_protocol.h b/src/kudu/common/wire_protocol.h
index 6a88b3d..7ac76fd 100644
--- a/src/kudu/common/wire_protocol.h
+++ b/src/kudu/common/wire_protocol.h
@@ -158,7 +158,7 @@ Status ParseInt32Config(const std::string& name, const std::string& value, int32
 Status ExtraConfigPBToPBMap(const TableExtraConfigPB& pb,
                             google::protobuf::Map<std::string, std::string>* configs);
 
-// Encode the given row block into the provided protobuf and data buffers.
+// Encode the given row block into the provided data buffers.
 //
 // All data (both direct and indirect) for each selected row in the RowBlock is
 // copied into the protobuf and faststrings.
@@ -172,10 +172,12 @@ Status ExtraConfigPBToPBMap(const TableExtraConfigPB& pb,
 // schema will be padded to the right by 8 (zero'd) bytes for a total of 16 bytes.
 //
 // Requires that block.nrows() > 0
-void SerializeRowBlock(const RowBlock& block, RowwiseRowBlockPB* rowblock_pb,
-                       const Schema* projection_schema,
-                       faststring* data_buf, faststring* indirect_data,
-                       bool pad_unixtime_micros_to_16_bytes = false);
+//
+// Returns the number of rows serialized.
+int SerializeRowBlock(const RowBlock& block,
+                      const Schema* projection_schema,
+                      faststring* data_buf, faststring* indirect_data,
+                      bool pad_unixtime_micros_to_16_bytes = false);
 
 // Rewrites the data pointed-to by row data slice 'row_data_slice' by replacing
 // relative indirect data pointers with absolute ones in 'indirect_data_slice'.
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index e3f861b..c69e0b3 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -91,7 +91,6 @@
 #include "kudu/tserver/tserver_admin.pb.h"
 #include "kudu/tserver/tserver_service.pb.h"
 #include "kudu/util/auto_release_pool.h"
-#include "kudu/util/bitset.h"
 #include "kudu/util/crc.h"
 #include "kudu/util/debug/trace_event.h"
 #include "kudu/util/faststring.h"
@@ -758,16 +757,16 @@ class ScanResultCopier : public ScanResultCollector {
         pad_unixtime_micros_to_16_bytes_(false) {}
 
   void HandleRowBlock(Scanner* scanner, const RowBlock& row_block) override {
-    int64_t num_selected = row_block.selection_vector()->CountSelected();
-    // Fast-path empty blocks (eg because the predicate didn't match any rows or
-    // all rows in the block were deleted)
-    if (num_selected == 0) return;
-
-    num_rows_returned_ += num_selected;
-    scanner->add_num_rows_returned(num_selected);
-    SerializeRowBlock(row_block, rowblock_pb_, scanner->client_projection_schema(),
-                      rows_data_, indirect_data_, pad_unixtime_micros_to_16_bytes_);
-    SetLastRow(row_block, &last_primary_key_);
+    int num_selected = SerializeRowBlock(
+        row_block, scanner->client_projection_schema(),
+        rows_data_, indirect_data_, pad_unixtime_micros_to_16_bytes_);
+
+    if (num_selected > 0) {
+      rowblock_pb_->set_num_rows(rowblock_pb_->num_rows() + num_selected);
+      num_rows_returned_ += num_selected;
+      scanner->add_num_rows_returned(num_selected);
+      SetLastRow(row_block, &last_primary_key_);
+    }
   }
 
   // Returns number of bytes buffered to return.