You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2020/08/31 03:16:46 UTC
[dubbo] branch 3.0 updated: migration optimization (#6665)
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new dbc4530 migration optimization (#6665)
dbc4530 is described below
commit dbc4530359a47f10fcc35035cf6797f898f54a5b
Author: ken.lj <ke...@gmail.com>
AuthorDate: Mon Aug 31 11:16:32 2020 +0800
migration optimization (#6665)
---
.../apache/dubbo/demo/consumer/Application.java | 19 ++++++----
.../DefaultMigrationAddressComparator.java | 43 +++++++++++++++++++++-
.../client/migration/MigrationInvoker.java | 20 +++++++---
.../ServiceDiscoveryMigrationInvoker.java | 5 +++
.../java/org/apache/dubbo/registry/ZKTools.java | 2 +-
5 files changed, 74 insertions(+), 15 deletions(-)
diff --git a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/src/main/java/org/apache/dubbo/demo/consumer/Application.java b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/src/main/java/org/apache/dubbo/demo/consumer/Application.java
index fba18bc..72b6583 100644
--- a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/src/main/java/org/apache/dubbo/demo/consumer/Application.java
+++ b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/src/main/java/org/apache/dubbo/demo/consumer/Application.java
@@ -36,22 +36,27 @@ public class Application {
new Thread(() -> {
while (true) {
- String greetings = greetingService.hello();
- System.out.println(greetings + " from separated thread.");
try {
+ String greetings = greetingService.hello();
+ System.out.println(greetings + " from separated thread.");
+
Thread.sleep(100);
- } catch (InterruptedException e) {
+ } catch (Exception e) {
e.printStackTrace();
}
}
}).start();
while (true) {
- CompletableFuture<String> hello = demoService.sayHelloAsync("world");
- System.out.println("result: " + hello.get());
+ try {
+ CompletableFuture<String> hello = demoService.sayHelloAsync("world");
+ System.out.println("result: " + hello.get());
- String greetings = greetingService.hello();
- System.out.println("result: " + greetings);
+ String greetings = greetingService.hello();
+ System.out.println("result: " + greetings);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
Thread.sleep(500);
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparator.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparator.java
index 5fc8480..eace753 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparator.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparator.java
@@ -16,12 +16,53 @@
*/
package org.apache.dubbo.registry.client.migration;
+import org.apache.dubbo.common.config.ConfigurationUtils;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.cluster.ClusterInvoker;
+import java.util.List;
+
public class DefaultMigrationAddressComparator implements MigrationAddressComparator {
+ private static final Logger logger = LoggerFactory.getLogger(DefaultMigrationAddressComparator.class);
+ private static final String MIGRATION_THRESHOLD = "dubbo.application.migration.threshold";
+ private static final String DEFAULT_THREAD_STRING = "0.8";
+ private static final float DEFAULT_THREAD = 0.8f;
+
@Override
public <T> boolean shouldMigrate(ClusterInvoker<T> serviceDiscoveryInvoker, ClusterInvoker<T> invoker) {
- if (serviceDiscoveryInvoker.isAvailable()) {
+ if (!serviceDiscoveryInvoker.isAvailable()) {
+ return false;
+ }
+ if (!invoker.isAvailable()) {
+ return true;
+ }
+
+ List<Invoker<T>> invokers1 = serviceDiscoveryInvoker.getDirectory().getAllInvokers();
+ List<Invoker<T>> invokers2 = invoker.getDirectory().getAllInvokers();
+
+ int newAddressSize = CollectionUtils.isNotEmpty(invokers1) ? invokers1.size() : 0;
+ int oldAddressSize = CollectionUtils.isNotEmpty(invokers2) ? invokers2.size() : 0;
+
+ String rawThreshold = ConfigurationUtils.getProperty(MIGRATION_THRESHOLD, DEFAULT_THREAD_STRING);
+ float threshold;
+ try {
+ threshold = Float.parseFloat(rawThreshold);
+ } catch (Exception e) {
+ logger.error("Invalid migration threshold " + rawThreshold);
+ threshold = DEFAULT_THREAD;
+ }
+
+ if (newAddressSize != 0 && oldAddressSize == 0) {
+ return true;
+ }
+ if (newAddressSize == 0 && oldAddressSize == 0) {
+ return false;
+ }
+
+ if ((float) (newAddressSize / oldAddressSize) >= threshold) {
return true;
}
return false;
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
index 768f496..01c32dd 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
@@ -41,6 +41,7 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
private ClusterInvoker<T> invoker;
private ClusterInvoker<T> serviceDiscoveryInvoker;
+ private volatile ClusterInvoker<T> currentAvailableInvoker;
public MigrationInvoker(RegistryProtocol registryProtocol,
Cluster cluster,
@@ -93,9 +94,9 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
refreshServiceDiscoveryInvoker();
refreshInterfaceInvoker();
// any of the address list changes, compare these two lists.
- ((DynamicDirectory) invoker.getDirectory()).addInvokersChangedListener(this::checkAddresses);
- ((DynamicDirectory) serviceDiscoveryInvoker.getDirectory()).addInvokersChangedListener(this::checkAddresses);
- this.checkAddresses();
+ ((DynamicDirectory) invoker.getDirectory()).addInvokersChangedListener(this::compareAddresses);
+ ((DynamicDirectory) serviceDiscoveryInvoker.getDirectory()).addInvokersChangedListener(this::compareAddresses);
+ this.compareAddresses();
} else {
refreshServiceDiscoveryInvoker();
destroyInterfaceInvoker();
@@ -118,7 +119,7 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
return serviceDiscoveryInvoker.invoke(invocation);
}
- throw new IllegalStateException("Service discovery invoker and Interface invoker should has at least one being available, " + invocation.getServiceName());
+ return currentAvailableInvoker.invoke(invocation);
}
@Override
@@ -179,11 +180,14 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
@Override
public boolean invokersChanged() {
- return false;
+ return invokersChanged;
}
+ private volatile boolean invokersChanged;
+
+ private synchronized void compareAddresses() {
+ this.invokersChanged = true;
- private synchronized void checkAddresses() {
Set<MigrationAddressComparator> detectors = ExtensionLoader.getExtensionLoader(MigrationAddressComparator.class).getSupportedExtensionInstances();
if (detectors != null && detectors.stream().allMatch(migrationDetector -> migrationDetector.shouldMigrate(serviceDiscoveryInvoker, invoker))) {
discardInterfaceInvokerAddress();
@@ -193,6 +197,7 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
}
protected synchronized void destroyServiceDiscoveryInvoker() {
+ this.currentAvailableInvoker = invoker;
if (serviceDiscoveryInvoker != null) {
serviceDiscoveryInvoker.destroy();
serviceDiscoveryInvoker = null;
@@ -200,6 +205,7 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
}
protected synchronized void discardServiceDiscoveryInvokerAddress() {
+ this.currentAvailableInvoker = invoker;
if (serviceDiscoveryInvoker != null) {
serviceDiscoveryInvoker.getDirectory().discordAddresses();
}
@@ -219,6 +225,7 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
}
protected synchronized void destroyInterfaceInvoker() {
+ this.currentAvailableInvoker = serviceDiscoveryInvoker;
if (invoker != null) {
invoker.destroy();
invoker = null;
@@ -226,6 +233,7 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
}
protected synchronized void discardInterfaceInvokerAddress() {
+ this.currentAvailableInvoker = serviceDiscoveryInvoker;
if (invoker != null) {
invoker.getDirectory().discordAddresses();
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/ServiceDiscoveryMigrationInvoker.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/ServiceDiscoveryMigrationInvoker.java
index b37afba..e9aebe3 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/ServiceDiscoveryMigrationInvoker.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/ServiceDiscoveryMigrationInvoker.java
@@ -32,6 +32,11 @@ public class ServiceDiscoveryMigrationInvoker<T> extends MigrationInvoker<T> {
}
@Override
+ public boolean isServiceDiscovery() {
+ return true;
+ }
+
+ @Override
public synchronized void fallbackToInterfaceInvoker() {
destroyServiceDiscoveryInvoker();
}
diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/ZKTools.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/ZKTools.java
index 86ff3a2..ac24765 100644
--- a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/ZKTools.java
+++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/ZKTools.java
@@ -71,7 +71,7 @@ public class ZKTools {
public static void testMigrationRule() {
String serviceStr = "---\n" +
"key: demo-consumer\n" +
- "step: INTERFACE_FIRST\n" +
+ "step: APPLICATION_FIRST\n" +
"...";
try {
String servicePath = "/dubbo/config/DUBBO_SERVICEDISCOVERY_MIGRATION/demo-consumer.migration";