You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2017/09/06 01:42:26 UTC

[2/2] kudu git commit: catalog_manager: improve IsAlterInProgress performance

catalog_manager: improve IsAlterInProgress performance

To mentally prepare for KUDU-1807, I decided to also tackle the O(n)
behavior of IsAlterInProgress. It's not nearly as bad as IsCreateInProgress
(since we aren't taking cow locks on each tablet) but is still not great
since IsAlterInProgress is called repeatedly by clients.

The solution involves a new map in TableInfo to track the schema versions
of all tablets. Keeping the map up-to-date requires additional work when
adding/removing tablets and when tablet reports include a schema change, but
these are infrequent operations relative to IsAlterInProgress so I figured
the trade off is worth it.

I'm not particularly happy about the lock acquisition in
set_reported_schema_version() but couldn't see a good way to avoid it.

I looped 1000 runs of alter_table-randomized-test. The ~5 failures I saw
were due to KUDU-1539, which I don't think this patch makes us any more or
less vulnerable to.

Change-Id: Id8c1f48c0febad038833edd555ee88f1db83249d
Reviewed-on: http://gerrit.cloudera.org:8080/7950
Reviewed-by: Dan Burkert <da...@apache.org>
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/a71ecfd4
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/a71ecfd4
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/a71ecfd4

Branch: refs/heads/master
Commit: a71ecfd4a232c82ccf6f3b8fb452eaed3528ad46
Parents: 72dc8fb
Author: Adar Dembo <ad...@cloudera.com>
Authored: Fri Sep 1 19:43:47 2017 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Wed Sep 6 01:42:00 2017 +0000

----------------------------------------------------------------------
 src/kudu/master/catalog_manager.cc | 88 +++++++++++++++++++++++++++------
 src/kudu/master/catalog_manager.h  | 39 ++++++++++++---
 2 files changed, 105 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/a71ecfd4/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index ccedd3a..ef69cf5 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -4241,7 +4241,7 @@ TabletInfo::TabletInfo(const scoped_refptr<TableInfo>& table, string tablet_id)
     : tablet_id_(std::move(tablet_id)),
       table_(table),
       last_create_tablet_time_(MonoTime::Now()),
-      reported_schema_version_(0) {}
+      reported_schema_version_(NOT_YET_REPORTED) {}
 
 TabletInfo::~TabletInfo() {
 }
@@ -4256,16 +4256,38 @@ MonoTime TabletInfo::last_create_tablet_time() const {
   return last_create_tablet_time_;
 }
 
-bool TabletInfo::set_reported_schema_version(uint32_t version) {
-  std::lock_guard<simple_spinlock> l(lock_);
-  if (version > reported_schema_version_) {
-    reported_schema_version_ = version;
-    return true;
+void TabletInfo::set_reported_schema_version(int64_t version) {
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+
+    // Fast path: there's no schema version change.
+    if (version <= reported_schema_version_) {
+      return;
+    }
   }
-  return false;
+
+  // Slow path: we have a schema version change.
+  //
+  // We need to hold both the table and tablet locks to make the change. By
+  // convention, the table lock is always acquired first.
+  std::lock_guard<rw_spinlock> table_l(table_->lock_);
+  std::lock_guard<simple_spinlock> tablet_l(lock_);
+
+  // Check again in case the schema version changed underneath us.
+  int64_t old_version = reported_schema_version_;
+  if (version <= old_version) {
+    return;
+  }
+
+  // Perform the changes.
+  VLOG(3) << Substitute("$0: schema version changed from $1 to $2",
+                        ToString(), old_version, version);
+  reported_schema_version_ = version;
+  table_->DecrementSchemaVersionCountUnlocked(old_version);
+  table_->IncrementSchemaVersionCountUnlocked(version);
 }
 
-uint32_t TabletInfo::reported_schema_version() const {
+int64_t TabletInfo::reported_schema_version() const {
   std::lock_guard<simple_spinlock> l(lock_);
   return reported_schema_version_;
 }
@@ -4300,6 +4322,7 @@ void TableInfo::AddRemoveTablets(const vector<scoped_refptr<TabletInfo>>& tablet
   for (const auto& tablet : tablets_to_drop) {
     const auto& lower_bound = tablet->metadata().state().pb.partition().partition_key_start();
     CHECK(EraseKeyReturnValuePtr(&tablet_map_, lower_bound) != nullptr);
+    DecrementSchemaVersionCountUnlocked(tablet->reported_schema_version());
   }
   for (const auto& tablet : tablets_to_add) {
     TabletInfo* old = nullptr;
@@ -4308,11 +4331,20 @@ void TableInfo::AddRemoveTablets(const vector<scoped_refptr<TabletInfo>>& tablet
                          tablet.get(), &old)) {
       VLOG(1) << Substitute("Replaced tablet $0 with $1",
                             old->tablet_id(), tablet->tablet_id());
+      DecrementSchemaVersionCountUnlocked(old->reported_schema_version());
+
       // TODO(unknown): can we assert that the replaced tablet is not in Running state?
       // May be a little tricky since we don't know whether to look at its committed or
       // uncommitted state.
     }
+    IncrementSchemaVersionCountUnlocked(tablet->reported_schema_version());
   }
+
+#ifndef NDEBUG
+  if (tablet_map_.empty()) {
+    DCHECK(schema_version_counts_.empty());
+  }
+#endif
 }
 
 void TableInfo::GetTabletsInRange(const GetTableLocationsRequestPB* req,
@@ -4345,15 +4377,18 @@ void TableInfo::GetTabletsInRange(const GetTableLocationsRequestPB* req,
 
 bool TableInfo::IsAlterInProgress(uint32_t version) const {
   shared_lock<rw_spinlock> l(lock_);
-  for (const auto& e : tablet_map_) {
-    if (e.second->reported_schema_version() < version) {
-      VLOG(3) << Substitute("Table $0 ALTER in progress due to tablet $1 "
-          "because reported schema $2 < expected $3", table_id_,
-          e.second->ToString(), e.second->reported_schema_version(), version);
-      return true;
-    }
+  auto it = schema_version_counts_.begin();
+  if (it == schema_version_counts_.end()) {
+    // The table has no tablets.
+    return false;
   }
-  return false;
+  DCHECK_GT(it->second, 0);
+
+  // 'it->first' is either NOT_YET_REPORTED (if at least one tablet has yet to
+  // report), or it's the lowest schema version belonging to at least one
+  // tablet. The numeric value of NOT_YET_REPORTED is -1 so we can compare it
+  // to 'version' either way.
+  return it->first < static_cast<int64_t>(version);
 }
 
 bool TableInfo::IsCreateInProgress() const {
@@ -4422,6 +4457,27 @@ void TableInfo::GetAllTablets(vector<scoped_refptr<TabletInfo>>* ret) const {
   }
 }
 
+void TableInfo::IncrementSchemaVersionCountUnlocked(int64_t version) {
+  DCHECK(lock_.is_write_locked());
+  schema_version_counts_[version]++;
+}
+
+void TableInfo::DecrementSchemaVersionCountUnlocked(int64_t version) {
+  DCHECK(lock_.is_write_locked());
+
+  // The schema version map invariant is that every tablet should be
+  // represented. To enforce this, if the decrement reduces a particular key's
+  // value to 0, we must erase the key too.
+  auto it = schema_version_counts_.find(version);
+  DCHECK(it != schema_version_counts_.end())
+      << Substitute("$0 not in schema version map", version);
+  DCHECK_GT(it->second, 0);
+  it->second--;
+  if (it->second == 0) {
+    schema_version_counts_.erase(it);
+  }
+}
+
 void PersistentTableInfo::set_state(SysTablesEntryPB::State state, const string& msg) {
   pb.set_state(state);
   pb.set_state_msg(msg);

http://git-wip-us.apache.org/repos/asf/kudu/blob/a71ecfd4/src/kudu/master/catalog_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index 24fd30a..2851cab 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -138,6 +138,14 @@ class TabletInfo : public RefCountedThreadSafe<TabletInfo> {
  public:
   typedef PersistentTabletInfo cow_state;
 
+  enum {
+    // Schema version when the tablet has yet to report.
+    //
+    // This value must be less than all possible schema versions. -1 is
+    // appropriate; schema versions range from 0 to UINT32_MAX.
+    NOT_YET_REPORTED = -1L
+  };
+
   TabletInfo(const scoped_refptr<TableInfo>& table, std::string tablet_id);
 
   const std::string& tablet_id() const { return tablet_id_; }
@@ -152,9 +160,14 @@ class TabletInfo : public RefCountedThreadSafe<TabletInfo> {
   void set_last_create_tablet_time(const MonoTime& ts);
   MonoTime last_create_tablet_time() const;
 
-  // Accessors for the last reported schema version
-  bool set_reported_schema_version(uint32_t version);
-  uint32_t reported_schema_version() const;
+  // Sets the reported schema version to 'version' provided it's not already
+  // equal to or greater than it.
+  //
+  // Also reflects the version change to the table's schema version map.
+  void set_reported_schema_version(int64_t version);
+
+  // Simple accessor for reported_schema_version_.
+  int64_t reported_schema_version() const;
 
   // No synchronization needed.
   std::string ToString() const;
@@ -176,7 +189,9 @@ class TabletInfo : public RefCountedThreadSafe<TabletInfo> {
   MonoTime last_create_tablet_time_;
 
   // Reported schema version (in-memory only).
-  uint32_t reported_schema_version_;
+  //
+  // Set to NOT_YET_REPORTED when the tablet hasn't yet reported.
+  int64_t reported_schema_version_;
 
   DISALLOW_COPY_AND_ASSIGN(TabletInfo);
 };
@@ -272,9 +287,15 @@ class TableInfo : public RefCountedThreadSafe<TableInfo> {
 
  private:
   friend class RefCountedThreadSafe<TableInfo>;
+  friend class TabletInfo;
   ~TableInfo();
 
-  void AddTabletUnlocked(TabletInfo* tablet);
+  // Increments or decrements the value for the key 'version' in
+  // 'schema_version_counts'.
+  //
+  // Must be called with 'lock_' held for writing.
+  void IncrementSchemaVersionCountUnlocked(int64_t version);
+  void DecrementSchemaVersionCountUnlocked(int64_t version);
 
   const std::string table_id_;
 
@@ -285,7 +306,7 @@ class TableInfo : public RefCountedThreadSafe<TableInfo> {
   typedef std::map<std::string, TabletInfo*> RawTabletInfoMap;
   RawTabletInfoMap tablet_map_;
 
-  // Protects tablet_map_ and pending_tasks_
+  // Protects tablet_map_, pending_tasks_, and schema_version_counts_.
   mutable rw_spinlock lock_;
 
   CowObject<PersistentTableInfo> metadata_;
@@ -293,6 +314,12 @@ class TableInfo : public RefCountedThreadSafe<TableInfo> {
   // List of pending tasks (e.g. create/alter tablet requests)
   std::unordered_set<MonitoredTask*> pending_tasks_;
 
+  // Map of schema version to the number of tablets that reported that version.
+  //
+  // All tablets are represented here regardless of whether they've reported.
+  // Tablets yet to report will count towards the special NOT_YET_REPORTED key.
+  std::map<int64_t, int64_t> schema_version_counts_;
+
   DISALLOW_COPY_AND_ASSIGN(TableInfo);
 };