You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2016/04/14 20:46:00 UTC
incubator-kudu git commit: [c++-client] abstract scanner configuration
Repository: incubator-kudu
Updated Branches:
refs/heads/master ce11938a2 -> 8a3e461f8
[c++-client] abstract scanner configuration
Unlike the Java client, the C++ client configures scan options directly on the
KuduScanner. This commit adds an internal class, ScanConfiguration, that takes
over responsibility for configuring scans from the KuduScanner. KuduScanner is
updated to hold a ScanConfiguration internally. The ScanConfiguration will be
used internally in the forthcoming scan token builder API.
Change-Id: Id3d5b27f4a0ae6cd1500f17f96b7043590affd92
Reviewed-on: http://gerrit.cloudera.org:8080/2726
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/8a3e461f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/8a3e461f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/8a3e461f
Branch: refs/heads/master
Commit: 8a3e461f8619dda6f2f85135bb94143e9ae2926f
Parents: ce11938
Author: Dan Burkert <da...@cloudera.com>
Authored: Wed Apr 6 11:17:35 2016 -0700
Committer: Dan Burkert <da...@cloudera.com>
Committed: Thu Apr 14 18:45:34 2016 +0000
----------------------------------------------------------------------
src/kudu/client/CMakeLists.txt | 3 +-
src/kudu/client/client.cc | 135 ++++++----------------
src/kudu/client/scan_configuration.cc | 180 +++++++++++++++++++++++++++++
src/kudu/client/scan_configuration.h | 169 +++++++++++++++++++++++++++
src/kudu/client/scan_predicate.h | 4 +-
src/kudu/client/scanner-internal.cc | 64 ++++------
src/kudu/client/scanner-internal.h | 43 ++-----
src/kudu/client/schema.h | 1 +
8 files changed, 428 insertions(+), 171 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/8a3e461f/src/kudu/client/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/client/CMakeLists.txt b/src/kudu/client/CMakeLists.txt
index c3f909a..48625b2 100644
--- a/src/kudu/client/CMakeLists.txt
+++ b/src/kudu/client/CMakeLists.txt
@@ -24,10 +24,11 @@ set(CLIENT_SRCS
error-internal.cc
meta_cache.cc
scan_batch.cc
+ scan_configuration.cc
scan_predicate.cc
scanner-internal.cc
- session-internal.cc
schema.cc
+ session-internal.cc
table-internal.cc
table_alterer-internal.cc
table_creator-internal.cc
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/8a3e461f/src/kudu/client/client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index b9a2b63..fb614ba 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -103,7 +103,6 @@ using internal::ErrorCollector;
using internal::MetaCache;
using sp::shared_ptr;
-static const int kHtTimestampBitsToShift = 12;
static const char* kProgName = "kudu_client";
// We need to reroute all logging to stderr when the client library is
@@ -852,48 +851,18 @@ Status KuduScanner::SetProjectedColumnNames(const vector<string>& col_names) {
if (data_->open_) {
return Status::IllegalState("Projection must be set before Open()");
}
-
- const Schema* table_schema = data_->table_->schema().schema_;
- vector<int> col_indexes;
- col_indexes.reserve(col_names.size());
- for (const string& col_name : col_names) {
- int idx = table_schema->find_column(col_name);
- if (idx == Schema::kColumnNotFound) {
- return Status::NotFound(strings::Substitute("Column: \"$0\" was not found in the "
- "table schema.", col_name));
- }
- col_indexes.push_back(idx);
- }
-
- return SetProjectedColumnIndexes(col_indexes);
+ return data_->mutable_configuration()->SetProjectedColumnNames(col_names);
}
Status KuduScanner::SetProjectedColumnIndexes(const vector<int>& col_indexes) {
if (data_->open_) {
return Status::IllegalState("Projection must be set before Open()");
}
-
- const Schema* table_schema = data_->table_->schema().schema_;
- vector<ColumnSchema> cols;
- cols.reserve(col_indexes.size());
- for (const int col_index : col_indexes) {
- if (col_index >= table_schema->columns().size()) {
- return Status::NotFound(strings::Substitute("Column: \"$0\" was not found in the "
- "table schema.", col_index));
- }
- cols.push_back(table_schema->column(col_index));
- }
-
- gscoped_ptr<Schema> s(new Schema());
- RETURN_NOT_OK(s->Reset(cols, 0));
- data_->SetProjectionSchema(data_->pool_.Add(s.release()));
- return Status::OK();
+ return data_->mutable_configuration()->SetProjectedColumnIndexes(col_indexes);
}
Status KuduScanner::SetBatchSizeBytes(uint32_t batch_size) {
- data_->has_batch_size_bytes_ = true;
- data_->batch_size_bytes_ = batch_size;
- return Status::OK();
+ return data_->mutable_configuration()->SetBatchSizeBytes(batch_size);
}
Status KuduScanner::SetReadMode(ReadMode read_mode) {
@@ -903,8 +872,7 @@ Status KuduScanner::SetReadMode(ReadMode read_mode) {
if (!tight_enum_test<ReadMode>(read_mode)) {
return Status::InvalidArgument("Bad read mode");
}
- data_->read_mode_ = read_mode;
- return Status::OK();
+ return data_->mutable_configuration()->SetReadMode(read_mode);
}
Status KuduScanner::SetOrderMode(OrderMode order_mode) {
@@ -914,26 +882,21 @@ Status KuduScanner::SetOrderMode(OrderMode order_mode) {
if (!tight_enum_test<OrderMode>(order_mode)) {
return Status::InvalidArgument("Bad order mode");
}
- data_->is_fault_tolerant_ = order_mode == ORDERED;
- return Status::OK();
+ return data_->mutable_configuration()->SetFaultTolerant(order_mode == ORDERED);
}
Status KuduScanner::SetFaultTolerant() {
if (data_->open_) {
return Status::IllegalState("Fault-tolerance must be set before Open()");
}
- RETURN_NOT_OK(SetReadMode(READ_AT_SNAPSHOT));
- data_->is_fault_tolerant_ = true;
- return Status::OK();
+ return data_->mutable_configuration()->SetFaultTolerant(true);
}
Status KuduScanner::SetSnapshotMicros(uint64_t snapshot_timestamp_micros) {
if (data_->open_) {
return Status::IllegalState("Snapshot timestamp must be set before Open()");
}
- // Shift the HT timestamp bits to get well-formed HT timestamp with the logical
- // bits zeroed out.
- data_->snapshot_timestamp_ = snapshot_timestamp_micros << kHtTimestampBitsToShift;
+ data_->mutable_configuration()->SetSnapshotMicros(snapshot_timestamp_micros);
return Status::OK();
}
@@ -941,7 +904,7 @@ Status KuduScanner::SetSnapshotRaw(uint64_t snapshot_timestamp) {
if (data_->open_) {
return Status::IllegalState("Snapshot timestamp must be set before Open()");
}
- data_->snapshot_timestamp_ = snapshot_timestamp;
+ data_->mutable_configuration()->SetSnapshotRaw(snapshot_timestamp);
return Status::OK();
}
@@ -949,83 +912,59 @@ Status KuduScanner::SetSelection(KuduClient::ReplicaSelection selection) {
if (data_->open_) {
return Status::IllegalState("Replica selection must be set before Open()");
}
- data_->selection_ = selection;
- return Status::OK();
+ return data_->mutable_configuration()->SetSelection(selection);
}
Status KuduScanner::SetTimeoutMillis(int millis) {
if (data_->open_) {
return Status::IllegalState("Timeout must be set before Open()");
}
- data_->timeout_ = MonoDelta::FromMilliseconds(millis);
+ data_->mutable_configuration()->SetTimeoutMillis(millis);
return Status::OK();
}
Status KuduScanner::AddConjunctPredicate(KuduPredicate* pred) {
- // Take ownership even if we return a bad status.
- data_->pool_.Add(pred);
if (data_->open_) {
+ // Take ownership even if we return a bad status.
+ delete pred;
return Status::IllegalState("Predicate must be set before Open()");
}
- return pred->data_->AddToScanSpec(&data_->spec_, &data_->arena_);
+ return data_->mutable_configuration()->AddConjunctPredicate(pred);
}
Status KuduScanner::AddLowerBound(const KuduPartialRow& key) {
- gscoped_ptr<string> enc(new string());
- RETURN_NOT_OK(key.EncodeRowKey(enc.get()));
- RETURN_NOT_OK(AddLowerBoundRaw(Slice(*enc)));
- data_->pool_.Add(enc.release());
- return Status::OK();
+ return data_->mutable_configuration()->AddLowerBound(key);
}
Status KuduScanner::AddLowerBoundRaw(const Slice& key) {
- // Make a copy of the key.
- gscoped_ptr<EncodedKey> enc_key;
- RETURN_NOT_OK(EncodedKey::DecodeEncodedString(
- *data_->table_->schema().schema_, &data_->arena_, key, &enc_key));
- data_->spec_.SetLowerBoundKey(enc_key.get());
- data_->pool_.Add(enc_key.release());
- return Status::OK();
+ return data_->mutable_configuration()->AddLowerBoundRaw(key);
}
Status KuduScanner::AddExclusiveUpperBound(const KuduPartialRow& key) {
- gscoped_ptr<string> enc(new string());
- RETURN_NOT_OK(key.EncodeRowKey(enc.get()));
- RETURN_NOT_OK(AddExclusiveUpperBoundRaw(Slice(*enc)));
- data_->pool_.Add(enc.release());
- return Status::OK();
+ return data_->mutable_configuration()->AddUpperBound(key);
}
Status KuduScanner::AddExclusiveUpperBoundRaw(const Slice& key) {
- // Make a copy of the key.
- gscoped_ptr<EncodedKey> enc_key;
- RETURN_NOT_OK(EncodedKey::DecodeEncodedString(
- *data_->table_->schema().schema_, &data_->arena_, key, &enc_key));
- data_->spec_.SetExclusiveUpperBoundKey(enc_key.get());
- data_->pool_.Add(enc_key.release());
- return Status::OK();
+ return data_->mutable_configuration()->AddUpperBoundRaw(key);
}
Status KuduScanner::AddLowerBoundPartitionKeyRaw(const Slice& partition_key) {
- data_->spec_.SetLowerBoundPartitionKey(partition_key);
- return Status::OK();
+ return data_->mutable_configuration()->AddLowerBoundPartitionKeyRaw(partition_key);
}
Status KuduScanner::AddExclusiveUpperBoundPartitionKeyRaw(const Slice& partition_key) {
- data_->spec_.SetExclusiveUpperBoundPartitionKey(partition_key);
- return Status::OK();
+ return data_->mutable_configuration()->AddUpperBoundPartitionKeyRaw(partition_key);
}
Status KuduScanner::SetCacheBlocks(bool cache_blocks) {
if (data_->open_) {
return Status::IllegalState("Block caching must be set before Open()");
}
- data_->spec_.set_cache_blocks(cache_blocks);
- return Status::OK();
+ return data_->mutable_configuration()->SetCacheBlocks(cache_blocks);
}
KuduSchema KuduScanner::GetProjectionSchema() const {
- return data_->client_projection_;
+ return KuduSchema(*data_->configuration().projection());
}
namespace {
@@ -1050,19 +989,21 @@ struct CloseCallback {
string KuduScanner::ToString() const {
return strings::Substitute("$0: $1",
data_->table_->name(),
- data_->spec_.ToString(*data_->table_->schema().schema_));
+ data_->configuration()
+ .spec()
+ .ToString(*data_->table_->schema().schema_));
}
Status KuduScanner::Open() {
CHECK(!data_->open_) << "Scanner already open";
- CHECK(data_->projection_ != nullptr) << "No projection provided";
- data_->spec_.OptimizeScan(*data_->table_->schema().schema_, &data_->arena_, &data_->pool_, false);
+ data_->mutable_configuration()->OptimizeScanSpec();
data_->partition_pruner_.Init(*data_->table_->schema().schema_,
data_->table_->partition_schema(),
- data_->spec_);
+ data_->configuration().spec());
- if (data_->spec_.CanShortCircuit() || !data_->partition_pruner_.HasMorePartitionKeyRanges()) {
+ if (data_->configuration().spec().CanShortCircuit() ||
+ !data_->partition_pruner_.HasMorePartitionKeyRanges()) {
VLOG(1) << "Short circuiting scan " << ToString();
data_->open_ = true;
data_->short_circuit_ = true;
@@ -1072,7 +1013,7 @@ Status KuduScanner::Open() {
VLOG(1) << "Beginning scan " << ToString();
MonoTime deadline = MonoTime::Now(MonoTime::FINE);
- deadline.AddDelta(data_->timeout_);
+ deadline.AddDelta(data_->configuration().timeout());
set<string> blacklist;
RETURN_NOT_OK(data_->OpenNextTablet(deadline, &blacklist));
@@ -1101,7 +1042,7 @@ void KuduScanner::Close() {
closer->scanner_id = data_->next_req_.scanner_id();
data_->PrepareRequest(KuduScanner::Data::CLOSE);
data_->next_req_.set_close_scanner(true);
- closer->controller.set_timeout(data_->timeout_);
+ closer->controller.set_timeout(data_->configuration().timeout());
data_->proxy_->ScanAsync(data_->next_req_, &closer->response, &closer->controller,
boost::bind(&CloseCallback::Callback, closer.get()));
ignore_result(closer.release());
@@ -1144,19 +1085,19 @@ Status KuduScanner::NextBatch(KuduScanBatch* batch) {
VLOG(1) << "Extracting data from scan " << ToString();
data_->data_in_open_ = false;
return batch->data_->Reset(&data_->controller_,
- data_->projection_,
- &data_->client_projection_,
+ data_->configuration().projection(),
+ data_->configuration().client_projection(),
make_gscoped_ptr(data_->last_response_.release_data()));
} else if (data_->last_response_.has_more_results()) {
// More data is available in this tablet.
VLOG(1) << "Continuing scan " << ToString();
MonoTime batch_deadline = MonoTime::Now(MonoTime::FINE);
- batch_deadline.AddDelta(data_->timeout_);
+ batch_deadline.AddDelta(data_->configuration().timeout());
data_->PrepareRequest(KuduScanner::Data::CONTINUE);
while (true) {
- bool allow_time_for_failover = data_->is_fault_tolerant_;
+ bool allow_time_for_failover = data_->configuration().is_fault_tolerant();
ScanRpcStatus result = data_->SendScanRpc(batch_deadline, allow_time_for_failover);
// Success case.
@@ -1166,8 +1107,8 @@ Status KuduScanner::NextBatch(KuduScanBatch* batch) {
}
data_->scan_attempts_ = 0;
return batch->data_->Reset(&data_->controller_,
- data_->projection_,
- &data_->client_projection_,
+ data_->configuration().projection(),
+ data_->configuration().client_projection(),
make_gscoped_ptr(data_->last_response_.release_data()));
}
@@ -1180,7 +1121,7 @@ Status KuduScanner::NextBatch(KuduScanBatch* batch) {
set<string> blacklist;
RETURN_NOT_OK(data_->HandleError(result, batch_deadline, &blacklist));
- if (data_->is_fault_tolerant_) {
+ if (data_->configuration().is_fault_tolerant()) {
LOG(WARNING) << "Attempting to retry scan of tablet " << ToString() << " elsewhere.";
return data_->ReopenCurrentTablet(batch_deadline, &blacklist);
}
@@ -1200,7 +1141,7 @@ Status KuduScanner::NextBatch(KuduScanBatch* batch) {
VLOG(1) << "Scanning next tablet " << ToString();
data_->last_primary_key_.clear();
MonoTime deadline = MonoTime::Now(MonoTime::FINE);
- deadline.AddDelta(data_->timeout_);
+ deadline.AddDelta(data_->configuration().timeout());
set<string> blacklist;
RETURN_NOT_OK(data_->OpenNextTablet(deadline, &blacklist));
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/8a3e461f/src/kudu/client/scan_configuration.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_configuration.cc b/src/kudu/client/scan_configuration.cc
new file mode 100644
index 0000000..6b06086
--- /dev/null
+++ b/src/kudu/client/scan_configuration.cc
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/client/scan_configuration.h"
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "kudu/client/client.h"
+#include "kudu/client/scan_predicate.h"
+#include "kudu/client/scan_predicate-internal.h"
+
+using std::unique_ptr;
+using std::string;
+using std::vector;
+
+namespace kudu {
+namespace client {
+
+ScanConfiguration::ScanConfiguration(KuduTable* tbl)
+ : table_(tbl),
+ projection_(tbl->schema().schema_), // Default projection: the whole table schema.
+ client_projection_(*tbl->schema().schema_),
+ has_batch_size_bytes_(false), // No batch size is sent until one is set.
+ batch_size_bytes_(0),
+ selection_(KuduClient::CLOSEST_REPLICA),
+ read_mode_(KuduScanner::READ_LATEST),
+ is_fault_tolerant_(false),
+ snapshot_timestamp_(kNoTimestamp), // Sentinel: no snapshot timestamp chosen yet.
+ timeout_(MonoDelta::FromMilliseconds(KuduScanner::kScanTimeoutMillis)),
+ arena_(1024, 1 << 20) {
+}
+
+Status ScanConfiguration::SetProjectedColumnNames(const vector<string>& col_names) {
+ const Schema* table_schema = table().schema().schema_; // Resolve each name to an index.
+ vector<int> indexes;
+ indexes.reserve(col_names.size());
+ for (const string& name : col_names) {
+ const int pos = table_schema->find_column(name);
+ if (pos == Schema::kColumnNotFound) {
+ return Status::NotFound(strings::Substitute(
+ "Column: \"$0\" was not found in the table schema.", name));
+ }
+ indexes.push_back(pos);
+ }
+ return SetProjectedColumnIndexes(indexes); // The index-based path builds the projection.
+}
+
+Status ScanConfiguration::SetProjectedColumnIndexes(const vector<int>& col_indexes) {
+ const Schema* table_schema = table_->schema().schema_;
+ vector<ColumnSchema> cols;
+ cols.reserve(col_indexes.size());
+ for (const int col_index : col_indexes) {
+ // Reject negatives first, then cast: avoids a signed/unsigned comparison.
+ if (col_index < 0 || static_cast<size_t>(col_index) >= table_schema->columns().size()) {
+ return Status::NotFound(strings::Substitute(
+ "Column index: $0 was not found in the table schema.", col_index));
+ }
+ cols.push_back(table_schema->column(col_index));
+ }
+ unique_ptr<Schema> s(new Schema());
+ RETURN_NOT_OK(s->Reset(cols, 0)); // On any error so far the previous projection is kept.
+ projection_ = pool_.Add(s.release());
+ client_projection_ = KuduSchema(*projection_); // Keep the client view in sync.
+ return Status::OK();
+}
+
+Status ScanConfiguration::AddConjunctPredicate(KuduPredicate* pred) {
+ pool_.Add(pred); // Adopt 'pred' first so it is released with the pool even on error.
+ Status s = pred->data_->AddToScanSpec(&spec_, &arena_);
+ return s;
+}
+
+Status ScanConfiguration::AddLowerBound(const KuduPartialRow& key) {
+ string row_key; // A local buffer suffices: AddLowerBoundRaw() copies the key.
+ RETURN_NOT_OK(key.EncodeRowKey(&row_key));
+ return AddLowerBoundRaw(Slice(row_key));
+}
+
+Status ScanConfiguration::AddUpperBound(const KuduPartialRow& key) {
+ string row_key; // A local buffer suffices: AddUpperBoundRaw() copies the key.
+ RETURN_NOT_OK(key.EncodeRowKey(&row_key));
+ return AddUpperBoundRaw(Slice(row_key));
+}
+
+Status ScanConfiguration::AddLowerBoundRaw(const Slice& key) {
+ // Decode a private copy of 'key'; pool_ keeps the decoded key alive for spec_.
+ gscoped_ptr<EncodedKey> decoded;
+ RETURN_NOT_OK(EncodedKey::DecodeEncodedString(
+ *table_->schema().schema_, &arena_, key, &decoded));
+ spec_.SetLowerBoundKey(decoded.get());
+ pool_.Add(decoded.release());
+ return Status::OK();
+}
+
+Status ScanConfiguration::AddUpperBoundRaw(const Slice& key) {
+ // Decode a private copy of 'key'; pool_ keeps the decoded key alive for spec_.
+ gscoped_ptr<EncodedKey> decoded;
+ RETURN_NOT_OK(EncodedKey::DecodeEncodedString(
+ *table_->schema().schema_, &arena_, key, &decoded));
+ spec_.SetExclusiveUpperBoundKey(decoded.get());
+ pool_.Add(decoded.release());
+ return Status::OK();
+}
+
+Status ScanConfiguration::AddLowerBoundPartitionKeyRaw(const Slice& partition_key) {
+ this->spec_.SetLowerBoundPartitionKey(partition_key); // Recorded on the scan spec.
+ return Status::OK();
+}
+
+Status ScanConfiguration::AddUpperBoundPartitionKeyRaw(const Slice& partition_key) {
+ this->spec_.SetExclusiveUpperBoundPartitionKey(partition_key); // Exclusive bound.
+ return Status::OK();
+}
+
+Status ScanConfiguration::SetCacheBlocks(bool cache_blocks) {
+ this->spec_.set_cache_blocks(cache_blocks); // Forwarded in each new-scan request.
+ return Status::OK();
+}
+
+Status ScanConfiguration::SetBatchSizeBytes(uint32_t batch_size) {
+ this->batch_size_bytes_ = batch_size;
+ this->has_batch_size_bytes_ = true; // Otherwise requests carry no batch size.
+ return Status::OK();
+}
+
+Status ScanConfiguration::SetSelection(KuduClient::ReplicaSelection selection) {
+ this->selection_ = selection; // Consulted when choosing a tablet server replica.
+ return Status::OK();
+}
+
+Status ScanConfiguration::SetReadMode(KuduScanner::ReadMode read_mode) {
+ this->read_mode_ = read_mode; // Not validated here; KuduScanner::SetReadMode() range-checks it.
+ return Status::OK();
+}
+
+Status ScanConfiguration::SetFaultTolerant(bool fault_tolerant) {
+ if (fault_tolerant) RETURN_NOT_OK(SetReadMode(KuduScanner::READ_AT_SNAPSHOT)); // Retries resume at a snapshot.
+ is_fault_tolerant_ = fault_tolerant; // Honor 'false' as well: SetOrderMode(UNORDERED) passes it.
+ return Status::OK();
+}
+
+void ScanConfiguration::SetSnapshotMicros(uint64_t snapshot_timestamp_micros) {
+ // Hybrid time keeps kHtTimestampBitsToShift logical bits below the physical
+ // microseconds; shifting yields a well-formed timestamp with those bits zeroed.
+ SetSnapshotRaw(snapshot_timestamp_micros << kHtTimestampBitsToShift);
+}
+
+void ScanConfiguration::SetSnapshotRaw(uint64_t snapshot_timestamp) {
+ snapshot_timestamp_ = static_cast<int64_t>(snapshot_timestamp);
+}
+
+void ScanConfiguration::SetTimeoutMillis(int millis) {
+ timeout_ = MonoDelta::FromMilliseconds(millis); // Used for scan deadlines and RPC timeouts.
+}
+
+void ScanConfiguration::OptimizeScanSpec() {
+ // Pushed predicates stay in the spec: OpenTablet() still serializes them.
+ const bool remove_pushed_predicates = false;
+ spec_.OptimizeScan(*table_->schema().schema_, &arena_, &pool_,
+ remove_pushed_predicates);
+}
+
+} // namespace client
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/8a3e461f/src/kudu/client/scan_configuration.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_configuration.h b/src/kudu/client/scan_configuration.h
new file mode 100644
index 0000000..efa7a96
--- /dev/null
+++ b/src/kudu/client/scan_configuration.h
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "kudu/client/client.h"
+#include "kudu/common/scan_spec.h"
+#include "kudu/util/auto_release_pool.h"
+#include "kudu/util/memory/arena.h"
+
+namespace kudu {
+namespace client {
+
+// A configuration object which holds Kudu scan options.
+//
+// Unless otherwise specified, the method documentation matches the
+// corresponding methods on KuduScanner.
+class ScanConfiguration {
+ public:
+ // Sentinel held in snapshot_timestamp_ until a snapshot timestamp is set.
+ static const int64_t kNoTimestamp = -1;
+ static const int kHtTimestampBitsToShift = 12;
+
+ explicit ScanConfiguration(KuduTable* table);
+ ~ScanConfiguration() = default;
+
+ Status SetProjectedColumnNames(const std::vector<std::string>& col_names) WARN_UNUSED_RESULT;
+
+ Status SetProjectedColumnIndexes(const std::vector<int>& col_indexes) WARN_UNUSED_RESULT;
+
+ Status AddConjunctPredicate(KuduPredicate* pred) WARN_UNUSED_RESULT;
+ // The KuduPartialRow bound overloads encode the row key and defer to the Raw forms.
+ Status AddLowerBound(const KuduPartialRow& key);
+
+ Status AddUpperBound(const KuduPartialRow& key);
+
+ Status AddLowerBoundRaw(const Slice& key);
+
+ Status AddUpperBoundRaw(const Slice& key);
+
+ Status AddLowerBoundPartitionKeyRaw(const Slice& partition_key);
+
+ Status AddUpperBoundPartitionKeyRaw(const Slice& partition_key);
+
+ Status SetCacheBlocks(bool cache_blocks);
+
+ Status SetBatchSizeBytes(uint32_t batch_size);
+
+ Status SetSelection(KuduClient::ReplicaSelection selection) WARN_UNUSED_RESULT;
+
+ Status SetReadMode(KuduScanner::ReadMode read_mode) WARN_UNUSED_RESULT;
+
+ Status SetFaultTolerant(bool fault_tolerant) WARN_UNUSED_RESULT;
+ // SetSnapshotMicros() shifts microseconds into hybrid-time form; SetSnapshotRaw() stores as-is.
+ void SetSnapshotMicros(uint64_t snapshot_timestamp_micros);
+
+ void SetSnapshotRaw(uint64_t snapshot_timestamp);
+
+ void SetTimeoutMillis(int millis);
+ // Optimizes spec() against the table schema; KuduScanner::Open() calls it before pruning.
+ void OptimizeScanSpec();
+
+ const KuduTable& table() const {
+ return *table_;
+ }
+
+ // Returns the projection schema.
+ //
+ // The ScanConfiguration retains ownership of the projection.
+ const Schema* projection() const {
+ return projection_;
+ }
+
+ // Returns the client projection schema.
+ //
+ // The ScanConfiguration retains ownership of the projection.
+ const KuduSchema* client_projection() const {
+ return &client_projection_;
+ }
+
+ const ScanSpec& spec() const {
+ return spec_;
+ }
+
+ bool has_batch_size_bytes() const {
+ return has_batch_size_bytes_;
+ }
+
+ uint32_t batch_size_bytes() const {
+ CHECK(has_batch_size_bytes_); // Only valid after SetBatchSizeBytes().
+ return batch_size_bytes_;
+ }
+
+ KuduClient::ReplicaSelection selection() const {
+ return selection_;
+ }
+
+ KuduScanner::ReadMode read_mode() const {
+ return read_mode_;
+ }
+
+ bool is_fault_tolerant() const {
+ return is_fault_tolerant_;
+ }
+
+ int64_t snapshot_timestamp() const {
+ return snapshot_timestamp_;
+ }
+
+ const MonoDelta& timeout() const {
+ return timeout_;
+ }
+
+ private:
+ friend class ScanTokenBuilder;
+
+ // Non-owned, non-null table.
+ KuduTable* table_;
+
+ // Non-null projection schema: the table's own schema, or one held in pool_.
+ Schema* projection_;
+
+ // Client-facing copy of *projection_, rebuilt by SetProjectedColumnIndexes().
+ KuduSchema client_projection_;
+
+ ScanSpec spec_; // Predicates, key and partition-key bounds, block-cache setting.
+
+ bool has_batch_size_bytes_;
+ uint32_t batch_size_bytes_;
+
+ KuduClient::ReplicaSelection selection_;
+
+ KuduScanner::ReadMode read_mode_;
+
+ bool is_fault_tolerant_;
+
+ int64_t snapshot_timestamp_; // kNoTimestamp until a snapshot timestamp is set.
+
+ MonoDelta timeout_;
+
+ // Manages interior allocations for the scan spec and copied bounds.
+ Arena arena_;
+
+ // Manages objects which need to live for the lifetime of the configuration,
+ // such as schemas, predicates, and keys.
+ AutoReleasePool pool_;
+};
+
+} // namespace client
+} // namespace kudu
+
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/8a3e461f/src/kudu/client/scan_predicate.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_predicate.h b/src/kudu/client/scan_predicate.h
index 604d096..fa6ff48 100644
--- a/src/kudu/client/scan_predicate.h
+++ b/src/kudu/client/scan_predicate.h
@@ -47,10 +47,10 @@ class KUDU_EXPORT KuduPredicate {
// and gcc gives an error trying to derive from a private nested class.
class KUDU_NO_EXPORT Data;
private:
- friend class KuduScanner;
- friend class KuduTable;
friend class ComparisonPredicateData;
friend class ErrorPredicateData;
+ friend class KuduTable;
+ friend class ScanConfiguration;
explicit KuduPredicate(Data* d);
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/8a3e461f/src/kudu/client/scanner-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scanner-internal.cc b/src/kudu/client/scanner-internal.cc
index 1d24c13..2e8c5cb 100644
--- a/src/kudu/client/scanner-internal.cc
+++ b/src/kudu/client/scanner-internal.cc
@@ -49,29 +49,20 @@ namespace client {
using internal::RemoteTabletServer;
-static const int64_t kNoTimestamp = -1;
-
KuduScanner::Data::Data(KuduTable* table)
- : open_(false),
+ : configuration_(table),
+ open_(false),
data_in_open_(false),
- has_batch_size_bytes_(false),
- batch_size_bytes_(0),
- selection_(KuduClient::CLOSEST_REPLICA),
- read_mode_(READ_LATEST),
- is_fault_tolerant_(false),
- snapshot_timestamp_(kNoTimestamp),
short_circuit_(false),
table_(DCHECK_NOTNULL(table)),
- arena_(1024, 1024*1024),
- timeout_(MonoDelta::FromMilliseconds(kScanTimeoutMillis)),
scan_attempts_(0) {
- SetProjectionSchema(table->schema().schema_);
}
KuduScanner::Data::~Data() {
}
namespace {
+// Copies a predicate lower or upper bound from 'bound_src' into 'bound_dst'.
void CopyPredicateBound(const ColumnSchema& col,
const void* bound_src,
string* bound_dst) {
@@ -288,7 +279,7 @@ ScanRpcStatus KuduScanner::Data::SendScanRpc(const MonoTime& overall_deadline,
controller_.Reset();
controller_.set_deadline(rpc_deadline);
- if (!spec_.predicates().empty()) {
+ if (!configuration_.spec().predicates().empty()) {
controller_.RequireServerFeature(TabletServerFeatures::COLUMN_PREDICATES);
}
return AnalyzeResponse(
@@ -305,13 +296,13 @@ Status KuduScanner::Data::OpenTablet(const string& partition_key,
PrepareRequest(KuduScanner::Data::NEW);
next_req_.clear_scanner_id();
NewScanRequestPB* scan = next_req_.mutable_new_scan_request();
- switch (read_mode_) {
+ switch (configuration_.read_mode()) {
case READ_LATEST: scan->set_read_mode(kudu::READ_LATEST); break;
case READ_AT_SNAPSHOT: scan->set_read_mode(kudu::READ_AT_SNAPSHOT); break;
default: LOG(FATAL) << "Unexpected read mode.";
}
- if (is_fault_tolerant_) {
+ if (configuration_.is_fault_tolerant()) {
scan->set_order_mode(kudu::ORDERED);
} else {
scan->set_order_mode(kudu::UNORDERED);
@@ -323,38 +314,38 @@ Status KuduScanner::Data::OpenTablet(const string& partition_key,
scan->set_last_primary_key(last_primary_key_);
}
- scan->set_cache_blocks(spec_.cache_blocks());
+ scan->set_cache_blocks(configuration_.spec().cache_blocks());
- if (snapshot_timestamp_ != kNoTimestamp) {
- if (PREDICT_FALSE(read_mode_ != READ_AT_SNAPSHOT)) {
+ if (configuration_.snapshot_timestamp() != ScanConfiguration::kNoTimestamp) {
+ if (PREDICT_FALSE(configuration_.read_mode() != READ_AT_SNAPSHOT)) {
LOG(WARNING) << "Scan snapshot timestamp set but read mode was READ_LATEST."
" Ignoring timestamp.";
} else {
- scan->set_snap_timestamp(snapshot_timestamp_);
+ scan->set_snap_timestamp(configuration_.snapshot_timestamp());
}
}
// Set up the predicates.
scan->clear_column_predicates();
- for (const auto& col_pred : spec_.predicates()) {
+ for (const auto& col_pred : configuration_.spec().predicates()) {
ColumnPredicateIntoPB(col_pred.second, scan->add_column_predicates());
}
- if (spec_.lower_bound_key()) {
+ if (configuration_.spec().lower_bound_key()) {
scan->mutable_start_primary_key()->assign(
- reinterpret_cast<const char*>(spec_.lower_bound_key()->encoded_key().data()),
- spec_.lower_bound_key()->encoded_key().size());
+ reinterpret_cast<const char*>(configuration_.spec().lower_bound_key()->encoded_key().data()),
+ configuration_.spec().lower_bound_key()->encoded_key().size());
} else {
scan->clear_start_primary_key();
}
- if (spec_.exclusive_upper_bound_key()) {
- scan->mutable_stop_primary_key()->assign(
- reinterpret_cast<const char*>(spec_.exclusive_upper_bound_key()->encoded_key().data()),
- spec_.exclusive_upper_bound_key()->encoded_key().size());
+ if (configuration_.spec().exclusive_upper_bound_key()) {
+ scan->mutable_stop_primary_key()->assign(reinterpret_cast<const char*>(
+ configuration_.spec().exclusive_upper_bound_key()->encoded_key().data()),
+ configuration_.spec().exclusive_upper_bound_key()->encoded_key().size());
} else {
scan->clear_stop_primary_key();
}
- RETURN_NOT_OK(SchemaToColumnPBs(*projection_, scan->mutable_projected_columns(),
+ RETURN_NOT_OK(SchemaToColumnPBs(*configuration_.projection(), scan->mutable_projected_columns(),
SCHEMA_PB_WITHOUT_STORAGE_ATTRIBUTES | SCHEMA_PB_WITHOUT_IDS));
for (int attempt = 1;; attempt++) {
@@ -373,7 +364,7 @@ Status KuduScanner::Data::OpenTablet(const string& partition_key,
Status lookup_status = table_->client()->data_->GetTabletServer(
table_->client(),
remote_,
- selection_,
+ configuration_.selection(),
*blacklist,
&candidates,
&ts);
@@ -429,9 +420,9 @@ Status KuduScanner::Data::OpenTablet(const string& partition_key,
// If present in the response, set the snapshot timestamp and the encoded last
// primary key. This is used when retrying the scan elsewhere. The last
// primary key is also updated on each scan response.
- if (is_fault_tolerant_) {
+ if (configuration().is_fault_tolerant()) {
CHECK(last_response_.has_snap_timestamp());
- snapshot_timestamp_ = last_response_.snap_timestamp();
+ configuration_.SetSnapshotRaw(last_response_.snap_timestamp());
if (last_response_.has_last_primary_key()) {
last_primary_key_ = last_response_.last_primary_key();
}
@@ -453,7 +444,7 @@ Status KuduScanner::Data::KeepAlive() {
}
RpcController controller;
- controller.set_timeout(timeout_);
+ controller.set_timeout(configuration_.timeout());
tserver::ScannerKeepAliveRequestPB request;
request.set_scanner_id(next_req_.scanner_id());
tserver::ScannerKeepAliveResponsePB response;
@@ -473,8 +464,8 @@ bool KuduScanner::Data::MoreTablets() const {
void KuduScanner::Data::PrepareRequest(RequestType state) {
if (state == KuduScanner::Data::CLOSE) {
next_req_.set_batch_size_bytes(0);
- } else if (has_batch_size_bytes_) {
- next_req_.set_batch_size_bytes(batch_size_bytes_);
+ } else if (configuration_.has_batch_size_bytes()) {
+ next_req_.set_batch_size_bytes(configuration_.batch_size_bytes());
} else {
next_req_.clear_batch_size_bytes();
}
@@ -492,11 +483,6 @@ void KuduScanner::Data::UpdateLastError(const Status& error) {
}
}
-void KuduScanner::Data::SetProjectionSchema(const Schema* schema) {
- projection_ = schema;
- client_projection_ = KuduSchema(*schema);
-}
-
////////////////////////////////////////////////////////////
// KuduScanBatch
////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/8a3e461f/src/kudu/client/scanner-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/scanner-internal.h b/src/kudu/client/scanner-internal.h
index 44dea45..fee8b93 100644
--- a/src/kudu/client/scanner-internal.h
+++ b/src/kudu/client/scanner-internal.h
@@ -23,6 +23,7 @@
#include "kudu/client/client.h"
#include "kudu/client/row_result.h"
+#include "kudu/client/scan_configuration.h"
#include "kudu/common/partition_pruner.h"
#include "kudu/common/scan_spec.h"
#include "kudu/gutil/macros.h"
@@ -81,12 +82,6 @@ class KuduScanner::Data {
explicit Data(KuduTable* table);
~Data();
- // Copies a predicate lower or upper bound from 'bound_src' into
- // 'bound_dst'.
- void CopyPredicateBound(const ColumnSchema& col,
- const void* bound_src, std::string* bound_dst);
-
-
// Calculates a deadline and sends the next RPC for this scanner. The deadline for the
// RPC is calculated based on whether 'allow_time_for_failover' is true. If true,
// the deadline used for the RPC will be shortened so that, on timeout, there will
@@ -151,18 +146,18 @@ class KuduScanner::Data {
// non-fatal (i.e. retriable) scan error is encountered.
void UpdateLastError(const Status& error);
- // Sets the projection schema.
- void SetProjectionSchema(const Schema* schema);
+ const ScanConfiguration& configuration() const {
+ return configuration_;
+ }
+
+ ScanConfiguration* mutable_configuration() {
+ return &configuration_;
+ }
+
+ ScanConfiguration configuration_;
bool open_;
bool data_in_open_;
- bool has_batch_size_bytes_;
- uint32 batch_size_bytes_;
- KuduClient::ReplicaSelection selection_;
-
- ReadMode read_mode_;
- bool is_fault_tolerant_;
- int64_t snapshot_timestamp_;
// Set to true if the scan is known to be empty based on predicates and
// primary key bounds.
@@ -172,6 +167,7 @@ class KuduScanner::Data {
std::string last_primary_key_;
internal::RemoteTabletServer* ts_;
+
// The proxy can be derived from the RemoteTabletServer, but this involves retaking the
// meta cache lock. Keeping our own shared_ptr avoids this overhead.
std::shared_ptr<tserver::TabletServerServiceProxy> proxy_;
@@ -190,28 +186,11 @@ class KuduScanner::Data {
// The table we're scanning.
KuduTable* table_;
- // The projection schema used in the scan.
- const Schema* projection_;
-
- // 'projection_' after it is converted to KuduSchema, so that users can obtain
- // the projection without having to include common/schema.h.
- KuduSchema client_projection_;
-
- Arena arena_;
- AutoReleasePool pool_;
-
- // Machinery to store and encode raw column range predicates into
- // encoded keys.
- ScanSpec spec_;
-
PartitionPruner partition_pruner_;
// The tablet we're scanning.
scoped_refptr<internal::RemoteTablet> remote_;
- // Timeout for scanner RPCs.
- MonoDelta timeout_;
-
// Number of attempts since the last successful scan.
int scan_attempts_;
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/8a3e461f/src/kudu/client/schema.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/schema.h b/src/kudu/client/schema.h
index 6bdeb0c..3236898 100644
--- a/src/kudu/client/schema.h
+++ b/src/kudu/client/schema.h
@@ -327,6 +327,7 @@ class KUDU_EXPORT KuduSchema {
friend class KuduTable;
friend class KuduTableCreator;
friend class KuduWriteOperation;
+ friend class ScanConfiguration;
friend class internal::GetTableSchemaRpc;
friend class internal::LookupRpc;
friend class internal::WriteRpc;