You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@brpc.apache.org by se...@apache.org on 2023/02/15 02:01:11 UTC
[brpc] branch master updated: Periodic ns quit (#2123)
This is an automated email from the ASF dual-hosted git repository.
serverglen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 3d4db043 Periodic ns quit (#2123)
3d4db043 is described below
commit 3d4db04303736903f5345be2187a7842cf4a11f7
Author: Bright Chen <ch...@foxmail.com>
AuthorDate: Wed Feb 15 10:01:00 2023 +0800
Periodic ns quit (#2123)
* Fix periodic ns thread can not quit
* Abstract ConsulNamingService into a PeriodicNamingService
* Quit periodic ns when bthread is stopped
---
src/brpc/periodic_naming_service.cpp | 6 +++-
src/brpc/periodic_naming_service.h | 2 +-
src/brpc/policy/consul_naming_service.cpp | 46 ++++++-------------------------
src/brpc/policy/consul_naming_service.h | 18 ++++++------
4 files changed, 24 insertions(+), 48 deletions(-)
diff --git a/src/brpc/periodic_naming_service.cpp b/src/brpc/periodic_naming_service.cpp
index 5e109779..ac3c3370 100644
--- a/src/brpc/periodic_naming_service.cpp
+++ b/src/brpc/periodic_naming_service.cpp
@@ -37,7 +37,7 @@ int PeriodicNamingService::RunNamingService(
const char* service_name, NamingServiceActions* actions) {
std::vector<ServerNode> servers;
bool ever_reset = false;
- for (;;) {
+ while (true) {
servers.clear();
const int rc = GetServers(service_name, &servers);
if (rc == 0) {
@@ -51,6 +51,10 @@ int PeriodicNamingService::RunNamingService(
actions->ResetServers(servers);
}
+ if (bthread_stopped(bthread_self())) {
+ RPC_VLOG << "Quit NamingServiceThread=" << bthread_self();
+ return 0;
+ }
if (bthread_usleep(GetNamingServiceAccessIntervalMs() * 1000UL) < 0) {
if (errno == ESTOP) {
RPC_VLOG << "Quit NamingServiceThread=" << bthread_self();
diff --git a/src/brpc/periodic_naming_service.h b/src/brpc/periodic_naming_service.h
index 8216ddfd..7e51114f 100644
--- a/src/brpc/periodic_naming_service.h
+++ b/src/brpc/periodic_naming_service.h
@@ -32,7 +32,7 @@ protected:
virtual int GetNamingServiceAccessIntervalMs() const;
int RunNamingService(const char* service_name,
- NamingServiceActions* actions);
+ NamingServiceActions* actions) override;
};
} // namespace brpc
diff --git a/src/brpc/policy/consul_naming_service.cpp b/src/brpc/policy/consul_naming_service.cpp
index f4b2345e..795c180c 100644
--- a/src/brpc/policy/consul_naming_service.cpp
+++ b/src/brpc/policy/consul_naming_service.cpp
@@ -62,6 +62,14 @@ std::string RapidjsonValueToString(const BUTIL_RAPIDJSON_NAMESPACE::Value& value
return buffer.GetString();
}
+ConsulNamingService::ConsulNamingService()
+ : _backup_file_loaded(false), _consul_connected(false) {}
+
+int ConsulNamingService::GetNamingServiceAccessIntervalMs() const {
+ return FLAGS_consul_retry_interval_ms > 0 ? FLAGS_consul_retry_interval_ms :
+ PeriodicNamingService::GetNamingServiceAccessIntervalMs();
+}
+
int ConsulNamingService::DegradeToOtherServiceIfNeeded(const char* service_name,
std::vector<ServerNode>* servers) {
if (FLAGS_consul_enable_degrade_to_file_naming_service && !_backup_file_loaded) {
@@ -209,47 +217,9 @@ int ConsulNamingService::GetServers(const char* service_name,
return 0;
}
-int ConsulNamingService::RunNamingService(const char* service_name,
- NamingServiceActions* actions) {
- std::vector<ServerNode> servers;
- bool ever_reset = false;
- for (;;) {
- servers.clear();
- const int rc = GetServers(service_name, &servers);
- if (bthread_stopped(bthread_self())) {
- RPC_VLOG << "Quit NamingServiceThread=" << bthread_self();
- return 0;
- }
- if (rc == 0) {
- ever_reset = true;
- actions->ResetServers(servers);
- } else {
- if (!ever_reset) {
- // ResetServers must be called at first time even if GetServers
- // failed, to wake up callers to `WaitForFirstBatchOfServers'
- ever_reset = true;
- servers.clear();
- actions->ResetServers(servers);
- }
- if (bthread_usleep(std::max(FLAGS_consul_retry_interval_ms, 1) * butil::Time::kMicrosecondsPerMillisecond) < 0) {
- if (errno == ESTOP) {
- RPC_VLOG << "Quit NamingServiceThread=" << bthread_self();
- return 0;
- }
- PLOG(FATAL) << "Fail to sleep";
- return -1;
- }
- }
- }
- CHECK(false);
- return -1;
-}
-
-
void ConsulNamingService::Describe(std::ostream& os,
const DescribeOptions&) const {
os << "consul";
- return;
}
NamingService* ConsulNamingService::New() const {
diff --git a/src/brpc/policy/consul_naming_service.h b/src/brpc/policy/consul_naming_service.h
index 93bee068..bf63e658 100644
--- a/src/brpc/policy/consul_naming_service.h
+++ b/src/brpc/policy/consul_naming_service.h
@@ -19,7 +19,7 @@
#ifndef BRPC_POLICY_CONSUL_NAMING_SERVICE
#define BRPC_POLICY_CONSUL_NAMING_SERVICE
-#include "brpc/naming_service.h"
+#include "brpc/periodic_naming_service.h"
#include "brpc/channel.h"
@@ -27,13 +27,15 @@ namespace brpc {
class Channel;
namespace policy {
-class ConsulNamingService : public NamingService {
-private:
- int RunNamingService(const char* service_name,
- NamingServiceActions* actions) override;
+class ConsulNamingService : public PeriodicNamingService {
+public:
+ ConsulNamingService();
+private:
int GetServers(const char* service_name,
- std::vector<ServerNode>* servers);
+ std::vector<ServerNode>* servers) override;
+
+ int GetNamingServiceAccessIntervalMs() const override;
void Describe(std::ostream& os, const DescribeOptions&) const override;
@@ -48,8 +50,8 @@ private:
Channel _channel;
std::string _consul_index;
std::string _consul_url;
- bool _backup_file_loaded = false;
- bool _consul_connected = false;
+ bool _backup_file_loaded;
+ bool _consul_connected;
};
} // namespace policy
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org
For additional commands, e-mail: dev-help@brpc.apache.org