You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2016/04/14 20:46:00 UTC

incubator-kudu git commit: [c++-client] abstract scanner configuration

Repository: incubator-kudu
Updated Branches:
  refs/heads/master ce11938a2 -> 8a3e461f8


[c++-client] abstract scanner configuration

Unlike the Java client, the c++ client configures scan options directly on the
KuduScanner. This commit adds an internal class, ScanConfiguration, that takes
over responsibility for configuring scans from the KuduScanner.  KuduScanner is
updated to hold a ScanConfiguration internally. The ScanConfiguration will be
used internally in the forthcoming scan token builder API.

Change-Id: Id3d5b27f4a0ae6cd1500f17f96b7043590affd92
Reviewed-on: http://gerrit.cloudera.org:8080/2726
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/8a3e461f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/8a3e461f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/8a3e461f

Branch: refs/heads/master
Commit: 8a3e461f8619dda6f2f85135bb94143e9ae2926f
Parents: ce11938
Author: Dan Burkert <da...@cloudera.com>
Authored: Wed Apr 6 11:17:35 2016 -0700
Committer: Dan Burkert <da...@cloudera.com>
Committed: Thu Apr 14 18:45:34 2016 +0000

----------------------------------------------------------------------
 src/kudu/client/CMakeLists.txt        |   3 +-
 src/kudu/client/client.cc             | 135 ++++++----------------
 src/kudu/client/scan_configuration.cc | 180 +++++++++++++++++++++++++++++
 src/kudu/client/scan_configuration.h  | 169 +++++++++++++++++++++++++++
 src/kudu/client/scan_predicate.h      |   4 +-
 src/kudu/client/scanner-internal.cc   |  64 ++++------
 src/kudu/client/scanner-internal.h    |  43 ++-----
 src/kudu/client/schema.h              |   1 +
 8 files changed, 428 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/8a3e461f/src/kudu/client/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/client/CMakeLists.txt b/src/kudu/client/CMakeLists.txt
index c3f909a..48625b2 100644
--- a/src/kudu/client/CMakeLists.txt
+++ b/src/kudu/client/CMakeLists.txt
@@ -24,10 +24,11 @@ set(CLIENT_SRCS
   error-internal.cc
   meta_cache.cc
   scan_batch.cc
+  scan_configuration.cc
   scan_predicate.cc
   scanner-internal.cc
-  session-internal.cc
   schema.cc
+  session-internal.cc
   table-internal.cc
   table_alterer-internal.cc
   table_creator-internal.cc

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/8a3e461f/src/kudu/client/client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index b9a2b63..fb614ba 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -103,7 +103,6 @@ using internal::ErrorCollector;
 using internal::MetaCache;
 using sp::shared_ptr;
 
-static const int kHtTimestampBitsToShift = 12;
 static const char* kProgName = "kudu_client";
 
 // We need to reroute all logging to stderr when the client library is
@@ -852,48 +851,18 @@ Status KuduScanner::SetProjectedColumnNames(const vector<string>& col_names) {
   if (data_->open_) {
     return Status::IllegalState("Projection must be set before Open()");
   }
-
-  const Schema* table_schema = data_->table_->schema().schema_;
-  vector<int> col_indexes;
-  col_indexes.reserve(col_names.size());
-  for (const string& col_name : col_names) {
-    int idx = table_schema->find_column(col_name);
-    if (idx == Schema::kColumnNotFound) {
-      return Status::NotFound(strings::Substitute("Column: \"$0\" was not found in the "
-          "table schema.", col_name));
-    }
-    col_indexes.push_back(idx);
-  }
-
-  return SetProjectedColumnIndexes(col_indexes);
+  return data_->mutable_configuration()->SetProjectedColumnNames(col_names);
 }
 
 Status KuduScanner::SetProjectedColumnIndexes(const vector<int>& col_indexes) {
   if (data_->open_) {
     return Status::IllegalState("Projection must be set before Open()");
   }
-
-  const Schema* table_schema = data_->table_->schema().schema_;
-  vector<ColumnSchema> cols;
-  cols.reserve(col_indexes.size());
-  for (const int col_index : col_indexes) {
-    if (col_index >= table_schema->columns().size()) {
-      return Status::NotFound(strings::Substitute("Column: \"$0\" was not found in the "
-          "table schema.", col_index));
-    }
-    cols.push_back(table_schema->column(col_index));
-  }
-
-  gscoped_ptr<Schema> s(new Schema());
-  RETURN_NOT_OK(s->Reset(cols, 0));
-  data_->SetProjectionSchema(data_->pool_.Add(s.release()));
-  return Status::OK();
+  return data_->mutable_configuration()->SetProjectedColumnIndexes(col_indexes);
 }
 
 Status KuduScanner::SetBatchSizeBytes(uint32_t batch_size) {
-  data_->has_batch_size_bytes_ = true;
-  data_->batch_size_bytes_ = batch_size;
-  return Status::OK();
+  return data_->mutable_configuration()->SetBatchSizeBytes(batch_size);
 }
 
 Status KuduScanner::SetReadMode(ReadMode read_mode) {
@@ -903,8 +872,7 @@ Status KuduScanner::SetReadMode(ReadMode read_mode) {
   if (!tight_enum_test<ReadMode>(read_mode)) {
     return Status::InvalidArgument("Bad read mode");
   }
-  data_->read_mode_ = read_mode;
-  return Status::OK();
+  return data_->mutable_configuration()->SetReadMode(read_mode);
 }
 
 Status KuduScanner::SetOrderMode(OrderMode order_mode) {
@@ -914,26 +882,21 @@ Status KuduScanner::SetOrderMode(OrderMode order_mode) {
   if (!tight_enum_test<OrderMode>(order_mode)) {
     return Status::InvalidArgument("Bad order mode");
   }
-  data_->is_fault_tolerant_ = order_mode == ORDERED;
-  return Status::OK();
+  return data_->mutable_configuration()->SetFaultTolerant(order_mode == ORDERED);
 }
 
 Status KuduScanner::SetFaultTolerant() {
   if (data_->open_) {
     return Status::IllegalState("Fault-tolerance must be set before Open()");
   }
-  RETURN_NOT_OK(SetReadMode(READ_AT_SNAPSHOT));
-  data_->is_fault_tolerant_ = true;
-  return Status::OK();
+  return data_->mutable_configuration()->SetFaultTolerant(true);
 }
 
 Status KuduScanner::SetSnapshotMicros(uint64_t snapshot_timestamp_micros) {
   if (data_->open_) {
     return Status::IllegalState("Snapshot timestamp must be set before Open()");
   }
-  // Shift the HT timestamp bits to get well-formed HT timestamp with the logical
-  // bits zeroed out.
-  data_->snapshot_timestamp_ = snapshot_timestamp_micros << kHtTimestampBitsToShift;
+  data_->mutable_configuration()->SetSnapshotMicros(snapshot_timestamp_micros);
   return Status::OK();
 }
 
@@ -941,7 +904,7 @@ Status KuduScanner::SetSnapshotRaw(uint64_t snapshot_timestamp) {
   if (data_->open_) {
     return Status::IllegalState("Snapshot timestamp must be set before Open()");
   }
-  data_->snapshot_timestamp_ = snapshot_timestamp;
+  data_->mutable_configuration()->SetSnapshotRaw(snapshot_timestamp);
   return Status::OK();
 }
 
@@ -949,83 +912,59 @@ Status KuduScanner::SetSelection(KuduClient::ReplicaSelection selection) {
   if (data_->open_) {
     return Status::IllegalState("Replica selection must be set before Open()");
   }
-  data_->selection_ = selection;
-  return Status::OK();
+  return data_->mutable_configuration()->SetSelection(selection);
 }
 
 Status KuduScanner::SetTimeoutMillis(int millis) {
   if (data_->open_) {
     return Status::IllegalState("Timeout must be set before Open()");
   }
-  data_->timeout_ = MonoDelta::FromMilliseconds(millis);
+  data_->mutable_configuration()->SetTimeoutMillis(millis);
   return Status::OK();
 }
 
 Status KuduScanner::AddConjunctPredicate(KuduPredicate* pred) {
-  // Take ownership even if we return a bad status.
-  data_->pool_.Add(pred);
   if (data_->open_) {
+    // Take ownership even if we return a bad status.
+    delete pred;
     return Status::IllegalState("Predicate must be set before Open()");
   }
-  return pred->data_->AddToScanSpec(&data_->spec_, &data_->arena_);
+  return data_->mutable_configuration()->AddConjunctPredicate(pred);
 }
 
 Status KuduScanner::AddLowerBound(const KuduPartialRow& key) {
-  gscoped_ptr<string> enc(new string());
-  RETURN_NOT_OK(key.EncodeRowKey(enc.get()));
-  RETURN_NOT_OK(AddLowerBoundRaw(Slice(*enc)));
-  data_->pool_.Add(enc.release());
-  return Status::OK();
+  return data_->mutable_configuration()->AddLowerBound(key);
 }
 
 Status KuduScanner::AddLowerBoundRaw(const Slice& key) {
-  // Make a copy of the key.
-  gscoped_ptr<EncodedKey> enc_key;
-  RETURN_NOT_OK(EncodedKey::DecodeEncodedString(
-                  *data_->table_->schema().schema_, &data_->arena_, key, &enc_key));
-  data_->spec_.SetLowerBoundKey(enc_key.get());
-  data_->pool_.Add(enc_key.release());
-  return Status::OK();
+  return data_->mutable_configuration()->AddLowerBoundRaw(key);
 }
 
 Status KuduScanner::AddExclusiveUpperBound(const KuduPartialRow& key) {
-  gscoped_ptr<string> enc(new string());
-  RETURN_NOT_OK(key.EncodeRowKey(enc.get()));
-  RETURN_NOT_OK(AddExclusiveUpperBoundRaw(Slice(*enc)));
-  data_->pool_.Add(enc.release());
-  return Status::OK();
+  return data_->mutable_configuration()->AddUpperBound(key);
 }
 
 Status KuduScanner::AddExclusiveUpperBoundRaw(const Slice& key) {
-  // Make a copy of the key.
-  gscoped_ptr<EncodedKey> enc_key;
-  RETURN_NOT_OK(EncodedKey::DecodeEncodedString(
-                  *data_->table_->schema().schema_, &data_->arena_, key, &enc_key));
-  data_->spec_.SetExclusiveUpperBoundKey(enc_key.get());
-  data_->pool_.Add(enc_key.release());
-  return Status::OK();
+  return data_->mutable_configuration()->AddUpperBoundRaw(key);
 }
 
 Status KuduScanner::AddLowerBoundPartitionKeyRaw(const Slice& partition_key) {
-  data_->spec_.SetLowerBoundPartitionKey(partition_key);
-  return Status::OK();
+  return data_->mutable_configuration()->AddLowerBoundPartitionKeyRaw(partition_key);
 }
 
 Status KuduScanner::AddExclusiveUpperBoundPartitionKeyRaw(const Slice& partition_key) {
-  data_->spec_.SetExclusiveUpperBoundPartitionKey(partition_key);
-  return Status::OK();
+  return data_->mutable_configuration()->AddUpperBoundPartitionKeyRaw(partition_key);
 }
 
 Status KuduScanner::SetCacheBlocks(bool cache_blocks) {
   if (data_->open_) {
     return Status::IllegalState("Block caching must be set before Open()");
   }
-  data_->spec_.set_cache_blocks(cache_blocks);
-  return Status::OK();
+  return data_->mutable_configuration()->SetCacheBlocks(cache_blocks);
 }
 
 KuduSchema KuduScanner::GetProjectionSchema() const {
-  return data_->client_projection_;
+  return KuduSchema(*data_->configuration().projection());
 }
 
 namespace {
@@ -1050,19 +989,21 @@ struct CloseCallback {
 string KuduScanner::ToString() const {
   return strings::Substitute("$0: $1",
                              data_->table_->name(),
-                             data_->spec_.ToString(*data_->table_->schema().schema_));
+                             data_->configuration()
+                                   .spec()
+                                   .ToString(*data_->table_->schema().schema_));
 }
 
 Status KuduScanner::Open() {
   CHECK(!data_->open_) << "Scanner already open";
-  CHECK(data_->projection_ != nullptr) << "No projection provided";
 
-  data_->spec_.OptimizeScan(*data_->table_->schema().schema_, &data_->arena_, &data_->pool_, false);
+  data_->mutable_configuration()->OptimizeScanSpec();
   data_->partition_pruner_.Init(*data_->table_->schema().schema_,
                                 data_->table_->partition_schema(),
-                                data_->spec_);
+                                data_->configuration().spec());
 
-  if (data_->spec_.CanShortCircuit() || !data_->partition_pruner_.HasMorePartitionKeyRanges()) {
+  if (data_->configuration().spec().CanShortCircuit() ||
+      !data_->partition_pruner_.HasMorePartitionKeyRanges()) {
     VLOG(1) << "Short circuiting scan " << ToString();
     data_->open_ = true;
     data_->short_circuit_ = true;
@@ -1072,7 +1013,7 @@ Status KuduScanner::Open() {
   VLOG(1) << "Beginning scan " << ToString();
 
   MonoTime deadline = MonoTime::Now(MonoTime::FINE);
-  deadline.AddDelta(data_->timeout_);
+  deadline.AddDelta(data_->configuration().timeout());
   set<string> blacklist;
 
   RETURN_NOT_OK(data_->OpenNextTablet(deadline, &blacklist));
@@ -1101,7 +1042,7 @@ void KuduScanner::Close() {
     closer->scanner_id = data_->next_req_.scanner_id();
     data_->PrepareRequest(KuduScanner::Data::CLOSE);
     data_->next_req_.set_close_scanner(true);
-    closer->controller.set_timeout(data_->timeout_);
+    closer->controller.set_timeout(data_->configuration().timeout());
     data_->proxy_->ScanAsync(data_->next_req_, &closer->response, &closer->controller,
                              boost::bind(&CloseCallback::Callback, closer.get()));
     ignore_result(closer.release());
@@ -1144,19 +1085,19 @@ Status KuduScanner::NextBatch(KuduScanBatch* batch) {
     VLOG(1) << "Extracting data from scan " << ToString();
     data_->data_in_open_ = false;
     return batch->data_->Reset(&data_->controller_,
-                                data_->projection_,
-                                &data_->client_projection_,
+                               data_->configuration().projection(),
+                               data_->configuration().client_projection(),
                                 make_gscoped_ptr(data_->last_response_.release_data()));
   } else if (data_->last_response_.has_more_results()) {
     // More data is available in this tablet.
     VLOG(1) << "Continuing scan " << ToString();
 
     MonoTime batch_deadline = MonoTime::Now(MonoTime::FINE);
-    batch_deadline.AddDelta(data_->timeout_);
+    batch_deadline.AddDelta(data_->configuration().timeout());
     data_->PrepareRequest(KuduScanner::Data::CONTINUE);
 
     while (true) {
-      bool allow_time_for_failover = data_->is_fault_tolerant_;
+      bool allow_time_for_failover = data_->configuration().is_fault_tolerant();
       ScanRpcStatus result = data_->SendScanRpc(batch_deadline, allow_time_for_failover);
 
       // Success case.
@@ -1166,8 +1107,8 @@ Status KuduScanner::NextBatch(KuduScanBatch* batch) {
         }
         data_->scan_attempts_ = 0;
         return batch->data_->Reset(&data_->controller_,
-                                   data_->projection_,
-                                   &data_->client_projection_,
+                                   data_->configuration().projection(),
+                                   data_->configuration().client_projection(),
                                    make_gscoped_ptr(data_->last_response_.release_data()));
       }
 
@@ -1180,7 +1121,7 @@ Status KuduScanner::NextBatch(KuduScanBatch* batch) {
       set<string> blacklist;
       RETURN_NOT_OK(data_->HandleError(result, batch_deadline, &blacklist));
 
-      if (data_->is_fault_tolerant_) {
+      if (data_->configuration().is_fault_tolerant()) {
         LOG(WARNING) << "Attempting to retry scan of tablet " << ToString() << " elsewhere.";
         return data_->ReopenCurrentTablet(batch_deadline, &blacklist);
       }
@@ -1200,7 +1141,7 @@ Status KuduScanner::NextBatch(KuduScanBatch* batch) {
     VLOG(1) << "Scanning next tablet " << ToString();
     data_->last_primary_key_.clear();
     MonoTime deadline = MonoTime::Now(MonoTime::FINE);
-    deadline.AddDelta(data_->timeout_);
+    deadline.AddDelta(data_->configuration().timeout());
     set<string> blacklist;
 
     RETURN_NOT_OK(data_->OpenNextTablet(deadline, &blacklist));

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/8a3e461f/src/kudu/client/scan_configuration.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_configuration.cc b/src/kudu/client/scan_configuration.cc
new file mode 100644
index 0000000..6b06086
--- /dev/null
+++ b/src/kudu/client/scan_configuration.cc
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/client/scan_configuration.h"
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "kudu/client/client.h"
+#include "kudu/client/scan_predicate.h"
+#include "kudu/client/scan_predicate-internal.h"
+
+using std::unique_ptr;
+using std::string;
+using std::vector;
+
+namespace kudu {
+namespace client {
+
+// Creates a scan configuration for 'table' with default options: the
+// projection is the full table schema, no batch size is set, the closest
+// replica is selected, the read mode is READ_LATEST, fault tolerance is off,
+// no snapshot timestamp is set, and the timeout is
+// KuduScanner::kScanTimeoutMillis.
+//
+// 'table' is dereferenced here, so it must not be null. Only the pointer is
+// stored.
+ScanConfiguration::ScanConfiguration(KuduTable* table)
+    : table_(table),
+      projection_(table->schema().schema_),
+      client_projection_(*table->schema().schema_),
+      has_batch_size_bytes_(false),
+      batch_size_bytes_(0),
+      selection_(KuduClient::CLOSEST_REPLICA),
+      read_mode_(KuduScanner::READ_LATEST),
+      is_fault_tolerant_(false),
+      snapshot_timestamp_(kNoTimestamp),
+      timeout_(MonoDelta::FromMilliseconds(KuduScanner::kScanTimeoutMillis)),
+      arena_(1024, 1024 * 1024) {
+}
+
+// Resolves every name in 'col_names' to its column index in the table schema
+// and delegates to SetProjectedColumnIndexes().
+//
+// Returns NotFound for the first name that is not present in the table schema;
+// in that case the current projection is left unchanged.
+Status ScanConfiguration::SetProjectedColumnNames(const vector<string>& col_names) {
+  const Schema& table_schema = *table().schema().schema_;
+  vector<int> resolved_indexes;
+  resolved_indexes.reserve(col_names.size());
+  for (const string& name : col_names) {
+    const int column_index = table_schema.find_column(name);
+    if (column_index != Schema::kColumnNotFound) {
+      resolved_indexes.push_back(column_index);
+      continue;
+    }
+    return Status::NotFound(strings::Substitute(
+          "Column: \"$0\" was not found in the table schema.", name));
+  }
+  return SetProjectedColumnIndexes(resolved_indexes);
+}
+
+// Replaces the projection with the table columns at 'col_indexes', in the
+// given order.
+//
+// Returns NotFound if any index is negative or past the last table column, and
+// propagates any error from building the projected schema. On success the new
+// schema is owned by the configuration's pool and the client-facing projection
+// is rebuilt from it.
+Status ScanConfiguration::SetProjectedColumnIndexes(const vector<int>& col_indexes) {
+  const Schema& schema = *table_->schema().schema_;
+  vector<ColumnSchema> projected_cols;
+  projected_cols.reserve(col_indexes.size());
+  for (const int idx : col_indexes) {
+    const bool in_range = idx >= 0 && idx < schema.columns().size();
+    if (!in_range) {
+      return Status::NotFound(strings::Substitute(
+            "Column index: $0 was not found in the table schema.", idx));
+    }
+    projected_cols.push_back(schema.column(idx));
+  }
+
+  // Build the schema first; only publish it once Reset() has succeeded.
+  unique_ptr<Schema> new_projection(new Schema());
+  RETURN_NOT_OK(new_projection->Reset(projected_cols, 0));
+  projection_ = pool_.Add(new_projection.release());
+  client_projection_ = KuduSchema(*projection_);
+  return Status::OK();
+}
+
+// Adds 'pred' to the scan spec.
+//
+// 'pred' is handed to the configuration's pool before anything else happens,
+// so the caller gives up ownership whether or not the returned status is OK.
+Status ScanConfiguration::AddConjunctPredicate(KuduPredicate* pred) {
+  // Take ownership even if we return a bad status.
+  pool_.Add(pred);
+  return pred->data_->AddToScanSpec(&spec_, &arena_);
+}
+
+// Encodes the row key of 'key' and installs it as the lower bound primary key
+// through AddLowerBoundRaw(). Returns the first error from either step.
+Status ScanConfiguration::AddLowerBound(const KuduPartialRow& key) {
+  string encoded_row_key;
+  RETURN_NOT_OK(key.EncodeRowKey(&encoded_row_key));
+  const Slice raw_bound(encoded_row_key);
+  return AddLowerBoundRaw(raw_bound);
+}
+
+// Encodes the row key of 'key' and installs it as the exclusive upper bound
+// primary key through AddUpperBoundRaw(). Returns the first error from either
+// step.
+Status ScanConfiguration::AddUpperBound(const KuduPartialRow& key) {
+  string encoded_row_key;
+  RETURN_NOT_OK(key.EncodeRowKey(&encoded_row_key));
+  const Slice raw_bound(encoded_row_key);
+  return AddUpperBoundRaw(raw_bound);
+}
+
+// Decodes the encoded primary key 'key' against the table schema and sets it
+// as the scan spec's lower bound key.
+//
+// Returns the decoding error, if any, without modifying the scan spec.
+Status ScanConfiguration::AddLowerBoundRaw(const Slice& key) {
+  const Schema& table_schema = *table_->schema().schema_;
+
+  // Decode 'key' into a freshly allocated EncodedKey.
+  gscoped_ptr<EncodedKey> decoded;
+  RETURN_NOT_OK(EncodedKey::DecodeEncodedString(table_schema, &arena_, key, &decoded));
+
+  // The spec is given the raw pointer; the pool holds ownership.
+  spec_.SetLowerBoundKey(decoded.get());
+  pool_.Add(decoded.release());
+  return Status::OK();
+}
+
+// Decodes the encoded primary key 'key' against the table schema and sets it
+// as the scan spec's exclusive upper bound key.
+//
+// Returns the decoding error, if any, without modifying the scan spec.
+Status ScanConfiguration::AddUpperBoundRaw(const Slice& key) {
+  const Schema& table_schema = *table_->schema().schema_;
+
+  // Decode 'key' into a freshly allocated EncodedKey.
+  gscoped_ptr<EncodedKey> decoded;
+  RETURN_NOT_OK(EncodedKey::DecodeEncodedString(table_schema, &arena_, key, &decoded));
+
+  // The spec is given the raw pointer; the pool holds ownership.
+  spec_.SetExclusiveUpperBoundKey(decoded.get());
+  pool_.Add(decoded.release());
+  return Status::OK();
+}
+
+// Forwards 'partition_key' to the scan spec as the lower bound partition key.
+// Always returns OK.
+Status ScanConfiguration::AddLowerBoundPartitionKeyRaw(const Slice& partition_key) {
+  spec_.SetLowerBoundPartitionKey(partition_key);
+  return Status::OK();
+}
+
+// Forwards 'partition_key' to the scan spec as the exclusive upper bound
+// partition key. Always returns OK.
+Status ScanConfiguration::AddUpperBoundPartitionKeyRaw(const Slice& partition_key) {
+  spec_.SetExclusiveUpperBoundPartitionKey(partition_key);
+  return Status::OK();
+}
+
+// Records the block caching preference on the scan spec. Always returns OK.
+Status ScanConfiguration::SetCacheBlocks(bool cache_blocks) {
+  spec_.set_cache_blocks(cache_blocks);
+  return Status::OK();
+}
+
+// Stores 'batch_size' and marks the batch size as explicitly set, so that
+// has_batch_size_bytes() returns true afterwards. Always returns OK.
+Status ScanConfiguration::SetBatchSizeBytes(uint32_t batch_size) {
+  has_batch_size_bytes_ = true;
+  batch_size_bytes_ = batch_size;
+  return Status::OK();
+}
+
+// Stores the replica selection policy. Always returns OK.
+Status ScanConfiguration::SetSelection(KuduClient::ReplicaSelection selection) {
+  selection_ = selection;
+  return Status::OK();
+}
+
+// Stores the read mode. No validation of 'read_mode' is performed here.
+// Always returns OK.
+Status ScanConfiguration::SetReadMode(KuduScanner::ReadMode read_mode) {
+  read_mode_ = read_mode;
+  return Status::OK();
+}
+
+// Enables or disables fault tolerance for the scan.
+//
+// Enabling fault tolerance also switches the read mode to READ_AT_SNAPSHOT.
+// Disabling it only clears the flag and leaves the read mode untouched, so a
+// caller passing 'false' (for example when an unordered scan is requested)
+// does not end up with a fault-tolerant snapshot scan.
+//
+// Returns the status of SetReadMode() when enabling, OK otherwise.
+Status ScanConfiguration::SetFaultTolerant(bool fault_tolerant) {
+  if (fault_tolerant) {
+    RETURN_NOT_OK(SetReadMode(KuduScanner::READ_AT_SNAPSHOT));
+  }
+  is_fault_tolerant_ = fault_tolerant;
+  return Status::OK();
+}
+
+// Sets the snapshot timestamp from a value expressed in microseconds.
+void ScanConfiguration::SetSnapshotMicros(uint64_t snapshot_timestamp_micros) {
+  // Shift the HT timestamp bits to get well-formed HT timestamp with the
+  // logical bits zeroed out.
+  snapshot_timestamp_ = snapshot_timestamp_micros << kHtTimestampBitsToShift;
+}
+
+// Stores 'snapshot_timestamp' as the snapshot timestamp without any
+// conversion.
+void ScanConfiguration::SetSnapshotRaw(uint64_t snapshot_timestamp) {
+  snapshot_timestamp_ = snapshot_timestamp;
+}
+
+// Sets the scan timeout to 'millis' milliseconds.
+void ScanConfiguration::SetTimeoutMillis(int millis) {
+  timeout_ = MonoDelta::FromMilliseconds(millis);
+}
+
+// Runs OptimizeScan() on the scan spec against the table schema, passing this
+// configuration's arena and pool. Pushed predicates are kept in the spec.
+void ScanConfiguration::OptimizeScanSpec() {
+  spec_.OptimizeScan(*table_->schema().schema_,
+                     &arena_,
+                     &pool_,
+                     /* remove_pushed_predicates */ false);
+}
+
+} // namespace client
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/8a3e461f/src/kudu/client/scan_configuration.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_configuration.h b/src/kudu/client/scan_configuration.h
new file mode 100644
index 0000000..efa7a96
--- /dev/null
+++ b/src/kudu/client/scan_configuration.h
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "kudu/client/client.h"
+#include "kudu/common/scan_spec.h"
+#include "kudu/util/auto_release_pool.h"
+#include "kudu/util/memory/arena.h"
+
+namespace kudu {
+namespace client {
+
+// A configuration object which holds Kudu scan options.
+//
+// Unless otherwise specified, the method documentation matches the
+// corresponding methods on KuduScanner.
+class ScanConfiguration {
+ public:
+
+  // Value of snapshot_timestamp() while no snapshot timestamp has been set.
+  static const int64_t kNoTimestamp = -1;
+
+  // Number of bits SetSnapshotMicros() shifts a microsecond timestamp left by
+  // so that the logical bits of the resulting timestamp are zero.
+  static const int kHtTimestampBitsToShift = 12;
+
+  // 'table' must be non-null; the configuration does not take ownership.
+  explicit ScanConfiguration(KuduTable* table);
+  ~ScanConfiguration() = default;
+
+  Status SetProjectedColumnNames(const std::vector<std::string>& col_names) WARN_UNUSED_RESULT;
+
+  Status SetProjectedColumnIndexes(const std::vector<int>& col_indexes) WARN_UNUSED_RESULT;
+
+  // Takes ownership of 'pred', even when a non-OK status is returned.
+  Status AddConjunctPredicate(KuduPredicate* pred) WARN_UNUSED_RESULT;
+
+  Status AddLowerBound(const KuduPartialRow& key);
+
+  // The upper bound is exclusive.
+  Status AddUpperBound(const KuduPartialRow& key);
+
+  Status AddLowerBoundRaw(const Slice& key);
+
+  // The upper bound is exclusive.
+  Status AddUpperBoundRaw(const Slice& key);
+
+  Status AddLowerBoundPartitionKeyRaw(const Slice& partition_key);
+
+  // The upper bound is exclusive.
+  Status AddUpperBoundPartitionKeyRaw(const Slice& partition_key);
+
+  Status SetCacheBlocks(bool cache_blocks);
+
+  Status SetBatchSizeBytes(uint32_t batch_size);
+
+  Status SetSelection(KuduClient::ReplicaSelection selection) WARN_UNUSED_RESULT;
+
+  Status SetReadMode(KuduScanner::ReadMode read_mode) WARN_UNUSED_RESULT;
+
+  Status SetFaultTolerant(bool fault_tolerant) WARN_UNUSED_RESULT;
+
+  void SetSnapshotMicros(uint64_t snapshot_timestamp_micros);
+
+  void SetSnapshotRaw(uint64_t snapshot_timestamp);
+
+  void SetTimeoutMillis(int millis);
+
+  // Optimizes the scan spec against the table schema.
+  void OptimizeScanSpec();
+
+  // Returns the table being scanned.
+  const KuduTable& table() const {
+    return *table_;
+  }
+
+  // Returns the projection schema.
+  //
+  // The ScanConfiguration retains ownership of the projection.
+  const Schema* projection() const {
+    return projection_;
+  }
+
+  // Returns the client projection schema.
+  //
+  // The ScanConfiguration retains ownership of the projection.
+  const KuduSchema* client_projection() const {
+    return &client_projection_;
+  }
+
+  const ScanSpec& spec() const {
+    return spec_;
+  }
+
+  bool has_batch_size_bytes() const {
+    return has_batch_size_bytes_;
+  }
+
+  // Must only be called when has_batch_size_bytes() is true.
+  uint32_t batch_size_bytes() const {
+    CHECK(has_batch_size_bytes_);
+    return batch_size_bytes_;
+  }
+
+  KuduClient::ReplicaSelection selection() const {
+    return selection_;
+  }
+
+  KuduScanner::ReadMode read_mode() const {
+    return read_mode_;
+  }
+
+  bool is_fault_tolerant() const {
+    return is_fault_tolerant_;
+  }
+
+  // Returns kNoTimestamp if no snapshot timestamp has been set.
+  int64_t snapshot_timestamp() const {
+    return snapshot_timestamp_;
+  }
+
+  const MonoDelta& timeout() const {
+    return timeout_;
+  }
+
+ private:
+  friend class ScanTokenBuilder;
+
+  // Non-owned, non-null table.
+  KuduTable* table_;
+
+  // Non-null projection schema. Points either at the table's schema (not
+  // owned) or, once a projection has been set, at a schema owned by pool_.
+  Schema* projection_;
+
+  // Owned client projection.
+  KuduSchema client_projection_;
+
+  ScanSpec spec_;
+
+  // batch_size_bytes_ is only meaningful when has_batch_size_bytes_ is true.
+  bool has_batch_size_bytes_;
+  uint32_t batch_size_bytes_;
+
+  KuduClient::ReplicaSelection selection_;
+
+  KuduScanner::ReadMode read_mode_;
+
+  bool is_fault_tolerant_;
+
+  int64_t snapshot_timestamp_;
+
+  MonoDelta timeout_;
+
+  // Manages interior allocations for the scan spec and copied bounds.
+  Arena arena_;
+
+  // Manages objects which need to live for the lifetime of the configuration,
+  // such as schemas, predicates, and keys.
+  AutoReleasePool pool_;
+};
+
+} // namespace client
+} // namespace kudu
+

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/8a3e461f/src/kudu/client/scan_predicate.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_predicate.h b/src/kudu/client/scan_predicate.h
index 604d096..fa6ff48 100644
--- a/src/kudu/client/scan_predicate.h
+++ b/src/kudu/client/scan_predicate.h
@@ -47,10 +47,10 @@ class KUDU_EXPORT KuduPredicate {
   // and gcc gives an error trying to derive from a private nested class.
   class KUDU_NO_EXPORT Data;
  private:
-  friend class KuduScanner;
-  friend class KuduTable;
   friend class ComparisonPredicateData;
   friend class ErrorPredicateData;
+  friend class KuduTable;
+  friend class ScanConfiguration;
 
   explicit KuduPredicate(Data* d);
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/8a3e461f/src/kudu/client/scanner-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scanner-internal.cc b/src/kudu/client/scanner-internal.cc
index 1d24c13..2e8c5cb 100644
--- a/src/kudu/client/scanner-internal.cc
+++ b/src/kudu/client/scanner-internal.cc
@@ -49,29 +49,20 @@ namespace client {
 
 using internal::RemoteTabletServer;
 
-static const int64_t kNoTimestamp = -1;
-
 KuduScanner::Data::Data(KuduTable* table)
-  : open_(false),
+  : configuration_(table),
+    open_(false),
     data_in_open_(false),
-    has_batch_size_bytes_(false),
-    batch_size_bytes_(0),
-    selection_(KuduClient::CLOSEST_REPLICA),
-    read_mode_(READ_LATEST),
-    is_fault_tolerant_(false),
-    snapshot_timestamp_(kNoTimestamp),
     short_circuit_(false),
     table_(DCHECK_NOTNULL(table)),
-    arena_(1024, 1024*1024),
-    timeout_(MonoDelta::FromMilliseconds(kScanTimeoutMillis)),
     scan_attempts_(0) {
-  SetProjectionSchema(table->schema().schema_);
 }
 
 KuduScanner::Data::~Data() {
 }
 
 namespace {
+// Copies a predicate lower or upper bound from 'bound_src' into 'bound_dst'.
 void CopyPredicateBound(const ColumnSchema& col,
                         const void* bound_src,
                         string* bound_dst) {
@@ -288,7 +279,7 @@ ScanRpcStatus KuduScanner::Data::SendScanRpc(const MonoTime& overall_deadline,
 
   controller_.Reset();
   controller_.set_deadline(rpc_deadline);
-  if (!spec_.predicates().empty()) {
+  if (!configuration_.spec().predicates().empty()) {
     controller_.RequireServerFeature(TabletServerFeatures::COLUMN_PREDICATES);
   }
   return AnalyzeResponse(
@@ -305,13 +296,13 @@ Status KuduScanner::Data::OpenTablet(const string& partition_key,
   PrepareRequest(KuduScanner::Data::NEW);
   next_req_.clear_scanner_id();
   NewScanRequestPB* scan = next_req_.mutable_new_scan_request();
-  switch (read_mode_) {
+  switch (configuration_.read_mode()) {
     case READ_LATEST: scan->set_read_mode(kudu::READ_LATEST); break;
     case READ_AT_SNAPSHOT: scan->set_read_mode(kudu::READ_AT_SNAPSHOT); break;
     default: LOG(FATAL) << "Unexpected read mode.";
   }
 
-  if (is_fault_tolerant_) {
+  if (configuration_.is_fault_tolerant()) {
     scan->set_order_mode(kudu::ORDERED);
   } else {
     scan->set_order_mode(kudu::UNORDERED);
@@ -323,38 +314,38 @@ Status KuduScanner::Data::OpenTablet(const string& partition_key,
     scan->set_last_primary_key(last_primary_key_);
   }
 
-  scan->set_cache_blocks(spec_.cache_blocks());
+  scan->set_cache_blocks(configuration_.spec().cache_blocks());
 
-  if (snapshot_timestamp_ != kNoTimestamp) {
-    if (PREDICT_FALSE(read_mode_ != READ_AT_SNAPSHOT)) {
+  if (configuration_.snapshot_timestamp() != ScanConfiguration::kNoTimestamp) {
+    if (PREDICT_FALSE(configuration_.read_mode() != READ_AT_SNAPSHOT)) {
       LOG(WARNING) << "Scan snapshot timestamp set but read mode was READ_LATEST."
           " Ignoring timestamp.";
     } else {
-      scan->set_snap_timestamp(snapshot_timestamp_);
+      scan->set_snap_timestamp(configuration_.snapshot_timestamp());
     }
   }
 
   // Set up the predicates.
   scan->clear_column_predicates();
-  for (const auto& col_pred : spec_.predicates()) {
+  for (const auto& col_pred : configuration_.spec().predicates()) {
     ColumnPredicateIntoPB(col_pred.second, scan->add_column_predicates());
   }
 
-  if (spec_.lower_bound_key()) {
+  if (configuration_.spec().lower_bound_key()) {
     scan->mutable_start_primary_key()->assign(
-      reinterpret_cast<const char*>(spec_.lower_bound_key()->encoded_key().data()),
-      spec_.lower_bound_key()->encoded_key().size());
+      reinterpret_cast<const char*>(configuration_.spec().lower_bound_key()->encoded_key().data()),
+      configuration_.spec().lower_bound_key()->encoded_key().size());
   } else {
     scan->clear_start_primary_key();
   }
-  if (spec_.exclusive_upper_bound_key()) {
-    scan->mutable_stop_primary_key()->assign(
-      reinterpret_cast<const char*>(spec_.exclusive_upper_bound_key()->encoded_key().data()),
-      spec_.exclusive_upper_bound_key()->encoded_key().size());
+  if (configuration_.spec().exclusive_upper_bound_key()) {
+    scan->mutable_stop_primary_key()->assign(reinterpret_cast<const char*>(
+          configuration_.spec().exclusive_upper_bound_key()->encoded_key().data()),
+      configuration_.spec().exclusive_upper_bound_key()->encoded_key().size());
   } else {
     scan->clear_stop_primary_key();
   }
-  RETURN_NOT_OK(SchemaToColumnPBs(*projection_, scan->mutable_projected_columns(),
+  RETURN_NOT_OK(SchemaToColumnPBs(*configuration_.projection(), scan->mutable_projected_columns(),
                                   SCHEMA_PB_WITHOUT_STORAGE_ATTRIBUTES | SCHEMA_PB_WITHOUT_IDS));
 
   for (int attempt = 1;; attempt++) {
@@ -373,7 +364,7 @@ Status KuduScanner::Data::OpenTablet(const string& partition_key,
     Status lookup_status = table_->client()->data_->GetTabletServer(
         table_->client(),
         remote_,
-        selection_,
+        configuration_.selection(),
         *blacklist,
         &candidates,
         &ts);
@@ -429,9 +420,9 @@ Status KuduScanner::Data::OpenTablet(const string& partition_key,
   // If present in the response, set the snapshot timestamp and the encoded last
   // primary key.  This is used when retrying the scan elsewhere.  The last
   // primary key is also updated on each scan response.
-  if (is_fault_tolerant_) {
+  if (configuration().is_fault_tolerant()) {
     CHECK(last_response_.has_snap_timestamp());
-    snapshot_timestamp_ = last_response_.snap_timestamp();
+    configuration_.SetSnapshotRaw(last_response_.snap_timestamp());
     if (last_response_.has_last_primary_key()) {
       last_primary_key_ = last_response_.last_primary_key();
     }
@@ -453,7 +444,7 @@ Status KuduScanner::Data::KeepAlive() {
   }
 
   RpcController controller;
-  controller.set_timeout(timeout_);
+  controller.set_timeout(configuration_.timeout());
   tserver::ScannerKeepAliveRequestPB request;
   request.set_scanner_id(next_req_.scanner_id());
   tserver::ScannerKeepAliveResponsePB response;
@@ -473,8 +464,8 @@ bool KuduScanner::Data::MoreTablets() const {
 void KuduScanner::Data::PrepareRequest(RequestType state) {
   if (state == KuduScanner::Data::CLOSE) {
     next_req_.set_batch_size_bytes(0);
-  } else if (has_batch_size_bytes_) {
-    next_req_.set_batch_size_bytes(batch_size_bytes_);
+  } else if (configuration_.has_batch_size_bytes()) {
+    next_req_.set_batch_size_bytes(configuration_.batch_size_bytes());
   } else {
     next_req_.clear_batch_size_bytes();
   }
@@ -492,11 +483,6 @@ void KuduScanner::Data::UpdateLastError(const Status& error) {
   }
 }
 
-void KuduScanner::Data::SetProjectionSchema(const Schema* schema) {
-  projection_ = schema;
-  client_projection_ = KuduSchema(*schema);
-}
-
 ////////////////////////////////////////////////////////////
 // KuduScanBatch
 ////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/8a3e461f/src/kudu/client/scanner-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/scanner-internal.h b/src/kudu/client/scanner-internal.h
index 44dea45..fee8b93 100644
--- a/src/kudu/client/scanner-internal.h
+++ b/src/kudu/client/scanner-internal.h
@@ -23,6 +23,7 @@
 
 #include "kudu/client/client.h"
 #include "kudu/client/row_result.h"
+#include "kudu/client/scan_configuration.h"
 #include "kudu/common/partition_pruner.h"
 #include "kudu/common/scan_spec.h"
 #include "kudu/gutil/macros.h"
@@ -81,12 +82,6 @@ class KuduScanner::Data {
   explicit Data(KuduTable* table);
   ~Data();
 
-  // Copies a predicate lower or upper bound from 'bound_src' into
-  // 'bound_dst'.
-  void CopyPredicateBound(const ColumnSchema& col,
-                          const void* bound_src, std::string* bound_dst);
-
-
   // Calculates a deadline and sends the next RPC for this scanner. The deadline for the
   // RPC is calculated based on whether 'allow_time_for_failover' is true. If true,
   // the deadline used for the RPC will be shortened so that, on timeout, there will
@@ -151,18 +146,18 @@ class KuduScanner::Data {
   // non-fatal (i.e. retriable) scan error is encountered.
   void UpdateLastError(const Status& error);
 
-  // Sets the projection schema.
-  void SetProjectionSchema(const Schema* schema);
+  const ScanConfiguration& configuration() const {
+    return configuration_;
+  }
+
+  ScanConfiguration* mutable_configuration() {
+    return &configuration_;
+  }
+
+  ScanConfiguration configuration_;
 
   bool open_;
   bool data_in_open_;
-  bool has_batch_size_bytes_;
-  uint32 batch_size_bytes_;
-  KuduClient::ReplicaSelection selection_;
-
-  ReadMode read_mode_;
-  bool is_fault_tolerant_;
-  int64_t snapshot_timestamp_;
 
   // Set to true if the scan is known to be empty based on predicates and
   // primary key bounds.
@@ -172,6 +167,7 @@ class KuduScanner::Data {
   std::string last_primary_key_;
 
   internal::RemoteTabletServer* ts_;
+
   // The proxy can be derived from the RemoteTabletServer, but this involves retaking the
   // meta cache lock. Keeping our own shared_ptr avoids this overhead.
   std::shared_ptr<tserver::TabletServerServiceProxy> proxy_;
@@ -190,28 +186,11 @@ class KuduScanner::Data {
   // The table we're scanning.
   KuduTable* table_;
 
-  // The projection schema used in the scan.
-  const Schema* projection_;
-
-  // 'projection_' after it is converted to KuduSchema, so that users can obtain
-  // the projection without having to include common/schema.h.
-  KuduSchema client_projection_;
-
-  Arena arena_;
-  AutoReleasePool pool_;
-
-  // Machinery to store and encode raw column range predicates into
-  // encoded keys.
-  ScanSpec spec_;
-
   PartitionPruner partition_pruner_;
 
   // The tablet we're scanning.
   scoped_refptr<internal::RemoteTablet> remote_;
 
-  // Timeout for scanner RPCs.
-  MonoDelta timeout_;
-
   // Number of attempts since the last successful scan.
   int scan_attempts_;
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/8a3e461f/src/kudu/client/schema.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/schema.h b/src/kudu/client/schema.h
index 6bdeb0c..3236898 100644
--- a/src/kudu/client/schema.h
+++ b/src/kudu/client/schema.h
@@ -327,6 +327,7 @@ class KUDU_EXPORT KuduSchema {
   friend class KuduTable;
   friend class KuduTableCreator;
   friend class KuduWriteOperation;
+  friend class ScanConfiguration;
   friend class internal::GetTableSchemaRpc;
   friend class internal::LookupRpc;
   friend class internal::WriteRpc;