You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2020/03/31 20:24:15 UTC
[kudu] branch master updated: tserver: add support for returning
scan result in columnar layout
This is an automated email from the ASF dual-hosted git repository.
todd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 00e7ca6 tserver: add support for returning scan result in columnar layout
00e7ca6 is described below
commit 00e7ca6437876e9ac95abfa96eaa7b86a785aa5c
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Thu Mar 26 22:28:06 2020 -0700
tserver: add support for returning scan result in columnar layout
This adds the server-side support for serializing scan results in
columnar format. So far it's tested via direct RPCs. A future commit
will add client support.
Change-Id: Ib99faa27554ec228492f096bb8760c883e44660d
Reviewed-on: http://gerrit.cloudera.org:8080/15602
Reviewed-by: Andrew Wong <aw...@cloudera.com>
Tested-by: Todd Lipcon <to...@apache.org>
---
src/kudu/common/wire_protocol.proto | 19 +++
src/kudu/tserver/CMakeLists.txt | 2 +-
src/kudu/tserver/tablet_server-test.cc | 71 +++++++++
src/kudu/tserver/tablet_service.cc | 270 ++++++++++++++++++++++++---------
src/kudu/tserver/tserver.proto | 8 +
5 files changed, 298 insertions(+), 72 deletions(-)
diff --git a/src/kudu/common/wire_protocol.proto b/src/kudu/common/wire_protocol.proto
index 719643e..5ddba152 100644
--- a/src/kudu/common/wire_protocol.proto
+++ b/src/kudu/common/wire_protocol.proto
@@ -151,6 +151,25 @@ message RowwiseRowBlockPB {
optional int32 indirect_data_sidecar = 3;
}
+// A block of rows in columnar layout.
+message ColumnarRowBlockPB {
+ message Column {
+ // The index of the sidecar containing cell data of the column.
+ optional int32 data_sidecar = 1;
+
+ // The index of the sidecar containing any data referred to by binary/string data.
+ // In this case, the 'pointer' field of the Slices contained in the data sidecar
+ // will refer to offsets in this sidecar.
+ optional int32 indirect_data_sidecar = 2;
+
+ // If the column is nullable, the index of the sidecar containing a bitmap with a set
+ // bit for all non-null cells.
+ optional int32 non_null_bitmap_sidecar = 3;
+ }
+ repeated Column columns = 1;
+ optional int64 num_rows = 2;
+}
+
// A set of operations (INSERT, UPDATE, UPSERT, or DELETE) to apply to a table,
// or the set of split rows and range bounds when creating or altering table.
// Range bounds determine the boundaries of range partitions during table
diff --git a/src/kudu/tserver/CMakeLists.txt b/src/kudu/tserver/CMakeLists.txt
index ae74010..c714547 100644
--- a/src/kudu/tserver/CMakeLists.txt
+++ b/src/kudu/tserver/CMakeLists.txt
@@ -183,7 +183,7 @@ ADD_KUDU_TEST(mini_tablet_server-test)
ADD_KUDU_TEST(tablet_copy_client-test)
ADD_KUDU_TEST(tablet_copy_source_session-test)
ADD_KUDU_TEST(tablet_copy_service-test)
-ADD_KUDU_TEST(tablet_server-test PROCESSORS 3)
+ADD_KUDU_TEST(tablet_server-test PROCESSORS 3 NUM_SHARDS 4)
ADD_KUDU_TEST(tablet_server-stress-test RUN_SERIAL true)
ADD_KUDU_TEST(tablet_server_authorization-test NUM_SHARDS 2)
ADD_KUDU_TEST(scanners-test)
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index 4c49e3f..c336af8 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -98,6 +98,7 @@
#include "kudu/tserver/tserver_admin.proxy.h"
#include "kudu/tserver/tserver_service.pb.h"
#include "kudu/tserver/tserver_service.proxy.h"
+#include "kudu/util/array_view.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/crc.h"
#include "kudu/util/curl_util.h"
@@ -2538,6 +2539,76 @@ TEST_F(TabletServerTest, TestScanWithStringPredicates) {
ASSERT_EQ(R"((int32 key=59, int32 int_val=118, string string_val="hello 59"))", results[9]);
}
+TEST_F(TabletServerTest, TestColumnarScan) {
+ const int kNumRows = 100;
+ InsertTestRowsDirect(0, kNumRows);
+
+ ScanRequestPB req;
+ ScanResponsePB resp;
+ RpcController rpc;
+
+ NewScanRequestPB* scan = req.mutable_new_scan_request();
+ scan->set_tablet_id(kTabletId);
+ ASSERT_OK(SchemaToColumnPBs(schema_, scan->mutable_projected_columns()));
+
+ scan->set_row_format_flags(RowFormatFlags::COLUMNAR_LAYOUT);
+ rpc.RequireServerFeature(TabletServerFeatures::COLUMNAR_LAYOUT_FEATURE);
+ // Send the call
+ SCOPED_TRACE(SecureDebugString(req));
+ ASSERT_OK(proxy_->Scan(req, &resp, &rpc));
+
+ // Verify the response
+ SCOPED_TRACE(SecureDebugString(resp));
+ ASSERT_FALSE(resp.has_error());
+ ASSERT_EQ(3, resp.columnar_data().columns_size());
+
+ ASSERT_EQ(kNumRows, resp.columnar_data().num_rows());
+
+ // Verify column 0 (int32 key)
+ {
+ Slice col_data;
+ ASSERT_OK(rpc.GetInboundSidecar(resp.columnar_data().columns(0).data_sidecar(), &col_data));
+ SCOPED_TRACE(col_data.ToDebugString());
+ ASSERT_EQ(col_data.size(), kNumRows * sizeof(int32_t));
+ ArrayView<const int32_t> cells(reinterpret_cast<const int32_t*>(col_data.data()), kNumRows);
+ for (int i = 0; i < kNumRows; i++) {
+ EXPECT_EQ(i, cells[i]);
+ }
+ }
+
+ // Verify column 1 (int32 val)
+ {
+ Slice col_data;
+ ASSERT_OK(rpc.GetInboundSidecar(resp.columnar_data().columns(1).data_sidecar(), &col_data));
+ SCOPED_TRACE(col_data.ToDebugString());
+ ASSERT_EQ(col_data.size(), kNumRows * sizeof(int32_t));
+ ArrayView<const int32_t> cells(reinterpret_cast<const int32_t*>(col_data.data()), kNumRows);
+ for (int i = 0; i < kNumRows; i++) {
+ EXPECT_EQ(i * 2, cells[i]);
+ }
+ }
+ // Verify column 2 (string)
+ {
+ Slice col_data;
+ ASSERT_OK(rpc.GetInboundSidecar(resp.columnar_data().columns(2).data_sidecar(), &col_data));
+
+ Slice indirect_data;
+ ASSERT_OK(rpc.GetInboundSidecar(resp.columnar_data().columns(2).indirect_data_sidecar(),
+ &indirect_data));
+
+ SCOPED_TRACE(col_data.ToDebugString());
+ ASSERT_EQ(col_data.size(), kNumRows * sizeof(Slice));
+ ArrayView<const Slice> cells(reinterpret_cast<const Slice*>(col_data.data()), kNumRows);
+ for (int i = 0; i < kNumRows; i++) {
+ Slice s = cells[i];
+ Slice real_str(indirect_data.data() + reinterpret_cast<uintptr_t>(s.data()),
+ s.size());
+ ASSERT_EQ(Substitute("hello $0", i), real_str);
+ }
+ }
+}
+
+
TEST_F(TabletServerTest, TestNonPositiveLimitsShortCircuit) {
InsertTestRowsDirect(0, 10);
for (int limit : { -1, 0 }) {
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index a51450f..6f0b6eb 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -35,6 +35,7 @@
#include "kudu/clock/clock.h"
#include "kudu/common/column_predicate.h"
+#include "kudu/common/columnar_serialization.h"
#include "kudu/common/columnblock.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/encoded_key.h"
@@ -693,20 +694,19 @@ class ScanResultCollector {
// Returns number of bytes which will be returned in the response.
virtual int64_t ResponseSize() const = 0;
- // Returns the last processed row's primary key.
- virtual const faststring& last_primary_key() const = 0;
-
// Return the number of rows actually returned to the client.
virtual int64_t NumRowsReturned() const = 0;
- // Sets row format flags on the ScanResultCollector.
+ // Initialize the serializer with the given row format flags.
//
- // This is a setter instead of a constructor argument passed to specific
- // collector implementations because, currently, the collector is built
- // before the request is decoded and checked for 'row_format_flags'.
+ // This is a separate function instead of a constructor argument passed to specific
+ // collector implementations because, currently, the collector is built before the
+ // request is decoded and checked for 'row_format_flags'.
//
// Does nothing by default.
- virtual void set_row_format_flags(uint64_t /* row_format_flags */) {}
+ virtual Status InitSerializer(uint64_t /* row_format_flags */) {
+ return Status::OK();
+ }
CpuTimes* cpu_times() {
return &cpu_times_;
@@ -735,7 +735,146 @@ void SetLastRow(const RowBlock& row_block, faststring* last_primary_key) {
}
}
-} // namespace
+// Interface for serializing results into a scan response.
+class ResultSerializer {
+ public:
+ virtual ~ResultSerializer() = default;
+
+ // Add the given RowBlock to the pending response.
+ virtual int SerializeRowBlock(const RowBlock& row_block,
+ const Schema* client_projection_schema) = 0;
+
+ // Return the approximate size (in bytes) of the pending response. Once this
+ // result is greater than the client's requested batch size, the pending rows
+ // will be returned to the client.
+ virtual size_t ResponseSize() const = 0;
+
+ // Serialize the pending rows into the response protobuf.
+ // Must be called at most once.
+ virtual void SetupResponse(rpc::RpcContext* context, ScanResponsePB* resp) = 0;
+};
+
+class RowwiseResultSerializer : public ResultSerializer {
+ public:
+ RowwiseResultSerializer(int batch_size_bytes, uint64_t flags)
+ : rows_data_(batch_size_bytes * 11 / 10),
+ indirect_data_(batch_size_bytes * 11 / 10),
+ pad_unixtime_micros_to_16_bytes_(flags & RowFormatFlags::PAD_UNIX_TIME_MICROS_TO_16_BYTES) {
+ // TODO(todd): use a chain of faststrings instead of a single one to avoid
+ // allocating this large buffer. Large buffer allocations are slow and
+ // potentially wasteful.
+ }
+
+ int SerializeRowBlock(const RowBlock& row_block,
+ const Schema* client_projection_schema) override {
+ int num_selected = kudu::SerializeRowBlock(
+ row_block, client_projection_schema,
+ &rows_data_, &indirect_data_, pad_unixtime_micros_to_16_bytes_);
+ rowblock_pb_.set_num_rows(rowblock_pb_.num_rows() + num_selected);
+ return num_selected;
+ }
+
+ size_t ResponseSize() const override {
+ return rows_data_.size() + indirect_data_.size();
+ }
+
+ void SetupResponse(rpc::RpcContext* context, ScanResponsePB* resp) override {
+ CHECK(!done_);
+ done_ = true;
+
+ *resp->mutable_data() = std::move(rowblock_pb_);
+ // Add sidecar data to context and record the returned indices.
+ int rows_idx;
+ CHECK_OK(context->AddOutboundSidecar(
+ RpcSidecar::FromFaststring((std::move(rows_data_))), &rows_idx));
+ resp->mutable_data()->set_rows_sidecar(rows_idx);
+
+ // Add indirect data as a sidecar, if applicable.
+ if (indirect_data_.size() > 0) {
+ int indirect_idx;
+ CHECK_OK(context->AddOutboundSidecar(
+ RpcSidecar::FromFaststring(std::move(indirect_data_)), &indirect_idx));
+ resp->mutable_data()->set_indirect_data_sidecar(indirect_idx);
+ }
+ }
+
+ private:
+ RowwiseRowBlockPB rowblock_pb_;
+ faststring rows_data_;
+ faststring indirect_data_;
+ bool pad_unixtime_micros_to_16_bytes_;
+ bool done_ = false;
+};
+
+class ColumnarResultSerializer : public ResultSerializer {
+ public:
+ static Status Create(uint64_t flags, unique_ptr<ResultSerializer>* serializer) {
+ if (flags & ~RowFormatFlags::COLUMNAR_LAYOUT) {
+ return Status::InvalidArgument("Row format flags not supported with columnar layout");
+ }
+ serializer->reset(new ColumnarResultSerializer());
+ return Status::OK();
+ }
+
+ int SerializeRowBlock(const RowBlock& row_block,
+ const Schema* client_projection_schema) override {
+ CHECK(!done_);
+ int n_sel = SerializeRowBlockColumnar(row_block, client_projection_schema, &results_);
+ num_rows_ += n_sel;
+ return n_sel;
+ }
+
+ size_t ResponseSize() const override {
+ CHECK(!done_);
+
+ int total = 0;
+ for (const auto& col : results_.columns) {
+ total += col.data.size();
+ if (col.indirect_data) {
+ total += col.indirect_data->size();
+ }
+ if (col.non_null_bitmap) {
+ total += col.non_null_bitmap->size();
+ }
+ }
+ return total;
+ }
+
+ void SetupResponse(rpc::RpcContext* context, ScanResponsePB* resp) override {
+ CHECK(!done_);
+ done_ = true;
+ ColumnarRowBlockPB* data = resp->mutable_columnar_data();
+ for (auto& col : results_.columns) {
+ auto* col_pb = data->add_columns();
+ int sidecar_idx;
+ CHECK_OK(context->AddOutboundSidecar(
+ RpcSidecar::FromFaststring((std::move(col.data))), &sidecar_idx));
+ col_pb->set_data_sidecar(sidecar_idx);
+
+ if (col.indirect_data) {
+ CHECK_OK(context->AddOutboundSidecar(
+ RpcSidecar::FromFaststring((std::move(*col.indirect_data))), &sidecar_idx));
+ col_pb->set_indirect_data_sidecar(sidecar_idx);
+ }
+
+ if (col.non_null_bitmap) {
+ CHECK_OK(context->AddOutboundSidecar(
+ RpcSidecar::FromFaststring((std::move(*col.non_null_bitmap))), &sidecar_idx));
+ col_pb->set_non_null_bitmap_sidecar(sidecar_idx);
+ }
+ }
+ data->set_num_rows(num_rows_);
+ }
+
+ private:
+ ColumnarResultSerializer() {}
+
+ int64_t num_rows_ = 0;
+ ColumnarSerializedBatch results_;
+ bool done_ = false;
+};
+
+} // anonymous namespace
// Copies the scan result to the given row block PB and data buffers.
//
@@ -747,22 +886,16 @@ void SetLastRow(const RowBlock& row_block, faststring* last_primary_key) {
// server-side scan and thus never need to return the actual data.)
class ScanResultCopier : public ScanResultCollector {
public:
- ScanResultCopier(RowwiseRowBlockPB* rowblock_pb,
- faststring* rows_data,
- faststring* indirect_data)
- : rowblock_pb_(DCHECK_NOTNULL(rowblock_pb)),
- rows_data_(DCHECK_NOTNULL(rows_data)),
- indirect_data_(DCHECK_NOTNULL(indirect_data)),
- num_rows_returned_(0),
- pad_unixtime_micros_to_16_bytes_(false) {}
+ explicit ScanResultCopier(int batch_size_bytes)
+ : batch_size_bytes_(batch_size_bytes),
+ num_rows_returned_(0) {
+ }
void HandleRowBlock(Scanner* scanner, const RowBlock& row_block) override {
- int num_selected = SerializeRowBlock(
- row_block, scanner->client_projection_schema(),
- rows_data_, indirect_data_, pad_unixtime_micros_to_16_bytes_);
+ int num_selected = serializer_->SerializeRowBlock(
+ row_block, scanner->client_projection_schema());
if (num_selected > 0) {
- rowblock_pb_->set_num_rows(rowblock_pb_->num_rows() + num_selected);
num_rows_returned_ += num_selected;
scanner->add_num_rows_returned(num_selected);
SetLastRow(row_block, &last_primary_key_);
@@ -771,30 +904,45 @@ class ScanResultCopier : public ScanResultCollector {
// Returns number of bytes buffered to return.
int64_t ResponseSize() const override {
- return rows_data_->size() + indirect_data_->size();
- }
-
- const faststring& last_primary_key() const override {
- return last_primary_key_;
+ return serializer_->ResponseSize();
}
int64_t NumRowsReturned() const override {
return num_rows_returned_;
}
- void set_row_format_flags(uint64_t row_format_flags) override {
- if (row_format_flags & RowFormatFlags::PAD_UNIX_TIME_MICROS_TO_16_BYTES) {
- pad_unixtime_micros_to_16_bytes_ = true;
+ Status InitSerializer(uint64_t row_format_flags) override {
+ if (serializer_) {
+ // TODO(todd) for the NewScanner case, this gets called twice
+ // which is a bit ugly. Refactor to avoid!
+ return Status::OK();
+ }
+ if (row_format_flags & COLUMNAR_LAYOUT) {
+ return ColumnarResultSerializer::Create(row_format_flags, &serializer_);
+ }
+ serializer_.reset(new RowwiseResultSerializer(batch_size_bytes_, row_format_flags));
+ return Status::OK();
+ }
+
+ void SetupResponse(rpc::RpcContext* context, ScanResponsePB* resp) {
+ if (serializer_) {
+ serializer_->SetupResponse(context, resp);
+ }
+
+ // Set the last row found by the collector.
+ //
+ // We could have an empty batch if all the remaining rows are filtered by the
+ // predicate, in which case do not set the last row.
+ if (last_primary_key_.length() > 0) {
+ resp->set_last_primary_key(last_primary_key_.ToString());
}
}
private:
- RowwiseRowBlockPB* const rowblock_pb_;
- faststring* const rows_data_;
- faststring* const indirect_data_;
+ int batch_size_bytes_;
int64_t num_rows_returned_;
faststring last_primary_key_;
- bool pad_unixtime_micros_to_16_bytes_;
+ unique_ptr<ResultSerializer> serializer_;
DISALLOW_COPY_AND_ASSIGN(ScanResultCopier);
};
@@ -830,8 +978,6 @@ class ScanResultChecksummer : public ScanResultCollector {
// Returns a constant -- we only return checksum based on a time budget.
virtual int64_t ResponseSize() const OVERRIDE { return sizeof(agg_checksum_); }
- virtual const faststring& last_primary_key() const OVERRIDE { return encoded_last_row_; }
-
virtual int64_t NumRowsReturned() const OVERRIDE {
return 0;
}
@@ -1731,13 +1877,7 @@ void TabletServiceImpl::Scan(const ScanRequestPB* req,
}
size_t batch_size_bytes = GetMaxBatchSizeBytesHint(req);
- // TODO(todd): use a chain of faststrings instead of a single one to avoid
- // allocating this large buffer. Large buffer allocations are slow and
- // potentially wasteful.
- faststring rows_data(batch_size_bytes * 11 / 10);
- faststring indirect_data(batch_size_bytes * 11 / 10);
- RowwiseRowBlockPB data;
- ScanResultCopier collector(&data, &rows_data, &indirect_data);
+ ScanResultCopier collector(batch_size_bytes);
bool has_more_results = false;
TabletServerErrorPB::Code error_code = TabletServerErrorPB::UNKNOWN_ERROR;
@@ -1779,32 +1919,9 @@ void TabletServiceImpl::Scan(const ScanRequestPB* req,
"Must pass either a scanner_id or new_scan_request"));
return;
}
- resp->set_has_more_results(has_more_results);
- resp->mutable_data()->CopyFrom(data);
-
- // Add sidecar data to context and record the returned indices.
- int rows_idx;
- CHECK_OK(context->AddOutboundSidecar(
- RpcSidecar::FromFaststring((std::move(rows_data))), &rows_idx));
- resp->mutable_data()->set_rows_sidecar(rows_idx);
-
- // Add indirect data as a sidecar, if applicable.
- if (indirect_data.size() > 0) {
- int indirect_idx;
- CHECK_OK(context->AddOutboundSidecar(
- RpcSidecar::FromFaststring(std::move(indirect_data)), &indirect_idx));
- resp->mutable_data()->set_indirect_data_sidecar(indirect_idx);
- }
-
- // Set the last row found by the collector.
- //
- // We could have an empty batch if all the remaining rows are filtered by the
- // predicate, in which case do not set the last row.
- const faststring& last = collector.last_primary_key();
- if (last.length() > 0) {
- resp->set_last_primary_key(last.ToString());
- }
+ collector.SetupResponse(context, resp);
+ resp->set_has_more_results(has_more_results);
resp->set_propagated_timestamp(server_->clock()->Now().ToUint64());
SetResourceMetrics(context, collector.cpu_times(), resp->mutable_resource_metrics());
@@ -2110,6 +2227,7 @@ bool TabletServiceImpl::SupportsFeature(uint32_t feature) const {
case TabletServerFeatures::PAD_UNIXTIME_MICROS_TO_16_BYTES:
case TabletServerFeatures::QUIESCING:
case TabletServerFeatures::BLOOM_FILTER_PREDICATE:
+ case TabletServerFeatures::COLUMNAR_LAYOUT_FEATURE:
return true;
default:
return false;
@@ -2319,6 +2437,12 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
TRACE_EVENT1("tserver", "TabletServiceImpl::HandleNewScanRequest",
"tablet_id", scan_pb.tablet_id());
+ Status s = result_collector->InitSerializer(scan_pb.row_format_flags());
+ if (!s.ok()) {
+ *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
+ return s;
+ }
+
const Schema& tablet_schema = replica->tablet_metadata()->schema();
SharedScanner scanner;
@@ -2336,7 +2460,7 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
// Create the user's requested projection.
// TODO(todd): Add test cases for bad projections including 0 columns.
Schema projection;
- Status s = ColumnPBsToSchema(scan_pb.projected_columns(), &projection);
+ s = ColumnPBsToSchema(scan_pb.projected_columns(), &projection);
if (PREDICT_FALSE(!s.ok())) {
*error_code = TabletServerErrorPB::INVALID_SCHEMA;
return s;
@@ -2614,9 +2738,6 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
"--scanner_inject_service_unavailable_on_continue_scan");
}
- // Set the row format flags on the ScanResultCollector.
- result_collector->set_row_format_flags(scanner->row_format_flags());
-
// If we early-exit out of this function, automatically unregister the scanner.
ScopedUnregisterScanner unreg_scanner(server_->scanner_manager(), scanner->id());
ScopedAddScannerTiming scanner_timer(scanner.get(), result_collector->cpu_times());
@@ -2625,6 +2746,13 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
<< SecureShortDebugString(*req);
TRACE("Found scanner $0 for tablet $1", scanner->id(), scanner->tablet_id());
+ // Initialize the collector's serializer using the scanner's row format flags.
+ s = result_collector->InitSerializer(scanner->row_format_flags());
+ if (!s.ok()) {
+ *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
+ return s;
+ }
+
if (batch_size_bytes == 0 && req->close_scanner()) {
*has_more_results = false;
return Status::OK();
diff --git a/src/kudu/tserver/tserver.proto b/src/kudu/tserver/tserver.proto
index f2ab6d4..9f6de3d 100644
--- a/src/kudu/tserver/tserver.proto
+++ b/src/kudu/tserver/tserver.proto
@@ -224,6 +224,10 @@ message ColumnRangePredicateListPB {
enum RowFormatFlags {
NO_FLAGS = 0;
PAD_UNIX_TIME_MICROS_TO_16_BYTES = 1;
+
+ // Return a ColumnarRowBlockPB instead of RowwiseRowBlockPB.
+ // Incompatible with PAD_UNIX_TIME_MICROS_TO_16_BYTES.
+ COLUMNAR_LAYOUT = 2;
}
message NewScanRequestPB {
@@ -405,6 +409,8 @@ message ScanResponsePB {
// The schema will match the schema requested by the client when it created
// the scanner.
optional RowwiseRowBlockPB data = 4;
+ // Set instead of 'data' if COLUMNAR_LAYOUT is passed.
+ optional ColumnarRowBlockPB columnar_data = 5;
// The snapshot timestamp at which the scan was executed. This is only set
// in the first response (i.e. the response to the request that had
@@ -472,4 +478,6 @@ enum TabletServerFeatures {
PAD_UNIXTIME_MICROS_TO_16_BYTES = 2;
QUIESCING = 3;
BLOOM_FILTER_PREDICATE = 4;
+ // Whether the server supports the COLUMNAR_LAYOUT format flag.
+ COLUMNAR_LAYOUT_FEATURE = 5;
}