You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2022/07/06 06:27:59 UTC
[dubbo] 02/07: [3.0] Fixed the problem that the service was offline for a long time without re-registration (#10200)
This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/dubbo.git
commit 2dab2d7fdb69a7a391b377eb02fb8d92424819fb
Author: 一个不知名的Java靓仔 <cl...@gmail.com>
AuthorDate: Mon Jul 4 19:49:29 2022 +0800
[3.0] Fixed the problem that the service was offline for a long time without re-registration (#10200)
* 修复因网络抖动导致服务长时间下线没有重新注册的问题
* lazy init CuratorWatcher executor and destroyed on shutdown
* add lock on close
---
.../zookeeper/curator/CuratorZookeeperClient.java | 40 +++++++++++++++++++++-
1 file changed, 39 insertions(+), 1 deletion(-)
diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java
index 975e10fd19..d1bbb5e960 100644
--- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java
+++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.config.configcenter.ConfigItem;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.remoting.zookeeper.AbstractZookeeperClient;
import org.apache.dubbo.remoting.zookeeper.ChildListener;
@@ -49,6 +50,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.apache.dubbo.common.constants.CommonConstants.SESSION_KEY;
@@ -84,6 +87,7 @@ public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorZooke
if (!connected) {
throw new IllegalStateException("zookeeper not connected");
}
+ CuratorWatcherImpl.closed = false;
} catch (Exception e) {
close();
throw new IllegalStateException(e.getMessage(), e);
@@ -256,6 +260,13 @@ public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorZooke
public void doClose() {
super.close();
client.close();
+ CuratorWatcherImpl.closed = true;
+ synchronized (CuratorWatcherImpl.class) {
+ if (CuratorWatcherImpl.CURATOR_WATCHER_EXECUTOR_SERVICE != null) {
+ CuratorWatcherImpl.CURATOR_WATCHER_EXECUTOR_SERVICE.shutdown();
+ CuratorWatcherImpl.CURATOR_WATCHER_EXECUTOR_SERVICE = null;
+ }
+ }
}
@Override
@@ -354,10 +365,24 @@ public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorZooke
static class CuratorWatcherImpl implements CuratorWatcher {
+ private static volatile ExecutorService CURATOR_WATCHER_EXECUTOR_SERVICE;
+
+ private static volatile boolean closed = false;
+
private CuratorFramework client;
private volatile ChildListener childListener;
private String path;
+ private static void initExecutorIfNecessary() {
+ if (!closed && CURATOR_WATCHER_EXECUTOR_SERVICE == null) {
+ synchronized (CuratorWatcherImpl.class) {
+ if (!closed && CURATOR_WATCHER_EXECUTOR_SERVICE == null) {
+ CURATOR_WATCHER_EXECUTOR_SERVICE = Executors.newSingleThreadExecutor(new NamedThreadFactory("Dubbo-CuratorWatcher"));
+ }
+ }
+ }
+ }
+
public CuratorWatcherImpl(CuratorFramework client, ChildListener listener, String path) {
this.client = client;
this.childListener = listener;
@@ -380,7 +405,20 @@ public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorZooke
}
if (childListener != null) {
- childListener.childChanged(path, client.getChildren().usingWatcher(this).forPath(path));
+ Runnable task = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ childListener.childChanged(path, client.getChildren().usingWatcher(CuratorWatcherImpl.this).forPath(path));
+ } catch (Exception e) {
+ logger.warn("client get children error", e);
+ }
+ }
+ };
+ initExecutorIfNecessary();
+ if (!closed && CURATOR_WATCHER_EXECUTOR_SERVICE != null) {
+ CURATOR_WATCHER_EXECUTOR_SERVICE.execute(task);
+ }
}
}
}