You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2020/08/31 03:16:46 UTC

[dubbo] branch 3.0 updated: migration optimization (#6665)

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new dbc4530  migration optimization (#6665)
dbc4530 is described below

commit dbc4530359a47f10fcc35035cf6797f898f54a5b
Author: ken.lj <ke...@gmail.com>
AuthorDate: Mon Aug 31 11:16:32 2020 +0800

    migration optimization (#6665)
---
 .../apache/dubbo/demo/consumer/Application.java    | 19 ++++++----
 .../DefaultMigrationAddressComparator.java         | 43 +++++++++++++++++++++-
 .../client/migration/MigrationInvoker.java         | 20 +++++++---
 .../ServiceDiscoveryMigrationInvoker.java          |  5 +++
 .../java/org/apache/dubbo/registry/ZKTools.java    |  2 +-
 5 files changed, 74 insertions(+), 15 deletions(-)

diff --git a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/src/main/java/org/apache/dubbo/demo/consumer/Application.java b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/src/main/java/org/apache/dubbo/demo/consumer/Application.java
index fba18bc..72b6583 100644
--- a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/src/main/java/org/apache/dubbo/demo/consumer/Application.java
+++ b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/src/main/java/org/apache/dubbo/demo/consumer/Application.java
@@ -36,22 +36,27 @@ public class Application {
 
         new Thread(() -> {
             while (true) {
-                String greetings = greetingService.hello();
-                System.out.println(greetings + " from separated thread.");
                 try {
+                    String greetings = greetingService.hello();
+                    System.out.println(greetings + " from separated thread.");
+
                     Thread.sleep(100);
-                } catch (InterruptedException e) {
+                } catch (Exception e) {
                     e.printStackTrace();
                 }
             }
         }).start();
 
         while (true) {
-            CompletableFuture<String> hello = demoService.sayHelloAsync("world");
-            System.out.println("result: " + hello.get());
+            try {
+                CompletableFuture<String> hello = demoService.sayHelloAsync("world");
+                System.out.println("result: " + hello.get());
 
-            String greetings = greetingService.hello();
-            System.out.println("result: " + greetings);
+                String greetings = greetingService.hello();
+                System.out.println("result: " + greetings);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
 
             Thread.sleep(500);
         }
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparator.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparator.java
index 5fc8480..eace753 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparator.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparator.java
@@ -16,12 +16,53 @@
  */
 package org.apache.dubbo.registry.client.migration;
 
+import org.apache.dubbo.common.config.ConfigurationUtils;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.cluster.ClusterInvoker;
 
+import java.util.List;
+
 public class DefaultMigrationAddressComparator implements MigrationAddressComparator {
+    private static final Logger logger = LoggerFactory.getLogger(DefaultMigrationAddressComparator.class);
+    private static final String MIGRATION_THRESHOLD = "dubbo.application.migration.threshold";
+    private static final String DEFAULT_THREAD_STRING = "0.8";
+    private static final float DEFAULT_THREAD = 0.8f;
+
     @Override
     public <T> boolean shouldMigrate(ClusterInvoker<T> serviceDiscoveryInvoker, ClusterInvoker<T> invoker) {
-        if (serviceDiscoveryInvoker.isAvailable()) {
+        if (!serviceDiscoveryInvoker.isAvailable()) {
+            return false;
+        }
+        if (!invoker.isAvailable()) {
+            return true;
+        }
+
+        List<Invoker<T>> invokers1 = serviceDiscoveryInvoker.getDirectory().getAllInvokers();
+        List<Invoker<T>> invokers2 = invoker.getDirectory().getAllInvokers();
+
+        int newAddressSize = CollectionUtils.isNotEmpty(invokers1) ? invokers1.size() : 0;
+        int oldAddressSize = CollectionUtils.isNotEmpty(invokers2) ? invokers2.size() : 0;
+
+        String rawThreshold = ConfigurationUtils.getProperty(MIGRATION_THRESHOLD, DEFAULT_THREAD_STRING);
+        float threshold;
+        try {
+            threshold = Float.parseFloat(rawThreshold);
+        } catch (Exception e) {
+            logger.error("Invalid migration threshold " + rawThreshold);
+            threshold = DEFAULT_THREAD;
+        }
+
+        if (newAddressSize != 0 && oldAddressSize == 0) {
+            return true;
+        }
+        if (newAddressSize == 0 && oldAddressSize == 0) {
+            return false;
+        }
+
+        if ((float) newAddressSize / oldAddressSize >= threshold) { // cast first: int/int would truncate the ratio to 0 when new < old
             return true;
         }
         return false;
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
index 768f496..01c32dd 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
@@ -41,6 +41,7 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
 
     private ClusterInvoker<T> invoker;
     private ClusterInvoker<T> serviceDiscoveryInvoker;
+    private volatile ClusterInvoker<T> currentAvailableInvoker;
 
     public MigrationInvoker(RegistryProtocol registryProtocol,
                             Cluster cluster,
@@ -93,9 +94,9 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
             refreshServiceDiscoveryInvoker();
             refreshInterfaceInvoker();
             // any of the address list changes, compare these two lists.
-            ((DynamicDirectory) invoker.getDirectory()).addInvokersChangedListener(this::checkAddresses);
-            ((DynamicDirectory) serviceDiscoveryInvoker.getDirectory()).addInvokersChangedListener(this::checkAddresses);
-            this.checkAddresses();
+            ((DynamicDirectory) invoker.getDirectory()).addInvokersChangedListener(this::compareAddresses);
+            ((DynamicDirectory) serviceDiscoveryInvoker.getDirectory()).addInvokersChangedListener(this::compareAddresses);
+            this.compareAddresses();
         } else {
             refreshServiceDiscoveryInvoker();
             destroyInterfaceInvoker();
@@ -118,7 +119,7 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
             return serviceDiscoveryInvoker.invoke(invocation);
         }
 
-        throw new IllegalStateException("Service discovery invoker and Interface invoker should has at least one being available, " + invocation.getServiceName());
+        return currentAvailableInvoker.invoke(invocation);
     }
 
     @Override
@@ -179,11 +180,14 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
 
     @Override
     public boolean invokersChanged() {
-        return false;
+        return invokersChanged;
     }
 
+    private volatile boolean invokersChanged;
+
+    private synchronized void compareAddresses() {
+        this.invokersChanged = true;
 
-    private synchronized void checkAddresses() {
         Set<MigrationAddressComparator> detectors = ExtensionLoader.getExtensionLoader(MigrationAddressComparator.class).getSupportedExtensionInstances();
         if (detectors != null && detectors.stream().allMatch(migrationDetector -> migrationDetector.shouldMigrate(serviceDiscoveryInvoker, invoker))) {
             discardInterfaceInvokerAddress();
@@ -193,6 +197,7 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
     }
 
     protected synchronized void destroyServiceDiscoveryInvoker() {
+        this.currentAvailableInvoker = invoker;
         if (serviceDiscoveryInvoker != null) {
             serviceDiscoveryInvoker.destroy();
             serviceDiscoveryInvoker = null;
@@ -200,6 +205,7 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
     }
 
     protected synchronized void discardServiceDiscoveryInvokerAddress() {
+        this.currentAvailableInvoker = invoker;
         if (serviceDiscoveryInvoker != null) {
             serviceDiscoveryInvoker.getDirectory().discordAddresses();
         }
@@ -219,6 +225,7 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
     }
 
     protected synchronized void destroyInterfaceInvoker() {
+        this.currentAvailableInvoker = serviceDiscoveryInvoker;
         if (invoker != null) {
             invoker.destroy();
             invoker = null;
@@ -226,6 +233,7 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
     }
 
     protected synchronized void discardInterfaceInvokerAddress() {
+        this.currentAvailableInvoker = serviceDiscoveryInvoker;
         if (invoker != null) {
             invoker.getDirectory().discordAddresses();
         }
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/ServiceDiscoveryMigrationInvoker.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/ServiceDiscoveryMigrationInvoker.java
index b37afba..e9aebe3 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/ServiceDiscoveryMigrationInvoker.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/ServiceDiscoveryMigrationInvoker.java
@@ -32,6 +32,11 @@ public class ServiceDiscoveryMigrationInvoker<T> extends MigrationInvoker<T> {
     }
 
     @Override
+    public boolean isServiceDiscovery() {
+        return true;
+    }
+
+    @Override
     public synchronized void fallbackToInterfaceInvoker() {
         destroyServiceDiscoveryInvoker();
     }
diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/ZKTools.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/ZKTools.java
index 86ff3a2..ac24765 100644
--- a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/ZKTools.java
+++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/ZKTools.java
@@ -71,7 +71,7 @@ public class ZKTools {
     public static void testMigrationRule() {
         String serviceStr = "---\n" +
                 "key: demo-consumer\n" +
-                "step: INTERFACE_FIRST\n" +
+                "step: APPLICATION_FIRST\n" +
                 "...";
         try {
             String servicePath = "/dubbo/config/DUBBO_SERVICEDISCOVERY_MIGRATION/demo-consumer.migration";