You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2020/09/22 13:13:08 UTC
[dubbo] branch 3.0 updated: fix migration problem
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new c8e043a fix migration problem
c8e043a is described below
commit c8e043a85b2f64eb3a1cdb0d9bf6ba91894caa1e
Author: ken.lj <ke...@gmail.com>
AuthorDate: Tue Sep 22 21:11:43 2020 +0800
fix migration problem
---
.../listener/ServiceInstancesChangedListener.java | 8 +-
.../client/migration/MigrationClusterInvoker.java | 4 +-
.../client/migration/MigrationInvoker.java | 111 ++++++++++++---------
.../client/migration/MigrationRuleHandler.java | 11 +-
.../ServiceDiscoveryMigrationInvoker.java | 4 +-
.../registry/integration/DynamicDirectory.java | 9 +-
.../apache/dubbo/rpc/protocol/AbstractInvoker.java | 2 +-
7 files changed, 84 insertions(+), 65 deletions(-)
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
index 87e0977..5c9a2b3 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
@@ -94,7 +94,8 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
Map<String, List<ServiceInstance>> revisionToInstances = new HashMap<>();
Map<String, Set<String>> localServiceToRevisions = new HashMap<>();
- Map<Set<String>, List<URL>> revisionsToUrls = new HashMap();
+ Map<Set<String>, List<URL>> revisionsToUrls = new HashMap<>();
+ Map<String, List<URL>> newServiceUrls = new HashMap<>();//TODO
for (Map.Entry<String, List<ServiceInstance>> entry : allInstances.entrySet()) {
List<ServiceInstance> instances = entry.getValue();
for (ServiceInstance instance : instances) {
@@ -131,7 +132,7 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
localServiceToRevisions.forEach((serviceKey, revisions) -> {
List<URL> urls = revisionsToUrls.get(revisions);
if (urls != null) {
- serviceUrls.put(serviceKey, urls);
+ newServiceUrls.put(serviceKey, urls);
} else {
urls = new ArrayList<>();
for (String r : revisions) {
@@ -140,11 +141,12 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
}
}
revisionsToUrls.put(revisions, urls);
- serviceUrls.put(serviceKey, urls);
+ newServiceUrls.put(serviceKey, urls);
}
});
}
+ this.serviceUrls = newServiceUrls;
this.notifyAddressChanged();
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationClusterInvoker.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationClusterInvoker.java
index 00c727f..fcb68f3 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationClusterInvoker.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationClusterInvoker.java
@@ -29,7 +29,9 @@ public interface MigrationClusterInvoker<T> extends ClusterInvoker<T> {
boolean isServiceDiscovery();
- MigrationStep getCurrentStep();
+ MigrationStep getMigrationStep();
+
+ void setMigrationStep(MigrationStep step);
boolean invokersChanged();
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
index 6aa6f4a..9af31cd 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
@@ -49,6 +49,7 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
private volatile ClusterInvoker<T> invoker;
private volatile ClusterInvoker<T> serviceDiscoveryInvoker;
private volatile ClusterInvoker<T> currentAvailableInvoker;
+ private volatile MigrationStep step;
public MigrationInvoker(RegistryProtocol registryProtocol,
Cluster cluster,
@@ -99,25 +100,6 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
}
@Override
- public synchronized void migrateToServiceDiscoveryInvoker(boolean forceMigrate) {
- if (!forceMigrate) {
- refreshServiceDiscoveryInvoker();
- refreshInterfaceInvoker();
- setListener(invoker, () -> {
- this.compareAddresses(invoker, serviceDiscoveryInvoker);
- });
- setListener(serviceDiscoveryInvoker, () -> {
- this.compareAddresses(invoker, serviceDiscoveryInvoker);
- });
- } else {
- refreshServiceDiscoveryInvoker();
- setListener(serviceDiscoveryInvoker, () -> {
- this.destroyInterfaceInvoker(this.invoker);
- });
- }
- }
-
- @Override
public void reRefer(URL newSubscribeUrl) {
// update url to prepare for migration refresh
this.url = url.addParameter(REFER_KEY, StringUtils.toQueryString(newSubscribeUrl.getParameters()));
@@ -145,7 +127,7 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
}
@Override
- public synchronized void fallbackToInterfaceInvoker() {
+ public void fallbackToInterfaceInvoker() {
refreshInterfaceInvoker();
setListener(invoker, () -> {
this.destroyServiceDiscoveryInvoker(this.serviceDiscoveryInvoker);
@@ -153,15 +135,44 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
}
@Override
+ public void migrateToServiceDiscoveryInvoker(boolean forceMigrate) {
+ if (!forceMigrate) {
+ refreshServiceDiscoveryInvoker();
+ refreshInterfaceInvoker();
+ setListener(invoker, () -> {
+ this.compareAddresses(serviceDiscoveryInvoker, invoker);
+ });
+ setListener(serviceDiscoveryInvoker, () -> {
+ this.compareAddresses(serviceDiscoveryInvoker, invoker);
+ });
+ } else {
+ refreshServiceDiscoveryInvoker();
+ setListener(serviceDiscoveryInvoker, () -> {
+ this.destroyInterfaceInvoker(this.invoker);
+ });
+ }
+ }
+
+ @Override
public Result invoke(Invocation invocation) throws RpcException {
- if (!checkInvokerAvailable(serviceDiscoveryInvoker)) {
- logger.debug("Using interface addresses to handle invocation, interface " + type.getName() + ", total address size " + invoker.getDirectory().getAllInvokers().size());
- return invoker.invoke(invocation);
+ if (currentAvailableInvoker != null) {
+ return currentAvailableInvoker.invoke(invocation);
}
- if (!checkInvokerAvailable(invoker)) {
- logger.debug("Using instance addresses to handle invocation, interface " + type.getName() + ", total address size " + serviceDiscoveryInvoker.getDirectory().getAllInvokers().size());
- return serviceDiscoveryInvoker.invoke(invocation);
+ switch (step) {
+ case APPLICATION_FIRST:
+ if (checkInvokerAvailable(serviceDiscoveryInvoker)) {
+ currentAvailableInvoker = serviceDiscoveryInvoker;
+ } else {
+ currentAvailableInvoker = invoker;
+ }
+ break;
+ case FORCE_APPLICATION:
+ currentAvailableInvoker = serviceDiscoveryInvoker;
+ break;
+ case INTERFACE_FIRST:
+ default:
+ currentAvailableInvoker = invoker;
}
return currentAvailableInvoker.invoke(invocation);
@@ -226,8 +237,13 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
}
@Override
- public MigrationStep getCurrentStep() {
- return null;
+ public MigrationStep getMigrationStep() {
+ return step;
+ }
+
+ @Override
+ public void setMigrationStep(MigrationStep step) {
+ this.step = step;
}
@Override
@@ -252,7 +268,7 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
}
protected synchronized void destroyServiceDiscoveryInvoker(ClusterInvoker<?> serviceDiscoveryInvoker) {
- if (checkInvokerAvailable(this.invoker)) {
+ if (this.invoker != null) {
this.currentAvailableInvoker = this.invoker;
}
if (serviceDiscoveryInvoker != null) {
@@ -264,18 +280,18 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
}
protected synchronized void discardServiceDiscoveryInvokerAddress(ClusterInvoker<?> serviceDiscoveryInvoker) {
- if (checkInvokerAvailable(this.invoker)) {
+ if (this.invoker != null) {
this.currentAvailableInvoker = this.invoker;
}
if (serviceDiscoveryInvoker != null) {
if (logger.isDebugEnabled()) {
logger.debug("Discarding instance addresses, total size " + serviceDiscoveryInvoker.getDirectory().getAllInvokers().size());
}
- serviceDiscoveryInvoker.getDirectory().discordAddresses();
+// serviceDiscoveryInvoker.getDirectory().discordAddresses();
}
}
- protected synchronized void refreshServiceDiscoveryInvoker() {
+ protected void refreshServiceDiscoveryInvoker() {
clearListener(serviceDiscoveryInvoker);
if (needRefresh(serviceDiscoveryInvoker)) {
if (logger.isDebugEnabled()) {
@@ -285,25 +301,14 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
}
}
- private void clearListener(ClusterInvoker<T> invoker) {
- if (invoker == null) return;
- DynamicDirectory<T> directory = (DynamicDirectory<T>) invoker.getDirectory();
- directory.setInvokersChangedListener(null);
- }
-
- private void setListener(ClusterInvoker<T> invoker, InvokersChangedListener listener) {
- if (invoker == null) return;
- DynamicDirectory<T> directory = (DynamicDirectory<T>) invoker.getDirectory();
- directory.setInvokersChangedListener(listener);
- }
-
- protected synchronized void refreshInterfaceInvoker() {
+ protected void refreshInterfaceInvoker() {
clearListener(invoker);
+ // FIXME invoker.destroy();
if (needRefresh(invoker)) {
- // FIXME invoker.destroy();
if (logger.isDebugEnabled()) {
logger.debug("Re-subscribing interface addresses for interface " + type.getName());
}
+
invoker = registryProtocol.getInvoker(cluster, registry, type, url);
}
}
@@ -328,10 +333,22 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
if (logger.isDebugEnabled()) {
logger.debug("Discarding interface addresses, total address size " + invoker.getDirectory().getAllInvokers().size());
}
- invoker.getDirectory().discordAddresses();
+ //invoker.getDirectory().discordAddresses();
}
}
+ private void clearListener(ClusterInvoker<T> invoker) {
+ if (invoker == null) return;
+ DynamicDirectory<T> directory = (DynamicDirectory<T>) invoker.getDirectory();
+ directory.setInvokersChangedListener(null);
+ }
+
+ private void setListener(ClusterInvoker<T> invoker, InvokersChangedListener listener) {
+ if (invoker == null) return;
+ DynamicDirectory<T> directory = (DynamicDirectory<T>) invoker.getDirectory();
+ directory.setInvokersChangedListener(listener);
+ }
+
private boolean needRefresh(ClusterInvoker<T> invoker) {
return invoker == null || invoker.isDestroyed();
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandler.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandler.java
index 1572355..d22bd6b 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandler.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandler.java
@@ -31,10 +31,10 @@ public class MigrationRuleHandler<T> {
private static final Logger logger = LoggerFactory.getLogger(MigrationRuleHandler.class);
private static final String DUBBO_SERVICEDISCOVERY_MIGRATION = "dubbo.application.service-discovery.migration";
- private MigrationInvoker<T> migrationInvoker;
+ private MigrationClusterInvoker<T> migrationInvoker;
private MigrationStep currentStep;
- public MigrationRuleHandler(MigrationInvoker<T> invoker) {
+ public MigrationRuleHandler(MigrationClusterInvoker<T> invoker) {
this.migrationInvoker = invoker;
}
@@ -53,7 +53,7 @@ public class MigrationRuleHandler<T> {
}
if (currentStep == null || currentStep != step) {
- currentStep = step;
+ setCurrentStep(step);
switch (step) {
case APPLICATION_FIRST:
migrationInvoker.migrateToServiceDiscoveryInvoker(false);
@@ -67,4 +67,9 @@ public class MigrationRuleHandler<T> {
}
}
}
+
+ public void setCurrentStep(MigrationStep currentStep) {
+ this.currentStep = currentStep;
+ this.migrationInvoker.setMigrationStep(currentStep);
+ }
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/ServiceDiscoveryMigrationInvoker.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/ServiceDiscoveryMigrationInvoker.java
index fb9e17e..774073b 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/ServiceDiscoveryMigrationInvoker.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/ServiceDiscoveryMigrationInvoker.java
@@ -40,13 +40,13 @@ public class ServiceDiscoveryMigrationInvoker<T> extends MigrationInvoker<T> {
}
@Override
- public synchronized void fallbackToInterfaceInvoker() {
+ public void fallbackToInterfaceInvoker() {
logger.error("Service discovery registry type does not support discovery of interface level addresses, " + getRegistryUrl());
migrateToServiceDiscoveryInvoker(true);
}
@Override
- public synchronized void migrateToServiceDiscoveryInvoker(boolean forceMigrate) {
+ public void migrateToServiceDiscoveryInvoker(boolean forceMigrate) {
refreshServiceDiscoveryInvoker();
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
index 3056506..8d2ed8b 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
@@ -260,22 +260,15 @@ public abstract class DynamicDirectory<T> extends AbstractDirectory<T> implement
}
private volatile InvokersChangedListener invokersChangedListener;
- private volatile boolean addressChanged;
public void setInvokersChangedListener(InvokersChangedListener listener) {
this.invokersChangedListener = listener;
- if (addressChanged) {
- invokersChangedListener.onChange();
- this.addressChanged = false;
- }
+ invokersChanged();
}
protected void invokersChanged() {
if (invokersChangedListener != null) {
invokersChangedListener.onChange();
- this.addressChanged = false;
- } else {
- this.addressChanged = true;
}
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
index 0865f08..94c7b79 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
@@ -128,7 +128,7 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
@Override
public String toString() {
- return getInterface() + " -> " + (getUrl() == null ? "" : getUrl().toString());
+ return getInterface() + " -> " + (getUrl() == null ? "" : getUrl().getAddress());
}
@Override