You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2023/07/26 07:44:19 UTC

[impala] 02/02: IMPALA-12305: Fix wrong Catalog Service ID when CatalogD becomes active

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 434ae61e388bb44e0a49746cb969f28529656909
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Sun Jul 23 01:42:03 2023 -0700

    IMPALA-12305: Fix wrong Catalog Service ID when CatalogD becomes active
    
    In IMPALA-12286, catalogd re-generate its Catalog Service ID in
    JniCatalog when it becomes active. But CatalogServiceCatalog is not
    updated when new Catalog Service ID is generated. This causes
    coordinator hanging when processing DDLs.
    In CatalogServer class, is_active_ is not protected by mutex
    catalog_lock_, and pending_topic_updates_ is not cleared when the
    catalogd becomes active. It's possible catalog server sends pending
    catalog topic updates with old Catalog Service ID then sends catalog
    topic updates with new Catalog Service ID.
    
    This patch removes catalogServiceId_ from CatalogServiceCatalog, and
    makes CatalogServiceCatalog to call JniCatalog.getServiceId() directly.
    It makes is_active_ to be protected by mutex catalog_lock_, and makes
    catalog server to clear pending_topic_updates_ when the catalogd becomes
    active.
    
    Testing:
     - Added more queries in test cases for CatalogD HA. Verified that
       test cases failed without fix, and test cases were passed after fix.
     - Passed core tests.
    
    Change-Id: I5eada89052c5f16209c6f16357f30f78b4497434
    Reviewed-on: http://gerrit.cloudera.org:8080/20258
    Reviewed-by: Andrew Sherman <as...@cloudera.com>
    Reviewed-by: Riza Suminto <ri...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/catalog/catalog-server.cc                   | 68 +++++++++++-----------
 be/src/catalog/catalog-server.h                    | 11 ++--
 .../impala/catalog/CatalogServiceCatalog.java      | 17 +++---
 .../java/org/apache/impala/service/JniCatalog.java | 26 ++++++---
 .../events/MetastoreEventsProcessorTest.java       |  8 +--
 .../impala/testutil/CatalogServiceTestCatalog.java | 13 ++---
 tests/custom_cluster/test_catalogd_ha.py           | 39 ++++++++-----
 7 files changed, 99 insertions(+), 83 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 6bc1369d4..961a37fd6 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -344,6 +344,7 @@ CatalogServer::CatalogServer(MetricGroup* metrics)
   : protocol_version_(CatalogServiceVersion::V2),
     thrift_iface_(new CatalogServiceThriftIf(this)),
     thrift_serializer_(FLAGS_compact_catalog_topic), metrics_(metrics),
+    is_active_(!FLAGS_enable_catalogd_ha),
     topic_updates_ready_(false), last_sent_catalog_version_(0L),
     catalog_objects_max_version_(0L) {
   topic_processing_time_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
@@ -354,7 +355,6 @@ CatalogServer::CatalogServer(MetricGroup* metrics)
       metrics->AddProperty(CATALOG_ACTIVE_STATUS, !FLAGS_enable_catalogd_ha);
   num_ha_active_status_change_metric_ =
       metrics->AddCounter(CATALOG_HA_NUM_ACTIVE_STATUS_CHANGE, 0);
-  is_active_.Store(FLAGS_enable_catalogd_ha ? 0 : 1);
 }
 
 Status CatalogServer::Start() {
@@ -408,7 +408,7 @@ Status CatalogServer::Start() {
       statestore_subscriber_->Start(&has_active_catalogd, &active_catalogd_registration));
   if (FLAGS_enable_catalogd_ha && has_active_catalogd) {
     UpdateRegisteredCatalogd(active_catalogd_registration);
-    if (FLAGS_force_catalogd_active && is_active_.Load() != 1) {
+    if (FLAGS_force_catalogd_active && !IsActive()) {
       LOG(ERROR) << "Could not start CatalogD as active instance";
       return Status("Could not start CatalogD as active instance");
     }
@@ -417,7 +417,7 @@ Status CatalogServer::Start() {
   // Notify the thread to start for the first time.
   {
     lock_guard<mutex> l(catalog_lock_);
-    catalog_update_cv_.NotifyOne();
+    if (is_active_) catalog_update_cv_.NotifyOne();
   }
   return Status::OK();
 }
@@ -451,18 +451,15 @@ void CatalogServer::RegisterWebpages(Webserver* webserver, bool metrics_only) {
 void CatalogServer::UpdateCatalogTopicCallback(
     const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
     vector<TTopicDelta>* subscriber_topic_updates) {
-  // Don't update catalog if this instance is not active.
-  if (is_active_.Load() == 0) return;
-
   StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
       incoming_topic_deltas.find(CatalogServer::IMPALA_CATALOG_TOPIC);
   if (topic == incoming_topic_deltas.end()) return;
 
   unique_lock<mutex> l(catalog_lock_, std::try_to_lock);
-  // Return if unable to acquire the catalog_lock_ or if the topic update data is
-  // not yet ready for processing. This indicates the catalog_update_gathering_thread_
-  // is still building a topic update.
-  if (!l || !topic_updates_ready_) return;
+  // Return if unable to acquire the catalog_lock_, or this instance is not active,
+  // or if the topic update data is not yet ready for processing. This indicates the
+  // catalog_update_gathering_thread_ is still building a topic update.
+  if (!l || !is_active_ || !topic_updates_ready_) return;
 
   const TTopicDelta& delta = topic->second;
 
@@ -510,35 +507,40 @@ void CatalogServer::UpdateRegisteredCatalogd(
             << TNetworkAddressToString(catalogd_registration.address);
   bool is_matching = (catalogd_registration.address.hostname == FLAGS_hostname
       && catalogd_registration.address.port == FLAGS_catalog_service_port);
-  bool is_changed;
+  lock_guard<mutex> l(catalog_lock_);
   if (is_matching) {
-    is_changed = is_active_.CompareAndSwap(0, 1);
+    if (!is_active_) {
+      is_active_ = true;
+      active_status_metric_->SetValue(true);
+      num_ha_active_status_change_metric_->Increment(1);
+      // Reset last_sent_catalog_version_ when the catalogd become active. This will
+      // lead to non-delta catalog update for next IMPALA_CATALOG_TOPIC which also
+      // instruct the statestore to clear all entries for the catalog update topic.
+      last_sent_catalog_version_ = 0;
+      // Regenerate Catalog Service ID.
+      catalog_->RegenerateServiceId();
+      // Clear pending topic updates.
+      pending_topic_updates_.clear();
+      // Signal the catalog update gathering thread to start.
+      topic_updates_ready_ = false;
+      catalog_update_cv_.NotifyOne();
+      LOG(INFO) << "This catalogd instance is changed to active status";
+    }
   } else {
-    is_changed = is_active_.CompareAndSwap(1, 0);
-  }
-  if (is_changed) {
-    {
-      unique_lock<mutex> unique_lock(catalog_lock_);
-      bool is_active = (is_active_.Load() != 0);
-      if (is_active) {
-        // Reset last_sent_catalog_version_ when the catalogd become active. This will
-        // lead to non-delta catalog update for next IMPALA_CATALOG_TOPIC which also
-        // instruct the statestore to clear all entries for the catalog update topic.
-        last_sent_catalog_version_ = 0;
-        // Signal the catalog update gathering thread to start.
-        topic_updates_ready_ = false;
-        catalog_update_cv_.NotifyOne();
-        // Regenerate Catalog Service ID.
-        catalog_->RegenerateServiceId();
-      }
-      active_status_metric_->SetValue(is_active);
+    if (is_active_) {
+      is_active_ = false;
+      active_status_metric_->SetValue(false);
+      num_ha_active_status_change_metric_->Increment(1);
+      LOG(INFO) << "This catalogd instance is changed to inactive status";
     }
-    num_ha_active_status_change_metric_->Increment(1);
-    LOG(INFO) << "The role of catalogd instance is changed to "
-              << (is_matching ? "active" : "standby");
   }
 }
 
+bool CatalogServer::IsActive() {
+  lock_guard<mutex> l(catalog_lock_);
+  return is_active_;
+}
+
 [[noreturn]] void CatalogServer::GatherCatalogUpdatesThread() {
   while (true) {
     unique_lock<mutex> unique_lock(catalog_lock_);
diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h
index c699e981d..77cb744c2 100644
--- a/be/src/catalog/catalog-server.h
+++ b/be/src/catalog/catalog-server.h
@@ -134,9 +134,6 @@ class CatalogServer {
   /// Indicates whether the catalog service is ready.
   std::atomic_bool service_started_{false};
 
-  /// Set to 1 if this catalog instance is active.
-  AtomicInt32 is_active_{0};
-
   /// Thrift API implementation which proxies requests onto this CatalogService.
   std::shared_ptr<CatalogServiceIf> thrift_iface_;
   ThriftSerializer thrift_serializer_;
@@ -162,10 +159,13 @@ class CatalogServer {
   /// Thread that periodically wakes up and refreshes certain Catalog metrics.
   std::unique_ptr<Thread> catalog_metrics_refresh_thread_;
 
-  /// Protects catalog_update_cv_, pending_topic_updates_,
+  /// Protects is_active_, catalog_update_cv_, pending_topic_updates_,
   /// catalog_objects_to/from_version_, and last_sent_catalog_version.
   std::mutex catalog_lock_;
 
+  /// Set to true if this catalog instance is active.
+  bool is_active_;
+
   /// Condition variable used to signal when the catalog_update_gathering_thread_ should
   /// fetch its next set of updates from the JniCatalog. At the end of each statestore
   /// heartbeat, this CV is signaled and the catalog_update_gathering_thread_ starts
@@ -209,6 +209,9 @@ class CatalogServer {
   /// Callback function for receiving notification of updating catalogd.
   void UpdateRegisteredCatalogd(const TCatalogRegistration& catalogd_registration);
 
+  /// Returns the active status of the catalogd.
+  bool IsActive();
+
   /// Executed by the catalog_update_gathering_thread_. Calls into JniCatalog
   /// to get the latest set of catalog objects that exist, along with some metadata on
   /// each object. The results are stored in the shared catalog_objects_ data structure.
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 724b3bc94..ddb08e75f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -89,6 +89,7 @@ import org.apache.impala.hive.executor.HiveJavaFunction;
 import org.apache.impala.hive.executor.HiveJavaFunctionFactoryImpl;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.FeSupport;
+import org.apache.impala.service.JniCatalog;
 import org.apache.impala.thrift.CatalogLookupStatus;
 import org.apache.impala.thrift.CatalogServiceConstants;
 import org.apache.impala.thrift.TCatalog;
@@ -240,8 +241,6 @@ public class CatalogServiceCatalog extends Catalog {
   // default value of table id in the GetPartialCatalogObjectRequest
   public static final long TABLE_ID_UNAVAILABLE = -1;
 
-  private final TUniqueId catalogServiceId_;
-
   // Fair lock used to synchronize reads/writes of catalogVersion_. Because this lock
   // protects catalogVersion_, it can be used to perform atomic bulk catalog operations
   // since catalogVersion_ cannot change externally while the lock is being held.
@@ -340,8 +339,7 @@ public class CatalogServiceCatalog extends Catalog {
    * @throws ImpalaException
    */
   public CatalogServiceCatalog(boolean loadInBackground, int numLoadingThreads,
-      TUniqueId catalogServiceId, String localLibraryPath,
-      MetaStoreClientPool metaStoreClientPool)
+      String localLibraryPath, MetaStoreClientPool metaStoreClientPool)
       throws ImpalaException {
     super(metaStoreClientPool);
     blacklistedDbs_ = CatalogBlacklistUtils.parseBlacklistedDbs(
@@ -356,7 +354,6 @@ public class CatalogServiceCatalog extends Catalog {
         .getBackendCfg().topic_update_tbl_max_wait_time_ms;
     Preconditions.checkState(topicUpdateTblLockMaxWaitTimeMs_ >= 0,
         "topic_update_tbl_max_wait_time_ms must be positive");
-    catalogServiceId_ = catalogServiceId;
     tableLoadingMgr_ = new TableLoadingMgr(this, numLoadingThreads);
     loadInBackground_ = loadInBackground;
     try {
@@ -397,9 +394,8 @@ public class CatalogServiceCatalog extends Catalog {
    * to establish an initial connection to the HMS before giving up.
    */
   public CatalogServiceCatalog(boolean loadInBackground, int numLoadingThreads,
-      int initialHmsCnxnTimeoutSec, TUniqueId catalogServiceId, String localLibraryPath)
-      throws ImpalaException {
-    this(loadInBackground, numLoadingThreads, catalogServiceId, localLibraryPath,
+      int initialHmsCnxnTimeoutSec, String localLibraryPath) throws ImpalaException {
+    this(loadInBackground, numLoadingThreads, localLibraryPath,
         new MetaStoreClientPool(INITIAL_META_STORE_CLIENT_POOL_SIZE,
             initialHmsCnxnTimeoutSec));
   }
@@ -999,7 +995,8 @@ public class CatalogServiceCatalog extends Catalog {
     // can safely forward their min catalog version.
     TCatalogObject catalog =
         new TCatalogObject(TCatalogObjectType.CATALOG, ctx.toVersion);
-    catalog.setCatalog(new TCatalog(catalogServiceId_, ctx.lastResetStartVersion));
+    catalog.setCatalog(
+        new TCatalog(JniCatalog.getServiceId(), ctx.lastResetStartVersion));
     ctx.addCatalogObject(catalog, false);
     // Garbage collect the delete and topic update log.
     deleteLog_.garbageCollect(ctx.toVersion);
@@ -3528,7 +3525,7 @@ public class CatalogServiceCatalog extends Catalog {
    * Gets the id for this catalog service
    */
   public String getCatalogServiceId() {
-    return TUniqueIdUtil.PrintId(catalogServiceId_).intern();
+    return TUniqueIdUtil.PrintId(JniCatalog.getServiceId()).intern();
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index 23e2d96de..d4947ce64 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -24,6 +24,9 @@ import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 import java.util.stream.Collectors;
 import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.impala.analysis.TableName;
@@ -111,8 +114,9 @@ public class JniCatalog {
   // The service id will be regenerated when the CatalogD becomes active.
   private static TUniqueId catalogServiceId_ = generateId();
 
-  // Lock to protect catalogServiceId_.
-  private final static Object catalogServiceIdLock_ = new Object();
+  // ReadWriteLock to protect catalogServiceId_.
+  private final static ReentrantReadWriteLock catalogServiceIdLock_ =
+      new ReentrantReadWriteLock(true /*fair ordering*/);
 
   // A singleton monitoring class that keeps track of the catalog usage metrics.
   private final CatalogOperationMetrics catalogOperationUsage_ =
@@ -155,8 +159,7 @@ public class JniCatalog {
         new MetaStoreClientPool(CatalogServiceCatalog.INITIAL_META_STORE_CLIENT_POOL_SIZE,
             cfg.initial_hms_cnxn_timeout_s);
     catalog_ = new CatalogServiceCatalog(cfg.load_catalog_in_background,
-        cfg.num_metadata_loading_threads, getServiceId(), cfg.local_library_path,
-        metaStoreClientPool);
+        cfg.num_metadata_loading_threads, cfg.local_library_path, metaStoreClientPool);
     authzManager_ = authzFactory.newAuthorizationManager(catalog_);
     catalog_.setAuthzManager(authzManager_);
     catalogOpExecutor_ = new CatalogOpExecutor(catalog_, authzConfig, authzManager_,
@@ -254,16 +257,23 @@ public class JniCatalog {
   }
 
   public static TUniqueId getServiceId() {
-    synchronized (catalogServiceIdLock_) {
+    catalogServiceIdLock_.readLock().lock();
+    try {
       return catalogServiceId_;
+    } finally {
+      catalogServiceIdLock_.readLock().unlock();
     }
   }
 
   public void regenerateServiceId() {
-    synchronized (catalogServiceIdLock_) {
+    catalogServiceIdLock_.writeLock().lock();
+    try {
+      TUniqueId oldCatalogServiceId = catalogServiceId_;
       catalogServiceId_ = generateId();
-      LOG.info("Regenerate Catalog Service Id {}",
-          TUniqueIdUtil.PrintId(catalogServiceId_).intern());
+      LOG.info("Old Catalog Service ID " + TUniqueIdUtil.PrintId(oldCatalogServiceId) +
+          ", Regenerate Catalog Service ID " + TUniqueIdUtil.PrintId(catalogServiceId_));
+    } finally {
+      catalogServiceIdLock_.writeLock().unlock();
     }
   }
 
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index 37d9203ed..e71e3afe6 100644
--- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -157,7 +157,6 @@ import org.apache.impala.thrift.TTableRowFormat;
 import org.apache.impala.thrift.TTableStats;
 import org.apache.impala.thrift.TTypeNode;
 import org.apache.impala.thrift.TTypeNodeType;
-import org.apache.impala.thrift.TUniqueId;
 import org.apache.impala.thrift.TUpdateCatalogRequest;
 import org.apache.impala.thrift.TUpdatedPartition;
 import org.apache.impala.util.MetaStoreUtil;
@@ -1764,17 +1763,16 @@ public class MetastoreEventsProcessorTest {
     private String tblName_;
 
     private FakeCatalogServiceCatalogForFlagTests(boolean loadInBackground,
-        int numLoadingThreads, TUniqueId catalogServiceId, String localLibraryPath,
+        int numLoadingThreads, String localLibraryPath,
         MetaStoreClientPool metaStoreClientPool) throws ImpalaException {
-      super(loadInBackground, numLoadingThreads, catalogServiceId, localLibraryPath,
-          metaStoreClientPool);
+      super(loadInBackground, numLoadingThreads, localLibraryPath, metaStoreClientPool);
     }
 
     public static CatalogServiceCatalog create() {
       FeSupport.loadLibrary();
       CatalogServiceCatalog cs;
       try {
-        cs = new FakeCatalogServiceCatalogForFlagTests(false, 16, new TUniqueId(),
+        cs = new FakeCatalogServiceCatalogForFlagTests(false, 16,
             System.getProperty("java.io.tmpdir"), new MetaStoreClientPool(0, 0));
         cs.setAuthzManager(new NoopAuthorizationManager());
         cs.setMetastoreEventProcessor(NoOpEventProcessor.getInstance());
diff --git a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
index f3ec57cc3..cba20cbb7 100644
--- a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
+++ b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
@@ -34,7 +34,6 @@ import org.apache.impala.common.ImpalaException;
 import org.apache.impala.hive.executor.TestHiveJavaFunctionFactory;
 import org.apache.impala.service.CatalogOpExecutor;
 import org.apache.impala.service.FeSupport;
-import org.apache.impala.thrift.TUniqueId;
 
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -47,10 +46,9 @@ import java.util.UUID;
 public class CatalogServiceTestCatalog extends CatalogServiceCatalog {
   private CatalogOpExecutor opExecutor_;
   private CatalogServiceTestCatalog(boolean loadInBackground, int numLoadingThreads,
-      TUniqueId catalogServiceId, MetaStoreClientPool metaStoreClientPool)
-      throws ImpalaException {
-    super(loadInBackground, numLoadingThreads, catalogServiceId,
-        System.getProperty("java.io.tmpdir"), metaStoreClientPool);
+      MetaStoreClientPool metaStoreClientPool) throws ImpalaException {
+    super(loadInBackground, numLoadingThreads, System.getProperty("java.io.tmpdir"),
+        metaStoreClientPool);
 
     // Cache pools are typically loaded asynchronously, but as there is no fixed execution
     // order for tests, the cache pools are loaded synchronously before the tests are
@@ -74,8 +72,7 @@ public class CatalogServiceTestCatalog extends CatalogServiceCatalog {
       if (MetastoreShim.getMajorVersion() > 2) {
         MetastoreShim.setHiveClientCapabilities();
       }
-      cs = new CatalogServiceTestCatalog(false, 16, new TUniqueId(),
-          new MetaStoreClientPool(0, 0));
+      cs = new CatalogServiceTestCatalog(false, 16, new MetaStoreClientPool(0, 0));
       cs.setAuthzManager(factory.newAuthorizationManager(cs));
       cs.setMetastoreEventProcessor(NoOpEventProcessor.getInstance());
       cs.setCatalogMetastoreServer(NoOpCatalogMetastoreServer.INSTANCE);
@@ -106,7 +103,7 @@ public class CatalogServiceTestCatalog extends CatalogServiceCatalog {
       MetastoreShim.setHiveClientCapabilities();
     }
     CatalogServiceTestCatalog cs = new CatalogServiceTestCatalog(false, 16,
-        new TUniqueId(), new EmbeddedMetastoreClientPool(0, derbyPath));
+        new EmbeddedMetastoreClientPool(0, derbyPath));
     cs.setAuthzManager(new NoopAuthorizationManager());
     cs.setMetastoreEventProcessor(NoOpEventProcessor.getInstance());
     cs.setCatalogOpExecutor(new CatalogOpExecutor(cs,
diff --git a/tests/custom_cluster/test_catalogd_ha.py b/tests/custom_cluster/test_catalogd_ha.py
index a1dcf9da2..3803883f7 100644
--- a/tests/custom_cluster/test_catalogd_ha.py
+++ b/tests/custom_cluster/test_catalogd_ha.py
@@ -55,6 +55,20 @@ class TestCatalogdHA(CustomClusterTestSuite):
     _, catalog_service_port = active_catalogd_address.split(":")
     assert(int(catalog_service_port) == catalogd_service.get_catalog_service_port())
 
+  def __run_simple_queries(self):
+    try:
+      self.execute_query_expect_success(
+          self.client, "drop table if exists test_catalogd_ha")
+      self.execute_query_expect_success(
+          self.client, "create table if not exists test_catalogd_ha (id int)")
+      self.execute_query_expect_success(
+          self.client, "insert into table test_catalogd_ha values(1), (2), (3)")
+      self.execute_query_expect_success(
+          self.client, "select count(*) from test_catalogd_ha")
+    finally:
+      self.execute_query_expect_success(
+          self.client, "drop table if exists test_catalogd_ha")
+
   @CustomClusterTestSuite.with_args(
     statestored_args="--use_subscriber_id_as_catalogd_priority=true",
     start_args="--enable_catalogd_ha")
@@ -74,9 +88,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
     self.__verify_impalad_active_catalogd_port(0, catalogd_service_1)
     self.__verify_impalad_active_catalogd_port(1, catalogd_service_1)
     self.__verify_impalad_active_catalogd_port(2, catalogd_service_1)
-    # Verify simple query is ran successfully.
-    self.execute_query_expect_success(
-        self.client, "select count(*) from functional.alltypes")
+    # Verify simple queries are ran successfully.
+    self.__run_simple_queries()
 
     # Restart one coordinator. Verify it get active catalogd address from statestore.
     self.cluster.impalads[0].restart()
@@ -103,9 +116,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
     self.__verify_impalad_active_catalogd_port(0, catalogd_service_1)
     self.__verify_impalad_active_catalogd_port(1, catalogd_service_1)
     self.__verify_impalad_active_catalogd_port(2, catalogd_service_1)
-    # Verify simple query is ran successfully.
-    self.execute_query_expect_success(
-        self.client, "select count(*) from functional.alltypes")
+    # Verify simple queries are ran successfully.
+    self.__run_simple_queries()
 
   def __test_catalogd_auto_failover(self):
     """Stop active catalogd and verify standby catalogd becomes active.
@@ -140,9 +152,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
     self.__verify_impalad_active_catalogd_port(0, catalogd_service_2)
     self.__verify_impalad_active_catalogd_port(1, catalogd_service_2)
     self.__verify_impalad_active_catalogd_port(2, catalogd_service_2)
-    # Verify simple query is ran successfully.
-    self.execute_query_expect_success(
-        self.client, "select count(*) from functional.alltypes")
+    # Verify simple queries are ran successfully.
+    self.__run_simple_queries()
 
     end_count_clear_topic_entries = statestore_service.get_metric_value(
         "statestore.num-clear-topic-entries-requests")
@@ -231,9 +242,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
     self.__verify_impalad_active_catalogd_port(1, catalogd_service_2)
     self.__verify_impalad_active_catalogd_port(2, catalogd_service_2)
 
-    # Verify simple query is ran successfully.
-    self.execute_query_expect_success(
-        self.client, "select count(*) from functional.alltypes")
+    # Verify simple queries are ran successfully.
+    self.__run_simple_queries()
 
     end_count_clear_topic_entries = statestore_service.get_metric_value(
         "statestore.num-clear-topic-entries-requests")
@@ -334,9 +344,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
     self.__verify_impalad_active_catalogd_port(0, catalogd_service_1)
     self.__verify_impalad_active_catalogd_port(1, catalogd_service_1)
     self.__verify_impalad_active_catalogd_port(2, catalogd_service_1)
-    # Verify simple query is ran successfully.
-    self.execute_query_expect_success(
-        self.client, "select count(*) from functional.alltypes")
+    # Verify simple queries are ran successfully.
+    self.__run_simple_queries()
 
     unexpected_msg = re.compile(
         "unexpected sequence number: [0-9]+, was expecting greater than [0-9]+")