You are viewing a plain text version of this content. The link to the canonical version ("here") was not preserved in this plain-text copy.
Posted to commits@kudu.apache.org by to...@apache.org on 2020/04/07 22:21:18 UTC

[kudu] 04/04: wire_protocol: change columnar serialization of varlen data to match Arrow

This is an automated email from the ASF dual-hosted git repository.

todd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 36a21d6ee4de8828417d7884cf87cd5e2ba15a21
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Mon Apr 6 10:29:39 2020 -0700

    wire_protocol: change columnar serialization of varlen data to match Arrow
    
    This changes the format of variable-length columns serialized on the
    wire to match Apache Arrow instead of our internal column format. The
    Arrow format consists of an array of n+1 offsets for n rows, such that
    the data for cell 'i' spans the byte range [offsets[i], offsets[i+1]).
    
    The obvious advantage here is that clients can zero-copy into Arrow
    structures since the format is compatible. The less obvious advantage is
    that we are going from 16 bytes (sizeof(Slice)) to 4 bytes
    (sizeof(uint32_t), one offset) for each serialized string, so this should
    be a savings even for non-Arrow users of the API.
    
    This patch also adds some more sanity checking of the wire format in the
    client API so that it's not the responsibility of the caller to guard
    against malicious servers.
    
    Change-Id: Iadf728744feb83f5980e62bea4fd7634a1a52467
    Reviewed-on: http://gerrit.cloudera.org:8080/15660
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/client/client-test.cc            |  18 ++--
 src/kudu/client/columnar_scan_batch.cc    |   8 +-
 src/kudu/client/columnar_scan_batch.h     |  25 ++++--
 src/kudu/client/scanner-internal.cc       |  99 +++++++++++++++-------
 src/kudu/client/scanner-internal.h        |  10 +--
 src/kudu/common/columnar_serialization.cc | 136 ++++++++++++++++++++++--------
 src/kudu/common/columnar_serialization.h  |   4 +-
 src/kudu/common/rowblock.h                |  15 ++++
 src/kudu/common/wire_protocol-test.cc     |  24 +++---
 src/kudu/common/wire_protocol.proto       |   6 +-
 src/kudu/tserver/tablet_server-test.cc    |  16 ++--
 src/kudu/tserver/tablet_service.cc        |  10 +--
 12 files changed, 253 insertions(+), 118 deletions(-)

diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index d500d6e..6961ff3 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -1065,15 +1065,18 @@ TEST_F(ClientTest, TestColumnarScan) {
 
     // Verify the data.
     Slice col_data[4];
-    for (int i = 0; i < 4; i++) {
-      ASSERT_OK(batch.GetDataForColumn(i, &col_data[i]));
-    }
+    Slice string_indir_data;
+    ASSERT_OK(batch.GetFixedLengthColumn(0, &col_data[0]));
+    ASSERT_OK(batch.GetFixedLengthColumn(1, &col_data[1]));
+    ASSERT_OK(batch.GetVariableLengthColumn(2, &col_data[2], &string_indir_data));
+    ASSERT_OK(batch.GetFixedLengthColumn(3, &col_data[3]));
+
     ArrayView<const int32_t> c0(reinterpret_cast<const int32_t*>(col_data[0].data()),
                                 batch.NumRows());
     ArrayView<const int32_t> c1(reinterpret_cast<const int32_t*>(col_data[1].data()),
                                 batch.NumRows());
-    ArrayView<const Slice> c2(reinterpret_cast<const Slice*>(col_data[2].data()),
-                              batch.NumRows());
+    ArrayView<const uint32_t> c2_offsets(reinterpret_cast<const uint32_t*>(col_data[2].data()),
+                                         batch.NumRows() + 1);
     ArrayView<const int32_t> c3(reinterpret_cast<const int32_t*>(col_data[3].data()),
                                 batch.NumRows());
 
@@ -1081,7 +1084,10 @@ TEST_F(ClientTest, TestColumnarScan) {
       int row_idx = total_rows + i;
       EXPECT_EQ(row_idx, c0[i]);
       EXPECT_EQ(row_idx * 2, c1[i]);
-      EXPECT_EQ(Substitute("hello $0", row_idx), c2[i]);
+
+      Slice str(&string_indir_data[c2_offsets[i]],
+                c2_offsets[i + 1] - c2_offsets[i]);
+      EXPECT_EQ(Substitute("hello $0", row_idx), str);
       EXPECT_EQ(row_idx * 3, c3[i]);
     }
     total_rows += batch.NumRows();
diff --git a/src/kudu/client/columnar_scan_batch.cc b/src/kudu/client/columnar_scan_batch.cc
index b346445..c764665 100644
--- a/src/kudu/client/columnar_scan_batch.cc
+++ b/src/kudu/client/columnar_scan_batch.cc
@@ -38,8 +38,12 @@ int KuduColumnarScanBatch::NumRows() const {
   return data_->resp_data_.num_rows();
 }
 
-Status KuduColumnarScanBatch::GetDataForColumn(int idx, Slice* data) const {
-  return data_->GetDataForColumn(idx, data);
+Status KuduColumnarScanBatch::GetFixedLengthColumn(int idx, Slice* data) const {
+  return data_->GetFixedLengthColumn(idx, data);
+}
+
+Status KuduColumnarScanBatch::GetVariableLengthColumn(int idx, Slice* offsets, Slice* data) const {
+  return data_->GetVariableLengthColumn(idx, offsets, data);
 }
 
 Status KuduColumnarScanBatch::GetNonNullBitmapForColumn(int idx, Slice* data) const {
diff --git a/src/kudu/client/columnar_scan_batch.h b/src/kudu/client/columnar_scan_batch.h
index 23a3e7d..924c162 100644
--- a/src/kudu/client/columnar_scan_batch.h
+++ b/src/kudu/client/columnar_scan_batch.h
@@ -41,6 +41,12 @@ namespace client {
 /// some CPU cycles on the Kudu cluster and can also enable faster processing of the
 /// returned data in certain client applications.
 ///
+/// The columnar data retrieved by this class matches the columnar encoding described by
+/// Apache Arrow[1], but without the alignment and padding guarantees that are made by
+/// the Arrow IPC serialization.
+///
+/// [1] https://arrow.apache.org/docs/format/Columnar.html
+///
 /// NOTE: this class is not thread-safe.
 class KUDU_EXPORT KuduColumnarScanBatch {
  public:
@@ -50,17 +56,26 @@ class KUDU_EXPORT KuduColumnarScanBatch {
   /// @return The number of rows in this batch.
   int NumRows() const;
 
-  /// Get the raw columnar data corresponding to the column with index 'idx'.
+  /// Get the raw columnar data corresponding to the primitive-typed column with index 'idx'.
   ///
-  /// The data is in little-endian packed array format. No alignment is guaranteed.
+  /// The data is in little-endian packed array format. No alignment or padding is guaranteed.
   /// Space is reserved for all cells regardless of whether they might be null.
   /// The data stored in a null cell may or may not be zeroed.
   ///
-  /// If this returns an error for a given column, then a second call for the same
-  /// column has undefined results.
+  /// For variable-length (e.g. STRING, BINARY, VARCHAR) columns, use
+  /// GetVariableLengthColumn instead.
   ///
   /// @note The Slice returned is only valid for the lifetime of the KuduColumnarScanBatch.
-  Status GetDataForColumn(int idx, Slice* data) const;
+  Status GetFixedLengthColumn(int idx, Slice* data) const;
+
+  /// Return the variable-length data for the variable-length-typed column with index 'idx'.
+  ///
+  /// If NumRows() is 0, the 'offsets' array will have length 0. Otherwise, this array
+  /// will contain NumRows() + 1 entries, each indicating an offset within the
+  /// variable-length data array returned in 'data'. For each cell with index 'n',
+  /// offsets[n] indicates the starting offset of that cell, and offsets[n+1] indicates
+  /// the ending offset of that cell.
+  Status GetVariableLengthColumn(int idx, Slice* offsets, Slice* data) const;
 
   /// Get a bitmap corresponding to the non-null status of the cells in the given column.
   ///
diff --git a/src/kudu/client/scanner-internal.cc b/src/kudu/client/scanner-internal.cc
index 93006b3..75bcbd3 100644
--- a/src/kudu/client/scanner-internal.cc
+++ b/src/kudu/client/scanner-internal.cc
@@ -50,13 +50,11 @@
 #include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/security/token.pb.h"
 #include "kudu/tserver/tserver_service.proxy.h"
-#include "kudu/util/array_view.h"
 #include "kudu/util/async_util.h"
 #include "kudu/util/bitmap.h"
 #include "kudu/util/hexdump.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/monotime.h"
-#include "kudu/util/safe_math.h"
 
 using google::protobuf::FieldDescriptor;
 using google::protobuf::Reflection;
@@ -786,7 +784,6 @@ Status KuduColumnarScanBatch::Data::Reset(
   controller_.Swap(controller);
   projection_ = projection;
   client_projection_ = client_projection;
-  rewritten_varlen_columns_.clear();
 
   unique_ptr<ColumnarRowBlockPB> resp_data(response->release_columnar_data());
   if (!resp_data) {
@@ -811,48 +808,86 @@ Status KuduColumnarScanBatch::Data::CheckColumnIndex(int idx) const {
   return Status::OK();
 }
 
-Status KuduColumnarScanBatch::Data::GetDataForColumn(int idx, Slice* data) const {
+Status KuduColumnarScanBatch::Data::GetFixedLengthColumn(int idx, Slice* data) const {
   RETURN_NOT_OK(CheckColumnIndex(idx));
+  const auto& col = projection_->column(idx);
+  if (PREDICT_FALSE(col.type_info()->physical_type() == BINARY)) {
+    return Status::InvalidArgument("column is variable-length", col.ToString());
+  }
+
+  // Get the sidecar from the RPC.
+  if (PREDICT_FALSE(!resp_data_.columns(idx).has_data_sidecar())) {
+    return Status::Corruption("server did not send data for column", col.ToString());
+  }
   RETURN_NOT_OK(controller_.GetInboundSidecar(
       resp_data_.columns(idx).data_sidecar(),
       data));
 
-  // Rewrite slices to be real pointers instead of pointers relative to the
-  // indirect data buffer.
-  if (projection_->column(idx).type_info()->physical_type() == BINARY &&
-      !ContainsKey(rewritten_varlen_columns_, idx)) {
-
-    Slice indirect_data_slice;
-    RETURN_NOT_OK(controller_.GetInboundSidecar(
-        resp_data_.columns(idx).indirect_data_sidecar(),
-        &indirect_data_slice));
-
-    ArrayView<Slice> v(reinterpret_cast<Slice*>(data->mutable_data()),
-                       data->size() / sizeof(Slice));
-    for (int row_idx = 0; row_idx < v.size(); row_idx++) {
-      Slice* slice = &v[row_idx];
-      size_t offset_in_indirect = reinterpret_cast<uintptr_t>(slice->data());
-      // Ensure the updated pointer is within the bounds of the indirect data.
-      bool overflowed = false;
-      size_t max_offset = AddWithOverflowCheck(offset_in_indirect, slice->size(), &overflowed);
-      if (PREDICT_FALSE(overflowed || max_offset > indirect_data_slice.size())) {
-        const auto& col = projection_->column(idx);
-        return Status::Corruption(
-            Substitute("Row #$0 contained bad indirect slice for column $1: ($2, $3)",
-                       row_idx, col.ToString(), offset_in_indirect, slice->size()));
-      }
-      *slice = Slice(&indirect_data_slice[offset_in_indirect], slice->size());
+  size_t expected_size = resp_data_.num_rows() * col.type_info()->size();
+  if (PREDICT_FALSE(data->size() != expected_size)) {
+    return Status::Corruption(Substitute(
+        "server sent unexpected data length $0 for column $1 (expected $2)",
+        data->size(), col.ToString(), expected_size));
+  }
+  return Status::OK();
+}
+
+Status KuduColumnarScanBatch::Data::GetVariableLengthColumn(
+    int idx, Slice* offsets, Slice* data) const {
+  RETURN_NOT_OK(CheckColumnIndex(idx));
+  const auto& col = projection_->column(idx);
+  if (PREDICT_FALSE(col.type_info()->physical_type() != BINARY)) {
+    return Status::InvalidArgument("column is not variable-length", col.ToString());
+  }
+  const auto& resp_col = resp_data_.columns(idx);
+
+  // Get the offsets.
+  Slice offsets_tmp;
+  if (PREDICT_FALSE(!resp_col.has_data_sidecar())) {
+    return Status::Corruption("server did not send offset data for column", col.ToString());
+  }
+  RETURN_NOT_OK(controller_.GetInboundSidecar(
+      resp_col.data_sidecar(),
+      &offsets_tmp));
+
+  // Get the varlen data.
+  Slice data_tmp;
+  if (PREDICT_FALSE(!resp_col.has_varlen_data_sidecar())) {
+    return Status::Corruption("server did not send varlen data for column", col.ToString());
+  }
+  RETURN_NOT_OK(controller_.GetInboundSidecar(
+      resp_col.varlen_data_sidecar(),
+      &data_tmp));
+
+  // Validate the offsets.
+  auto expected_num_offsets = resp_data_.num_rows() == 0 ? 0 : (resp_data_.num_rows() + 1);
+  auto expected_size = expected_num_offsets * sizeof(uint32_t);
+  if (PREDICT_FALSE(offsets_tmp.size() != expected_size)) {
+    return Status::Corruption(Substitute("size $0 of offsets buffer for column $1 did not "
+                                         "match expected size $2",
+                                         offsets_tmp.size(), col.ToString(), expected_size));
+  }
+  for (int i = 0; i < resp_data_.num_rows(); i++) {
+    uint32_t offset = UnalignedLoad<uint32_t>(offsets_tmp.data() + i * sizeof(uint32_t));
+    if (PREDICT_FALSE(offset > data_tmp.size())) {
+      return Status::Corruption(Substitute(
+          "invalid offset $0 returned for column $1 at index $2 (max valid offset is $3)",
+          offset, col.ToString(), i, data_tmp.size()));
     }
-    rewritten_varlen_columns_.insert(idx);
   }
+
+  *offsets = offsets_tmp;
+  *data = data_tmp;
+
   return Status::OK();
 }
 
 Status KuduColumnarScanBatch::Data::GetNonNullBitmapForColumn(int idx, Slice* data) const {
   RETURN_NOT_OK(CheckColumnIndex(idx));
   const auto& col = resp_data_.columns(idx);
-  if (!col.has_non_null_bitmap_sidecar()) {
-    return Status::NotFound("column is not nullable");
+  if (PREDICT_FALSE(!col.has_non_null_bitmap_sidecar())) {
+    return Status::Corruption(Substitute("server did not send null bitmap for column $0",
+                                         projection_->column(idx).ToString()));
   }
   return controller_.GetInboundSidecar(col.non_null_bitmap_sidecar(), data);
 }
diff --git a/src/kudu/client/scanner-internal.h b/src/kudu/client/scanner-internal.h
index 01da884..98e89ea 100644
--- a/src/kudu/client/scanner-internal.h
+++ b/src/kudu/client/scanner-internal.h
@@ -23,7 +23,6 @@
 #include <ostream>
 #include <set>
 #include <string>
-#include <unordered_set>
 #include <vector>
 
 #include <glog/logging.h>
@@ -378,7 +377,8 @@ class KuduColumnarScanBatch::Data : public internal::ScanBatchDataInterface {
                tserver::ScanResponsePB* response) override;
   void Clear() override;
 
-  Status GetDataForColumn(int idx, Slice* data) const;
+  Status GetFixedLengthColumn(int idx, Slice* data) const;
+  Status GetVariableLengthColumn(int idx, Slice* offsets, Slice* data) const;
   Status GetNonNullBitmapForColumn(int idx, Slice* data) const;
 
  private:
@@ -394,12 +394,6 @@ class KuduColumnarScanBatch::Data : public internal::ScanBatchDataInterface {
   // The PB which contains the "direct data" slice.
   ColumnarRowBlockPB resp_data_;
 
-  // Tracks for each variable-length (binary) column whether the pointers have been
-  // rewritten yet to be "real" pointers instead of sidecar-relative offsets.
-  // Mutable since the 'GetDataForColumn' call is semantically const, but in fact
-  // needs to modify this member to do the lazy pointer rewrites.
-  mutable std::unordered_set<int> rewritten_varlen_columns_;
-
   // The projection being scanned.
   const Schema* projection_;
   // The KuduSchema version of 'projection_'
diff --git a/src/kudu/common/columnar_serialization.cc b/src/kudu/common/columnar_serialization.cc
index 47060de..eaa8469 100644
--- a/src/kudu/common/columnar_serialization.cc
+++ b/src/kudu/common/columnar_serialization.cc
@@ -35,6 +35,7 @@
 #include "kudu/common/zp7.h"
 #include "kudu/gutil/cpu.h"
 #include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/fastmem.h"
 #include "kudu/util/alignment.h"
 #include "kudu/util/bitmap.h"
 #include "kudu/util/slice.h"
@@ -449,32 +450,6 @@ void CopySelectedRows(const vector<uint16_t>& sel_rows,
 }
 
 namespace {
-// For each of the Slices in 'cells_buf', copy the pointed-to data into 'indirect' and
-// modify the Slice so that its 'pointer' field is not an actual memory pointer, but
-// rather an offset within the indirect data buffer.
-void RelocateSlicesToIndirect(uint8_t* __restrict__ cells_buf, int n_rows,
-                              faststring* indirect) {
-  Slice* cell_slices = reinterpret_cast<Slice*>(cells_buf);
-  size_t total_size = 0;
-  for (int i = 0; i < n_rows; i++) {
-    total_size += cell_slices[i].size();
-  }
-
-  int old_size = indirect->size();
-  indirect->resize_with_extra_capacity(old_size + total_size);
-
-  uint8_t* dst_base = indirect->data();
-  uint8_t* dst = dst_base + old_size;
-
-  for (int i = 0; i < n_rows; i++) {
-    Slice* s = &cell_slices[i];
-    if (!s->empty()) {
-      memcpy(dst, s->data(), s->size());
-    }
-    *s = Slice(reinterpret_cast<const uint8_t*>(dst - dst_base), s->size());
-    dst += s->size();
-  }
-}
 
 // Specialized division for the known type sizes. Despite having some branching here,
 // this is faster than a 'div' instruction which has a 20+ cycle latency.
@@ -489,12 +464,12 @@ size_t div_sizeof_type(size_t s, size_t divisor) {
   }
 }
 
-
-// Copy the selected cells (and non-null-bitmap bits) from 'cblock' into 'dst' according to
-// the given 'sel_rows'.
+// Copy the selected primitive cells (and non-null-bitmap bits) from 'cblock' into 'dst'
+// according to the given 'sel_rows'.
 void CopySelectedCellsFromColumn(const ColumnBlock& cblock,
                                  const SelectedRows& sel_rows,
                                  ColumnarSerializedBatch::Column* dst) {
+  DCHECK(cblock.type_info()->physical_type() != BINARY);
   size_t sizeof_type = cblock.type_info()->size();
   int n_sel = sel_rows.num_selected();
 
@@ -523,11 +498,89 @@ void CopySelectedCellsFromColumn(const ColumnBlock& cblock,
     ZeroNullValues(sizeof_type, initial_rows, n_sel,
         dst->data.data(), dst->non_null_bitmap->data());
   }
+}
+
+
+// For each of the Slices in 'cells_buf', copy the pointed-to data into 'varlen' and
+// write the _end_ offset of the copied data into 'offsets_out'. This assumes (and
+// DCHECKs) that the _start_ offset of each cell was already previously written by a
+// previous invocation of this function.
+void CopySlicesAndWriteEndOffsets(const Slice* __restrict__ cells_buf,
+                                  const SelectedRows& sel_rows,
+                                  uint32_t* __restrict__ offsets_out,
+                                  faststring* varlen) {
+  const Slice* cell_slices = reinterpret_cast<const Slice*>(cells_buf);
+  size_t total_added_size = 0;
+  sel_rows.ForEachIndex(
+      [&](uint16_t i) {
+        total_added_size += cell_slices[i].size();
+      });
+
+  // The output array should already have an entry for the start offset
+  // of our first cell.
+  DCHECK_EQ(offsets_out[-1], varlen->size());
+
+  int old_size = varlen->size();
+  varlen->resize_with_extra_capacity(old_size + total_added_size);
+
+  uint8_t* dst_base = varlen->data();
+  uint8_t* dst = dst_base + old_size;
+
+  sel_rows.ForEachIndex(
+      [&](uint16_t i) {
+        const Slice* s = &cell_slices[i];
+        if (!s->empty()) {
+          strings::memcpy_inlined(dst, s->data(), s->size());
+        }
+        dst += s->size();
+        *offsets_out++ = dst - dst_base;
+      });
+}
+
+// Copy variable-length cells into 'dst' using an Arrow-style serialization:
+// a list of offsets in the 'data' array and the data itself in the 'varlen_data'
+// array.
+//
+// The offset array has a length one greater than the number of cells.
+void CopySelectedVarlenCellsFromColumn(const ColumnBlock& cblock,
+                                       const SelectedRows& sel_rows,
+                                       ColumnarSerializedBatch::Column* dst) {
+  using offset_type = uint32_t;
+  DCHECK(cblock.type_info()->physical_type() == BINARY);
+  int n_sel = sel_rows.num_selected();
+  DCHECK_GT(n_sel, 0);
+
+  // If this is the first call, append a '0' entry for the offset of the first string.
+  if (dst->data.size() == 0) {
+    CHECK_EQ(dst->varlen_data->size(), 0);
+    offset_type zero_offset = 0;
+    dst->data.append(&zero_offset, sizeof(zero_offset));
+  }
 
-  if (cblock.type_info()->physical_type() == BINARY) {
-    RelocateSlicesToIndirect(dst_buf, n_sel, boost::get_pointer(dst->indirect_data));
+  // Number of initial rows in the dst values and null_bitmap.
+  DCHECK_EQ(dst->data.size() % sizeof(offset_type), 0);
+  size_t initial_offset_count = div_sizeof_type(dst->data.size(), sizeof(offset_type));
+  size_t initial_rows = initial_offset_count - 1;
+  size_t new_offset_count = initial_offset_count + n_sel;
+  size_t new_num_rows = initial_rows + n_sel;
+
+  if (cblock.is_nullable()) {
+    DCHECK_EQ(dst->non_null_bitmap->size(), BitmapSize(initial_rows));
+    dst->non_null_bitmap->resize_with_extra_capacity(BitmapSize(new_num_rows));
+    CopyNonNullBitmap(cblock.non_null_bitmap(),
+                      sel_rows.bitmap(),
+                      initial_rows, cblock.nrows(),
+                      dst->non_null_bitmap->data());
+    ZeroNullValues(sizeof(Slice), 0, cblock.nrows(),
+                   const_cast<ColumnBlock&>(cblock).data(), cblock.non_null_bitmap());
   }
+  dst->data.resize_with_extra_capacity(sizeof(offset_type) * new_offset_count);
+  offset_type* dst_offset = reinterpret_cast<offset_type*>(dst->data.data()) + initial_offset_count;
+  const Slice* src_slices = reinterpret_cast<const Slice*>(cblock.cell_ptr(0));
+  CopySlicesAndWriteEndOffsets(src_slices, sel_rows,
+                               dst_offset, boost::get_pointer(dst->varlen_data));
 }
+
 } // anonymous namespace
 } // namespace internal
 
@@ -552,7 +605,7 @@ int SerializeRowBlockColumnar(
       out->columns.emplace_back();
       out->columns.back().data.reserve(1024 * 1024);
       if (col.type_info()->physical_type() == BINARY) {
-        out->columns.back().indirect_data.emplace();
+        out->columns.back().varlen_data.emplace();
       }
       if (col.is_nullable()) {
         out->columns.back().non_null_bitmap.emplace();
@@ -561,16 +614,27 @@ int SerializeRowBlockColumnar(
   }
 
   SelectedRows sel = block.selection_vector()->GetSelectedRows();
+  if (sel.num_selected() == 0) {
+    return 0;
+  }
+
   int col_idx = 0;
   for (const auto& col : projection_schema->columns()) {
     int t_schema_idx = tablet_schema->find_column(col.name());
     CHECK_NE(t_schema_idx, -1);
     const ColumnBlock& column_block = block.column_block(t_schema_idx);
 
-    internal::CopySelectedCellsFromColumn(
-        column_block,
-        sel,
-        &out->columns[col_idx]);
+    if (column_block.type_info()->physical_type() == BINARY) {
+      internal::CopySelectedVarlenCellsFromColumn(
+          column_block,
+          sel,
+          &out->columns[col_idx]);
+    } else {
+      internal::CopySelectedCellsFromColumn(
+          column_block,
+          sel,
+          &out->columns[col_idx]);
+    }
     col_idx++;
   }
 
diff --git a/src/kudu/common/columnar_serialization.h b/src/kudu/common/columnar_serialization.h
index d7ad494..bc74f08 100644
--- a/src/kudu/common/columnar_serialization.h
+++ b/src/kudu/common/columnar_serialization.h
@@ -35,8 +35,8 @@ struct ColumnarSerializedBatch {
     // Underlying column data.
     faststring data;
 
-    // Data for varlen columns (BINARY)
-    boost::optional<faststring> indirect_data;
+    // Data for varlen columns (those with BINARY physical type)
+    boost::optional<faststring> varlen_data;
 
     // Each bit is set when a value is non-null
     boost::optional<faststring> non_null_bitmap;
diff --git a/src/kudu/common/rowblock.h b/src/kudu/common/rowblock.h
index 6e86e5f..0db69d2 100644
--- a/src/kudu/common/rowblock.h
+++ b/src/kudu/common/rowblock.h
@@ -258,6 +258,21 @@ class SelectedRows {
     return CreateRowIndexes();
   }
 
+  // Call F(index) for each selected index.
+  template<class F>
+  void ForEachIndex(F func) const {
+    if (all_selected_) {
+      int n_sel = num_selected();
+      for (int i = 0; i < n_sel; i++) {
+        func(i);
+      }
+    } else {
+      for (uint16_t i : indexes_) {
+        func(i);
+      }
+    }
+  }
+
  private:
   friend class SelectionVector;
 
diff --git a/src/kudu/common/wire_protocol-test.cc b/src/kudu/common/wire_protocol-test.cc
index ae18f1e..4a1c791 100644
--- a/src/kudu/common/wire_protocol-test.cc
+++ b/src/kudu/common/wire_protocol-test.cc
@@ -335,19 +335,21 @@ TEST_F(WireProtocolTest, TestRowBlockToColumnarPB) {
           }
         }
         int type_size = col.type_info()->size();
-        Slice serialized_val(serialized_col.data.data() + type_size * dst_row_idx,
-                             type_size);
-        Slice orig_val(row.cell_ptr(c), type_size);
-
+        Slice serialized_val;
+        Slice orig_val;
         if (col.type_info()->physical_type() == BINARY) {
-          orig_val = *reinterpret_cast<const Slice*>(orig_val.data());
-          serialized_val = *reinterpret_cast<const Slice*>(serialized_val.data());
-
-          uintptr_t indirect_offset = reinterpret_cast<uintptr_t>(serialized_val.data());
-          serialized_val = Slice(serialized_col.indirect_data->data() + indirect_offset,
-                                 serialized_val.size());
+          const uint8_t* offset_ptr = serialized_col.data.data() + sizeof(uint32_t) * dst_row_idx;
+          uint32_t start_offset = UnalignedLoad<uint32_t>(offset_ptr);
+          uint32_t end_offset = UnalignedLoad<uint32_t>(offset_ptr + sizeof(uint32_t));
+          ASSERT_GE(end_offset, start_offset);
+          serialized_val = Slice(serialized_col.varlen_data->data() + start_offset,
+                                 end_offset - start_offset);
+          memcpy(&orig_val, row.cell_ptr(c), type_size);
+        } else {
+          serialized_val = Slice(serialized_col.data.data() + type_size * dst_row_idx,
+                                 type_size);
+          orig_val = Slice(row.cell_ptr(c), type_size);
         }
-
         EXPECT_EQ(orig_val, serialized_val);
       }
       dst_row_idx++;
diff --git a/src/kudu/common/wire_protocol.proto b/src/kudu/common/wire_protocol.proto
index 5ddba152..3b3bec2 100644
--- a/src/kudu/common/wire_protocol.proto
+++ b/src/kudu/common/wire_protocol.proto
@@ -158,9 +158,9 @@ message ColumnarRowBlockPB {
     optional int32 data_sidecar = 1;
 
     // The index of the sidecar containing any data referred to by binary/string data.
-    // In this case, the 'pointer' field of the Slices contained in the data sidecar
-    // will refer to offsets in this sidecar.
-    optional int32 indirect_data_sidecar = 2;
+    // In this case, the `data` sidecar will contain an array of `num_rows + 1` uint32s
+    // pointing to offsets in this sidecar.
+    optional int32 varlen_data_sidecar = 2;
 
     // If the column is nullable, The index of the sidecar containing a bitmap with a set
     // bit for all non-null cells.
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index c47a5e9..fab5ad4 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -2591,17 +2591,17 @@ TEST_F(TabletServerTest, TestColumnarScan) {
     Slice col_data;
     ASSERT_OK(rpc.GetInboundSidecar(resp.columnar_data().columns(2).data_sidecar(), &col_data));
 
-    Slice indirect_data;
-    ASSERT_OK(rpc.GetInboundSidecar(resp.columnar_data().columns(2).indirect_data_sidecar(),
-                                    &indirect_data));
+    Slice varlen_data;
+    ASSERT_OK(rpc.GetInboundSidecar(resp.columnar_data().columns(2).varlen_data_sidecar(),
+                                    &varlen_data));
 
     SCOPED_TRACE(col_data.ToDebugString());
-    ASSERT_EQ(col_data.size(), kNumRows * sizeof(Slice));
-    ArrayView<const Slice> cells(reinterpret_cast<const Slice*>(col_data.data()), kNumRows);
+    ASSERT_EQ(col_data.size(), (kNumRows + 1) * sizeof(uint32_t));
+    ArrayView<const uint32_t> offsets(reinterpret_cast<const uint32_t*>(col_data.data()),
+                                      kNumRows + 1);
     for (int i = 0; i < kNumRows; i++) {
-      Slice s = cells[i];
-      Slice real_str(indirect_data.data() + reinterpret_cast<uintptr_t>(s.data()),
-                     s.size());
+      Slice real_str(varlen_data.data() + offsets[i],
+                     offsets[i + 1] - offsets[i]);
       ASSERT_EQ(Substitute("hello $0", i), real_str);
     }
   }
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index c72d223..1bb93ff 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -817,8 +817,8 @@ class ColumnarResultSerializer : public ResultSerializer {
     int total = 0;
     for (const auto& col : results_.columns) {
       total += col.data.size();
-      if (col.indirect_data) {
-        total += col.indirect_data->size();
+      if (col.varlen_data) {
+        total += col.varlen_data->size();
       }
       if (col.non_null_bitmap) {
         total += col.non_null_bitmap->size();
@@ -838,10 +838,10 @@ class ColumnarResultSerializer : public ResultSerializer {
           RpcSidecar::FromFaststring((std::move(col.data))), &sidecar_idx));
       col_pb->set_data_sidecar(sidecar_idx);
 
-      if (col.indirect_data) {
+      if (col.varlen_data) {
         CHECK_OK(context->AddOutboundSidecar(
-            RpcSidecar::FromFaststring((std::move(*col.indirect_data))), &sidecar_idx));
-        col_pb->set_indirect_data_sidecar(sidecar_idx);
+            RpcSidecar::FromFaststring((std::move(*col.varlen_data))), &sidecar_idx));
+        col_pb->set_varlen_data_sidecar(sidecar_idx);
       }
 
       if (col.non_null_bitmap) {