You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2020/03/31 20:24:15 UTC

[kudu] branch master updated: tserver: add support for returning scan result in columnar layout

This is an automated email from the ASF dual-hosted git repository.

todd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 00e7ca6  tserver: add support for returning scan result in columnar layout
00e7ca6 is described below

commit 00e7ca6437876e9ac95abfa96eaa7b86a785aa5c
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Thu Mar 26 22:28:06 2020 -0700

    tserver: add support for returning scan result in columnar layout
    
    This adds the server-side support for serializing scan results in
    columnar format. So far it's tested via direct RPCs. A future commit
    will add client support.
    
    Change-Id: Ib99faa27554ec228492f096bb8760c883e44660d
    Reviewed-on: http://gerrit.cloudera.org:8080/15602
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Todd Lipcon <to...@apache.org>
---
 src/kudu/common/wire_protocol.proto    |  19 +++
 src/kudu/tserver/CMakeLists.txt        |   2 +-
 src/kudu/tserver/tablet_server-test.cc |  71 +++++++++
 src/kudu/tserver/tablet_service.cc     | 270 ++++++++++++++++++++++++---------
 src/kudu/tserver/tserver.proto         |   8 +
 5 files changed, 298 insertions(+), 72 deletions(-)

diff --git a/src/kudu/common/wire_protocol.proto b/src/kudu/common/wire_protocol.proto
index 719643e..5ddba152 100644
--- a/src/kudu/common/wire_protocol.proto
+++ b/src/kudu/common/wire_protocol.proto
@@ -151,6 +151,25 @@ message RowwiseRowBlockPB {
   optional int32 indirect_data_sidecar = 3;
 }
 
+// A block of rows in columnar layout; each Column's cells live in their own sidecars.
+message ColumnarRowBlockPB {
+  message Column {
+    // The index of the sidecar containing cell data of the column.
+    optional int32 data_sidecar = 1;
+
+    // The index of the sidecar containing any data referred to by binary/string cells.
+    // For such columns, the 'pointer' field of each Slice in the data sidecar holds
+    // an offset into this sidecar rather than a memory address.
+    optional int32 indirect_data_sidecar = 2;
+
+    // If the column is nullable, the index of the sidecar containing a bitmap with a set
+    // bit for all non-null cells.
+    optional int32 non_null_bitmap_sidecar = 3;
+  }
+  repeated Column columns = 1;
+  optional int64 num_rows = 2;
+}
+
 // A set of operations (INSERT, UPDATE, UPSERT, or DELETE) to apply to a table,
 // or the set of split rows and range bounds when creating or altering table.
 // Range bounds determine the boundaries of range partitions during table
diff --git a/src/kudu/tserver/CMakeLists.txt b/src/kudu/tserver/CMakeLists.txt
index ae74010..c714547 100644
--- a/src/kudu/tserver/CMakeLists.txt
+++ b/src/kudu/tserver/CMakeLists.txt
@@ -183,7 +183,7 @@ ADD_KUDU_TEST(mini_tablet_server-test)
 ADD_KUDU_TEST(tablet_copy_client-test)
 ADD_KUDU_TEST(tablet_copy_source_session-test)
 ADD_KUDU_TEST(tablet_copy_service-test)
-ADD_KUDU_TEST(tablet_server-test PROCESSORS 3)
+ADD_KUDU_TEST(tablet_server-test PROCESSORS 3 NUM_SHARDS 4)
 ADD_KUDU_TEST(tablet_server-stress-test RUN_SERIAL true)
 ADD_KUDU_TEST(tablet_server_authorization-test NUM_SHARDS 2)
 ADD_KUDU_TEST(scanners-test)
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index 4c49e3f..c336af8 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -98,6 +98,7 @@
 #include "kudu/tserver/tserver_admin.proxy.h"
 #include "kudu/tserver/tserver_service.pb.h"
 #include "kudu/tserver/tserver_service.proxy.h"
+#include "kudu/util/array_view.h"
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/crc.h"
 #include "kudu/util/curl_util.h"
@@ -2538,6 +2539,76 @@ TEST_F(TabletServerTest, TestScanWithStringPredicates) {
   ASSERT_EQ(R"((int32 key=59, int32 int_val=118, string string_val="hello 59"))", results[9]);
 }
 
+TEST_F(TabletServerTest, TestColumnarScan) {
+  const int kNumRows = 100;
+  InsertTestRowsDirect(0, kNumRows);
+
+  ScanRequestPB req;
+  ScanResponsePB resp;
+  RpcController rpc;
+
+  NewScanRequestPB* scan = req.mutable_new_scan_request();
+  scan->set_tablet_id(kTabletId);
+  ASSERT_OK(SchemaToColumnPBs(schema_, scan->mutable_projected_columns()));
+
+  scan->set_row_format_flags(RowFormatFlags::COLUMNAR_LAYOUT);
+  rpc.RequireServerFeature(TabletServerFeatures::COLUMNAR_LAYOUT_FEATURE);
+  // Send the call
+  SCOPED_TRACE(SecureDebugString(req));
+  ASSERT_OK(proxy_->Scan(req, &resp, &rpc));
+
+  // Verify the response: columnar results are returned instead of row-wise 'data'.
+  SCOPED_TRACE(SecureDebugString(resp));
+  ASSERT_FALSE(resp.has_error());
+  ASSERT_FALSE(resp.has_data());
+  ASSERT_EQ(3, resp.columnar_data().columns_size());
+  ASSERT_EQ(kNumRows, resp.columnar_data().num_rows());
+
+  // Verify column 0 (int32 key)
+  {
+    Slice col_data;
+    ASSERT_OK(rpc.GetInboundSidecar(resp.columnar_data().columns(0).data_sidecar(), &col_data));
+    SCOPED_TRACE(col_data.ToDebugString());
+    ASSERT_EQ(kNumRows * sizeof(int32_t), col_data.size());
+    ArrayView<const int32_t> cells(reinterpret_cast<const int32_t*>(col_data.data()), kNumRows);
+    for (int i = 0; i < kNumRows; i++) {
+      EXPECT_EQ(i, cells[i]);
+    }
+  }
+
+  // Verify column 1 (int32 val)
+  {
+    Slice col_data;
+    ASSERT_OK(rpc.GetInboundSidecar(resp.columnar_data().columns(1).data_sidecar(), &col_data));
+    SCOPED_TRACE(col_data.ToDebugString());
+    ASSERT_EQ(kNumRows * sizeof(int32_t), col_data.size());
+    ArrayView<const int32_t> cells(reinterpret_cast<const int32_t*>(col_data.data()), kNumRows);
+    for (int i = 0; i < kNumRows; i++) {
+      EXPECT_EQ(i * 2, cells[i]);
+    }
+  }
+  // Verify column 2 (string)
+  {
+    Slice col_data;
+    ASSERT_OK(rpc.GetInboundSidecar(resp.columnar_data().columns(2).data_sidecar(), &col_data));
+    Slice indirect_data;
+    ASSERT_OK(rpc.GetInboundSidecar(resp.columnar_data().columns(2).indirect_data_sidecar(),
+                                    &indirect_data));
+    SCOPED_TRACE(col_data.ToDebugString());
+    ASSERT_EQ(kNumRows * sizeof(Slice), col_data.size());
+    ArrayView<const Slice> cells(reinterpret_cast<const Slice*>(col_data.data()), kNumRows);
+    for (int i = 0; i < kNumRows; i++) {
+      // On the wire, each Slice's data pointer is an offset into the indirect sidecar.
+      const Slice& s = cells[i];
+      const uintptr_t offset = reinterpret_cast<uintptr_t>(s.data());
+      // Bounds-check first so a malformed response fails cleanly instead of reading OOB.
+      ASSERT_LE(offset + s.size(), indirect_data.size());
+      ASSERT_EQ(Substitute("hello $0", i), Slice(indirect_data.data() + offset, s.size()));
+    }
+  }
+}
+
+
 TEST_F(TabletServerTest, TestNonPositiveLimitsShortCircuit) {
   InsertTestRowsDirect(0, 10);
   for (int limit : { -1, 0 }) {
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index a51450f..6f0b6eb 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -35,6 +35,7 @@
 
 #include "kudu/clock/clock.h"
 #include "kudu/common/column_predicate.h"
+#include "kudu/common/columnar_serialization.h"
 #include "kudu/common/columnblock.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/common/encoded_key.h"
@@ -693,20 +694,19 @@ class ScanResultCollector {
   // Returns number of bytes which will be returned in the response.
   virtual int64_t ResponseSize() const = 0;
 
-  // Returns the last processed row's primary key.
-  virtual const faststring& last_primary_key() const = 0;
-
   // Return the number of rows actually returned to the client.
   virtual int64_t NumRowsReturned() const = 0;
 
-  // Sets row format flags on the ScanResultCollector.
+  // Initialize the serializer with the given row format flags.
   //
-  // This is a setter instead of a constructor argument passed to specific
-  // collector implementations because, currently, the collector is built
-  // before the request is decoded and checked for 'row_format_flags'.
+  // This is a separate function instead of a constructor argument passed to specific
+  // collector implementations because, currently, the collector is built before the
+  // request is decoded and checked for 'row_format_flags'.
   //
   // Does nothing by default.
-  virtual void set_row_format_flags(uint64_t /* row_format_flags */) {}
+  virtual Status InitSerializer(uint64_t /* row_format_flags */) {
+    return Status::OK();
+  }
 
   CpuTimes* cpu_times() {
     return &cpu_times_;
@@ -735,7 +735,146 @@ void SetLastRow(const RowBlock& row_block, faststring* last_primary_key) {
   }
 }
 
-}  // namespace
+// Interface for accumulating scanned rows and serializing them into a ScanResponsePB.
+class ResultSerializer {
+ public:
+  virtual ~ResultSerializer() = default;
+
+  // Append the selected rows of 'row_block' to the pending response; returns the count.
+  virtual int SerializeRowBlock(const RowBlock& row_block,
+                                const Schema* client_projection_schema) = 0;
+
+  // Return the approximate size (in bytes) of the pending response. Once this
+  // result is greater than the client's requested batch size, the pending rows
+  // will be returned to the client.
+  virtual size_t ResponseSize() const = 0;
+
+  // Move the pending rows into 'resp', attaching the buffers to 'context' as sidecars.
+  // Must be called at most once; SerializeRowBlock() must not be called afterwards.
+  virtual void SetupResponse(rpc::RpcContext* context, ScanResponsePB* resp) = 0;
+};
+
+// Serializes scan results in the row-wise layout (a RowwiseRowBlockPB plus sidecars).
+class RowwiseResultSerializer : public ResultSerializer {
+ public:
+  // Reserves ~110% of 'batch_size_bytes' per buffer, computed in size_t to avoid int overflow.
+  RowwiseResultSerializer(int batch_size_bytes, uint64_t flags)
+      : rows_data_(static_cast<size_t>(batch_size_bytes) * 11 / 10),
+        indirect_data_(static_cast<size_t>(batch_size_bytes) * 11 / 10),
+        pad_unixtime_micros_to_16_bytes_(flags & RowFormatFlags::PAD_UNIX_TIME_MICROS_TO_16_BYTES) {
+    // TODO(todd): use a chain of faststrings to avoid one large (slow, wasteful) allocation.
+  }
+
+  int SerializeRowBlock(const RowBlock& row_block,
+                        const Schema* client_projection_schema) override {
+    CHECK(!done_);  // The buffers are moved out by SetupResponse().
+    int num_selected = kudu::SerializeRowBlock(
+        row_block, client_projection_schema,
+        &rows_data_, &indirect_data_, pad_unixtime_micros_to_16_bytes_);
+    rowblock_pb_.set_num_rows(rowblock_pb_.num_rows() + num_selected);
+    return num_selected;
+  }
+
+  size_t ResponseSize() const override {
+    return rows_data_.size() + indirect_data_.size();
+  }
+
+  void SetupResponse(rpc::RpcContext* context, ScanResponsePB* resp) override {
+    CHECK(!done_);
+    done_ = true;
+    *resp->mutable_data() = std::move(rowblock_pb_);
+    // Add sidecar data to context and record the returned indices.
+    int rows_idx;
+    CHECK_OK(context->AddOutboundSidecar(
+        RpcSidecar::FromFaststring(std::move(rows_data_)), &rows_idx));
+    resp->mutable_data()->set_rows_sidecar(rows_idx);
+
+    // Add indirect data as a sidecar, if applicable.
+    if (indirect_data_.size() > 0) {
+      int indirect_idx;
+      CHECK_OK(context->AddOutboundSidecar(
+          RpcSidecar::FromFaststring(std::move(indirect_data_)), &indirect_idx));
+      resp->mutable_data()->set_indirect_data_sidecar(indirect_idx);
+    }
+  }
+
+ private:
+  RowwiseRowBlockPB rowblock_pb_;
+  faststring rows_data_;
+  faststring indirect_data_;
+  bool pad_unixtime_micros_to_16_bytes_;
+  bool done_ = false;  // Set once SetupResponse() has moved the buffers out.
+};
+
+// Serializes scan results in the columnar layout (a ColumnarRowBlockPB with per-column sidecars).
+class ColumnarResultSerializer : public ResultSerializer {
+ public:
+  // Returns InvalidArgument if 'flags' sets any bit other than COLUMNAR_LAYOUT.
+  static Status Create(uint64_t flags, unique_ptr<ResultSerializer>* serializer) {
+    if (flags & ~RowFormatFlags::COLUMNAR_LAYOUT) {
+      return Status::InvalidArgument("Row format flags not supported with columnar layout");
+    }
+    serializer->reset(new ColumnarResultSerializer());
+    return Status::OK();
+  }
+
+  int SerializeRowBlock(const RowBlock& row_block,
+                        const Schema* client_projection_schema) override {
+    CHECK(!done_);
+    int n_sel = SerializeRowBlockColumnar(row_block, client_projection_schema, &results_);
+    num_rows_ += n_sel;
+    return n_sel;
+  }
+
+  size_t ResponseSize() const override {
+    CHECK(!done_);
+    // Accumulate in size_t (the return type) so large batches cannot overflow an int.
+    size_t total = 0;
+    for (const auto& col : results_.columns) {
+      total += col.data.size();
+      if (col.indirect_data) {
+        total += col.indirect_data->size();
+      }
+      if (col.non_null_bitmap) {
+        total += col.non_null_bitmap->size();
+      }
+    }
+    return total;
+  }
+
+  void SetupResponse(rpc::RpcContext* context, ScanResponsePB* resp) override {
+    CHECK(!done_);
+    done_ = true;
+    // Moves '*buf' into a new outbound sidecar on 'context' and returns the sidecar's index.
+    auto add_sidecar = [context](faststring* buf) {
+      int idx;
+      CHECK_OK(context->AddOutboundSidecar(RpcSidecar::FromFaststring(std::move(*buf)), &idx));
+      return idx;
+    };
+    ColumnarRowBlockPB* data = resp->mutable_columnar_data();
+    for (auto& col : results_.columns) {
+      auto* col_pb = data->add_columns();
+      col_pb->set_data_sidecar(add_sidecar(&col.data));
+      if (col.indirect_data) {
+        col_pb->set_indirect_data_sidecar(add_sidecar(&*col.indirect_data));
+      }
+      if (col.non_null_bitmap) {
+        col_pb->set_non_null_bitmap_sidecar(add_sidecar(&*col.non_null_bitmap));
+      }
+    }
+    data->set_num_rows(num_rows_);
+  }
+
+ private:
+  // Use Create(), which validates the row format flags.
+  ColumnarResultSerializer() {}
+
+  int64_t num_rows_ = 0;  // Total rows serialized so far.
+  ColumnarSerializedBatch results_;
+  bool done_ = false;  // Set once SetupResponse() has moved the buffers out.
+};
+
+} // anonymous namespace
 
 // Copies the scan result to the given row block PB and data buffers.
 //
@@ -747,22 +886,16 @@ void SetLastRow(const RowBlock& row_block, faststring* last_primary_key) {
 // server-side scan and thus never need to return the actual data.)
 class ScanResultCopier : public ScanResultCollector {
  public:
-  ScanResultCopier(RowwiseRowBlockPB* rowblock_pb,
-                   faststring* rows_data,
-                   faststring* indirect_data)
-      : rowblock_pb_(DCHECK_NOTNULL(rowblock_pb)),
-        rows_data_(DCHECK_NOTNULL(rows_data)),
-        indirect_data_(DCHECK_NOTNULL(indirect_data)),
-        num_rows_returned_(0),
-        pad_unixtime_micros_to_16_bytes_(false) {}
+  explicit ScanResultCopier(int batch_size_bytes)
+      : batch_size_bytes_(batch_size_bytes),
+        num_rows_returned_(0) {
+  }
 
   void HandleRowBlock(Scanner* scanner, const RowBlock& row_block) override {
-    int num_selected = SerializeRowBlock(
-        row_block, scanner->client_projection_schema(),
-        rows_data_, indirect_data_, pad_unixtime_micros_to_16_bytes_);
+    int num_selected = serializer_->SerializeRowBlock(
+        row_block, scanner->client_projection_schema());
 
     if (num_selected > 0) {
-      rowblock_pb_->set_num_rows(rowblock_pb_->num_rows() + num_selected);
       num_rows_returned_ += num_selected;
       scanner->add_num_rows_returned(num_selected);
       SetLastRow(row_block, &last_primary_key_);
@@ -771,30 +904,45 @@ class ScanResultCopier : public ScanResultCollector {
 
   // Returns number of bytes buffered to return.
   int64_t ResponseSize() const override {
-    return rows_data_->size() + indirect_data_->size();
-  }
-
-  const faststring& last_primary_key() const override {
-    return last_primary_key_;
+    return serializer_->ResponseSize();
   }
 
   int64_t NumRowsReturned() const override {
     return num_rows_returned_;
   }
 
-  void set_row_format_flags(uint64_t row_format_flags) override {
-    if (row_format_flags & RowFormatFlags::PAD_UNIX_TIME_MICROS_TO_16_BYTES) {
-      pad_unixtime_micros_to_16_bytes_ = true;
+  Status InitSerializer(uint64_t row_format_flags) override {
+    if (serializer_) {
+      // TODO(todd) for the NewScanner case, this gets called twice
+      // which is a bit ugly. Refactor to avoid!
+      return Status::OK();
+    }
+    if (row_format_flags & COLUMNAR_LAYOUT) {
+      return ColumnarResultSerializer::Create(row_format_flags, &serializer_);
+    }
+    serializer_.reset(new RowwiseResultSerializer(batch_size_bytes_, row_format_flags));
+    return Status::OK();
+  }
+
+  void SetupResponse(rpc::RpcContext* context, ScanResponsePB* resp) {
+    if (serializer_) {
+      serializer_->SetupResponse(context, resp);
+    }
+
+    // Set the last row found by the collector.
+    //
+    // We could have an empty batch if all the remaining rows are filtered by the
+    // predicate, in which case do not set the last row.
+    if (last_primary_key_.length() > 0) {
+      resp->set_last_primary_key(last_primary_key_.ToString());
     }
   }
 
  private:
-  RowwiseRowBlockPB* const rowblock_pb_;
-  faststring* const rows_data_;
-  faststring* const indirect_data_;
+  int batch_size_bytes_;
   int64_t num_rows_returned_;
   faststring last_primary_key_;
-  bool pad_unixtime_micros_to_16_bytes_;
+  unique_ptr<ResultSerializer> serializer_;
 
   DISALLOW_COPY_AND_ASSIGN(ScanResultCopier);
 };
@@ -830,8 +978,6 @@ class ScanResultChecksummer : public ScanResultCollector {
   // Returns a constant -- we only return checksum based on a time budget.
   virtual int64_t ResponseSize() const OVERRIDE { return sizeof(agg_checksum_); }
 
-  virtual const faststring& last_primary_key() const OVERRIDE { return encoded_last_row_; }
-
   virtual int64_t NumRowsReturned() const OVERRIDE {
     return 0;
   }
@@ -1731,13 +1877,7 @@ void TabletServiceImpl::Scan(const ScanRequestPB* req,
   }
 
   size_t batch_size_bytes = GetMaxBatchSizeBytesHint(req);
-  // TODO(todd): use a chain of faststrings instead of a single one to avoid
-  // allocating this large buffer. Large buffer allocations are slow and
-  // potentially wasteful.
-  faststring rows_data(batch_size_bytes * 11 / 10);
-  faststring indirect_data(batch_size_bytes * 11 / 10);
-  RowwiseRowBlockPB data;
-  ScanResultCopier collector(&data, &rows_data, &indirect_data);
+  ScanResultCopier collector(batch_size_bytes);
 
   bool has_more_results = false;
   TabletServerErrorPB::Code error_code = TabletServerErrorPB::UNKNOWN_ERROR;
@@ -1779,32 +1919,9 @@ void TabletServiceImpl::Scan(const ScanRequestPB* req,
                               "Must pass either a scanner_id or new_scan_request"));
     return;
   }
-  resp->set_has_more_results(has_more_results);
 
-  resp->mutable_data()->CopyFrom(data);
-
-  // Add sidecar data to context and record the returned indices.
-  int rows_idx;
-  CHECK_OK(context->AddOutboundSidecar(
-      RpcSidecar::FromFaststring((std::move(rows_data))), &rows_idx));
-  resp->mutable_data()->set_rows_sidecar(rows_idx);
-
-  // Add indirect data as a sidecar, if applicable.
-  if (indirect_data.size() > 0) {
-    int indirect_idx;
-    CHECK_OK(context->AddOutboundSidecar(
-        RpcSidecar::FromFaststring(std::move(indirect_data)), &indirect_idx));
-    resp->mutable_data()->set_indirect_data_sidecar(indirect_idx);
-  }
-
-  // Set the last row found by the collector.
-  //
-  // We could have an empty batch if all the remaining rows are filtered by the
-  // predicate, in which case do not set the last row.
-  const faststring& last = collector.last_primary_key();
-  if (last.length() > 0) {
-    resp->set_last_primary_key(last.ToString());
-  }
+  collector.SetupResponse(context, resp);
+  resp->set_has_more_results(has_more_results);
   resp->set_propagated_timestamp(server_->clock()->Now().ToUint64());
 
   SetResourceMetrics(context, collector.cpu_times(), resp->mutable_resource_metrics());
@@ -2110,6 +2227,7 @@ bool TabletServiceImpl::SupportsFeature(uint32_t feature) const {
     case TabletServerFeatures::PAD_UNIXTIME_MICROS_TO_16_BYTES:
     case TabletServerFeatures::QUIESCING:
     case TabletServerFeatures::BLOOM_FILTER_PREDICATE:
+    case TabletServerFeatures::COLUMNAR_LAYOUT_FEATURE:
       return true;
     default:
       return false;
@@ -2319,6 +2437,12 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
   TRACE_EVENT1("tserver", "TabletServiceImpl::HandleNewScanRequest",
                "tablet_id", scan_pb.tablet_id());
 
+  Status s = result_collector->InitSerializer(scan_pb.row_format_flags());
+  if (!s.ok()) {
+    *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
+    return s;
+  }
+
   const Schema& tablet_schema = replica->tablet_metadata()->schema();
 
   SharedScanner scanner;
@@ -2336,7 +2460,7 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
   // Create the user's requested projection.
   // TODO(todd): Add test cases for bad projections including 0 columns.
   Schema projection;
-  Status s = ColumnPBsToSchema(scan_pb.projected_columns(), &projection);
+  s = ColumnPBsToSchema(scan_pb.projected_columns(), &projection);
   if (PREDICT_FALSE(!s.ok())) {
     *error_code = TabletServerErrorPB::INVALID_SCHEMA;
     return s;
@@ -2614,9 +2738,6 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
                                       "--scanner_inject_service_unavailable_on_continue_scan");
   }
 
-  // Set the row format flags on the ScanResultCollector.
-  result_collector->set_row_format_flags(scanner->row_format_flags());
-
   // If we early-exit out of this function, automatically unregister the scanner.
   ScopedUnregisterScanner unreg_scanner(server_->scanner_manager(), scanner->id());
   ScopedAddScannerTiming scanner_timer(scanner.get(), result_collector->cpu_times());
@@ -2625,6 +2746,13 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
           << SecureShortDebugString(*req);
   TRACE("Found scanner $0 for tablet $1", scanner->id(), scanner->tablet_id());
 
+  // Set the row format flags on the ScanResultCollector.
+  s = result_collector->InitSerializer(scanner->row_format_flags());
+  if (!s.ok()) {
+    *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
+    return s;
+  }
+
   if (batch_size_bytes == 0 && req->close_scanner()) {
     *has_more_results = false;
     return Status::OK();
diff --git a/src/kudu/tserver/tserver.proto b/src/kudu/tserver/tserver.proto
index f2ab6d4..9f6de3d 100644
--- a/src/kudu/tserver/tserver.proto
+++ b/src/kudu/tserver/tserver.proto
@@ -224,6 +224,10 @@ message ColumnRangePredicateListPB {
 enum RowFormatFlags {
   NO_FLAGS = 0;
   PAD_UNIX_TIME_MICROS_TO_16_BYTES = 1;
+
+  // Return a ColumnarRowBlockPB (ScanResponsePB.columnar_data) instead of RowwiseRowBlockPB.
+  // Must not be combined with any other flag, including PAD_UNIX_TIME_MICROS_TO_16_BYTES.
+  COLUMNAR_LAYOUT = 2;
 }
 
 message NewScanRequestPB {
@@ -405,6 +409,8 @@ message ScanResponsePB {
   // The schema will match the schema requested by the client when it created
   // the scanner.
   optional RowwiseRowBlockPB data = 4;
+  // Set instead of 'data' if COLUMNAR_LAYOUT is passed.
+  optional ColumnarRowBlockPB columnar_data = 5;
 
   // The snapshot timestamp at which the scan was executed. This is only set
   // in the first response (i.e. the response to the request that had
@@ -472,4 +478,6 @@ enum TabletServerFeatures {
   PAD_UNIXTIME_MICROS_TO_16_BYTES = 2;
   QUIESCING = 3;
   BLOOM_FILTER_PREDICATE = 4;
+  // Whether the server supports the COLUMNAR_LAYOUT format flag.
+  COLUMNAR_LAYOUT_FEATURE = 5;
 }