You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by cr...@apache.org on 2022/04/26 12:47:31 UTC
[dubbo] branch 3.0 updated: [3.0] Add dynamic decide invoker in APPLICATION_FIRST mode (#9967)
This is an automated email from the ASF dual-hosted git repository.
crazyhzm pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new c4847d5934 [3.0] Add dynamic decide invoker in APPLICATION_FIRST mode (#9967)
c4847d5934 is described below
commit c4847d59347c2bad64b884156fd679bb27c65964
Author: Albumen Kevin <jh...@gmail.com>
AuthorDate: Tue Apr 26 20:47:13 2022 +0800
[3.0] Add dynamic decide invoker in APPLICATION_FIRST mode (#9967)
---
.../client/migration/MigrationInvoker.java | 22 +++++---
.../client/migration/MigrationInvokerTest.java | 63 ++++++++++++++++++++++
2 files changed, 78 insertions(+), 7 deletions(-)
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
index 2c48684097..8f41a305d5 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
@@ -274,21 +274,18 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
if (step == APPLICATION_FIRST) {
// call ratio calculation based on random value
if (promotion < 100 && ThreadLocalRandom.current().nextDouble(100) > promotion) {
+ // fall back to interface mode
return invoker.invoke(invocation);
}
+ // check if invoker available for each time
+ return decideInvoker().invoke(invocation);
}
return currentAvailableInvoker.invoke(invocation);
}
switch (step) {
case APPLICATION_FIRST:
- if (checkInvokerAvailable(serviceDiscoveryInvoker)) {
- currentAvailableInvoker = serviceDiscoveryInvoker;
- } else if (checkInvokerAvailable(invoker)) {
- currentAvailableInvoker = invoker;
- } else {
- currentAvailableInvoker = serviceDiscoveryInvoker;
- }
+ currentAvailableInvoker = decideInvoker();
break;
case FORCE_APPLICATION:
currentAvailableInvoker = serviceDiscoveryInvoker;
@@ -301,6 +298,17 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
return currentAvailableInvoker.invoke(invocation);
}
+ private ClusterInvoker<T> decideInvoker() {
+ if (currentAvailableInvoker == serviceDiscoveryInvoker) {
+ if (checkInvokerAvailable(serviceDiscoveryInvoker)) {
+ return serviceDiscoveryInvoker;
+ }
+ return invoker;
+ } else {
+ return currentAvailableInvoker;
+ }
+ }
+
@Override
public boolean isAvailable() {
return currentAvailableInvoker != null
diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/MigrationInvokerTest.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/MigrationInvokerTest.java
index 4a4ca06485..dee3ca5e2d 100644
--- a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/MigrationInvokerTest.java
+++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/MigrationInvokerTest.java
@@ -67,6 +67,9 @@ public class MigrationInvokerTest {
Mockito.when(invoker.getDirectory()).thenReturn(directory);
Mockito.when(serviceDiscoveryInvoker.getDirectory()).thenReturn(serviceDiscoveryDirectory);
+ Mockito.when(invoker.isAvailable()).thenReturn(true);
+ Mockito.when(serviceDiscoveryInvoker.isAvailable()).thenReturn(true);
+
Mockito.when(invoker.hasProxyInvokers()).thenReturn(true);
Mockito.when(serviceDiscoveryInvoker.hasProxyInvokers()).thenReturn(true);
@@ -222,6 +225,66 @@ public class MigrationInvokerTest {
Assertions.assertTrue(System.currentTimeMillis() - currentTimeMillis >= 2000);
}
+ @Test
+ public void testDecide() {
+ RegistryProtocol registryProtocol = Mockito.mock(RegistryProtocol.class);
+
+ ClusterInvoker invoker = Mockito.mock(ClusterInvoker.class);
+ ClusterInvoker serviceDiscoveryInvoker = Mockito.mock(ClusterInvoker.class);
+
+ DynamicDirectory directory = Mockito.mock(DynamicDirectory.class);
+ DynamicDirectory serviceDiscoveryDirectory = Mockito.mock(DynamicDirectory.class);
+
+ Mockito.when(invoker.getDirectory()).thenReturn(directory);
+ Mockito.when(serviceDiscoveryInvoker.getDirectory()).thenReturn(serviceDiscoveryDirectory);
+
+ Mockito.when(invoker.isAvailable()).thenReturn(true);
+ Mockito.when(serviceDiscoveryInvoker.isAvailable()).thenReturn(true);
+
+ Mockito.when(invoker.hasProxyInvokers()).thenReturn(true);
+ Mockito.when(serviceDiscoveryInvoker.hasProxyInvokers()).thenReturn(true);
+
+ List<Invoker> invokers = new LinkedList<>();
+ invokers.add(Mockito.mock(Invoker.class));
+ invokers.add(Mockito.mock(Invoker.class));
+ List<Invoker> serviceDiscoveryInvokers = new LinkedList<>();
+ serviceDiscoveryInvokers.add(Mockito.mock(Invoker.class));
+ serviceDiscoveryInvokers.add(Mockito.mock(Invoker.class));
+ Mockito.when(directory.getAllInvokers()).thenReturn(invokers);
+ Mockito.when(serviceDiscoveryDirectory.getAllInvokers()).thenReturn(serviceDiscoveryInvokers);
+
+ Mockito.when(registryProtocol.getInvoker(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(invoker);
+ Mockito.when(registryProtocol.getServiceDiscoveryInvoker(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(serviceDiscoveryInvoker);
+
+ URL consumerURL = Mockito.mock(URL.class);
+ Mockito.when(consumerURL.getServiceInterface()).thenReturn("Test");
+ Mockito.when(consumerURL.getGroup()).thenReturn("Group");
+ Mockito.when(consumerURL.getVersion()).thenReturn("0.0.0");
+ Mockito.when(consumerURL.getServiceKey()).thenReturn("Group/Test:0.0.0");
+ Mockito.when(consumerURL.getDisplayServiceKey()).thenReturn("Test:0.0.0");
+ Mockito.when(consumerURL.getOrDefaultApplicationModel()).thenReturn(ApplicationModel.defaultModel());
+
+ Mockito.when(invoker.getUrl()).thenReturn(consumerURL);
+ Mockito.when(serviceDiscoveryInvoker.getUrl()).thenReturn(consumerURL);
+
+ MigrationInvoker migrationInvoker = new MigrationInvoker(registryProtocol, null, null, DemoService.class, null, consumerURL);
+
+ MigrationRule migrationRule = Mockito.mock(MigrationRule.class);
+ Mockito.when(migrationRule.getForce(Mockito.any())).thenReturn(true);
+ migrationInvoker.migrateToApplicationFirstInvoker(migrationRule);
+ migrationInvoker.setMigrationStep(MigrationStep.APPLICATION_FIRST);
+ migrationInvoker.invoke(null);
+ Mockito.verify(serviceDiscoveryInvoker, Mockito.times(1)).invoke(null);
+
+ Mockito.when(serviceDiscoveryInvoker.isAvailable()).thenReturn(false);
+ migrationInvoker.invoke(null);
+ Mockito.verify(invoker, Mockito.times(1)).invoke(null);
+
+ Mockito.when(serviceDiscoveryInvoker.isAvailable()).thenReturn(true);
+ migrationInvoker.invoke(null);
+ Mockito.verify(serviceDiscoveryInvoker, Mockito.times(2)).invoke(null);
+ }
+
@Test
public void testConcurrency() {
// 独立线程