You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2022/07/06 06:27:59 UTC

[dubbo] 02/07: [3.0] Fixed the problem that the service was offline for a long time without re-registration (#10200)

This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/dubbo.git

commit 2dab2d7fdb69a7a391b377eb02fb8d92424819fb
Author: 一个不知名的Java靓仔 <cl...@gmail.com>
AuthorDate: Mon Jul 4 19:49:29 2022 +0800

    [3.0] Fixed the problem that the service was offline for a long time without re-registration (#10200)
    
    * 修复因网络抖动导致服务长时间下线没有重新注册的问题
    
    * lazy init CuratorWatcher executor and destroyed on shutdown
    
    * add lock on close
---
 .../zookeeper/curator/CuratorZookeeperClient.java  | 40 +++++++++++++++++++++-
 1 file changed, 39 insertions(+), 1 deletion(-)

diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java
index 975e10fd19..d1bbb5e960 100644
--- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java
+++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.config.configcenter.ConfigItem;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
 import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.remoting.zookeeper.AbstractZookeeperClient;
 import org.apache.dubbo.remoting.zookeeper.ChildListener;
@@ -49,6 +50,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.dubbo.common.constants.CommonConstants.SESSION_KEY;
@@ -84,6 +87,7 @@ public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorZooke
             if (!connected) {
                 throw new IllegalStateException("zookeeper not connected");
             }
+            CuratorWatcherImpl.closed = false;
         } catch (Exception e) {
             close();
             throw new IllegalStateException(e.getMessage(), e);
@@ -256,6 +260,13 @@ public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorZooke
     public void doClose() {
         super.close();
         client.close();
+        CuratorWatcherImpl.closed = true;
+        synchronized (CuratorWatcherImpl.class) {
+            if (CuratorWatcherImpl.CURATOR_WATCHER_EXECUTOR_SERVICE != null) {
+                CuratorWatcherImpl.CURATOR_WATCHER_EXECUTOR_SERVICE.shutdown();
+                CuratorWatcherImpl.CURATOR_WATCHER_EXECUTOR_SERVICE = null;
+            }
+        }
     }
 
     @Override
@@ -354,10 +365,24 @@ public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorZooke
 
     static class CuratorWatcherImpl implements CuratorWatcher {
 
+        private static volatile ExecutorService CURATOR_WATCHER_EXECUTOR_SERVICE;
+
+        private static volatile boolean closed = false;
+
         private CuratorFramework client;
         private volatile ChildListener childListener;
         private String path;
 
+        private static void initExecutorIfNecessary() {
+            if (!closed && CURATOR_WATCHER_EXECUTOR_SERVICE == null) {
+                synchronized (CuratorWatcherImpl.class) {
+                    if (!closed && CURATOR_WATCHER_EXECUTOR_SERVICE == null) {
+                        CURATOR_WATCHER_EXECUTOR_SERVICE = Executors.newSingleThreadExecutor(new NamedThreadFactory("Dubbo-CuratorWatcher"));
+                    }
+                }
+            }
+        }
+
         public CuratorWatcherImpl(CuratorFramework client, ChildListener listener, String path) {
             this.client = client;
             this.childListener = listener;
@@ -380,7 +405,20 @@ public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorZooke
             }
 
             if (childListener != null) {
-                childListener.childChanged(path, client.getChildren().usingWatcher(this).forPath(path));
+                Runnable task = new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            childListener.childChanged(path, client.getChildren().usingWatcher(CuratorWatcherImpl.this).forPath(path));
+                        } catch (Exception e) {
+                            logger.warn("client get children error", e);
+                        }
+                    }
+                };
+                initExecutorIfNecessary();
+                if (!closed && CURATOR_WATCHER_EXECUTOR_SERVICE != null) {
+                    CURATOR_WATCHER_EXECUTOR_SERVICE.execute(task);
+                }
             }
         }
     }