You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/03/20 21:45:36 UTC

[3/5] incubator-kudu git commit: Integrate ColumnPredicate into client and server

Integrate ColumnPredicate into client and server

This commit integrates the new ColumnPredicate type into the ScanSpec class. The
predicate encoder functionality has been subsumed into the ScanSpec and
ColumnPredicate classes.

In addition, we now 'lift' implicit predicates from the lower and upper primary
key bounds into the set of column predicates when the scan spec is optimized.

Scans using column predicates will fail if the server does not support the
column predicate feature, so servers must be upgraded before they are used
with a client that includes this patch.

Finally, this commit refactors row_key-util into key_util, and makes the utility
methods able to work on primary keys as well as range keys.

Change-Id: Ife6852680b7f59fddee688e5702c1a70944f7622
Reviewed-on: http://gerrit.cloudera.org:8080/2138
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/53e67e9e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/53e67e9e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/53e67e9e

Branch: refs/heads/master
Commit: 53e67e9eb7b4fc940a14a17119041871e80bcc3f
Parents: 9c798f4
Author: Dan Burkert <da...@cloudera.com>
Authored: Fri Feb 5 15:36:26 2016 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Sun Mar 20 20:27:49 2016 +0000

----------------------------------------------------------------------
 .../org/kududb/client/AsyncKuduScanner.java     |   2 +-
 .../org/kududb/client/ColumnRangePredicate.java |   2 +-
 .../kududb/client/TestColumnRangePredicate.java |   6 +-
 src/kudu/client/client.cc                       |  34 +-
 src/kudu/client/scan_predicate-internal.h       |  15 +-
 src/kudu/client/scan_predicate.cc               |  46 +-
 src/kudu/client/scanner-internal.cc             |  75 ++-
 src/kudu/client/scanner-internal.h              |   9 +-
 src/kudu/common/CMakeLists.txt                  |  11 +-
 src/kudu/common/column_predicate.cc             |   4 +-
 src/kudu/common/column_predicate.h              |   1 -
 src/kudu/common/common.proto                    |  36 +
 src/kudu/common/encoded_key.cc                  |   4 +-
 src/kudu/common/generic_iterators-test.cc       |  17 +-
 src/kudu/common/generic_iterators.cc            | 165 ++---
 src/kudu/common/generic_iterators.h             |  25 +-
 src/kudu/common/key_util-test.cc                | 133 ++++
 src/kudu/common/key_util.cc                     | 284 ++++++++
 src/kudu/common/key_util.h                      | 103 +++
 src/kudu/common/partial_row.h                   |   3 +-
 src/kudu/common/partition.cc                    |   1 -
 src/kudu/common/predicate_encoder-test.cc       | 305 ---------
 src/kudu/common/predicate_encoder.cc            | 244 -------
 src/kudu/common/predicate_encoder.h             |  83 ---
 src/kudu/common/row_key-util-test.cc            | 135 ----
 src/kudu/common/row_key-util.cc                 | 118 ----
 src/kudu/common/row_key-util.h                  |  77 ---
 src/kudu/common/scan_spec-test.cc               | 666 +++++++++++++++++++
 src/kudu/common/scan_spec.cc                    | 204 +++++-
 src/kudu/common/scan_spec.h                     |  79 ++-
 src/kudu/master/sys_catalog.cc                  |   9 +-
 src/kudu/tablet/cfile_set-test.cc               |  20 +-
 src/kudu/tablet/composite-pushdown-test.cc      |  81 +--
 src/kudu/tablet/diskrowset-test-base.h          |   6 +-
 src/kudu/tablet/tablet-pushdown-test.cc         |  20 +-
 src/kudu/tablet/tablet.cc                       |  14 +-
 src/kudu/tablet/tablet.h                        |   6 -
 src/kudu/tablet/tablet_random_access-test.cc    |   2 +-
 src/kudu/tserver/tablet_server-test.cc          |  10 +-
 src/kudu/tserver/tablet_service.cc              | 105 ++-
 src/kudu/tserver/tablet_service.h               |   2 +
 src/kudu/tserver/tserver-path-handlers.cc       |  10 +-
 src/kudu/tserver/tserver.proto                  |  20 +-
 43 files changed, 1894 insertions(+), 1298 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53e67e9e/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java
index 245ddbb..33c4f73 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java
@@ -702,7 +702,7 @@ public final class AsyncKuduScanner {
           }
 
           if (!columnRangePredicates.isEmpty()) {
-            newBuilder.addAllRangePredicates(columnRangePredicates);
+            newBuilder.addAllDEPRECATEDRangePredicates(columnRangePredicates);
           }
           builder.setNewScanRequest(newBuilder.build())
                  .setBatchSizeBytes(batchSizeBytes);

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53e67e9e/java/kudu-client/src/main/java/org/kududb/client/ColumnRangePredicate.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/ColumnRangePredicate.java b/java/kudu-client/src/main/java/org/kududb/client/ColumnRangePredicate.java
index bf46306..1d0daa3 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/ColumnRangePredicate.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/ColumnRangePredicate.java
@@ -59,7 +59,7 @@ public class ColumnRangePredicate {
 
   private void setUpperBoundInternal(byte[] value) {
     this.upperBound = value;
-    pb.setUpperBound(ZeroCopyLiteralByteString.wrap(this.upperBound));
+    pb.setInclusiveUpperBound(ZeroCopyLiteralByteString.wrap(this.upperBound));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53e67e9e/java/kudu-client/src/test/java/org/kududb/client/TestColumnRangePredicate.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/TestColumnRangePredicate.java b/java/kudu-client/src/test/java/org/kududb/client/TestColumnRangePredicate.java
index b5af961..bf13f2d 100644
--- a/java/kudu-client/src/test/java/org/kududb/client/TestColumnRangePredicate.java
+++ b/java/kudu-client/src/test/java/org/kududb/client/TestColumnRangePredicate.java
@@ -59,14 +59,14 @@ public class TestColumnRangePredicate {
 
     assertEquals(col1.getName(), decodedPreds.get(0).getColumn().getName());
     assertEquals(1, Bytes.getInt(Bytes.get(decodedPreds.get(0).getLowerBound())));
-    assertFalse(decodedPreds.get(0).hasUpperBound());
+    assertFalse(decodedPreds.get(0).hasInclusiveUpperBound());
 
     assertEquals(col1.getName(), decodedPreds.get(1).getColumn().getName());
-    assertEquals(2, Bytes.getInt(Bytes.get(decodedPreds.get(1).getUpperBound())));
+    assertEquals(2, Bytes.getInt(Bytes.get(decodedPreds.get(1).getInclusiveUpperBound())));
     assertFalse(decodedPreds.get(1).hasLowerBound());
 
     assertEquals(col2.getName(), decodedPreds.get(2).getColumn().getName());
     assertEquals("aaa", Bytes.getString(Bytes.get(decodedPreds.get(2).getLowerBound())));
-    assertEquals("bbb", Bytes.getString(Bytes.get(decodedPreds.get(2).getUpperBound())));
+    assertEquals("bbb", Bytes.getString(Bytes.get(decodedPreds.get(2).getInclusiveUpperBound())));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53e67e9e/src/kudu/client/client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 0c69ce5..74c4a4d 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -958,7 +958,7 @@ Status KuduScanner::AddConjunctPredicate(KuduPredicate* pred) {
   if (data_->open_) {
     return Status::IllegalState("Predicate must be set before Open()");
   }
-  return pred->data_->AddToScanSpec(&data_->spec_);
+  return pred->data_->AddToScanSpec(&data_->spec_, &data_->arena_);
 }
 
 Status KuduScanner::AddLowerBound(const KuduPartialRow& key) {
@@ -1039,20 +1039,23 @@ struct CloseCallback {
 } // anonymous namespace
 
 string KuduScanner::ToString() const {
-  Slice start_key = data_->spec_.lower_bound_key() ?
-    data_->spec_.lower_bound_key()->encoded_key() : Slice("INF");
-  Slice end_key = data_->spec_.exclusive_upper_bound_key() ?
-    data_->spec_.exclusive_upper_bound_key()->encoded_key() : Slice("INF");
-  return strings::Substitute("$0: [$1,$2)", data_->table_->name(),
-                             start_key.ToDebugString(), end_key.ToDebugString());
+  return strings::Substitute("$0: $1",
+                             data_->table_->name(),
+                             data_->spec_.ToString(*data_->table_->schema().schema_));
 }
 
 Status KuduScanner::Open() {
   CHECK(!data_->open_) << "Scanner already open";
   CHECK(data_->projection_ != nullptr) << "No projection provided";
 
-  // Find the first tablet.
-  data_->spec_encoder_.EncodeRangePredicates(&data_->spec_, false);
+  data_->spec_.OptimizeScan(*data_->table_->schema().schema_, &data_->arena_, &data_->pool_, false);
+
+  if (data_->spec_.CanShortCircuit()) {
+    VLOG(1) << "Short circuiting scan " << ToString();
+    data_->open_ = true;
+    data_->short_circuit_ = true;
+    return Status::OK();
+  }
 
   VLOG(1) << "Beginning scan " << ToString();
 
@@ -1108,7 +1111,6 @@ Status KuduScanner::KeepAlive() {
 
 void KuduScanner::Close() {
   if (!data_->open_) return;
-  CHECK(data_->proxy_);
 
   VLOG(1) << "Ending scan " << ToString();
 
@@ -1118,6 +1120,7 @@ void KuduScanner::Close() {
   // This is reflected in the Open() response. In this case, there is no server-side state
   // to clean up.
   if (!data_->next_req_.scanner_id().empty()) {
+    CHECK(data_->proxy_);
     gscoped_ptr<CloseCallback> closer(new CloseCallback);
     closer->scanner_id = data_->next_req_.scanner_id();
     data_->PrepareRequest(KuduScanner::Data::CLOSE);
@@ -1134,9 +1137,10 @@ void KuduScanner::Close() {
 
 bool KuduScanner::HasMoreRows() const {
   CHECK(data_->open_);
-  return data_->data_in_open_ || // more data in hand
-      data_->last_response_.has_more_results() || // more data in this tablet
-      data_->MoreTablets(); // more tablets to scan, possibly with more data
+  return !data_->short_circuit_ &&                 // The scan is not short circuited
+      (data_->data_in_open_ ||                     // more data in hand
+       data_->last_response_.has_more_results() || // more data in this tablet
+       data_->MoreTablets());                      // more tablets to scan, possibly with more data
 }
 
 Status KuduScanner::NextBatch(vector<KuduRowResult>* rows) {
@@ -1155,6 +1159,10 @@ Status KuduScanner::NextBatch(KuduScanBatch* result) {
 
   result->data_->Clear();
 
+  if (data_->short_circuit_) {
+    return Status::OK();
+  }
+
   if (data_->data_in_open_) {
     // We have data from a previous scan.
     VLOG(1) << "Extracting data from scan " << ToString();

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53e67e9e/src/kudu/client/scan_predicate-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_predicate-internal.h b/src/kudu/client/scan_predicate-internal.h
index 32ce5bd..61205f4 100644
--- a/src/kudu/client/scan_predicate-internal.h
+++ b/src/kudu/client/scan_predicate-internal.h
@@ -17,10 +17,12 @@
 #ifndef KUDU_CLIENT_SCAN_PREDICATE_INTERNAL_H
 #define KUDU_CLIENT_SCAN_PREDICATE_INTERNAL_H
 
+#include "kudu/client/scan_predicate.h"
 #include "kudu/client/value.h"
 #include "kudu/client/value-internal.h"
 #include "kudu/common/scan_spec.h"
 #include "kudu/gutil/macros.h"
+#include "kudu/util/memory/arena.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
@@ -30,7 +32,7 @@ class KuduPredicate::Data {
  public:
   Data();
   virtual ~Data();
-  virtual Status AddToScanSpec(ScanSpec* spec) = 0;
+  virtual Status AddToScanSpec(ScanSpec* spec, Arena* arena) = 0;
   virtual Data* Clone() const = 0;
 };
 
@@ -50,11 +52,11 @@ class ErrorPredicateData : public KuduPredicate::Data {
   virtual ~ErrorPredicateData() {
   }
 
-  virtual Status AddToScanSpec(ScanSpec* spec) OVERRIDE {
+  Status AddToScanSpec(ScanSpec* spec, Arena* arena) override {
     return status_;
   }
 
-  virtual ErrorPredicateData* Clone() const OVERRIDE {
+  ErrorPredicateData* Clone() const override {
     return new ErrorPredicateData(status_);
   }
 
@@ -72,9 +74,9 @@ class ComparisonPredicateData : public KuduPredicate::Data {
                           KuduValue* value);
   virtual ~ComparisonPredicateData();
 
-  virtual Status AddToScanSpec(ScanSpec* spec) OVERRIDE;
+  Status AddToScanSpec(ScanSpec* spec, Arena* arena) override;
 
-  virtual ComparisonPredicateData* Clone() const OVERRIDE {
+  ComparisonPredicateData* Clone() const override {
       return new ComparisonPredicateData(col_, op_, val_->Clone());
   }
 
@@ -84,9 +86,6 @@ class ComparisonPredicateData : public KuduPredicate::Data {
   ColumnSchema col_;
   KuduPredicate::ComparisonOp op_;
   gscoped_ptr<KuduValue> val_;
-
-  // Owned.
-  ColumnRangePredicate* pred_;
 };
 
 } // namespace client

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53e67e9e/src/kudu/client/scan_predicate.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_predicate.cc b/src/kudu/client/scan_predicate.cc
index 4a4df8b..f11b8d1 100644
--- a/src/kudu/client/scan_predicate.cc
+++ b/src/kudu/client/scan_predicate.cc
@@ -16,18 +16,24 @@
 // under the License.
 
 #include "kudu/client/scan_predicate.h"
+
+#include <boost/optional.hpp>
+#include <utility>
+
 #include "kudu/client/scan_predicate-internal.h"
-#include "kudu/client/value.h"
 #include "kudu/client/value-internal.h"
-
-#include "kudu/common/scan_spec.h"
+#include "kudu/client/value.h"
 #include "kudu/common/scan_predicate.h"
-
+#include "kudu/common/scan_spec.h"
 #include "kudu/gutil/strings/substitute.h"
 
-using strings::Substitute;
+using std::move;
+using boost::optional;
 
 namespace kudu {
+
+using strings::Substitute;
+
 namespace client {
 
 KuduPredicate::KuduPredicate(Data* d)
@@ -51,7 +57,7 @@ KuduPredicate* KuduPredicate::Clone() const {
 ComparisonPredicateData::ComparisonPredicateData(ColumnSchema col,
                                                  KuduPredicate::ComparisonOp op,
                                                  KuduValue* val)
-    : col_(std::move(col)),
+    : col_(move(col)),
       op_(op),
       val_(val) {
 }
@@ -59,31 +65,31 @@ ComparisonPredicateData::~ComparisonPredicateData() {
 }
 
 
-Status ComparisonPredicateData::AddToScanSpec(ScanSpec* spec) {
+Status ComparisonPredicateData::AddToScanSpec(ScanSpec* spec, Arena* arena) {
   void* val_void;
   RETURN_NOT_OK(val_->data_->CheckTypeAndGetPointer(col_.name(),
                                                     col_.type_info()->physical_type(),
                                                     &val_void));
-
-  void* lower_bound = nullptr;
-  void* upper_bound = nullptr;
   switch (op_) {
-    case KuduPredicate::LESS_EQUAL:
-      upper_bound = val_void;
+    case KuduPredicate::LESS_EQUAL: {
+      optional<ColumnPredicate> pred =
+        ColumnPredicate::InclusiveRange(col_, nullptr, val_void, arena);
+      if (pred) {
+        spec->AddPredicate(*pred);
+      }
       break;
-    case KuduPredicate::GREATER_EQUAL:
-      lower_bound = val_void;
+    };
+    case KuduPredicate::GREATER_EQUAL: {
+      spec->AddPredicate(ColumnPredicate::Range(col_, val_void, nullptr));
       break;
-    case KuduPredicate::EQUAL:
-      lower_bound = upper_bound = val_void;
+    };
+    case KuduPredicate::EQUAL: {
+      spec->AddPredicate(ColumnPredicate::Equality(col_, val_void));
       break;
+    };
     default:
       return Status::InvalidArgument(Substitute("invalid comparison op: $0", op_));
   }
-
-  ColumnRangePredicate p(col_, lower_bound, upper_bound);
-  spec->AddPredicate(p);
-
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53e67e9e/src/kudu/client/scanner-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scanner-internal.cc b/src/kudu/client/scanner-internal.cc
index aad99e4..b9931e9 100644
--- a/src/kudu/client/scanner-internal.cc
+++ b/src/kudu/client/scanner-internal.cc
@@ -27,6 +27,7 @@
 #include "kudu/client/meta_cache.h"
 #include "kudu/client/row_result.h"
 #include "kudu/client/table-internal.h"
+#include "kudu/common/common.pb.h"
 #include "kudu/common/wire_protocol.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/rpc_controller.h"
@@ -37,12 +38,12 @@ using std::string;
 
 namespace kudu {
 
+using kudu::ColumnPredicatePB;
 using rpc::RpcController;
 using strings::Substitute;
 using strings::SubstituteAndAppend;
-using tserver::ColumnRangePredicatePB;
 using tserver::NewScanRequestPB;
-using tserver::ScanResponsePB;
+using tserver::TabletServerFeatures;
 
 namespace client {
 
@@ -59,9 +60,9 @@ KuduScanner::Data::Data(KuduTable* table)
     read_mode_(READ_LATEST),
     is_fault_tolerant_(false),
     snapshot_timestamp_(kNoTimestamp),
+    short_circuit_(false),
     table_(DCHECK_NOTNULL(table)),
     arena_(1024, 1024*1024),
-    spec_encoder_(table->schema().schema_, &arena_),
     timeout_(MonoDelta::FromMilliseconds(kScanTimeoutMillis)),
     scan_attempts_(0) {
   SetProjectionSchema(table->schema().schema_);
@@ -78,9 +79,10 @@ Status KuduScanner::Data::CheckForErrors() {
   return StatusFromPB(last_response_.error().status());
 }
 
-void KuduScanner::Data::CopyPredicateBound(const ColumnSchema& col,
-                                           const void* bound_src,
-                                           string* bound_dst) {
+namespace {
+void CopyPredicateBound(const ColumnSchema& col,
+                        const void* bound_src,
+                        string* bound_dst) {
   const void* src;
   size_t size;
   if (col.type_info()->physical_type() == BINARY) {
@@ -96,6 +98,36 @@ void KuduScanner::Data::CopyPredicateBound(const ColumnSchema& col,
   bound_dst->assign(reinterpret_cast<const char*>(src), size);
 }
 
+void ColumnPredicateIntoPB(const ColumnPredicate& predicate,
+                           ColumnPredicatePB* pb) {
+  pb->set_column(predicate.column().name());
+  switch (predicate.predicate_type()) {
+    case PredicateType::Equality: {
+      CopyPredicateBound(predicate.column(),
+                         predicate.raw_lower(),
+                         pb->mutable_equality()->mutable_value());
+      return;
+    };
+    case PredicateType::Range: {
+      auto range_pred = pb->mutable_range();
+      if (predicate.raw_lower() != nullptr) {
+        CopyPredicateBound(predicate.column(),
+                           predicate.raw_lower(),
+                           range_pred->mutable_lower());
+      }
+      if (predicate.raw_upper() != nullptr) {
+        CopyPredicateBound(predicate.column(),
+                           predicate.raw_upper(),
+                           range_pred->mutable_upper());
+      }
+      return;
+    };
+    case PredicateType::None: LOG(FATAL) << "None predicate may not be converted to protobuf";
+  }
+  LOG(FATAL) << "unknown predicate type";
+}
+} // anonymous namespace
+
 Status KuduScanner::Data::CanBeRetried(const bool isNewScan,
                                        const Status& rpc_status, const Status& server_status,
                                        const MonoTime& actual_deadline, const MonoTime& deadline,
@@ -103,6 +135,14 @@ Status KuduScanner::Data::CanBeRetried(const bool isNewScan,
                                        set<string>* blacklist) {
   CHECK(!rpc_status.ok() || !server_status.ok());
 
+  // Check for ERROR_INVALID_REQUEST, which must not be retried.
+  if (server_status.ok() &&
+      !rpc_status.ok() &&
+      controller_.error_response() != nullptr &&
+      controller_.error_response()->code() == rpc::ErrorStatusPB::ERROR_INVALID_REQUEST) {
+    return rpc_status;
+  }
+
   // Check for ERROR_SERVER_TOO_BUSY, which should result in a retry after a delay.
   if (server_status.ok() &&
       !rpc_status.ok() &&
@@ -248,20 +288,9 @@ Status KuduScanner::Data::OpenTablet(const string& partition_key,
   }
 
   // Set up the predicates.
-  scan->clear_range_predicates();
-  for (const ColumnRangePredicate& pred : spec_.predicates()) {
-    const ColumnSchema& col = pred.column();
-    const ValueRange& range = pred.range();
-    ColumnRangePredicatePB* pb = scan->add_range_predicates();
-    if (range.has_lower_bound()) {
-      CopyPredicateBound(col, range.lower_bound(),
-                         pb->mutable_lower_bound());
-    }
-    if (range.has_upper_bound()) {
-      CopyPredicateBound(col, range.upper_bound(),
-                         pb->mutable_upper_bound());
-    }
-    ColumnSchemaToPB(col, pb->mutable_column());
+  scan->clear_column_predicates();
+  for (const auto& col_pred : spec_.predicates()) {
+    ColumnPredicateIntoPB(col_pred.second, scan->add_column_predicates());
   }
 
   if (spec_.lower_bound_key()) {
@@ -340,6 +369,10 @@ Status KuduScanner::Data::OpenTablet(const string& partition_key,
     controller_.Reset();
     controller_.set_deadline(rpc_deadline);
 
+    if (!spec_.predicates().empty()) {
+      controller_.RequireServerFeature(TabletServerFeatures::COLUMN_PREDICATES);
+    }
+
     CHECK(ts->proxy());
     ts_ = CHECK_NOTNULL(ts);
     proxy_ = ts->proxy();
@@ -461,8 +494,6 @@ void KuduScanner::Data::SetProjectionSchema(const Schema* schema) {
   client_projection_ = KuduSchema(*schema);
 }
 
-
-
 ////////////////////////////////////////////////////////////
 // KuduScanBatch
 ////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53e67e9e/src/kudu/client/scanner-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/scanner-internal.h b/src/kudu/client/scanner-internal.h
index 9cf5ad2..8e17c59 100644
--- a/src/kudu/client/scanner-internal.h
+++ b/src/kudu/client/scanner-internal.h
@@ -21,12 +21,12 @@
 #include <string>
 #include <vector>
 
-#include "kudu/gutil/macros.h"
 #include "kudu/client/client.h"
 #include "kudu/client/row_result.h"
 #include "kudu/common/scan_spec.h"
-#include "kudu/common/predicate_encoder.h"
+#include "kudu/gutil/macros.h"
 #include "kudu/tserver/tserver_service.proxy.h"
+#include "kudu/util/auto_release_pool.h"
 
 namespace kudu {
 
@@ -107,6 +107,10 @@ class KuduScanner::Data {
   bool is_fault_tolerant_;
   int64_t snapshot_timestamp_;
 
+  // Set to true if the scan is known to be empty based on predicates and
+  // primary key bounds.
+  bool short_circuit_;
+
   // The encoded last primary key from the most recent tablet scan response.
   std::string last_primary_key_;
 
@@ -142,7 +146,6 @@ class KuduScanner::Data {
   // Machinery to store and encode raw column range predicates into
   // encoded keys.
   ScanSpec spec_;
-  RangePredicateEncoder spec_encoder_;
 
   // The tablet we're scanning.
   scoped_refptr<internal::RemoteTablet> remote_;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53e67e9e/src/kudu/common/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/common/CMakeLists.txt b/src/kudu/common/CMakeLists.txt
index c5deff6..04afe3e 100644
--- a/src/kudu/common/CMakeLists.txt
+++ b/src/kudu/common/CMakeLists.txt
@@ -46,12 +46,11 @@ set(COMMON_SRCS
   id_mapping.cc
   iterator_stats.cc
   key_encoder.cc
+  key_util.cc
   partial_row.cc
   partition.cc
-  predicate_encoder.cc
   rowblock.cc
   row_changelist.cc
-  row_key-util.cc
   row_operations.cc
   scan_predicate.cc
   scan_spec.cc
@@ -61,9 +60,9 @@ set(COMMON_SRCS
   wire_protocol.cc)
 
 # Workaround for clang bug https://llvm.org/bugs/show_bug.cgi?id=23757
-# in which it incorrectly optimizes row_key-util.cc and causes incorrect results.
+# in which it incorrectly optimizes key_util.cc and causes incorrect results.
 if ("${COMPILER_FAMILY}" STREQUAL "clang")
-  set_source_files_properties(row_key-util.cc PROPERTIES COMPILE_FLAGS -fwrapv)
+  set_source_files_properties(key_util.cc PROPERTIES COMPILE_FLAGS -fwrapv)
 endif()
 
 set(COMMON_LIBS
@@ -85,10 +84,10 @@ ADD_KUDU_TEST(id_mapping-test)
 ADD_KUDU_TEST(partial_row-test)
 ADD_KUDU_TEST(partition-test)
 ADD_KUDU_TEST(predicate-test)
-ADD_KUDU_TEST(predicate_encoder-test)
 ADD_KUDU_TEST(row_changelist-test)
-ADD_KUDU_TEST(row_key-util-test)
+ADD_KUDU_TEST(key_util-test)
 ADD_KUDU_TEST(row_operations-test)
+ADD_KUDU_TEST(scan_spec-test)
 ADD_KUDU_TEST(schema-test)
 ADD_KUDU_TEST(types-test)
 ADD_KUDU_TEST(wire_protocol-test)

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53e67e9e/src/kudu/common/column_predicate.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/column_predicate.cc b/src/kudu/common/column_predicate.cc
index 22c74ed..d1c8c67 100644
--- a/src/kudu/common/column_predicate.cc
+++ b/src/kudu/common/column_predicate.cc
@@ -19,7 +19,7 @@
 
 #include <utility>
 
-#include "kudu/common/row_key-util.h"
+#include "kudu/common/key_util.h"
 #include "kudu/common/rowblock.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/types.h"
@@ -65,7 +65,7 @@ boost::optional<ColumnPredicate> ColumnPredicate::InclusiveRange(ColumnSchema co
     size_t size = column.type_info()->size();
     void*  buf = CHECK_NOTNULL(arena->AllocateBytes(size));
     memcpy(buf, upper, size);
-    if (!row_key_util::IncrementCell(column, buf, arena)) {
+    if (!key_util::IncrementCell(column, buf, arena)) {
       if (lower == nullptr) {
         return boost::none;
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53e67e9e/src/kudu/common/column_predicate.h
----------------------------------------------------------------------
diff --git a/src/kudu/common/column_predicate.h b/src/kudu/common/column_predicate.h
index 3bbc81a..c90d841 100644
--- a/src/kudu/common/column_predicate.h
+++ b/src/kudu/common/column_predicate.h
@@ -20,7 +20,6 @@
 #include <boost/optional.hpp>
 #include <string>
 
-#include "kudu/common/row_key-util.h"
 #include "kudu/common/schema.h"
 
 namespace kudu {

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53e67e9e/src/kudu/common/common.proto
----------------------------------------------------------------------
diff --git a/src/kudu/common/common.proto b/src/kudu/common/common.proto
index 964629f..a04f9b4 100644
--- a/src/kudu/common/common.proto
+++ b/src/kudu/common/common.proto
@@ -268,3 +268,39 @@ message PartitionPB {
   // The encoded end partition key (exclusive).
   optional bytes partition_key_end = 3;
 }
+
+// A predicate that can be applied on a Kudu column.
+message ColumnPredicatePB {
+  // The predicate column name.
+  optional string column = 1;
+
+  message Range {
+
+    // Bounds should be encoded as follows:
+    // - STRING/BINARY values: simply the exact string value for the bound.
+    // - other type: the canonical x86 in-memory representation -- eg for
+    //   uint32s, a little-endian value.
+    //
+    // Note that this predicate type should not be used for NULL data --
+    // NULL is defined to be neither greater than nor less than other values
+    // for the comparison operator. We will eventually add a special
+    // predicate type for null-ness.
+
+    // The inclusive lower bound.
+    optional bytes lower = 1;
+
+    // The exclusive upper bound.
+    optional bytes upper = 2;
+  }
+
+  message Equality {
+    // The value the column must equal. See comment in Range for notes on
+    // the encoding.
+    optional bytes value = 1;
+  }
+
+  oneof predicate {
+    Range range = 2;
+    Equality equality = 3;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53e67e9e/src/kudu/common/encoded_key.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/encoded_key.cc b/src/kudu/common/encoded_key.cc
index e4ea027..dafe150 100644
--- a/src/kudu/common/encoded_key.cc
+++ b/src/kudu/common/encoded_key.cc
@@ -19,8 +19,8 @@
 
 #include "kudu/common/encoded_key.h"
 #include "kudu/common/key_encoder.h"
+#include "kudu/common/key_util.h"
 #include "kudu/common/row.h"
-#include "kudu/common/row_key-util.h"
 
 namespace kudu {
 
@@ -95,7 +95,7 @@ Status EncodedKey::IncrementEncodedKey(const Schema& tablet_schema,
 
   // Increment the new key
   ContiguousRow new_row(&tablet_schema, new_row_key);
-  if (!row_key_util::IncrementKey(&new_row, arena)) {
+  if (!key_util::IncrementPrimaryKey(&new_row, arena)) {
     return Status::IllegalState("No lexicographically greater key exists");
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53e67e9e/src/kudu/common/generic_iterators-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/generic_iterators-test.cc b/src/kudu/common/generic_iterators-test.cc
index cb870be..a8406d1 100644
--- a/src/kudu/common/generic_iterators-test.cc
+++ b/src/kudu/common/generic_iterators-test.cc
@@ -112,7 +112,7 @@ TEST(TestMergeIterator, TestMergeEmpty) {
     new MaterializingIterator(
       shared_ptr<ColumnwiseIterator>(new VectorIterator(empty_vec))));
 
-  vector<shared_ptr<RowwiseIterator> > to_merge;
+  vector<shared_ptr<RowwiseIterator>> to_merge;
   to_merge.push_back(iter);
 
   MergeIterator merger(kIntSchema, to_merge);
@@ -126,14 +126,14 @@ class TestIntRangePredicate {
   TestIntRangePredicate(uint32_t lower, uint32_t upper) :
     lower_(lower),
     upper_(upper),
-    pred_(kIntSchema.column(0), &lower_, &upper_) {}
+    pred_(ColumnPredicate::Range(kIntSchema.column(0), &lower_, &upper_)) {}
 
   uint32_t lower_, upper_;
-  ColumnRangePredicate pred_;
+  ColumnPredicate pred_;
 };
 
 void TestMerge(const TestIntRangePredicate &predicate) {
-  vector<shared_ptr<RowwiseIterator> > to_merge;
+  vector<shared_ptr<RowwiseIterator>> to_merge;
   vector<uint32_t> ints;
   vector<uint32_t> all_ints;
   all_ints.reserve(FLAGS_num_rows * FLAGS_num_lists);
@@ -213,7 +213,7 @@ TEST(TestMergeIterator, TestPredicate) {
 // to single columns.
 TEST(TestMaterializingIterator, TestMaterializingPredicatePushdown) {
   ScanSpec spec;
-  TestIntRangePredicate pred1(20, 29);
+  TestIntRangePredicate pred1(20, 30);
   spec.AddPredicate(pred1.pred_);
   LOG(INFO) << "Predicate: " << pred1.pred_.ToString();
 
@@ -225,8 +225,7 @@ TEST(TestMaterializingIterator, TestMaterializingPredicatePushdown) {
   shared_ptr<VectorIterator> colwise(new VectorIterator(ints));
   MaterializingIterator materializing(colwise);
   ASSERT_OK(materializing.Init(&spec));
-  ASSERT_EQ(0, spec.predicates().size())
-    << "Iterator should have pushed down predicate";
+  ASSERT_EQ(0, spec.predicates().size()) << "Iterator should have pushed down predicate";
 
   Arena arena(1024, 1024);
   RowBlock dst(kIntSchema, 100, &arena);
@@ -245,7 +244,7 @@ TEST(TestMaterializingIterator, TestMaterializingPredicatePushdown) {
 // input.
 TEST(TestPredicateEvaluatingIterator, TestPredicateEvaluation) {
   ScanSpec spec;
-  TestIntRangePredicate pred1(20, 29);
+  TestIntRangePredicate pred1(20, 30);
   spec.AddPredicate(pred1.pred_);
   LOG(INFO) << "Predicate: " << pred1.pred_.ToString();
 
@@ -273,7 +272,7 @@ TEST(TestPredicateEvaluatingIterator, TestPredicateEvaluation) {
 
   ASSERT_EQ(0, spec.predicates().size())
     << "Iterator tree should have accepted predicate";
-  ASSERT_EQ(1, pred_eval->predicates_.size())
+  ASSERT_EQ(1, pred_eval->col_idx_predicates_.size())
     << "Predicate should be evaluated by the outer iterator";
 
   Arena arena(1024, 1024);

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53e67e9e/src/kudu/common/generic_iterators.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/generic_iterators.cc b/src/kudu/common/generic_iterators.cc
index 6a0cbc0..bdabd1c 100644
--- a/src/kudu/common/generic_iterators.cc
+++ b/src/kudu/common/generic_iterators.cc
@@ -15,21 +15,29 @@
 // specific language governing permissions and limitations
 // under the License.
 
-
 #include <algorithm>
 #include <memory>
 #include <string>
+#include <tuple>
 #include <utility>
 
 #include "kudu/common/generic_iterators.h"
 #include "kudu/common/row.h"
 #include "kudu/common/rowblock.h"
 #include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/memory/arena.h"
 
+using std::all_of;
+using std::get;
+using std::move;
+using std::remove_if;
 using std::shared_ptr;
+using std::sort;
 using std::string;
+using std::tuple;
 
 DEFINE_bool(materializing_iterator_do_pushdown, true,
             "Should MaterializingIterator do predicate pushdown");
@@ -165,13 +173,11 @@ Status MergeIterator::Init(ScanSpec *spec) {
   // Before we copy any rows, clean up any iterators which were empty
   // to start with. Otherwise, HasNext() won't properly return false
   // if we were passed only empty iterators.
-  for (size_t i = 0; i < iters_.size(); i++) {
-    if (PREDICT_FALSE(iters_[i]->IsFullyExhausted())) {
-      iters_.erase(iters_.begin() + i);
-      i--;
-      continue;
-    }
-  }
+  iters_.erase(
+      remove_if(iters_.begin(), iters_.end(), [] (const shared_ptr<MergeIterState>& iter) {
+        return PREDICT_FALSE(iter->IsFullyExhausted());
+      }),
+      iters_.end());
 
   initted_ = true;
   return Status::OK();
@@ -193,7 +199,7 @@ Status MergeIterator::InitSubIterators(ScanSpec *spec) {
   // Since we handle predicates in all the wrapped iterators, we can clear
   // them here.
   if (spec != nullptr) {
-    spec->mutable_predicates()->clear();
+    spec->RemovePredicates();
   }
   return Status::OK();
 }
@@ -346,7 +352,7 @@ Status UnionIterator::InitSubIterators(ScanSpec *spec) {
   // Since we handle predicates in all the wrapped iterators, we can clear
   // them here.
   if (spec != nullptr) {
-    spec->mutable_predicates()->clear();
+    spec->RemovePredicates();
   }
   return Status::OK();
 }
@@ -426,53 +432,53 @@ void UnionIterator::GetIteratorStats(std::vector<IteratorStats>* stats) const {
 ////////////////////////////////////////////////////////////
 
 MaterializingIterator::MaterializingIterator(shared_ptr<ColumnwiseIterator> iter)
-    : iter_(std::move(iter)),
+    : iter_(move(iter)),
       disallow_pushdown_for_tests_(!FLAGS_materializing_iterator_do_pushdown) {
 }
 
 Status MaterializingIterator::Init(ScanSpec *spec) {
   RETURN_NOT_OK(iter_->Init(spec));
 
+  int32_t num_columns = schema().num_columns();
+  col_idx_predicates_.clear();
+  non_predicate_column_indexes_.clear();
+
   if (spec != nullptr && !disallow_pushdown_for_tests_) {
-    // Gather any single-column predicates.
-    ScanSpec::PredicateList *preds = spec->mutable_predicates();
-    for (auto iter = preds->begin(); iter != preds->end();) {
-      const ColumnRangePredicate &pred = *iter;
-      const string &col_name = pred.column().name();
-      int idx = schema().find_column(col_name);
-      if (idx == -1) {
-        return Status::InvalidArgument("No such column", col_name);
+    col_idx_predicates_.reserve(spec->predicates().size());
+    non_predicate_column_indexes_.reserve(num_columns - spec->predicates().size());
+
+    for (const auto& col_pred : spec->predicates()) {
+      const ColumnPredicate& pred = col_pred.second;
+      int col_idx = schema().find_column(pred.column().name());
+      if (col_idx == Schema::kColumnNotFound) {
+        return Status::InvalidArgument("No such column", col_pred.first);
       }
-
       VLOG(1) << "Pushing down predicate " << pred.ToString();
-      preds_by_column_.insert(std::make_pair(idx, pred));
-
-      // Since we'll evaluate this predicate ourselves, remove it from the scan spec
-      // so higher layers don't repeat our work.
-      iter = preds->erase(iter);
+      col_idx_predicates_.emplace_back(col_idx, move(col_pred.second));
     }
-  }
 
-  // Determine a materialization order such that columns with predicates
-  // are materialized first.
-  //
-  // TODO: we can be a little smarter about this, by trying to estimate
-  // predicate selectivity, involve the materialization cost of types, etc.
-  vector<size_t> with_preds, without_preds;
+    for (int32_t col_idx = 0; col_idx < schema().num_columns(); col_idx++) {
+      if (!ContainsKey(spec->predicates(), schema().column(col_idx).name())) {
+        non_predicate_column_indexes_.emplace_back(col_idx);
+      }
+    }
 
-  for (size_t i = 0; i < schema().num_columns(); i++) {
-    int num_preds = preds_by_column_.count(i);
-    if (num_preds > 0) {
-      with_preds.push_back(i);
-    } else {
-      without_preds.push_back(i);
+    // Since we'll evaluate these predicates ourselves, remove them from the
+    // scan spec so higher layers don't repeat our work.
+    spec->RemovePredicates();
+  } else {
+    non_predicate_column_indexes_.reserve(num_columns);
+    for (int32_t col_idx = 0; col_idx < num_columns; col_idx++) {
+      non_predicate_column_indexes_.emplace_back(col_idx);
     }
   }
 
-  materialization_order_.swap(with_preds);
-  materialization_order_.insert(materialization_order_.end(),
-                                without_preds.begin(), without_preds.end());
-  DCHECK_EQ(materialization_order_.size(), schema().num_columns());
+  // Sort the predicates by selectivity so that the most selective are evaluated earlier.
+  sort(col_idx_predicates_.begin(), col_idx_predicates_.end(),
+       [] (const tuple<int32_t, ColumnPredicate>& left,
+           const tuple<int32_t, ColumnPredicate>& right) {
+         return SelectivityComparator(get<1>(left), get<1>(right));
+       });
 
   return Status::OK();
 }
@@ -500,31 +506,28 @@ Status MaterializingIterator::MaterializeBlock(RowBlock *dst) {
   // been deleted.
   RETURN_NOT_OK(iter_->InitializeSelectionVector(dst->selection_vector()));
 
-  bool short_circuit = false;
-
-  for (size_t col_idx : materialization_order_) {
+  for (const auto& col_pred : col_idx_predicates_) {
     // Materialize the column itself into the row block.
-    ColumnBlock dst_col(dst->column_block(col_idx));
-    RETURN_NOT_OK(iter_->MaterializeColumn(col_idx, &dst_col));
-
-    // Evaluate any predicates that apply to this column.
-    auto range = preds_by_column_.equal_range(col_idx);
-    for (auto it = range.first; it != range.second; ++it) {
-      const ColumnRangePredicate &pred = it->second;
+    ColumnBlock dst_col(dst->column_block(get<0>(col_pred)));
+    RETURN_NOT_OK(iter_->MaterializeColumn(get<0>(col_pred), &dst_col));
 
-      pred.Evaluate(dst, dst->selection_vector());
+    // Evaluate the column predicate.
+    get<1>(col_pred).Evaluate(dst_col, dst->selection_vector());
 
-      // If after evaluating this predicate, the entire row block has now been
-      // filtered out, we don't need to materialize other columns at all.
-      if (!dst->selection_vector()->AnySelected()) {
-        short_circuit = true;
-        break;
-      }
-    }
-    if (short_circuit) {
-      break;
+    // If after evaluating this predicate the entire row block has been filtered
+    // out, we don't need to materialize other columns at all.
+    if (!dst->selection_vector()->AnySelected()) {
+      DVLOG(1) << "0/" << dst->nrows() << " passed predicate";
+      return Status::OK();
     }
   }
+
+  for (size_t col_idx : non_predicate_column_indexes_) {
+    // Materialize the column itself into the row block.
+    ColumnBlock dst_col(dst->column_block(col_idx));
+    RETURN_NOT_OK(iter_->MaterializeColumn(col_idx, &dst_col));
+  }
+
   DVLOG(1) << dst->selection_vector()->CountSelected() << "/"
            << dst->nrows() << " passed predicate";
   return Status::OK();
@@ -541,17 +544,15 @@ string MaterializingIterator::ToString() const {
 ////////////////////////////////////////////////////////////
 
 PredicateEvaluatingIterator::PredicateEvaluatingIterator(shared_ptr<RowwiseIterator> base_iter)
-    : base_iter_(std::move(base_iter)) {
+    : base_iter_(move(base_iter)) {
 }
 
 Status PredicateEvaluatingIterator::InitAndMaybeWrap(
   shared_ptr<RowwiseIterator> *base_iter, ScanSpec *spec) {
   RETURN_NOT_OK((*base_iter)->Init(spec));
-  if (spec != nullptr &&
-      !spec->predicates().empty()) {
+  if (spec != nullptr && !spec->predicates().empty()) {
     // Underlying iterator did not accept all predicates. Wrap it.
-    shared_ptr<RowwiseIterator> wrapper(
-      new PredicateEvaluatingIterator(*base_iter));
+    shared_ptr<RowwiseIterator> wrapper(new PredicateEvaluatingIterator(*base_iter));
     CHECK_OK(wrapper->Init(spec));
     base_iter->swap(wrapper);
   }
@@ -560,11 +561,20 @@ Status PredicateEvaluatingIterator::InitAndMaybeWrap(
 
 Status PredicateEvaluatingIterator::Init(ScanSpec *spec) {
   // base_iter_ already Init()ed before this is constructed.
-
   CHECK_NOTNULL(spec);
-  // Gather any predicates that the base iterator did not pushdown.
-  // This also clears the predicates from the spec.
-  predicates_.swap(*(spec->mutable_predicates()));
+
+  // Gather any predicates that the base iterator did not push down, and remove
+  // the predicates from the spec.
+  col_idx_predicates_.clear();
+  col_idx_predicates_.reserve(spec->predicates().size());
+  for (auto& predicate : spec->predicates()) {
+    col_idx_predicates_.emplace_back(move(predicate.second));
+  }
+  spec->RemovePredicates();
+
+  // Sort the predicates by selectivity so that the most selective are evaluated earlier.
+  sort(col_idx_predicates_.begin(), col_idx_predicates_.end(), SelectivityComparator);
+
   return Status::OK();
 }
 
@@ -575,8 +585,12 @@ bool PredicateEvaluatingIterator::HasNext() const {
 Status PredicateEvaluatingIterator::NextBlock(RowBlock *dst) {
   RETURN_NOT_OK(base_iter_->NextBlock(dst));
 
-  for (ColumnRangePredicate &pred : predicates_) {
-    pred.Evaluate(dst, dst->selection_vector());
+  for (const auto& predicate : col_idx_predicates_) {
+    int32_t col_idx = dst->schema().find_column(predicate.column().name());
+    if (col_idx == Schema::kColumnNotFound) {
+      return Status::InvalidArgument("Unknown column in predicate", predicate.ToString());
+    }
+    predicate.Evaluate(dst->column_block(col_idx), dst->selection_vector());
 
     // If after evaluating this predicate, the entire row block has now been
     // filtered out, we don't need to evaluate any further predicates.
@@ -589,10 +603,7 @@ Status PredicateEvaluatingIterator::NextBlock(RowBlock *dst) {
 }
 
 string PredicateEvaluatingIterator::ToString() const {
-  string s;
-  s.append("PredicateEvaluating(").append(base_iter_->ToString()).append(")");
-  return s;
+  return strings::Substitute("PredicateEvaluating($0)", base_iter_->ToString());
 }
 
-
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53e67e9e/src/kudu/common/generic_iterators.h
----------------------------------------------------------------------
diff --git a/src/kudu/common/generic_iterators.h b/src/kudu/common/generic_iterators.h
index da5e706..bd4a162 100644
--- a/src/kudu/common/generic_iterators.h
+++ b/src/kudu/common/generic_iterators.h
@@ -21,6 +21,7 @@
 #include <gtest/gtest_prod.h>
 #include <memory>
 #include <string>
+#include <tuple>
 #include <unordered_map>
 #include <vector>
 
@@ -77,7 +78,6 @@ class MergeIterator : public RowwiseIterator {
   ObjectPool<ScanSpec> scan_spec_copies_;
 };
 
-
 // An iterator which unions the results of other iterators.
 // This is different from MergeIterator in that it lays the results out end-to-end
 // rather than merging them based on keys. Hence it is more efficient since there is
@@ -133,10 +133,10 @@ class UnionIterator : public RowwiseIterator {
 
 // An iterator which wraps a ColumnwiseIterator, materializing it into full rows.
 //
-// Predicates which only apply to a single column are pushed down into this iterator.
-// While materializing a block, columns with associated predicates are materialized
-// first, and the predicates evaluated. If the predicates succeed in filtering out
-// an entire batch, then other columns may avoid doing any IO.
+// Column predicates are pushed down into this iterator. While materializing a
+// block, columns with associated predicates are materialized first, and the
+// predicates evaluated. If the predicates succeed in filtering out an entire
+// batch, then other columns may avoid doing any IO.
 class MaterializingIterator : public RowwiseIterator {
  public:
   explicit MaterializingIterator(std::shared_ptr<ColumnwiseIterator> iter);
@@ -166,16 +166,16 @@ class MaterializingIterator : public RowwiseIterator {
 
   std::shared_ptr<ColumnwiseIterator> iter_;
 
-  std::unordered_multimap<size_t, ColumnRangePredicate> preds_by_column_;
+  // List of (column index, predicate) in order of most to least selective.
+  std::vector<std::tuple<int32_t, ColumnPredicate>> col_idx_predicates_;
 
-  // The order in which the columns will be materialized.
-  std::vector<size_t> materialization_order_;
+  // List of column indexes without predicates to materialize.
+  std::vector<int32_t> non_predicate_column_indexes_;
 
   // Set only by test code to disallow pushdown.
   bool disallow_pushdown_for_tests_;
 };
 
-
 // An iterator which wraps another iterator and evaluates any predicates that the
 // wrapped iterator did not itself handle during push down.
 class PredicateEvaluatingIterator : public RowwiseIterator {
@@ -213,13 +213,14 @@ class PredicateEvaluatingIterator : public RowwiseIterator {
   // Construct the evaluating iterator.
   // This is only called from ::InitAndMaybeWrap()
   // REQUIRES: base_iter is already Init()ed.
-  explicit PredicateEvaluatingIterator(
-      std::shared_ptr<RowwiseIterator> base_iter);
+  explicit PredicateEvaluatingIterator(std::shared_ptr<RowwiseIterator> base_iter);
 
   FRIEND_TEST(TestPredicateEvaluatingIterator, TestPredicateEvaluation);
 
   std::shared_ptr<RowwiseIterator> base_iter_;
-  std::vector<ColumnRangePredicate> predicates_;
+
+  // List of predicates in order of most to least selective.
+  std::vector<ColumnPredicate> col_idx_predicates_;
 };
 
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53e67e9e/src/kudu/common/key_util-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/key_util-test.cc b/src/kudu/common/key_util-test.cc
new file mode 100644
index 0000000..004e78d
--- /dev/null
+++ b/src/kudu/common/key_util-test.cc
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/common/key_util.h"
+
+#include <gtest/gtest.h>
+
+#include "kudu/common/partial_row.h"
+#include "kudu/common/row.h"
+#include "kudu/common/schema.h"
+#include "kudu/gutil/mathlimits.h"
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+
+class KeyUtilTest : public KuduTest {
+ public:
+  KeyUtilTest()
+    : arena_(1024, 4096) {}
+
+ protected:
+  uint8_t* row_data(KuduPartialRow* row) {
+    return row->row_data_;
+  }
+
+  Arena arena_;
+};
+
+TEST_F(KeyUtilTest, TestIncrementNonCompositePrimaryKey) {
+  Schema schema({ ColumnSchema("key", INT32),
+                  ColumnSchema("other_col", INT32),
+                  ColumnSchema("other_col2", STRING, true) },
+                1);
+  KuduPartialRow p_row(&schema);
+  ContiguousRow row(&schema, row_data(&p_row));
+
+  // Normal increment.
+  EXPECT_OK(p_row.SetInt32(0, 1000));
+  EXPECT_TRUE(key_util::IncrementPrimaryKey(&row, &arena_));
+  EXPECT_EQ("int32 key=1001", p_row.ToString());
+
+  // Overflow increment.
+  EXPECT_OK(p_row.SetInt32(0, MathLimits<int32_t>::kMax));
+  EXPECT_FALSE(key_util::IncrementPrimaryKey(&row, &arena_));
+  EXPECT_EQ("int32 key=-2147483648", p_row.ToString());
+}
+
+TEST_F(KeyUtilTest, TestIncrementCompositePrimaryKey) {
+  Schema schema({ ColumnSchema("k1", INT32),
+                  ColumnSchema("k2", INT32),
+                  ColumnSchema("other_col", STRING, true) },
+                2);
+
+  KuduPartialRow p_row(&schema);
+  ContiguousRow row(&schema, row_data(&p_row));
+
+  // Normal increment.
+  EXPECT_OK(p_row.SetInt32(0, 1000));
+  EXPECT_OK(p_row.SetInt32(1, 1000));
+  EXPECT_TRUE(key_util::IncrementPrimaryKey(&row, &arena_));
+  EXPECT_EQ("int32 k1=1000, int32 k2=1001", p_row.ToString());
+
+  // Overflow a later part of the key, carrying into the earlier
+  // part.
+  EXPECT_OK(p_row.SetInt32(1, MathLimits<int32_t>::kMax));
+  EXPECT_TRUE(key_util::IncrementPrimaryKey(&row, &arena_));
+  EXPECT_EQ("int32 k1=1001, int32 k2=-2147483648", p_row.ToString());
+
+  // Overflow the whole key.
+  EXPECT_OK(p_row.SetInt32(0, MathLimits<int32_t>::kMax));
+  EXPECT_OK(p_row.SetInt32(1, MathLimits<int32_t>::kMax));
+  EXPECT_FALSE(key_util::IncrementPrimaryKey(&row, &arena_));
+  EXPECT_EQ("int32 k1=-2147483648, int32 k2=-2147483648", p_row.ToString());
+}
+
+TEST_F(KeyUtilTest, TestIncrementCompositeIntStringPrimaryKey) {
+  Schema schema({ ColumnSchema("k1", INT32),
+                  ColumnSchema("k2", STRING),
+                  ColumnSchema("other_col", STRING, true) },
+                2);
+
+  KuduPartialRow p_row(&schema);
+  ContiguousRow row(&schema, row_data(&p_row));
+
+  // Normal increment.
+  EXPECT_OK(p_row.SetInt32(0, 1000));
+  EXPECT_OK(p_row.SetString(1, "hello"));
+  EXPECT_TRUE(key_util::IncrementPrimaryKey(&row, &arena_));
+  EXPECT_EQ("int32 k1=1000, string k2=hello\\000", p_row.ToString());
+
+  // There's no way to overflow a string key - you can always make it higher
+  // by tacking on more \x00.
+  EXPECT_TRUE(key_util::IncrementPrimaryKey(&row, &arena_));
+  EXPECT_EQ("int32 k1=1000, string k2=hello\\000\\000", p_row.ToString());
+}
+
+TEST_F(KeyUtilTest, TestIncrementCompositeStringIntPrimaryKey) {
+  Schema schema({ ColumnSchema("k1", STRING),
+                  ColumnSchema("k2", INT32),
+                  ColumnSchema("other_col", STRING, true) },
+                2);
+
+  KuduPartialRow p_row(&schema);
+  ContiguousRow row(&schema, row_data(&p_row));
+
+  // Normal increment.
+  EXPECT_OK(p_row.SetString(0, "hello"));
+  EXPECT_OK(p_row.SetInt32(1, 1000));
+  EXPECT_TRUE(key_util::IncrementPrimaryKey(&row, &arena_));
+  EXPECT_EQ("string k1=hello, int32 k2=1001", p_row.ToString());
+
+  // Overflowing the int32 portion should tack \x00 onto the
+  // string portion.
+  EXPECT_OK(p_row.SetInt32(1, MathLimits<int32_t>::kMax));
+  EXPECT_TRUE(key_util::IncrementPrimaryKey(&row, &arena_));
+  EXPECT_EQ("string k1=hello\\000, int32 k2=-2147483648", p_row.ToString());
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53e67e9e/src/kudu/common/key_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/key_util.cc b/src/kudu/common/key_util.cc
new file mode 100644
index 0000000..609aaf8
--- /dev/null
+++ b/src/kudu/common/key_util.cc
@@ -0,0 +1,284 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/common/key_util.h"
+
+#include <boost/iterator/counting_iterator.hpp>
+#include <iterator>
+#include <string>
+#include <tuple>
+#include <type_traits>
+
+#include "kudu/common/column_predicate.h"
+#include "kudu/common/key_encoder.h"
+#include "kudu/common/row.h"
+#include "kudu/common/schema.h"
+#include "kudu/gutil/map-util.h"
+
+using std::string;
+using std::tuple;
+using std::unordered_map;
+using std::vector;
+
+namespace kudu {
+namespace key_util {
+
+namespace {
+
+template<DataType type>
+bool IncrementIntCell(void* cell_ptr) {
+  typedef DataTypeTraits<type> traits;
+  typedef typename traits::cpp_type cpp_type;
+
+  cpp_type orig;
+  memcpy(&orig, cell_ptr, sizeof(cpp_type));
+
+  cpp_type inc;
+  if (std::is_unsigned<cpp_type>::value) {
+    inc = orig + 1;
+  } else {
+    // Signed overflow is undefined in C. So, we'll use a branch here
+    // instead of counting on undefined behavior.
+    if (orig == MathLimits<cpp_type>::kMax) {
+      inc = MathLimits<cpp_type>::kMin;
+    } else {
+      inc = orig + 1;
+    }
+  }
+  memcpy(cell_ptr, &inc, sizeof(cpp_type));
+  return inc > orig;
+}
+
+bool IncrementStringCell(void* cell_ptr, Arena* arena) {
+  Slice orig;
+  memcpy(&orig, cell_ptr, sizeof(orig));
+  uint8_t* new_buf = CHECK_NOTNULL(
+      static_cast<uint8_t*>(arena->AllocateBytes(orig.size() + 1)));
+  memcpy(new_buf, orig.data(), orig.size());
+  new_buf[orig.size()] = '\0';
+
+  Slice inc(new_buf, orig.size() + 1);
+  memcpy(cell_ptr, &inc, sizeof(inc));
+  return true;
+}
+
+template<typename ColIdxIter>
+void SetKeyToMinValues(ColIdxIter first, ColIdxIter last, ContiguousRow* row) {
+  for (auto col_idx_it = first; col_idx_it != last; std::advance(col_idx_it, 1)) {
+    DCHECK_LE(0, *col_idx_it);
+    const ColumnSchema& col = row->schema()->column(*col_idx_it);
+    col.type_info()->CopyMinValue(row->mutable_cell_ptr(*col_idx_it));
+  }
+}
+
+// Increments a key with the provided column indices to the smallest key which
+// is greater than the current key.
+template<typename ColIdxIter>
+bool IncrementKey(ColIdxIter first,
+                  ColIdxIter last,
+                  ContiguousRow* row,
+                  Arena* arena) {
+  for (auto col_idx_it = std::prev(last);
+       std::distance(first, col_idx_it) >= 0;
+       std::advance(col_idx_it, -1)) {
+    if (IncrementCell(row->schema()->column(*col_idx_it),
+                      row->mutable_cell_ptr(*col_idx_it), arena)) {
+      return true;
+    }
+  }
+  return false;
+}
+
+template<typename ColIdxIter>
+int PushUpperBoundKeyPredicates(ColIdxIter first,
+                                ColIdxIter last,
+                                const unordered_map<string, ColumnPredicate>& predicates,
+                                ContiguousRow* row,
+                                Arena* arena) {
+
+  const Schema& schema = *CHECK_NOTNULL(row->schema());
+  int pushed_predicates = 0;
+  // The most recently pushed predicate; step 2 checks whether it is an equality.
+  const ColumnPredicate* final_predicate = nullptr;
+
+  // Step 1: copy predicates into the row in key column order, stopping after
+  // the first range predicate.
+
+  for (auto col_idx_it = first; col_idx_it < last; std::advance(col_idx_it, 1)) {
+    const ColumnSchema& column = schema.column(*col_idx_it);
+    const ColumnPredicate* predicate = FindOrNull(predicates, column.name());
+    if (predicate == nullptr) break;
+    size_t size = column.type_info()->size();
+    if (predicate->predicate_type() == PredicateType::Equality) {
+      memcpy(row->mutable_cell_ptr(*col_idx_it), predicate->raw_lower(), size);
+      pushed_predicates++;
+      final_predicate = predicate;
+    } else if (predicate->predicate_type() == PredicateType::Range) {
+      if (predicate->raw_upper() != nullptr) {
+        memcpy(row->mutable_cell_ptr(*col_idx_it), predicate->raw_upper(), size);
+        pushed_predicates++;
+        final_predicate = predicate;
+      }
+      // After the first column with a range constraint we stop pushing
+      // constraints into the upper bound. Instead, we push minimum values
+      // to the remaining columns (below), which is the maximally tight
+      // constraint.
+      break;
+    } else {
+      LOG(FATAL) << "unexpected predicate type can not be pushed into key";
+    }
+  }
+
+  // If no predicates were pushed, no need to do any more work.
+  if (pushed_predicates == 0) { return 0; }
+
+  // Step 2: If the final predicate is an equality predicate, increment the
+  // key to convert it to an exclusive upper bound.
+  if (final_predicate->predicate_type() == PredicateType::Equality) {
+    if (!IncrementKey(first, std::next(first, pushed_predicates), row, arena)) {
+      // If the increment fails then this bound is not constraining the keyspace.
+      return 0;
+    }
+  }
+
+  // Step 3: Fill the remaining columns without predicates with the min value.
+  SetKeyToMinValues(std::next(first, pushed_predicates), last, row);
+  return pushed_predicates;
+}
+
+template<typename ColIdxIter>
+int PushLowerBoundKeyPredicates(ColIdxIter first,
+                                ColIdxIter last,
+                                const unordered_map<string, ColumnPredicate>& predicates,
+                                ContiguousRow* row,
+                                Arena* arena) {
+  const Schema& schema = *CHECK_NOTNULL(row->schema());
+  int pushed_predicates = 0;
+
+  // Step 1: copy predicates into the row in key column order, stopping after
+  // the first missing predicate.
+
+  for (auto col_idx_it = first; col_idx_it < last; std::advance(col_idx_it, 1)) {
+    const ColumnSchema& column = schema.column(*col_idx_it);
+    const ColumnPredicate* predicate = FindOrNull(predicates, column.name());
+    if (predicate == nullptr) break;
+    size_t size = column.type_info()->size();
+    if (predicate->predicate_type() == PredicateType::Equality) {
+      memcpy(row->mutable_cell_ptr(*col_idx_it), predicate->raw_lower(), size);
+      pushed_predicates++;
+    } else if (predicate->predicate_type() == PredicateType::Range) {
+      if (predicate->raw_lower() != nullptr) {
+        memcpy(row->mutable_cell_ptr(*col_idx_it), predicate->raw_lower(), size);
+        pushed_predicates++;
+      } else {
+        break;
+      }
+    } else {
+      LOG(FATAL) << "unexpected predicate type can not be pushed into key";
+    }
+  }
+
+  // If no predicates were pushed, no need to do any more work.
+  if (pushed_predicates == 0) { return 0; }
+
+  // Step 2: Fill the remaining columns without predicates with the min value.
+  SetKeyToMinValues(std::next(first, pushed_predicates), last, row);
+  return pushed_predicates;
+}
+} // anonymous namespace
+
+bool IncrementPrimaryKey(ContiguousRow* row, Arena* arena) {
+  int32_t num_pk_cols = row->schema()->num_key_columns();
+  return IncrementKey(boost::make_counting_iterator(0),
+                      boost::make_counting_iterator(num_pk_cols),
+                      row,
+                      arena);
+}
+
+bool IncrementCell(const ColumnSchema& col, void* cell_ptr, Arena* arena) {
+  DataType type = col.type_info()->physical_type();
+  switch (type) {
+#define HANDLE_TYPE(t) case t: return IncrementIntCell<t>(cell_ptr);
+    HANDLE_TYPE(UINT8);
+    HANDLE_TYPE(UINT16);
+    HANDLE_TYPE(UINT32);
+    HANDLE_TYPE(UINT64);
+    HANDLE_TYPE(INT8);
+    HANDLE_TYPE(INT16);
+    HANDLE_TYPE(INT32);
+    HANDLE_TYPE(TIMESTAMP);
+    HANDLE_TYPE(INT64);
+    case UNKNOWN_DATA:
+    case BOOL:
+    case FLOAT:
+    case DOUBLE:
+      LOG(FATAL) << "Unable to handle type " << type << " in row keys";
+    case STRING:
+    case BINARY:
+      return IncrementStringCell(cell_ptr, arena);
+    default: CHECK(false) << "Unknown data type: " << type;
+  }
+  return false; // unreachable
+#undef HANDLE_TYPE
+}
+
+int PushLowerBoundKeyPredicates(const vector<int32_t>& col_idxs,
+                                const unordered_map<string, ColumnPredicate>& predicates,
+                                ContiguousRow* row,
+                                Arena* arena) {
+  return PushLowerBoundKeyPredicates(col_idxs.begin(), col_idxs.end(), predicates, row, arena);
+}
+
+int PushUpperBoundKeyPredicates(const vector<int32_t>& col_idxs,
+                                const unordered_map<string, ColumnPredicate>& predicates,
+                                ContiguousRow* row,
+                                Arena* arena) {
+  return PushUpperBoundKeyPredicates(col_idxs.begin(), col_idxs.end(), predicates, row, arena);
+}
+
+int PushLowerBoundPrimaryKeyPredicates(const unordered_map<string, ColumnPredicate>& predicates,
+                                       ContiguousRow* row,
+                                       Arena* arena) {
+  int32_t num_pk_cols = row->schema()->num_key_columns();
+  return PushLowerBoundKeyPredicates(boost::make_counting_iterator(0),
+                                     boost::make_counting_iterator(num_pk_cols),
+                                     predicates,
+                                     row,
+                                     arena);
+}
+
+int PushUpperBoundPrimaryKeyPredicates(const unordered_map<string, ColumnPredicate>& predicates,
+                                       ContiguousRow* row,
+                                       Arena* arena) {
+  int32_t num_pk_cols = row->schema()->num_key_columns();
+  return PushUpperBoundKeyPredicates(boost::make_counting_iterator(0),
+                                     boost::make_counting_iterator(num_pk_cols),
+                                     predicates,
+                                     row,
+                                     arena);
+}
+
+void EncodeKey(const vector<int32_t>& col_idxs, const ContiguousRow& row, string* buffer) {
+  for (int i = 0; i < col_idxs.size(); i++) {
+    int32_t col_idx = col_idxs[i];
+    const auto& encoder = GetKeyEncoder<string>(row.schema()->column(col_idx).type_info());
+    encoder.Encode(row.cell_ptr(col_idx), i + 1 == col_idxs.size(), buffer);
+  }
+}
+
+} // namespace key_util
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53e67e9e/src/kudu/common/key_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/common/key_util.h b/src/kudu/common/key_util.h
new file mode 100644
index 0000000..8dc60f4
--- /dev/null
+++ b/src/kudu/common/key_util.h
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Utility functions for working with the primary key or range key portion of a row.
+
+#pragma once
+
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "kudu/gutil/port.h"
+
+namespace kudu {
+
+class Arena;
+class ColumnPredicate;
+class ColumnSchema;
+class ContiguousRow;
+
+namespace key_util {
+
+// Increments the primary key of the provided row to the smallest key which is
+// greater than the current key.
+//
+// For example, for a composite key with types (INT8, INT8), incrementing
+// the row (1, 1) will result in (1, 2). Incrementing (1, 127) will result
+// in (2, -128).
+//
+// Note that not all keys may be incremented without overflow. For example,
+// if the key is an INT8, and the key is already set to '127', incrementing
+// would overflow. In this case, the value is incremented and overflowed, but
+// the function returns 'false' to indicate the overflow condition. Otherwise,
+// returns 'true'.
+//
+// String and binary types are incremented by appending a '\0' byte to the end.
+// Since our string and binary types have unbounded length, this implies that if
+// a key has a string or binary component, it will always be incremented.
+//
+// For the case of incrementing string or binary types, we allocate a new copy
+// from 'arena', which must be non-NULL.
+//
+// REQUIRES: all primary key columns must be initialized.
+bool IncrementPrimaryKey(ContiguousRow* row, Arena* arena) WARN_UNUSED_RESULT;
+
+// Increments the provided cell in place.
+bool IncrementCell(const ColumnSchema& col, void* cell_ptr, Arena* arena);
+
+// Pushes lower bound key predicates into the row. Returns the number of pushed
+// predicates. Unpushed predicate columns will be set to the minimum value
+// (unless no predicates are pushed at all).
+int PushLowerBoundKeyPredicates(
+    const std::vector<int32_t>& col_idxs,
+    const std::unordered_map<std::string, ColumnPredicate>& predicates,
+    ContiguousRow* row,
+    Arena* arena);
+
+// Pushes upper bound key predicates into the row. Returns the number of pushed
+// predicates. Unpushed predicate columns will be set to the minimum value
+// (unless no predicates are pushed at all).
+int PushUpperBoundKeyPredicates(
+    const std::vector<int32_t>& col_idxs,
+    const std::unordered_map<std::string, ColumnPredicate>& predicates,
+    ContiguousRow* row,
+    Arena* arena);
+
+// Pushes lower bound predicates on the primary key columns into the row.
+// Returns the number of pushed predicates. Unpushed predicate columns will be
+// set to the minimum value (unless no predicates are pushed at all).
+int PushLowerBoundPrimaryKeyPredicates(
+    const std::unordered_map<std::string, ColumnPredicate>& predicates,
+    ContiguousRow* row,
+    Arena* arena);
+
+// Pushes upper bound predicates on the primary key columns into the row.
+// Returns the number of pushed predicates. Unpushed predicate columns will be
+// set to the minimum value (unless no predicates are pushed at all).
+int PushUpperBoundPrimaryKeyPredicates(
+    const std::unordered_map<std::string, ColumnPredicate>& predicates,
+    ContiguousRow* row,
+    Arena* arena);
+
+// Appends the encoded key into the buffer.
+void EncodeKey(const std::vector<int32_t>& col_idxs,
+               const ContiguousRow& row,
+               std::string* buffer);
+
+} // namespace key_util
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53e67e9e/src/kudu/common/partial_row.h
----------------------------------------------------------------------
diff --git a/src/kudu/common/partial_row.h b/src/kudu/common/partial_row.h
index 724fc69..eac4fab 100644
--- a/src/kudu/common/partial_row.h
+++ b/src/kudu/common/partial_row.h
@@ -189,11 +189,12 @@ class KUDU_EXPORT KuduPartialRow {
   const Schema* schema() const { return schema_; }
 
  private:
-  friend class RowKeyUtilTest;
+  friend class KeyUtilTest;
   friend class RowOperationsPBDecoder;
   friend class RowOperationsPBEncoder;
   friend class client::KuduWriteOperation;   // for row_data_.
   friend class PartitionSchema;
+  friend class TestScanSpec;
   template<typename KeyTypeWrapper> friend struct client::SliceKeysTestSetup;
   template<typename KeyTypeWrapper> friend struct client::IntKeysTestSetup;
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53e67e9e/src/kudu/common/partition.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/partition.cc b/src/kudu/common/partition.cc
index 2e0efc9..d8910ce 100644
--- a/src/kudu/common/partition.cc
+++ b/src/kudu/common/partition.cc
@@ -21,7 +21,6 @@
 #include <set>
 
 #include "kudu/common/partial_row.h"
-#include "kudu/common/row_key-util.h"
 #include "kudu/common/scan_predicate.h"
 #include "kudu/common/wire_protocol.pb.h"
 #include "kudu/gutil/map-util.h"

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53e67e9e/src/kudu/common/predicate_encoder-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/predicate_encoder-test.cc b/src/kudu/common/predicate_encoder-test.cc
deleted file mode 100644
index b2486d1..0000000
--- a/src/kudu/common/predicate_encoder-test.cc
+++ /dev/null
@@ -1,305 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <glog/logging.h>
-#include <gtest/gtest.h>
-#include <vector>
-
-#include "kudu/common/schema.h"
-#include "kudu/common/predicate_encoder.h"
-#include "kudu/util/test_macros.h"
-#include "kudu/util/test_util.h"
-
-namespace kudu {
-
-class TestRangePredicateEncoder : public KuduTest {
- public:
-  explicit TestRangePredicateEncoder(const Schema& s)
-    : arena_(1024, 256 * 1024),
-      schema_(s),
-      enc_(&schema_, &arena_) {}
-
-  enum ComparisonOp {
-    GE,
-    EQ,
-    LE
-  };
-
-  template<class T>
-  void AddPredicate(ScanSpec* spec, StringPiece col,
-                    ComparisonOp op, T val) {
-    int idx = schema_.find_column(col);
-    CHECK_GE(idx, 0);
-
-    void* upper = nullptr;
-    void* lower = nullptr;
-    void* val_void = arena_.AllocateBytes(sizeof(val));
-    memcpy(val_void, &val, sizeof(val));
-
-    switch (op) {
-      case GE:
-        lower = val_void;
-        break;
-      case EQ:
-        lower = upper = val_void;
-        break;
-      case LE:
-        upper = val_void;
-        break;
-    }
-
-    ColumnRangePredicate pred(schema_.column(idx), lower, upper);
-    spec->AddPredicate(pred);
-  }
-
-
- protected:
-  Arena arena_;
-  Schema schema_;
-  RangePredicateEncoder enc_;
-};
-
-class CompositeIntKeysTest : public TestRangePredicateEncoder {
- public:
-  CompositeIntKeysTest() :
-    TestRangePredicateEncoder(
-        Schema({ ColumnSchema("a", UINT8),
-                 ColumnSchema("b", UINT8),
-                 ColumnSchema("c", UINT8) },
-               3)) {
-  }
-};
-
-// Test that multiple predicates on a column are collapsed by
-// RangePredicateEncoder::Simplify()
-TEST_F(CompositeIntKeysTest, TestSimplify) {
-  ScanSpec spec;
-  AddPredicate<uint8_t>(&spec, "a", EQ, 255);
-  AddPredicate<uint8_t>(&spec, "b", GE, 3);
-  AddPredicate<uint8_t>(&spec, "b", LE, 255);
-  AddPredicate<uint8_t>(&spec, "b", LE, 200);
-  AddPredicate<uint8_t>(&spec, "c", LE, 128);
-  SCOPED_TRACE(spec.ToStringWithSchema(schema_));
-  vector<RangePredicateEncoder::SimplifiedBounds> bounds;
-  enc_.SimplifyBounds(spec, &bounds);
-  ASSERT_EQ(3, bounds.size());
-  ASSERT_EQ("(`a` BETWEEN 255 AND 255)",
-            ColumnRangePredicate(schema_.column(0), bounds[0].lower, bounds[0].upper).ToString());
-
-  ASSERT_EQ("(`b` BETWEEN 3 AND 200)",
-            ColumnRangePredicate(schema_.column(1), bounds[1].lower, bounds[1].upper).ToString());
-  ASSERT_EQ("(`c` <= 128)",
-            ColumnRangePredicate(schema_.column(2), bounds[2].lower, bounds[2].upper).ToString());
-}
-
-// Predicate: a == 128
-TEST_F(CompositeIntKeysTest, TestPrefixEquality) {
-  ScanSpec spec;
-  AddPredicate<uint8_t>(&spec, "a", EQ, 128);
-  SCOPED_TRACE(spec.ToStringWithSchema(schema_));
-  ASSERT_NO_FATAL_FAILURE(enc_.EncodeRangePredicates(&spec, true));
-  // Expect: key >= (128, 0, 0) AND key < (129, 0, 0)
-  EXPECT_EQ("PK >= (uint8 a=128, uint8 b=0, uint8 c=0) AND "
-            "PK < (uint8 a=129, uint8 b=0, uint8 c=0)",
-            spec.ToStringWithSchema(schema_));
-}
-
-// Predicate: a <= 254
-TEST_F(CompositeIntKeysTest, TestPrefixUpperBound) {
-  ScanSpec spec;
-  AddPredicate<uint8_t>(&spec, "a", LE, 254);
-  SCOPED_TRACE(spec.ToStringWithSchema(schema_));
-  ASSERT_NO_FATAL_FAILURE(enc_.EncodeRangePredicates(&spec, true));
-  EXPECT_EQ("PK < (uint8 a=255, uint8 b=0, uint8 c=0)",
-            spec.ToStringWithSchema(schema_));
-}
-
-// Predicate: a >= 254
-TEST_F(CompositeIntKeysTest, TestPrefixLowerBound) {
-  // Predicate: a >= 254
-  ScanSpec spec;
-  AddPredicate<uint8_t>(&spec, "a", GE, 254);
-  SCOPED_TRACE(spec.ToStringWithSchema(schema_));
-  ASSERT_NO_FATAL_FAILURE(enc_.EncodeRangePredicates(&spec, true));
-  EXPECT_EQ("PK >= (uint8 a=254, uint8 b=0, uint8 c=0)", spec.ToStringWithSchema(schema_));
-}
-
-// Test a predicate on a non-prefix part of the key. Can't be pushed.
-//
-// Predicate: b == 128
-TEST_F(CompositeIntKeysTest, TestNonPrefix) {
-  ScanSpec spec;
-  AddPredicate<uint8_t>(&spec, "b", EQ, 128);
-  SCOPED_TRACE(spec.ToStringWithSchema(schema_));
-  ASSERT_NO_FATAL_FAILURE(enc_.EncodeRangePredicates(&spec, true));
-  // Expect: nothing pushed (predicate is still on `b`, not PK)
-  EXPECT_EQ("(`b` BETWEEN 128 AND 128)",
-            spec.ToStringWithSchema(schema_));
-}
-
-// Test what happens when an upper bound on a cell is equal to the maximum
-// value for the cell. In this case, the preceding cell is also at the maximum
-// value as well, so we eliminate the upper bound entirely.
-//
-// Predicate: a == 255 AND b BETWEEN 3 AND 255
-TEST_F(CompositeIntKeysTest, TestRedundantUpperBound) {
-  ScanSpec spec;
-  AddPredicate<uint8_t>(&spec, "a", EQ, 255);
-  AddPredicate<uint8_t>(&spec, "b", GE, 3);
-  AddPredicate<uint8_t>(&spec, "b", LE, 255);
-  SCOPED_TRACE(spec.ToStringWithSchema(schema_));
-  ASSERT_NO_FATAL_FAILURE(enc_.EncodeRangePredicates(&spec, true));
-  EXPECT_EQ("PK >= (uint8 a=255, uint8 b=3, uint8 c=0)", spec.ToStringWithSchema(schema_));
-}
-
-// A similar test, but in this case we still have an equality prefix
-// that needs to be accounted for, so we can't eliminate the upper bound
-// entirely.
-//
-// Predicate: a == 1 AND b BETWEEN 3 AND 255
-TEST_F(CompositeIntKeysTest, TestRedundantUpperBound2) {
-  ScanSpec spec;
-  AddPredicate<uint8_t>(&spec, "a", EQ, 1);
-  AddPredicate<uint8_t>(&spec, "b", GE, 3);
-  AddPredicate<uint8_t>(&spec, "b", LE, 255);
-  SCOPED_TRACE(spec.ToStringWithSchema(schema_));
-  ASSERT_NO_FATAL_FAILURE(enc_.EncodeRangePredicates(&spec, true));
-  EXPECT_EQ("PK >= (uint8 a=1, uint8 b=3, uint8 c=0) AND "
-            "PK < (uint8 a=2, uint8 b=0, uint8 c=0)",
-            spec.ToStringWithSchema(schema_));
-}
-
-// Test that, if so desired, pushed predicates are not erased.
-//
-// Predicate: a == 254
-TEST_F(CompositeIntKeysTest, TestNoErasePredicates) {
-  ScanSpec spec;
-  AddPredicate<uint8_t>(&spec, "a", EQ, 254);
-  SCOPED_TRACE(spec.ToStringWithSchema(schema_));
-  ASSERT_NO_FATAL_FAILURE(enc_.EncodeRangePredicates(&spec, false));
-  EXPECT_EQ("PK >= (uint8 a=254, uint8 b=0, uint8 c=0) AND "
-            "PK < (uint8 a=255, uint8 b=0, uint8 c=0)\n"
-            "(`a` BETWEEN 254 AND 254)", spec.ToStringWithSchema(schema_));
-}
-
-// Test that, if pushed predicates are erased, that we don't
-// erase non-pushed predicates.
-// Because we have no predicate on column 'b', we can't push a
-// a range predicate that includes 'c'.
-//
-// Predicate: a == 254 AND c == 254
-TEST_F(CompositeIntKeysTest, TestNoErasePredicates2) {
-  ScanSpec spec;
-  AddPredicate<uint8_t>(&spec, "a", EQ, 254);
-  AddPredicate<uint8_t>(&spec, "c", EQ, 254);
-  SCOPED_TRACE(spec.ToStringWithSchema(schema_));
-  ASSERT_NO_FATAL_FAILURE(enc_.EncodeRangePredicates(&spec, true));
-  // The predicate on column A should be pushed while "c" remains.
-  EXPECT_EQ("PK >= (uint8 a=254, uint8 b=0, uint8 c=0) AND "
-            "PK < (uint8 a=255, uint8 b=0, uint8 c=0)\n"
-            "(`c` BETWEEN 254 AND 254)", spec.ToStringWithSchema(schema_));
-}
-
-// Test that predicates added out of key order are OK.
-//
-// Predicate: b == 254 AND a == 254
-TEST_F(CompositeIntKeysTest, TestPredicateOrderDoesntMatter) {
-  ScanSpec spec;
-  AddPredicate<uint8_t>(&spec, "b", EQ, 254);
-  AddPredicate<uint8_t>(&spec, "a", EQ, 254);
-  SCOPED_TRACE(spec.ToStringWithSchema(schema_));
-  ASSERT_NO_FATAL_FAILURE(enc_.EncodeRangePredicates(&spec, true));
-  EXPECT_EQ("PK >= (uint8 a=254, uint8 b=254, uint8 c=0) AND "
-            "PK < (uint8 a=254, uint8 b=255, uint8 c=0)",
-            spec.ToStringWithSchema(schema_));
-}
-
-// Tests for String parts in composite keys
-//------------------------------------------------------------
-class CompositeIntStringKeysTest : public TestRangePredicateEncoder {
- public:
-  CompositeIntStringKeysTest() :
-    TestRangePredicateEncoder(
-        Schema({ ColumnSchema("a", UINT8),
-                 ColumnSchema("b", STRING),
-                 ColumnSchema("c", STRING) },
-               3)) {
-  }
-};
-
-
-// Predicate: a == 128
-TEST_F(CompositeIntStringKeysTest, TestPrefixEquality) {
-  ScanSpec spec;
-  AddPredicate<uint8_t>(&spec, "a", EQ, 128);
-  SCOPED_TRACE(spec.ToStringWithSchema(schema_));
-  ASSERT_NO_FATAL_FAILURE(enc_.EncodeRangePredicates(&spec, true));
-  // Expect: key >= (128, "", "") AND key < (129, "", "")
-  EXPECT_EQ("PK >= (uint8 a=128, string b=, string c=) AND "
-            "PK < (uint8 a=129, string b=, string c=)",
-            spec.ToStringWithSchema(schema_));
-}
-
-// Predicate: a == 128 AND b = "abc"
-TEST_F(CompositeIntStringKeysTest, TestPrefixEqualityWithString) {
-  ScanSpec spec;
-  AddPredicate<uint8_t>(&spec, "a", EQ, 128);
-  AddPredicate<Slice>(&spec, "b", EQ, Slice("abc"));
-  SCOPED_TRACE(spec.ToString());
-  ASSERT_NO_FATAL_FAILURE(enc_.EncodeRangePredicates(&spec, true));
-  EXPECT_EQ("PK >= (uint8 a=128, string b=abc, string c=) AND "
-            "PK < (uint8 a=128, string b=abc\\000, string c=)",
-            spec.ToStringWithSchema(schema_));
-}
-
-// Tests for non-composite int key
-//------------------------------------------------------------
-class SingleIntKeyTest : public TestRangePredicateEncoder {
- public:
-  SingleIntKeyTest() :
-    TestRangePredicateEncoder(
-        Schema({ ColumnSchema("a", UINT8) }, 1)) {
-    }
-};
-
-TEST_F(SingleIntKeyTest, TestEquality) {
-  ScanSpec spec;
-  AddPredicate<uint8_t>(&spec, "a", EQ, 128);
-  SCOPED_TRACE(spec.ToString());
-  ASSERT_NO_FATAL_FAILURE(enc_.EncodeRangePredicates(&spec, true));
-  EXPECT_EQ("PK >= (uint8 a=128) AND "
-            "PK < (uint8 a=129)",
-            spec.ToStringWithSchema(schema_));
-}
-
-TEST_F(SingleIntKeyTest, TestRedundantUpperBound) {
-  ScanSpec spec;
-  AddPredicate<uint8_t>(&spec, "a", EQ, 255);
-  SCOPED_TRACE(spec.ToString());
-  ASSERT_NO_FATAL_FAILURE(enc_.EncodeRangePredicates(&spec, true));
-  EXPECT_EQ("PK >= (uint8 a=255)",
-            spec.ToStringWithSchema(schema_));
-}
-
-TEST_F(SingleIntKeyTest, TestNoPredicates) {
-  ScanSpec spec;
-  SCOPED_TRACE(spec.ToString());
-  ASSERT_NO_FATAL_FAILURE(enc_.EncodeRangePredicates(&spec, true));
-  EXPECT_EQ("", spec.ToStringWithSchema(schema_));
-}
-
-} // namespace kudu