You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by wz...@apache.org on 2023/07/29 01:53:33 UTC

[impala] 02/02: IMPALA-12321: Fix the race condition when updating active catalogd

This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit e48af8c04ade5b270b1c601b8904fbd144241e29
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Tue Jul 25 16:55:34 2023 -0700

    IMPALA-12321: Fix the race condition when updating active catalogd
    
    When CatalogD HA is enabled, statestored sends the address of the current
    active catalogd to coordinators and catalogds on two different events:
    catalogd registration, and a change of the elected active catalogd.
    Statestored sends the active catalogd in two different kinds of RPCs.
    If there is more than one election change in a short time, coordinators
    and catalogds could receive the RPCs in an order that differs from the
    order in which the changes were made on the statestore.
    To make coordinators and catalogds have the same view as the statestore,
    we have to avoid overwriting the latest version of the active catalogd
    with a previous change.
    
    The version of the active catalogd is already in the UpdateCatalogd RPC,
    but not in the response message of statestore registration. This patch
    adds the active catalogd version to the response message of statestore
    registration. Coordinators and catalogds only apply changes which have a
    newer version than the version of the last received active catalogd.
    The version of the last received active catalogd has to be re-synced for
    each new registration since the statestore could have been restarted.
    Allow subscribers to skip an UpdateCatalogd RPC if they cannot handle it,
    for example when the statestore-id is unknown to a subscriber because
    the RPC is received before registration is completed. The skipped RPCs
    will be resent by the statestore.
    
    Testing:
     - Added a test case to start both catalogds with the flag
       'force_catalogd_active' set to true.
     - Passed core tests
    
    Change-Id: Ie49947e563d43c59bdd476b28c35be69848ae12a
    Reviewed-on: http://gerrit.cloudera.org:8080/20276
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Wenzhe Zhou <wz...@cloudera.com>
---
 be/src/catalog/catalog-server.cc                   |  43 ++++----
 be/src/catalog/catalog-server.h                    |  22 +++-
 be/src/runtime/exec-env.cc                         |  21 ++--
 be/src/runtime/exec-env.h                          |  18 +++-
 be/src/statestore/statestore-catalogd-mgr.cc       |  26 +++--
 be/src/statestore/statestore-catalogd-mgr.h        |  10 +-
 be/src/statestore/statestore-subscriber-catalog.cc |  30 ++++++
 be/src/statestore/statestore-subscriber-catalog.h  |  23 ++++
 be/src/statestore/statestore-subscriber.cc         | 120 +++++++++++++--------
 be/src/statestore/statestore-subscriber.h          |  46 ++++----
 be/src/statestore/statestore.cc                    |  47 +++++---
 be/src/statestore/statestore.h                     |   3 +-
 common/thrift/StatestoreService.thrift             |  12 ++-
 tests/custom_cluster/test_catalogd_ha.py           |  35 +++++-
 14 files changed, 315 insertions(+), 141 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 961a37fd6..6d2b47c96 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -378,6 +378,7 @@ Status CatalogServer::Start() {
   RETURN_IF_ERROR(Thread::Create("catalog-server", "catalog-metrics-refresh-thread",
       &CatalogServer::RefreshMetrics, this, &catalog_metrics_refresh_thread_));
 
+  active_catalogd_version_checker_.reset(new ActiveCatalogdVersionChecker());
   statestore_subscriber_.reset(new StatestoreSubscriberCatalog(
      Substitute("catalog-server@$0", TNetworkAddressToString(server_address)),
      subscriber_address, statestore_address, metrics_, protocol_version_,
@@ -398,20 +399,18 @@ Status CatalogServer::Start() {
     return status;
   }
   // Add callback to handle notification of updating catalogd from Statestore.
-  StatestoreSubscriber::UpdateCatalogdCallback update_catalogd_cb =
-      bind<void>(mem_fn(&CatalogServer::UpdateRegisteredCatalogd), this, _1);
-  statestore_subscriber_->AddUpdateCatalogdTopic(update_catalogd_cb);
-
-  bool has_active_catalogd = false;
-  TCatalogRegistration active_catalogd_registration;
-  RETURN_IF_ERROR(
-      statestore_subscriber_->Start(&has_active_catalogd, &active_catalogd_registration));
-  if (FLAGS_enable_catalogd_ha && has_active_catalogd) {
-    UpdateRegisteredCatalogd(active_catalogd_registration);
-    if (FLAGS_force_catalogd_active && !IsActive()) {
-      LOG(ERROR) << "Could not start CatalogD as active instance";
-      return Status("Could not start CatalogD as active instance");
-    }
+  if (FLAGS_enable_catalogd_ha) {
+    StatestoreSubscriber::UpdateCatalogdCallback update_catalogd_cb =
+        bind<void>(mem_fn(&CatalogServer::UpdateActiveCatalogd), this, _1, _2, _3);
+    statestore_subscriber_->AddUpdateCatalogdTopic(update_catalogd_cb);
+  }
+
+  RETURN_IF_ERROR(statestore_subscriber_->Start());
+  if (FLAGS_force_catalogd_active && !IsActive()) {
+    // If both catalogds are started with 'force_catalogd_active' as true in a short
+    // time, the second election overwrites the first election. The one which registers
+    // with statestore first will be inactive.
+    LOG(WARNING) << "Could not start CatalogD as active instance";
   }
 
   // Notify the thread to start for the first time.
@@ -497,8 +496,13 @@ void CatalogServer::UpdateCatalogTopicCallback(
   catalog_update_cv_.NotifyOne();
 }
 
-void CatalogServer::UpdateRegisteredCatalogd(
-    const TCatalogRegistration& catalogd_registration) {
+void CatalogServer::UpdateActiveCatalogd(bool is_registration_reply,
+    int64_t active_catalogd_version, const TCatalogRegistration& catalogd_registration) {
+  lock_guard<mutex> l(catalog_lock_);
+  if (!active_catalogd_version_checker_->CheckActiveCatalogdVersion(
+          is_registration_reply, active_catalogd_version)) {
+    return;
+  }
   if (catalogd_registration.address.hostname.empty()
       || catalogd_registration.address.port == 0) {
     return;
@@ -507,7 +511,6 @@ void CatalogServer::UpdateRegisteredCatalogd(
             << TNetworkAddressToString(catalogd_registration.address);
   bool is_matching = (catalogd_registration.address.hostname == FLAGS_hostname
       && catalogd_registration.address.port == FLAGS_catalog_service_port);
-  lock_guard<mutex> l(catalog_lock_);
   if (is_matching) {
     if (!is_active_) {
       is_active_ = true;
@@ -531,7 +534,11 @@ void CatalogServer::UpdateRegisteredCatalogd(
       is_active_ = false;
       active_status_metric_->SetValue(false);
       num_ha_active_status_change_metric_->Increment(1);
-      LOG(INFO) << "This catalogd instance is changed to inactive status";
+      LOG(INFO) << "This catalogd instance is changed to inactive status. "
+                << "Current active catalogd: "
+                << TNetworkAddressToString(catalogd_registration.address)
+                << ", active_catalogd_version: "
+                << active_catalogd_version;
     }
   }
 }
diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h
index 77cb744c2..b490894ce 100644
--- a/be/src/catalog/catalog-server.h
+++ b/be/src/catalog/catalog-server.h
@@ -38,8 +38,9 @@ using kudu::HttpStatusCode;
 
 namespace impala {
 
-class StatestoreSubscriber;
+class ActiveCatalogdVersionChecker;
 class Catalog;
+class StatestoreSubscriber;
 
 /// The Impala CatalogServer manages the caching and persistence of cluster-wide metadata.
 /// The CatalogServer aggregates the metadata from the Hive Metastore, the NameNode,
@@ -159,13 +160,17 @@ class CatalogServer {
   /// Thread that periodically wakes up and refreshes certain Catalog metrics.
   std::unique_ptr<Thread> catalog_metrics_refresh_thread_;
 
-  /// Protects is_active_, catalog_update_cv_, pending_topic_updates_,
-  /// catalog_objects_to/from_version_, and last_sent_catalog_version.
+  /// Protects is_active_, active_catalogd_version_checker_,
+  /// catalog_update_cv_, pending_topic_updates_, catalog_objects_to/from_version_, and
+  /// last_sent_catalog_version.
   std::mutex catalog_lock_;
 
   /// Set to true if this catalog instance is active.
   bool is_active_;
 
+  /// Object to track the version of received active catalogd.
+  boost::scoped_ptr<ActiveCatalogdVersionChecker> active_catalogd_version_checker_;
+
   /// Condition variable used to signal when the catalog_update_gathering_thread_ should
   /// fetch its next set of updates from the JniCatalog. At the end of each statestore
   /// heartbeat, this CV is signaled and the catalog_update_gathering_thread_ starts
@@ -206,8 +211,15 @@ class CatalogServer {
       const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
       std::vector<TTopicDelta>* subscriber_topic_updates);
 
-  /// Callback function for receiving notification of updating catalogd.
-  void UpdateRegisteredCatalogd(const TCatalogRegistration& catalogd_registration);
+  /// Callback function for receiving notification of new active catalogd.
+  /// This function is called when active catalogd is found from registration process,
+  /// or UpdateCatalogd RPC is received. The two kinds of RPCs could be received out of
+  /// sending order.
+  /// Reset 'last_active_catalogd_version_' if 'is_registration_reply' is true and
+  /// 'active_catalogd_version' is negative. In this case, 'catalogd_registration' is
+  /// invalid and should not be used.
+  void UpdateActiveCatalogd(bool is_registration_reply, int64_t active_catalogd_version,
+      const TCatalogRegistration& catalogd_registration);
 
   /// Returns the active status of the catalogd.
   bool IsActive();
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 28597f1d5..a9f3a654d 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -51,6 +51,7 @@
 #include "service/data-stream-service.h"
 #include "service/frontend.h"
 #include "service/impala-server.h"
+#include "statestore/statestore-subscriber-catalog.h"
 #include "statestore/statestore-subscriber.h"
 #include "util/cpu-info.h"
 #include "util/debug-util.h"
@@ -249,6 +250,7 @@ ExecEnv::ExecEnv(int krpc_port, int subscriber_port, int webserver_port,
 
   catalogd_address_ = std::make_shared<const TNetworkAddress>(
       MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port));
+  active_catalogd_version_checker_.reset(new ActiveCatalogdVersionChecker());
 
   TStatestoreSubscriberType::type subscriber_type = TStatestoreSubscriberType::UNKNOWN;
   if (FLAGS_is_coordinator) {
@@ -267,7 +269,7 @@ ExecEnv::ExecEnv(int krpc_port, int subscriber_port, int webserver_port,
         CreateHdfsOpThreadPool("hdfs-worker-pool", FLAGS_num_hdfs_worker_threads, 1024));
 
     StatestoreSubscriber::UpdateCatalogdCallback update_catalogd_cb =
-        bind<void>(mem_fn(&ExecEnv::UpdateCatalogd), this, _1);
+        bind<void>(mem_fn(&ExecEnv::UpdateActiveCatalogd), this, _1, _2, _3);
     statestore_subscriber_->AddUpdateCatalogdTopic(update_catalogd_cb);
   }
   StatestoreSubscriber::CompleteRegistrationCallback complete_registration_cb =
@@ -514,18 +516,12 @@ Status ExecEnv::StartStatestoreSubscriberService() {
 
   // Must happen after all topic registrations / callbacks are done
   if (statestore_subscriber_.get() != nullptr) {
-    bool has_active_catalogd = false;
-    TCatalogRegistration active_catalogd_registration;
-    Status status = statestore_subscriber_->Start(
-        &has_active_catalogd, &active_catalogd_registration);
+    Status status = statestore_subscriber_->Start();
     if (!status.ok()) {
       status.AddDetail("Statestore subscriber did not start up.");
       return status;
     }
     if (statestore_subscriber_->IsRegistered()) SetStatestoreRegistrationCompleted();
-    if (has_active_catalogd && FLAGS_is_coordinator) {
-      UpdateCatalogd(active_catalogd_registration);
-    }
   }
 
   return Status::OK();
@@ -671,7 +667,13 @@ Status ExecEnv::GetAdmissionServiceAddress(
   return Status::OK();
 }
 
-void ExecEnv::UpdateCatalogd(const TCatalogRegistration& catalogd_registration) {
+void ExecEnv::UpdateActiveCatalogd(bool is_registration_reply,
+    int64 active_catalogd_version, const TCatalogRegistration& catalogd_registration) {
+  std::lock_guard<std::mutex> l(catalogd_address_lock_);
+  if (!active_catalogd_version_checker_->CheckActiveCatalogdVersion(
+          is_registration_reply, active_catalogd_version)) {
+    return;
+  }
   if (catalogd_registration.address.hostname.empty()
       || catalogd_registration.address.port == 0) {
     return;
@@ -691,7 +693,6 @@ void ExecEnv::UpdateCatalogd(const TCatalogRegistration& catalogd_registration)
               << statestore_subscriber_->GetCatalogProtocolVersion()
               << " vs. " << catalogd_registration.protocol;
   }
-  std::lock_guard<std::mutex> l(catalogd_address_lock_);
   DCHECK(catalogd_address_.get() != nullptr);
   if (!is_catalogd_address_metric_set_) {
     // At least set the metric once.
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 26579f72b..e71878fc3 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -42,6 +42,7 @@ class KuduClient;
 
 namespace impala {
 
+class ActiveCatalogdVersionChecker;
 class AdmissionController;
 class BufferPool;
 class CallableThreadPool;
@@ -204,9 +205,15 @@ class ExecEnv {
     statestore_registration_completed_.CompareAndSwap(0, 1);
   }
 
-  /// Callback function for receiving notification of updating catalogd.
-  /// This function is called after catalogd is registered to statestore.
-  void UpdateCatalogd(const TCatalogRegistration& catalogd_registration);
+  /// Callback function for receiving notification of new active catalogd.
+  /// This function is called when active catalogd is found from registration process,
+  /// or UpdateCatalogd RPC is received. The two kinds of RPCs could be received out of
+  /// sending order.
+  /// Reset 'last_active_catalogd_version_' if 'is_registration_reply' is true and
+  /// 'active_catalogd_version' is negative. In this case, 'catalogd_registration' is
+  /// invalid and should not be used.
+  void UpdateActiveCatalogd(bool is_registration_reply, int64_t active_catalogd_version,
+      const TCatalogRegistration& catalogd_registration);
 
   /// Return the current address of Catalog service.
   std::shared_ptr<const TNetworkAddress> GetCatalogdAddress() const;
@@ -321,10 +328,13 @@ class ExecEnv {
   /// Current address of Catalog service
   std::shared_ptr<const TNetworkAddress> catalogd_address_;
 
+  /// Object to track the version of received active catalogd.
+  boost::scoped_ptr<ActiveCatalogdVersionChecker> active_catalogd_version_checker_;
+
   /// Flag that indicate if the metric for catalogd address has been set.
   bool is_catalogd_address_metric_set_ = false;
 
-  /// Protects catalogd_address_.
+  /// Protects catalogd_address_ and active_catalogd_version_checker_.
   mutable std::mutex catalogd_address_lock_;
 
   /// Initialize ExecEnv based on Hadoop config from frontend.
diff --git a/be/src/statestore/statestore-catalogd-mgr.cc b/be/src/statestore/statestore-catalogd-mgr.cc
index e86625f46..a381e1a6b 100644
--- a/be/src/statestore/statestore-catalogd-mgr.cc
+++ b/be/src/statestore/statestore-catalogd-mgr.cc
@@ -58,7 +58,7 @@ bool StatestoreCatalogdMgr::RegisterCatalogd(bool is_reregistering,
     DCHECK(num_registered_catalogd_ < 2);
     is_active_catalogd_assigned_ = true;
     COPY_CATALOGD_REGISTRATION_FROM_LOCAL_VARIABLES(active);
-    ++sending_sequence_;
+    ++active_catalogd_version_;
     return true;
   }
 
@@ -72,7 +72,7 @@ bool StatestoreCatalogdMgr::RegisterCatalogd(bool is_reregistering,
         COPY_CATALOGD_REGISTRATION_FROM_LOCAL_VARIABLES(active);
         LOG(INFO) << active_catalogd_subscriber_id_
                   << " is re-registered with FLAGS_force_catalogd_active.";
-        ++sending_sequence_;
+        ++active_catalogd_version_;
         return true;
       }
     } else {
@@ -85,7 +85,7 @@ bool StatestoreCatalogdMgr::RegisterCatalogd(bool is_reregistering,
         LOG(INFO) << active_catalogd_subscriber_id_
                   << " is re-registered after HA preemption waiting period and "
                   << "is assigned as active catalogd.";
-        ++sending_sequence_;
+        ++active_catalogd_version_;
         return true;
       }
     }
@@ -102,6 +102,9 @@ bool StatestoreCatalogdMgr::RegisterCatalogd(bool is_reregistering,
     bool is_waiting_period_expired = false;
     if (first_catalogd_register_time_ == 0) {
       first_catalogd_register_time_ = MonotonicMillis();
+      if (FLAGS_catalogd_ha_preemption_wait_period_ms == 0) {
+        is_waiting_period_expired = true;
+      }
     } else if (MonotonicMillis() - first_catalogd_register_time_ >=
         FLAGS_catalogd_ha_preemption_wait_period_ms) {
       is_waiting_period_expired = true;
@@ -112,7 +115,7 @@ bool StatestoreCatalogdMgr::RegisterCatalogd(bool is_reregistering,
       is_active_catalogd_assigned_ = true;
       COPY_CATALOGD_REGISTRATION_FROM_LOCAL_VARIABLES(active);
       LOG(INFO) << active_catalogd_subscriber_id_ << " is assigned as active catalogd.";
-      ++sending_sequence_;
+      ++active_catalogd_version_;
       return true;
     }
     // Wait second catalogd to be registered.
@@ -120,7 +123,8 @@ bool StatestoreCatalogdMgr::RegisterCatalogd(bool is_reregistering,
             << "period.";
   } else {
     num_registered_catalogd_++;
-    DCHECK(num_registered_catalogd_ == 2);
+    DCHECK(num_registered_catalogd_ == 2)
+        << "No more than 2 CatalogD registrations are allowed!";
     if (catalogd_registration.force_catalogd_active) {
       // Force to set the current one as active catalogd
       if (is_active_catalogd_assigned_) {
@@ -133,7 +137,7 @@ bool StatestoreCatalogdMgr::RegisterCatalogd(bool is_reregistering,
       LOG(INFO) << active_catalogd_subscriber_id_
                 << " is registered with FLAGS_force_catalogd_active and is assigned as "
                 << "active catalogd.";
-      ++sending_sequence_;
+      ++active_catalogd_version_;
       return true;
     } else if (is_active_catalogd_assigned_) {
       // Existing one is already assigned as active catalogd.
@@ -154,7 +158,7 @@ bool StatestoreCatalogdMgr::RegisterCatalogd(bool is_reregistering,
       }
       LOG(INFO) << active_catalogd_subscriber_id_
                 << " has higher priority and is assigned as active catalogd.";
-      ++sending_sequence_;
+      ++active_catalogd_version_;
       return true;
     }
   }
@@ -177,7 +181,7 @@ bool StatestoreCatalogdMgr::CheckActiveCatalog() {
   COPY_CATALOGD_REGISTRATION_FROM_MEMBER_VARIABLES(active, first);
   LOG(INFO) << active_catalogd_subscriber_id_
             << " is assigned as active catalogd after preemption waiting period.";
-  ++sending_sequence_;
+  ++active_catalogd_version_;
   return true;
 }
 
@@ -193,7 +197,7 @@ bool StatestoreCatalogdMgr::UnregisterCatalogd(
       COPY_CATALOGD_REGISTRATION_FROM_MEMBER_VARIABLES(active, standby);
       RESET_CATALOGD_REGISTRATION_MEMBER_VARIABLES(standby);
       LOG(INFO) << "Fail over active catalogd to " << active_catalogd_subscriber_id_;
-      ++sending_sequence_;
+      ++active_catalogd_version_;
       return true;
     } else {
       is_active_catalogd_assigned_ = false;
@@ -215,10 +219,10 @@ bool StatestoreCatalogdMgr::UnregisterCatalogd(
 }
 
 const TCatalogRegistration& StatestoreCatalogdMgr::GetActiveCatalogRegistration(
-    bool* has_active_catalogd, int64* sending_sequence) {
+    bool* has_active_catalogd, int64_t* active_catalogd_version) {
   std::lock_guard<std::mutex> l(catalog_mgr_lock_);
   *has_active_catalogd = is_active_catalogd_assigned_;
-  *sending_sequence = sending_sequence_;
+  *active_catalogd_version = active_catalogd_version_;
   return active_catalogd_registration_;
 }
 
diff --git a/be/src/statestore/statestore-catalogd-mgr.h b/be/src/statestore/statestore-catalogd-mgr.h
index 55bd3b205..2fb7140ce 100644
--- a/be/src/statestore/statestore-catalogd-mgr.h
+++ b/be/src/statestore/statestore-catalogd-mgr.h
@@ -45,7 +45,7 @@ class StatestoreCatalogdMgr {
       is_active_catalogd_assigned_(false),
       num_registered_catalogd_(0),
       first_catalogd_register_time_(0),
-      sending_sequence_(0) {}
+      active_catalogd_version_(0L) {}
 
   /// Register one catalogd.
   /// Return true if new active catalogd is designated during this registration.
@@ -67,7 +67,7 @@ class StatestoreCatalogdMgr {
   /// Return the protocol version of catalog service and address of active catalogd.
   /// Set *has_active_catalogd as false if the active one is not designated yet.
   const TCatalogRegistration& GetActiveCatalogRegistration(
-      bool* has_active_catalogd, int64* sending_sequence);
+      bool* has_active_catalogd, int64_t* active_catalogd_version);
 
   /// Return the subscriber-id of active catalogd.
   /// This function should be called after the active catalogd is designated.
@@ -116,9 +116,9 @@ class StatestoreCatalogdMgr {
   /// Additional registration info of standby catalogd
   TCatalogRegistration standby_catalogd_registration_;
 
-  /// Monotonically increasing sending sequence number. The value is increased when
-  /// a new active catalogd is designated.
-  int64 sending_sequence_;
+  /// Monotonically increasing version number. The value is increased when a new active
+  /// catalogd is designated.
+  int64_t active_catalogd_version_;
 };
 
 } // namespace impala
diff --git a/be/src/statestore/statestore-subscriber-catalog.cc b/be/src/statestore/statestore-subscriber-catalog.cc
index 3f90bce20..25ed30fb6 100644
--- a/be/src/statestore/statestore-subscriber-catalog.cc
+++ b/be/src/statestore/statestore-subscriber-catalog.cc
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "common/logging.h"
 #include "statestore/statestore-subscriber-catalog.h"
 
 using namespace impala;
@@ -36,3 +37,32 @@ StatestoreSubscriberCatalog::StatestoreSubscriberCatalog(
   catalogd_registration_.__set_enable_catalogd_ha(FLAGS_enable_catalogd_ha);
   catalogd_registration_.__set_force_catalogd_active(FLAGS_force_catalogd_active);
 }
+
+bool ActiveCatalogdVersionChecker::CheckActiveCatalogdVersion(
+    bool is_registration_reply, int64 active_catalogd_version) {
+  if (is_registration_reply) {
+    last_update_for_registration_ = true;
+    if (active_catalogd_version < 0) {
+      // Reset the version of last received active catalogd.
+      last_active_catalogd_version_ = 0;
+      return false;
+    }
+  } else {
+    if (last_active_catalogd_version_ >= active_catalogd_version) {
+      if (last_update_for_registration_
+          && last_active_catalogd_version_ == active_catalogd_version) {
+        VLOG(3) << "Duplicated update of active catalogd";
+      } else {
+        LOG(INFO) << "Ignore the update of active catalogd since more recent update has "
+                  << "been processed (" << last_active_catalogd_version_ << " vs "
+                  << active_catalogd_version << ")";
+      }
+      last_update_for_registration_ = false;
+      return false;
+    }
+    last_update_for_registration_ = false;
+  }
+  DCHECK(active_catalogd_version >= 0);
+  last_active_catalogd_version_ = active_catalogd_version;
+  return true;
+}
diff --git a/be/src/statestore/statestore-subscriber-catalog.h b/be/src/statestore/statestore-subscriber-catalog.h
index f6657f7f7..867f8e1ea 100644
--- a/be/src/statestore/statestore-subscriber-catalog.h
+++ b/be/src/statestore/statestore-subscriber-catalog.h
@@ -52,4 +52,27 @@ class StatestoreSubscriberCatalog : public StatestoreSubscriber {
   TCatalogRegistration catalogd_registration_;
 };
 
+/// ActiveCatalogdVersionChecker:
+/// Tracks the version of received active catalogd.
+class ActiveCatalogdVersionChecker {
+ public:
+  ActiveCatalogdVersionChecker()
+    : last_update_for_registration_(false),
+      last_active_catalogd_version_(0L) {}
+
+  /// Returns true if the given active_catalogd_version is newer than the last received
+  /// version.
+  /// This function is not thread-safe. It must be protected by the caller.
+  bool CheckActiveCatalogdVersion(
+      bool is_registration_reply, int64 active_catalogd_version);
+
+ private:
+  /// True if the last update of active catalogd was processed for event of receiving
+  /// registration reply.
+  bool last_update_for_registration_;
+
+  /// Version of last received active catalogd.
+  int64_t last_active_catalogd_version_;
+};
+
 }
diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc
index 480966489..f0f9f3ce5 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -145,10 +145,13 @@ class StatestoreSubscriberThriftIf : public StatestoreSubscriberIf {
 
   virtual void UpdateCatalogd(TUpdateCatalogdResponse& response,
       const TUpdateCatalogdRequest& request) {
+    bool update_skipped = false;
     Status status = CheckProtocolVersion(request.protocol_version);
     if (status.ok()) {
       subscriber_->UpdateCatalogd(request.catalogd_registration,
-          request.registration_id, request.statestore_id, request.sequence);
+          request.registration_id, request.statestore_id, request.catalogd_version,
+          &update_skipped);
+      response.__set_skipped(update_skipped);
     }
     TStatus thrift_status;
     status.ToThrift(&thrift_status);
@@ -210,8 +213,7 @@ bool StatestoreSubscriber::IsRegistered() const {
   return statestore_->IsRegistered();
 }
 
-Status StatestoreSubscriber::Start(bool* has_active_catalogd,
-    TCatalogRegistration* active_catalogd_registration) {
+Status StatestoreSubscriber::Start() {
   // Backend must be started before registration
   std::shared_ptr<TProcessor> processor(
       new StatestoreSubscriberProcessor(thrift_iface_));
@@ -240,7 +242,7 @@ Status StatestoreSubscriber::Start(bool* has_active_catalogd,
   // Specify the port which the heartbeat server is listening on.
   heartbeat_address_.port = heartbeat_server_->port();
 
-  return statestore_->Start(has_active_catalogd, active_catalogd_registration);
+  return statestore_->Start();
 }
 
 /// Set Register Request
@@ -275,13 +277,18 @@ void StatestoreSubscriber::Heartbeat(
 
 void StatestoreSubscriber::UpdateCatalogd(
     const TCatalogRegistration& catalogd_registration,
-    const RegistrationId& registration_id,
-    const TUniqueId& statestore_id, int64 sequence) {
+    const RegistrationId& registration_id, const TUniqueId& statestore_id,
+    int64_t active_catalogd_version, bool* update_skipped) {
   if (statestore_->IsMatchingStatestoreId(statestore_id)) {
-    statestore_->UpdateCatalogd(catalogd_registration, registration_id, sequence);
+    statestore_->UpdateCatalogd(
+        catalogd_registration, registration_id, active_catalogd_version, update_skipped);
   } else {
-    VLOG(3) << "Ignore updating catalogd message from unknown statestored: "
-            << statestore_id;
+    // It's possible the catalogd update RPC is received before the registration response
+    // is received. Skip this update so that the statestore will retry this update in
+    // the future.
+    *update_skipped = true;
+    LOG(INFO) << "Skipped updating catalogd message from unknown statestored: "
+              << statestore_id;
   }
 }
 
@@ -292,9 +299,13 @@ Status StatestoreSubscriber::UpdateState(const TopicDeltaMap& incoming_topic_del
     return statestore_->UpdateState(
         incoming_topic_deltas, registration_id, subscriber_topic_updates, skipped);
   } else {
-    VLOG(3) << "Ignore topic update message from unknown statestored: " << statestore_id;
+    // It's possible the topic update is received before the registration response is
+    // received. Skip this update so that the statestore will retry this update in the
+    // future.
+    *skipped = true;
+    VLOG(3) << "Skipped topic update message from unknown statestored: " << statestore_id;
+    return Status::OK();
   }
-  return Status::OK();
 }
 
 StatestoreSubscriber::StatestoreStub::StatestoreStub(StatestoreSubscriber* subscriber,
@@ -365,6 +376,7 @@ void StatestoreSubscriber::StatestoreStub::AddCompleteRegistrationTopic(
 }
 
 Status StatestoreSubscriber::StatestoreStub::Register(bool* has_active_catalogd,
+    int64_t* active_catalogd_version,
     TCatalogRegistration* active_catalogd_registration) {
   // Check protocol version of the statestore first.
   TGetProtocolVersionRequest get_protocol_request;
@@ -410,6 +422,13 @@ Status StatestoreSubscriber::StatestoreStub::Register(bool* has_active_catalogd,
     request.topic_registrations.push_back(thrift_topic);
   }
 
+  {
+    // Reset registration_id_ and statestore_id_ before registering with statestore
+    // so that RPC messages for previous registration are not accepted.
+    lock_guard<mutex> l(id_lock_);
+    registration_id_ = TUniqueId();
+    statestore_id_ = TUniqueId();
+  }
   RETURN_IF_ERROR(subscriber_->SetRegisterRequest(&request));
   TRegisterSubscriberResponse response;
   attempt = 0; // Used for debug action only.
@@ -461,22 +480,24 @@ Status StatestoreSubscriber::StatestoreStub::Register(bool* has_active_catalogd,
       VLOG(1) << "Active catalogd address: "
               << TNetworkAddressToString(response.catalogd_registration.address);
       if (has_active_catalogd != nullptr) *has_active_catalogd = true;
+      if (active_catalogd_version != nullptr && response.__isset.catalogd_version) {
+        *active_catalogd_version = response.catalogd_version;
+      }
       if (active_catalogd_registration != nullptr) {
         *active_catalogd_registration = response.catalogd_registration;
       }
     }
-    // Reset last received sequence number of update_catalogd RPC since statestore
-    // could be restarted.
-    last_update_catalogd_seq_ = 0;
   }
   heartbeat_interval_timer_.Start();
   return status;
 }
 
-Status StatestoreSubscriber::StatestoreStub::Start(bool* has_active_catalogd,
-    TCatalogRegistration* active_catalogd_registration) {
+Status StatestoreSubscriber::StatestoreStub::Start() {
   Status status;
   {
+    bool has_active_catalogd = false;
+    int64_t active_catalogd_version = 0;
+    TCatalogRegistration active_catalogd_registration;
     // Take the lock to ensure that, if a topic-update is received during registration
     // (perhaps because Register() has succeeded, but we haven't finished setting up
     // state on the client side), UpdateState() will reject the message.
@@ -485,11 +506,18 @@ Status StatestoreSubscriber::StatestoreStub::Start(bool* has_active_catalogd,
     // Inject failure before registering to statestore.
     status = DebugAction(FLAGS_debug_actions, "REGISTER_STATESTORE_ON_STARTUP");
     if (status.ok()) {
-      status = Register(has_active_catalogd, active_catalogd_registration);
+      status = Register(
+          &has_active_catalogd, &active_catalogd_version, &active_catalogd_registration);
     }
     if (status.ok()) {
       is_registered_ = true;
       LOG(INFO) << "statestore registration successful on startup";
+      if (has_active_catalogd) {
+        DCHECK(active_catalogd_version >= 0);
+        for (const UpdateCatalogdCallback& callback : update_catalogd_callbacks_) {
+          callback(true, active_catalogd_version, active_catalogd_registration);
+        }
+      }
     } else {
       LOG(INFO) << "statestore registration unsuccessful on startup: "
                 << status.GetDetail();
@@ -539,13 +567,14 @@ void StatestoreSubscriber::StatestoreStub::RecoveryModeChecker() {
                 << ": Connection with statestore lost, entering recovery mode";
       uint32_t attempt_count = 1;
       bool has_active_catalogd = false;
+      int64_t active_catalogd_version = 0;
       TCatalogRegistration active_catalogd_registration;
       while (true) {
         LOG(INFO) << "Trying to re-register with statestore, attempt: "
                   << attempt_count++;
         re_registr_attempt_metric_->Increment(1);
-        Status status =
-            Register(&has_active_catalogd, &active_catalogd_registration);
+        Status status = Register(&has_active_catalogd, &active_catalogd_version,
+            &active_catalogd_registration);
         if (status.ok()) {
           if (!is_registered_) {
             is_registered_ = true;
@@ -559,11 +588,16 @@ void StatestoreSubscriber::StatestoreStub::RecoveryModeChecker() {
           // the next loop while we're waiting for heartbeat messages to resume.
           failure_detector_->UpdateHeartbeat(STATESTORE_ID, true);
           LOG(INFO) << "Reconnected to statestore. Exiting recovery mode";
-
-          if (has_active_catalogd) {
-            for (const UpdateCatalogdCallback& callback : update_catalogd_callbacks_) {
-              callback(active_catalogd_registration);
-            }
+          if (!has_active_catalogd) {
+            // Need to reset version of last received active catalogd for new
+            // registration. Note that active_catalogd_registration is invalid when
+            // active_catalogd_version is negative.
+            active_catalogd_version = -1;
+          } else {
+            DCHECK(active_catalogd_version >= 0);
+          }
+          for (const UpdateCatalogdCallback& callback : update_catalogd_callbacks_) {
+            callback(true, active_catalogd_version, active_catalogd_registration);
           }
           // Break out of enclosing while (true) to top of outer-scope loop.
           break;
@@ -604,21 +638,6 @@ Status StatestoreSubscriber::StatestoreStub::CheckRegistrationId(
   return Status::OK();
 }
 
-Status StatestoreSubscriber::StatestoreStub::CheckRegistrationIdAndUpdateCatalogdSeq(
-    const RegistrationId& registration_id, int64 sequence){
-  lock_guard<mutex> r(id_lock_);
-  if (registration_id_ != TUniqueId() &&
-      (registration_id != registration_id_ || sequence <= last_update_catalogd_seq_)) {
-    return Status(Substitute("Unexpected registration ID: $0, was expecting $1 "
-        "or unexpected sequence number: $2, was expecting greater than $3",
-            PrintId(registration_id), PrintId(registration_id_), sequence,
-            last_update_catalogd_seq_));
-  }
-  last_update_catalogd_seq_ = sequence;
-  return Status::OK();
-}
-
-
 bool StatestoreSubscriber::StatestoreStub::IsMatchingStatestoreId(
     const TUniqueId statestore_id) {
   lock_guard<mutex> r(id_lock_);
@@ -643,14 +662,27 @@ void StatestoreSubscriber::StatestoreStub::Heartbeat(
 
 void StatestoreSubscriber::StatestoreStub::UpdateCatalogd(
     const TCatalogRegistration& catalogd_registration,
-    const RegistrationId& registration_id, int64 sequence) {
-  update_catalogd_rpc_metric_->Increment(1);
-  const Status& status =
-      CheckRegistrationIdAndUpdateCatalogdSeq(registration_id, sequence);
+    const RegistrationId& registration_id, int64_t active_catalogd_version,
+    bool* update_skipped) {
+  const Status& status = CheckRegistrationId(registration_id);
   if (status.ok()) {
+    // Try to acquire lock to avoid race with updating catalogd from registration thread.
+    shared_lock<shared_mutex> l(lock_, boost::try_to_lock);
+    if (!l.owns_lock()) {
+      LOG(INFO) << "Unable to acquire the lock, skip UpdateCatalogd RPC notification.";
+      *update_skipped = true;
+      return;
+    }
+    update_catalogd_rpc_metric_->Increment(1);
+    DCHECK(active_catalogd_version >= 0);
     for (const UpdateCatalogdCallback& callback : update_catalogd_callbacks_) {
-      callback(catalogd_registration);
+      callback(false, active_catalogd_version, catalogd_registration);
     }
+  } else {
+    // It's possible the registration is not completed.
+    LOG(INFO) << "Skip UpdateCatalogd RPC notification due to unknown registration_id. "
+              << "It's likely the registration reply is not received.";
+    *update_skipped = true;
   }
 }
 
diff --git a/be/src/statestore/statestore-subscriber.h b/be/src/statestore/statestore-subscriber.h
index e89bc5e78..eea999d3b 100644
--- a/be/src/statestore/statestore-subscriber.h
+++ b/be/src/statestore/statestore-subscriber.h
@@ -112,9 +112,24 @@ class StatestoreSubscriber {
       const UpdateCallback& callback);
 
   /// UpdateCatalogdCallback is invoked every time that a notification of updating
-  /// catalogd is received from the statestore.
-  typedef boost::function<void (const TCatalogRegistration& catalogd_registration)>
-      UpdateCatalogdCallback;
+  /// catalogd is received from the statestore. This callback can be invoked for two
+  /// events: receiving a registration RPC reply, or receiving an UpdateCatalogd RPC.
+  ///   is_registration_reply: true when the callback is invoked for a registration
+  ///                          reply, false when invoked for an UpdateCatalogd RPC.
+  ///   active_catalogd_version: the version of active catalogd.
+  ///   catalogd_registration: registration info of active catalogd, including its
+  ///                          address and port.
+  ///
+  /// When a registration RPC reply is received for a re-registration attempt, it's
+  /// possible that the active catalogd has NOT been elected by statestore yet. The
+  /// callback is still invoked in this case so that coordinators and catalogds can reset
+  /// their 'last_active_catalogd_version_'; otherwise catalogd updates could be ignored
+  /// by coordinators and catalogds after statestore is restarted. In this case,
+  /// active_catalogd_version is set to the invalid value -1, and catalogd_registration
+  /// holds no valid value and must not be used by the callee.
+  typedef boost::function<void (
+      bool is_registration_reply, int64_t active_catalogd_version,
+      const TCatalogRegistration& catalogd_registration)> UpdateCatalogdCallback;
 
   /// Adds a callback for notification of updating catalogd.
   void AddUpdateCatalogdTopic(const UpdateCatalogdCallback& callback);
@@ -129,14 +144,9 @@ class StatestoreSubscriber {
   /// Registers this subscriber with the statestore, and starts the
   /// heartbeat service, as well as a thread to check for failure and
   /// initiate recovery mode.
-  /// has_active_catalogd - return true if the active catalogd is designated by
-  ///                       statestore.
-  /// active_catalogd_registration - return the address and protocol of the
-  ///                                active catalogd.
   ///
   /// Returns OK unless some error occurred, like a failure to connect.
-  virtual Status Start(bool* has_active_catalogd = nullptr,
-      TCatalogRegistration* active_catalogd_registration = nullptr);
+  virtual Status Start();
 
   /// Set Register Request
   virtual Status SetRegisterRequest(TRegisterSubscriberRequest* request);
@@ -241,8 +251,7 @@ class StatestoreSubscriber {
         const TNetworkAddress& statestore_address, MetricGroup* metrics);
 
     /// Returns OK unless some error occurred, like a failure to connect.
-    Status Start(bool* has_active_catalogd,
-        TCatalogRegistration* active_catalogd_registration);
+    Status Start();
 
     /// Adds a topic to the set of topics that updates will be received
     /// for. When a topic update is received, the supplied UpdateCallback
@@ -292,7 +301,8 @@ class StatestoreSubscriber {
 
     /// Called when the catalogd has been updated.
     void UpdateCatalogd(const TCatalogRegistration& catalogd_registration,
-        const RegistrationId& registration_id, int64 sequence);
+        const RegistrationId& registration_id, int64_t active_catalogd_version,
+        bool* update_skipped);
 
     /// Run in a separate thread. In a loop, check failure_detector_ to see if the
     /// statestore is still sending heartbeat messages. If not, enter 'recovery mode'
@@ -308,7 +318,7 @@ class StatestoreSubscriber {
     /// Creates a client of the remote statestore and sends a list of
     /// topics to register for. Returns OK unless there is some problem
     /// connecting, or the statestore reports an error.
-    Status Register(bool* has_active_catalogd,
+    Status Register(bool* has_active_catalogd, int64_t* active_catalogd_version,
         TCatalogRegistration* active_catalogd_registration);
 
     /// Returns OK if registration_id == registration_id_, or if registration_id_ is not
@@ -316,11 +326,6 @@ class StatestoreSubscriber {
     /// intended for the current registration epoch.
     Status CheckRegistrationId(const RegistrationId& registration_id);
 
-    // Returns OK if registration_id == registration_id_ and
-    // sequence == last_update_catalogd_seq_.
-    Status CheckRegistrationIdAndUpdateCatalogdSeq(const RegistrationId& registration_id,
-        int64 sequence);
-
     /// Check if the given statestore ID is matching with the statestore ID of this
     /// statestore.
     bool IsMatchingStatestoreId(const TUniqueId statestore_id);
@@ -437,9 +442,6 @@ class StatestoreSubscriber {
     /// from the statestore.
     TUniqueId statestore_id_;
 
-    // Last sequence number for notification of changing active CatalogD.
-    int64 last_update_catalogd_seq_ = 0;
-
     /// Monotonic timestamp of the last successful registration.
     AtomicInt64 last_registration_ms_{0};
   };
@@ -459,7 +461,7 @@ class StatestoreSubscriber {
   /// Called when the catalogd has been updated.
   void UpdateCatalogd(const TCatalogRegistration& catalogd_registration,
       const RegistrationId& registration_id, const TUniqueId& statestore_id,
-      int64 sequence);
+      int64_t active_catalogd_version, bool* update_skipped);
 
   /// Subscriber thrift implementation, needs to access UpdateState
   friend class StatestoreSubscriberThriftIf;
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 0ec515f73..12e545412 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -223,17 +223,19 @@ class StatestoreThriftIf : public StatestoreServiceIf {
 
     RegistrationId registration_id;
     bool has_active_catalogd;
+    int64_t active_catalogd_version;
     TCatalogRegistration active_catalogd_registration;
     Status status = statestore_->RegisterSubscriber(params.subscriber_id,
         params.subscriber_location, params.topic_registrations, subscriber_type,
         subscribe_catalogd_change, catalogd_registration, &registration_id,
-        &has_active_catalogd, &active_catalogd_registration);
+        &has_active_catalogd, &active_catalogd_version, &active_catalogd_registration);
     status.ToThrift(&response.status);
     response.__set_registration_id(registration_id);
     response.__set_statestore_id(statestore_->GetStateStoreId());
     response.__set_protocol_version(statestore_->GetProtocolVersion());
     if (has_active_catalogd) {
       response.__set_catalogd_registration(active_catalogd_registration);
+      response.__set_catalogd_version(active_catalogd_version);
     }
   }
 
@@ -733,6 +735,7 @@ Status Statestore::RegisterSubscriber(const SubscriberId& subscriber_id,
     const TCatalogRegistration& catalogd_registration,
     RegistrationId* registration_id,
     bool* has_active_catalogd,
+    int64_t* active_catalogd_version,
     TCatalogRegistration* active_catalogd_registration) {
   bool is_catalogd = subscriber_type == TStatestoreSubscriberType::CATALOGD;
   if (subscriber_id.empty()) {
@@ -803,10 +806,9 @@ Status Statestore::RegisterSubscriber(const SubscriberId& subscriber_id,
     RETURN_IF_ERROR(OfferUpdate(update, &subscriber_topic_update_threadpool_));
     RETURN_IF_ERROR(OfferUpdate(update, &subscriber_priority_topic_update_threadpool_));
     RETURN_IF_ERROR(OfferUpdate(update, &subscriber_heartbeat_threadpool_));
-    int64 sending_sequence;
     *active_catalogd_registration =
         catalog_manager_.GetActiveCatalogRegistration(
-            has_active_catalogd, &sending_sequence);
+            has_active_catalogd, active_catalogd_version);
   }
 
   return Status::OK();
@@ -1221,7 +1223,7 @@ void Statestore::DoSubscriberUpdate(UpdateKind update_kind, int thread_id,
 }
 
 [[noreturn]] void Statestore::MonitorUpdateCatalogd() {
-  int64 last_sending_sequence = 0;
+  int64_t last_active_catalogd_version = 0;
   // rpc_receivers is used to track subscribers to which statestore need to send RPCs
   // when there is a change in the elected active catalogd. It is updated from
   // subscribers_, and the subscribers will be removed from this list if the RPCs are
@@ -1234,7 +1236,7 @@ void Statestore::DoSubscriberUpdate(UpdateKind update_kind, int thread_id,
     unique_lock<mutex> l(*catalog_manager_.GetLock());
     update_catalod_cv_.WaitFor(l, timeout_us);
   }
-  SendUpdateCatalogdNotification(&last_sending_sequence, rpc_receivers);
+  SendUpdateCatalogdNotification(&last_active_catalogd_version, rpc_receivers);
 
   // Wait for notification. If new leader is elected due to catalogd is registered or
   // unregistered, send notification to all coordinators and catalogds.
@@ -1248,26 +1250,27 @@ void Statestore::DoSubscriberUpdate(UpdateKind update_kind, int thread_id,
         update_catalod_cv_.WaitFor(l, timeout_us);
       }
     }
-    SendUpdateCatalogdNotification(&last_sending_sequence, rpc_receivers);
+    SendUpdateCatalogdNotification(&last_active_catalogd_version, rpc_receivers);
   }
 }
 
-void Statestore::SendUpdateCatalogdNotification(int64* last_sending_sequence,
+void Statestore::SendUpdateCatalogdNotification(int64_t* last_active_catalogd_version,
     vector<std::shared_ptr<Subscriber>>& rpc_receivers) {
   bool has_active_catalogd;
-  int64 sending_sequence = 0;
+  int64_t active_catalogd_version = 0;
   TCatalogRegistration catalogd_registration =
       catalog_manager_.GetActiveCatalogRegistration(
-          &has_active_catalogd, &sending_sequence);
+          &has_active_catalogd, &active_catalogd_version);
   if (!has_active_catalogd ||
-      (sending_sequence == *last_sending_sequence && rpc_receivers.empty())) {
+      (active_catalogd_version == *last_active_catalogd_version
+          && rpc_receivers.empty())) {
     // Don't resend RPCs if there is no change in Active Catalogd and no RPC failure in
     // last round.
     return;
   }
 
   bool resend_rpc = false;
-  if (sending_sequence > *last_sending_sequence) {
+  if (active_catalogd_version > *last_active_catalogd_version) {
     // Send notification for the latest elected active catalogd.
     active_catalogd_address_metric_->SetValue(
         TNetworkAddressToString(catalogd_registration.address));
@@ -1280,7 +1283,7 @@ void Statestore::SendUpdateCatalogdNotification(int64* last_sending_sequence,
         }
       }
     }
-    *last_sending_sequence = sending_sequence;
+    *last_active_catalogd_version = active_catalogd_version;
   } else {
     DCHECK(!rpc_receivers.empty());
     lock_guard<mutex> l(subscribers_lock_);
@@ -1301,6 +1304,7 @@ void Statestore::SendUpdateCatalogdNotification(int64* last_sending_sequence,
   for (std::vector<std::shared_ptr<Subscriber>>::iterator it = rpc_receivers.begin();
        it != rpc_receivers.end();) {
     std::shared_ptr<Subscriber> subscriber = *it;
+    bool update_skipped = false;
     Status status;
     if (!resend_rpc) {
       status = DebugAction(
@@ -1314,7 +1318,7 @@ void Statestore::SendUpdateCatalogdNotification(int64* last_sending_sequence,
         TUpdateCatalogdResponse response;
         request.__set_registration_id(subscriber->registration_id());
         request.__set_statestore_id(statestore_id_);
-        request.__set_sequence(sending_sequence);
+        request.__set_catalogd_version(active_catalogd_version);
         request.__set_catalogd_registration(catalogd_registration);
         status = client.DoRpc(
             &StatestoreSubscriberClientWrapper::UpdateCatalogd, request, &response);
@@ -1325,14 +1329,23 @@ void Statestore::SendUpdateCatalogdNotification(int64* last_sending_sequence,
                 "Subscriber $0 timed-out during update catalogd RPC. Timeout is $1s.",
                 subscriber->id(), FLAGS_statestore_update_catalogd_tcp_timeout_seconds));
           }
+        } else {
+          update_skipped = (response.__isset.skipped && response.skipped);
         }
       }
     }
     if (status.ok()) {
-      successful_update_catalogd_rpc_metric_->Increment(1);
-      // Remove the subscriber from the receiver list so that Statestore will not resend
-      // RPC to it in next round.
-      it = rpc_receivers.erase(it);
+      if (update_skipped) {
+        // The subscriber skipped processing this update. It's not considered as a failure
+        // since subscribers can decide what they do with any update. The subscriber is
+        // left in the receiver list so that RPC will be resent to it in next round.
+        ++it;
+      } else {
+        successful_update_catalogd_rpc_metric_->Increment(1);
+        // Remove the subscriber from the receiver list so that Statestore will not resend
+        // RPC to it in next round.
+        it = rpc_receivers.erase(it);
+      }
     } else {
       LOG(ERROR) << "Couldn't send UpdateCatalogd RPC,  " << status.GetDetail();
       failed_update_catalogd_rpc_metric_->Increment(1);
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index 3696f3459..62b24384a 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -165,6 +165,7 @@ class Statestore : public CacheLineAligned {
       const TCatalogRegistration& catalogd_registration,
       RegistrationId* registration_id,
       bool* has_active_catalogd,
+      int64_t* active_catalogd_version,
       TCatalogRegistration* active_catalogd_registration);
 
   /// Registers webpages for the input webserver. If metrics_only is set then only
@@ -800,7 +801,7 @@ class Statestore : public CacheLineAligned {
   [[noreturn]] void MonitorUpdateCatalogd();
 
   /// Send notification of updating catalogd to the coordinators.
-  void SendUpdateCatalogdNotification(int64* last_sending_sequence,
+  void SendUpdateCatalogdNotification(int64_t* last_active_catalogd_version,
       vector<std::shared_ptr<Subscriber>>& receivers);
 
   /// Raw callback to indicate whether the service is ready.
diff --git a/common/thrift/StatestoreService.thrift b/common/thrift/StatestoreService.thrift
index 8c2294082..d752ac099 100644
--- a/common/thrift/StatestoreService.thrift
+++ b/common/thrift/StatestoreService.thrift
@@ -254,6 +254,9 @@ struct TRegisterSubscriberResponse {
 
   // Catalog registration info.
   5: optional TCatalogRegistration catalogd_registration;
+
+  // The version of active catalogd
+  6: optional i64 catalogd_version;
 }
 
 struct TGetProtocolVersionRequest {
@@ -338,8 +341,8 @@ struct TUpdateCatalogdRequest {
   // Unique identifier for the statestore instance.
   3: required Types.TUniqueId statestore_id;
 
-  // Monotonously increasing number
-  4: required i64 sequence;
+  // The version of active catalogd
+  4: required i64 catalogd_version;
 
   // Catalog registration info.
   5: required TCatalogRegistration catalogd_registration;
@@ -348,6 +351,11 @@ struct TUpdateCatalogdRequest {
 struct TUpdateCatalogdResponse {
   // Whether the call was executed correctly at the application level
   1: required Status.TStatus status;
+
+  // True if this update was skipped by the subscriber. This is distinguished from a
+  // non-OK status since the former indicates an error which contributes to the
+  // statestore's view of a subscriber's liveness.
+  2: optional bool skipped;
 }
 
 service StatestoreSubscriber {
diff --git a/tests/custom_cluster/test_catalogd_ha.py b/tests/custom_cluster/test_catalogd_ha.py
index 97b666f14..dcdcf744a 100644
--- a/tests/custom_cluster/test_catalogd_ha.py
+++ b/tests/custom_cluster/test_catalogd_ha.py
@@ -353,6 +353,37 @@ class TestCatalogdHA(CustomClusterTestSuite):
     # Verify simple queries are ran successfully.
     self.__run_simple_queries()
 
-    unexpected_msg = re.compile(
-        "unexpected sequence number: [0-9]+, was expecting greater than [0-9]+")
+    unexpected_msg = re.compile("Ignore the update of active catalogd since more recent "
+        "update has been processed ([0-9]+ vs [0-9]+)")
     self.assert_catalogd_log_contains("INFO", unexpected_msg, expected_count=0)
+    self.assert_impalad_log_contains("INFO", unexpected_msg, expected_count=0)
+
+  @CustomClusterTestSuite.with_args(
+    catalogd_args="--force_catalogd_active=true",
+    start_args="--enable_catalogd_ha")
+  def test_two_catalogd_with_force_active(self):
+    """The test case for cluster started with catalogd HA enabled and
+    both catalogds started with 'force_catalogd_active' as true.
+    Verify that one and only one catalogd is active."""
+    catalogds = self.cluster.catalogds()
+    assert(len(catalogds) == 2)
+    catalogd_service_1 = catalogds[0].service
+    catalogd_service_2 = catalogds[1].service
+    assert(catalogd_service_1.get_metric_value("catalog-server.active-status")
+        != catalogd_service_2.get_metric_value("catalog-server.active-status"))
+
+    # Verify ports of the active catalogd of statestore and impalad are matching with
+    # the catalog service port of the current active catalogd.
+    if catalogd_service_1.get_metric_value("catalog-server.active-status"):
+      self.__verify_statestore_active_catalogd_port(catalogd_service_1)
+      self.__verify_impalad_active_catalogd_port(0, catalogd_service_1)
+      self.__verify_impalad_active_catalogd_port(1, catalogd_service_1)
+      self.__verify_impalad_active_catalogd_port(2, catalogd_service_1)
+    else:
+      self.__verify_statestore_active_catalogd_port(catalogd_service_2)
+      self.__verify_impalad_active_catalogd_port(0, catalogd_service_2)
+      self.__verify_impalad_active_catalogd_port(1, catalogd_service_2)
+      self.__verify_impalad_active_catalogd_port(2, catalogd_service_2)
+
+    # Verify simple queries are run successfully.
+    self.__run_simple_queries()