You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2021/06/24 07:14:19 UTC
[dubbo] branch 3.0 updated: Opt Migration & Fix reSubscribe for ServiceDiscovery (#8129)
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 82f5665 Opt Migration & Fix reSubscribe for ServiceDiscovery (#8129)
82f5665 is described below
commit 82f5665684be51b03e08163a6daa4f5b9d24354d
Author: Albumen Kevin <jh...@gmail.com>
AuthorDate: Thu Jun 24 15:14:03 2021 +0800
Opt Migration & Fix reSubscribe for ServiceDiscovery (#8129)
---
.../registry/client/ServiceDiscoveryRegistry.java | 3 +
.../metadata/MetadataServiceNameMapping.java | 5 ++
.../client/migration/MigrationRuleListener.java | 73 +++++++++++++++++++---
.../client/migration/model/MigrationRule.java | 52 ++++++++++-----
4 files changed, 107 insertions(+), 26 deletions(-)
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
index 4ca66ad..a28c0c5 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
@@ -259,10 +259,13 @@ public class ServiceDiscoveryRegistry implements Registry {
String serviceNamesKey = toStringKeys(serviceNames);
ServiceInstancesChangedListener instancesChangedListener = serviceListeners.get(serviceNamesKey);
if (instancesChangedListener != null) {
+ String listenerId = createListenerId(url, instancesChangedListener);
+
instancesChangedListener.removeListener(protocolServiceKey);
if (!instancesChangedListener.hasListeners()) {
serviceListeners.remove(serviceNamesKey);
}
+ registeredListeners.remove(listenerId);
}
}
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataServiceNameMapping.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataServiceNameMapping.java
index 4ad262c..ad4dde0 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataServiceNameMapping.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataServiceNameMapping.java
@@ -55,6 +55,11 @@ public class MetadataServiceNameMapping extends AbstractServiceNameMapping {
String registryCluster = getRegistryCluster(url);
MetadataReport metadataReport = MetadataReportInstance.getMetadataReport(registryCluster);
+ if (metadataReport.registerServiceAppMapping(serviceInterface, getName(), url)) {
+ // MetadataReport support directly register service-app mapping
+ return;
+ }
+
int currentRetryTimes = 1;
boolean success;
String newConfigContent = getName();
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java
index 56f33af..f1fea06 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java
@@ -35,9 +35,14 @@ import org.apache.dubbo.rpc.cluster.ClusterInvoker;
import org.apache.dubbo.rpc.model.ApplicationModel;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.dubbo.common.constants.RegistryConstants.INIT;
@@ -49,6 +54,11 @@ public class MigrationRuleListener implements RegistryProtocolListener, Configur
private final String RULE_KEY = ApplicationModel.getName() + ".migration";
private final Map<MigrationInvoker, MigrationRuleHandler> handlers = new ConcurrentHashMap<>();
+ private final LinkedBlockingQueue<String> ruleQueue = new LinkedBlockingQueue<>();
+
+ private final AtomicBoolean executorSubmit = new AtomicBoolean(false);
+ private final ExecutorService ruleManageExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("Dubbo-Migration-Listener"));
+
private DynamicConfiguration configuration;
private volatile String rawRule;
@@ -104,19 +114,64 @@ public class MigrationRuleListener implements RegistryProtocolListener, Configur
String rawRule = event.getContent();
if (StringUtils.isEmpty(rawRule)) {
// fail back to startup status
- setRawRule(INIT);
+ rawRule = INIT;
//logger.warn("Received empty migration rule, will ignore.");
- return;
+ }
+ try {
+ ruleQueue.put(rawRule);
+ } catch (InterruptedException e) {
+ logger.error("Put rawRule to rule management queue failed. rawRule: " + rawRule, e);
}
- logger.info("Using the following migration rule to migrate:");
- logger.info(rawRule);
-
- setRawRule(rawRule);
-
- if (CollectionUtils.isNotEmptyMap(handlers)) {
- handlers.forEach((_key, handler) -> handler.doMigrate(rule));
+ if (executorSubmit.compareAndSet(false, true)) {
+ ruleManageExecutor.submit(() -> {
+ while (true) {
+ String rule = "";
+ try {
+ rule = ruleQueue.take();
+ if (StringUtils.isEmpty(rule)) {
+ Thread.sleep(1000);
+ }
+ } catch (InterruptedException e) {
+ logger.error("Poll Rule from config center failed.", e);
+ }
+ if (StringUtils.isEmpty(rule)) {
+ continue;
+ }
+ if (Objects.equals(this.rawRule, rule)) {
+ logger.info("Ignore duplicated rule");
+ continue;
+ }
+ try {
+ logger.info("Using the following migration rule to migrate:");
+ logger.info(rule);
+
+ setRawRule(rule);
+
+ if (CollectionUtils.isNotEmptyMap(handlers)) {
+ ExecutorService executorService = Executors.newFixedThreadPool(100, new NamedThreadFactory("Dubbo-Invoker-Migrate"));
+ CountDownLatch countDownLatch = new CountDownLatch(handlers.size());
+
+ handlers.forEach((_key, handler) ->
+ executorService.submit(() -> {
+ handler.doMigrate(this.rule);
+ countDownLatch.countDown();
+ }));
+
+ try {
+ countDownLatch.await(1, TimeUnit.HOURS);
+ } catch (InterruptedException e) {
+ logger.error("Wait Invoker Migrate interrupted!", e);
+ }
+ executorService.shutdown();
+ }
+ } catch (Throwable t) {
+ logger.error("Error occurred when migration.", t);
+ }
+ }
+ });
}
+
}
public void setRawRule(String rawRule) {
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/model/MigrationRule.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/model/MigrationRule.java
index ce7e34b..39463fb 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/model/MigrationRule.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/model/MigrationRule.java
@@ -144,24 +144,26 @@ public class MigrationRule {
if (interfaceRules != null) {
SubMigrationRule rule = interfaceRules.get(consumerURL.getDisplayServiceKey());
if (rule != null) {
- return rule.getStep() == null ? step : rule.getStep();
+ if (rule.getStep() != null) {
+ return rule.getStep();
+ }
}
}
if (applications != null) {
ServiceNameMapping serviceNameMapping = ServiceNameMapping.getDefaultExtension();
Set<String> services = serviceNameMapping.getServices(consumerURL);
- if(CollectionUtils.isNotEmpty(services)) {
+ if (CollectionUtils.isNotEmpty(services)) {
for (String service : services) {
SubMigrationRule rule = applicationRules.get(service);
- if (rule != null) {
- return rule.getStep() == null ? step : rule.getStep();
+ if (rule.getStep() != null) {
+ return rule.getStep();
}
}
}
}
- if(step == null) {
+ if (step == null) {
// initial step : APPLICATION_FIRST
step = MigrationStep.APPLICATION_FIRST;
step = Enum.valueOf(MigrationStep.class,
@@ -180,18 +182,22 @@ public class MigrationRule {
if (interfaceRules != null) {
SubMigrationRule rule = interfaceRules.get(consumerURL.getDisplayServiceKey());
if (rule != null) {
- return rule.getThreshold() == null ? threshold : rule.getThreshold();
+ if (rule.getThreshold() != null) {
+ return rule.getThreshold();
+ }
}
}
if (applications != null) {
ServiceNameMapping serviceNameMapping = ServiceNameMapping.getDefaultExtension();
Set<String> services = serviceNameMapping.getServices(consumerURL);
- if(CollectionUtils.isNotEmpty(services)) {
+ if (CollectionUtils.isNotEmpty(services)) {
for (String service : services) {
SubMigrationRule rule = applicationRules.get(service);
if (rule != null) {
- return rule.getThreshold() == null ? threshold : rule.getThreshold();
+ if (rule.getThreshold() != null) {
+ return rule.getThreshold();
+ }
}
}
}
@@ -216,18 +222,22 @@ public class MigrationRule {
if (interfaceRules != null) {
SubMigrationRule rule = interfaceRules.get(consumerURL.getDisplayServiceKey());
if (rule != null) {
- return rule.getProportion() == null ? proportion : rule.getProportion();
+ if (rule.getProportion() != null) {
+ return rule.getProportion();
+ }
}
}
if (applications != null) {
ServiceNameMapping serviceNameMapping = ServiceNameMapping.getDefaultExtension();
Set<String> services = serviceNameMapping.getServices(consumerURL);
- if(CollectionUtils.isNotEmpty(services)) {
+ if (CollectionUtils.isNotEmpty(services)) {
for (String service : services) {
SubMigrationRule rule = applicationRules.get(service);
if (rule != null) {
- return rule.getProportion() == null ? proportion : rule.getProportion();
+ if (rule.getProportion() != null) {
+ return rule.getProportion();
+ }
}
}
}
@@ -248,18 +258,22 @@ public class MigrationRule {
if (interfaceRules != null) {
SubMigrationRule rule = interfaceRules.get(consumerURL.getDisplayServiceKey());
if (rule != null) {
- return rule.getDelay() == null ? delay : rule.getDelay();
+ if (rule.getDelay() != null) {
+ return rule.getDelay();
+ }
}
}
if (applications != null) {
ServiceNameMapping serviceNameMapping = ServiceNameMapping.getDefaultExtension();
Set<String> services = serviceNameMapping.getServices(consumerURL);
- if(CollectionUtils.isNotEmpty(services)) {
+ if (CollectionUtils.isNotEmpty(services)) {
for (String service : services) {
SubMigrationRule rule = applicationRules.get(service);
if (rule != null) {
- return rule.getDelay() == null ? delay : rule.getDelay();
+ if (rule.getDelay() != null) {
+ return rule.getDelay();
+ }
}
}
}
@@ -284,18 +298,22 @@ public class MigrationRule {
if (interfaceRules != null) {
SubMigrationRule rule = interfaceRules.get(consumerURL.getDisplayServiceKey());
if (rule != null) {
- return rule.getForce() == null ? force : rule.getForce();
+ if (rule.getForce() != null) {
+ return rule.getForce();
+ }
}
}
if (applications != null) {
ServiceNameMapping serviceNameMapping = ServiceNameMapping.getDefaultExtension();
Set<String> services = serviceNameMapping.getServices(consumerURL);
- if(CollectionUtils.isNotEmpty(services)) {
+ if (CollectionUtils.isNotEmpty(services)) {
for (String service : services) {
SubMigrationRule rule = applicationRules.get(service);
if (rule != null) {
- return rule.getForce() == null ? force : rule.getForce();
+ if (rule.getForce() != null) {
+ return rule.getForce();
+ }
}
}
}