You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by wz...@apache.org on 2023/07/22 03:55:18 UTC

[impala] branch master updated: IMPALA-12304: Fix the sequence number issue for update_catalogd RPC

This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new af3f56e6d IMPALA-12304: Fix the sequence number issue for update_catalogd RPC
af3f56e6d is described below

commit af3f56e6d1605a56f7bd02b0af35be980a7e4c63
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Fri Jul 21 12:02:56 2023 -0700

    IMPALA-12304: Fix the sequence number issue for update_catalogd RPC
    
    Subscribers re-register with the statestore when the statestore is
    restarted. On restart, the statestore resets its sending sequence number
    for the update_catalogd RPC. Subscribers therefore need to reset their
    last received sequence number for the update_catalogd RPC when they
    successfully re-register with the statestore. Otherwise, subscribers may
    miss some RPCs after the statestore is restarted.
    
    Related error messages could be found in the catalogd log file when
    running test_catalogd_ha.py::TestCatalogdHA::test_restart_statestore.
    Verified that no such error messages appear in the catalogd log after
    the fix.
    
    Made a small optimization so that the statestore does not wake up the
    thread for the update_catalogd RPC when the elected active catalogd has
    not changed and no RPC failed in the last round.
    
    Testing:
     - Passed the core tests.
    
    Change-Id: I21c1e6f6d8b047a37c7db2b7995b7ff74e317226
    Reviewed-on: http://gerrit.cloudera.org:8080/20247
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Wenzhe Zhou <wz...@cloudera.com>
---
 be/src/statestore/statestore-subscriber.cc | 3 +++
 be/src/statestore/statestore.cc            | 6 +++++-
 tests/custom_cluster/test_catalogd_ha.py   | 5 +++++
 3 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc
index bbfe56564..480966489 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -465,6 +465,9 @@ Status StatestoreSubscriber::StatestoreStub::Register(bool* has_active_catalogd,
         *active_catalogd_registration = response.catalogd_registration;
       }
     }
+    // Reset last received sequence number of update_catalogd RPC since statestore
+    // could be restarted.
+    last_update_catalogd_seq_ = 0;
   }
   heartbeat_interval_timer_.Start();
   return status;
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 76cb3ec50..945d3a088 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -1236,7 +1236,11 @@ void Statestore::DoSubscriberUpdate(UpdateKind update_kind, int thread_id,
   while (1) {
     {
       unique_lock<mutex> l(*catalog_manager_.GetLock());
-      update_catalod_cv_.WaitFor(l, timeout_us);
+      if (rpc_receivers.empty()) {
+        update_catalod_cv_.Wait(l);
+      } else {
+        update_catalod_cv_.WaitFor(l, timeout_us);
+      }
     }
     SendUpdateCatalogdNotification(&last_sending_sequence, rpc_receivers);
   }
diff --git a/tests/custom_cluster/test_catalogd_ha.py b/tests/custom_cluster/test_catalogd_ha.py
index 82f14202e..a1dcf9da2 100644
--- a/tests/custom_cluster/test_catalogd_ha.py
+++ b/tests/custom_cluster/test_catalogd_ha.py
@@ -17,6 +17,7 @@
 
 from __future__ import absolute_import, division, print_function
 import logging
+import re
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.environ import build_flavor_timeout
@@ -336,3 +337,7 @@ class TestCatalogdHA(CustomClusterTestSuite):
     # Verify simple query is ran successfully.
     self.execute_query_expect_success(
         self.client, "select count(*) from functional.alltypes")
+
+    unexpected_msg = re.compile(
+        "unexpected sequence number: [0-9]+, was expecting greater than [0-9]+")
+    self.assert_catalogd_log_contains("INFO", unexpected_msg, expected_count=0)