You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2021/06/24 07:14:19 UTC

[dubbo] branch 3.0 updated: Opt Migration & Fix reSubscribe for ServiceDiscovery (#8129)

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 82f5665  Opt Migration & Fix reSubscribe for ServiceDiscovery (#8129)
82f5665 is described below

commit 82f5665684be51b03e08163a6daa4f5b9d24354d
Author: Albumen Kevin <jh...@gmail.com>
AuthorDate: Thu Jun 24 15:14:03 2021 +0800

    Opt Migration & Fix reSubscribe for ServiceDiscovery (#8129)
---
 .../registry/client/ServiceDiscoveryRegistry.java  |  3 +
 .../metadata/MetadataServiceNameMapping.java       |  5 ++
 .../client/migration/MigrationRuleListener.java    | 73 +++++++++++++++++++---
 .../client/migration/model/MigrationRule.java      | 52 ++++++++++-----
 4 files changed, 107 insertions(+), 26 deletions(-)

diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
index 4ca66ad..a28c0c5 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
@@ -259,10 +259,13 @@ public class ServiceDiscoveryRegistry implements Registry {
             String serviceNamesKey = toStringKeys(serviceNames);
             ServiceInstancesChangedListener instancesChangedListener = serviceListeners.get(serviceNamesKey);
             if (instancesChangedListener != null) {
+                String listenerId = createListenerId(url, instancesChangedListener);
+
                 instancesChangedListener.removeListener(protocolServiceKey);
                 if (!instancesChangedListener.hasListeners()) {
                     serviceListeners.remove(serviceNamesKey);
                 }
+                registeredListeners.remove(listenerId);
             }
         }
     }
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataServiceNameMapping.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataServiceNameMapping.java
index 4ad262c..ad4dde0 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataServiceNameMapping.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataServiceNameMapping.java
@@ -55,6 +55,11 @@ public class MetadataServiceNameMapping extends AbstractServiceNameMapping {
             String registryCluster = getRegistryCluster(url);
             MetadataReport metadataReport = MetadataReportInstance.getMetadataReport(registryCluster);
 
+            if (metadataReport.registerServiceAppMapping(serviceInterface, getName(), url)) {
+                // MetadataReport supports registering the service-app mapping directly, so nothing more to do
+                return;
+            }
+
             int currentRetryTimes = 1;
             boolean success;
             String newConfigContent = getName();
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java
index 56f33af..f1fea06 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java
@@ -35,9 +35,14 @@ import org.apache.dubbo.rpc.cluster.ClusterInvoker;
 import org.apache.dubbo.rpc.model.ApplicationModel;
 
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.dubbo.common.constants.RegistryConstants.INIT;
 
@@ -49,6 +54,11 @@ public class MigrationRuleListener implements RegistryProtocolListener, Configur
     private final String RULE_KEY = ApplicationModel.getName() + ".migration";
 
     private final Map<MigrationInvoker, MigrationRuleHandler> handlers = new ConcurrentHashMap<>();
+    private final LinkedBlockingQueue<String> ruleQueue = new LinkedBlockingQueue<>();
+
+    private final AtomicBoolean executorSubmit = new AtomicBoolean(false);
+    private final ExecutorService ruleManageExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("Dubbo-Migration-Listener"));
+
     private DynamicConfiguration configuration;
 
     private volatile String rawRule;
@@ -104,19 +114,64 @@ public class MigrationRuleListener implements RegistryProtocolListener, Configur
         String rawRule = event.getContent();
         if (StringUtils.isEmpty(rawRule)) {
             // fail back to startup status
-            setRawRule(INIT);
+            rawRule = INIT;
             //logger.warn("Received empty migration rule, will ignore.");
-            return;
+        }
+        try {
+            ruleQueue.put(rawRule);
+        } catch (InterruptedException e) {
+            logger.error("Put rawRule to rule management queue failed. rawRule: " + rawRule, e);
         }
 
-        logger.info("Using the following migration rule to migrate:");
-        logger.info(rawRule);
-
-        setRawRule(rawRule);
-
-        if (CollectionUtils.isNotEmptyMap(handlers)) {
-            handlers.forEach((_key, handler) -> handler.doMigrate(rule));
+        if (executorSubmit.compareAndSet(false, true)) {
+            ruleManageExecutor.submit(() -> {
+                while (true) {
+                    String rule = "";
+                    try {
+                        rule = ruleQueue.take();
+                        if (StringUtils.isEmpty(rule)) {
+                            Thread.sleep(1000);
+                        }
+                    } catch (InterruptedException e) {
+                        logger.error("Poll Rule from config center failed.", e);
+                    }
+                    if (StringUtils.isEmpty(rule)) {
+                        continue;
+                    }
+                    if (Objects.equals(this.rawRule, rule)) {
+                        logger.info("Ignore duplicated rule");
+                        continue;
+                    }
+                    try {
+                        logger.info("Using the following migration rule to migrate:");
+                        logger.info(rule);
+
+                        setRawRule(rule);
+
+                        if (CollectionUtils.isNotEmptyMap(handlers)) {
+                            ExecutorService executorService = Executors.newFixedThreadPool(100, new NamedThreadFactory("Dubbo-Invoker-Migrate"));
+                            CountDownLatch countDownLatch = new CountDownLatch(handlers.size());
+
+                            handlers.forEach((_key, handler) ->
+                                executorService.submit(() -> {
+                                    handler.doMigrate(this.rule);
+                                    countDownLatch.countDown();
+                                }));
+
+                            try {
+                                countDownLatch.await(1, TimeUnit.HOURS);
+                            } catch (InterruptedException e) {
+                                logger.error("Wait Invoker Migrate interrupted!", e);
+                            }
+                            executorService.shutdown();
+                        }
+                    } catch (Throwable t) {
+                        logger.error("Error occurred when migration.", t);
+                    }
+                }
+            });
         }
+
     }
 
     public void setRawRule(String rawRule) {
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/model/MigrationRule.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/model/MigrationRule.java
index ce7e34b..39463fb 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/model/MigrationRule.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/model/MigrationRule.java
@@ -144,24 +144,26 @@ public class MigrationRule {
         if (interfaceRules != null) {
             SubMigrationRule rule = interfaceRules.get(consumerURL.getDisplayServiceKey());
             if (rule != null) {
-                return rule.getStep() == null ? step : rule.getStep();
+                if (rule.getStep() != null) {
+                    return rule.getStep();
+                }
             }
         }
 
         if (applications != null) {
             ServiceNameMapping serviceNameMapping = ServiceNameMapping.getDefaultExtension();
             Set<String> services = serviceNameMapping.getServices(consumerURL);
-            if(CollectionUtils.isNotEmpty(services)) {
+            if (CollectionUtils.isNotEmpty(services)) {
                 for (String service : services) {
                     SubMigrationRule rule = applicationRules.get(service);
-                    if (rule != null) {
-                        return rule.getStep() == null ? step : rule.getStep();
+                    if (rule.getStep() != null) {
+                        return rule.getStep();
                     }
                 }
             }
         }
 
-        if(step == null) {
+        if (step == null) {
             // initial step : APPLICATION_FIRST
             step = MigrationStep.APPLICATION_FIRST;
             step = Enum.valueOf(MigrationStep.class,
@@ -180,18 +182,22 @@ public class MigrationRule {
         if (interfaceRules != null) {
             SubMigrationRule rule = interfaceRules.get(consumerURL.getDisplayServiceKey());
             if (rule != null) {
-                return rule.getThreshold() == null ? threshold : rule.getThreshold();
+                if (rule.getThreshold() != null) {
+                    return rule.getThreshold();
+                }
             }
         }
 
         if (applications != null) {
             ServiceNameMapping serviceNameMapping = ServiceNameMapping.getDefaultExtension();
             Set<String> services = serviceNameMapping.getServices(consumerURL);
-            if(CollectionUtils.isNotEmpty(services)) {
+            if (CollectionUtils.isNotEmpty(services)) {
                 for (String service : services) {
                     SubMigrationRule rule = applicationRules.get(service);
                     if (rule != null) {
-                        return rule.getThreshold() == null ? threshold : rule.getThreshold();
+                        if (rule.getThreshold() != null) {
+                            return rule.getThreshold();
+                        }
                     }
                 }
             }
@@ -216,18 +222,22 @@ public class MigrationRule {
         if (interfaceRules != null) {
             SubMigrationRule rule = interfaceRules.get(consumerURL.getDisplayServiceKey());
             if (rule != null) {
-                return rule.getProportion() == null ? proportion : rule.getProportion();
+                if (rule.getProportion() != null) {
+                    return rule.getProportion();
+                }
             }
         }
 
         if (applications != null) {
             ServiceNameMapping serviceNameMapping = ServiceNameMapping.getDefaultExtension();
             Set<String> services = serviceNameMapping.getServices(consumerURL);
-            if(CollectionUtils.isNotEmpty(services)) {
+            if (CollectionUtils.isNotEmpty(services)) {
                 for (String service : services) {
                     SubMigrationRule rule = applicationRules.get(service);
                     if (rule != null) {
-                        return rule.getProportion() == null ? proportion : rule.getProportion();
+                        if (rule.getProportion() != null) {
+                            return rule.getProportion();
+                        }
                     }
                 }
             }
@@ -248,18 +258,22 @@ public class MigrationRule {
         if (interfaceRules != null) {
             SubMigrationRule rule = interfaceRules.get(consumerURL.getDisplayServiceKey());
             if (rule != null) {
-                return rule.getDelay() == null ? delay : rule.getDelay();
+                if (rule.getDelay() != null) {
+                    return rule.getDelay();
+                }
             }
         }
 
         if (applications != null) {
             ServiceNameMapping serviceNameMapping = ServiceNameMapping.getDefaultExtension();
             Set<String> services = serviceNameMapping.getServices(consumerURL);
-            if(CollectionUtils.isNotEmpty(services)) {
+            if (CollectionUtils.isNotEmpty(services)) {
                 for (String service : services) {
                     SubMigrationRule rule = applicationRules.get(service);
                     if (rule != null) {
-                        return rule.getDelay() == null ? delay : rule.getDelay();
+                        if (rule.getDelay() != null) {
+                            return rule.getDelay();
+                        }
                     }
                 }
             }
@@ -284,18 +298,22 @@ public class MigrationRule {
         if (interfaceRules != null) {
             SubMigrationRule rule = interfaceRules.get(consumerURL.getDisplayServiceKey());
             if (rule != null) {
-                return rule.getForce() == null ? force : rule.getForce();
+                if (rule.getForce() != null) {
+                    return rule.getForce();
+                }
             }
         }
 
         if (applications != null) {
             ServiceNameMapping serviceNameMapping = ServiceNameMapping.getDefaultExtension();
             Set<String> services = serviceNameMapping.getServices(consumerURL);
-            if(CollectionUtils.isNotEmpty(services)) {
+            if (CollectionUtils.isNotEmpty(services)) {
                 for (String service : services) {
                     SubMigrationRule rule = applicationRules.get(service);
                     if (rule != null) {
-                        return rule.getForce() == null ? force : rule.getForce();
+                        if (rule.getForce() != null) {
+                            return rule.getForce();
+                        }
                     }
                 }
             }