You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@brpc.apache.org by ww...@apache.org on 2023/02/23 01:55:14 UTC

[brpc] branch master updated: revert consul naming service (#2129)

This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new d587bd10 revert consul naming service (#2129)
d587bd10 is described below

commit d587bd10d84f397bc099e41a07826ddc7de55ab5
Author: Bright Chen <ch...@foxmail.com>
AuthorDate: Thu Feb 23 09:55:07 2023 +0800

    revert consul naming service (#2129)
    
    * revert consul naming service
    
    * revert consul naming service
    
    * revert consul naming service
---
 src/brpc/periodic_naming_service.cpp      |  6 +++
 src/brpc/policy/consul_naming_service.cpp | 63 ++++++++++++++++++++++++-------
 src/brpc/policy/consul_naming_service.h   | 20 +++++-----
 3 files changed, 64 insertions(+), 25 deletions(-)

diff --git a/src/brpc/periodic_naming_service.cpp b/src/brpc/periodic_naming_service.cpp
index ac3c3370..69d403c9 100644
--- a/src/brpc/periodic_naming_service.cpp
+++ b/src/brpc/periodic_naming_service.cpp
@@ -51,6 +51,12 @@ int PeriodicNamingService::RunNamingService(
             actions->ResetServers(servers);
         }
 
+        // If `bthread_stop' is called to stop the ns bthread while `brpc::Join' is
+        // waiting in `GetServers' for an rpc to complete, the bthread will be woken up,
+        // reset `TaskMeta::interrupted' and continue to join the rpc. After the rpc completes,
+        // `bthread_usleep' will not sense the interrupt signal and will sleep successfully,
+        // so the ns bthread will never exit. Therefore we need to check the stop status of
+        // the bthread here and exit the bthread in time.
         if (bthread_stopped(bthread_self())) {
             RPC_VLOG << "Quit NamingServiceThread=" << bthread_self();
             return 0;
diff --git a/src/brpc/policy/consul_naming_service.cpp b/src/brpc/policy/consul_naming_service.cpp
index 795c180c..70e0a465 100644
--- a/src/brpc/policy/consul_naming_service.cpp
+++ b/src/brpc/policy/consul_naming_service.cpp
@@ -48,8 +48,8 @@ DEFINE_int32(consul_blocking_query_wait_secs, 60,
 DEFINE_bool(consul_enable_degrade_to_file_naming_service, false,
             "Use local backup file when consul cannot connect");
 DEFINE_string(consul_file_naming_service_dir, "",
-    "When it degraded to file naming service, the file with name of the "
-    "service name will be searched in this dir to use.");
+              "When it degraded to file naming service, the file with name of the "
+              "service name will be searched in this dir to use.");
 DEFINE_int32(consul_retry_interval_ms, 500,
              "Wait so many milliseconds before retry when error happens");
 
@@ -62,14 +62,6 @@ std::string RapidjsonValueToString(const BUTIL_RAPIDJSON_NAMESPACE::Value& value
     return buffer.GetString();
 }
 
-ConsulNamingService::ConsulNamingService()
-    : _backup_file_loaded(false), _consul_connected(false) {}
-
-int ConsulNamingService::GetNamingServiceAccessIntervalMs() const {
-    return FLAGS_consul_retry_interval_ms > 0 ? FLAGS_consul_retry_interval_ms :
-        PeriodicNamingService::GetNamingServiceAccessIntervalMs();
-}
-
 int ConsulNamingService::DegradeToOtherServiceIfNeeded(const char* service_name,
                                                        std::vector<ServerNode>* servers) {
     if (FLAGS_consul_enable_degrade_to_file_naming_service && !_backup_file_loaded) {
@@ -122,8 +114,8 @@ int ConsulNamingService::GetServers(const char* service_name,
     if (index != nullptr) {
         if (*index == _consul_index) {
             LOG_EVERY_N(INFO, 100) << "There is no service changed for the list of "
-                                    << service_name
-                                    << ", consul_index: " << _consul_index;
+                                   << service_name
+                                   << ", consul_index: " << _consul_index;
             return -1;
         }
     } else {
@@ -208,7 +200,7 @@ int ConsulNamingService::GetServers(const char* service_name,
     if (servers->empty() && !services.Empty()) {
         LOG(ERROR) << "All service about " << service_name
                    << " from consul is invalid, refuse to update servers";
-          return -1;
+        return -1;
     }
 
     RPC_VLOG << "Got " << servers->size()
@@ -217,6 +209,49 @@ int ConsulNamingService::GetServers(const char* service_name,
     return 0;
 }
 
+int ConsulNamingService::RunNamingService(const char* service_name,
+                                          NamingServiceActions* actions) {
+    std::vector<ServerNode> servers;
+    bool ever_reset = false;
+    for (;;) {
+        servers.clear();
+        const int rc = GetServers(service_name, &servers);
+        // If `bthread_stop' is called to stop the ns bthread while `brpc::Join' is
+        // waiting in `GetServers' for an rpc to complete, the bthread will be woken up,
+        // reset `TaskMeta::interrupted' and continue to join the rpc. After the rpc completes,
+        // `bthread_usleep' will not sense the interrupt signal and will sleep successfully,
+        // so the ns bthread will never exit. Therefore we need to check the stop status of
+        // the bthread here and exit the bthread in time.
+        if (bthread_stopped(bthread_self())) {
+            RPC_VLOG << "Quit NamingServiceThread=" << bthread_self();
+            return 0;
+        }
+        if (rc == 0) {
+            ever_reset = true;
+            actions->ResetServers(servers);
+        } else {
+            if (!ever_reset) {
+                // ResetServers must be called at first time even if GetServers
+                // failed, to wake up callers to `WaitForFirstBatchOfServers'
+                ever_reset = true;
+                servers.clear();
+                actions->ResetServers(servers);
+            }
+            if (bthread_usleep(std::max(FLAGS_consul_retry_interval_ms, 1) * butil::Time::kMicrosecondsPerMillisecond) < 0) {
+                if (errno == ESTOP) {
+                    RPC_VLOG << "Quit NamingServiceThread=" << bthread_self();
+                    return 0;
+                }
+                PLOG(FATAL) << "Fail to sleep";
+                return -1;
+            }
+        }
+    }
+    CHECK(false);
+    return -1;
+}
+
+
 void ConsulNamingService::Describe(std::ostream& os,
                                    const DescribeOptions&) const {
     os << "consul";
@@ -231,4 +266,4 @@ void ConsulNamingService::Destroy() {
 }
 
 }  // namespace policy
-} // namespace brpc
+} // namespace brpc
\ No newline at end of file
diff --git a/src/brpc/policy/consul_naming_service.h b/src/brpc/policy/consul_naming_service.h
index bf63e658..062fed24 100644
--- a/src/brpc/policy/consul_naming_service.h
+++ b/src/brpc/policy/consul_naming_service.h
@@ -19,7 +19,7 @@
 #ifndef  BRPC_POLICY_CONSUL_NAMING_SERVICE
 #define  BRPC_POLICY_CONSUL_NAMING_SERVICE
 
-#include "brpc/periodic_naming_service.h"
+#include "brpc/naming_service.h"
 #include "brpc/channel.h"
 
 
@@ -27,15 +27,13 @@ namespace brpc {
 class Channel;
 namespace policy {
 
-class ConsulNamingService : public PeriodicNamingService {
-public:
-    ConsulNamingService();
-
+class ConsulNamingService : public NamingService {
 private:
-    int GetServers(const char* service_name,
-                   std::vector<ServerNode>* servers) override;
+    int RunNamingService(const char* service_name,
+                         NamingServiceActions* actions) override;
 
-    int GetNamingServiceAccessIntervalMs() const override;
+    int GetServers(const char* service_name,
+                   std::vector<ServerNode>* servers);
 
     void Describe(std::ostream& os, const DescribeOptions&) const override;
 
@@ -50,12 +48,12 @@ private:
     Channel _channel;
     std::string _consul_index;
     std::string _consul_url;
-    bool _backup_file_loaded;
-    bool _consul_connected;
+    bool _backup_file_loaded = false;
+    bool _consul_connected = false;
 };
 
 }  // namespace policy
 } // namespace brpc
 
 
-#endif  //BRPC_POLICY_CONSUL_NAMING_SERVICE
+#endif  //BRPC_POLICY_CONSUL_NAMING_SERVICE
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org
For additional commands, e-mail: dev-help@brpc.apache.org