Posted to commits@kudu.apache.org by da...@apache.org on 2016/07/19 21:32:31 UTC

incubator-kudu git commit: [c++-client]: cache non-covering ranges in meta cache

Repository: incubator-kudu
Updated Branches:
  refs/heads/master 7e2783c67 -> e831ab430


[c++-client]: cache non-covering ranges in meta cache

This commit introduces a few features to the meta cache, all with the aim of
making it compatible with the upcoming add/drop range partitions feature.

1) Non-covered range partitions are now cached in the meta cache. This is
achieved by storing MetaCacheEntry objects in the meta cache's partition-key
index instead of RemoteTablets. Each MetaCacheEntry either holds a
RemoteTablet, in which case it represents a covered partition range, or holds
only a pair of partition-key bounds, in which case it represents a non-covered
partition range.

2) Entries are now removed from the meta cache's partition-key index when the
results of a GetTableLocations RPC show that they are no longer valid.

3) A basic TTL has been added to the GetTableLocationsResponsePB so that the
client can refresh stale meta cache entries when necessary. The TTL is
configured on the master via the new table_locations_ttl_ms flag and defaults
to one hour; a sketch of how the client applies it follows.
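
For illustration, a minimal sketch (not part of this commit) of how the client
turns the response TTL into an absolute expiration time, mirroring
MetaCache::ProcessLookupResponse and MetaCacheEntry::stale() in the diff below;
'resp' is assumed to be a GetTableLocationsResponsePB:

    // On receiving a GetTableLocations response 'resp':
    MonoTime expiration = MonoTime::Now(MonoTime::FINE);
    expiration.AddDelta(MonoDelta::FromMilliseconds(resp.ttl_millis()));

    // Later, on the fast-path lookup, the cached entry is unusable once
    // the expiration time has passed:
    bool stale = expiration.ComesBefore(MonoTime::Now(MonoTime::FINE));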

Change-Id: I05bcb3fe05d51d7c455e1d68bd2baa6f3c2b9d21
Reviewed-on: http://gerrit.cloudera.org:8080/3581
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/e831ab43
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/e831ab43
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/e831ab43

Branch: refs/heads/master
Commit: e831ab4303640443143c4635796bc24262314637
Parents: 7e2783c
Author: Dan Burkert <da...@cloudera.com>
Authored: Wed Jul 6 17:35:56 2016 -0700
Committer: Dan Burkert <da...@cloudera.com>
Committed: Tue Jul 19 21:31:50 2016 +0000

----------------------------------------------------------------------
 src/kudu/client/client-test.cc     |  62 +++++--
 src/kudu/client/client.h           |   3 +-
 src/kudu/client/meta_cache.cc      | 308 +++++++++++++++++++++++---------
 src/kudu/client/meta_cache.h       | 113 +++++++++++-
 src/kudu/client/schema.h           |   2 +
 src/kudu/client/table-internal.cc  |  12 +-
 src/kudu/gutil/map-util.h          |  21 +++
 src/kudu/master/catalog_manager.cc |   7 +
 src/kudu/master/master.proto       |   4 +
 src/kudu/tools/ksck_remote.cc      |  35 ++--
 10 files changed, 433 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/e831ab43/src/kudu/client/client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 404c7b7..b2e5645 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -59,6 +59,7 @@
 #include "kudu/tserver/ts_tablet_manager.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/net/sockaddr.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/test_util.h"
@@ -76,6 +77,7 @@ DECLARE_int32(scanner_gc_check_interval_us);
 DECLARE_int32(scanner_inject_latency_on_each_batch_ms);
 DECLARE_int32(scanner_max_batch_size_bytes);
 DECLARE_int32(scanner_ttl_ms);
+DECLARE_int32(table_locations_ttl_ms);
 DEFINE_int32(test_scan_num_rows, 1000, "Number of rows to insert and scan");
 
 METRIC_DECLARE_counter(rpcs_queue_overflow);
@@ -139,6 +141,17 @@ class ClientTest : public KuduTest {
     ASSERT_NO_FATAL_FAILURE(CreateTable(kTable2Name, 1, {}, {}, &client_table2_));
   }
 
+  // Looks up the remote tablet entry for a given partition key in the meta cache.
+  scoped_refptr<internal::RemoteTablet> MetaCacheLookup(KuduTable* table,
+                                                        const string& partition_key) {
+    scoped_refptr<internal::RemoteTablet> rt;
+    Synchronizer sync;
+    client_->data_->meta_cache_->LookupTabletByKey(table, partition_key, MonoTime::Max(), &rt,
+                                                   sync.AsStatusCallback());
+    CHECK_OK(sync.Wait());
+    return rt;
+  }
+
   // Generate a set of split rows for tablets used in this test.
   vector<unique_ptr<KuduPartialRow>> GenerateSplitRows() {
     vector<unique_ptr<KuduPartialRow>> rows;
@@ -1216,6 +1229,32 @@ TEST_F(ClientTest, TestNonCoveringRangePartitions) {
   }
 }
 
+TEST_F(ClientTest, TestMetaCacheExpiry) {
+  google::FlagSaver saver;
+  FLAGS_table_locations_ttl_ms = 25;
+  auto& meta_cache = client_->data_->meta_cache_;
+
+  // Clear the cache.
+  meta_cache->ClearCacheForTesting();
+  internal::MetaCacheEntry entry;
+  ASSERT_FALSE(meta_cache->LookupTabletByKeyFastPath(client_table_.get(), "", &entry));
+
+  // Prime the cache.
+  CHECK_NOTNULL(MetaCacheLookup(client_table_.get(), "").get());
+  ASSERT_TRUE(meta_cache->LookupTabletByKeyFastPath(client_table_.get(), "", &entry));
+  ASSERT_FALSE(entry.stale());
+
+  // Sleep in order to expire the cache.
+  SleepFor(MonoDelta::FromMilliseconds(FLAGS_table_locations_ttl_ms));
+  ASSERT_TRUE(entry.stale());
+  ASSERT_FALSE(meta_cache->LookupTabletByKeyFastPath(client_table_.get(), "", &entry));
+
+  // Force a lookup and ensure the entry is refreshed.
+  CHECK_NOTNULL(MetaCacheLookup(client_table_.get(), "").get());
+  ASSERT_TRUE(meta_cache->LookupTabletByKeyFastPath(client_table_.get(), "", &entry));
+  ASSERT_FALSE(entry.stale());
+}
+
 TEST_F(ClientTest, TestGetTabletServerBlacklist) {
   shared_ptr<KuduTable> table;
   ASSERT_NO_FATAL_FAILURE(CreateTable("blacklist",
@@ -1229,10 +1268,7 @@ TEST_F(ClientTest, TestGetTabletServerBlacklist) {
   // We have to loop since some replicas may have been created slowly.
   scoped_refptr<internal::RemoteTablet> rt;
   while (true) {
-    Synchronizer sync;
-    client_->data_->meta_cache_->LookupTabletByKey(table.get(), "", MonoTime::Max(), &rt,
-                                                  sync.AsStatusCallback());
-    ASSERT_OK(sync.Wait());
+    rt = MetaCacheLookup(table.get(), "");
     ASSERT_TRUE(rt.get() != nullptr);
     vector<internal::RemoteTabletServer*> tservers;
     rt->GetRemoteTabletServers(&tservers);
@@ -1709,8 +1745,8 @@ TEST_F(ClientTest, TestWriteTimeout) {
     gscoped_ptr<KuduError> error = GetSingleErrorFromSession(session.get());
     ASSERT_TRUE(error->status().IsTimedOut()) << error->status().ToString();
     ASSERT_STR_CONTAINS(error->status().ToString(),
-                        "GetTableLocations(client-testtb, int32 key=1, 1) "
-                        "failed: timed out after deadline expired");
+                        "GetTableLocations { table: 'client-testtb', partition-key: (int32 key=1),"
+                        " attempt: 1 } failed: timed out after deadline expired");
   }
 
   // Next time out the actual write on the tablet server.
@@ -2390,12 +2426,7 @@ TEST_F(ClientTest, TestReplicatedMultiTabletTableFailover) {
   ASSERT_NO_FATAL_FAILURE(InsertTestRows(table.get(), kNumRowsToWrite));
 
   // Find the leader of the first tablet.
-  Synchronizer sync;
-  scoped_refptr<internal::RemoteTablet> rt;
-  client_->data_->meta_cache_->LookupTabletByKey(table.get(), "",
-                                                 MonoTime::Max(),
-                                                 &rt, sync.AsStatusCallback());
-  ASSERT_OK(sync.Wait());
+  scoped_refptr<internal::RemoteTablet> rt = MetaCacheLookup(table.get(), "");
   internal::RemoteTabletServer *rts = rt->LeaderTServer();
 
   // Kill the leader of the first tablet.
@@ -2455,12 +2486,7 @@ TEST_F(ClientTest, TestReplicatedTabletWritesWithLeaderElection) {
   SleepFor(MonoDelta::FromMilliseconds(1500));
 
   // Find the leader replica
-  Synchronizer sync;
-  scoped_refptr<internal::RemoteTablet> rt;
-  client_->data_->meta_cache_->LookupTabletByKey(table.get(), "",
-                                                 MonoTime::Max(),
-                                                 &rt, sync.AsStatusCallback());
-  ASSERT_OK(sync.Wait());
+  scoped_refptr<internal::RemoteTablet> rt = MetaCacheLookup(table.get(), "");
   internal::RemoteTabletServer *rts;
   set<string> blacklist;
   vector<internal::RemoteTabletServer*> candidates;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/e831ab43/src/kudu/client/client.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index c68cc96..710768c 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -281,6 +281,7 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> {
   friend class internal::RemoteTablet;
   friend class internal::RemoteTabletServer;
   friend class internal::WriteRpc;
+  friend class ClientTest;
   friend class KuduClientBuilder;
   friend class KuduScanner;
   friend class KuduScanTokenBuilder;
@@ -292,8 +293,8 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> {
   FRIEND_TEST(ClientTest, TestGetTabletServerBlacklist);
   FRIEND_TEST(ClientTest, TestMasterDown);
   FRIEND_TEST(ClientTest, TestMasterLookupPermits);
+  FRIEND_TEST(ClientTest, TestMetaCacheExpiry);
   FRIEND_TEST(ClientTest, TestNonCoveringRangePartitions);
-  FRIEND_TEST(ClientTest, TestReplicatedMultiTabletTableFailover);
   FRIEND_TEST(ClientTest, TestReplicatedTabletWritesWithLeaderElection);
   FRIEND_TEST(ClientTest, TestScanFaultTolerance);
   FRIEND_TEST(ClientTest, TestScanTimeout);

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/e831ab43/src/kudu/client/meta_cache.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/meta_cache.cc b/src/kudu/client/meta_cache.cc
index 3922674..92cbe78 100644
--- a/src/kudu/client/meta_cache.cc
+++ b/src/kudu/client/meta_cache.cc
@@ -59,6 +59,10 @@ namespace client {
 
 namespace internal {
 
+namespace {
+const int MAX_RETURNED_TABLE_LOCATIONS = 10;
+} // anonymous namespace
+
 ////////////////////////////////////////////////////////////
 
 RemoteTabletServer::RemoteTabletServer(const master::TSInfoPB& pb)
@@ -266,12 +270,12 @@ void RemoteTablet::MarkTServerAsFollower(const RemoteTabletServer* server) {
                 << server->ToString() << ". Replicas: " << ReplicasAsStringUnlocked();
 }
 
-std::string RemoteTablet::ReplicasAsString() const {
+string RemoteTablet::ReplicasAsString() const {
   std::lock_guard<simple_spinlock> l(lock_);
   return ReplicasAsStringUnlocked();
 }
 
-std::string RemoteTablet::ReplicasAsStringUnlocked() const {
+string RemoteTablet::ReplicasAsStringUnlocked() const {
   DCHECK(lock_.is_locked());
   string replicas_str;
   for (const RemoteReplica& rep : replicas_) {
@@ -284,6 +288,42 @@ std::string RemoteTablet::ReplicasAsStringUnlocked() const {
   return replicas_str;
 }
 
+bool MetaCacheEntry::Contains(const string& partition_key) const {
+  DCHECK(Initialized());
+  return lower_bound_partition_key() <= partition_key &&
+         (upper_bound_partition_key().empty() || upper_bound_partition_key() > partition_key);
+}
+
+bool MetaCacheEntry::stale() const {
+  DCHECK(Initialized());
+  return expiration_time_.ComesBefore(MonoTime::Now(MonoTime::FINE)) ||
+         (!is_non_covered_range() && tablet_->stale());
+}
+
+string MetaCacheEntry::DebugString(const KuduTable* table) const {
+  DCHECK(Initialized());
+  const string& lower_bound = lower_bound_partition_key();
+  const string& upper_bound = upper_bound_partition_key();
+
+  string lower_bound_string = lower_bound.empty() ? "<start>" :
+    table->partition_schema().PartitionKeyDebugString(lower_bound, *table->schema().schema_);
+
+  string upper_bound_string = upper_bound.empty() ? "<end>" :
+    table->partition_schema().PartitionKeyDebugString(upper_bound, *table->schema().schema_);
+
+  MonoDelta ttl = expiration_time_.GetDeltaSince(MonoTime::Now(MonoTime::FINE));
+
+  if (is_non_covered_range()) {
+    return strings::Substitute(
+        "NonCoveredRange { lower_bound: ($0), upper_bound: ($1), ttl: $2ms }",
+        lower_bound_string, upper_bound_string, ttl.ToMilliseconds());
+  } else {
+    return strings::Substitute(
+        "Tablet { id: $0, lower_bound: ($1), upper_bound: ($2), ttl: $3ms }",
+        tablet()->tablet_id(), lower_bound_string, upper_bound_string, ttl.ToMilliseconds());
+  }
+}
+
 MetaCacheServerPicker::MetaCacheServerPicker(KuduClient* client,
                                              const scoped_refptr<MetaCache>& meta_cache,
                                              const KuduTable* table,
@@ -479,11 +519,13 @@ class LookupRpc : public Rpc {
   virtual void SendRpc() OVERRIDE;
   virtual string ToString() const OVERRIDE;
 
+  const GetTableLocationsRequestPB& req() const { return req_; }
   const GetTableLocationsResponsePB& resp() const { return resp_; }
   const string& table_name() const { return table_->name(); }
   const string& table_id() const { return table_->id(); }
   const string& partition_key() const { return partition_key_; }
   bool is_exact_lookup() const { return is_exact_lookup_; }
+  const KuduTable* table() const { return table_; }
 
  private:
   virtual void SendRpcCb(const Status& status) OVERRIDE;
@@ -522,7 +564,7 @@ class LookupRpc : public Rpc {
 
   // When lookup finishes successfully, the selected tablet is
   // written here prior to invoking 'user_cb_'.
-  scoped_refptr<RemoteTablet> *remote_tablet_;
+  scoped_refptr<RemoteTablet>* remote_tablet_;
 
   // Whether this lookup has acquired a master lookup permit.
   bool has_permit_;
@@ -559,26 +601,38 @@ LookupRpc::~LookupRpc() {
 
 void LookupRpc::SendRpc() {
   // Fast path: lookup in the cache.
-  scoped_refptr<RemoteTablet> result;
-  if (PREDICT_TRUE(meta_cache_->LookupTabletByKeyFastPath(table_, partition_key_, &result)) &&
-      result->HasLeader()) {
-    VLOG(3) << "Fast lookup: found tablet " << result->tablet_id()
-            << " for " << table_->partition_schema()
-                                 .PartitionKeyDebugString(partition_key_, *table_->schema().schema_)
-            << " of " << table_->name();
-    if (remote_tablet_) {
-      *remote_tablet_ = result;
+  MetaCacheEntry entry;
+  while (PREDICT_TRUE(meta_cache_->LookupTabletByKeyFastPath(table_, partition_key_, &entry))
+         && (entry.is_non_covered_range() || entry.tablet()->HasLeader())) {
+    VLOG(4) << "Fast lookup: found " << entry.DebugString(table_)
+            << " for ("
+            << table_->partition_schema()
+                      .PartitionKeyDebugString(partition_key_, *table_->schema().schema_)
+            << ") of " << table_->name();
+    if (!entry.is_non_covered_range()) {
+      if (remote_tablet_) {
+        *remote_tablet_ = entry.tablet();
+      }
+      user_cb_.Run(Status::OK());
+      delete this;
+      return;
     }
-    user_cb_.Run(Status::OK());
-    delete this;
-    return;
+    if (is_exact_lookup_ || entry.upper_bound_partition_key().empty()) {
+      user_cb_.Run(Status::NotFound(
+            "No tablet covering the requested range partition",
+            table_->partition_schema()
+                   .PartitionKeyDebugString(partition_key_, *table_->schema().schema_)));
+      delete this;
+      return;
+    }
+    partition_key_ = entry.upper_bound_partition_key();
   }
 
   // Slow path: must lookup the tablet in the master.
-  VLOG(3) << "Fast lookup: no known tablet"
-          << " for " << table_->partition_schema()
-                               .PartitionKeyDebugString(partition_key_, *table_->schema().schema_)
-          << " of " << table_->name()
+  VLOG(4) << "Fast lookup: no cache entry"
+          << " for (" << table_->partition_schema()
+                                .PartitionKeyDebugString(partition_key_, *table_->schema().schema_)
+          << ") of " << table_->name()
           << ": refreshing our metadata from the Master";
 
   if (!has_permit_) {
@@ -594,6 +648,7 @@ void LookupRpc::SendRpc() {
   // Fill out the request.
   req_.mutable_table()->set_table_id(table_->id());
   req_.set_partition_key_start(partition_key_);
+  req_.set_max_returned_locations(MAX_RETURNED_TABLE_LOCATIONS);
 
   // The end partition key is left unset intentionally so that we'll prefetch
   // some additional tablets.
@@ -615,7 +670,7 @@ void LookupRpc::SendRpc() {
 }
 
 string LookupRpc::ToString() const {
-  return Substitute("GetTableLocations($0, $1, $2)",
+  return Substitute("GetTableLocations { table: '$0', partition-key: ($1), attempt: $2 }",
                     table_->name(),
                     table_->partition_schema()
                            .PartitionKeyDebugString(partition_key_, *table_->schema().schema_),
@@ -695,11 +750,23 @@ void LookupRpc::SendRpcCb(const Status& status) {
     }
   }
 
+  if (new_status.IsServiceUnavailable()) {
+    // One or more of the tablets is not running; retry after a backoff period.
+    mutable_retrier()->DelayedRetry(this, new_status);
+    ignore_result(delete_me.release());
+    return;
+  }
+
   if (new_status.ok()) {
-    scoped_refptr<RemoteTablet> result;
-    new_status = meta_cache_->ProcessLookupResponse(*this, &result);
-    if (remote_tablet_) {
-      *remote_tablet_ = result;
+    MetaCacheEntry entry;
+    new_status = meta_cache_->ProcessLookupResponse(*this, &entry);
+    if (entry.is_non_covered_range()) {
+      new_status = Status::NotFound(
+          "No tablet covering the requested range partition",
+          table_->partition_schema()
+                 .PartitionKeyDebugString(partition_key_, *table_->schema().schema_));
+    } else if (remote_tablet_) {
+      *remote_tablet_ = entry.tablet();
     }
   } else {
     new_status = new_status.CloneAndPrepend(Substitute("$0 failed", ToString()));
@@ -709,79 +776,153 @@ void LookupRpc::SendRpcCb(const Status& status) {
 }
 
 Status MetaCache::ProcessLookupResponse(const LookupRpc& rpc,
-                                        scoped_refptr<RemoteTablet>* remote_tablet) {
+                                        MetaCacheEntry* cache_entry) {
   VLOG(2) << "Processing master response for " << rpc.ToString()
           << ". Response: " << rpc.resp().ShortDebugString();
 
+  MonoTime expiration_time = MonoTime::Now(MonoTime::FINE);
+  expiration_time.AddDelta(MonoDelta::FromMilliseconds(rpc.resp().ttl_millis()));
+
   std::lock_guard<rw_spinlock> l(lock_);
   TabletMap& tablets_by_key = LookupOrInsert(&tablets_by_table_and_key_,
                                              rpc.table_id(), TabletMap());
-  for (const TabletLocationsPB& loc : rpc.resp().tablet_locations()) {
-    // First, update the tserver cache, needed for the Refresh calls below.
-    for (const TabletLocationsPB_ReplicaPB& r : loc.replicas()) {
-      UpdateTabletServer(r.ts_info());
-    }
 
-    // Next, update the tablet caches.
-    string tablet_id = loc.tablet_id();
-    scoped_refptr<RemoteTablet> remote = FindPtrOrNull(tablets_by_id_, tablet_id);
-    if (remote.get() != nullptr) {
-      // Partition should not have changed.
-      DCHECK_EQ(loc.partition().partition_key_start(), remote->partition().partition_key_start());
-      DCHECK_EQ(loc.partition().partition_key_end(), remote->partition().partition_key_end());
+  const auto& tablet_locations = rpc.resp().tablet_locations();
 
-      VLOG(3) << "Refreshing tablet " << tablet_id << ": " << loc.ShortDebugString();
-      remote->Refresh(ts_cache_, loc.replicas());
-      continue;
+  if (tablet_locations.empty()) {
+    // If there are no tablets in the response, then the table is empty. If
+    // there were any tablets in the table, they would have been returned, since
+    // the master guarantees that if the partition key falls in a non-covered
+    // range, the previous tablet will be returned, and we did not set an upper
+    // bound partition key on the request.
+    DCHECK(!rpc.req().has_partition_key_end());
+
+    tablets_by_key.clear();
+    MetaCacheEntry entry(expiration_time, "", "");
+    VLOG(3) << "Caching '" << rpc.table_name() << "' entry " << entry.DebugString(rpc.table());
+    InsertOrDie(&tablets_by_key, "", std::move(entry));
+  } else {
+
+    // The comments below will reference the following diagram:
+    //
+    //   +---+   +---+---+
+    //   |   |   |   |   |
+    // A | B | C | D | E | F
+    //   |   |   |   |   |
+    //   +---+   +---+---+
+    //
+    // It depicts a tablet locations response from the master containing three
+    // tablets: B, D and E. Three non-covered ranges are present: A, C, and F.
+    // An RPC response containing B, D and E could occur if the lookup partition
+    // key falls in A, B, or C, although the existence of A as an initial
+    // non-covered range can only be inferred if the lookup partition key falls
+    // in A.
+
+    const auto& first_lower_bound = tablet_locations.Get(0).partition().partition_key_start();
+    if (rpc.partition_key() < first_lower_bound) {
+      // If the first tablet is past the requested partition key, then the
+      // partition key falls in an initial non-covered range, such as A.
+
+      // Clear any existing entries which overlap with the discovered non-covered range.
+      tablets_by_key.erase(tablets_by_key.begin(), tablets_by_key.lower_bound(first_lower_bound));
+      MetaCacheEntry entry(expiration_time, "", first_lower_bound);
+      VLOG(3) << "Caching '" << rpc.table_name() << "' entry " << entry.DebugString(rpc.table());
+      InsertOrDie(&tablets_by_key, "", std::move(entry));
     }
 
-    VLOG(3) << "Caching tablet " << tablet_id << " for (" << rpc.table_name() << "): "
-            << loc.ShortDebugString();
+    // last_upper_bound tracks the upper bound of the previously processed
+    // entry, so that we can determine when we have found a non-covered range.
+    string last_upper_bound = first_lower_bound;
+    for (const TabletLocationsPB& tablet : tablet_locations) {
+      const auto& tablet_lower_bound = tablet.partition().partition_key_start();
+      const auto& tablet_upper_bound = tablet.partition().partition_key_end();
 
-    Partition partition;
-    Partition::FromPB(loc.partition(), &partition);
-    remote = new RemoteTablet(tablet_id, partition);
-    remote->Refresh(ts_cache_, loc.replicas());
+      if (last_upper_bound < tablet_lower_bound) {
+        // There is a non-covered range between the previous tablet and this tablet.
+        // This will discover C while processing the tablet location for D.
 
-    InsertOrDie(&tablets_by_id_, tablet_id, remote);
-    InsertOrDie(&tablets_by_key, partition.partition_key_start(), remote);
+        // Clear any existing entries which overlap with the discovered non-covered range.
+        tablets_by_key.erase(tablets_by_key.lower_bound(last_upper_bound),
+                             tablets_by_key.lower_bound(tablet_lower_bound));
 
-    // TODO(KUDU-1421): Once removing partition ranges is supported, we should
-    // inspect the tablet locations for any non-covered ranges. Cached tablets
-    // falling in non-covered ranges should be removed.
-  }
+        MetaCacheEntry entry(expiration_time, last_upper_bound, tablet_lower_bound);
+        VLOG(3) << "Caching '" << rpc.table_name() << "' entry " << entry.DebugString(rpc.table());
+        InsertOrDie(&tablets_by_key, last_upper_bound, std::move(entry));
+      }
+      last_upper_bound = tablet_upper_bound;
+
+      // Now process the tablet itself (such as B, D, or E). If we already know
+      // about the tablet, then we only need to refresh its replica locations
+      // and the entry TTL. If the tablet is unknown, then we need to create a
+      // new RemoteTablet for it.
+
+      // First, update the tserver cache, needed for the Refresh calls below.
+      for (const TabletLocationsPB_ReplicaPB& replicas : tablet.replicas()) {
+        UpdateTabletServer(replicas.ts_info());
+      }
 
-  bool not_found = false;
-  // itr points to the first tablet that is greater than the requested partition key.
-  auto itr = tablets_by_key.upper_bound(rpc.partition_key());
-  if (tablets_by_key.empty()) {
-    not_found = true;
-  } else if (itr == tablets_by_key.begin()) {
-    // The requested partition key is before all tablets.
-    if (rpc.is_exact_lookup()) {
-      not_found = true;
+      string tablet_id = tablet.tablet_id();
+      scoped_refptr<RemoteTablet> remote = FindPtrOrNull(tablets_by_id_, tablet_id);
+      if (remote.get() != nullptr) {
+        // Partition should not have changed.
+        DCHECK_EQ(tablet_lower_bound, remote->partition().partition_key_start());
+        DCHECK_EQ(tablet_upper_bound, remote->partition().partition_key_end());
+
+        VLOG(3) << "Refreshing tablet " << tablet_id << ": " << tablet.ShortDebugString();
+        remote->Refresh(ts_cache_, tablet.replicas());
+
+        // Update the entry TTL.
+        auto& entry = FindOrDie(tablets_by_key, tablet_lower_bound);
+        DCHECK(!entry.is_non_covered_range() &&
+               entry.upper_bound_partition_key() == tablet_upper_bound);
+        entry.refresh_expiration_time(expiration_time);
+        continue;
+      }
+
+      // Clear any existing entries which overlap with the discovered tablet.
+      tablets_by_key.erase(tablets_by_key.lower_bound(tablet_lower_bound),
+                           tablet_upper_bound.empty() ? tablets_by_key.end() :
+                             tablets_by_key.lower_bound(tablet_upper_bound));
+
+      Partition partition;
+      Partition::FromPB(tablet.partition(), &partition);
+      remote = new RemoteTablet(tablet_id, partition);
+      remote->Refresh(ts_cache_, tablet.replicas());
+
+      MetaCacheEntry entry(expiration_time, remote);
+      VLOG(3) << "Caching '" << rpc.table_name() << "' entry " << entry.DebugString(rpc.table());
+
+      InsertOrDie(&tablets_by_id_, tablet_id, remote);
+      InsertOrDie(&tablets_by_key, tablet_lower_bound, std::move(entry));
+    }
+
+    if (!last_upper_bound.empty() && tablet_locations.size() < MAX_RETURNED_TABLE_LOCATIONS) {
+      // There is a non-covered range between the last tablet and the end of the
+      // partition key space, such as F.
+
+      // Clear existing entries which overlap with the discovered non-covered range.
+      tablets_by_key.erase(tablets_by_key.lower_bound(last_upper_bound),
+                           tablets_by_key.end());
+
+      MetaCacheEntry entry(expiration_time, last_upper_bound, "");
+      VLOG(3) << "Caching '" << rpc.table_name() << "' entry " << entry.DebugString(rpc.table());
+      InsertOrDie(&tablets_by_key, last_upper_bound, std::move(entry));
     }
-    *remote_tablet = itr->second;
-  } else if (std::prev(itr)->second->partition().partition_key_end() > rpc.partition_key() ||
-             std::prev(itr)->second->partition().partition_key_end().empty()) {
-    // Exact match.
-    *remote_tablet = std::prev(itr)->second;
-  } else if (itr == tablets_by_key.end() || rpc.is_exact_lookup()) {
-    // The requested partition key is beyond all tablets,
-    // or falls between two tablets and the lookup is exact.
-    not_found = true;
-  } else {
-    // The primary key falls between two tablets; return the second.
-    *remote_tablet = itr->second;
   }
 
-  return not_found ? Status::NotFound("No tablet covering the requested range partition")
-                   : Status::OK();
+  // Finally, look up the discovered entry and return it to the requestor.
+  *cache_entry = FindFloorOrDie(tablets_by_key, rpc.partition_key());
+  if (!rpc.is_exact_lookup() && cache_entry->is_non_covered_range() &&
+      !cache_entry->upper_bound_partition_key().empty()) {
+    *cache_entry = FindFloorOrDie(tablets_by_key, cache_entry->upper_bound_partition_key());
+    DCHECK(!cache_entry->is_non_covered_range());
+  }
+  return Status::OK();
 }
 
 bool MetaCache::LookupTabletByKeyFastPath(const KuduTable* table,
                                           const string& partition_key,
-                                          scoped_refptr<RemoteTablet>* remote_tablet) {
+                                          MetaCacheEntry* entry) {
   shared_lock<rw_spinlock> l(lock_);
   const TabletMap* tablets = FindOrNull(tablets_by_table_and_key_, table->id());
   if (PREDICT_FALSE(!tablets)) {
@@ -789,21 +930,19 @@ bool MetaCache::LookupTabletByKeyFastPath(const KuduTable* table,
     return false;
   }
 
-  const scoped_refptr<RemoteTablet>* r = FindFloorOrNull(*tablets, partition_key);
-  if (PREDICT_FALSE(!r)) {
+  const MetaCacheEntry* e = FindFloorOrNull(*tablets, partition_key);
+  if (PREDICT_FALSE(!e)) {
     // No tablets with a start partition key lower than 'partition_key'.
     return false;
   }
 
   // Stale entries must be re-fetched.
-  if ((*r)->stale()) {
+  if (e->stale()) {
     return false;
   }
 
-  if ((*r)->partition().partition_key_end().compare(partition_key) > 0 ||
-      (*r)->partition().partition_key_end().empty()) {
-    // partition_key < partition.end OR tablet doesn't end.
-    *remote_tablet = *r;
+  if (e->Contains(partition_key)) {
+    *entry = *e;
     return true;
   }
 
@@ -811,6 +950,7 @@ bool MetaCache::LookupTabletByKeyFastPath(const KuduTable* table,
 }
 
 void MetaCache::ClearCacheForTesting() {
+  VLOG(3) << "Clearing cache";
   shared_lock<rw_spinlock> l(lock_);
   STLDeleteValues(&ts_cache_);
   tablets_by_id_.clear();
@@ -857,7 +997,7 @@ void MetaCache::MarkTSFailed(RemoteTabletServer* ts,
   Status ts_status = status.CloneAndPrepend("TS failed");
 
   // TODO: replace with a ts->tablet multimap for faster lookup?
-  for (const TabletMap::value_type& tablet : tablets_by_id_) {
+  for (const auto& tablet : tablets_by_id_) {
     // We just loop on all tablets; if a tablet does not have a replica on this
     // TS, MarkReplicaFailed() returns false and we ignore the return value.
     tablet.second->MarkReplicaFailed(ts, ts_status);
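
For readers following the diagram in ProcessLookupResponse above, here is a
standalone sketch (simplified types, hypothetical code, not part of the commit)
of how overlapping cache entries are evicted and replaced as the response
tablets B, D, and E are processed:

    #include <iostream>
    #include <map>
    #include <string>

    int main() {
      // Entries keyed by lower-bound partition key; a string stands in for
      // a MetaCacheEntry. Assume one stale entry covers the whole keyspace.
      std::map<std::string, std::string> by_key = {{"", "stale-entry"}};

      struct Range { std::string lower, upper; };  // "" upper = end of keyspace
      Range tablets[] = {{"b", "c"}, {"d", "e"}, {"e", "f"}};  // B, D, E

      std::string lookup_key = "a";  // the looked-up key fell in range A
      const std::string& first_lower = tablets[0].lower;
      if (lookup_key < first_lower) {
        // Initial non-covered range A: evict overlapping entries, cache the gap.
        by_key.erase(by_key.begin(), by_key.lower_bound(first_lower));
        by_key[""] = "non-covered";
      }

      std::string last_upper = first_lower;
      for (const Range& t : tablets) {
        if (last_upper < t.lower) {
          // Gap between the previous tablet and this one (discovers C at D).
          by_key.erase(by_key.lower_bound(last_upper),
                       by_key.lower_bound(t.lower));
          by_key[last_upper] = "non-covered";
        }
        last_upper = t.upper;
        // Evict entries overlapping the tablet's own range, then cache it.
        by_key.erase(by_key.lower_bound(t.lower),
                     t.upper.empty() ? by_key.end() : by_key.lower_bound(t.upper));
        by_key[t.lower] = "tablet";
      }

      if (!last_upper.empty()) {
        // Trailing non-covered range F. The real code only does this when the
        // response was not truncated at MAX_RETURNED_TABLE_LOCATIONS.
        by_key.erase(by_key.lower_bound(last_upper), by_key.end());
        by_key[last_upper] = "non-covered";
      }

      // Prints: '' -> non-covered, 'b' -> tablet, 'c' -> non-covered,
      //         'd' -> tablet, 'e' -> tablet, 'f' -> non-covered
      for (const auto& e : by_key) {
        std::cout << "'" << e.first << "' -> " << e.second << "\n";
      }
    }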

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/e831ab43/src/kudu/client/meta_cache.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/meta_cache.h b/src/kudu/client/meta_cache.h
index 53403d1..4dddfc4 100644
--- a/src/kudu/client/meta_cache.h
+++ b/src/kudu/client/meta_cache.h
@@ -34,11 +34,11 @@
 #include "kudu/rpc/rpc.h"
 #include "kudu/util/async_util.h"
 #include "kudu/util/locks.h"
+#include "kudu/util/memory/arena.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/net/net_util.h"
 #include "kudu/util/semaphore.h"
 #include "kudu/util/status.h"
-#include "kudu/util/memory/arena.h"
-#include "kudu/util/net/net_util.h"
 
 namespace kudu {
 
@@ -57,6 +57,7 @@ class TSInfoPB;
 namespace client {
 
 class ClientTest_TestMasterLookupPermits_Test;
+class ClientTest_TestMetaCacheExpiry_Test;
 class ClientTest_TestNonCoveringRangePartitions_Test;
 class KuduClient;
 class KuduTable;
@@ -248,6 +249,99 @@ class RemoteTablet : public RefCountedThreadSafe<RemoteTablet> {
   DISALLOW_COPY_AND_ASSIGN(RemoteTablet);
 };
 
+// MetaCacheEntry holds either a tablet and its associated `RemoteTablet`
+// instance, or a non-covered partition range.
+class MetaCacheEntry {
+ public:
+
+  MetaCacheEntry() { }
+
+  // Construct a MetaCacheEntry representing a tablet.
+  MetaCacheEntry(MonoTime expiration_time, scoped_refptr<RemoteTablet> tablet)
+      : expiration_time_(expiration_time),
+        tablet_(std::move(tablet)) {
+  }
+
+  // Construct a MetaCacheEntry representing a non-covered range with the
+  // provided range partition bounds.
+  MetaCacheEntry(MonoTime expiration_time,
+                 std::string lower_bound_partition_key,
+                 std::string upper_bound_partition_key)
+      : expiration_time_(std::move(expiration_time)),
+        lower_bound_partition_key_(std::move(lower_bound_partition_key)),
+        upper_bound_partition_key_(std::move(upper_bound_partition_key)) {
+  }
+
+  // Returns `true` if this is a non-covered partition range.
+  bool is_non_covered_range() const {
+    DCHECK(Initialized());
+    return tablet_.get() == nullptr;
+  }
+
+  // Returns the remote tablet; should only be called if this entry contains
+  // a tablet.
+  const scoped_refptr<RemoteTablet>& tablet() const {
+    DCHECK_NOTNULL(tablet_.get());
+    DCHECK(Initialized());
+    return tablet_;
+  }
+
+  // Returns the inclusive lower bound partition key for the entry.
+  const std::string& lower_bound_partition_key() const {
+    DCHECK(Initialized());
+    if (is_non_covered_range()) {
+      return lower_bound_partition_key_;
+    } else {
+      return tablet_->partition().partition_key_start();
+    }
+  }
+
+  // Returns the exclusive upper bound partition key for the entry.
+  const std::string& upper_bound_partition_key() const {
+    DCHECK(Initialized());
+    if (is_non_covered_range()) {
+      return upper_bound_partition_key_;
+    } else {
+      return tablet_->partition().partition_key_end();
+    }
+  }
+
+  void refresh_expiration_time(MonoTime expiration_time) {
+    DCHECK(Initialized());
+    DCHECK(expiration_time.Initialized());
+    // Do not check that the new expiration time comes after the existing expiration
+    // time, because that may not hold if the master changes its configured TTL.
+    expiration_time_ = expiration_time;
+  }
+
+  // Returns true if the partition key is contained in this meta cache entry.
+  bool Contains(const std::string& partition_key) const;
+
+  // Returns true if this meta cache entry is stale.
+  bool stale() const;
+
+  std::string DebugString(const KuduTable* table) const;
+
+ private:
+
+  // Returns true if the entry is initialized.
+  bool Initialized() const {
+    return expiration_time_.Initialized();
+  }
+
+  // The expiration time of this cached entry.
+  MonoTime expiration_time_;
+
+  // The tablet. If this is a non-covered range, the tablet will be nullptr.
+  scoped_refptr<RemoteTablet> tablet_;
+
+  // The lower bound partition key, if this is a non-covered range.
+  std::string lower_bound_partition_key_;
+
+  // The upper bound partition key, if this is a non-covered range.
+  std::string upper_bound_partition_key_;
+};
+
 // Manager of RemoteTablets and RemoteTabletServers. The client consults
 // this class to look up a given tablet or server.
 //
@@ -296,17 +390,18 @@ class MetaCache : public RefCountedThreadSafe<MetaCache> {
   friend class LookupRpc;
 
   FRIEND_TEST(client::ClientTest, TestMasterLookupPermits);
+  FRIEND_TEST(client::ClientTest, TestMetaCacheExpiry);
   FRIEND_TEST(client::ClientTest, TestNonCoveringRangePartitions);
 
   // Called on the slow LookupTablet path when the master responds. Populates
   // the tablet caches and returns a reference to the first one.
-  Status ProcessLookupResponse(const LookupRpc& rpc, scoped_refptr<RemoteTablet>* remote_tablet);
+  Status ProcessLookupResponse(const LookupRpc& rpc, MetaCacheEntry* entry);
 
   // Lookup the given tablet by key, only consulting local information.
   // Returns true and sets *remote_tablet if successful.
   bool LookupTabletByKeyFastPath(const KuduTable* table,
                                  const std::string& partition_key,
-                                 scoped_refptr<RemoteTablet>* remote_tablet);
+                                 MetaCacheEntry* entry);
 
   // Clears the meta cache for testing purposes.
   void ClearCacheForTesting();
@@ -332,16 +427,20 @@ class MetaCache : public RefCountedThreadSafe<MetaCache> {
   // Protected by lock_.
   TabletServerMap ts_cache_;
 
-  // Cache of tablets, keyed by table ID, then by start partition key.
+  // Cache of tablets, keyed by partition key.
+  //
+  // Protected by lock_.
+  typedef std::map<std::string, MetaCacheEntry> TabletMap;
+
+  // Cache of tablets and non-covered ranges, keyed by table ID.
   //
   // Protected by lock_.
-  typedef std::map<std::string, scoped_refptr<RemoteTablet> > TabletMap;
   std::unordered_map<std::string, TabletMap> tablets_by_table_and_key_;
 
   // Cache of tablets, keyed by tablet ID.
   //
   // Protected by lock_
-  std::unordered_map<std::string, scoped_refptr<RemoteTablet> > tablets_by_id_;
+  std::unordered_map<std::string, scoped_refptr<RemoteTablet>> tablets_by_id_;
 
   // Prevents master lookup "storms" by delaying master lookups when all
   // permits have been acquired.
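
As a usage illustration (hypothetical snippet, not part of the commit;
'ttl_ms' and 'remote_tablet' are assumed to be in scope), constructing and
querying the two kinds of MetaCacheEntry defined above:

    // Expiration computed from the master-supplied TTL.
    MonoTime expiry = MonoTime::Now(MonoTime::FINE);
    expiry.AddDelta(MonoDelta::FromMilliseconds(ttl_ms));

    MetaCacheEntry covered(expiry, remote_tablet);  // wraps a RemoteTablet
    MetaCacheEntry gap(expiry, "b", "d");           // non-covered range ["b", "d")

    CHECK(gap.is_non_covered_range());
    CHECK(gap.Contains("c"));    // "b" <= "c" < "d"
    CHECK(!gap.Contains("d"));   // the upper bound is exclusive
    // Both entries report stale() == true once 'expiry' has passed.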

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/e831ab43/src/kudu/client/schema.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/schema.h b/src/kudu/client/schema.h
index f2b4917..65fb5c3 100644
--- a/src/kudu/client/schema.h
+++ b/src/kudu/client/schema.h
@@ -41,6 +41,7 @@ namespace client {
 namespace internal {
 class GetTableSchemaRpc;
 class LookupRpc;
+class MetaCacheEntry;
 class WriteRpc;
 } // namespace internal
 
@@ -332,6 +333,7 @@ class KUDU_EXPORT KuduSchema {
   friend class ScanConfiguration;
   friend class internal::GetTableSchemaRpc;
   friend class internal::LookupRpc;
+  friend class internal::MetaCacheEntry;
   friend class internal::WriteRpc;
   friend class kudu::tools::TsAdminClient;
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/e831ab43/src/kudu/client/table-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/table-internal.cc b/src/kudu/client/table-internal.cc
index ca97480..c9aa5dd 100644
--- a/src/kudu/client/table-internal.cc
+++ b/src/kudu/client/table-internal.cc
@@ -128,16 +128,12 @@ Status KuduTable::Data::Open() {
           continue;
         }
       }
-      if (s.ok()) {
-        s = StatusFromPB(resp.error().status());
-      }
+      s = StatusFromPB(resp.error().status());
     }
-    if (!s.ok()) {
-      LOG(WARNING) << "Error getting table locations: " << s.ToString() << ", retrying.";
-      continue;
-    }
-    if (resp.tablet_locations_size() > 0) {
+    if (s.ok()) {
       break;
+    } else {
+      LOG(WARNING) << "Error getting table locations: " << s.ToString() << ", retrying.";
     }
 
     /* TODO: Use exponential backoff instead */

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/e831ab43/src/kudu/gutil/map-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/gutil/map-util.h b/src/kudu/gutil/map-util.h
index 9e6e46c..8a3b302 100644
--- a/src/kudu/gutil/map-util.h
+++ b/src/kudu/gutil/map-util.h
@@ -204,6 +204,27 @@ FindFloorOrNull(Collection& collection,  // NOLINT
   return &(--it)->second;
 }
 
+// Returns a const-reference to the value associated with the greatest key
+// that's less than or equal to the given key, or crashes if it does not exist.
+template <class Collection>
+const typename Collection::value_type::second_type&
+FindFloorOrDie(const Collection& collection,
+               const typename Collection::value_type::first_type& key) {
+  auto it = collection.upper_bound(key);
+  CHECK(it != collection.begin());
+  return (--it)->second;
+}
+
+// Same as above, but returns a non-const reference.
+template <class Collection>
+typename Collection::value_type::second_type&
+FindFloorOrDie(Collection& collection,
+               const typename Collection::value_type::first_type& key) {
+  auto it = collection.upper_bound(key);
+  CHECK(it != collection.begin());
+  return (--it)->second;
+}
+
 // Returns the pointer value associated with the given key. If none is found,
 // NULL is returned. The function is designed to be used with a map of keys to
 // pointers.
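
A small usage sketch for the new helper (hypothetical example): given a map
keyed by range start, FindFloorOrDie returns the value mapped to the greatest
key less than or equal to the probe, and crashes if the probe precedes every
key:

    std::map<std::string, std::string> m = {{"a", "A"}, {"m", "M"}};
    FindFloorOrDie(m, "a");   // returns "A" (exact match)
    FindFloorOrDie(m, "k");   // returns "A" ("a" is the floor of "k")
    FindFloorOrDie(m, "z");   // returns "M"
    // FindFloorOrDie(m, "") would CHECK-fail: no key is <= "".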

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/e831ab43/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 9271436..6568637 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -147,6 +147,12 @@ DEFINE_bool(catalog_manager_check_ts_count_for_create_table, true,
             "a table to be created.");
 TAG_FLAG(catalog_manager_check_ts_count_for_create_table, hidden);
 
+DEFINE_int32(table_locations_ttl_ms, 60 * 60 * 1000, // 1 hour
+             "Maximum time in milliseconds which clients may cache table locations. "
+             "New range partitions may not be visible to existing client instances "
+             "until after waiting for the ttl period.");
+TAG_FLAG(table_locations_ttl_ms, advanced);
+
 using std::pair;
 using std::shared_ptr;
 using std::string;
@@ -3091,6 +3097,7 @@ Status CatalogManager::GetTableLocations(const GetTableLocationsRequestPB* req,
       LOG(FATAL) << "Unexpected error while building tablet locations: " << s.ToString();
     }
   }
+  resp->set_ttl_millis(FLAGS_table_locations_ttl_ms);
   return Status::OK();
 }
 
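
Operationally, the new TTL is tuned when starting the master; for example
(hypothetical invocation):

    kudu-master --table_locations_ttl_ms=300000   # five minutes

Lowering the TTL makes newly added range partitions visible to existing
clients sooner, at the cost of more frequent GetTableLocations lookups.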

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/e831ab43/src/kudu/master/master.proto
----------------------------------------------------------------------
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index 4045e4f..35a3354 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -417,6 +417,10 @@ message GetTableLocationsResponsePB {
   optional MasterErrorPB error = 1;
 
   repeated TabletLocationsPB tablet_locations = 2;
+
+  // If the client caches table locations, the entries should not live longer
+  // than this timeout. Defaults to one hour.
+  optional uint32 ttl_millis = 3 [default = 3600000];
 }
 
 message AlterTableRequestPB {

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/e831ab43/src/kudu/tools/ksck_remote.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck_remote.cc b/src/kudu/tools/ksck_remote.cc
index e25e94d..fcfae31 100644
--- a/src/kudu/tools/ksck_remote.cc
+++ b/src/kudu/tools/ksck_remote.cc
@@ -250,7 +250,7 @@ Status RemoteKsckMaster::RetrieveTabletServers(TSMap* tablet_servers) {
   return Status::OK();
 }
 
-Status RemoteKsckMaster::RetrieveTablesList(vector<shared_ptr<KsckTable> >* tables) {
+Status RemoteKsckMaster::RetrieveTablesList(vector<shared_ptr<KsckTable>>* tables) {
   master::ListTablesRequestPB req;
   master::ListTablesResponsePB resp;
   RpcController rpc;
@@ -260,7 +260,7 @@ Status RemoteKsckMaster::RetrieveTablesList(vector<shared_ptr<KsckTable> >* tabl
   if (resp.has_error()) {
     return StatusFromPB(resp.error().status());
   }
-  vector<shared_ptr<KsckTable> > tables_temp;
+  vector<shared_ptr<KsckTable>> tables_temp;
   for (const master::ListTablesResponsePB_TableInfo& info : resp.tables()) {
     Schema schema;
     int num_replicas;
@@ -273,11 +273,17 @@ Status RemoteKsckMaster::RetrieveTablesList(vector<shared_ptr<KsckTable> >* tabl
 }
 
 Status RemoteKsckMaster::RetrieveTabletsList(const shared_ptr<KsckTable>& table) {
-  vector<shared_ptr<KsckTablet> > tablets;
+  vector<shared_ptr<KsckTablet>> tablets;
   bool more_tablets = true;
   string last_key;
+  int retries = 0;
   while (more_tablets) {
-    GetTabletsBatch(table, &last_key, tablets, &more_tablets);
+    Status s = GetTabletsBatch(table, &last_key, tablets, &more_tablets);
+    if (s.IsServiceUnavailable() && retries++ < 25) {
+      SleepFor(MonoDelta::FromMilliseconds(100 * retries));
+    } else if (!s.ok()) {
+      return s;
+    }
   }
 
   table->set_tablets(tablets);
@@ -286,7 +292,7 @@ Status RemoteKsckMaster::RetrieveTabletsList(const shared_ptr<KsckTable>& table)
 
 Status RemoteKsckMaster::GetTabletsBatch(const shared_ptr<KsckTable>& table,
                                          string* last_partition_key,
-                                         vector<shared_ptr<KsckTablet> >& tablets,
+                                         vector<shared_ptr<KsckTablet>>& tablets,
                                          bool* more_tablets) {
   master::GetTableLocationsRequestPB req;
   master::GetTableLocationsResponsePB resp;
@@ -299,8 +305,14 @@ Status RemoteKsckMaster::GetTabletsBatch(const shared_ptr<KsckTable>& table,
   rpc.set_timeout(GetDefaultTimeout());
   RETURN_NOT_OK(proxy_->GetTableLocations(req, &resp, &rpc));
   for (const master::TabletLocationsPB& locations : resp.tablet_locations()) {
+    if (locations.partition().partition_key_start() < *last_partition_key) {
+      // We've already seen this partition.
+      continue;
+    }
+    *last_partition_key = locations.partition().partition_key_start();
+
     shared_ptr<KsckTablet> tablet(new KsckTablet(table.get(), locations.tablet_id()));
-    vector<shared_ptr<KsckTabletReplica> > replicas;
+    vector<shared_ptr<KsckTabletReplica>> replicas;
     for (const master::TabletLocationsPB_ReplicaPB& replica : locations.replicas()) {
       bool is_leader = replica.role() == consensus::RaftPeerPB::LEADER;
       bool is_follower = replica.role() == consensus::RaftPeerPB::FOLLOWER;
@@ -310,16 +322,7 @@ Status RemoteKsckMaster::GetTabletsBatch(const shared_ptr<KsckTable>& table,
     tablet->set_replicas(replicas);
     tablets.push_back(tablet);
   }
-  if (resp.tablet_locations_size() != 0) {
-    *last_partition_key = (resp.tablet_locations().end() - 1)->partition().partition_key_end();
-  } else {
-    return Status::NotFound(Substitute(
-      "The Master returned 0 tablets for GetTableLocations of table $0 at start key $1",
-      table->name(), *(last_partition_key)));
-  }
-  if (last_partition_key->empty()) {
-    *more_tablets = false;
-  }
+  *more_tablets = resp.tablet_locations().size() == FLAGS_tablets_batch_size_max;
   return Status::OK();
 }