You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2020/04/01 22:24:19 UTC
[kudu] 02/02: client: add support for columnar format scan
This is an automated email from the ASF dual-hosted git repository.
todd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit fa2d4c756646b3f8428d2931113e8dfc3bc5af79
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Thu Mar 26 22:28:06 2020 -0700
client: add support for columnar format scan
Change-Id: I69ec4487c78b872e7b9eba5facdd44cfd6b8f4fa
Reviewed-on: http://gerrit.cloudera.org:8080/15622
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
src/kudu/client/CMakeLists.txt | 2 +
src/kudu/client/client-test.cc | 45 ++++++++++
src/kudu/client/client.cc | 41 +++++----
src/kudu/client/client.h | 29 +++++++
src/kudu/client/columnar_scan_batch.cc | 55 ++++++++++++
src/kudu/client/columnar_scan_batch.h | 90 ++++++++++++++++++++
src/kudu/client/scanner-internal.cc | 121 +++++++++++++++++++++++++--
src/kudu/client/scanner-internal.h | 59 ++++++++++++-
src/kudu/tools/tool_action_remote_replica.cc | 2 +-
9 files changed, 421 insertions(+), 23 deletions(-)
diff --git a/src/kudu/client/CMakeLists.txt b/src/kudu/client/CMakeLists.txt
index ffd536a..f456757 100644
--- a/src/kudu/client/CMakeLists.txt
+++ b/src/kudu/client/CMakeLists.txt
@@ -34,6 +34,7 @@ set(CLIENT_SRCS
client.cc
client_builder-internal.cc
client-internal.cc
+ columnar_scan_batch.cc
error_collector.cc
error-internal.cc
hash.cc
@@ -177,6 +178,7 @@ install(TARGETS kudu_client_exported
install(FILES
callbacks.h
client.h
+ columnar_scan_batch.h
hash.h
row_result.h
scan_batch.h
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index f8bca5d..668c9d3 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -51,6 +51,7 @@
#include "kudu/client/client-internal.h"
#include "kudu/client/client-test-util.h"
#include "kudu/client/client.pb.h"
+#include "kudu/client/columnar_scan_batch.h"
#include "kudu/client/error_collector.h"
#include "kudu/client/meta_cache.h"
#include "kudu/client/resource_metrics.h"
@@ -104,6 +105,7 @@
#include "kudu/tserver/tablet_server_options.h"
#include "kudu/tserver/ts_tablet_manager.h"
#include "kudu/tserver/tserver.pb.h"
+#include "kudu/util/array_view.h"
#include "kudu/util/async_util.h"
#include "kudu/util/barrier.h"
#include "kudu/util/countdown_latch.h"
@@ -1051,6 +1053,49 @@ TEST_F(ClientTest, TestScanAtFutureTimestamp) {
ASSERT_STR_CONTAINS(s.ToString(), "in the future.");
}
+// Scans a randomly sized table in columnar layout and checks every cell of
+// every returned batch against the values expected for its row position.
+TEST_F(ClientTest, TestColumnarScan) {
+  // Randomize both the table size and the scanner batch size so that a full
+  // scan may complete in a single batch or need several of them.
+  const int64_t expected_row_count = rand() % 2000;
+  FLAGS_scanner_batch_size_rows = rand() % 1000;
+
+  NO_FATALS(InsertTestRows(client_table_.get(), expected_row_count));
+
+  KuduScanner scanner(client_table_.get());
+  ASSERT_OK(scanner.SetRowFormatFlags(KuduScanner::COLUMNAR_LAYOUT));
+  ASSERT_OK(scanner.Open());
+
+  KuduColumnarScanBatch batch;
+  int rows_seen = 0;
+  while (scanner.HasMoreRows()) {
+    ASSERT_OK(scanner.NextBatch(&batch));
+    const int batch_rows = batch.NumRows();
+
+    // Fetch the raw cell data of each of the four columns verified below.
+    Slice col_data[4];
+    for (int col = 0; col < 4; ++col) {
+      ASSERT_OK(batch.GetDataForColumn(col, &col_data[col]));
+    }
+
+    // Views the cell data of column 'col' as one int32 per row of the batch.
+    const auto as_int32 = [&](int col) {
+      return ArrayView<const int32_t>(
+          reinterpret_cast<const int32_t*>(col_data[col].data()), batch_rows);
+    };
+    const ArrayView<const int32_t> ints0 = as_int32(0);
+    const ArrayView<const int32_t> ints1 = as_int32(1);
+    const ArrayView<const Slice> strs2(
+        reinterpret_cast<const Slice*>(col_data[2].data()), batch_rows);
+    const ArrayView<const int32_t> ints3 = as_int32(3);
+
+    // Verify the data: each cell is checked against the value derived from
+    // the row's position in the overall scan.
+    for (int i = 0; i < batch_rows; ++i) {
+      const int row_idx = rows_seen + i;
+      EXPECT_EQ(row_idx, ints0[i]);
+      EXPECT_EQ(row_idx * 2, ints1[i]);
+      EXPECT_EQ(Substitute("hello $0", row_idx), strs2[i]);
+      EXPECT_EQ(row_idx * 3, ints3[i]);
+    }
+    rows_seen += batch_rows;
+  }
+  ASSERT_EQ(expected_row_count, rows_seen);
+}
+
const KuduScanner::ReadMode read_modes[] = {
KuduScanner::READ_LATEST,
KuduScanner::READ_AT_SNAPSHOT,
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 571844a..968537b 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -37,6 +37,7 @@
#include "kudu/client/client-internal.h"
#include "kudu/client/client.pb.h"
#include "kudu/client/client_builder-internal.h"
+#include "kudu/client/columnar_scan_batch.h"
#include "kudu/client/error-internal.h"
#include "kudu/client/error_collector.h"
#include "kudu/client/master_proxy_rpc.h"
@@ -1569,6 +1570,7 @@ Status KuduScanner::SetRowFormatFlags(uint64_t flags) {
switch (flags) {
case NO_FLAGS:
case PAD_UNIXTIME_MICROS_TO_16_BYTES:
+ case COLUMNAR_LAYOUT:
break;
default:
return Status::InvalidArgument(Substitute("Invalid row format flags: $0", flags));
@@ -1711,6 +1713,15 @@ Status KuduScanner::NextBatch(vector<KuduRowResult>* rows) {
}
Status KuduScanner::NextBatch(KuduScanBatch* batch) {  // Public entry point that fills a KuduScanBatch.
+ return NextBatch(batch->data_);  // Forwards the batch's internal data object to the NextBatch(internal::ScanBatchDataInterface*) overload.
+}
+
+Status KuduScanner::NextBatch(KuduColumnarScanBatch* batch) {  // Public entry point that fills a KuduColumnarScanBatch.
+ return NextBatch(batch->data_);  // Forwards the batch's internal data object to the NextBatch(internal::ScanBatchDataInterface*) overload.
+}
+
+Status KuduScanner::NextBatch(internal::ScanBatchDataInterface* batch_data) {
+
// TODO: do some double-buffering here -- when we return this batch
// we should already have fired off the RPC for the next batch, but
// need to do some swapping of the response objects around to avoid
@@ -1718,7 +1729,7 @@ Status KuduScanner::NextBatch(KuduScanBatch* batch) {
CHECK(data_->open_);
CHECK(data_->proxy_);
- batch->data_->Clear();
+ batch_data->Clear();
if (data_->short_circuit_) {
return Status::OK();
@@ -1728,12 +1739,11 @@ Status KuduScanner::NextBatch(KuduScanBatch* batch) {
// We have data from a previous scan.
VLOG(2) << "Extracting data from " << data_->DebugString();
data_->data_in_open_ = false;
- return batch->data_->Reset(&data_->controller_,
- data_->configuration().projection(),
- data_->configuration().client_projection(),
- data_->configuration().row_format_flags(),
- unique_ptr<RowwiseRowBlockPB>(
- data_->last_response_.release_data()));
+ return batch_data->Reset(&data_->controller_,
+ data_->configuration().projection(),
+ data_->configuration().client_projection(),
+ data_->configuration().row_format_flags(),
+ &data_->last_response_);
}
if (data_->last_response_.has_more_results()) {
@@ -1753,12 +1763,11 @@ Status KuduScanner::NextBatch(KuduScanBatch* batch) {
data_->last_primary_key_ = data_->last_response_.last_primary_key();
}
data_->scan_attempts_ = 0;
- return batch->data_->Reset(&data_->controller_,
- data_->configuration().projection(),
- data_->configuration().client_projection(),
- data_->configuration().row_format_flags(),
- unique_ptr<RowwiseRowBlockPB>(
- data_->last_response_.release_data()));
+ return batch_data->Reset(&data_->controller_,
+ data_->configuration().projection(),
+ data_->configuration().client_projection(),
+ data_->configuration().row_format_flags(),
+ &data_->last_response_);
}
data_->scan_attempts_++;
@@ -1797,7 +1806,11 @@ Status KuduScanner::NextBatch(KuduScanBatch* batch) {
set<string> blacklist;
RETURN_NOT_OK(data_->OpenNextTablet(deadline, &blacklist));
- // No rows written, the next invocation will pick them up.
+ if (data_->data_in_open_) {
+ // Avoid returning an empty batch in between tablets if we have data
+ // we can return from this call.
+ return NextBatch(batch_data);
+ }
return Status::OK();
} else {
// No more data anywhere.
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 5c59ac7..f8d16d8 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -73,6 +73,7 @@ class RemoteKsckCluster;
namespace client {
+class KuduColumnarScanBatch;
class KuduDelete;
class KuduInsert;
class KuduInsertIgnore;
@@ -102,6 +103,7 @@ class RemoteTablet;
class RemoteTabletServer;
class ReplicaController;
class RetrieveAuthzTokenRpc;
+class ScanBatchDataInterface;
class WriteRpc;
template <class ReqClass, class RespClass>
class AsyncLeaderMasterRpc; // IWYU pragma: keep
@@ -2255,6 +2257,9 @@ class KUDU_EXPORT KuduScanner {
/// Fetch the next batch of results for this scanner.
///
+ /// This variant may not be used when the scan is configured with the
+ /// COLUMNAR_LAYOUT RowFormatFlag.
+ ///
/// A single KuduScanBatch object may be reused. Each subsequent call
/// replaces the data from the previous call, and invalidates any
/// KuduScanBatch::RowPtr objects previously obtained from the batch.
@@ -2263,6 +2268,19 @@ class KUDU_EXPORT KuduScanner {
/// @return Operation result status.
Status NextBatch(KuduScanBatch* batch);
+ /// Fetch the next batch of columnar results for this scanner.
+ ///
+ /// This variant may only be used when the scan is configured with the
+ /// COLUMNAR_LAYOUT RowFormatFlag.
+ ///
+ /// A single KuduColumnarScanBatch object may be reused. Each subsequent call
+ /// replaces the data from the previous call, and invalidates any
+ /// Slice objects previously obtained from the batch.
+ /// @param [out] batch
+ /// Placeholder for the result.
+ /// @return Operation result status.
+ Status NextBatch(KuduColumnarScanBatch* batch);
+
/// Get the KuduTabletServer that is currently handling the scan.
///
/// More concretely, this is the server that handled the most recent
@@ -2389,6 +2407,15 @@ class KUDU_EXPORT KuduScanner {
/// results and might even cause the client to crash.
static const uint64_t PAD_UNIXTIME_MICROS_TO_16_BYTES = 1 << 0;
+ /// Enable column-oriented data transfer. The server will transfer data to the client
+ /// in a columnar format rather than a row-wise format. The KuduColumnarScanBatch API
+ /// must be used to fetch results from this scan.
+ ///
+ /// NOTE: older versions of the Kudu server do not support this feature. Clients
+ /// aiming to support compatibility with previous versions should have a fallback
+ /// code path.
+ static const uint64_t COLUMNAR_LAYOUT = 1 << 1;
+
/// Optionally set row format modifier flags.
///
/// If flags is RowFormatFlags::NO_FLAGS, then no modifications will be made to the row
@@ -2436,6 +2463,8 @@ class KUDU_EXPORT KuduScanner {
private:
class KUDU_NO_EXPORT Data;
+ Status NextBatch(internal::ScanBatchDataInterface* batch);
+
friend class KuduScanToken;
FRIEND_TEST(ClientTest, TestBlockScannerHijackingAttempts);
FRIEND_TEST(ClientTest, TestScanCloseProxy);
diff --git a/src/kudu/client/columnar_scan_batch.cc b/src/kudu/client/columnar_scan_batch.cc
new file mode 100644
index 0000000..b346445
--- /dev/null
+++ b/src/kudu/client/columnar_scan_batch.cc
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/client/columnar_scan_batch.h"
+
+#include "kudu/client/scanner-internal.h"
+#include "kudu/common/wire_protocol.pb.h"
+#include "kudu/rpc/rpc_controller.h"
+
+namespace kudu {
+class Slice;
+
+namespace client {
+
+// KuduColumnarScanBatch is a thin public wrapper: all state lives in the
+// private KuduColumnarScanBatch::Data object held in 'data_', which is
+// allocated here and released by the destructor.
+KuduColumnarScanBatch::KuduColumnarScanBatch()
+    : data_(new KuduColumnarScanBatch::Data()) {
+}
+
+KuduColumnarScanBatch::~KuduColumnarScanBatch() {
+  delete data_;
+}
+
+// Returns the row count recorded in the columnar response block.
+int KuduColumnarScanBatch::NumRows() const {
+  return data_->resp_data_.num_rows();
+}
+
+// Returns, via 'data', the cell data of the column at index 'idx'.
+Status KuduColumnarScanBatch::GetDataForColumn(int idx, Slice* data) const {
+  return data_->GetDataForColumn(idx, data);
+}
+
+// Returns, via 'data', the non-null bitmap of the column at index 'idx'.
+Status KuduColumnarScanBatch::GetNonNullBitmapForColumn(int idx, Slice* data) const {
+  // Delegate to the implementation object, as GetDataForColumn() does, rather
+  // than repeating its index check and sidecar lookup in this wrapper.
+  return data_->GetNonNullBitmapForColumn(idx, data);
+}
+
+} // namespace client
+} // namespace kudu
diff --git a/src/kudu/client/columnar_scan_batch.h b/src/kudu/client/columnar_scan_batch.h
new file mode 100644
index 0000000..23a3e7d
--- /dev/null
+++ b/src/kudu/client/columnar_scan_batch.h
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_CLIENT_COLUMNAR_SCAN_BATCH_H
+#define KUDU_CLIENT_COLUMNAR_SCAN_BATCH_H
+
+#ifdef KUDU_HEADERS_NO_STUBS
+#include "kudu/gutil/macros.h"
+#else
+#include "kudu/client/stubs.h"
+#endif
+
+#include "kudu/util/kudu_export.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+class Slice;
+
+namespace client {
+
+/// @brief A batch of scan results laid out column by column.
+///
+/// This is the columnar counterpart of KuduScanBatch: it holds one batch of
+/// rows returned from a scanner, and is the batch type used when the
+/// KuduScanner::COLUMNAR_LAYOUT row format flag is enabled on the scan.
+///
+/// Fetching rows in columnar layout can be significantly more efficient. It
+/// saves some CPU cycles on the Kudu cluster and can also enable faster
+/// processing of the returned data in certain client applications.
+///
+/// @note This class is not thread-safe.
+class KUDU_EXPORT KuduColumnarScanBatch {
+ public:
+  KuduColumnarScanBatch();
+  ~KuduColumnarScanBatch();
+
+  /// @return The number of rows in this batch.
+  int NumRows() const;
+
+  /// Fetch the raw columnar data of the column with index @c idx.
+  ///
+  /// The cells are stored as a packed little-endian array, with no alignment
+  /// guarantee. Space is reserved for every cell, whether or not it is null,
+  /// and the bytes of a null cell may or may not be zeroed.
+  ///
+  /// If this call returns an error for a given column, the results of
+  /// calling it again for that same column are undefined.
+  ///
+  /// @note The returned Slice is only valid for the lifetime of this
+  ///   KuduColumnarScanBatch.
+  ///
+  /// @param [in] idx
+  ///   Index of the column within the batch.
+  /// @param [out] data
+  ///   Placeholder for the column's cell data.
+  /// @return Operation result status.
+  Status GetDataForColumn(int idx, Slice* data) const;
+
+  /// Fetch the bitmap describing which cells of the given column are non-null.
+  ///
+  /// A set bit marks a non-null cell. When the number of rows is not a
+  /// multiple of 8, the state of the trailing bits of the bitmap is undefined.
+  ///
+  /// It is an error to call this function on a column which is not marked as
+  /// nullable in the schema.
+  ///
+  /// @note The returned Slice is only valid for the lifetime of this
+  ///   KuduColumnarScanBatch.
+  ///
+  /// @param [in] idx
+  ///   Index of the column within the batch.
+  /// @param [out] data
+  ///   Placeholder for the column's non-null bitmap.
+  /// @return Operation result status.
+  Status GetNonNullBitmapForColumn(int idx, Slice* data) const;
+
+ private:
+  class KUDU_NO_EXPORT Data;
+
+  friend class KuduScanner;
+
+  // Private implementation object holding the state of this batch.
+  Data* data_;
+  DISALLOW_COPY_AND_ASSIGN(KuduColumnarScanBatch);
+};
+
+} // namespace client
+} // namespace kudu
+
+#endif
diff --git a/src/kudu/client/scanner-internal.cc b/src/kudu/client/scanner-internal.cc
index 9cb14aa..93006b3 100644
--- a/src/kudu/client/scanner-internal.cc
+++ b/src/kudu/client/scanner-internal.cc
@@ -38,6 +38,7 @@
#include "kudu/common/partition.h"
#include "kudu/common/scan_spec.h"
#include "kudu/common/schema.h"
+#include "kudu/common/types.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
@@ -49,11 +50,13 @@
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/security/token.pb.h"
#include "kudu/tserver/tserver_service.proxy.h"
+#include "kudu/util/array_view.h"
#include "kudu/util/async_util.h"
#include "kudu/util/bitmap.h"
#include "kudu/util/hexdump.h"
#include "kudu/util/logging.h"
#include "kudu/util/monotime.h"
+#include "kudu/util/safe_math.h"
using google::protobuf::FieldDescriptor;
using google::protobuf::Reflection;
@@ -70,6 +73,8 @@ using rpc::RpcController;
using security::SignedTokenPB;
using strings::Substitute;
using tserver::NewScanRequestPB;
+using tserver::RowFormatFlags;
+using tserver::ScanResponsePB;
using tserver::TabletServerFeatures;
namespace client {
@@ -367,6 +372,10 @@ ScanRpcStatus KuduScanner::Data::SendScanRpc(const MonoTime& overall_deadline,
if (configuration().row_format_flags() & KuduScanner::PAD_UNIXTIME_MICROS_TO_16_BYTES) {
controller_.RequireServerFeature(TabletServerFeatures::PAD_UNIXTIME_MICROS_TO_16_BYTES);
}
+ if (configuration().row_format_flags() & KuduScanner::COLUMNAR_LAYOUT) {
+ controller_.RequireServerFeature(TabletServerFeatures::COLUMNAR_LAYOUT_FEATURE);
+ }
+
if (next_req_.has_new_scan_request()) {
// Only new scan requests require authz tokens. Scan continuations rely on
// Kudu's prevention of scanner hijacking by different users.
@@ -386,7 +395,9 @@ ScanRpcStatus KuduScanner::Data::SendScanRpc(const MonoTime& overall_deadline,
rpc_deadline, overall_deadline);
if (scan_status.result == ScanRpcStatus::OK) {
UpdateResourceMetrics();
- num_rows_returned_ += last_response_.data().num_rows();
+ num_rows_returned_ += last_response_.has_data() ? last_response_.data().num_rows() : 0;
+ num_rows_returned_ += last_response_.has_columnar_data() ?
+ last_response_.columnar_data().num_rows() : 0;
}
return scan_status;
}
@@ -561,13 +572,15 @@ Status KuduScanner::Data::OpenTablet(const string& partition_key,
partition_pruner_.RemovePartitionKeyRange(remote_->partition().partition_key_end());
next_req_.clear_new_scan_request();
- data_in_open_ = last_response_.has_data() && last_response_.data().num_rows() > 0;
+ data_in_open_ = (last_response_.has_data() && last_response_.data().num_rows() > 0) ||
+ (last_response_.has_columnar_data() && last_response_.columnar_data().num_rows() > 0);
if (last_response_.has_more_results()) {
next_req_.set_scanner_id(last_response_.scanner_id());
VLOG(2) << "Opened tablet " << remote_->tablet_id()
<< ", scanner ID " << last_response_.scanner_id();
- } else if (last_response_.has_data()) {
- VLOG(2) << "Opened tablet " << remote_->tablet_id() << ", no scanner ID assigned";
+ } else if (last_response_.has_data() || last_response_.has_columnar_data()) {
+ VLOG(2) << "Opened tablet " << remote_->tablet_id() << ", no scanner ID assigned, "
+ << " data_in_open=" << data_in_open_;
} else {
VLOG(2) << "Opened tablet " << remote_->tablet_id() << " (no rows), no scanner ID assigned";
}
@@ -670,13 +683,19 @@ Status KuduScanBatch::Data::Reset(RpcController* controller,
const Schema* projection,
const KuduSchema* client_projection,
uint64_t row_format_flags,
- unique_ptr<RowwiseRowBlockPB> resp_data) {
+ ScanResponsePB* response) {
CHECK(controller->finished());
+ if (row_format_flags & RowFormatFlags::COLUMNAR_LAYOUT) {
+ return Status::InvalidArgument("columnar layout specified, must use KuduColumnarScanBatch");
+ }
+
controller_.Swap(controller);
projection_ = projection;
projected_row_size_ = CalculateProjectedRowSize(*projection_);
client_projection_ = client_projection;
row_format_flags_ = row_format_flags;
+ unique_ptr<RowwiseRowBlockPB> resp_data(response->release_data());
+
if (!resp_data) {
// No new data; just clear out the old stuff.
resp_data_.Clear();
@@ -748,5 +767,97 @@ void KuduScanBatch::Data::Clear() {
controller_.Reset();
}
+////////////////////////////////////////////////////////////
+// KuduColumnarScanBatch
+////////////////////////////////////////////////////////////
+
+// Re-points this batch at the columnar payload of a finished scan RPC.
+//
+// Returns InvalidArgument, leaving the batch untouched, when
+// 'row_format_flags' does not include COLUMNAR_LAYOUT. Otherwise swaps
+// 'controller' into this batch, records the projections, forgets which
+// variable-length columns were already rewritten, and takes the columnar
+// block out of 'response' (or empties the stored block if there is none).
+Status KuduColumnarScanBatch::Data::Reset(
+    rpc::RpcController* controller,
+    const Schema* projection,
+    const KuduSchema* client_projection,
+    uint64_t row_format_flags,
+    tserver::ScanResponsePB* response) {
+  const bool columnar_requested =
+      (row_format_flags & RowFormatFlags::COLUMNAR_LAYOUT) != 0;
+  if (!columnar_requested) {
+    return Status::InvalidArgument("rowwise layout specified, must use KuduScanBatch");
+  }
+  CHECK(!response->has_data()) << "expected columnar data";
+  CHECK(controller->finished());
+
+  controller_.Swap(controller);
+  projection_ = projection;
+  client_projection_ = client_projection;
+  // The new batch has not had any of its slices rewritten yet.
+  rewritten_varlen_columns_.clear();
+
+  unique_ptr<ColumnarRowBlockPB> new_block(response->release_columnar_data());
+  if (new_block) {
+    resp_data_ = std::move(*new_block);
+  } else {
+    // No new data; just clear out the old stuff.
+    resp_data_.Clear();
+  }
+  return Status::OK();
+}
+
+void KuduColumnarScanBatch::Data::Clear() {  // Discards the contents of the current batch.
+ resp_data_.Clear();  // Empties the stored columnar response block.
+ controller_.Reset();  // NOTE(review): presumably this also releases the sidecars backing previously returned Slices -- confirm against RpcController::Reset().
+}
+
+// Validates that 'idx' identifies a column present in the current response.
+//
+// Returns InvalidArgument if 'idx' is negative or is not less than the number
+// of columns in 'resp_data_'; returns OK otherwise.
+Status KuduColumnarScanBatch::Data::CheckColumnIndex(int idx) const {
+  // Negative values must be rejected as well: callers pass 'idx' straight to
+  // resp_data_.columns(idx) once this check succeeds.
+  if (idx < 0 || idx >= resp_data_.columns_size()) {
+    return Status::InvalidArgument(Substitute("bad column index $0 ($1 columns present)",
+                                              idx, resp_data_.columns_size()));
+  }
+  return Status::OK();
+}
+
+// Points 'data' at the sidecar holding the cell data of column 'idx'.
+//
+// For columns whose physical type is BINARY, the cells are Slices that arrive
+// holding offsets into the column's indirect data sidecar. The first
+// successful call for such a column rewrites those Slices in place so that
+// they point at the indirect data itself; 'rewritten_varlen_columns_' records
+// that this has been done so later calls skip the rewrite.
+//
+// Returns the error from CheckColumnIndex() or from the sidecar lookups, or
+// Corruption if the cell data cannot be interpreted as an array of Slices or
+// if a cell refers to bytes outside of the indirect data. The rewrite is done
+// in place, so after a Corruption error some cells may already have been
+// rewritten.
+Status KuduColumnarScanBatch::Data::GetDataForColumn(int idx, Slice* data) const {
+  RETURN_NOT_OK(CheckColumnIndex(idx));
+  const auto& col_pb = resp_data_.columns(idx);
+  RETURN_NOT_OK(controller_.GetInboundSidecar(col_pb.data_sidecar(), data));
+
+  // Only variable-length columns need rewriting, and only once per batch.
+  if (projection_->column(idx).type_info()->physical_type() != BINARY ||
+      ContainsKey(rewritten_varlen_columns_, idx)) {
+    return Status::OK();
+  }
+
+  Slice indirect_data_slice;
+  RETURN_NOT_OK(controller_.GetInboundSidecar(col_pb.indirect_data_sidecar(),
+                                              &indirect_data_slice));
+
+  // The cell data is about to be treated as an array of Slices. A size which
+  // is not a whole number of Slices means the response is malformed; report
+  // it rather than silently ignoring the trailing bytes.
+  if (PREDICT_FALSE(data->size() % sizeof(Slice) != 0)) {
+    const auto& col = projection_->column(idx);
+    return Status::Corruption(
+        Substitute("Data for column $0 has size $1 which is not a multiple of $2",
+                   col.ToString(), data->size(), sizeof(Slice)));
+  }
+
+  // Rewrite slices to be real pointers instead of pointers relative to the
+  // indirect data buffer.
+  ArrayView<Slice> v(reinterpret_cast<Slice*>(data->mutable_data()),
+                     data->size() / sizeof(Slice));
+  // The index is a size_t so that it matches the type of the view's size.
+  for (size_t row_idx = 0; row_idx < v.size(); row_idx++) {
+    Slice* slice = &v[row_idx];
+    size_t offset_in_indirect = reinterpret_cast<uintptr_t>(slice->data());
+    // Ensure the updated pointer is within the bounds of the indirect data.
+    bool overflowed = false;
+    size_t max_offset = AddWithOverflowCheck(offset_in_indirect, slice->size(), &overflowed);
+    if (PREDICT_FALSE(overflowed || max_offset > indirect_data_slice.size())) {
+      const auto& col = projection_->column(idx);
+      return Status::Corruption(
+          Substitute("Row #$0 contained bad indirect slice for column $1: ($2, $3)",
+                     row_idx, col.ToString(), offset_in_indirect, slice->size()));
+    }
+    *slice = Slice(&indirect_data_slice[offset_in_indirect], slice->size());
+  }
+  rewritten_varlen_columns_.insert(idx);
+  return Status::OK();
+}
+
+// Points 'data' at the sidecar holding the non-null bitmap of column 'idx'.
+//
+// Returns the error from CheckColumnIndex() for a bad index, and NotFound
+// when the response carries no non-null bitmap sidecar for the column.
+Status KuduColumnarScanBatch::Data::GetNonNullBitmapForColumn(int idx, Slice* data) const {
+  RETURN_NOT_OK(CheckColumnIndex(idx));
+  const auto& column_pb = resp_data_.columns(idx);
+  if (column_pb.has_non_null_bitmap_sidecar()) {
+    return controller_.GetInboundSidecar(column_pb.non_null_bitmap_sidecar(), data);
+  }
+  return Status::NotFound("column is not nullable");
+}
+
+
+
} // namespace client
} // namespace kudu
diff --git a/src/kudu/client/scanner-internal.h b/src/kudu/client/scanner-internal.h
index f0d07a0..01da884 100644
--- a/src/kudu/client/scanner-internal.h
+++ b/src/kudu/client/scanner-internal.h
@@ -23,11 +23,13 @@
#include <ostream>
#include <set>
#include <string>
+#include <unordered_set>
#include <vector>
#include <glog/logging.h>
#include "kudu/client/client.h"
+#include "kudu/client/columnar_scan_batch.h"
#include "kudu/client/resource_metrics.h"
#include "kudu/client/row_result.h"
#include "kudu/client/scan_batch.h"
@@ -294,7 +296,20 @@ class KuduScanner::Data {
DISALLOW_COPY_AND_ASSIGN(Data);
};
-class KuduScanBatch::Data {
+namespace internal {
+
+// Interface shared by the private data classes of the scan batch types
+// (KuduScanBatch::Data and KuduColumnarScanBatch::Data). It lets
+// KuduScanner::NextBatch() fill a batch without knowing which layout it holds.
+class ScanBatchDataInterface {
+ public:
+  virtual ~ScanBatchDataInterface() = default;
+
+  // Loads the batch from the scan response 'response', which was received
+  // through 'controller'. 'projection' and 'client_projection' describe the
+  // columns being scanned, and 'row_format_flags' carries the row format
+  // flags the scan was configured with.
+  virtual Status Reset(rpc::RpcController* controller,
+                       const Schema* projection,
+                       const KuduSchema* client_projection,
+                       uint64_t row_format_flags,
+                       tserver::ScanResponsePB* response) = 0;
+
+  // Discards the data currently held by the batch.
+  virtual void Clear() = 0;
+};
+
+} // namespace internal
+
+class KuduScanBatch::Data : public internal::ScanBatchDataInterface {
public:
Data();
~Data();
@@ -303,7 +318,7 @@ class KuduScanBatch::Data {
const Schema* projection,
const KuduSchema* client_projection,
uint64_t row_format_flags,
- std::unique_ptr<RowwiseRowBlockPB> resp_data);
+ tserver::ScanResponsePB* response) override;
int num_rows() const {
return resp_data_.num_rows();
@@ -324,7 +339,7 @@ class KuduScanBatch::Data {
void ExtractRows(std::vector<KuduScanBatch::RowPtr>* rows);
- void Clear();
+ void Clear() override;
// Returns the size of a row for the given projection 'proj'.
static size_t CalculateProjectedRowSize(const Schema& proj);
@@ -354,6 +369,44 @@ class KuduScanBatch::Data {
size_t projected_row_size_;
};
+// Private implementation of KuduColumnarScanBatch.
+//
+// Holds the columnar block of one scan response together with the RPC
+// controller whose sidecars contain the actual cell data.
+class KuduColumnarScanBatch::Data : public internal::ScanBatchDataInterface {
+ public:
+  // Loads the batch from 'response'; see the definition for the details.
+  Status Reset(rpc::RpcController* controller,
+               const Schema* projection,
+               const KuduSchema* client_projection,
+               uint64_t row_format_flags,
+               tserver::ScanResponsePB* response) override;
+
+  // Discards the data currently held by the batch.
+  void Clear() override;
+
+  // Implementations of the corresponding public KuduColumnarScanBatch calls.
+  Status GetDataForColumn(int idx, Slice* data) const;
+  Status GetNonNullBitmapForColumn(int idx, Slice* data) const;
+
+ private:
+  // Returns InvalidArgument if 'idx' is not a valid index into the columns of
+  // 'resp_data_'.
+  Status CheckColumnIndex(int idx) const;
+
+  friend class KuduColumnarScanBatch;
+
+  // The RPC controller for the RPC which returned this batch.
+  // Holding on to the controller ensures we hold on to the
+  // sidecars which contain the actual data.
+  rpc::RpcController controller_;
+
+  // The PB which contains the "direct data" slice.
+  ColumnarRowBlockPB resp_data_;
+
+  // Tracks for each variable-length (binary) column whether the pointers have been
+  // rewritten yet to be "real" pointers instead of sidecar-relative offsets.
+  // Mutable since the 'GetDataForColumn' call is semantically const, but in fact
+  // needs to modify this member to do the lazy pointer rewrites.
+  mutable std::unordered_set<int> rewritten_varlen_columns_;
+
+  // The projection being scanned. Null until the first call to Reset(); the
+  // explicit initializer keeps the pointer from ever being indeterminate,
+  // however the object is constructed.
+  const Schema* projection_ = nullptr;
+  // The KuduSchema version of 'projection_'. Null until the first call to
+  // Reset().
+  const KuduSchema* client_projection_ = nullptr;
+};
+
+
} // namespace client
} // namespace kudu
diff --git a/src/kudu/tools/tool_action_remote_replica.cc b/src/kudu/tools/tool_action_remote_replica.cc
index 4a30000..44c2de6 100644
--- a/src/kudu/tools/tool_action_remote_replica.cc
+++ b/src/kudu/tools/tool_action_remote_replica.cc
@@ -155,7 +155,7 @@ class ReplicaDumper {
&schema,
&client_schema,
client::KuduScanner::NO_FLAGS,
- unique_ptr<RowwiseRowBlockPB>(resp.release_data())));
+ &resp));
vector<KuduRowResult> rows;
results.ExtractRows(&rows);
for (const auto& r : rows) {