You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2020/12/07 06:53:29 UTC
[kudu] branch master updated: client: allow tablet ID lookups from
the MetaCache
This is an automated email from the ASF dual-hosted git repository.
awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 2a55876 client: allow tablet ID lookups from the MetaCache
2a55876 is described below
commit 2a558768f8aa00068e72ccd1327081f07ba46b03
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Sun Nov 29 22:41:36 2020 -0800
client: allow tablet ID lookups from the MetaCache
Having a client interface for getting a RemoteTablet asynchronously will
be useful when it comes time to send participant ops to specific tablet
IDs from the TxnSystemClient.
In addition to the existing key-indexed cache entries, this patch
extends the MetaCache to track a separate set of cache entries by tablet
ID. There is no relationship between the two sets of cache entries -- a
tablet may exist in one but not the other.
The new id-based cache entries are only used if performing an id-based
lookup in the MetaCache via its new internal LookupTabletById() call
that will be used in a later patch.
Change-Id: Ib2333add5c3ab8403c48e69d29c90f3aec0914b6
Reviewed-on: http://gerrit.cloudera.org:8080/16794
Tested-by: Andrew Wong <aw...@cloudera.com>
Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
src/kudu/client/client-test.cc | 199 ++++++++++++++++++++++++++-
src/kudu/client/client.h | 4 +
src/kudu/client/master_proxy_rpc.h | 10 +-
src/kudu/client/meta_cache.cc | 270 ++++++++++++++++++++++++++++++++-----
src/kudu/client/meta_cache.h | 76 +++++++++--
5 files changed, 508 insertions(+), 51 deletions(-)
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 64bb791..57da237 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -145,6 +145,7 @@ DECLARE_bool(rpc_trace_negotiation);
DECLARE_bool(scanner_inject_service_unavailable_on_continue_scan);
DECLARE_bool(txn_manager_enabled);
DECLARE_bool(txn_manager_lazily_initialized);
+DECLARE_int32(client_tablet_locations_by_id_ttl_ms);
DECLARE_int32(flush_threshold_mb);
DECLARE_int32(flush_threshold_secs);
DECLARE_int32(heartbeat_interval_ms);
@@ -276,6 +277,19 @@ class ClientTest : public KuduTest {
return rt;
}
+ Status MetaCacheLookupById(
+ const string& tablet_id, scoped_refptr<internal::RemoteTablet>* remote_tablet) {
+ remote_tablet->reset();
+ scoped_refptr<internal::RemoteTablet> rt;
+ Synchronizer sync;
+ client_->data_->meta_cache_->LookupTabletById(
+ client_.get(), tablet_id, MonoTime::Max(), &rt,
+ sync.AsStatusCallback());
+ RETURN_NOT_OK(sync.Wait());
+ *remote_tablet = std::move(rt);
+ return Status::OK();
+ }
+
// Generate a set of split rows for tablets used in this test.
vector<unique_ptr<KuduPartialRow>> GenerateSplitRows() {
vector<unique_ptr<KuduPartialRow>> rows;
@@ -2208,7 +2222,6 @@ TEST_F(ClientTest, TestMinMaxRangeBounds) {
}
TEST_F(ClientTest, TestMetaCacheExpiry) {
- google::FlagSaver saver;
FLAGS_table_locations_ttl_ms = 25;
auto& meta_cache = client_->data_->meta_cache_;
@@ -2228,9 +2241,191 @@ TEST_F(ClientTest, TestMetaCacheExpiry) {
ASSERT_FALSE(meta_cache->LookupEntryByKeyFastPath(client_table_.get(), "", &entry));
// Force a lookup and ensure the entry is refreshed.
- CHECK_NOTNULL(MetaCacheLookup(client_table_.get(), "").get());
+ ASSERT_NE(nullptr, MetaCacheLookup(client_table_.get(), ""));
+ ASSERT_TRUE(entry.stale());
+ ASSERT_TRUE(meta_cache->LookupEntryByKeyFastPath(client_table_.get(), "", &entry));
+ ASSERT_FALSE(entry.stale());
+}
+
+TEST_F(ClientTest, TestBasicIdBasedLookup) {
+ auto tablet_ids = cluster_->mini_tablet_server(0)->ListTablets();
+ ASSERT_FALSE(tablet_ids.empty());
+ scoped_refptr<internal::RemoteTablet> rt;
+ for (const auto& tablet_id : tablet_ids) {
+ ASSERT_OK(MetaCacheLookupById(tablet_id, &rt));
+ ASSERT_TRUE(rt != nullptr);
+ ASSERT_EQ(tablet_id, rt->tablet_id());
+ }
+ const auto& kDummyId = "dummy-tablet-id";
+ Status s = MetaCacheLookupById(kDummyId, &rt);
+ ASSERT_TRUE(s.IsNotFound());
+ ASSERT_EQ(nullptr, rt);
+
+ auto& meta_cache = client_->data_->meta_cache_;
+ internal::MetaCacheEntry entry;
+ ASSERT_FALSE(meta_cache->LookupEntryByIdFastPath(kDummyId, &entry));
+}
+
+TEST_F(ClientTest, TestMetaCacheExpiryById) {
+ FLAGS_client_tablet_locations_by_id_ttl_ms = 25;
+ auto tablet_ids = cluster_->mini_tablet_server(0)->ListTablets();
+ ASSERT_FALSE(tablet_ids.empty());
+ const auto& tablet_id = tablet_ids[0];
+
+ auto& meta_cache = client_->data_->meta_cache_;
+ meta_cache->ClearCache();
+ {
+ internal::MetaCacheEntry entry;
+ ASSERT_FALSE(meta_cache->LookupEntryByIdFastPath(tablet_id, &entry));
+ ASSERT_FALSE(entry.Initialized());
+ }
+ {
+ scoped_refptr<internal::RemoteTablet> rt;
+ ASSERT_OK(MetaCacheLookupById(tablet_id, &rt));
+ ASSERT_NE(nullptr, rt);
+ internal::MetaCacheEntry entry;
+ ASSERT_TRUE(meta_cache->LookupEntryByIdFastPath(tablet_id, &entry));
+ ASSERT_TRUE(entry.Initialized());
+ ASSERT_FALSE(entry.stale());
+
+ // After some time, the entry should become stale.
+ SleepFor(MonoDelta::FromMilliseconds(FLAGS_client_tablet_locations_by_id_ttl_ms));
+ ASSERT_TRUE(entry.stale());
+ }
+ {
+ internal::MetaCacheEntry entry;
+ ASSERT_FALSE(meta_cache->LookupEntryByIdFastPath(tablet_id, &entry));
+ ASSERT_FALSE(entry.Initialized());
+
+ // The stale entry is hidden from the fast path; force a full lookup and
+ // ensure the entry is refreshed afterwards.
+ scoped_refptr<internal::RemoteTablet> rt;
+ ASSERT_OK(MetaCacheLookupById(tablet_id, &rt));
+ ASSERT_NE(nullptr, rt);
+ ASSERT_TRUE(meta_cache->LookupEntryByIdFastPath(tablet_id, &entry));
+ ASSERT_FALSE(entry.stale());
+ }
+}
+
+TEST_F(ClientTest, TestMetaCacheWithKeysAndIds) {
+ auto& meta_cache = client_->data_->meta_cache_;
+ auto tablet_ids = cluster_->mini_tablet_server(0)->ListTablets();
+ ASSERT_FALSE(tablet_ids.empty());
+ const auto& first_tablet_id = tablet_ids[0];
+
+ meta_cache->ClearCache();
+ {
+ internal::MetaCacheEntry entry;
+ ASSERT_FALSE(meta_cache->LookupEntryByIdFastPath(first_tablet_id, &entry));
+ ASSERT_FALSE(entry.Initialized());
+ ASSERT_FALSE(meta_cache->LookupEntryByKeyFastPath(client_table_.get(), "", &entry));
+ ASSERT_FALSE(entry.Initialized());
+
+ ASSERT_NE(nullptr, MetaCacheLookup(client_table_.get(), ""));
+ ASSERT_TRUE(meta_cache->LookupEntryByKeyFastPath(client_table_.get(), "", &entry));
+ ASSERT_FALSE(entry.stale());
+ }
+ // Just because we have a cache entry for key-based lookup doesn't mean we
+ // have an entry for id-based lookup.
+ for (const auto& tid : tablet_ids) {
+ {
+ internal::MetaCacheEntry entry;
+ ASSERT_FALSE(meta_cache->LookupEntryByIdFastPath(tid, &entry));
+ ASSERT_FALSE(entry.Initialized());
+
+ // We should be able to force an id-based lookup even if the tablets are
+ // already in the meta cache.
+ scoped_refptr<internal::RemoteTablet> rt;
+ ASSERT_OK(MetaCacheLookupById(tid, &rt));
+ ASSERT_NE(nullptr, rt);
+ }
+ {
+ internal::MetaCacheEntry entry;
+ ASSERT_TRUE(meta_cache->LookupEntryByIdFastPath(tid, &entry));
+ ASSERT_FALSE(entry.stale());
+ }
+ }
+ // Even if we looked up new locations, the key-based cached entry should
+ // still be available.
+ {
+ internal::MetaCacheEntry entry;
+ ASSERT_TRUE(meta_cache->LookupEntryByKeyFastPath(client_table_.get(), "", &entry));
+ ASSERT_FALSE(entry.stale());
+ }
+ // Let's do that again but with an id-based lookup first.
+ meta_cache->ClearCache();
+ {
+ internal::MetaCacheEntry entry;
+ ASSERT_FALSE(meta_cache->LookupEntryByIdFastPath(first_tablet_id, &entry));
+ ASSERT_FALSE(entry.Initialized());
+ ASSERT_FALSE(meta_cache->LookupEntryByKeyFastPath(client_table_.get(), "", &entry));
+ ASSERT_FALSE(entry.Initialized());
+ }
+ // Once we do the lookup by ID, we should be able to fast-path the id-based
+ // lookup but not the key-based lookup.
+ {
+ internal::MetaCacheEntry entry;
+ scoped_refptr<internal::RemoteTablet> rt;
+ ASSERT_OK(MetaCacheLookupById(first_tablet_id, &rt));
+ ASSERT_NE(nullptr, rt);
+ ASSERT_TRUE(meta_cache->LookupEntryByIdFastPath(first_tablet_id, &entry));
+ ASSERT_FALSE(entry.stale());
+ ASSERT_FALSE(meta_cache->LookupEntryByKeyFastPath(client_table_.get(), "", &entry));
+ }
+ // And once we do the key-based lookups, we should be able to see them both
+ // cached.
+ {
+ internal::MetaCacheEntry entry;
+ ASSERT_NE(nullptr, MetaCacheLookup(client_table_.get(), ""));
+ ASSERT_TRUE(meta_cache->LookupEntryByKeyFastPath(client_table_.get(), "", &entry));
+ ASSERT_FALSE(entry.stale());
+ }
+ {
+ internal::MetaCacheEntry entry;
+ ASSERT_TRUE(meta_cache->LookupEntryByIdFastPath(first_tablet_id, &entry));
+ ASSERT_FALSE(entry.stale());
+ }
+}
+
+TEST_F(ClientTest, TestMetaCacheExpiryWithKeysAndIds) {
+ auto& meta_cache = client_->data_->meta_cache_;
+ meta_cache->ClearCache();
+ auto tablet_ids = cluster_->mini_tablet_server(0)->ListTablets();
+ ASSERT_FALSE(tablet_ids.empty());
+ const auto& first_tablet_id = tablet_ids[0];
+
+ FLAGS_table_locations_ttl_ms = 10000;
+ FLAGS_client_tablet_locations_by_id_ttl_ms = 25;
+ internal::MetaCacheEntry entry;
+ ASSERT_FALSE(meta_cache->LookupEntryByIdFastPath(first_tablet_id, &entry));
+ ASSERT_FALSE(meta_cache->LookupEntryByKeyFastPath(client_table_.get(), "", &entry));
+
+ // Perform both id-based and key-based lookups.
+ ASSERT_NE(nullptr, MetaCacheLookup(client_table_.get(), ""));
+ scoped_refptr<internal::RemoteTablet> rt;
+ ASSERT_OK(MetaCacheLookupById(first_tablet_id, &rt));
+ ASSERT_NE(nullptr, rt);
+
+ // Wait for our id-based entries to expire. This shouldn't affect our
+ // key-based entries.
+ SleepFor(MonoDelta::FromMilliseconds(FLAGS_client_tablet_locations_by_id_ttl_ms));
+ ASSERT_FALSE(meta_cache->LookupEntryByIdFastPath(first_tablet_id, &entry));
ASSERT_TRUE(meta_cache->LookupEntryByKeyFastPath(client_table_.get(), "", &entry));
ASSERT_FALSE(entry.stale());
+
+ FLAGS_client_tablet_locations_by_id_ttl_ms = 10000;
+ FLAGS_table_locations_ttl_ms = 25;
+ meta_cache->ClearCache();
+ ASSERT_NE(nullptr, MetaCacheLookup(client_table_.get(), ""));
+ ASSERT_OK(MetaCacheLookupById(first_tablet_id, &rt));
+ ASSERT_NE(nullptr, rt);
+
+ // Wait for our key-based entries to expire. This shouldn't affect our
+ // id-based entries.
+ SleepFor(MonoDelta::FromMilliseconds(FLAGS_table_locations_ttl_ms));
+ ASSERT_FALSE(meta_cache->LookupEntryByKeyFastPath(client_table_.get(), "", &entry));
+ ASSERT_TRUE(meta_cache->LookupEntryByIdFastPath(first_tablet_id, &entry));
+ ASSERT_FALSE(entry.stale());
}
TEST_F(ClientTest, TestGetTabletServerBlacklist) {
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 95a2a1d..c0f93cc 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -809,12 +809,16 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> {
FRIEND_TEST(kudu::ClientStressTest, TestUniqueClientIds);
FRIEND_TEST(ClientTest, ConnectionNegotiationTimeout);
+ FRIEND_TEST(ClientTest, TestBasicIdBasedLookup);
FRIEND_TEST(ClientTest, TestCacheAuthzTokens);
FRIEND_TEST(ClientTest, TestGetSecurityInfoFromMaster);
FRIEND_TEST(ClientTest, TestGetTabletServerBlacklist);
FRIEND_TEST(ClientTest, TestMasterDown);
FRIEND_TEST(ClientTest, TestMasterLookupPermits);
FRIEND_TEST(ClientTest, TestMetaCacheExpiry);
+ FRIEND_TEST(ClientTest, TestMetaCacheExpiryById);
+ FRIEND_TEST(ClientTest, TestMetaCacheExpiryWithKeysAndIds);
+ FRIEND_TEST(ClientTest, TestMetaCacheWithKeysAndIds);
FRIEND_TEST(ClientTest, TestNonCoveringRangePartitions);
FRIEND_TEST(ClientTest, TestRetrieveAuthzTokenInParallel);
FRIEND_TEST(ClientTest, TestReplicatedTabletWritesWithLeaderElection);
diff --git a/src/kudu/client/master_proxy_rpc.h b/src/kudu/client/master_proxy_rpc.h
index 420c4b9..22548fb 100644
--- a/src/kudu/client/master_proxy_rpc.h
+++ b/src/kudu/client/master_proxy_rpc.h
@@ -83,11 +83,6 @@ class AsyncLeaderMasterRpc : public rpc::Rpc {
std::string ToString() const override;
- protected:
- // Handles 'status', retrying if necessary, and calling the user-provided
- // callback as appropriate.
- void SendRpcCb(const Status& status) override;
-
// Uses 'status' and the contents of the RPC controller and RPC response to
// determine whether reconnections or retries should be performed, and if so,
// performs them. Additionally, updates 'status' to include more information
@@ -108,6 +103,11 @@ class AsyncLeaderMasterRpc : public rpc::Rpc {
// alive.
bool RetryOrReconnectIfNecessary(Status* status);
+ protected:
+ // Handles 'status', retrying if necessary, and calling the user-provided
+ // callback as appropriate.
+ void SendRpcCb(const Status& status) override;
+
// Attempts to reconnect with the masters and find the leader master, and
// attempts to retry the RPC.
virtual void ResetMasterLeaderAndRetry(rpc::CredentialsPolicy creds_policy);
diff --git a/src/kudu/client/meta_cache.cc b/src/kudu/client/meta_cache.cc
index 3abf7ab..d7fc94f 100644
--- a/src/kudu/client/meta_cache.cc
+++ b/src/kudu/client/meta_cache.cc
@@ -62,9 +62,12 @@ using kudu::consensus::RaftPeerPB;
using kudu::master::ANY_REPLICA;
using kudu::master::GetTableLocationsRequestPB;
using kudu::master::GetTableLocationsResponsePB;
+using kudu::master::GetTabletLocationsRequestPB;
+using kudu::master::GetTabletLocationsResponsePB;
using kudu::master::MasterServiceProxy;
using kudu::master::TabletLocationsPB;
using kudu::master::TSInfoPB;
+using kudu::pb_util::SecureShortDebugString;
using kudu::rpc::BackoffType;
using kudu::rpc::CredentialsPolicy;
using kudu::tserver::TabletServerAdminServiceProxy;
@@ -86,6 +89,12 @@ DEFINE_bool(client_use_unix_domain_sockets, false,
"on such a socket and the client is running on the same host.");
TAG_FLAG(client_use_unix_domain_sockets, experimental);
+DEFINE_int32(client_tablet_locations_by_id_ttl_ms, 60 * 60 * 1000, // 60 minutes
+ "Maximum time in milliseconds that clients will cache tablet "
+ "locations keyed by tablet ID.");
+TAG_FLAG(client_tablet_locations_by_id_ttl_ms, advanced);
+TAG_FLAG(client_tablet_locations_by_id_ttl_ms, runtime);
+
namespace kudu {
namespace client {
namespace internal {
@@ -575,7 +584,7 @@ MetaCache::~MetaCache() {
STLDeleteValues(&ts_cache_);
}
-void MetaCache::UpdateTabletServer(const TSInfoPB& pb) {
+void MetaCache::UpdateTabletServerUnlocked(const TSInfoPB& pb) {
DCHECK(lock_.is_write_locked());
RemoteTabletServer* ts = FindPtrOrNull(ts_cache_, pb.permanent_uuid());
if (ts) {
@@ -587,6 +596,119 @@ void MetaCache::UpdateTabletServer(const TSInfoPB& pb) {
InsertOrDie(&ts_cache_, pb.permanent_uuid(), new RemoteTabletServer(pb));
}
+// A (tablet id) --> tablet lookup. May be in-flight to a master, or may be
+// handled locally.
+//
+// Keeps a reference on the owning meta cache while alive.
+class LookupRpcById : public AsyncLeaderMasterRpc<GetTabletLocationsRequestPB,
+ GetTabletLocationsResponsePB> {
+ public:
+ LookupRpcById(scoped_refptr<MetaCache> meta_cache,
+ KuduClient* client,
+ StatusCallback user_cb,
+ const string& tablet_id,
+ scoped_refptr<RemoteTablet>* remote_tablet,
+ const MonoTime& deadline);
+ virtual ~LookupRpcById() = default;
+
+ void SendRpc() override;
+ void SendRpcSlowPath();
+ string ToString() const override {
+ return Substitute("$0 { tablet: '$1', attempt: $2 }",
+ rpc_name_, tablet_id_, num_attempts());
+ }
+ private:
+ void SendRpcCb(const Status& status) override;
+
+ const string tablet_id_;
+ scoped_refptr<MetaCache> meta_cache_;
+
+ GetTabletLocationsRequestPB req_;
+ GetTabletLocationsResponsePB resp_;
+
+ scoped_refptr<RemoteTablet>* remote_tablet_;
+};
+
+LookupRpcById::LookupRpcById(scoped_refptr<MetaCache> meta_cache,
+ KuduClient* client,
+ StatusCallback user_cb,
+ const string& tablet_id,
+ scoped_refptr<RemoteTablet>* remote_tablet,
+ const MonoTime& deadline)
+ : AsyncLeaderMasterRpc(deadline, client, BackoffType::LINEAR, req_, &resp_,
+ &MasterServiceProxy::GetTabletLocationsAsync, "LookupRpcById", std::move(user_cb), {}),
+ tablet_id_(tablet_id),
+ meta_cache_(std::move(meta_cache)),
+ remote_tablet_(remote_tablet) {
+}
+
+void LookupRpcById::SendRpc() {
+ Status fastpath_status = meta_cache_->DoFastPathLookupById(
+ tablet_id_, remote_tablet_);
+ if (!fastpath_status.IsIncomplete()) {
+ user_cb_(fastpath_status);
+ delete this;
+ return;
+ }
+ SendRpcSlowPath();
+}
+
+void LookupRpcById::SendRpcSlowPath() {
+ req_.add_tablet_ids(tablet_id_);
+ req_.set_intern_ts_infos_in_response(true);
+ AsyncLeaderMasterRpc::SendRpc();
+}
+
+namespace {
+// Handles master-related errors and transient lookup-related errors,
+// scheduling retries and returning 'true' if rescheduled, in which case,
+// callers should ensure 'rpc' remains alive. Updates 'status' to include
+// more information based on the response.
+template <class LookupRpcClass, class RespClass>
+bool RetryLookupIfNecessary(Status* status,
+ const RespClass& resp,
+ rpc::RpcRetrier* retrier,
+ LookupRpcClass* rpc) {
+ if (rpc->RetryOrReconnectIfNecessary(status)) {
+ return true;
+ }
+ // Handle ServiceUnavailable codes from BuildLocationsForTablet().
+ if (status->ok() && resp.has_error()) {
+ *status = StatusFromPB(resp.error().status());
+ if (status->IsServiceUnavailable()) {
+ retrier->DelayedRetry(rpc, *status);
+ return true;
+ }
+ }
+ return false;
+}
+} // anonymous namespace
+
+void LookupRpcById::SendRpcCb(const Status& status) {
+ unique_ptr<LookupRpcById> delete_me(this);
+
+ // Check for generic lookup errors.
+ Status new_status = status;
+ if (RetryLookupIfNecessary(&new_status, resp_, mutable_retrier(), this)) {
+ ignore_result(delete_me.release());
+ return;
+ }
+
+ // If there were no errors, process the response.
+ if (new_status.ok()) {
+ MetaCacheEntry entry;
+ new_status = meta_cache_->ProcessGetTabletLocationsResponse(tablet_id_, resp_, &entry);
+ if (new_status.ok() && remote_tablet_) {
+ *remote_tablet_ = entry.tablet();
+ }
+ }
+ if (!new_status.ok()) {
+ // Otherwise, prep the final error.
+ new_status = new_status.CloneAndPrepend(Substitute("$0 failed", ToString()));
+ KLOG_EVERY_N_SECS(WARNING, 1) << new_status.ToString();
+ }
+ user_cb_(new_status);
+}
// A (table, partition_key) --> tablet lookup. May be in-flight to a master, or
// may be handled locally.
@@ -776,26 +898,13 @@ void LookupRpc::SendRpcCb(const Status& status) {
// itself.
unique_ptr<LookupRpc> delete_me(this);
- // Check for generic errors.
+ // Check for generic lookup errors.
Status new_status = status;
- if (RetryOrReconnectIfNecessary(&new_status)) {
+ if (RetryLookupIfNecessary(&new_status, resp_, mutable_retrier(), this)) {
ignore_result(delete_me.release());
return;
}
- // Check for more application errors.
- // Note: RetryOrReconnectIfNecessary only checked for generic application
- // errors. This check is specific to LookupRpc.
- if (new_status.ok() && resp_.has_error()) {
- new_status = StatusFromPB(resp_.error().status());
- if (new_status.IsServiceUnavailable()) {
- // One or more of the tablets is not running. Retry after some time.
- mutable_retrier()->DelayedRetry(this, new_status);
- ignore_result(delete_me.release());
- return;
- }
- }
-
// If there were no errors, process the response.
if (new_status.ok()) {
MetaCacheEntry entry;
@@ -834,6 +943,58 @@ Status MetaCache::ProcessLookupResponse(const LookupRpc& rpc,
}
+Status MetaCache::ProcessGetTabletLocationsResponse(const string& tablet_id,
+ const GetTabletLocationsResponsePB& resp,
+ MetaCacheEntry* cache_entry) {
+ MonoTime expiration_time = MonoTime::Now() +
+ MonoDelta::FromMilliseconds(FLAGS_client_tablet_locations_by_id_ttl_ms);
+ const auto& tablet_locations = resp.tablet_locations();
+ if (tablet_locations.empty()) {
+ return Status::NotFound("");
+ }
+ const auto& ts_infos = resp.ts_infos();
+ DCHECK_EQ(1, tablet_locations.size());
+ const auto& tablet = tablet_locations[0];
+ DCHECK_EQ(tablet_id, tablet.tablet_id());
+ const auto& tablet_lower_bound = tablet.partition().partition_key_start();
+ const auto& tablet_upper_bound = tablet.partition().partition_key_end();
+
+ std::lock_guard<percpu_rwlock> l(lock_);
+ for (const auto& ts_info : ts_infos) {
+ UpdateTabletServerUnlocked(ts_info);
+ }
+ scoped_refptr<RemoteTablet> remote = FindPtrOrNull(tablets_by_id_, tablet_id);
+ if (remote) {
+ // Partition should not have changed.
+ DCHECK_EQ(tablet_lower_bound, remote->partition().partition_key_start());
+ DCHECK_EQ(tablet_upper_bound, remote->partition().partition_key_end());
+
+ VLOG(3) << "Refreshing tablet " << tablet_id << ": " << SecureShortDebugString(tablet);
+ RETURN_NOT_OK_PREPEND(remote->Refresh(ts_cache_, tablet, ts_infos),
+ Substitute("failed to refresh locations for tablet $0",
+ tablet_id));
+ MetaCacheEntry entry(expiration_time, remote);
+ auto& mapped_entry = LookupOrEmplace(&entry_by_tablet_id_, tablet_id, std::move(entry));
+ // NOTE: it's harmless to call refresh_expiration_time() if we just
+ // constructed the entry with the same time.
+ mapped_entry.refresh_expiration_time(expiration_time);
+ } else {
+ Partition partition;
+ Partition::FromPB(tablet.partition(), &partition);
+ remote = new RemoteTablet(tablet_id, partition);
+ RETURN_NOT_OK_PREPEND(remote->Refresh(ts_cache_, tablet, ts_infos),
+ Substitute("failed to refresh locations for tablet $0",
+ tablet_id));
+ MetaCacheEntry entry(expiration_time, remote);
+ VLOG(3) << Substitute("Caching '$0' entry", tablet_id);
+
+ EmplaceOrDie(&tablets_by_id_, tablet_id, std::move(remote));
+ EmplaceOrDie(&entry_by_tablet_id_, tablet_id, std::move(entry));
+ }
+ *cache_entry = FindOrDie(entry_by_tablet_id_, tablet_id);
+ return Status::OK();
+}
+
Status MetaCache::ProcessGetTableLocationsResponse(const KuduTable* table,
const string& partition_key,
bool is_exact_lookup,
@@ -859,14 +1020,14 @@ Status MetaCache::ProcessGetTableLocationsResponse(const KuduTable* table,
// It's used for backward compatibility.
for (const TabletLocationsPB& tablet : tablet_locations) {
for (const auto& replicas : tablet.deprecated_replicas()) {
- UpdateTabletServer(replicas.ts_info());
+ UpdateTabletServerUnlocked(replicas.ts_info());
}
}
// In the case of "interned" replicas, the above 'deprecated_replicas' lists will be empty
// and instead we'll need to update from the top-level list of tservers.
const auto& ts_infos = resp.ts_infos();
for (const TSInfoPB& ts_info : ts_infos) {
- UpdateTabletServer(ts_info);
+ UpdateTabletServerUnlocked(ts_info);
}
// The comments below will reference the following diagram:
@@ -921,31 +1082,37 @@ Status MetaCache::ProcessGetTableLocationsResponse(const KuduTable* table,
// about the tablet, then we only need to refresh it's replica locations
// and the entry TTL. If the tablet is unknown, then we need to create a
// new RemoteTablet for it.
-
const string& tablet_id = tablet.tablet_id();
scoped_refptr<RemoteTablet> remote = FindPtrOrNull(tablets_by_id_, tablet_id);
- if (remote.get() != nullptr) {
+ if (remote) {
// Partition should not have changed.
DCHECK_EQ(tablet_lower_bound, remote->partition().partition_key_start());
DCHECK_EQ(tablet_upper_bound, remote->partition().partition_key_end());
- VLOG(3) << "Refreshing tablet " << tablet_id << ": "
- << pb_util::SecureShortDebugString(tablet);
+ VLOG(3) << Substitute("Refreshing tablet $0: $1",
+ tablet_id, SecureShortDebugString(tablet));
RETURN_NOT_OK_PREPEND(remote->Refresh(ts_cache_, tablet, ts_infos),
Substitute("failed to refresh locations for tablet $0",
tablet_id));
// Update the entry TTL.
- auto& entry = FindOrDie(tablets_by_key, tablet_lower_bound);
- DCHECK(!entry.is_non_covered_range() &&
- entry.upper_bound_partition_key() == tablet_upper_bound);
- entry.refresh_expiration_time(expiration_time);
+ auto* entry = FindOrNull(tablets_by_key, tablet_lower_bound);
+ if (entry) {
+ DCHECK(!entry->is_non_covered_range() &&
+ entry->upper_bound_partition_key() == tablet_upper_bound);
+ entry->refresh_expiration_time(expiration_time);
+ } else {
+ // A remote tablet exists, but isn't indexed for key-based lookups.
+ // Index it now.
+ MetaCacheEntry entry(expiration_time, remote);
+ VLOG(3) << Substitute("Caching '$0' entry $1", table->name(), entry.DebugString(table));
+ EmplaceOrDie(&tablets_by_key, tablet_lower_bound, std::move(entry));
+ }
continue;
}
-
// Clear any existing entries which overlap with the discovered tablet.
tablets_by_key.erase(tablets_by_key.lower_bound(tablet_lower_bound),
tablet_upper_bound.empty() ? tablets_by_key.end() :
- tablets_by_key.lower_bound(tablet_upper_bound));
+ tablets_by_key.lower_bound(tablet_upper_bound));
Partition partition;
Partition::FromPB(tablet.partition(), &partition);
@@ -957,8 +1124,8 @@ Status MetaCache::ProcessGetTableLocationsResponse(const KuduTable* table,
MetaCacheEntry entry(expiration_time, remote);
VLOG(3) << "Caching '" << table->name() << "' entry " << entry.DebugString(table);
- InsertOrDie(&tablets_by_id_, tablet_id, remote);
- InsertOrDie(&tablets_by_key, tablet_lower_bound, entry);
+ EmplaceOrDie(&tablets_by_id_, tablet_id, std::move(remote));
+ EmplaceOrDie(&tablets_by_key, tablet_lower_bound, std::move(entry));
}
if (!last_upper_bound.empty() && tablet_locations.size() < max_returned_locations) {
@@ -1038,6 +1205,33 @@ Status MetaCache::DoFastPathLookup(const KuduTable* table,
return Status::Incomplete("");
}
+bool MetaCache::LookupEntryByIdFastPath(const string& tablet_id,
+ MetaCacheEntry* entry) {
+ shared_lock<rw_spinlock> l(lock_.get_lock());
+ const auto* cache_entry = FindOrNull(entry_by_tablet_id_, tablet_id);
+ if (PREDICT_FALSE(!cache_entry)) {
+ return false;
+ }
+ if (cache_entry->stale()) {
+ return false;
+ }
+ *entry = *cache_entry;
+ return true;
+}
+
+Status MetaCache::DoFastPathLookupById(const string& tablet_id,
+ scoped_refptr<RemoteTablet>* remote_tablet) {
+ MetaCacheEntry entry;
+ if (PREDICT_TRUE(LookupEntryByIdFastPath(tablet_id, &entry))) {
+ DCHECK(!entry.is_non_covered_range());
+ if (remote_tablet) {
+ *remote_tablet = entry.tablet();
+ }
+ return Status::OK();
+ }
+ return Status::Incomplete("");
+}
+
void MetaCache::ClearNonCoveredRangeEntries(const std::string& table_id) {
VLOG(3) << "Clearing non-covered range entries of table " << table_id;
std::lock_guard<percpu_rwlock> l(lock_);
@@ -1063,6 +1257,7 @@ void MetaCache::ClearCache() {
STLDeleteValues(&ts_cache_);
tablets_by_id_.clear();
tablets_by_table_and_key_.clear();
+ entry_by_tablet_id_.clear();
}
void MetaCache::LookupTabletByKey(const KuduTable* table,
@@ -1091,6 +1286,21 @@ void MetaCache::LookupTabletByKey(const KuduTable* table,
rpc->SendRpcSlowPath();
}
+void MetaCache::LookupTabletById(KuduClient* client,
+ const string& tablet_id,
+ const MonoTime& deadline,
+ scoped_refptr<RemoteTablet>* remote_tablet,
+ const StatusCallback& callback) {
+ Status fastpath_status = DoFastPathLookupById(tablet_id, remote_tablet);
+ if (!fastpath_status.IsIncomplete()) {
+ callback(fastpath_status);
+ return;
+ }
+ LookupRpcById* rpc = new LookupRpcById(this, client, callback, tablet_id,
+ remote_tablet, deadline);
+ rpc->SendRpcSlowPath();
+}
+
void MetaCache::MarkTSFailed(RemoteTabletServer* ts,
const Status& status) {
LOG(INFO) << "Marking tablet server " << ts->ToString() << " as failed.";
diff --git a/src/kudu/client/meta_cache.h b/src/kudu/client/meta_cache.h
index d7e8481..af59d2e 100644
--- a/src/kudu/client/meta_cache.h
+++ b/src/kudu/client/meta_cache.h
@@ -49,14 +49,15 @@ namespace kudu {
class Sockaddr;
namespace tserver {
-class TabletServerServiceProxy;
class TabletServerAdminServiceProxy;
+class TabletServerServiceProxy;
} // namespace tserver
namespace master {
+class GetTableLocationsResponsePB;
+class GetTabletLocationsResponsePB;
class TSInfoPB;
class TabletLocationsPB;
-class GetTableLocationsResponsePB;
} // namespace master
namespace client {
@@ -187,7 +188,8 @@ class MetaCacheServerPicker : public rpc::ServerPicker<RemoteTabletServer> {
// A ref to the meta cache.
scoped_refptr<MetaCache> meta_cache_;
- // The table we're writing to.
+ // The table we're writing to. If null, relies on tablet ID-based lookups
+ // instead of partition key-based lookups.
const KuduTable* table_;
// The tablet we're picking replicas for.
@@ -365,13 +367,13 @@ class MetaCacheEntry {
// metadata.
std::string DebugString(const KuduTable* table) const;
- private:
-
// Returns true if the entry is initialized.
bool Initialized() const {
return expiration_time_.Initialized();
}
+ private:
+
// The expiration time of this cached entry.
MonoTime expiration_time_;
@@ -422,12 +424,30 @@ class MetaCache : public RefCountedThreadSafe<MetaCache> {
scoped_refptr<RemoteTablet>* remote_tablet,
const StatusCallback& callback);
+ // Look up the locations of the given tablet, storing the result in
+ // 'remote_tablet' if not null, and calling 'callback' once the lookup is
+ // complete. Only tablets with non-failed LEADERs are considered.
+ //
+ // NOTE: the callback may be called from an IO thread or inline with this
+ // call if the cached data is already available.
+ void LookupTabletById(KuduClient* client,
+ const std::string& tablet_id,
+ const MonoTime& deadline,
+ scoped_refptr<RemoteTablet>* remote_tablet,
+ const StatusCallback& callback);
+
// Lookup the given tablet by key, only consulting local information.
// Returns true and sets *entry if successful.
bool LookupEntryByKeyFastPath(const KuduTable* table,
const std::string& partition_key,
MetaCacheEntry* entry);
+ // Lookup the given tablet by tablet ID, only consulting local information.
+ // Returns true and sets *entry if successful.
+ bool LookupEntryByIdFastPath(const std::string& tablet_id,
+ MetaCacheEntry* entry);
+ // Process the response for the given key-based lookup parameters, indexing
+ // the location information as appropriate.
Status ProcessGetTableLocationsResponse(const KuduTable* table,
const std::string& partition_key,
bool is_exact_lookup,
@@ -458,6 +478,7 @@ class MetaCache : public RefCountedThreadSafe<MetaCache> {
private:
friend class LookupRpc;
+ friend class LookupRpcById;
FRIEND_TEST(client::ClientTest, TestMasterLookupPermits);
FRIEND_TEST(client::ClientTest, TestMetaCacheExpiry);
@@ -468,10 +489,16 @@ class MetaCache : public RefCountedThreadSafe<MetaCache> {
MetaCacheEntry* cache_entry,
int max_returned_locations);
+ // Process the response for the given id-based lookup parameters, indexing
+ // the location information as appropriate.
+ Status ProcessGetTabletLocationsResponse(const std::string& tablet_id,
+ const master::GetTabletLocationsResponsePB& resp,
+ MetaCacheEntry* cache_entry);
+
// Perform the complete fast-path lookup. Returns:
- // - NotFound if the lookup hits a non-covering range.
- // - Incomplete if the fast path was not possible
- // - OK if the lookup was successful.
+ // - NotFound if the lookup hits a non-covering range.
+ // - Incomplete if the fast path was not possible
+ // - OK if the lookup was successful.
//
// If 'lookup_type' is kLowerBound, then 'partition_key' will be updated to indicate the
// start of the range for the matched tablet.
@@ -480,13 +507,23 @@ class MetaCache : public RefCountedThreadSafe<MetaCache> {
LookupType lookup_type,
scoped_refptr<RemoteTablet>* remote_tablet);
+ // Perform the fast-path lookup by tablet ID. Returns:
+ // - Incomplete if there was no cache entry or the cached entry was stale
+ // - OK if the lookup was successful
+ //
+ // If 'remote_tablet' isn't null, it is populated with a pointer to the
+ // RemoteTablet being looked up. Otherwise, just checks whether a non-stale
+ // entry is cached; the cache itself is not modified.
+ Status DoFastPathLookupById(const std::string& tablet_id,
+ scoped_refptr<RemoteTablet>* remote_tablet);
+
// Update our information about the given tablet server.
//
// This is called when we get some response from the master which contains
// the latest host/port info for a server.
//
// NOTE: Must be called with lock_ held.
- void UpdateTabletServer(const master::TSInfoPB& pb);
+ void UpdateTabletServerUnlocked(const master::TSInfoPB& pb);
KuduClient* client_;
@@ -501,17 +538,28 @@ class MetaCache : public RefCountedThreadSafe<MetaCache> {
// Protected by lock_.
TabletServerMap ts_cache_;
- // Cache of tablets, keyed by partition key.
+ // Cache entries for tablets and non-covered ranges, keyed by table ID, used
+ // for key-based lookups.
//
// Protected by lock_.
typedef std::map<std::string, MetaCacheEntry> TabletMap;
+ std::unordered_map<std::string, TabletMap> tablets_by_table_and_key_;
- // Cache of tablets and non-covered ranges, keyed by table id.
+ // Cache entries for tablets, keyed by tablet ID, used for ID-based lookups.
+ // NOTE: existence in 'tablets_by_table_and_key_' does not imply existence in
+ // 'entry_by_tablet_id_', and vice versa.
//
// Protected by lock_.
- std::unordered_map<std::string, TabletMap> tablets_by_table_and_key_;
-
- // Cache of tablets, keyed by tablet ID.
+ //
+ // TODO(awong): it might be nice for ID-based lookups and table-based lookups
+ // to use the same entries. It's currently tricky to do so since ID-based
+ // lookups don't incur any table metadata, making lookups by table ID tricky.
+ std::unordered_map<std::string, MetaCacheEntry> entry_by_tablet_id_;
+
+ // The underlying remote tablets pointed to by the above cache entry
+ // containers, keyed by tablet ID. If an entry does not exist for a given
+ // tablet ID in this container, none can exist in either of the above
+ // containers.
//
// Protected by lock_
std::unordered_map<std::string, scoped_refptr<RemoteTablet>> tablets_by_id_;