You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@brpc.apache.org by se...@apache.org on 2023/02/15 02:01:11 UTC

[brpc] branch master updated: Periodic ns quit (#2123)

This is an automated email from the ASF dual-hosted git repository.

serverglen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d4db043 Periodic ns quit (#2123)
3d4db043 is described below

commit 3d4db04303736903f5345be2187a7842cf4a11f7
Author: Bright Chen <ch...@foxmail.com>
AuthorDate: Wed Feb 15 10:01:00 2023 +0800

    Periodic ns quit (#2123)
    
    * Fix periodic naming service (ns) thread being unable to quit
    
    * Refactor ConsulNamingService to derive from PeriodicNamingService
    
    * Quit periodic ns when bthread is stopped
---
 src/brpc/periodic_naming_service.cpp      |  6 +++-
 src/brpc/periodic_naming_service.h        |  2 +-
 src/brpc/policy/consul_naming_service.cpp | 46 ++++++-------------------------
 src/brpc/policy/consul_naming_service.h   | 18 ++++++------
 4 files changed, 24 insertions(+), 48 deletions(-)

diff --git a/src/brpc/periodic_naming_service.cpp b/src/brpc/periodic_naming_service.cpp
index 5e109779..ac3c3370 100644
--- a/src/brpc/periodic_naming_service.cpp
+++ b/src/brpc/periodic_naming_service.cpp
@@ -37,7 +37,7 @@ int PeriodicNamingService::RunNamingService(
     const char* service_name, NamingServiceActions* actions) {
     std::vector<ServerNode> servers;
     bool ever_reset = false;
-    for (;;) {
+    while (true) {
         servers.clear();
         const int rc = GetServers(service_name, &servers);
         if (rc == 0) {
@@ -51,6 +51,10 @@ int PeriodicNamingService::RunNamingService(
             actions->ResetServers(servers);
         }
 
+        if (bthread_stopped(bthread_self())) {
+            RPC_VLOG << "Quit NamingServiceThread=" << bthread_self();
+            return 0;
+        }
         if (bthread_usleep(GetNamingServiceAccessIntervalMs() * 1000UL) < 0) {
             if (errno == ESTOP) {
                 RPC_VLOG << "Quit NamingServiceThread=" << bthread_self();
diff --git a/src/brpc/periodic_naming_service.h b/src/brpc/periodic_naming_service.h
index 8216ddfd..7e51114f 100644
--- a/src/brpc/periodic_naming_service.h
+++ b/src/brpc/periodic_naming_service.h
@@ -32,7 +32,7 @@ protected:
     virtual int GetNamingServiceAccessIntervalMs() const;
 
     int RunNamingService(const char* service_name,
-                         NamingServiceActions* actions);
+                         NamingServiceActions* actions) override;
 };
 
 } // namespace brpc
diff --git a/src/brpc/policy/consul_naming_service.cpp b/src/brpc/policy/consul_naming_service.cpp
index f4b2345e..795c180c 100644
--- a/src/brpc/policy/consul_naming_service.cpp
+++ b/src/brpc/policy/consul_naming_service.cpp
@@ -62,6 +62,14 @@ std::string RapidjsonValueToString(const BUTIL_RAPIDJSON_NAMESPACE::Value& value
     return buffer.GetString();
 }
 
+ConsulNamingService::ConsulNamingService()
+    : _backup_file_loaded(false), _consul_connected(false) {}
+
+int ConsulNamingService::GetNamingServiceAccessIntervalMs() const {
+    return FLAGS_consul_retry_interval_ms > 0 ? FLAGS_consul_retry_interval_ms :
+        PeriodicNamingService::GetNamingServiceAccessIntervalMs();
+}
+
 int ConsulNamingService::DegradeToOtherServiceIfNeeded(const char* service_name,
                                                        std::vector<ServerNode>* servers) {
     if (FLAGS_consul_enable_degrade_to_file_naming_service && !_backup_file_loaded) {
@@ -209,47 +217,9 @@ int ConsulNamingService::GetServers(const char* service_name,
     return 0;
 }
 
-int ConsulNamingService::RunNamingService(const char* service_name,
-                                          NamingServiceActions* actions) {
-    std::vector<ServerNode> servers;
-    bool ever_reset = false;
-    for (;;) {
-        servers.clear();
-        const int rc = GetServers(service_name, &servers);
-        if (bthread_stopped(bthread_self())) {
-            RPC_VLOG << "Quit NamingServiceThread=" << bthread_self();
-            return 0;
-        }
-        if (rc == 0) {
-            ever_reset = true;
-            actions->ResetServers(servers);
-        } else {
-            if (!ever_reset) {
-                // ResetServers must be called at first time even if GetServers
-                // failed, to wake up callers to `WaitForFirstBatchOfServers'
-                ever_reset = true;
-                servers.clear();
-                actions->ResetServers(servers);
-            }
-            if (bthread_usleep(std::max(FLAGS_consul_retry_interval_ms, 1) * butil::Time::kMicrosecondsPerMillisecond) < 0) {
-                if (errno == ESTOP) {
-                    RPC_VLOG << "Quit NamingServiceThread=" << bthread_self();
-                    return 0;
-                }
-                PLOG(FATAL) << "Fail to sleep";
-                return -1;
-            }
-        }
-    }
-    CHECK(false);
-    return -1;
-}
-
-
 void ConsulNamingService::Describe(std::ostream& os,
                                    const DescribeOptions&) const {
     os << "consul";
-    return;
 }
 
 NamingService* ConsulNamingService::New() const {
diff --git a/src/brpc/policy/consul_naming_service.h b/src/brpc/policy/consul_naming_service.h
index 93bee068..bf63e658 100644
--- a/src/brpc/policy/consul_naming_service.h
+++ b/src/brpc/policy/consul_naming_service.h
@@ -19,7 +19,7 @@
 #ifndef  BRPC_POLICY_CONSUL_NAMING_SERVICE
 #define  BRPC_POLICY_CONSUL_NAMING_SERVICE
 
-#include "brpc/naming_service.h"
+#include "brpc/periodic_naming_service.h"
 #include "brpc/channel.h"
 
 
@@ -27,13 +27,15 @@ namespace brpc {
 class Channel;
 namespace policy {
 
-class ConsulNamingService : public NamingService {
-private:
-    int RunNamingService(const char* service_name,
-                         NamingServiceActions* actions) override;
+class ConsulNamingService : public PeriodicNamingService {
+public:
+    ConsulNamingService();
 
+private:
     int GetServers(const char* service_name,
-                   std::vector<ServerNode>* servers);
+                   std::vector<ServerNode>* servers) override;
+
+    int GetNamingServiceAccessIntervalMs() const override;
 
     void Describe(std::ostream& os, const DescribeOptions&) const override;
 
@@ -48,8 +50,8 @@ private:
     Channel _channel;
     std::string _consul_index;
     std::string _consul_url;
-    bool _backup_file_loaded = false;
-    bool _consul_connected = false;
+    bool _backup_file_loaded;
+    bool _consul_connected;
 };
 
 }  // namespace policy


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org
For additional commands, e-mail: dev-help@brpc.apache.org