You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by wz...@apache.org on 2023/07/22 03:55:18 UTC
[impala] branch master updated: IMPALA-12304: Fix the sequence number issue for update_catalogd RPC
This is an automated email from the ASF dual-hosted git repository.
wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new af3f56e6d IMPALA-12304: Fix the sequence number issue for update_catalogd RPC
af3f56e6d is described below
commit af3f56e6d1605a56f7bd02b0af35be980a7e4c63
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Fri Jul 21 12:02:56 2023 -0700
IMPALA-12304: Fix the sequence number issue for update_catalogd RPC
Subscribers re-register with the statestore when the statestore is restarted.
When the statestore is restarted, its sending sequence number for the
update_catalogd RPC is reset. The subscribers therefore need to reset their
last received sequence number for the update_catalogd RPC when they
successfully re-register with the statestore. Otherwise, after the statestore
is restarted, subscribers may reject update_catalogd RPCs as having an
unexpected sequence number and thus miss those updates.
Related error messages could be found in the catalogd log file when running
test_catalogd_ha.py::TestCatalogdHA::test_restart_statestore.
Verified that no such error messages appear in the catalogd log after the fix.
Made a small optimization so that the statestore does not wake up the
update_catalogd RPC thread if the elected active catalogd has not changed
and no RPC failed in the last round.
Testing:
- Passed the core test.
Change-Id: I21c1e6f6d8b047a37c7db2b7995b7ff74e317226
Reviewed-on: http://gerrit.cloudera.org:8080/20247
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Wenzhe Zhou <wz...@cloudera.com>
---
be/src/statestore/statestore-subscriber.cc | 3 +++
be/src/statestore/statestore.cc | 6 +++++-
tests/custom_cluster/test_catalogd_ha.py | 5 +++++
3 files changed, 13 insertions(+), 1 deletion(-)
diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc
index bbfe56564..480966489 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -465,6 +465,9 @@ Status StatestoreSubscriber::StatestoreStub::Register(bool* has_active_catalogd,
*active_catalogd_registration = response.catalogd_registration;
}
}
+ // Reset last received sequence number of update_catalogd RPC since statestore
+ // could be restarted.
+ last_update_catalogd_seq_ = 0;
}
heartbeat_interval_timer_.Start();
return status;
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 76cb3ec50..945d3a088 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -1236,7 +1236,11 @@ void Statestore::DoSubscriberUpdate(UpdateKind update_kind, int thread_id,
while (1) {
{
unique_lock<mutex> l(*catalog_manager_.GetLock());
- update_catalod_cv_.WaitFor(l, timeout_us);
+ if (rpc_receivers.empty()) {
+ update_catalod_cv_.Wait(l);
+ } else {
+ update_catalod_cv_.WaitFor(l, timeout_us);
+ }
}
SendUpdateCatalogdNotification(&last_sending_sequence, rpc_receivers);
}
diff --git a/tests/custom_cluster/test_catalogd_ha.py b/tests/custom_cluster/test_catalogd_ha.py
index 82f14202e..a1dcf9da2 100644
--- a/tests/custom_cluster/test_catalogd_ha.py
+++ b/tests/custom_cluster/test_catalogd_ha.py
@@ -17,6 +17,7 @@
from __future__ import absolute_import, division, print_function
import logging
+import re
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.environ import build_flavor_timeout
@@ -336,3 +337,7 @@ class TestCatalogdHA(CustomClusterTestSuite):
# Verify simple query is ran successfully.
self.execute_query_expect_success(
self.client, "select count(*) from functional.alltypes")
+
+ unexpected_msg = re.compile(
+ "unexpected sequence number: [0-9]+, was expecting greater than [0-9]+")
+ self.assert_catalogd_log_contains("INFO", unexpected_msg, expected_count=0)