You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@brpc.apache.org by ww...@apache.org on 2023/02/23 01:55:14 UTC
[brpc] branch master updated: revert consul naming service (#2129)
This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new d587bd10 revert consul naming service (#2129)
d587bd10 is described below
commit d587bd10d84f397bc099e41a07826ddc7de55ab5
Author: Bright Chen <ch...@foxmail.com>
AuthorDate: Thu Feb 23 09:55:07 2023 +0800
revert consul naming service (#2129)
* revert consul naming service
* revert consul naming service
* revert consul naming service
---
src/brpc/periodic_naming_service.cpp | 6 +++
src/brpc/policy/consul_naming_service.cpp | 63 ++++++++++++++++++++++++-------
src/brpc/policy/consul_naming_service.h | 20 +++++-----
3 files changed, 64 insertions(+), 25 deletions(-)
diff --git a/src/brpc/periodic_naming_service.cpp b/src/brpc/periodic_naming_service.cpp
index ac3c3370..69d403c9 100644
--- a/src/brpc/periodic_naming_service.cpp
+++ b/src/brpc/periodic_naming_service.cpp
@@ -51,6 +51,12 @@ int PeriodicNamingService::RunNamingService(
actions->ResetServers(servers);
}
+ // If `bthread_stop' is called to stop the ns bthread when `brpc::Join‘ is called
+ // in `GetServers' to wait for a rpc to complete. The bthread will be woken up,
+ // reset `TaskMeta::interrupted' and continue to join the rpc. After the rpc is complete,
+ // `bthread_usleep' will not sense the interrupt signal and sleep successfully.
+ // Finally, the ns bthread will never exit. So need to check the stop status of
+ // the bthread here and exit the bthread in time.
if (bthread_stopped(bthread_self())) {
RPC_VLOG << "Quit NamingServiceThread=" << bthread_self();
return 0;
diff --git a/src/brpc/policy/consul_naming_service.cpp b/src/brpc/policy/consul_naming_service.cpp
index 795c180c..70e0a465 100644
--- a/src/brpc/policy/consul_naming_service.cpp
+++ b/src/brpc/policy/consul_naming_service.cpp
@@ -48,8 +48,8 @@ DEFINE_int32(consul_blocking_query_wait_secs, 60,
DEFINE_bool(consul_enable_degrade_to_file_naming_service, false,
"Use local backup file when consul cannot connect");
DEFINE_string(consul_file_naming_service_dir, "",
- "When it degraded to file naming service, the file with name of the "
- "service name will be searched in this dir to use.");
+ "When it degraded to file naming service, the file with name of the "
+ "service name will be searched in this dir to use.");
DEFINE_int32(consul_retry_interval_ms, 500,
"Wait so many milliseconds before retry when error happens");
@@ -62,14 +62,6 @@ std::string RapidjsonValueToString(const BUTIL_RAPIDJSON_NAMESPACE::Value& value
return buffer.GetString();
}
-ConsulNamingService::ConsulNamingService()
- : _backup_file_loaded(false), _consul_connected(false) {}
-
-int ConsulNamingService::GetNamingServiceAccessIntervalMs() const {
- return FLAGS_consul_retry_interval_ms > 0 ? FLAGS_consul_retry_interval_ms :
- PeriodicNamingService::GetNamingServiceAccessIntervalMs();
-}
-
int ConsulNamingService::DegradeToOtherServiceIfNeeded(const char* service_name,
std::vector<ServerNode>* servers) {
if (FLAGS_consul_enable_degrade_to_file_naming_service && !_backup_file_loaded) {
@@ -122,8 +114,8 @@ int ConsulNamingService::GetServers(const char* service_name,
if (index != nullptr) {
if (*index == _consul_index) {
LOG_EVERY_N(INFO, 100) << "There is no service changed for the list of "
- << service_name
- << ", consul_index: " << _consul_index;
+ << service_name
+ << ", consul_index: " << _consul_index;
return -1;
}
} else {
@@ -208,7 +200,7 @@ int ConsulNamingService::GetServers(const char* service_name,
if (servers->empty() && !services.Empty()) {
LOG(ERROR) << "All service about " << service_name
<< " from consul is invalid, refuse to update servers";
- return -1;
+ return -1;
}
RPC_VLOG << "Got " << servers->size()
@@ -217,6 +209,49 @@ int ConsulNamingService::GetServers(const char* service_name,
return 0;
}
+int ConsulNamingService::RunNamingService(const char* service_name,
+ NamingServiceActions* actions) {
+ std::vector<ServerNode> servers;
+ bool ever_reset = false;
+ for (;;) {
+ servers.clear();
+ const int rc = GetServers(service_name, &servers);
+ // If `bthread_stop' is called to stop the ns bthread when `brpc::Join‘ is called
+ // in `GetServers' to wait for a rpc to complete. The bthread will be woken up,
+ // reset `TaskMeta::interrupted' and continue to join the rpc. After the rpc is complete,
+ // `bthread_usleep' will not sense the interrupt signal and sleep successfully.
+ // Finally, the ns bthread will never exit. So need to check the stop status of
+ // the bthread here and exit the bthread in time.
+ if (bthread_stopped(bthread_self())) {
+ RPC_VLOG << "Quit NamingServiceThread=" << bthread_self();
+ return 0;
+ }
+ if (rc == 0) {
+ ever_reset = true;
+ actions->ResetServers(servers);
+ } else {
+ if (!ever_reset) {
+ // ResetServers must be called at first time even if GetServers
+ // failed, to wake up callers to `WaitForFirstBatchOfServers'
+ ever_reset = true;
+ servers.clear();
+ actions->ResetServers(servers);
+ }
+ if (bthread_usleep(std::max(FLAGS_consul_retry_interval_ms, 1) * butil::Time::kMicrosecondsPerMillisecond) < 0) {
+ if (errno == ESTOP) {
+ RPC_VLOG << "Quit NamingServiceThread=" << bthread_self();
+ return 0;
+ }
+ PLOG(FATAL) << "Fail to sleep";
+ return -1;
+ }
+ }
+ }
+ CHECK(false);
+ return -1;
+}
+
+
void ConsulNamingService::Describe(std::ostream& os,
const DescribeOptions&) const {
os << "consul";
@@ -231,4 +266,4 @@ void ConsulNamingService::Destroy() {
}
} // namespace policy
-} // namespace brpc
+} // namespace brpc
\ No newline at end of file
diff --git a/src/brpc/policy/consul_naming_service.h b/src/brpc/policy/consul_naming_service.h
index bf63e658..062fed24 100644
--- a/src/brpc/policy/consul_naming_service.h
+++ b/src/brpc/policy/consul_naming_service.h
@@ -19,7 +19,7 @@
#ifndef BRPC_POLICY_CONSUL_NAMING_SERVICE
#define BRPC_POLICY_CONSUL_NAMING_SERVICE
-#include "brpc/periodic_naming_service.h"
+#include "brpc/naming_service.h"
#include "brpc/channel.h"
@@ -27,15 +27,13 @@ namespace brpc {
class Channel;
namespace policy {
-class ConsulNamingService : public PeriodicNamingService {
-public:
- ConsulNamingService();
-
+class ConsulNamingService : public NamingService {
private:
- int GetServers(const char* service_name,
- std::vector<ServerNode>* servers) override;
+ int RunNamingService(const char* service_name,
+ NamingServiceActions* actions) override;
- int GetNamingServiceAccessIntervalMs() const override;
+ int GetServers(const char* service_name,
+ std::vector<ServerNode>* servers);
void Describe(std::ostream& os, const DescribeOptions&) const override;
@@ -50,12 +48,12 @@ private:
Channel _channel;
std::string _consul_index;
std::string _consul_url;
- bool _backup_file_loaded;
- bool _consul_connected;
+ bool _backup_file_loaded = false;
+ bool _consul_connected = false;
};
} // namespace policy
} // namespace brpc
-#endif //BRPC_POLICY_CONSUL_NAMING_SERVICE
+#endif //BRPC_POLICY_CONSUL_NAMING_SERVICE
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org
For additional commands, e-mail: dev-help@brpc.apache.org