Posted to commits@impala.apache.org by st...@apache.org on 2023/07/26 07:44:17 UTC

[impala] branch master updated (7fb6a9a1d -> 434ae61e3)

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


    from 7fb6a9a1d IMPALA-11941: (Addendum) Use released jamm 0.4.0
     new 98059b029 IMPALA-12267: DMLs/DDLs can hang as a result of catalogd restart
     new 434ae61e3 IMPALA-12305: Fix wrong Catalog Service ID when CatalogD becomes active

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/catalog/catalog-server.cc                   |  68 ++++++-----
 be/src/catalog/catalog-server.h                    |  11 +-
 be/src/service/client-request-state.cc             |   8 +-
 be/src/service/impala-server.cc                    | 134 ++++++++++++++++++---
 be/src/service/impala-server.h                     |  15 ++-
 .../impala/catalog/CatalogServiceCatalog.java      |  17 ++-
 .../org/apache/impala/catalog/ImpaladCatalog.java  |  12 +-
 .../java/org/apache/impala/service/JniCatalog.java |  26 ++--
 .../events/MetastoreEventsProcessorTest.java       |   8 +-
 .../impala/testutil/CatalogServiceTestCatalog.java |  13 +-
 tests/custom_cluster/test_catalogd_ha.py           |  39 +++---
 tests/custom_cluster/test_restart_services.py      | 120 +++++++++++++++++-
 12 files changed, 355 insertions(+), 116 deletions(-)


[impala] 02/02: IMPALA-12305: Fix wrong Catalog Service ID when CatalogD becomes active

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 434ae61e388bb44e0a49746cb969f28529656909
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Sun Jul 23 01:42:03 2023 -0700

    IMPALA-12305: Fix wrong Catalog Service ID when CatalogD becomes active
    
    In IMPALA-12286, catalogd re-generates its Catalog Service ID in
    JniCatalog when it becomes active, but CatalogServiceCatalog is not
    updated when the new Catalog Service ID is generated. This causes the
    coordinator to hang when processing DDLs.
    In the CatalogServer class, is_active_ is not protected by the mutex
    catalog_lock_, and pending_topic_updates_ is not cleared when the
    catalogd becomes active. As a result, the catalog server may send
    pending catalog topic updates with the old Catalog Service ID and then
    send catalog topic updates with the new Catalog Service ID.
    
    This patch removes catalogServiceId_ from CatalogServiceCatalog and
    makes CatalogServiceCatalog call JniCatalog.getServiceId() directly.
    It also protects is_active_ with the mutex catalog_lock_ and makes the
    catalog server clear pending_topic_updates_ when the catalogd becomes
    active.
    
    Testing:
     - Added more queries to the test cases for CatalogD HA. Verified that
       the test cases failed without the fix and passed with it.
     - Passed core tests.
    
    Change-Id: I5eada89052c5f16209c6f16357f30f78b4497434
    Reviewed-on: http://gerrit.cloudera.org:8080/20258
    Reviewed-by: Andrew Sherman <as...@cloudera.com>
    Reviewed-by: Riza Suminto <ri...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
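
A minimal, self-contained C++ sketch of the locking pattern this commit adopts
(the class, members and method names below are simplified stand-ins, not the
actual CatalogServer code): the active flag, the pending topic updates and the
condition variable all share one mutex, and pending updates gathered while
standby are discarded the moment the instance becomes active.

// Simplified illustration only: the active flag and the pending topic updates
// are guarded by the same mutex, so a heartbeat callback can never observe
// "active" while stale updates from the standby period are still queued.
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <string>
#include <vector>

class MiniCatalogServer {
 public:
  // Called when this instance is elected active; everything happens under
  // lock_, mirroring the fixed UpdateRegisteredCatalogd().
  void BecomeActive() {
    std::lock_guard<std::mutex> l(lock_);
    if (is_active_) return;
    is_active_ = true;
    // Drop updates gathered while standby; they carry the old service id.
    pending_topic_updates_.clear();
    topic_updates_ready_ = false;
    update_cv_.notify_one();  // wake a (hypothetical) gathering thread
  }

  // Called on every statestore heartbeat. Sends nothing unless this instance
  // is active *and* a fresh batch of updates is ready.
  void OnHeartbeat(std::vector<std::string>* out) {
    std::unique_lock<std::mutex> l(lock_, std::try_to_lock);
    if (!l || !is_active_ || !topic_updates_ready_) return;
    out->swap(pending_topic_updates_);
    topic_updates_ready_ = false;
    update_cv_.notify_one();
  }

 private:
  std::mutex lock_;  // guards everything below
  bool is_active_ = false;
  bool topic_updates_ready_ = false;
  std::vector<std::string> pending_topic_updates_;
  std::condition_variable update_cv_;
};

int main() {
  MiniCatalogServer server;
  std::vector<std::string> sent;
  server.OnHeartbeat(&sent);  // standby: nothing is sent
  server.BecomeActive();      // any stale pending updates are discarded here
  server.OnHeartbeat(&sent);  // still nothing: a new batch must be gathered first
  std::cout << "updates sent: " << sent.size() << std::endl;  // prints 0
  return 0;
}
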
---
 be/src/catalog/catalog-server.cc                   | 68 +++++++++++-----------
 be/src/catalog/catalog-server.h                    | 11 ++--
 .../impala/catalog/CatalogServiceCatalog.java      | 17 +++---
 .../java/org/apache/impala/service/JniCatalog.java | 26 ++++++---
 .../events/MetastoreEventsProcessorTest.java       |  8 +--
 .../impala/testutil/CatalogServiceTestCatalog.java | 13 ++---
 tests/custom_cluster/test_catalogd_ha.py           | 39 ++++++++-----
 7 files changed, 99 insertions(+), 83 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 6bc1369d4..961a37fd6 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -344,6 +344,7 @@ CatalogServer::CatalogServer(MetricGroup* metrics)
   : protocol_version_(CatalogServiceVersion::V2),
     thrift_iface_(new CatalogServiceThriftIf(this)),
     thrift_serializer_(FLAGS_compact_catalog_topic), metrics_(metrics),
+    is_active_(!FLAGS_enable_catalogd_ha),
     topic_updates_ready_(false), last_sent_catalog_version_(0L),
     catalog_objects_max_version_(0L) {
   topic_processing_time_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
@@ -354,7 +355,6 @@ CatalogServer::CatalogServer(MetricGroup* metrics)
       metrics->AddProperty(CATALOG_ACTIVE_STATUS, !FLAGS_enable_catalogd_ha);
   num_ha_active_status_change_metric_ =
       metrics->AddCounter(CATALOG_HA_NUM_ACTIVE_STATUS_CHANGE, 0);
-  is_active_.Store(FLAGS_enable_catalogd_ha ? 0 : 1);
 }
 
 Status CatalogServer::Start() {
@@ -408,7 +408,7 @@ Status CatalogServer::Start() {
       statestore_subscriber_->Start(&has_active_catalogd, &active_catalogd_registration));
   if (FLAGS_enable_catalogd_ha && has_active_catalogd) {
     UpdateRegisteredCatalogd(active_catalogd_registration);
-    if (FLAGS_force_catalogd_active && is_active_.Load() != 1) {
+    if (FLAGS_force_catalogd_active && !IsActive()) {
       LOG(ERROR) << "Could not start CatalogD as active instance";
       return Status("Could not start CatalogD as active instance");
     }
@@ -417,7 +417,7 @@ Status CatalogServer::Start() {
   // Notify the thread to start for the first time.
   {
     lock_guard<mutex> l(catalog_lock_);
-    catalog_update_cv_.NotifyOne();
+    if (is_active_) catalog_update_cv_.NotifyOne();
   }
   return Status::OK();
 }
@@ -451,18 +451,15 @@ void CatalogServer::RegisterWebpages(Webserver* webserver, bool metrics_only) {
 void CatalogServer::UpdateCatalogTopicCallback(
     const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
     vector<TTopicDelta>* subscriber_topic_updates) {
-  // Don't update catalog if this instance is not active.
-  if (is_active_.Load() == 0) return;
-
   StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
       incoming_topic_deltas.find(CatalogServer::IMPALA_CATALOG_TOPIC);
   if (topic == incoming_topic_deltas.end()) return;
 
   unique_lock<mutex> l(catalog_lock_, std::try_to_lock);
-  // Return if unable to acquire the catalog_lock_ or if the topic update data is
-  // not yet ready for processing. This indicates the catalog_update_gathering_thread_
-  // is still building a topic update.
-  if (!l || !topic_updates_ready_) return;
+  // Return if unable to acquire the catalog_lock_, or this instance is not active,
+  // or if the topic update data is not yet ready for processing. This indicates the
+  // catalog_update_gathering_thread_ is still building a topic update.
+  if (!l || !is_active_ || !topic_updates_ready_) return;
 
   const TTopicDelta& delta = topic->second;
 
@@ -510,35 +507,40 @@ void CatalogServer::UpdateRegisteredCatalogd(
             << TNetworkAddressToString(catalogd_registration.address);
   bool is_matching = (catalogd_registration.address.hostname == FLAGS_hostname
       && catalogd_registration.address.port == FLAGS_catalog_service_port);
-  bool is_changed;
+  lock_guard<mutex> l(catalog_lock_);
   if (is_matching) {
-    is_changed = is_active_.CompareAndSwap(0, 1);
+    if (!is_active_) {
+      is_active_ = true;
+      active_status_metric_->SetValue(true);
+      num_ha_active_status_change_metric_->Increment(1);
+      // Reset last_sent_catalog_version_ when the catalogd becomes active. This will
+      // lead to a non-delta catalog update for the next IMPALA_CATALOG_TOPIC, which also
+      // instructs the statestore to clear all entries for the catalog update topic.
+      last_sent_catalog_version_ = 0;
+      // Regenerate Catalog Service ID.
+      catalog_->RegenerateServiceId();
+      // Clear pending topic updates.
+      pending_topic_updates_.clear();
+      // Signal the catalog update gathering thread to start.
+      topic_updates_ready_ = false;
+      catalog_update_cv_.NotifyOne();
+      LOG(INFO) << "This catalogd instance is changed to active status";
+    }
   } else {
-    is_changed = is_active_.CompareAndSwap(1, 0);
-  }
-  if (is_changed) {
-    {
-      unique_lock<mutex> unique_lock(catalog_lock_);
-      bool is_active = (is_active_.Load() != 0);
-      if (is_active) {
-        // Reset last_sent_catalog_version_ when the catalogd become active. This will
-        // lead to non-delta catalog update for next IMPALA_CATALOG_TOPIC which also
-        // instruct the statestore to clear all entries for the catalog update topic.
-        last_sent_catalog_version_ = 0;
-        // Signal the catalog update gathering thread to start.
-        topic_updates_ready_ = false;
-        catalog_update_cv_.NotifyOne();
-        // Regenerate Catalog Service ID.
-        catalog_->RegenerateServiceId();
-      }
-      active_status_metric_->SetValue(is_active);
+    if (is_active_) {
+      is_active_ = false;
+      active_status_metric_->SetValue(false);
+      num_ha_active_status_change_metric_->Increment(1);
+      LOG(INFO) << "This catalogd instance is changed to inactive status";
     }
-    num_ha_active_status_change_metric_->Increment(1);
-    LOG(INFO) << "The role of catalogd instance is changed to "
-              << (is_matching ? "active" : "standby");
   }
 }
 
+bool CatalogServer::IsActive() {
+  lock_guard<mutex> l(catalog_lock_);
+  return is_active_;
+}
+
 [[noreturn]] void CatalogServer::GatherCatalogUpdatesThread() {
   while (true) {
     unique_lock<mutex> unique_lock(catalog_lock_);
diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h
index c699e981d..77cb744c2 100644
--- a/be/src/catalog/catalog-server.h
+++ b/be/src/catalog/catalog-server.h
@@ -134,9 +134,6 @@ class CatalogServer {
   /// Indicates whether the catalog service is ready.
   std::atomic_bool service_started_{false};
 
-  /// Set to 1 if this catalog instance is active.
-  AtomicInt32 is_active_{0};
-
   /// Thrift API implementation which proxies requests onto this CatalogService.
   std::shared_ptr<CatalogServiceIf> thrift_iface_;
   ThriftSerializer thrift_serializer_;
@@ -162,10 +159,13 @@ class CatalogServer {
   /// Thread that periodically wakes up and refreshes certain Catalog metrics.
   std::unique_ptr<Thread> catalog_metrics_refresh_thread_;
 
-  /// Protects catalog_update_cv_, pending_topic_updates_,
+  /// Protects is_active_, catalog_update_cv_, pending_topic_updates_,
   /// catalog_objects_to/from_version_, and last_sent_catalog_version.
   std::mutex catalog_lock_;
 
+  /// Set to true if this catalog instance is active.
+  bool is_active_;
+
   /// Condition variable used to signal when the catalog_update_gathering_thread_ should
   /// fetch its next set of updates from the JniCatalog. At the end of each statestore
   /// heartbeat, this CV is signaled and the catalog_update_gathering_thread_ starts
@@ -209,6 +209,9 @@ class CatalogServer {
   /// Callback function for receiving notification of updating catalogd.
   void UpdateRegisteredCatalogd(const TCatalogRegistration& catalogd_registration);
 
+  /// Returns the active status of the catalogd.
+  bool IsActive();
+
   /// Executed by the catalog_update_gathering_thread_. Calls into JniCatalog
   /// to get the latest set of catalog objects that exist, along with some metadata on
   /// each object. The results are stored in the shared catalog_objects_ data structure.
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 724b3bc94..ddb08e75f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -89,6 +89,7 @@ import org.apache.impala.hive.executor.HiveJavaFunction;
 import org.apache.impala.hive.executor.HiveJavaFunctionFactoryImpl;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.FeSupport;
+import org.apache.impala.service.JniCatalog;
 import org.apache.impala.thrift.CatalogLookupStatus;
 import org.apache.impala.thrift.CatalogServiceConstants;
 import org.apache.impala.thrift.TCatalog;
@@ -240,8 +241,6 @@ public class CatalogServiceCatalog extends Catalog {
   // default value of table id in the GetPartialCatalogObjectRequest
   public static final long TABLE_ID_UNAVAILABLE = -1;
 
-  private final TUniqueId catalogServiceId_;
-
   // Fair lock used to synchronize reads/writes of catalogVersion_. Because this lock
   // protects catalogVersion_, it can be used to perform atomic bulk catalog operations
   // since catalogVersion_ cannot change externally while the lock is being held.
@@ -340,8 +339,7 @@ public class CatalogServiceCatalog extends Catalog {
    * @throws ImpalaException
    */
   public CatalogServiceCatalog(boolean loadInBackground, int numLoadingThreads,
-      TUniqueId catalogServiceId, String localLibraryPath,
-      MetaStoreClientPool metaStoreClientPool)
+      String localLibraryPath, MetaStoreClientPool metaStoreClientPool)
       throws ImpalaException {
     super(metaStoreClientPool);
     blacklistedDbs_ = CatalogBlacklistUtils.parseBlacklistedDbs(
@@ -356,7 +354,6 @@ public class CatalogServiceCatalog extends Catalog {
         .getBackendCfg().topic_update_tbl_max_wait_time_ms;
     Preconditions.checkState(topicUpdateTblLockMaxWaitTimeMs_ >= 0,
         "topic_update_tbl_max_wait_time_ms must be positive");
-    catalogServiceId_ = catalogServiceId;
     tableLoadingMgr_ = new TableLoadingMgr(this, numLoadingThreads);
     loadInBackground_ = loadInBackground;
     try {
@@ -397,9 +394,8 @@ public class CatalogServiceCatalog extends Catalog {
    * to establish an initial connection to the HMS before giving up.
    */
   public CatalogServiceCatalog(boolean loadInBackground, int numLoadingThreads,
-      int initialHmsCnxnTimeoutSec, TUniqueId catalogServiceId, String localLibraryPath)
-      throws ImpalaException {
-    this(loadInBackground, numLoadingThreads, catalogServiceId, localLibraryPath,
+      int initialHmsCnxnTimeoutSec, String localLibraryPath) throws ImpalaException {
+    this(loadInBackground, numLoadingThreads, localLibraryPath,
         new MetaStoreClientPool(INITIAL_META_STORE_CLIENT_POOL_SIZE,
             initialHmsCnxnTimeoutSec));
   }
@@ -999,7 +995,8 @@ public class CatalogServiceCatalog extends Catalog {
     // can safely forward their min catalog version.
     TCatalogObject catalog =
         new TCatalogObject(TCatalogObjectType.CATALOG, ctx.toVersion);
-    catalog.setCatalog(new TCatalog(catalogServiceId_, ctx.lastResetStartVersion));
+    catalog.setCatalog(
+        new TCatalog(JniCatalog.getServiceId(), ctx.lastResetStartVersion));
     ctx.addCatalogObject(catalog, false);
     // Garbage collect the delete and topic update log.
     deleteLog_.garbageCollect(ctx.toVersion);
@@ -3528,7 +3525,7 @@ public class CatalogServiceCatalog extends Catalog {
    * Gets the id for this catalog service
    */
   public String getCatalogServiceId() {
-    return TUniqueIdUtil.PrintId(catalogServiceId_).intern();
+    return TUniqueIdUtil.PrintId(JniCatalog.getServiceId()).intern();
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index 23e2d96de..d4947ce64 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -24,6 +24,9 @@ import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 import java.util.stream.Collectors;
 import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.impala.analysis.TableName;
@@ -111,8 +114,9 @@ public class JniCatalog {
   // The service id will be regenerated when the CatalogD becomes active.
   private static TUniqueId catalogServiceId_ = generateId();
 
-  // Lock to protect catalogServiceId_.
-  private final static Object catalogServiceIdLock_ = new Object();
+  // ReadWriteLock to protect catalogServiceId_.
+  private final static ReentrantReadWriteLock catalogServiceIdLock_ =
+      new ReentrantReadWriteLock(true /*fair ordering*/);
 
   // A singleton monitoring class that keeps track of the catalog usage metrics.
   private final CatalogOperationMetrics catalogOperationUsage_ =
@@ -155,8 +159,7 @@ public class JniCatalog {
         new MetaStoreClientPool(CatalogServiceCatalog.INITIAL_META_STORE_CLIENT_POOL_SIZE,
             cfg.initial_hms_cnxn_timeout_s);
     catalog_ = new CatalogServiceCatalog(cfg.load_catalog_in_background,
-        cfg.num_metadata_loading_threads, getServiceId(), cfg.local_library_path,
-        metaStoreClientPool);
+        cfg.num_metadata_loading_threads, cfg.local_library_path, metaStoreClientPool);
     authzManager_ = authzFactory.newAuthorizationManager(catalog_);
     catalog_.setAuthzManager(authzManager_);
     catalogOpExecutor_ = new CatalogOpExecutor(catalog_, authzConfig, authzManager_,
@@ -254,16 +257,23 @@ public class JniCatalog {
   }
 
   public static TUniqueId getServiceId() {
-    synchronized (catalogServiceIdLock_) {
+    catalogServiceIdLock_.readLock().lock();
+    try {
       return catalogServiceId_;
+    } finally {
+      catalogServiceIdLock_.readLock().unlock();
     }
   }
 
   public void regenerateServiceId() {
-    synchronized (catalogServiceIdLock_) {
+    catalogServiceIdLock_.writeLock().lock();
+    try {
+      TUniqueId oldCatalogServiceId = catalogServiceId_;
       catalogServiceId_ = generateId();
-      LOG.info("Regenerate Catalog Service Id {}",
-          TUniqueIdUtil.PrintId(catalogServiceId_).intern());
+      LOG.info("Old Catalog Service ID " + TUniqueIdUtil.PrintId(oldCatalogServiceId) +
+          ", Regenerate Catalog Service ID " + TUniqueIdUtil.PrintId(catalogServiceId_));
+    } finally {
+      catalogServiceIdLock_.writeLock().unlock();
     }
   }
 
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index 37d9203ed..e71e3afe6 100644
--- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -157,7 +157,6 @@ import org.apache.impala.thrift.TTableRowFormat;
 import org.apache.impala.thrift.TTableStats;
 import org.apache.impala.thrift.TTypeNode;
 import org.apache.impala.thrift.TTypeNodeType;
-import org.apache.impala.thrift.TUniqueId;
 import org.apache.impala.thrift.TUpdateCatalogRequest;
 import org.apache.impala.thrift.TUpdatedPartition;
 import org.apache.impala.util.MetaStoreUtil;
@@ -1764,17 +1763,16 @@ public class MetastoreEventsProcessorTest {
     private String tblName_;
 
     private FakeCatalogServiceCatalogForFlagTests(boolean loadInBackground,
-        int numLoadingThreads, TUniqueId catalogServiceId, String localLibraryPath,
+        int numLoadingThreads, String localLibraryPath,
         MetaStoreClientPool metaStoreClientPool) throws ImpalaException {
-      super(loadInBackground, numLoadingThreads, catalogServiceId, localLibraryPath,
-          metaStoreClientPool);
+      super(loadInBackground, numLoadingThreads, localLibraryPath, metaStoreClientPool);
     }
 
     public static CatalogServiceCatalog create() {
       FeSupport.loadLibrary();
       CatalogServiceCatalog cs;
       try {
-        cs = new FakeCatalogServiceCatalogForFlagTests(false, 16, new TUniqueId(),
+        cs = new FakeCatalogServiceCatalogForFlagTests(false, 16,
             System.getProperty("java.io.tmpdir"), new MetaStoreClientPool(0, 0));
         cs.setAuthzManager(new NoopAuthorizationManager());
         cs.setMetastoreEventProcessor(NoOpEventProcessor.getInstance());
diff --git a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
index f3ec57cc3..cba20cbb7 100644
--- a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
+++ b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
@@ -34,7 +34,6 @@ import org.apache.impala.common.ImpalaException;
 import org.apache.impala.hive.executor.TestHiveJavaFunctionFactory;
 import org.apache.impala.service.CatalogOpExecutor;
 import org.apache.impala.service.FeSupport;
-import org.apache.impala.thrift.TUniqueId;
 
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -47,10 +46,9 @@ import java.util.UUID;
 public class CatalogServiceTestCatalog extends CatalogServiceCatalog {
   private CatalogOpExecutor opExecutor_;
   private CatalogServiceTestCatalog(boolean loadInBackground, int numLoadingThreads,
-      TUniqueId catalogServiceId, MetaStoreClientPool metaStoreClientPool)
-      throws ImpalaException {
-    super(loadInBackground, numLoadingThreads, catalogServiceId,
-        System.getProperty("java.io.tmpdir"), metaStoreClientPool);
+      MetaStoreClientPool metaStoreClientPool) throws ImpalaException {
+    super(loadInBackground, numLoadingThreads, System.getProperty("java.io.tmpdir"),
+        metaStoreClientPool);
 
     // Cache pools are typically loaded asynchronously, but as there is no fixed execution
     // order for tests, the cache pools are loaded synchronously before the tests are
@@ -74,8 +72,7 @@ public class CatalogServiceTestCatalog extends CatalogServiceCatalog {
       if (MetastoreShim.getMajorVersion() > 2) {
         MetastoreShim.setHiveClientCapabilities();
       }
-      cs = new CatalogServiceTestCatalog(false, 16, new TUniqueId(),
-          new MetaStoreClientPool(0, 0));
+      cs = new CatalogServiceTestCatalog(false, 16, new MetaStoreClientPool(0, 0));
       cs.setAuthzManager(factory.newAuthorizationManager(cs));
       cs.setMetastoreEventProcessor(NoOpEventProcessor.getInstance());
       cs.setCatalogMetastoreServer(NoOpCatalogMetastoreServer.INSTANCE);
@@ -106,7 +103,7 @@ public class CatalogServiceTestCatalog extends CatalogServiceCatalog {
       MetastoreShim.setHiveClientCapabilities();
     }
     CatalogServiceTestCatalog cs = new CatalogServiceTestCatalog(false, 16,
-        new TUniqueId(), new EmbeddedMetastoreClientPool(0, derbyPath));
+        new EmbeddedMetastoreClientPool(0, derbyPath));
     cs.setAuthzManager(new NoopAuthorizationManager());
     cs.setMetastoreEventProcessor(NoOpEventProcessor.getInstance());
     cs.setCatalogOpExecutor(new CatalogOpExecutor(cs,
diff --git a/tests/custom_cluster/test_catalogd_ha.py b/tests/custom_cluster/test_catalogd_ha.py
index a1dcf9da2..3803883f7 100644
--- a/tests/custom_cluster/test_catalogd_ha.py
+++ b/tests/custom_cluster/test_catalogd_ha.py
@@ -55,6 +55,20 @@ class TestCatalogdHA(CustomClusterTestSuite):
     _, catalog_service_port = active_catalogd_address.split(":")
     assert(int(catalog_service_port) == catalogd_service.get_catalog_service_port())
 
+  def __run_simple_queries(self):
+    try:
+      self.execute_query_expect_success(
+          self.client, "drop table if exists test_catalogd_ha")
+      self.execute_query_expect_success(
+          self.client, "create table if not exists test_catalogd_ha (id int)")
+      self.execute_query_expect_success(
+          self.client, "insert into table test_catalogd_ha values(1), (2), (3)")
+      self.execute_query_expect_success(
+          self.client, "select count(*) from test_catalogd_ha")
+    finally:
+      self.execute_query_expect_success(
+          self.client, "drop table if exists test_catalogd_ha")
+
   @CustomClusterTestSuite.with_args(
     statestored_args="--use_subscriber_id_as_catalogd_priority=true",
     start_args="--enable_catalogd_ha")
@@ -74,9 +88,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
     self.__verify_impalad_active_catalogd_port(0, catalogd_service_1)
     self.__verify_impalad_active_catalogd_port(1, catalogd_service_1)
     self.__verify_impalad_active_catalogd_port(2, catalogd_service_1)
-    # Verify simple query is ran successfully.
-    self.execute_query_expect_success(
-        self.client, "select count(*) from functional.alltypes")
+    # Verify simple queries are run successfully.
+    self.__run_simple_queries()
 
     # Restart one coordinator. Verify it get active catalogd address from statestore.
     self.cluster.impalads[0].restart()
@@ -103,9 +116,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
     self.__verify_impalad_active_catalogd_port(0, catalogd_service_1)
     self.__verify_impalad_active_catalogd_port(1, catalogd_service_1)
     self.__verify_impalad_active_catalogd_port(2, catalogd_service_1)
-    # Verify simple query is ran successfully.
-    self.execute_query_expect_success(
-        self.client, "select count(*) from functional.alltypes")
+    # Verify simple queries are run successfully.
+    self.__run_simple_queries()
 
   def __test_catalogd_auto_failover(self):
     """Stop active catalogd and verify standby catalogd becomes active.
@@ -140,9 +152,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
     self.__verify_impalad_active_catalogd_port(0, catalogd_service_2)
     self.__verify_impalad_active_catalogd_port(1, catalogd_service_2)
     self.__verify_impalad_active_catalogd_port(2, catalogd_service_2)
-    # Verify simple query is ran successfully.
-    self.execute_query_expect_success(
-        self.client, "select count(*) from functional.alltypes")
+    # Verify simple queries are run successfully.
+    self.__run_simple_queries()
 
     end_count_clear_topic_entries = statestore_service.get_metric_value(
         "statestore.num-clear-topic-entries-requests")
@@ -231,9 +242,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
     self.__verify_impalad_active_catalogd_port(1, catalogd_service_2)
     self.__verify_impalad_active_catalogd_port(2, catalogd_service_2)
 
-    # Verify simple query is ran successfully.
-    self.execute_query_expect_success(
-        self.client, "select count(*) from functional.alltypes")
+    # Verify simple queries are run successfully.
+    self.__run_simple_queries()
 
     end_count_clear_topic_entries = statestore_service.get_metric_value(
         "statestore.num-clear-topic-entries-requests")
@@ -334,9 +344,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
     self.__verify_impalad_active_catalogd_port(0, catalogd_service_1)
     self.__verify_impalad_active_catalogd_port(1, catalogd_service_1)
     self.__verify_impalad_active_catalogd_port(2, catalogd_service_1)
-    # Verify simple query is ran successfully.
-    self.execute_query_expect_success(
-        self.client, "select count(*) from functional.alltypes")
+    # Verify simple queries are run successfully.
+    self.__run_simple_queries()
 
     unexpected_msg = re.compile(
         "unexpected sequence number: [0-9]+, was expecting greater than [0-9]+")


[impala] 01/02: IMPALA-12267: DMLs/DDLs can hang as a result of catalogd restart

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 98059b029afc4950c7934e9a6edcdbbb99bc890b
Author: Daniel Becker <da...@cloudera.com>
AuthorDate: Wed Jul 12 17:20:34 2023 +0200

    IMPALA-12267: DMLs/DDLs can hang as a result of catalogd restart
    
    IMPALA-5476 added handling for changes in catalogd service ID during the
    lifetime of a DML/DDL query, but the loop that waits for a new catalogd
    service ID can wait indefinitely if the DML/DDL was handled by the
    previous catalogd and the restart happened just after it replied to
    the coordinator:
    https://github.com/apache/impala/blob/d0fe4c604f72d41019832513ebf65cfe8f469953/be/src/service/impala-server.cc#L2204
    
    This change adds two new startup flags. The first one is
    '--wait_for_new_catalog_service_id_timeout_sec', which sets an upper
    limit on the waiting time (in seconds). The second one is
    '--wait_for_new_catalog_service_id_max_iterations', which limits how
    many valid catalog updates without a new catalog service ID the
    coordinator receives before it gives up and stops waiting.
    
    For both startup flags, negative values and zero turn the feature off.
    
    If the coordinator gives up waiting because of either of the new startup
    flags, the local catalog cache is not updated and a warning is logged.
    
    Testing:
     - added a custom cluster test that reproduces the above condition using
       a debug action and verifies that the waiting loop times out.
    
    Change-Id: Ib71bec8f67f80b0bdfe0a6cc46a16ef624163d8b
    Reviewed-on: http://gerrit.cloudera.org:8080/20192
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
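
A condensed, self-contained sketch of the bounded wait introduced by the two
new flags (simplified from the WaitForNewCatalogServiceId() function added in
the diff below; the free-function signature, the CatalogUpdateInfo struct and
the integer service id here are illustrative, not Impala's actual types):

// Wait for a new catalog service id, but give up either after a wall-clock
// timeout or after N catalog updates that did not change the service id,
// whichever configured limit is hit first.
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <mutex>

struct CatalogUpdateInfo {
  int64_t catalog_version = 0;
  int64_t catalog_service_id = 0;  // stands in for the real TUniqueId
};

// Returns true if an update with a new service id arrived, false if we gave
// up. 'l' must already hold the mutex protecting 'info', as in the real code.
bool WaitForNewServiceId(std::unique_lock<std::mutex>& l,
                         std::condition_variable& cv,
                         const CatalogUpdateInfo& info, int64_t cur_service_id,
                         int timeout_sec, int max_iterations) {
  const bool timeout_set = timeout_sec > 0;
  const bool max_iters_set = max_iterations > 0;
  const auto deadline =
      std::chrono::steady_clock::now() + std::chrono::seconds(timeout_sec);
  int64_t old_version = info.catalog_version;
  int updates_seen = 0;
  while (info.catalog_service_id == cur_service_id) {
    // A changed version with an unchanged service id counts as one iteration.
    if (max_iters_set && info.catalog_version != old_version) {
      old_version = info.catalog_version;
      if (++updates_seen >= max_iterations) {
        std::cerr << "giving up after " << updates_seen << " updates" << std::endl;
        return false;
      }
    }
    if (timeout_set) {
      if (cv.wait_until(l, deadline) == std::cv_status::timeout) {
        std::cerr << "giving up after " << timeout_sec << " seconds" << std::endl;
        return false;
      }
    } else {
      cv.wait(l);  // with both flags off this can wait indefinitely, as documented
    }
  }
  return true;
}
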
---
 be/src/service/client-request-state.cc             |   8 +-
 be/src/service/impala-server.cc                    | 134 ++++++++++++++++++---
 be/src/service/impala-server.h                     |  15 ++-
 .../org/apache/impala/catalog/ImpaladCatalog.java  |  12 +-
 tests/custom_cluster/test_restart_services.py      | 120 +++++++++++++++++-
 5 files changed, 256 insertions(+), 33 deletions(-)

diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 64b3a4fc7..f6c0fbb2b 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -725,7 +725,7 @@ void ClientRequestState::ExecDdlRequestImpl(bool exec_in_worker_thread) {
   // Add newly created table to catalog cache.
   status = parent_server_->ProcessCatalogUpdateResult(
       *catalog_op_executor_->update_catalog_result(),
-      exec_request_->query_options.sync_ddl);
+      exec_request_->query_options.sync_ddl, query_options());
   {
     lock_guard<mutex> l(lock_);
     RETURN_VOID_IF_ERROR(UpdateQueryStatus(status));
@@ -842,7 +842,7 @@ void ClientRequestState::ExecLoadDataRequestImpl(bool exec_in_worker_thread) {
 
   status = parent_server_->ProcessCatalogUpdateResult(
       resp.result,
-      exec_request_->query_options.sync_ddl);
+      exec_request_->query_options.sync_ddl, query_options());
   {
     lock_guard<mutex> l(lock_);
     RETURN_VOID_IF_ERROR(UpdateQueryStatus(status));
@@ -1594,7 +1594,7 @@ Status ClientRequestState::UpdateCatalog() {
         query_events_->MarkEvent("Transaction committed");
       }
       RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(resp.result,
-          exec_request_->query_options.sync_ddl));
+          exec_request_->query_options.sync_ddl, query_options()));
     }
   } else if (InKuduTransaction()) {
     // Commit the Kudu transaction. Clear transaction state if it's successful.
@@ -1754,7 +1754,7 @@ Status ClientRequestState::UpdateTableAndColumnStats(
   }
   RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(
       *catalog_op_executor_->update_catalog_result(),
-      exec_request_->query_options.sync_ddl));
+      exec_request_->query_options.sync_ddl, query_options()));
 
   // Set the results to be reported to the client.
   SetResultSet(catalog_op_executor_->ddl_exec_response());
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index d2fe8f37d..58fa78ab4 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -379,6 +379,28 @@ DEFINE_bool(auto_check_compaction, false,
     "additional RPCs to hive metastore for each table in a query during the query "
     "compilation.");
 
+DEFINE_int32(wait_for_new_catalog_service_id_timeout_sec, 5 * 60,
+    "During DDL/DML queries, if there is a mismatch between the catalog service ID that"
+    "the coordinator knows of and the one in the RPC response from the catalogd, the "
+    "coordinator waits for a statestore update with a new catalog service ID in order to "
+    "catch up with the one in the RPC response. However, in rare cases the service ID "
+    "the coordinator knows of is the more recent one, in which case it could wait "
+    "infinitely - to avoid this, this flag can be set to a positive value (in seconds) "
+    "to limit the waiting time. Negative values and zero have no effect. See also "
+    "'--wait_for_new_catalog_service_id_max_iterations,'.");
+
+DEFINE_int32(wait_for_new_catalog_service_id_max_iterations, 10,
+    "This flag is used in the same situation as described at the "
+    "'--wait_for_new_catalog_service_id_timeout_sec' flag. Instead of limiting the "
+    "waiting time, the effect of this flag is that the coordinator gives up waiting "
+    "after receiving the set number of valid catalog updates that do not change the "
+    "catalog service ID. Negative values and zero have no effect. If both this flag and "
+    "'--wait_for_new_catalog_service_id_timeout_sec' are set, the coordinator stops "
+    "waiting when the stop condition of either of them is met. Note that it is possible "
+    "that the coordinator does not receive any catalog update from the statestore and in "
+    "this case it will wait indefinitely if "
+    "'--wait_for_new_catalog_service_id_timeout_sec' is not set.");
+
 // Flags for JWT token based authentication.
 DECLARE_bool(jwt_token_auth);
 DECLARE_bool(jwt_validate_signature);
@@ -885,6 +907,67 @@ Status ImpalaServer::DecompressToProfile(TRuntimeProfileFormat::type format,
   return Status::OK();
 }
 
+void ImpalaServer::WaitForNewCatalogServiceId(TUniqueId cur_service_id,
+    unique_lock<mutex>* ver_lock) {
+  DCHECK(ver_lock != nullptr);
+  // The catalog service ID of 'catalog_update_result' does not match the current catalog
+  // service ID. It is possible that catalogd has been restarted and
+  // 'catalog_update_result' contains the new service ID but we haven't received the
+  // statestore update about the new catalogd yet. We'll wait until we receive an update
+  // with a new catalog service ID or we give up (if
+  // --wait_for_new_catalog_service_id_timeout_sec is set and we time out OR if
+  // --wait_for_new_catalog_service_id_max_iterations is set and we reach the max number
+  // of updates without a new service ID). The timeout is useful in case the service ID of
+  // 'catalog_update_result' is actually older than the current catalog service ID. This
+  // is possible if the RPC response came from the old catalogd and we have already
+  // received the statestore update about the new one (see IMPALA-12267).
+  const bool timeout_set = FLAGS_wait_for_new_catalog_service_id_timeout_sec > 0;
+  const int64_t timeout_ms =
+      FLAGS_wait_for_new_catalog_service_id_timeout_sec * MILLIS_PER_SEC;
+  timespec wait_end_time;
+  if (timeout_set) TimeFromNowMillis(timeout_ms, &wait_end_time);
+
+  const bool max_statestore_updates_set =
+      FLAGS_wait_for_new_catalog_service_id_max_iterations > 0;
+
+  bool timed_out = false;
+  int num_statestore_updates = 0;
+
+  int64_t old_catalog_version = catalog_update_info_.catalog_version;
+  while (catalog_update_info_.catalog_service_id == cur_service_id) {
+    if (max_statestore_updates_set
+        && catalog_update_info_.catalog_version != old_catalog_version) {
+      old_catalog_version = catalog_update_info_.catalog_version;
+      ++num_statestore_updates;
+      if (num_statestore_updates < FLAGS_wait_for_new_catalog_service_id_max_iterations) {
+        LOG(INFO) << "Received " << num_statestore_updates << " non-empty catalog "
+            << "updates from the statestore while waiting for an update with a new "
+            << "catalog service ID but the catalog service ID has not changed. Going to "
+            << "give up waiting after "
+            << FLAGS_wait_for_new_catalog_service_id_max_iterations
+            << " such updates in total.";
+      } else {
+        LOG(WARNING) << "Received " << num_statestore_updates << " non-empty catalog "
+            << "updates from the statestore while waiting for an update with a new "
+            << "catalog service ID but the catalog service ID has not changed. "
+            << "Giving up waiting.";
+        break;
+      }
+    }
+
+    if (timeout_set) {
+      timed_out = !catalog_version_update_cv_.WaitUntil(*ver_lock, wait_end_time);
+      if (timed_out) {
+        LOG(WARNING) << "Waiting for catalog update with a new "
+            << "catalog service ID timed out.";
+        break;
+      }
+    } else {
+      catalog_version_update_cv_.Wait(*ver_lock);
+    }
+  }
+}
+
 Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, const string& user,
     TExecSummary* result, TExecSummary* original_result, bool* was_retried) {
   if (was_retried != nullptr) *was_retried = false;
@@ -2171,7 +2254,8 @@ void ImpalaServer::WaitForMinCatalogUpdate(const int64_t min_req_catalog_object_
 }
 
 Status ImpalaServer::ProcessCatalogUpdateResult(
-    const TCatalogUpdateResult& catalog_update_result, bool wait_for_all_subscribers) {
+    const TCatalogUpdateResult& catalog_update_result, bool wait_for_all_subscribers,
+    const TQueryOptions& query_options) {
   const TUniqueId& catalog_service_id = catalog_update_result.catalog_service_id;
   if (!catalog_update_result.__isset.updated_catalog_objects &&
       !catalog_update_result.__isset.removed_catalog_objects) {
@@ -2192,18 +2276,17 @@ Status ImpalaServer::ProcessCatalogUpdateResult(
   } else {
     TUniqueId cur_service_id;
     {
+      Status status = DebugAction(query_options, "WAIT_BEFORE_PROCESSING_CATALOG_UPDATE");
+      DCHECK(status.ok());
+
       unique_lock<mutex> ver_lock(catalog_version_lock_);
       cur_service_id = catalog_update_info_.catalog_service_id;
-      if (catalog_update_info_.catalog_service_id != catalog_service_id) {
+      if (cur_service_id != catalog_service_id) {
         LOG(INFO) << "Catalog service ID mismatch. Current ID: "
             << PrintId(cur_service_id) << ". ID in response: "
-            << PrintId(catalog_service_id) << ". Catalogd may be restarted. Waiting for"
-            " new catalog update from statestore.";
-        // Catalog service ID has been changed, and impalad request a full topic update.
-        // When impalad completes the full topic update, it will exit this loop.
-        while (cur_service_id == catalog_update_info_.catalog_service_id) {
-          catalog_version_update_cv_.Wait(ver_lock);
-        }
+            << PrintId(catalog_service_id) << ". Catalogd may have been restarted. "
+            "Waiting for new catalog update from statestore.";
+        WaitForNewCatalogServiceId(cur_service_id, &ver_lock);
         cur_service_id = catalog_update_info_.catalog_service_id;
       }
     }
@@ -2224,17 +2307,30 @@ Status ImpalaServer::ProcessCatalogUpdateResult(
       RETURN_IF_ERROR(status);
     } else {
       // We can't apply updates on another service id, because the local catalog is still
-      // inconsistent with the catalogd that executes the DDL. Catalogd may be restarted
-      // more than once inside a statestore update cycle. 'cur_service_id' could belong
-      // to 1) a stale update from the previous restarted catalogd, or 2) a newer update
-      // from next restarted catalogd. We are good to ignore the DDL result at the second
-      // case. However, in the first case clients may see stale catalog until the
-      // expected catalog topic update comes.
+      // inconsistent with the catalogd that executes the DDL/DML.
+      //
+      // 'cur_service_id' could belong to
+      // 1) a stale update about a previous catalogd; this is possible if
+      //     a) catalogd was restarted more than once (for example inside a statestore
+      //        update cycle) and we only got the updates about some but not all restarts
+      //        - the update about the catalogd that has 'catalog_service_id' has not
+      //        arrived yet OR
+      //     b) we gave up waiting (timed out or got a certain number of updates) before
+      //        getting the update about the new catalogd
+      // 2) an update about a restarted catalogd that is newer than the one with
+      //    'catalog_service_id' (in this case we also timed out waiting for an update)
+      //
+      // We are good to ignore the DDL/DML result in the second case. However, in the
+      // first case clients may see a stale catalog until the expected catalog topic
+      // update arrives.
       // TODO: handle the first case in IMPALA-10875.
-      LOG(WARNING) << "Ignoring catalog update result of catalog service ID: "
-          << PrintId(catalog_service_id) << ". The expected catalog service ID: "
-          << PrintId(catalog_service_id) << ". Current catalog service ID: "
-          << PrintId(cur_service_id) <<". Catalogd may be restarted more than once.";
+      LOG(WARNING) << "Ignoring catalog update result of catalog service ID "
+          << PrintId(catalog_service_id)
+          << " because it does not match with current catalog service ID "
+          << PrintId(cur_service_id)
+          << ". The current catalog service ID may be stale (this may be caused by the "
+          << "catalogd having been restarted more than once) or newer than the catalog "
+          << "service ID of the update result.";
     }
     if (!wait_for_all_subscribers) return Status::OK();
     // Wait until we receive and process the catalog update that covers the effects
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 6b40630d4..19941f80a 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -417,11 +417,14 @@ class ImpalaServer : public ImpalaServiceIf,
   /// been updated from a statestore heartbeat that includes this catalog
   /// update's version.
   ///
-  /// If wait_for_all_subscribers is true, this function also
-  /// waits for all other catalog topic subscribers to process this update by checking the
-  /// current min_subscriber_topic_version included in each state store heartbeat.
+  /// If 'wait_for_all_subscribers' is true, this function also waits for all other
+  /// catalog topic subscribers to process this update by checking the current
+  /// min_subscriber_topic_version included in each state store heartbeat.
+  ///
+  /// 'query_options' is used for running debug actions.
   Status ProcessCatalogUpdateResult(const TCatalogUpdateResult& catalog_update_result,
-      bool wait_for_all_subscribers) WARN_UNUSED_RESULT;
+      bool wait_for_all_subscribers, const TQueryOptions& query_options)
+      WARN_UNUSED_RESULT;
 
   /// Wait until the catalog update with version 'catalog_update_version' is
   /// received and applied in the local catalog cache or until the catalog
@@ -1253,6 +1256,9 @@ class ImpalaServer : public ImpalaServiceIf,
   Status DecompressToProfile(TRuntimeProfileFormat::type format,
       std::shared_ptr<QueryStateRecord> query_record, RuntimeProfileOutput* profile);
 
+  void WaitForNewCatalogServiceId(TUniqueId cur_service_id,
+      std::unique_lock<std::mutex>* ver_lock);
+
   /// Logger for writing encoded query profiles, one per line with the following format:
   /// <ms-since-epoch> <query-id> <thrift query profile URL encoded and gzipped>
   boost::scoped_ptr<SimpleLogger> profile_logger_;
@@ -1549,6 +1555,7 @@ class ImpalaServer : public ImpalaServiceIf,
       catalog_topic_version(0L),
       catalog_object_version_lower_bound(0L) {
     }
+
     /// Update the metrics to store the current version of catalog, current topic and
     /// current service id used by impalad.
     void  UpdateCatalogVersionMetrics();
diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
index 2d389f8ef..430736a07 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
@@ -169,15 +169,17 @@ public class ImpaladCatalog extends Catalog implements FeCatalog {
   /**
    * Update the catalog service Id. Trigger a full update if the service ID changes.
    */
-  private void setCatalogServiceId(TUniqueId catalog_service_id) throws CatalogException {
+  private void setCatalogServiceId(TUniqueId catalogServiceId) throws CatalogException {
     // Check for changes in the catalog service ID.
-    if (!catalogServiceId_.equals(catalog_service_id)) {
+    if (!catalogServiceId_.equals(catalogServiceId)) {
       boolean firstRun = catalogServiceId_.equals(INITIAL_CATALOG_SERVICE_ID);
-      catalogServiceId_ = catalog_service_id;
+      TUniqueId oldCatalogServiceId = catalogServiceId_;
+      catalogServiceId_ = catalogServiceId;
       if (!firstRun) {
         // Throw an exception which will trigger a full topic update request.
-        throw new CatalogException("Detected catalog service ID change. Aborting " +
-            "updateCatalog()");
+        throw new CatalogException("Detected catalog service ID changes from " +
+            TUniqueIdUtil.PrintId(oldCatalogServiceId) + " to " +
+            TUniqueIdUtil.PrintId(catalogServiceId) + ". Aborting updateCatalog()");
       }
     }
   }
diff --git a/tests/custom_cluster/test_restart_services.py b/tests/custom_cluster/test_restart_services.py
index 1e1dae001..56533b2c8 100644
--- a/tests/custom_cluster/test_restart_services.py
+++ b/tests/custom_cluster/test_restart_services.py
@@ -176,7 +176,7 @@ class TestRestart(CustomClusterTestSuite):
     self.execute_query_expect_success(self.client, "invalidate metadata")
 
     # No need to care whether the dll is executed successfully, it is just to make
-    # the local catalog catche of impalad out of sync
+    # the local catalog cache of impalad out of sync
     for i in range(0, 10):
       try:
         query = "alter table join_aa add columns (age" + str(i) + " int)"
@@ -191,6 +191,124 @@ class TestRestart(CustomClusterTestSuite):
     self.execute_query_expect_success(self.client, "select name from join_aa")
     self.execute_query_expect_success(self.client, "drop table join_aa")
 
+  WAIT_FOR_CATALOG_UPDATE_TIMEOUT_SEC = 5
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1,
+    statestored_args="--statestore_update_frequency_ms=2000",
+    impalad_args=("--wait_for_new_catalog_service_id_timeout_sec={} \
+                  --wait_for_new_catalog_service_id_max_iterations=-1"
+                  .format(WAIT_FOR_CATALOG_UPDATE_TIMEOUT_SEC)))
+  def test_restart_catalogd_while_handling_rpc_response_with_timeout(self):
+    """Regression test for IMPALA-12267. We'd like to cause a situation where
+         - The coordinator issues a DDL or DML query
+         - Catalogd sends a response RPC
+         - Catalogd is restarted and gets a new catalog service ID
+         - The coordinator receives the update about the new catalogd from the statestore
+           before processing the RPC from the old catalogd.
+    Before IMPALA-12267 the coordinator hung infinitely in this situation, waiting for a
+    statestore update with a new catalog service ID assuming the service ID it had was
+    stale, but it already had the most recent one."""
+    self.execute_query_expect_success(self.client, "drop table if exists join_aa")
+    self.execute_query_expect_success(self.client, "create table join_aa(id int)")
+    # Make the catalog object version grow large enough
+    self.execute_query_expect_success(self.client, "invalidate metadata")
+
+    debug_action_sleep_time_sec = 10
+    DEBUG_ACTION = ("WAIT_BEFORE_PROCESSING_CATALOG_UPDATE:SLEEP@{}"
+                    .format(debug_action_sleep_time_sec * 1000))
+
+    query = "alter table join_aa add columns (age" + " int)"
+    handle = self.execute_query_async(query, query_options={"debug_action": DEBUG_ACTION})
+
+    # Wait a bit so the RPC response from the catalogd arrives at the coordinator.
+    time.sleep(0.5)
+
+    self.cluster.catalogd.restart()
+
+    # Wait for the query to finish.
+    max_wait_time = (debug_action_sleep_time_sec
+        + self.WAIT_FOR_CATALOG_UPDATE_TIMEOUT_SEC + 10)
+    self.wait_for_state(handle, self.client.QUERY_STATES["FINISHED"], max_wait_time)
+
+    self.assert_impalad_log_contains("WARNING",
+        "Waiting for catalog update with a new catalog service ID timed out.")
+    self.assert_impalad_log_contains("WARNING",
+        "Ignoring catalog update result of catalog service ID")
+
+    self.execute_query_expect_success(self.client, "select age from join_aa")
+
+    self.execute_query_expect_success(self.client,
+        "alter table join_aa add columns (name string)")
+    self.execute_query_expect_success(self.client, "select name from join_aa")
+    self.execute_query_expect_success(self.client, "drop table join_aa")
+
+  WAIT_FOR_CATALOG_UPDATE_MAX_ITERATIONS = 3
+  STATESTORE_UPDATE_FREQ_SEC = 2
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1,
+    statestored_args="--statestore_update_frequency_ms={}".format(
+        STATESTORE_UPDATE_FREQ_SEC * 1000),
+    impalad_args=("--wait_for_new_catalog_service_id_timeout_sec=-1 \
+                  --wait_for_new_catalog_service_id_max_iterations={}"
+                  .format(WAIT_FOR_CATALOG_UPDATE_MAX_ITERATIONS)))
+  def test_restart_catalogd_while_handling_rpc_response_with_max_iters(self):
+    """We create the same situation as described in
+    'test_restart_catalogd_while_handling_rpc_response_with_timeout()' but we get out of
+    it not by timing out but by giving up waiting after receiving
+    'WAIT_FOR_CATALOG_UPDATE_MAX_ITERATIONS' updates from the statestore that don't change
+    the catalog service ID."""
+    self.execute_query_expect_success(self.client, "drop table if exists join_aa")
+    self.execute_query_expect_success(self.client, "create table join_aa(id int)")
+    # Make the catalog object version grow large enough
+    self.execute_query_expect_success(self.client, "invalidate metadata")
+
+    debug_action_sleep_time_sec = 10
+    DEBUG_ACTION = ("WAIT_BEFORE_PROCESSING_CATALOG_UPDATE:SLEEP@{}"
+                    .format(debug_action_sleep_time_sec * 1000))
+
+    query = "alter table join_aa add columns (age" + " int)"
+    handle = self.execute_query_async(query, query_options={"debug_action": DEBUG_ACTION})
+
+    # Wait a bit so the RPC response from the catalogd arrives at the coordinator.
+    time.sleep(0.5)
+
+    self.cluster.catalogd.restart()
+
+    # Sleep until the coordinator is done with the debug action sleep and it starts
+    # waiting for catalog updates.
+    time.sleep(debug_action_sleep_time_sec + 0.5)
+
+    # Issue DML queries so that the coordinator receives catalog updates.
+    for i in range(self.WAIT_FOR_CATALOG_UPDATE_MAX_ITERATIONS):
+      try:
+        query = "alter table join_aa add columns (age" + str(i) + " int)"
+        self.execute_query_async(query)
+        time.sleep(self.STATESTORE_UPDATE_FREQ_SEC)
+      except Exception as e:
+        LOG.info(str(e))
+
+    # Wait for the query to finish.
+    max_wait_time = 10
+    self.wait_for_state(handle, self.client.QUERY_STATES["FINISHED"], max_wait_time)
+
+    expected_log_msg = "Received {} non-empty catalog updates from the statestore " \
+        "while waiting for an update with a new catalog service ID but the catalog " \
+        "service ID has not changed. Giving up waiting.".format(
+            self.WAIT_FOR_CATALOG_UPDATE_MAX_ITERATIONS)
+
+    self.assert_impalad_log_contains("INFO", expected_log_msg)
+    self.assert_impalad_log_contains("WARNING",
+        "Ignoring catalog update result of catalog service ID")
+
+    self.execute_query_expect_success(self.client, "select age from join_aa")
+
+    self.execute_query_expect_success(self.client,
+        "alter table join_aa add columns (name string)")
+    self.execute_query_expect_success(self.client, "select name from join_aa")
+    self.execute_query_expect_success(self.client, "drop table join_aa")
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
     statestored_args="--statestore_update_frequency_ms=5000")