You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2020/09/22 13:13:08 UTC

[dubbo] branch 3.0 updated: fix migration problem

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new c8e043a  fix migration problem
c8e043a is described below

commit c8e043a85b2f64eb3a1cdb0d9bf6ba91894caa1e
Author: ken.lj <ke...@gmail.com>
AuthorDate: Tue Sep 22 21:11:43 2020 +0800

    fix migration problem
---
 .../listener/ServiceInstancesChangedListener.java  |   8 +-
 .../client/migration/MigrationClusterInvoker.java  |   4 +-
 .../client/migration/MigrationInvoker.java         | 111 ++++++++++++---------
 .../client/migration/MigrationRuleHandler.java     |  11 +-
 .../ServiceDiscoveryMigrationInvoker.java          |   4 +-
 .../registry/integration/DynamicDirectory.java     |   9 +-
 .../apache/dubbo/rpc/protocol/AbstractInvoker.java |   2 +-
 7 files changed, 84 insertions(+), 65 deletions(-)

diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
index 87e0977..5c9a2b3 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
@@ -94,7 +94,8 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
 
         Map<String, List<ServiceInstance>> revisionToInstances = new HashMap<>();
         Map<String, Set<String>> localServiceToRevisions = new HashMap<>();
-        Map<Set<String>, List<URL>> revisionsToUrls = new HashMap();
+        Map<Set<String>, List<URL>> revisionsToUrls = new HashMap<>();
+        Map<String, List<URL>> newServiceUrls = new HashMap<>();//TODO
         for (Map.Entry<String, List<ServiceInstance>> entry : allInstances.entrySet()) {
             List<ServiceInstance> instances = entry.getValue();
             for (ServiceInstance instance : instances) {
@@ -131,7 +132,7 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
             localServiceToRevisions.forEach((serviceKey, revisions) -> {
                 List<URL> urls = revisionsToUrls.get(revisions);
                 if (urls != null) {
-                    serviceUrls.put(serviceKey, urls);
+                    newServiceUrls.put(serviceKey, urls);
                 } else {
                     urls = new ArrayList<>();
                     for (String r : revisions) {
@@ -140,11 +141,12 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
                         }
                     }
                     revisionsToUrls.put(revisions, urls);
-                    serviceUrls.put(serviceKey, urls);
+                    newServiceUrls.put(serviceKey, urls);
                 }
             });
         }
 
+        this.serviceUrls = newServiceUrls;
         this.notifyAddressChanged();
     }
 
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationClusterInvoker.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationClusterInvoker.java
index 00c727f..fcb68f3 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationClusterInvoker.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationClusterInvoker.java
@@ -29,7 +29,9 @@ public interface MigrationClusterInvoker<T> extends ClusterInvoker<T> {
 
     boolean isServiceDiscovery();
 
-    MigrationStep getCurrentStep();
+    MigrationStep getMigrationStep();
+
+    void setMigrationStep(MigrationStep step);
 
     boolean invokersChanged();
 
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
index 6aa6f4a..9af31cd 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
@@ -49,6 +49,7 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
     private volatile ClusterInvoker<T> invoker;
     private volatile ClusterInvoker<T> serviceDiscoveryInvoker;
     private volatile ClusterInvoker<T> currentAvailableInvoker;
+    private volatile MigrationStep step;
 
     public MigrationInvoker(RegistryProtocol registryProtocol,
                             Cluster cluster,
@@ -99,25 +100,6 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
     }
 
     @Override
-    public synchronized void migrateToServiceDiscoveryInvoker(boolean forceMigrate) {
-        if (!forceMigrate) {
-            refreshServiceDiscoveryInvoker();
-            refreshInterfaceInvoker();
-            setListener(invoker, () -> {
-                this.compareAddresses(invoker, serviceDiscoveryInvoker);
-            });
-            setListener(serviceDiscoveryInvoker, () -> {
-                this.compareAddresses(invoker, serviceDiscoveryInvoker);
-            });
-        } else {
-            refreshServiceDiscoveryInvoker();
-            setListener(serviceDiscoveryInvoker, () -> {
-                this.destroyInterfaceInvoker(this.invoker);
-            });
-        }
-    }
-
-    @Override
     public void reRefer(URL newSubscribeUrl) {
         // update url to prepare for migration refresh
         this.url = url.addParameter(REFER_KEY, StringUtils.toQueryString(newSubscribeUrl.getParameters()));
@@ -145,7 +127,7 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
     }
 
     @Override
-    public synchronized void fallbackToInterfaceInvoker() {
+    public void fallbackToInterfaceInvoker() {
         refreshInterfaceInvoker();
         setListener(invoker, () -> {
             this.destroyServiceDiscoveryInvoker(this.serviceDiscoveryInvoker);
@@ -153,15 +135,44 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
     }
 
     @Override
+    public void migrateToServiceDiscoveryInvoker(boolean forceMigrate) {
+        if (!forceMigrate) {
+            refreshServiceDiscoveryInvoker();
+            refreshInterfaceInvoker();
+            setListener(invoker, () -> {
+                this.compareAddresses(serviceDiscoveryInvoker, invoker);
+            });
+            setListener(serviceDiscoveryInvoker, () -> {
+                this.compareAddresses(serviceDiscoveryInvoker, invoker);
+            });
+        } else {
+            refreshServiceDiscoveryInvoker();
+            setListener(serviceDiscoveryInvoker, () -> {
+                this.destroyInterfaceInvoker(this.invoker);
+            });
+        }
+    }
+
+    @Override
     public Result invoke(Invocation invocation) throws RpcException {
-        if (!checkInvokerAvailable(serviceDiscoveryInvoker)) {
-            logger.debug("Using interface addresses to handle invocation, interface " + type.getName() + ", total address size " + invoker.getDirectory().getAllInvokers().size());
-            return invoker.invoke(invocation);
+        if (currentAvailableInvoker != null) {
+            return currentAvailableInvoker.invoke(invocation);
         }
 
-        if (!checkInvokerAvailable(invoker)) {
-            logger.debug("Using instance addresses to handle invocation, interface " + type.getName() + ", total address size " + serviceDiscoveryInvoker.getDirectory().getAllInvokers().size());
-            return serviceDiscoveryInvoker.invoke(invocation);
+        switch (step) {
+            case APPLICATION_FIRST:
+                if (checkInvokerAvailable(serviceDiscoveryInvoker)) {
+                    currentAvailableInvoker = serviceDiscoveryInvoker;
+                } else {
+                    currentAvailableInvoker = invoker;
+                }
+                break;
+            case FORCE_APPLICATION:
+                currentAvailableInvoker = serviceDiscoveryInvoker;
+                break;
+            case INTERFACE_FIRST:
+            default:
+                currentAvailableInvoker = invoker;
         }
 
         return currentAvailableInvoker.invoke(invocation);
@@ -226,8 +237,13 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
     }
 
     @Override
-    public MigrationStep getCurrentStep() {
-        return null;
+    public MigrationStep getMigrationStep() {
+        return step;
+    }
+
+    @Override
+    public void setMigrationStep(MigrationStep step) {
+        this.step = step;
     }
 
     @Override
@@ -252,7 +268,7 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
     }
 
     protected synchronized void destroyServiceDiscoveryInvoker(ClusterInvoker<?> serviceDiscoveryInvoker) {
-        if (checkInvokerAvailable(this.invoker)) {
+        if (this.invoker != null) {
             this.currentAvailableInvoker = this.invoker;
         }
         if (serviceDiscoveryInvoker != null) {
@@ -264,18 +280,18 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
     }
 
     protected synchronized void discardServiceDiscoveryInvokerAddress(ClusterInvoker<?> serviceDiscoveryInvoker) {
-        if (checkInvokerAvailable(this.invoker)) {
+        if (this.invoker != null) {
             this.currentAvailableInvoker = this.invoker;
         }
         if (serviceDiscoveryInvoker != null) {
             if (logger.isDebugEnabled()) {
                 logger.debug("Discarding instance addresses, total size " + serviceDiscoveryInvoker.getDirectory().getAllInvokers().size());
             }
-            serviceDiscoveryInvoker.getDirectory().discordAddresses();
+//            serviceDiscoveryInvoker.getDirectory().discordAddresses();
         }
     }
 
-    protected synchronized void refreshServiceDiscoveryInvoker() {
+    protected void refreshServiceDiscoveryInvoker() {
         clearListener(serviceDiscoveryInvoker);
         if (needRefresh(serviceDiscoveryInvoker)) {
             if (logger.isDebugEnabled()) {
@@ -285,25 +301,14 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
         }
     }
 
-    private void clearListener(ClusterInvoker<T> invoker) {
-        if (invoker == null) return;
-        DynamicDirectory<T> directory = (DynamicDirectory<T>) invoker.getDirectory();
-        directory.setInvokersChangedListener(null);
-    }
-
-    private void setListener(ClusterInvoker<T> invoker, InvokersChangedListener listener) {
-        if (invoker == null) return;
-        DynamicDirectory<T> directory = (DynamicDirectory<T>) invoker.getDirectory();
-        directory.setInvokersChangedListener(listener);
-    }
-
-    protected synchronized void refreshInterfaceInvoker() {
+    protected void refreshInterfaceInvoker() {
         clearListener(invoker);
+        // FIXME invoker.destroy();
         if (needRefresh(invoker)) {
-            // FIXME invoker.destroy();
             if (logger.isDebugEnabled()) {
                 logger.debug("Re-subscribing interface addresses for interface " + type.getName());
             }
+
             invoker = registryProtocol.getInvoker(cluster, registry, type, url);
         }
     }
@@ -328,10 +333,22 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
             if (logger.isDebugEnabled()) {
                 logger.debug("Discarding interface addresses, total address size " + invoker.getDirectory().getAllInvokers().size());
             }
-            invoker.getDirectory().discordAddresses();
+            //invoker.getDirectory().discordAddresses();
         }
     }
 
+    private void clearListener(ClusterInvoker<T> invoker) {
+        if (invoker == null) return;
+        DynamicDirectory<T> directory = (DynamicDirectory<T>) invoker.getDirectory();
+        directory.setInvokersChangedListener(null);
+    }
+
+    private void setListener(ClusterInvoker<T> invoker, InvokersChangedListener listener) {
+        if (invoker == null) return;
+        DynamicDirectory<T> directory = (DynamicDirectory<T>) invoker.getDirectory();
+        directory.setInvokersChangedListener(listener);
+    }
+
     private boolean needRefresh(ClusterInvoker<T> invoker) {
         return invoker == null || invoker.isDestroyed();
     }
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandler.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandler.java
index 1572355..d22bd6b 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandler.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandler.java
@@ -31,10 +31,10 @@ public class MigrationRuleHandler<T> {
     private static final Logger logger = LoggerFactory.getLogger(MigrationRuleHandler.class);
     private static final String DUBBO_SERVICEDISCOVERY_MIGRATION = "dubbo.application.service-discovery.migration";
 
-    private MigrationInvoker<T> migrationInvoker;
+    private MigrationClusterInvoker<T> migrationInvoker;
     private MigrationStep currentStep;
 
-    public MigrationRuleHandler(MigrationInvoker<T> invoker) {
+    public MigrationRuleHandler(MigrationClusterInvoker<T> invoker) {
         this.migrationInvoker = invoker;
     }
 
@@ -53,7 +53,7 @@ public class MigrationRuleHandler<T> {
         }
 
         if (currentStep == null || currentStep != step) {
-            currentStep = step;
+            setCurrentStep(step);
             switch (step) {
                 case APPLICATION_FIRST:
                     migrationInvoker.migrateToServiceDiscoveryInvoker(false);
@@ -67,4 +67,9 @@ public class MigrationRuleHandler<T> {
             }
         }
     }
+
+    public void setCurrentStep(MigrationStep currentStep) {
+        this.currentStep = currentStep;
+        this.migrationInvoker.setMigrationStep(currentStep);
+    }
 }
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/ServiceDiscoveryMigrationInvoker.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/ServiceDiscoveryMigrationInvoker.java
index fb9e17e..774073b 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/ServiceDiscoveryMigrationInvoker.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/ServiceDiscoveryMigrationInvoker.java
@@ -40,13 +40,13 @@ public class ServiceDiscoveryMigrationInvoker<T> extends MigrationInvoker<T> {
     }
 
     @Override
-    public synchronized void fallbackToInterfaceInvoker() {
+    public void fallbackToInterfaceInvoker() {
         logger.error("Service discovery registry type does not support discovery of interface level addresses, " + getRegistryUrl());
         migrateToServiceDiscoveryInvoker(true);
     }
 
     @Override
-    public synchronized void migrateToServiceDiscoveryInvoker(boolean forceMigrate) {
+    public void migrateToServiceDiscoveryInvoker(boolean forceMigrate) {
         refreshServiceDiscoveryInvoker();
     }
 
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
index 3056506..8d2ed8b 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
@@ -260,22 +260,15 @@ public abstract class DynamicDirectory<T> extends AbstractDirectory<T> implement
     }
 
     private volatile InvokersChangedListener invokersChangedListener;
-    private volatile boolean addressChanged;
 
     public void setInvokersChangedListener(InvokersChangedListener listener) {
         this.invokersChangedListener = listener;
-        if (addressChanged) {
-            invokersChangedListener.onChange();
-            this.addressChanged = false;
-        }
+        invokersChanged();
     }
 
     protected void invokersChanged() {
         if (invokersChangedListener != null) {
             invokersChangedListener.onChange();
-            this.addressChanged = false;
-        } else {
-            this.addressChanged = true;
         }
     }
 
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
index 0865f08..94c7b79 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
@@ -128,7 +128,7 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
 
     @Override
     public String toString() {
-        return getInterface() + " -> " + (getUrl() == null ? "" : getUrl().toString());
+        return getInterface() + " -> " + (getUrl() == null ? "" : getUrl().getAddress());
     }
 
     @Override