You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2017/11/20 23:20:35 UTC

[2/2] kudu git commit: wire_protocol: optimize RewriteRowBlockPointers

wire_protocol: optimize RewriteRowBlockPointers

I was looking at why our tpch1 benchmark performance seems to have been
creeping up over recent months and started profiling it a bit. I didn't
see any reason for a regression but did notice that
RewriteRowBlockPointers ends up being memory-bandwidth-bound, so took a
few minutes to optimize it.

The old code did one pass through the data for each column. In the case
that the rowblock was larger than CPU cache, this meant streaming a lot
of data from L2, L3, or even memory, which bottlenecked it. The new code
is branchier but does only a single pass through the data.

This reduced the client side CPU usage of the query in tpch_real_world
from about 70ms/iteration to about 40ms/iteration.

Change-Id: Ie428e595e9f564762bbdbd09dc6a5f312abe9aec
Reviewed-on: http://gerrit.cloudera.org:8080/8554
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <aw...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/f9d85f53
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/f9d85f53
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/f9d85f53

Branch: refs/heads/master
Commit: f9d85f53b3aa25d7ec17ca9a247c9c7cc7d1a6d6
Parents: 338d8a8
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Nov 15 11:55:47 2017 -0800
Committer: Andrew Wong <aw...@cloudera.com>
Committed: Mon Nov 20 20:39:34 2017 +0000

----------------------------------------------------------------------
 src/kudu/common/wire_protocol.cc | 117 ++++++++++++++++++++--------------
 1 file changed, 69 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/f9d85f53/src/kudu/common/wire_protocol.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/wire_protocol.cc b/src/kudu/common/wire_protocol.cc
index 9dc3544..e4c8606 100644
--- a/src/kudu/common/wire_protocol.cc
+++ b/src/kudu/common/wire_protocol.cc
@@ -17,9 +17,8 @@
 
 #include "kudu/common/wire_protocol.h"
 
-#include <cinttypes>
-#include <cstring>
 #include <cstdint>
+#include <cstring>
 #include <ostream>
 #include <string>
 #include <vector>
@@ -36,8 +35,8 @@
 #include "kudu/common/types.h"
 #include "kudu/common/wire_protocol.pb.h"
 #include "kudu/consensus/metadata.pb.h"
+#include "kudu/gutil/fixedarray.h"
 #include "kudu/gutil/port.h"
-#include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/fastmem.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/bitmap.h"
@@ -55,6 +54,7 @@ using kudu::pb_util::SecureDebugString;
 using kudu::pb_util::SecureShortDebugString;
 using std::string;
 using std::vector;
+using strings::Substitute;
 
 namespace kudu {
 
@@ -547,73 +547,94 @@ Status RewriteRowBlockPointers(const Schema& schema, const RowwiseRowBlockPB& ro
   // request? Maybe we should suck it up and copy the data when we mutate?
 
   size_t total_padding = 0;
-  // If we're padding UNIXTIME_MICROS for Impala we need to calculate the total padding
-  // size to adjust the row_stride.
-  if (pad_unixtime_micros_to_16_bytes) {
-    for (int i = 0; i < schema.num_columns(); i++) {
-      if (schema.column(i).type_info()->type() == UNIXTIME_MICROS) {
-        total_padding += 8;
-      }
+  int num_binary_cols = 0;
+  for (int i = 0; i < schema.num_columns(); i++) {
+    // If we're padding UNIXTIME_MICROS for Impala we need to calculate the total padding
+    // size to adjust the row_stride.
+    if (pad_unixtime_micros_to_16_bytes &&
+        schema.column(i).type_info()->type() == UNIXTIME_MICROS) {
+      total_padding += 8;
+    }
+    if (schema.column(i).type_info()->physical_type() == BINARY) {
+      num_binary_cols++;
     }
   }
 
-  size_t row_stride = ContiguousRowHelper::row_size(schema) + total_padding;
+  const size_t row_stride = ContiguousRowHelper::row_size(schema) + total_padding;
 
   // We don't need a const-cast because we can just use Slice's lack of
   // const-safety.
   uint8_t* row_data = row_data_slice->mutable_data();
   const uint8_t* indir_data = indirect_data_slice.data();
-  size_t expected_data_size = rowblock_pb.num_rows() * row_stride;
-  size_t null_bitmap_offset = schema.byte_size() + total_padding;
+  const size_t expected_data_size = rowblock_pb.num_rows() * row_stride;
+  const size_t null_bitmap_offset = schema.byte_size() + total_padding;
 
   if (PREDICT_FALSE(row_data_slice->size() != expected_data_size)) {
     return Status::Corruption(
-      StringPrintf("Row block has %zd bytes of data but expected %zd for %" PRIu32 " rows",
+      Substitute("Row block has $0 bytes of data but expected $1 for $2 rows",
                    row_data_slice->size(), expected_data_size, rowblock_pb.num_rows()));
   }
 
-  size_t padding_so_far = 0;
+  if (num_binary_cols == 0) return Status::OK();
+
+  // Calculate the offset information for the columns which need rewriting.
+  // Calculating this up front means we can avoid re-calculating this redundant
+  // information once per row.
+  struct ToRewrite {
+    int col_idx;
+    int col_offset;
+    bool nullable;
+  };
+  FixedArray<ToRewrite> to_rewrite(num_binary_cols);
+
+  int padding_so_far = 0;
+  int j = 0;
   for (int i = 0; i < schema.num_columns(); i++) {
     const ColumnSchema& col = schema.column(i);
-    if (col.type_info()->type() == UNIXTIME_MICROS && pad_unixtime_micros_to_16_bytes) {
+    if (pad_unixtime_micros_to_16_bytes &&
+        col.type_info()->type() == UNIXTIME_MICROS) {
       padding_so_far += 8;
-      continue;
     }
-    if (col.type_info()->physical_type() != BINARY) {
-      continue;
+    if (col.type_info()->physical_type() == BINARY) {
+      int column_offset = schema.column_offset(i) + padding_so_far;
+      to_rewrite[j++] = { i, column_offset, col.is_nullable() };
     }
-
-    size_t column_offset = schema.column_offset(i) + padding_so_far;
-
-    int row_idx = 0;
-    size_t row_offset = 0;
-    while (row_offset < row_data_slice->size()) {
-      uint8_t* row_ptr = row_data + row_offset;
-      uint8_t* cell_ptr = row_ptr + column_offset;
-
-      if (!col.is_nullable() || !BitmapTest(row_ptr + null_bitmap_offset, i)) {
-        // The pointer is currently an offset into indir_data. Need to replace it
-        // with the actual pointer into indir_data
-        Slice *slice = reinterpret_cast<Slice *>(cell_ptr);
-        size_t offset_in_indirect = reinterpret_cast<uintptr_t>(slice->data());
-
-        // Ensure the updated pointer is within the bounds of the indirect data.
-        bool overflowed = false;
-        size_t max_offset = AddWithOverflowCheck(offset_in_indirect, slice->size(), &overflowed);
-        if (PREDICT_FALSE(overflowed || max_offset > indirect_data_slice.size())) {
-          return Status::Corruption(
-            StringPrintf("Row #%d contained bad indirect slice for column %s: (%zd, %zd)",
-                         row_idx, col.ToString().c_str(),
-                         reinterpret_cast<uintptr_t>(slice->data()),
-                         slice->size()));
-        }
-        *slice = Slice(&indir_data[offset_in_indirect], slice->size());
+  }
+  DCHECK_EQ(j, num_binary_cols);
+
+  // Iterate through the rows and rewrite columns as necessary.
+  // NOTE: we do this row-by-row instead of column-by-column because
+  // the input data is typically much larger than L1 cache, and thus
+  // doing one pass over the memory is faster.
+  uint8_t* row_ptr = row_data;
+  for (int row_idx = 0;
+       row_idx < rowblock_pb.num_rows();
+       row_idx++) {
+    for (const auto& t : to_rewrite) {
+      uint8_t* cell_ptr = row_ptr + t.col_offset;
+
+      if (t.nullable && BitmapTest(row_ptr + null_bitmap_offset, t.col_idx)) {
+        // No need to rewrite null values.
+        continue;
       }
 
-      // Advance to next row
-      row_offset += row_stride;
-      row_idx++;
+      // The pointer is currently an offset into indir_data. Need to replace it
+      // with the actual pointer into indir_data
+      Slice *slice = reinterpret_cast<Slice *>(cell_ptr);
+      size_t offset_in_indirect = reinterpret_cast<uintptr_t>(slice->data());
+
+      // Ensure the updated pointer is within the bounds of the indirect data.
+      bool overflowed = false;
+      size_t max_offset = AddWithOverflowCheck(offset_in_indirect, slice->size(), &overflowed);
+      if (PREDICT_FALSE(overflowed || max_offset > indirect_data_slice.size())) {
+        const auto& col = schema.column(t.col_idx);
+        return Status::Corruption(
+            Substitute("Row #$0 contained bad indirect slice for column $1: ($2, $3)",
+                       row_idx, col.ToString(), offset_in_indirect, slice->size()));
+      }
+      *slice = Slice(&indir_data[offset_in_indirect], slice->size());
     }
+    row_ptr += row_stride;
   }
 
   return Status::OK();