You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by wz...@apache.org on 2023/07/29 01:53:32 UTC

[impala] 01/02: IMPALA-12323: DDL hang with SYNC_DDL=1 when CatalogD HA enabled

This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 6ecb8bfcf4d9e68e0091aea64540e0fb64aeb3e0
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Fri Jul 28 09:33:17 2023 -0700

    IMPALA-12323: DDL hang with SYNC_DDL=1 when CatalogD HA enabled
    
    When CatalogD HA is enabled, the standby catalogd does not receive
    catalog topic updates from the statestore and does not apply catalog
    updates from the active catalogd, so its minimum topic version never
    changes. Statestore::GetMinSubscriberTopicVersion() loops through all
    subscribers to find the minimum topic version, so the standby catalogd
    keeps that minimum from increasing. As a result, with SYNC_DDL=1 the
    Impala server waits indefinitely in
    ImpalaServer::WaitForCatalogUpdateTopicPropagation().
    
    This patch fixes the issue by skipping the standby (inactive) catalogd
    when computing the minimum topic version in
    Statestore::GetMinSubscriberTopicVersion().
    
    Testing:
     - Added custom-cluster test code for CatalogD HA that runs DDL with
       SYNC_DDL=1. Verified that the test cases hang without the fix and
       pass with it.
     - Passed test_catalogd_ha.py.
    
    Change-Id: Ie559c711078f32171dfb2d2e2fda54773c0927c3
    Reviewed-on: http://gerrit.cloudera.org:8080/20280
    Reviewed-by: Andrew Sherman <as...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/statestore/statestore.cc          | 6 ++++++
 tests/custom_cluster/test_catalogd_ha.py | 8 +++++++-
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 945d3a088..0ec515f73 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -986,6 +986,12 @@ Statestore::TopicEntry::Version Statestore::GetMinSubscriberTopicVersion(
   bool found = false;
   // Find the minimum version processed for this topic across all topic subscribers.
   for (const SubscriberMap::value_type& subscriber: subscribers_) {
+    if (FLAGS_enable_catalogd_ha && subscriber.second->IsCatalogd()
+        && !catalog_manager_.IsActiveCatalogd(subscriber.second->id())) {
+      // Skip inactive catalogd since it does not apply catalog updates from the active
+      // catalogd.
+      continue;
+    }
     auto subscribed_topics = subscriber.second->GetTopicsMapForId(topic_id);
     if (subscribed_topics->find(topic_id) != subscribed_topics->end()) {
       found = true;
diff --git a/tests/custom_cluster/test_catalogd_ha.py b/tests/custom_cluster/test_catalogd_ha.py
index 3803883f7..97b666f14 100644
--- a/tests/custom_cluster/test_catalogd_ha.py
+++ b/tests/custom_cluster/test_catalogd_ha.py
@@ -55,8 +55,10 @@ class TestCatalogdHA(CustomClusterTestSuite):
     _, catalog_service_port = active_catalogd_address.split(":")
     assert(int(catalog_service_port) == catalogd_service.get_catalog_service_port())
 
-  def __run_simple_queries(self):
+  def __run_simple_queries(self, sync_ddl=False):
     try:
+      if sync_ddl:
+        self.execute_query_expect_success(self.client, "set SYNC_DDL=1")
       self.execute_query_expect_success(
           self.client, "drop table if exists test_catalogd_ha")
       self.execute_query_expect_success(
@@ -90,6 +92,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
     self.__verify_impalad_active_catalogd_port(2, catalogd_service_1)
     # Verify simple queries are ran successfully.
     self.__run_simple_queries()
+    # Verify simple queries with sync_ddl as 1.
+    self.__run_simple_queries(sync_ddl=True)
 
     # Restart one coordinator. Verify it get active catalogd address from statestore.
     self.cluster.impalads[0].restart()
@@ -154,6 +158,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
     self.__verify_impalad_active_catalogd_port(2, catalogd_service_2)
     # Verify simple queries are ran successfully.
     self.__run_simple_queries()
+    # Verify simple queries with sync_ddl as 1.
+    self.__run_simple_queries(sync_ddl=True)
 
     end_count_clear_topic_entries = statestore_service.get_metric_value(
         "statestore.num-clear-topic-entries-requests")