You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2020/04/01 22:24:19 UTC

[kudu] 02/02: client: add support for columnar format scan

This is an automated email from the ASF dual-hosted git repository.

todd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit fa2d4c756646b3f8428d2931113e8dfc3bc5af79
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Thu Mar 26 22:28:06 2020 -0700

    client: add support for columnar format scan
    
    Change-Id: I69ec4487c78b872e7b9eba5facdd44cfd6b8f4fa
    Reviewed-on: http://gerrit.cloudera.org:8080/15622
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/client/CMakeLists.txt               |   2 +
 src/kudu/client/client-test.cc               |  45 ++++++++++
 src/kudu/client/client.cc                    |  41 +++++----
 src/kudu/client/client.h                     |  29 +++++++
 src/kudu/client/columnar_scan_batch.cc       |  55 ++++++++++++
 src/kudu/client/columnar_scan_batch.h        |  90 ++++++++++++++++++++
 src/kudu/client/scanner-internal.cc          | 121 +++++++++++++++++++++++++--
 src/kudu/client/scanner-internal.h           |  59 ++++++++++++-
 src/kudu/tools/tool_action_remote_replica.cc |   2 +-
 9 files changed, 421 insertions(+), 23 deletions(-)

diff --git a/src/kudu/client/CMakeLists.txt b/src/kudu/client/CMakeLists.txt
index ffd536a..f456757 100644
--- a/src/kudu/client/CMakeLists.txt
+++ b/src/kudu/client/CMakeLists.txt
@@ -34,6 +34,7 @@ set(CLIENT_SRCS
   client.cc
   client_builder-internal.cc
   client-internal.cc
+  columnar_scan_batch.cc
   error_collector.cc
   error-internal.cc
   hash.cc
@@ -177,6 +178,7 @@ install(TARGETS kudu_client_exported
 install(FILES
   callbacks.h
   client.h
+  columnar_scan_batch.h
   hash.h
   row_result.h
   scan_batch.h
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index f8bca5d..668c9d3 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -51,6 +51,7 @@
 #include "kudu/client/client-internal.h"
 #include "kudu/client/client-test-util.h"
 #include "kudu/client/client.pb.h"
+#include "kudu/client/columnar_scan_batch.h"
 #include "kudu/client/error_collector.h"
 #include "kudu/client/meta_cache.h"
 #include "kudu/client/resource_metrics.h"
@@ -104,6 +105,7 @@
 #include "kudu/tserver/tablet_server_options.h"
 #include "kudu/tserver/ts_tablet_manager.h"
 #include "kudu/tserver/tserver.pb.h"
+#include "kudu/util/array_view.h"
 #include "kudu/util/async_util.h"
 #include "kudu/util/barrier.h"
 #include "kudu/util/countdown_latch.h"
@@ -1051,6 +1053,49 @@ TEST_F(ClientTest, TestScanAtFutureTimestamp) {
   ASSERT_STR_CONTAINS(s.ToString(), "in the future.");
 }
 
+// Scans the test table with the COLUMNAR_LAYOUT row format and checks every
+// cell of every returned batch against the values expected for its row index.
+TEST_F(ClientTest, TestColumnarScan) {
+  // Randomize both the table size and the batch size so that, across runs,
+  // the scan is served by a single batch as well as by multiple batches.
+  const int64_t expected_row_count = rand() % 2000;
+  const int64_t rows_per_batch = rand() % 1000;
+  FLAGS_scanner_batch_size_rows = rows_per_batch;
+
+  NO_FATALS(InsertTestRows(client_table_.get(), expected_row_count));
+  KuduScanner scanner(client_table_.get());
+  ASSERT_OK(scanner.SetRowFormatFlags(KuduScanner::COLUMNAR_LAYOUT));
+  ASSERT_OK(scanner.Open());
+
+  KuduColumnarScanBatch batch;
+  int rows_seen = 0;
+  while (scanner.HasMoreRows()) {
+    ASSERT_OK(scanner.NextBatch(&batch));
+    const int rows_in_batch = batch.NumRows();
+
+    // Fetch the raw buffer of each of the four columns, then view each one
+    // as a typed array with one element per row.
+    Slice raw[4];
+    for (int col = 0; col < 4; col++) {
+      ASSERT_OK(batch.GetDataForColumn(col, &raw[col]));
+    }
+    ArrayView<const int32_t> ints0(reinterpret_cast<const int32_t*>(raw[0].data()),
+                                   rows_in_batch);
+    ArrayView<const int32_t> ints1(reinterpret_cast<const int32_t*>(raw[1].data()),
+                                   rows_in_batch);
+    ArrayView<const Slice> strings2(reinterpret_cast<const Slice*>(raw[2].data()),
+                                    rows_in_batch);
+    ArrayView<const int32_t> ints3(reinterpret_cast<const int32_t*>(raw[3].data()),
+                                   rows_in_batch);
+
+    // Rows are expected back in insertion order, so the global row index is
+    // the running total plus the offset within this batch.
+    for (int offset = 0; offset < rows_in_batch; offset++) {
+      const int row_idx = rows_seen + offset;
+      EXPECT_EQ(row_idx, ints0[offset]);
+      EXPECT_EQ(row_idx * 2, ints1[offset]);
+      EXPECT_EQ(Substitute("hello $0", row_idx), strings2[offset]);
+      EXPECT_EQ(row_idx * 3, ints3[offset]);
+    }
+    rows_seen += rows_in_batch;
+  }
+  ASSERT_EQ(expected_row_count, rows_seen);
+}
+
 const KuduScanner::ReadMode read_modes[] = {
     KuduScanner::READ_LATEST,
     KuduScanner::READ_AT_SNAPSHOT,
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 571844a..968537b 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -37,6 +37,7 @@
 #include "kudu/client/client-internal.h"
 #include "kudu/client/client.pb.h"
 #include "kudu/client/client_builder-internal.h"
+#include "kudu/client/columnar_scan_batch.h"
 #include "kudu/client/error-internal.h"
 #include "kudu/client/error_collector.h"
 #include "kudu/client/master_proxy_rpc.h"
@@ -1569,6 +1570,7 @@ Status KuduScanner::SetRowFormatFlags(uint64_t flags) {
   switch (flags) {
     case NO_FLAGS:
     case PAD_UNIXTIME_MICROS_TO_16_BYTES:
+    case COLUMNAR_LAYOUT:
       break;
     default:
       return Status::InvalidArgument(Substitute("Invalid row format flags: $0", flags));
@@ -1711,6 +1713,15 @@ Status KuduScanner::NextBatch(vector<KuduRowResult>* rows) {
 }
 
+// Row-wise variant: forwards the batch's internal data object to the shared
+// NextBatch(internal::ScanBatchDataInterface*) implementation.
 Status KuduScanner::NextBatch(KuduScanBatch* batch) {
+  return NextBatch(batch->data_);
+}
+
+// Columnar variant: forwards the batch's internal data object to the shared
+// NextBatch(internal::ScanBatchDataInterface*) implementation.
+Status KuduScanner::NextBatch(KuduColumnarScanBatch* batch) {
+  return NextBatch(batch->data_);
+}
+
+Status KuduScanner::NextBatch(internal::ScanBatchDataInterface* batch_data) {
+
   // TODO: do some double-buffering here -- when we return this batch
   // we should already have fired off the RPC for the next batch, but
   // need to do some swapping of the response objects around to avoid
@@ -1718,7 +1729,7 @@ Status KuduScanner::NextBatch(KuduScanBatch* batch) {
   CHECK(data_->open_);
   CHECK(data_->proxy_);
 
-  batch->data_->Clear();
+  batch_data->Clear();
 
   if (data_->short_circuit_) {
     return Status::OK();
@@ -1728,12 +1739,11 @@ Status KuduScanner::NextBatch(KuduScanBatch* batch) {
     // We have data from a previous scan.
     VLOG(2) << "Extracting data from " << data_->DebugString();
     data_->data_in_open_ = false;
-    return batch->data_->Reset(&data_->controller_,
-                               data_->configuration().projection(),
-                               data_->configuration().client_projection(),
-                               data_->configuration().row_format_flags(),
-                               unique_ptr<RowwiseRowBlockPB>(
-                                   data_->last_response_.release_data()));
+    return batch_data->Reset(&data_->controller_,
+                             data_->configuration().projection(),
+                             data_->configuration().client_projection(),
+                             data_->configuration().row_format_flags(),
+                             &data_->last_response_);
   }
 
   if (data_->last_response_.has_more_results()) {
@@ -1753,12 +1763,11 @@ Status KuduScanner::NextBatch(KuduScanBatch* batch) {
           data_->last_primary_key_ = data_->last_response_.last_primary_key();
         }
         data_->scan_attempts_ = 0;
-        return batch->data_->Reset(&data_->controller_,
-                                   data_->configuration().projection(),
-                                   data_->configuration().client_projection(),
-                                   data_->configuration().row_format_flags(),
-                                   unique_ptr<RowwiseRowBlockPB>(
-                                       data_->last_response_.release_data()));
+        return batch_data->Reset(&data_->controller_,
+                                 data_->configuration().projection(),
+                                 data_->configuration().client_projection(),
+                                 data_->configuration().row_format_flags(),
+                                 &data_->last_response_);
       }
 
       data_->scan_attempts_++;
@@ -1797,7 +1806,11 @@ Status KuduScanner::NextBatch(KuduScanBatch* batch) {
     set<string> blacklist;
 
     RETURN_NOT_OK(data_->OpenNextTablet(deadline, &blacklist));
-    // No rows written, the next invocation will pick them up.
+    if (data_->data_in_open_) {
+      // Avoid returning an empty batch in between tablets if we have data
+      // we can return from this call.
+      return NextBatch(batch_data);
+    }
     return Status::OK();
   } else {
     // No more data anywhere.
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 5c59ac7..f8d16d8 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -73,6 +73,7 @@ class RemoteKsckCluster;
 
 namespace client {
 
+class KuduColumnarScanBatch;
 class KuduDelete;
 class KuduInsert;
 class KuduInsertIgnore;
@@ -102,6 +103,7 @@ class RemoteTablet;
 class RemoteTabletServer;
 class ReplicaController;
 class RetrieveAuthzTokenRpc;
+class ScanBatchDataInterface;
 class WriteRpc;
 template <class ReqClass, class RespClass>
 class AsyncLeaderMasterRpc; // IWYU pragma: keep
@@ -2255,6 +2257,9 @@ class KUDU_EXPORT KuduScanner {
 
   /// Fetch the next batch of results for this scanner.
   ///
+  /// This variant must not be used when the scan is configured with the
+  /// COLUMNAR_LAYOUT RowFormatFlag; use the KuduColumnarScanBatch variant instead.
+  ///
   /// A single KuduScanBatch object may be reused. Each subsequent call
   /// replaces the data from the previous call, and invalidates any
   /// KuduScanBatch::RowPtr objects previously obtained from the batch.
@@ -2263,6 +2268,19 @@ class KUDU_EXPORT KuduScanner {
   /// @return Operation result status.
   Status NextBatch(KuduScanBatch* batch);
 
+  /// Fetch the next batch of columnar results for this scanner.
+  ///
+  /// This variant may only be used when the scan is configured with the
+  /// COLUMNAR_LAYOUT RowFormatFlag.
+  ///
+  /// A single KuduColumnarScanBatch object may be reused. Each subsequent call
+  /// replaces the data from the previous call, and invalidates any
+  /// Slice objects previously obtained from the batch.
+  /// @param [out] batch
+  ///   Placeholder for the result.
+  /// @return Operation result status.
+  Status NextBatch(KuduColumnarScanBatch* batch);
+
   /// Get the KuduTabletServer that is currently handling the scan.
   ///
   /// More concretely, this is the server that handled the most recent
@@ -2389,6 +2407,15 @@ class KUDU_EXPORT KuduScanner {
   ///   results and might even cause the client to crash.
   static const uint64_t PAD_UNIXTIME_MICROS_TO_16_BYTES = 1 << 0;
 
+  /// Enable column-oriented data transfer. The server will transfer data to the client
+  /// in a columnar format rather than a row-wise format. The KuduColumnarScanBatch API
+  /// must be used to fetch results from this scan.
+  ///
+  /// NOTE: older versions of the Kudu server do not support this feature. Clients
+  /// aiming to support compatibility with previous versions should have a fallback
+  /// code path.
+  static const uint64_t COLUMNAR_LAYOUT = 1 << 1;
+
   /// Optionally set row format modifier flags.
   ///
   /// If flags is RowFormatFlags::NO_FLAGS, then no modifications will be made to the row
@@ -2436,6 +2463,8 @@ class KUDU_EXPORT KuduScanner {
  private:
   class KUDU_NO_EXPORT Data;
 
+  Status NextBatch(internal::ScanBatchDataInterface* batch);
+
   friend class KuduScanToken;
   FRIEND_TEST(ClientTest, TestBlockScannerHijackingAttempts);
   FRIEND_TEST(ClientTest, TestScanCloseProxy);
diff --git a/src/kudu/client/columnar_scan_batch.cc b/src/kudu/client/columnar_scan_batch.cc
new file mode 100644
index 0000000..b346445
--- /dev/null
+++ b/src/kudu/client/columnar_scan_batch.cc
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/client/columnar_scan_batch.h"
+
+#include "kudu/client/scanner-internal.h"
+#include "kudu/common/wire_protocol.pb.h"
+#include "kudu/rpc/rpc_controller.h"
+
+namespace kudu {
+class Slice;
+
+namespace client {
+
+// Allocates the private Data object backing this batch; the destructor
+// deletes it.
+KuduColumnarScanBatch::KuduColumnarScanBatch()
+    : data_(new KuduColumnarScanBatch::Data()) {
+}
+
+// Deletes the Data object allocated by the constructor.
+KuduColumnarScanBatch::~KuduColumnarScanBatch() {
+  delete data_;
+}
+
+// Returns the row count recorded in the columnar response block currently
+// held by this batch.
+int KuduColumnarScanBatch::NumRows() const {
+  return data_->resp_data_.num_rows();
+}
+
+// Forwards to Data::GetDataForColumn(), which validates 'idx' and points
+// 'data' at the column's cell data.
+Status KuduColumnarScanBatch::GetDataForColumn(int idx, Slice* data) const {
+  return data_->GetDataForColumn(idx, data);
+}
+
+// Points 'data' at the non-null bitmap of the column with index 'idx'.
+//
+// Delegates to Data::GetNonNullBitmapForColumn(), which performs the column
+// index check and returns NotFound("column is not nullable") when the column
+// carries no bitmap sidecar. Keeping the logic in one place mirrors how
+// GetDataForColumn() forwards to its Data counterpart.
+Status KuduColumnarScanBatch::GetNonNullBitmapForColumn(int idx, Slice* data) const {
+  return data_->GetNonNullBitmapForColumn(idx, data);
+}
+
+} // namespace client
+} // namespace kudu
diff --git a/src/kudu/client/columnar_scan_batch.h b/src/kudu/client/columnar_scan_batch.h
new file mode 100644
index 0000000..23a3e7d
--- /dev/null
+++ b/src/kudu/client/columnar_scan_batch.h
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_CLIENT_COLUMNAR_SCAN_BATCH_H
+#define KUDU_CLIENT_COLUMNAR_SCAN_BATCH_H
+
+#ifdef KUDU_HEADERS_NO_STUBS
+#include "kudu/gutil/macros.h"
+#else
+#include "kudu/client/stubs.h"
+#endif
+
+#include "kudu/util/kudu_export.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+class Slice;
+
+namespace client {
+
+/// @brief A batch of columnar data returned from a scanner
+///
+/// Similar to KuduScanBatch, this contains a batch of rows returned from a scanner.
+/// This type of batch is used if the KuduScanner::COLUMNAR_LAYOUT row format flag
+/// is enabled.
+///
+/// Retrieving rows in columnar layout can be significantly more efficient. It saves
+/// some CPU cycles on the Kudu cluster and can also enable faster processing of the
+/// returned data in certain client applications.
+///
+/// NOTE: this class is not thread-safe.
+class KUDU_EXPORT KuduColumnarScanBatch {
+ public:
+  /// Create an empty batch, to be filled in by KuduScanner::NextBatch().
+  KuduColumnarScanBatch();
+  ~KuduColumnarScanBatch();
+
+  /// @return The number of rows in this batch.
+  int NumRows() const;
+
+  /// Get the raw columnar data corresponding to the column with index 'idx'.
+  ///
+  /// The data is in little-endian packed array format. No alignment is guaranteed.
+  /// Space is reserved for all cells regardless of whether they might be null.
+  /// The data stored in a null cell may or may not be zeroed.
+  ///
+  /// If this returns an error for a given column, then a second call for the same
+  /// column has undefined results.
+  ///
+  /// @param [in] idx
+  ///   Index of the column within the batch.
+  /// @param [out] data
+  ///   Set to refer to the column's cell data on success.
+  /// @return Operation result status. An out-of-range column index yields an
+  ///   invalid-argument error.
+  ///
+  /// @note The Slice returned is only valid for the lifetime of the
+  ///   KuduColumnarScanBatch, and is invalidated when the batch is reused in a
+  ///   subsequent KuduScanner::NextBatch() call.
+  Status GetDataForColumn(int idx, Slice* data) const;
+
+  /// Get a bitmap corresponding to the non-null status of the cells in the given column.
+  ///
+  /// A set bit indicates a non-null cell.
+  /// If the number of rows is not a multiple of 8, the state of the trailing bits in the
+  /// bitmap is undefined.
+  ///
+  /// It is an error to call this function on a column which is not marked as nullable
+  /// in the schema.
+  ///
+  /// @param [in] idx
+  ///   Index of the column within the batch.
+  /// @param [out] data
+  ///   Set to refer to the column's non-null bitmap on success.
+  /// @return Operation result status. A not-found error is returned when the
+  ///   batch carries no non-null bitmap for the column.
+  ///
+  /// @note The Slice returned is only valid for the lifetime of the
+  ///   KuduColumnarScanBatch, and is invalidated when the batch is reused in a
+  ///   subsequent KuduScanner::NextBatch() call.
+  Status GetNonNullBitmapForColumn(int idx, Slice* data) const;
+
+ private:
+  // Private implementation object, defined in scanner-internal.h.
+  class KUDU_NO_EXPORT Data;
+
+  // KuduScanner passes 'data_' to its internal NextBatch() implementation.
+  friend class KuduScanner;
+
+  // Allocated in the constructor and deleted in the destructor.
+  Data* data_;
+  DISALLOW_COPY_AND_ASSIGN(KuduColumnarScanBatch);
+};
+
+
+} // namespace client
+} // namespace kudu
+
+#endif
diff --git a/src/kudu/client/scanner-internal.cc b/src/kudu/client/scanner-internal.cc
index 9cb14aa..93006b3 100644
--- a/src/kudu/client/scanner-internal.cc
+++ b/src/kudu/client/scanner-internal.cc
@@ -38,6 +38,7 @@
 #include "kudu/common/partition.h"
 #include "kudu/common/scan_spec.h"
 #include "kudu/common/schema.h"
+#include "kudu/common/types.h"
 #include "kudu/common/wire_protocol.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
@@ -49,11 +50,13 @@
 #include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/security/token.pb.h"
 #include "kudu/tserver/tserver_service.proxy.h"
+#include "kudu/util/array_view.h"
 #include "kudu/util/async_util.h"
 #include "kudu/util/bitmap.h"
 #include "kudu/util/hexdump.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/safe_math.h"
 
 using google::protobuf::FieldDescriptor;
 using google::protobuf::Reflection;
@@ -70,6 +73,8 @@ using rpc::RpcController;
 using security::SignedTokenPB;
 using strings::Substitute;
 using tserver::NewScanRequestPB;
+using tserver::RowFormatFlags;
+using tserver::ScanResponsePB;
 using tserver::TabletServerFeatures;
 
 namespace client {
@@ -367,6 +372,10 @@ ScanRpcStatus KuduScanner::Data::SendScanRpc(const MonoTime& overall_deadline,
   if (configuration().row_format_flags() & KuduScanner::PAD_UNIXTIME_MICROS_TO_16_BYTES) {
     controller_.RequireServerFeature(TabletServerFeatures::PAD_UNIXTIME_MICROS_TO_16_BYTES);
   }
+  if (configuration().row_format_flags() & KuduScanner::COLUMNAR_LAYOUT) {
+    controller_.RequireServerFeature(TabletServerFeatures::COLUMNAR_LAYOUT_FEATURE);
+  }
+
   if (next_req_.has_new_scan_request()) {
     // Only new scan requests require authz tokens. Scan continuations rely on
     // Kudu's prevention of scanner hijacking by different users.
@@ -386,7 +395,9 @@ ScanRpcStatus KuduScanner::Data::SendScanRpc(const MonoTime& overall_deadline,
       rpc_deadline, overall_deadline);
   if (scan_status.result == ScanRpcStatus::OK) {
     UpdateResourceMetrics();
-    num_rows_returned_ += last_response_.data().num_rows();
+    num_rows_returned_ += last_response_.has_data() ? last_response_.data().num_rows() : 0;
+    num_rows_returned_ += last_response_.has_columnar_data() ?
+        last_response_.columnar_data().num_rows() : 0;
   }
   return scan_status;
 }
@@ -561,13 +572,15 @@ Status KuduScanner::Data::OpenTablet(const string& partition_key,
   partition_pruner_.RemovePartitionKeyRange(remote_->partition().partition_key_end());
 
   next_req_.clear_new_scan_request();
-  data_in_open_ = last_response_.has_data() && last_response_.data().num_rows() > 0;
+  data_in_open_ = (last_response_.has_data() && last_response_.data().num_rows() > 0) ||
+      (last_response_.has_columnar_data() && last_response_.columnar_data().num_rows() > 0);
   if (last_response_.has_more_results()) {
     next_req_.set_scanner_id(last_response_.scanner_id());
     VLOG(2) << "Opened tablet " << remote_->tablet_id()
             << ", scanner ID " << last_response_.scanner_id();
-  } else if (last_response_.has_data()) {
-    VLOG(2) << "Opened tablet " << remote_->tablet_id() << ", no scanner ID assigned";
+  } else if (last_response_.has_data() || last_response_.has_columnar_data()) {
+    VLOG(2) << "Opened tablet " << remote_->tablet_id() << ", no scanner ID assigned, "
+            << " data_in_open=" << data_in_open_;
   } else {
     VLOG(2) << "Opened tablet " << remote_->tablet_id() << " (no rows), no scanner ID assigned";
   }
@@ -670,13 +683,19 @@ Status KuduScanBatch::Data::Reset(RpcController* controller,
                                   const Schema* projection,
                                   const KuduSchema* client_projection,
                                   uint64_t row_format_flags,
-                                  unique_ptr<RowwiseRowBlockPB> resp_data) {
+                                  ScanResponsePB* response) {
   CHECK(controller->finished());
+  if (row_format_flags & RowFormatFlags::COLUMNAR_LAYOUT) {
+    return Status::InvalidArgument("columnar layout specified, must use KuduColumnarScanBatch");
+  }
+
   controller_.Swap(controller);
   projection_ = projection;
   projected_row_size_ = CalculateProjectedRowSize(*projection_);
   client_projection_ = client_projection;
   row_format_flags_ = row_format_flags;
+  unique_ptr<RowwiseRowBlockPB> resp_data(response->release_data());
+
   if (!resp_data) {
     // No new data; just clear out the old stuff.
     resp_data_.Clear();
@@ -748,5 +767,97 @@ void KuduScanBatch::Data::Clear() {
   controller_.Reset();
 }
 
+////////////////////////////////////////////////////////////
+// KuduColumnarScanBatch
+////////////////////////////////////////////////////////////
+
+// Adopts the results of a finished columnar scan RPC: swaps in the caller's
+// controller and moves the columnar row block out of 'response'.
+//
+// Returns InvalidArgument, before modifying any state, when
+// 'row_format_flags' does not include COLUMNAR_LAYOUT.
+Status KuduColumnarScanBatch::Data::Reset(
+    rpc::RpcController* controller,
+    const Schema* projection,
+    const KuduSchema* client_projection,
+    uint64_t row_format_flags,
+    tserver::ScanResponsePB* response) {
+  const bool is_columnar = (row_format_flags & RowFormatFlags::COLUMNAR_LAYOUT) != 0;
+  if (!is_columnar) {
+    return Status::InvalidArgument("rowwise layout specified, must use KuduScanBatch");
+  }
+  CHECK(!response->has_data()) << "expected columnar data";
+  CHECK(controller->finished());
+
+  // Take over the RPC state; the lazy slice rewrites recorded for the
+  // previous batch no longer apply.
+  controller_.Swap(controller);
+  client_projection_ = client_projection;
+  projection_ = projection;
+  rewritten_varlen_columns_.clear();
+
+  // A null block means the response carried no new columnar data, in which
+  // case only the old block is dropped.
+  unique_ptr<ColumnarRowBlockPB> fresh_block(response->release_columnar_data());
+  if (fresh_block) {
+    resp_data_ = std::move(*fresh_block);
+  } else {
+    resp_data_.Clear();
+  }
+  return Status::OK();
+}
+
+// Empties the batch: clears the held response block and resets the RPC
+// controller. KuduScanner::NextBatch() calls this before refilling a batch.
+void KuduColumnarScanBatch::Data::Clear() {
+  resp_data_.Clear();
+  controller_.Reset();
+}
+
+// Validates that 'idx' refers to a column present in the current response
+// block.
+//
+// Returns InvalidArgument for any index outside [0, columns_size()). Negative
+// values are rejected explicitly: 'idx' is a signed int supplied by API
+// callers and would otherwise be passed straight to resp_data_.columns().
+Status KuduColumnarScanBatch::Data::CheckColumnIndex(int idx) const {
+  if (idx < 0 || idx >= resp_data_.columns_size()) {
+    return Status::InvalidArgument(Substitute("bad column index $0 ($1 columns present)",
+                                              idx, resp_data_.columns_size()));
+  }
+  return Status::OK();
+}
+
+// Points 'data' at the data sidecar of column 'idx'.
+//
+// For columns whose physical type is BINARY, the first call for a given
+// column also rewrites, in place, every Slice in the sidecar from an offset
+// into the indirect data sidecar to a real pointer. The column is then
+// recorded in 'rewritten_varlen_columns_' so later calls skip the rewrite.
+//
+// Returns the error from CheckColumnIndex() or GetInboundSidecar() on
+// failure, or Corruption if a slice does not fit within the indirect data.
+Status KuduColumnarScanBatch::Data::GetDataForColumn(int idx, Slice* data) const {
+  RETURN_NOT_OK(CheckColumnIndex(idx));
+  RETURN_NOT_OK(controller_.GetInboundSidecar(
+      resp_data_.columns(idx).data_sidecar(),
+      data));
+
+  // Rewrite slices to be real pointers instead of pointers relative to the
+  // indirect data buffer.
+  if (projection_->column(idx).type_info()->physical_type() == BINARY &&
+      !ContainsKey(rewritten_varlen_columns_, idx)) {
+
+    Slice indirect_data_slice;
+    RETURN_NOT_OK(controller_.GetInboundSidecar(
+        resp_data_.columns(idx).indirect_data_sidecar(),
+        &indirect_data_slice));
+
+    // View the sidecar as an array of Slice objects; any trailing bytes that
+    // do not make up a whole Slice are ignored by the integer division.
+    // NOTE(review): the public header says no alignment is guaranteed for
+    // column data, yet the buffer is accessed as Slice objects here; confirm
+    // the sidecar is suitably aligned.
+    ArrayView<Slice> v(reinterpret_cast<Slice*>(data->mutable_data()),
+                       data->size() / sizeof(Slice));
+    for (int row_idx = 0; row_idx < v.size(); row_idx++) {
+      Slice* slice = &v[row_idx];
+      // Before the rewrite, the slice's pointer field holds an offset.
+      size_t offset_in_indirect = reinterpret_cast<uintptr_t>(slice->data());
+      // Ensure the updated pointer is within the bounds of the indirect data.
+      bool overflowed = false;
+      size_t max_offset = AddWithOverflowCheck(offset_in_indirect, slice->size(), &overflowed);
+      if (PREDICT_FALSE(overflowed || max_offset > indirect_data_slice.size())) {
+        // Slices of earlier rows have already been rewritten at this point,
+        // but the column is not marked as rewritten, so a repeated call for
+        // this column would process them again.
+        const auto& col = projection_->column(idx);
+        return Status::Corruption(
+            Substitute("Row #$0 contained bad indirect slice for column $1: ($2, $3)",
+                       row_idx, col.ToString(), offset_in_indirect, slice->size()));
+      }
+      *slice = Slice(&indirect_data_slice[offset_in_indirect], slice->size());
+    }
+    // Only mark the column once every slice has been rewritten.
+    rewritten_varlen_columns_.insert(idx);
+  }
+  return Status::OK();
+}
+
+// Points 'data' at the non-null bitmap sidecar of column 'idx'.
+//
+// Returns the error from CheckColumnIndex() for a bad index, and NotFound
+// when the column has no non-null bitmap sidecar.
+Status KuduColumnarScanBatch::Data::GetNonNullBitmapForColumn(int idx, Slice* data) const {
+  RETURN_NOT_OK(CheckColumnIndex(idx));
+  const auto& column_pb = resp_data_.columns(idx);
+  if (column_pb.has_non_null_bitmap_sidecar()) {
+    return controller_.GetInboundSidecar(column_pb.non_null_bitmap_sidecar(), data);
+  }
+  return Status::NotFound("column is not nullable");
+}
+
+
+
 } // namespace client
 } // namespace kudu
diff --git a/src/kudu/client/scanner-internal.h b/src/kudu/client/scanner-internal.h
index f0d07a0..01da884 100644
--- a/src/kudu/client/scanner-internal.h
+++ b/src/kudu/client/scanner-internal.h
@@ -23,11 +23,13 @@
 #include <ostream>
 #include <set>
 #include <string>
+#include <unordered_set>
 #include <vector>
 
 #include <glog/logging.h>
 
 #include "kudu/client/client.h"
+#include "kudu/client/columnar_scan_batch.h"
 #include "kudu/client/resource_metrics.h"
 #include "kudu/client/row_result.h"
 #include "kudu/client/scan_batch.h"
@@ -294,7 +296,20 @@ class KuduScanner::Data {
   DISALLOW_COPY_AND_ASSIGN(Data);
 };
 
-class KuduScanBatch::Data {
+namespace internal {
+// Common interface implemented by KuduScanBatch::Data and
+// KuduColumnarScanBatch::Data, so that KuduScanner::NextBatch() can fill
+// either kind of batch through a single code path.
+class ScanBatchDataInterface {
+ public:
+  virtual ~ScanBatchDataInterface() = default;
+
+  // Replaces the batch's contents with the results of a scan RPC.
+  // 'controller' is the controller of that RPC, 'projection' and
+  // 'client_projection' describe the scanned columns, 'row_format_flags' are
+  // the scan's row format flags, and 'response' is the RPC response from
+  // which the implementation takes the row data.
+  virtual Status Reset(rpc::RpcController* controller,
+                       const Schema* projection,
+                       const KuduSchema* client_projection,
+                       uint64_t row_format_flags,
+                       tserver::ScanResponsePB* response) = 0;
+
+  // Discards the batch's current contents.
+  virtual void Clear() = 0;
+};
+} // namespace internal
+
+class KuduScanBatch::Data : public internal::ScanBatchDataInterface {
  public:
   Data();
   ~Data();
@@ -303,7 +318,7 @@ class KuduScanBatch::Data {
                const Schema* projection,
                const KuduSchema* client_projection,
                uint64_t row_format_flags,
-               std::unique_ptr<RowwiseRowBlockPB> resp_data);
+               tserver::ScanResponsePB* response) override;
 
   int num_rows() const {
     return resp_data_.num_rows();
@@ -324,7 +339,7 @@ class KuduScanBatch::Data {
 
   void ExtractRows(std::vector<KuduScanBatch::RowPtr>* rows);
 
-  void Clear();
+  void Clear() override;
 
   // Returns the size of a row for the given projection 'proj'.
   static size_t CalculateProjectedRowSize(const Schema& proj);
@@ -354,6 +369,44 @@ class KuduScanBatch::Data {
   size_t projected_row_size_;
 };
 
+// Private implementation of KuduColumnarScanBatch. Holds the RPC controller
+// and response block of the most recent columnar scan response and serves
+// per-column data from the RPC sidecars.
+class KuduColumnarScanBatch::Data : public internal::ScanBatchDataInterface {
+ public:
+  // Adopts the results of a finished columnar scan RPC. Returns
+  // InvalidArgument if 'row_format_flags' lacks COLUMNAR_LAYOUT.
+  Status Reset(rpc::RpcController* controller,
+               const Schema* projection,
+               const KuduSchema* client_projection,
+               uint64_t row_format_flags,
+               tserver::ScanResponsePB* response) override;
+
+  // Drops the held response block and resets the controller.
+  void Clear() override;
+
+  // Points 'data' at the cell data of column 'idx', rewriting the slices of
+  // binary columns into real pointers on first access.
+  Status GetDataForColumn(int idx, Slice* data) const;
+
+  // Points 'data' at the non-null bitmap of column 'idx'; returns NotFound
+  // if the column has no such bitmap.
+  Status GetNonNullBitmapForColumn(int idx, Slice* data) const;
+
+ private:
+  // Returns InvalidArgument unless 'idx' refers to a column present in
+  // 'resp_data_'.
+  Status CheckColumnIndex(int idx) const;
+
+  friend class KuduColumnarScanBatch;
+
+  // The RPC controller for the RPC which returned this batch.
+  // Holding on to the controller ensures we hold on to the
+  // sidecars which contain the actual data.
+  rpc::RpcController controller_;
+
+  // The PB which contains the "direct data" slice.
+  ColumnarRowBlockPB resp_data_;
+
+  // Tracks for each variable-length (binary) column whether the pointers have been
+  // rewritten yet to be "real" pointers instead of sidecar-relative offsets.
+  // Mutable since the 'GetDataForColumn' call is semantically const, but in fact
+  // needs to modify this member to do the lazy pointer rewrites.
+  mutable std::unordered_set<int> rewritten_varlen_columns_;
+
+  // The projection being scanned. Explicitly null until Reset() assigns it,
+  // so the value never depends on how the object was constructed.
+  const Schema* projection_ = nullptr;
+  // The KuduSchema version of 'projection_'. Explicitly null until Reset()
+  // assigns it.
+  const KuduSchema* client_projection_ = nullptr;
+};
+
+
 } // namespace client
 } // namespace kudu
 
diff --git a/src/kudu/tools/tool_action_remote_replica.cc b/src/kudu/tools/tool_action_remote_replica.cc
index 4a30000..44c2de6 100644
--- a/src/kudu/tools/tool_action_remote_replica.cc
+++ b/src/kudu/tools/tool_action_remote_replica.cc
@@ -155,7 +155,7 @@ class ReplicaDumper {
                                   &schema,
                                   &client_schema,
                                   client::KuduScanner::NO_FLAGS,
-                                  unique_ptr<RowwiseRowBlockPB>(resp.release_data())));
+                                  &resp));
       vector<KuduRowResult> rows;
       results.ExtractRows(&rows);
       for (const auto& r : rows) {