You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by wz...@apache.org on 2023/07/21 08:22:38 UTC

[impala] 03/03: IMPALA-12286: Make CatalogD HA robust

This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit ae95c43eda016e3380f395a235b373cec1337522
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Mon Jul 17 22:36:57 2023 -0700

    IMPALA-12286: Make CatalogD HA robust
    
    IMPALA-12155 added support for CatalogD HA. The statestore assigns the
    roles for the catalogds in the HA pair. When an active catalogd is
    elected, the statestore sends RPCs to coordinators and catalogds to
    notify them of the new active catalogd. However, these RPCs can fail
    due to network conditions. To avoid missing the notification of a new
    active catalogd, this patch makes the statestore resend the RPC to any
    subscriber for which the previous attempt failed.
    
    This patch also renames the metric "catalog-server.ha-active-status" to
    "catalog-server.active-status", since this metric is also set for
    catalogd when CatalogD HA is not enabled.
    
    As discussed in IMPALA-12267, catalogd needs to regenerate its Catalog
    Service ID when it becomes active.
    
    Testing:
     - Added custom-cluster test cases for CatalogD HA with simulated RPC
       failures.
     - Passed core tests.
    
    Change-Id: Ibdfea022031c3cc1cbaf4ad52e947720a5d5630f
    Reviewed-on: http://gerrit.cloudera.org:8080/20220
    Reviewed-by: Andrew Sherman <as...@cloudera.com>
    Reviewed-by: Abhishek Rawat <ar...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Wenzhe Zhou <wz...@cloudera.com>
---
 be/src/catalog/catalog-server.cc                   |  10 +-
 be/src/catalog/catalog-server.h                    |   4 +-
 be/src/catalog/catalog.cc                          |   7 +
 be/src/catalog/catalog.h                           |   5 +
 be/src/common/global-flags.cc                      |   7 +-
 be/src/statestore/statestore-catalogd-mgr.cc       |  16 ++-
 be/src/statestore/statestore-catalogd-mgr.h        |   9 +-
 be/src/statestore/statestore-subscriber.cc         |   6 +-
 be/src/statestore/statestore-subscriber.h          |   2 +-
 be/src/statestore/statestore.cc                    | 150 +++++++++++++++------
 be/src/statestore/statestore.h                     |  11 +-
 common/thrift/metrics.json                         |  20 ++-
 .../java/org/apache/impala/service/JniCatalog.java |  21 ++-
 tests/custom_cluster/test_catalogd_ha.py           | 110 +++++++++++----
 tests/custom_cluster/test_custom_statestore.py     |  12 +-
 15 files changed, 283 insertions(+), 107 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 23a6fd012..6bc1369d4 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -158,7 +158,7 @@ const string CATALOG_SERVER_TOPIC_PROCESSING_TIMES =
     "catalog-server.topic-processing-time-s";
 const string CATALOG_SERVER_PARTIAL_FETCH_RPC_QUEUE_LEN =
     "catalog.partial-fetch-rpc.queue-len";
-const string CATALOG_HA_ACTIVE_STATUS = "catalog-server.ha-active-status";
+const string CATALOG_ACTIVE_STATUS = "catalog-server.active-status";
 const string CATALOG_HA_NUM_ACTIVE_STATUS_CHANGE =
     "catalog-server.ha-number-active-status-change";
 
@@ -350,8 +350,8 @@ CatalogServer::CatalogServer(MetricGroup* metrics)
       CATALOG_SERVER_TOPIC_PROCESSING_TIMES);
   partial_fetch_rpc_queue_len_metric_ =
       metrics->AddGauge(CATALOG_SERVER_PARTIAL_FETCH_RPC_QUEUE_LEN, 0);
-  ha_active_status_metric_ =
-      metrics->AddProperty(CATALOG_HA_ACTIVE_STATUS, !FLAGS_enable_catalogd_ha);
+  active_status_metric_ =
+      metrics->AddProperty(CATALOG_ACTIVE_STATUS, !FLAGS_enable_catalogd_ha);
   num_ha_active_status_change_metric_ =
       metrics->AddCounter(CATALOG_HA_NUM_ACTIVE_STATUS_CHANGE, 0);
   is_active_.Store(FLAGS_enable_catalogd_ha ? 0 : 1);
@@ -528,8 +528,10 @@ void CatalogServer::UpdateRegisteredCatalogd(
         // Signal the catalog update gathering thread to start.
         topic_updates_ready_ = false;
         catalog_update_cv_.NotifyOne();
+        // Regenerate Catalog Service ID.
+        catalog_->RegenerateServiceId();
       }
-      ha_active_status_metric_->SetValue(is_active);
+      active_status_metric_->SetValue(is_active);
     }
     num_ha_active_status_change_metric_->Increment(1);
     LOG(INFO) << "The role of catalogd instance is changed to "
diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h
index ce1e7378f..c699e981d 100644
--- a/be/src/catalog/catalog-server.h
+++ b/be/src/catalog/catalog-server.h
@@ -150,8 +150,8 @@ class CatalogServer {
   /// Tracks the partial fetch RPC call queue length on the Catalog server.
   IntGauge* partial_fetch_rpc_queue_len_metric_;
 
-  /// Metric that tracks if this catalogd is active when catalogd HA is enabled.
-  BooleanProperty* ha_active_status_metric_;
+  /// Metric that tracks if this catalogd is active.
+  BooleanProperty* active_status_metric_;
 
   /// Metric to count the number of active status changes.
   IntCounter* num_ha_active_status_change_metric_;
diff --git a/be/src/catalog/catalog.cc b/be/src/catalog/catalog.cc
index 3553b8f29..dec23e053 100644
--- a/be/src/catalog/catalog.cc
+++ b/be/src/catalog/catalog.cc
@@ -69,6 +69,7 @@ Catalog::Catalog() {
     {"prioritizeLoad", "([B)V", &prioritize_load_id_},
     {"getPartitionStats", "([B)[B", &get_partition_stats_id_},
     {"updateTableUsage", "([B)V", &update_table_usage_id_},
+    {"regenerateServiceId", "()V", &regenerate_service_id_},
   };
 
   JNIEnv* jni_env = JniUtil::GetJNIEnv();
@@ -204,3 +205,9 @@ Status Catalog::GetPartitionStats(
 Status Catalog::UpdateTableUsage(const TUpdateTableUsageRequest& req) {
   return JniUtil::CallJniMethod(catalog_, update_table_usage_id_, req);
 }
+
+void Catalog::RegenerateServiceId() {
+  JNIEnv* jni_env = JniUtil::GetJNIEnv();
+  jni_env->CallVoidMethod(catalog_, regenerate_service_id_);
+  ABORT_IF_EXC(jni_env);
+}
diff --git a/be/src/catalog/catalog.h b/be/src/catalog/catalog.h
index 10c24109d..315f72463 100644
--- a/be/src/catalog/catalog.h
+++ b/be/src/catalog/catalog.h
@@ -137,6 +137,10 @@ class Catalog {
   /// report.
   Status UpdateTableUsage(const TUpdateTableUsageRequest& req);
 
+  /// Regenerate Catalog Service ID.
+  /// The function should be called when the CatalogD becomes active.
+  void RegenerateServiceId();
+
  private:
   /// Descriptor of Java Catalog class itself, used to create a new instance.
   jclass catalog_class_;
@@ -162,6 +166,7 @@ class Catalog {
   jmethodID prioritize_load_id_; // JniCatalog.prioritizeLoad()
   jmethodID catalog_ctor_;
   jmethodID update_table_usage_id_;
+  jmethodID regenerate_service_id_; // JniCatalog.regenerateServiceId()
 };
 
 }
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 0c8d99e23..09e981c96 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -401,9 +401,12 @@ DEFINE_bool(use_subscriber_id_as_catalogd_priority, false, "Subscriber-id is use
 DEFINE_int64(catalogd_ha_preemption_wait_period_ms, 10000, "(Advanced) The time after "
     "which statestore designates the first registered catalogd as active if statestore "
     "does not receive registration request from the second catalogd.");
-DEFINE_int64(active_catalogd_designation_monitoring_frequency_ms, 100, "(Advanced) "
-    "Frequency (in ms) with which the statestore monitors if active catalogd is "
+DEFINE_int64(active_catalogd_designation_monitoring_interval_ms, 100, "(Advanced) "
+    "Interval (in ms) with which the statestore monitors if active catalogd is "
     "designated.");
+DEFINE_int64(update_catalogd_rpc_resend_interval_ms, 100, "(Advanced) Interval (in ms) "
+    "with which the statestore resends the update catalogd RPC to a subscriber if the "
+    "statestore has failed to send the RPC to the subscriber.");
 
 // TGeospatialLibrary's values are mapped here as constants
 static const string geo_lib_none = "NONE";
diff --git a/be/src/statestore/statestore-catalogd-mgr.cc b/be/src/statestore/statestore-catalogd-mgr.cc
index fb2ba6663..e86625f46 100644
--- a/be/src/statestore/statestore-catalogd-mgr.cc
+++ b/be/src/statestore/statestore-catalogd-mgr.cc
@@ -58,6 +58,7 @@ bool StatestoreCatalogdMgr::RegisterCatalogd(bool is_reregistering,
     DCHECK(num_registered_catalogd_ < 2);
     is_active_catalogd_assigned_ = true;
     COPY_CATALOGD_REGISTRATION_FROM_LOCAL_VARIABLES(active);
+    ++sending_sequence_;
     return true;
   }
 
@@ -71,6 +72,7 @@ bool StatestoreCatalogdMgr::RegisterCatalogd(bool is_reregistering,
         COPY_CATALOGD_REGISTRATION_FROM_LOCAL_VARIABLES(active);
         LOG(INFO) << active_catalogd_subscriber_id_
                   << " is re-registered with FLAGS_force_catalogd_active.";
+        ++sending_sequence_;
         return true;
       }
     } else {
@@ -83,6 +85,7 @@ bool StatestoreCatalogdMgr::RegisterCatalogd(bool is_reregistering,
         LOG(INFO) << active_catalogd_subscriber_id_
                   << " is re-registered after HA preemption waiting period and "
                   << "is assigned as active catalogd.";
+        ++sending_sequence_;
         return true;
       }
     }
@@ -109,6 +112,7 @@ bool StatestoreCatalogdMgr::RegisterCatalogd(bool is_reregistering,
       is_active_catalogd_assigned_ = true;
       COPY_CATALOGD_REGISTRATION_FROM_LOCAL_VARIABLES(active);
       LOG(INFO) << active_catalogd_subscriber_id_ << " is assigned as active catalogd.";
+      ++sending_sequence_;
       return true;
     }
     // Wait second catalogd to be registered.
@@ -129,6 +133,7 @@ bool StatestoreCatalogdMgr::RegisterCatalogd(bool is_reregistering,
       LOG(INFO) << active_catalogd_subscriber_id_
                 << " is registered with FLAGS_force_catalogd_active and is assigned as "
                 << "active catalogd.";
+      ++sending_sequence_;
       return true;
     } else if (is_active_catalogd_assigned_) {
       // Existing one is already assigned as active catalogd.
@@ -149,6 +154,7 @@ bool StatestoreCatalogdMgr::RegisterCatalogd(bool is_reregistering,
       }
       LOG(INFO) << active_catalogd_subscriber_id_
                 << " has higher priority and is assigned as active catalogd.";
+      ++sending_sequence_;
       return true;
     }
   }
@@ -171,6 +177,7 @@ bool StatestoreCatalogdMgr::CheckActiveCatalog() {
   COPY_CATALOGD_REGISTRATION_FROM_MEMBER_VARIABLES(active, first);
   LOG(INFO) << active_catalogd_subscriber_id_
             << " is assigned as active catalogd after preemption waiting period.";
+  ++sending_sequence_;
   return true;
 }
 
@@ -186,12 +193,14 @@ bool StatestoreCatalogdMgr::UnregisterCatalogd(
       COPY_CATALOGD_REGISTRATION_FROM_MEMBER_VARIABLES(active, standby);
       RESET_CATALOGD_REGISTRATION_MEMBER_VARIABLES(standby);
       LOG(INFO) << "Fail over active catalogd to " << active_catalogd_subscriber_id_;
+      ++sending_sequence_;
       return true;
     } else {
       is_active_catalogd_assigned_ = false;
       // Don't need to wait second one to be registered.
       first_catalogd_register_time_ = MonotonicMillis() -
           FLAGS_catalogd_ha_preemption_wait_period_ms -1;
+      LOG(INFO) << "No active catalogd available in the cluster";
     }
   } else if (num_registered_catalogd_ > 0) {
     // Unregister standby catalogd.
@@ -206,9 +215,10 @@ bool StatestoreCatalogdMgr::UnregisterCatalogd(
 }
 
 const TCatalogRegistration& StatestoreCatalogdMgr::GetActiveCatalogRegistration(
-    bool* has_active_catalogd) {
+    bool* has_active_catalogd, int64* sending_sequence) {
   std::lock_guard<std::mutex> l(catalog_mgr_lock_);
   *has_active_catalogd = is_active_catalogd_assigned_;
+  *sending_sequence = sending_sequence_;
   return active_catalogd_registration_;
 }
 
@@ -222,7 +232,3 @@ bool StatestoreCatalogdMgr::IsActiveCatalogd(const SubscriberId& subscriber_id)
   return active_catalogd_subscriber_id_ == subscriber_id;
 }
 
-int64 StatestoreCatalogdMgr::GetSendingSequence() {
-  std::lock_guard<std::mutex> l(catalog_mgr_lock_);
-  return ++sending_sequence_;
-}
diff --git a/be/src/statestore/statestore-catalogd-mgr.h b/be/src/statestore/statestore-catalogd-mgr.h
index 00c4ee5d6..55bd3b205 100644
--- a/be/src/statestore/statestore-catalogd-mgr.h
+++ b/be/src/statestore/statestore-catalogd-mgr.h
@@ -66,7 +66,8 @@ class StatestoreCatalogdMgr {
 
   /// Return the protocol version of catalog service and address of active catalogd.
   /// Set *has_active_catalogd as false if the active one is not designated yet.
-  const TCatalogRegistration& GetActiveCatalogRegistration(bool* has_active_catalogd);
+  const TCatalogRegistration& GetActiveCatalogRegistration(
+      bool* has_active_catalogd, int64* sending_sequence);
 
   /// Return the subscriber-id of active catalogd.
   /// This function should be called after the active catalogd is designated.
@@ -78,9 +79,6 @@ class StatestoreCatalogdMgr {
   /// Return the mutex lock.
   std::mutex* GetLock() { return &catalog_mgr_lock_; }
 
-  /// Get sending sequence number.
-  int64 GetSendingSequence();
-
  private:
   /// Protect all member variables.
   std::mutex catalog_mgr_lock_;
@@ -118,7 +116,8 @@ class StatestoreCatalogdMgr {
   /// Additional registration info of standby catalogd
   TCatalogRegistration standby_catalogd_registration_;
 
-  /// Monotonically increasing sending sequence number.
+  /// Monotonically increasing sending sequence number. The value is increased when
+  /// a new active catalogd is designated.
   int64 sending_sequence_;
 };
 
diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc
index 134c614f0..bbfe56564 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -326,8 +326,8 @@ StatestoreSubscriber::StatestoreStub::StatestoreStub(StatestoreSubscriber* subsc
       "statestore-subscriber.registration-id", "N/A");
   statestore_id_metric_ = metrics_->AddProperty<string>(
       "statestore-subscriber.statestore-id", "N/A");
-  update_catalogd_metric_ = metrics_->AddCounter(
-      "statestore-subscriber.num-update-catalogd", 0);
+  update_catalogd_rpc_metric_ = metrics_->AddCounter(
+      "statestore-subscriber.num-update-catalogd-rpc", 0);
   re_registr_attempt_metric_ = metrics_->AddCounter(
       "statestore-subscriber.num-re-register-attempt", 0);
 }
@@ -641,7 +641,7 @@ void StatestoreSubscriber::StatestoreStub::Heartbeat(
 void StatestoreSubscriber::StatestoreStub::UpdateCatalogd(
     const TCatalogRegistration& catalogd_registration,
     const RegistrationId& registration_id, int64 sequence) {
-  update_catalogd_metric_->Increment(1);
+  update_catalogd_rpc_metric_->Increment(1);
   const Status& status =
       CheckRegistrationIdAndUpdateCatalogdSeq(registration_id, sequence);
   if (status.ok()) {
diff --git a/be/src/statestore/statestore-subscriber.h b/be/src/statestore/statestore-subscriber.h
index 45dd9c31b..e89bc5e78 100644
--- a/be/src/statestore/statestore-subscriber.h
+++ b/be/src/statestore/statestore-subscriber.h
@@ -392,7 +392,7 @@ class StatestoreSubscriber {
     StringProperty* statestore_id_metric_;
 
     /// Metric to count the total number of UpdateCatalogd RPCs received by subscriber.
-    IntCounter* update_catalogd_metric_;
+    IntCounter* update_catalogd_rpc_metric_;
 
     /// Metric to count the total number of re-registration attempt.
     IntCounter* re_registr_attempt_metric_;
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index d47b33694..76cb3ec50 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -113,7 +113,8 @@ DEFINE_int32(statestore_update_catalogd_tcp_timeout_seconds, 3, "(Advanced) The
     "UpdateCatalogd RPC in short order");
 
 DECLARE_bool(enable_catalogd_ha);
-DECLARE_int64(active_catalogd_designation_monitoring_frequency_ms);
+DECLARE_int64(active_catalogd_designation_monitoring_interval_ms);
+DECLARE_int64(update_catalogd_rpc_resend_interval_ms);
 DECLARE_string(debug_actions);
 DECLARE_string(ssl_server_certificate);
 DECLARE_string(ssl_private_key);
@@ -135,7 +136,10 @@ const string STATESTORE_UPDATE_DURATION = "statestore.topic-update-durations";
 const string STATESTORE_PRIORITY_UPDATE_DURATION =
     "statestore.priority-topic-update-durations";
 const string STATESTORE_HEARTBEAT_DURATION = "statestore.heartbeat-durations";
-const string STATESTORE_UPDATE_CATALOGD_NUM = "statestore.num-update-catalogd";
+const string STATESTORE_SUCCESSFUL_UPDATE_CATALOGD_RPC_NUM =
+    "statestore.num-successful-update-catalogd-rpc";
+const string STATESTORE_FAILED_UPDATE_CATALOGD_RPC_NUM =
+    "statestore.num-failed-update-catalogd-rpc";
 const string STATESTORE_CLEAR_TOPIC_ENTRIES_NUM =
     "statestore.num-clear-topic-entries-requests";
 const string STATESTORE_ACTIVE_CATALOGD_ADDRESS = "statestore.active-catalogd-address";
@@ -552,7 +556,10 @@ Statestore::Statestore(MetricGroup* metrics)
       metrics, STATESTORE_PRIORITY_UPDATE_DURATION);
   heartbeat_duration_metric_ =
       StatsMetric<double>::CreateAndRegister(metrics, STATESTORE_HEARTBEAT_DURATION);
-  update_catalogd_metric_ = metrics->AddCounter(STATESTORE_UPDATE_CATALOGD_NUM, 0);
+  successful_update_catalogd_rpc_metric_ =
+      metrics->AddCounter(STATESTORE_SUCCESSFUL_UPDATE_CATALOGD_RPC_NUM, 0);
+  failed_update_catalogd_rpc_metric_ =
+      metrics->AddCounter(STATESTORE_FAILED_UPDATE_CATALOGD_RPC_NUM, 0);
   clear_topic_entries_metric_ =
       metrics->AddCounter(STATESTORE_CLEAR_TOPIC_ENTRIES_NUM, 0);
   active_catalogd_address_metric_ = metrics->AddProperty<string>(
@@ -796,8 +803,10 @@ Status Statestore::RegisterSubscriber(const SubscriberId& subscriber_id,
     RETURN_IF_ERROR(OfferUpdate(update, &subscriber_topic_update_threadpool_));
     RETURN_IF_ERROR(OfferUpdate(update, &subscriber_priority_topic_update_threadpool_));
     RETURN_IF_ERROR(OfferUpdate(update, &subscriber_heartbeat_threadpool_));
+    int64 sending_sequence;
     *active_catalogd_registration =
-        catalog_manager_.GetActiveCatalogRegistration(has_active_catalogd);
+        catalog_manager_.GetActiveCatalogRegistration(
+            has_active_catalogd, &sending_sequence);
   }
 
   return Status::OK();
@@ -1206,71 +1215,124 @@ void Statestore::DoSubscriberUpdate(UpdateKind update_kind, int thread_id,
 }
 
 [[noreturn]] void Statestore::MonitorUpdateCatalogd() {
-  // Check if the first registered one should be designated with active role.
+  int64 last_sending_sequence = 0;
+  // rpc_receivers is used to track subscribers to which statestore need to send RPCs
+  // when there is a change in the elected active catalogd. It is updated from
+  // subscribers_, and the subscribers will be removed from this list if the RPCs are
+  // successfully sent to them.
+  vector<std::shared_ptr<Subscriber>> rpc_receivers;
   int64_t timeout_us =
-      FLAGS_active_catalogd_designation_monitoring_frequency_ms * MICROS_PER_MILLI;
+      FLAGS_active_catalogd_designation_monitoring_interval_ms * MICROS_PER_MILLI;
+  // Check if the first registered one should be designated with active role.
   while (!catalog_manager_.CheckActiveCatalog()) {
     unique_lock<mutex> l(*catalog_manager_.GetLock());
     update_catalod_cv_.WaitFor(l, timeout_us);
   }
-  SendUpdateCatalogdNotification();
+  SendUpdateCatalogdNotification(&last_sending_sequence, rpc_receivers);
 
-  // Wait for notification. If catalogd is registered, send notification to all
-  // coordinators
+  // Wait for notification. If new leader is elected due to catalogd is registered or
+  // unregistered, send notification to all coordinators and catalogds.
+  timeout_us = FLAGS_update_catalogd_rpc_resend_interval_ms * MICROS_PER_MILLI;
   while (1) {
     {
       unique_lock<mutex> l(*catalog_manager_.GetLock());
-      update_catalod_cv_.Wait(l);
+      update_catalod_cv_.WaitFor(l, timeout_us);
     }
-    SendUpdateCatalogdNotification();
+    SendUpdateCatalogdNotification(&last_sending_sequence, rpc_receivers);
   }
 }
 
-void Statestore::SendUpdateCatalogdNotification() {
+void Statestore::SendUpdateCatalogdNotification(int64* last_sending_sequence,
+    vector<std::shared_ptr<Subscriber>>& rpc_receivers) {
   bool has_active_catalogd;
+  int64 sending_sequence = 0;
   TCatalogRegistration catalogd_registration =
-      catalog_manager_.GetActiveCatalogRegistration(&has_active_catalogd);
-  DCHECK(has_active_catalogd);
-  if (!has_active_catalogd) return;
-
-  active_catalogd_address_metric_->SetValue(
-      TNetworkAddressToString(catalogd_registration.address));
+      catalog_manager_.GetActiveCatalogRegistration(
+          &has_active_catalogd, &sending_sequence);
+  if (!has_active_catalogd ||
+      (sending_sequence == *last_sending_sequence && rpc_receivers.empty())) {
+    // Don't resend RPCs if there is no change in Active Catalogd and no RPC failure in
+    // last round.
+    return;
+  }
 
-  vector<std::shared_ptr<Subscriber>> receivers;
-  {
+  bool resend_rpc = false;
+  if (sending_sequence > *last_sending_sequence) {
+    // Send notification for the latest elected active catalogd.
+    active_catalogd_address_metric_->SetValue(
+        TNetworkAddressToString(catalogd_registration.address));
+    rpc_receivers.clear();
+    {
+      lock_guard<mutex> l(subscribers_lock_);
+      for (const auto& subscriber : subscribers_) {
+        if (subscriber.second->IsSubscribedCatalogdChange()) {
+          rpc_receivers.push_back(subscriber.second);
+        }
+      }
+    }
+    *last_sending_sequence = sending_sequence;
+  } else {
+    DCHECK(!rpc_receivers.empty());
     lock_guard<mutex> l(subscribers_lock_);
-    for (const auto& subscriber : subscribers_) {
-      if (subscriber.second->IsSubscribedCatalogdChange()) {
-        receivers.push_back(subscriber.second);
+    for (std::vector<std::shared_ptr<Subscriber>>::iterator it = rpc_receivers.begin();
+         it != rpc_receivers.end();) {
+      // Don't resend RPC to subscribers which have been removed from subscriber list.
+      std::shared_ptr<Subscriber> subscriber = *it;
+      if (subscribers_.find(subscriber->id()) == subscribers_.end()) {
+        it = rpc_receivers.erase(it);
+      } else {
+        ++it;
       }
     }
+    if (rpc_receivers.empty()) return;
+    resend_rpc = true;
   }
-  for (const auto& subscriber : receivers) {
+
+  for (std::vector<std::shared_ptr<Subscriber>>::iterator it = rpc_receivers.begin();
+       it != rpc_receivers.end();) {
+    std::shared_ptr<Subscriber> subscriber = *it;
     Status status;
-    StatestoreSubscriberConn client(update_catalogd_client_cache_.get(),
-        subscriber->network_address(), &status);
+    if (!resend_rpc) {
+      status = DebugAction(
+          FLAGS_debug_actions, "SEND_UPDATE_CATALOGD_RPC_FIRST_ATTEMPT");
+    }
     if (status.ok()) {
-      TUpdateCatalogdRequest request;
-      TUpdateCatalogdResponse response;
-      request.__set_registration_id(subscriber->registration_id());
-      request.__set_statestore_id(statestore_id_);
-      request.__set_sequence(catalog_manager_.GetSendingSequence());
-      request.__set_catalogd_registration(catalogd_registration);
-      status = client.DoRpc(
-          &StatestoreSubscriberClientWrapper::UpdateCatalogd, request, &response);
-      if (!status.ok()) {
-        if (status.code() == TErrorCode::RPC_RECV_TIMEOUT) {
-          // Add details to status to make it more useful, while preserving the stack
-          status.AddDetail(Substitute(
-              "Subscriber $0 timed-out during update catalogd RPC. Timeout is $1s.",
-                  subscriber->id(),
-                  FLAGS_statestore_update_catalogd_tcp_timeout_seconds));
+      StatestoreSubscriberConn client(update_catalogd_client_cache_.get(),
+          subscriber->network_address(), &status);
+      if (status.ok()) {
+        TUpdateCatalogdRequest request;
+        TUpdateCatalogdResponse response;
+        request.__set_registration_id(subscriber->registration_id());
+        request.__set_statestore_id(statestore_id_);
+        request.__set_sequence(sending_sequence);
+        request.__set_catalogd_registration(catalogd_registration);
+        status = client.DoRpc(
+            &StatestoreSubscriberClientWrapper::UpdateCatalogd, request, &response);
+        if (!status.ok()) {
+          if (status.code() == TErrorCode::RPC_RECV_TIMEOUT) {
+            // Add details to status to make it more useful, while preserving the stack
+            status.AddDetail(Substitute(
+                "Subscriber $0 timed-out during update catalogd RPC. Timeout is $1s.",
+                subscriber->id(), FLAGS_statestore_update_catalogd_tcp_timeout_seconds));
+          }
         }
-        LOG(ERROR) << "Couldn't send UpdateCatalogd RPC,  " << status.GetDetail();
-      } else {
-        update_catalogd_metric_->Increment(1);
       }
     }
+    if (status.ok()) {
+      successful_update_catalogd_rpc_metric_->Increment(1);
+      // Remove the subscriber from the receiver list so that Statestore will not resend
+      // RPC to it in next round.
+      it = rpc_receivers.erase(it);
+    } else {
+      LOG(ERROR) << "Couldn't send UpdateCatalogd RPC,  " << status.GetDetail();
+      failed_update_catalogd_rpc_metric_->Increment(1);
+      // Leave the subscriber in the receiver list. Statestore will resend RPC to it in
+      // next round.
+      ++it;
+    }
+  }
+  if (rpc_receivers.empty()) {
+    LOG(INFO) << "Successfully sent UpdateCatalogd RPCs to all subscribers";
   }
 }
 
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index 5578817e8..3696f3459 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -672,8 +672,12 @@ class Statestore : public CacheLineAligned {
   /// Same as above, but for SendHeartbeat() RPCs.
   StatsMetric<double>* heartbeat_duration_metric_;
 
-  /// Metric to count the total number of UpdateCatalogd RPCs sent by statestore.
-  IntCounter* update_catalogd_metric_;
+  /// Metric to count the total number of successful UpdateCatalogd RPCs sent by
+  /// statestore.
+  IntCounter* successful_update_catalogd_rpc_metric_;
+
+  /// Metric to count the total number of failed UpdateCatalogd RPCs sent by statestore.
+  IntCounter* failed_update_catalogd_rpc_metric_;
 
   /// Metric to count the total number of requests for clearing topic entries from
   /// catalogd. Catalogd indicates to clear topic entries when it is restarted or its
@@ -796,7 +800,8 @@ class Statestore : public CacheLineAligned {
   [[noreturn]] void MonitorUpdateCatalogd();
 
   /// Send notification of updating catalogd to the coordinators.
-  void SendUpdateCatalogdNotification();
+  void SendUpdateCatalogdNotification(int64* last_sending_sequence,
+      vector<std::shared_ptr<Subscriber>>& receivers);
 
   /// Raw callback to indicate whether the service is ready.
   void HealthzHandler(const Webserver::WebRequest& req, std::stringstream* data,
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 8db554f1f..d64ed31fb 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -1099,7 +1099,7 @@
     "label": "The active status of Catalog Server daemon",
     "units": "NONE",
     "kind": "PROPERTY",
-    "key": "catalog-server.ha-active-status"
+    "key": "catalog-server.active-status"
   },
   {
     "description": "The number of active status changes made to this Catalog Server Daemon.",
@@ -1780,7 +1780,7 @@
     "label": "Number of UpdateCatalogd RPCs",
     "units": "UNIT",
     "kind": "COUNTER",
-    "key": "statestore-subscriber.num-update-catalogd"
+    "key": "statestore-subscriber.num-update-catalogd-rpc"
   },
   {
     "description": "The number of Re-Registration Attempt.",
@@ -1871,14 +1871,24 @@
     "key": "statestore.heartbeat-durations"
   },
   {
-    "description": "The number of RPCs sent for updating catalogd.",
+    "description": "The number of successful RPCs for updating catalogd.",
     "contexts": [
       "STATESTORE"
     ],
-    "label": "Number of UpdateCatalogd RPCs",
+    "label": "Number of successful UpdateCatalogd RPCs sent by statestore",
+    "units": "UNIT",
+    "kind": "COUNTER",
+    "key": "statestore.num-successful-update-catalogd-rpc"
+  },
+  {
+    "description": "The number of failed RPCs for updating catalogd.",
+    "contexts": [
+      "STATESTORE"
+    ],
+    "label": "Number of failed UpdateCatalogd RPCs sent by statestore",
     "units": "UNIT",
     "kind": "COUNTER",
-    "key": "statestore.num-update-catalogd"
+    "key": "statestore.num-failed-update-catalogd-rpc"
   },
   {
     "description": "The number of requests for clearing topic entries from catalogd.",
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index d5afb46ab..23e2d96de 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -85,6 +85,7 @@ import org.apache.impala.util.CatalogOpUtil;
 import org.apache.impala.util.GlogAppender;
 import org.apache.impala.util.PatternMatcher;
 import org.apache.impala.util.ThreadNameAnnotator;
+import org.apache.impala.util.TUniqueIdUtil;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
@@ -107,7 +108,11 @@ public class JniCatalog {
   private final AuthorizationManager authzManager_;
 
   // A unique identifier for this instance of the Catalog Service.
-  private static final TUniqueId catalogServiceId_ = generateId();
+  // The service id will be regenerated when the CatalogD becomes active.
+  private static TUniqueId catalogServiceId_ = generateId();
+
+  // Lock to protect catalogServiceId_.
+  private final static Object catalogServiceIdLock_ = new Object();
 
   // A singleton monitoring class that keeps track of the catalog usage metrics.
   private final CatalogOperationMetrics catalogOperationUsage_ =
@@ -248,7 +253,19 @@ public class JniCatalog {
     return databaseName + "." + tableName;
   }
 
-  public static TUniqueId getServiceId() { return catalogServiceId_; }
+  public static TUniqueId getServiceId() {
+    synchronized (catalogServiceIdLock_) {
+      return catalogServiceId_;
+    }
+  }
+
+  public void regenerateServiceId() {
+    synchronized (catalogServiceIdLock_) {
+      catalogServiceId_ = generateId();
+      LOG.info("Regenerate Catalog Service Id {}",
+          TUniqueIdUtil.PrintId(catalogServiceId_).intern());
+    }
+  }
 
   /**
    * Gets the current catalog version.
diff --git a/tests/custom_cluster/test_catalogd_ha.py b/tests/custom_cluster/test_catalogd_ha.py
index 2669c1133..82f14202e 100644
--- a/tests/custom_cluster/test_catalogd_ha.py
+++ b/tests/custom_cluster/test_catalogd_ha.py
@@ -64,8 +64,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
     assert(len(catalogds) == 2)
     catalogd_service_1 = catalogds[0].service
     catalogd_service_2 = catalogds[1].service
-    assert(catalogd_service_1.get_metric_value("catalog-server.ha-active-status"))
-    assert(not catalogd_service_2.get_metric_value("catalog-server.ha-active-status"))
+    assert(catalogd_service_1.get_metric_value("catalog-server.active-status"))
+    assert(not catalogd_service_2.get_metric_value("catalog-server.active-status"))
 
     # Verify ports of the active catalogd of statestore and impalad are matching with
     # the catalog service port of the current active catalogd.
@@ -94,7 +94,7 @@ class TestCatalogdHA(CustomClusterTestSuite):
     catalogds = self.cluster.catalogds()
     assert(len(catalogds) == 1)
     catalogd_service_1 = catalogds[0].service
-    assert(catalogd_service_1.get_metric_value("catalog-server.ha-active-status"))
+    assert(catalogd_service_1.get_metric_value("catalog-server.active-status"))
 
     # Verify ports of the active catalogd of statestore and impalad are matching with
     # the catalog service port of the current active catalogd.
@@ -106,11 +106,7 @@ class TestCatalogdHA(CustomClusterTestSuite):
     self.execute_query_expect_success(
         self.client, "select count(*) from functional.alltypes")
 
-  @CustomClusterTestSuite.with_args(
-    statestored_args="--use_subscriber_id_as_catalogd_priority=true "
-                     "--statestore_heartbeat_frequency_ms=1000",
-    start_args="--enable_catalogd_ha")
-  def test_catalogd_auto_failover(self):
+  def __test_catalogd_auto_failover(self):
     """Stop active catalogd and verify standby catalogd becomes active.
     Restart original active catalogd. Verify that statestore does not resume its
     active role."""
@@ -119,8 +115,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
     assert(len(catalogds) == 2)
     catalogd_service_1 = catalogds[0].service
     catalogd_service_2 = catalogds[1].service
-    assert(catalogd_service_1.get_metric_value("catalog-server.ha-active-status"))
-    assert(not catalogd_service_2.get_metric_value("catalog-server.ha-active-status"))
+    assert(catalogd_service_1.get_metric_value("catalog-server.active-status"))
+    assert(not catalogd_service_2.get_metric_value("catalog-server.active-status"))
 
     statestore_service = self.cluster.statestored.service
     start_count_clear_topic_entries = statestore_service.get_metric_value(
@@ -132,10 +128,10 @@ class TestCatalogdHA(CustomClusterTestSuite):
     # Wait for long enough for the statestore to detect the failure of active catalogd
     # and assign active role to standby catalogd.
     catalogd_service_2.wait_for_metric_value(
-        "catalog-server.ha-active-status", expected_value=True, timeout=30)
+        "catalog-server.active-status", expected_value=True, timeout=30)
     assert(catalogd_service_2.get_metric_value(
         "catalog-server.ha-number-active-status-change") > 0)
-    assert(catalogd_service_2.get_metric_value("catalog-server.ha-active-status"))
+    assert(catalogd_service_2.get_metric_value("catalog-server.active-status"))
 
     # Verify ports of the active catalogd of statestore and impalad are matching with
     # the catalog service port of the current active catalogd.
@@ -156,8 +152,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
     catalogds[0].start(wait_until_ready=True)
     sleep(1)
     catalogd_service_1 = catalogds[0].service
-    assert(not catalogd_service_1.get_metric_value("catalog-server.ha-active-status"))
-    assert(catalogd_service_2.get_metric_value("catalog-server.ha-active-status"))
+    assert(not catalogd_service_1.get_metric_value("catalog-server.active-status"))
+    assert(catalogd_service_2.get_metric_value("catalog-server.active-status"))
 
     # Verify ports of the active catalogd of statestore and impalad are matching with
     # the catalog service port of the current active catalogd.
@@ -170,7 +166,36 @@ class TestCatalogdHA(CustomClusterTestSuite):
     statestored_args="--use_subscriber_id_as_catalogd_priority=true "
                      "--statestore_heartbeat_frequency_ms=1000",
     start_args="--enable_catalogd_ha")
-  def test_catalogd_manual_failover(self):
+  def test_catalogd_auto_failover(self):
+    """Auto failover of the Catalog Service when no update-catalogd RPC fails."""
+    self.__test_catalogd_auto_failover()
+
+    statestored = self.cluster.statestored.service
+    num_ok = statestored.get_metric_value(
+        "statestore.num-successful-update-catalogd-rpc")
+    num_failed = statestored.get_metric_value(
+        "statestore.num-failed-update-catalogd-rpc")
+    assert num_ok >= 6
+    assert num_failed == 0
+
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--use_subscriber_id_as_catalogd_priority=true "
+                     "--statestore_heartbeat_frequency_ms=1000 "
+                     "--debug_actions=SEND_UPDATE_CATALOGD_RPC_FIRST_ATTEMPT:FAIL@1.0",
+    start_args="--enable_catalogd_ha")
+  def test_catalogd_auto_failover_with_failed_rpc(self):
+    """Auto failover while the first attempt of every update-catalogd RPC fails."""
+    self.__test_catalogd_auto_failover()
+
+    statestored = self.cluster.statestored.service
+    num_ok = statestored.get_metric_value(
+        "statestore.num-successful-update-catalogd-rpc")
+    num_failed = statestored.get_metric_value(
+        "statestore.num-failed-update-catalogd-rpc")
+    assert num_ok >= 6
+    assert num_failed == num_ok
+
+  def __test_catalogd_manual_failover(self):
     """Stop active catalogd and verify standby catalogd becomes active.
     Restart original active catalogd with force_catalogd_active as true. Verify that
     statestore resume it as active.
@@ -180,8 +205,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
     assert(len(catalogds) == 2)
     catalogd_service_1 = catalogds[0].service
     catalogd_service_2 = catalogds[1].service
-    assert(catalogd_service_1.get_metric_value("catalog-server.ha-active-status"))
-    assert(not catalogd_service_2.get_metric_value("catalog-server.ha-active-status"))
+    assert(catalogd_service_1.get_metric_value("catalog-server.active-status"))
+    assert(not catalogd_service_2.get_metric_value("catalog-server.active-status"))
 
     statestore_service = self.cluster.statestored.service
     start_count_clear_topic_entries = statestore_service.get_metric_value(
@@ -193,10 +218,10 @@ class TestCatalogdHA(CustomClusterTestSuite):
     # Wait for long enough for the statestore to detect the failure of active catalogd
     # and assign active role to standby catalogd.
     catalogd_service_2.wait_for_metric_value(
-        "catalog-server.ha-active-status", expected_value=True, timeout=30)
+        "catalog-server.active-status", expected_value=True, timeout=30)
     assert(catalogd_service_2.get_metric_value(
         "catalog-server.ha-number-active-status-change") > 0)
-    assert(catalogd_service_2.get_metric_value("catalog-server.ha-active-status"))
+    assert(catalogd_service_2.get_metric_value("catalog-server.active-status"))
 
     # Verify ports of the active catalogd of statestore and impalad are matching with
     # the catalog service port of the current active catalogd.
@@ -220,11 +245,11 @@ class TestCatalogdHA(CustomClusterTestSuite):
                        additional_args="--force_catalogd_active=true")
     catalogd_service_1 = catalogds[0].service
     catalogd_service_1.wait_for_metric_value(
-        "catalog-server.ha-active-status", expected_value=True, timeout=15)
-    assert(catalogd_service_1.get_metric_value("catalog-server.ha-active-status"))
+        "catalog-server.active-status", expected_value=True, timeout=15)
+    assert(catalogd_service_1.get_metric_value("catalog-server.active-status"))
     sleep_time_s = build_flavor_timeout(2, slow_build_timeout=5)
     sleep(sleep_time_s)
-    assert(not catalogd_service_2.get_metric_value("catalog-server.ha-active-status"))
+    assert(not catalogd_service_2.get_metric_value("catalog-server.active-status"))
 
     # Verify ports of the active catalogd of statestore and impalad are matching with
     # the catalog service port of the current active catalogd.
@@ -237,6 +262,39 @@ class TestCatalogdHA(CustomClusterTestSuite):
         "statestore.num-clear-topic-entries-requests")
     assert end_count_clear_topic_entries > start_count_clear_topic_entries
 
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--use_subscriber_id_as_catalogd_priority=true "
+                     "--statestore_heartbeat_frequency_ms=1000",
+    start_args="--enable_catalogd_ha")
+  def test_catalogd_manual_failover(self):
+    """Manual failover of the Catalog Service when no update-catalogd RPC fails."""
+    self.__test_catalogd_manual_failover()
+
+    statestored = self.cluster.statestored.service
+    num_ok = statestored.get_metric_value(
+        "statestore.num-successful-update-catalogd-rpc")
+    num_failed = statestored.get_metric_value(
+        "statestore.num-failed-update-catalogd-rpc")
+    assert num_ok >= 10
+    assert num_failed == 0
+
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--use_subscriber_id_as_catalogd_priority=true "
+                     "--statestore_heartbeat_frequency_ms=1000 "
+                     "--debug_actions=SEND_UPDATE_CATALOGD_RPC_FIRST_ATTEMPT:FAIL@1.0",
+    start_args="--enable_catalogd_ha")
+  def test_catalogd_manual_failover_with_failed_rpc(self):
+    """Manual failover while the first attempt of every update-catalogd RPC fails."""
+    self.__test_catalogd_manual_failover()
+
+    statestored = self.cluster.statestored.service
+    num_ok = statestored.get_metric_value(
+        "statestore.num-successful-update-catalogd-rpc")
+    num_failed = statestored.get_metric_value(
+        "statestore.num-failed-update-catalogd-rpc")
+    assert num_ok >= 10
+    assert num_failed == num_ok
+
   @CustomClusterTestSuite.with_args(
     statestored_args="--use_subscriber_id_as_catalogd_priority=true",
     start_args="--enable_catalogd_ha")
@@ -248,8 +306,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
     assert(len(catalogds) == 2)
     catalogd_service_1 = catalogds[0].service
     catalogd_service_2 = catalogds[1].service
-    assert(catalogd_service_1.get_metric_value("catalog-server.ha-active-status"))
-    assert(not catalogd_service_2.get_metric_value("catalog-server.ha-active-status"))
+    assert(catalogd_service_1.get_metric_value("catalog-server.active-status"))
+    assert(not catalogd_service_2.get_metric_value("catalog-server.active-status"))
 
     # Verify ports of the active catalogd of statestore and impalad are matching with
     # the catalog service port of the current active catalogd.
@@ -266,8 +324,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
         expected_value=5, timeout=wait_time_s)
     sleep_time_s = build_flavor_timeout(2, slow_build_timeout=5)
     sleep(sleep_time_s)
-    assert(catalogd_service_1.get_metric_value("catalog-server.ha-active-status"))
-    assert(not catalogd_service_2.get_metric_value("catalog-server.ha-active-status"))
+    assert(catalogd_service_1.get_metric_value("catalog-server.active-status"))
+    assert(not catalogd_service_2.get_metric_value("catalog-server.active-status"))
 
     # Verify ports of the active catalogd of statestore and impalad are matching with
     # the catalog service port of the current active catalogd.
diff --git a/tests/custom_cluster/test_custom_statestore.py b/tests/custom_cluster/test_custom_statestore.py
index 15d63ec84..8693b9834 100644
--- a/tests/custom_cluster/test_custom_statestore.py
+++ b/tests/custom_cluster/test_custom_statestore.py
@@ -253,15 +253,17 @@ class TestStatestoreStartupDelay(CustomClusterTestSuite):
     except Exception:
       self._stop_impala_cluster()
 
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--statestore_heartbeat_frequency_ms=10000")
   def test_subscriber_tolerate_restart_catalog(self):
     """Restart catalogd and verify update-catalogd RPCs are sent by statestore
     """
     statestore_service = self.cluster.statestored.service
     start_sent_rpc_count = statestore_service.get_metric_value(
-        "statestore.num-update-catalogd")
+        "statestore.num-successful-update-catalogd-rpc")
     impalad_service = self.cluster.impalads[0].service
     start_recv_rpc_count = impalad_service.get_metric_value(
-        "statestore-subscriber.num-update-catalogd")
+        "statestore-subscriber.num-update-catalogd-rpc")
 
     # Restart catalog daemon.
     self.cluster.catalogd.restart()
@@ -269,10 +271,10 @@ class TestStatestoreStartupDelay(CustomClusterTestSuite):
     sleep(wait_time_s)
     # Verify update-catalogd RPCs are sent to coordinators from statestore.
     end_sent_rpc_count = statestore_service.get_metric_value(
-        "statestore.num-update-catalogd")
+        "statestore.num-successful-update-catalogd-rpc")
     assert end_sent_rpc_count > start_sent_rpc_count
     end_recv_rpc_count = impalad_service.get_metric_value(
-        "statestore-subscriber.num-update-catalogd")
+        "statestore-subscriber.num-update-catalogd-rpc")
     assert end_recv_rpc_count > start_recv_rpc_count
 
   @SkipIfBuildType.not_dev_build
@@ -287,5 +289,5 @@ class TestStatestoreStartupDelay(CustomClusterTestSuite):
 
     # Verify that impalad received notification of updating catalogd.
     recv_rpc_count = self.cluster.impalads[0].service.get_metric_value(
-        "statestore-subscriber.num-update-catalogd")
+        "statestore-subscriber.num-update-catalogd-rpc")
     assert recv_rpc_count > 0