You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2021/06/08 09:29:36 UTC
[dubbo] branch 3.0 updated: Opt Migration Invoker (#7959)
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new c0269b6 Opt Migration Invoker (#7959)
c0269b6 is described below
commit c0269b6e6d917230237f438a060bc0882965e31b
Author: Albumen Kevin <jh...@gmail.com>
AuthorDate: Tue Jun 8 17:29:18 2021 +0800
Opt Migration Invoker (#7959)
---
.../apache/dubbo/common/config/Environment.java | 5 +
.../status/reporter/FrameworkStatusReporter.java | 22 ++
.../org/apache/dubbo/config/ReferenceConfig.java | 3 +-
.../dubbo/config/utils/ConfigValidationUtils.java | 2 +-
.../dubbo/config/bootstrap/DubboBootstrapTest.java | 21 +-
.../metadata/DefaultMetadataParamsFilter.java | 55 -----
.../org/apache/dubbo/metadata/MetadataInfo.java | 19 +-
.../org.apache.dubbo.metadata.MetadataParamsFilter | 1 -
.../registry/client/ServiceDiscoveryRegistry.java | 118 ++++++++-
.../DefaultMigrationAddressComparator.java | 29 +--
.../migration/MigrationAddressComparator.java | 4 +-
.../client/migration/MigrationClusterInvoker.java | 8 +-
.../client/migration/MigrationInvoker.java | 275 +++++++++++----------
.../client/migration/MigrationRuleHandler.java | 108 ++++----
.../client/migration/MigrationRuleListener.java | 159 +++---------
.../ServiceDiscoveryMigrationInvoker.java | 10 +-
.../migration/model/ApplicationMigrationRule.java | 75 ------
.../migration/model/InterfaceMigrationRule.java | 56 ++++-
.../client/migration/model/MigrationRule.java | 182 +++++++-------
.../client/migration/model/MigrationStep.java | 2 +-
.../registry/integration/DynamicDirectory.java | 1 +
.../DefaultMigrationAddressComparatorTest.java | 99 ++++++++
.../client/migration/MigrationInvokerTest.java | 218 ++++++++++++++++
.../client/migration/MigrationRuleHandlerTest.java | 54 ++++
.../migration/MigrationRuleListenerTest.java | 71 ++++++
.../client/migration/model/MigrationRuleTest.java | 32 ++-
26 files changed, 1025 insertions(+), 604 deletions(-)
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/config/Environment.java b/dubbo-common/src/main/java/org/apache/dubbo/common/config/Environment.java
index 9472731..34e5307 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/config/Environment.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/config/Environment.java
@@ -92,6 +92,11 @@ public class Environment extends LifecycleAdapter implements FrameworkExt {
this.localMigrationRule = ConfigUtils.loadMigrationRule(path);
}
+ @Deprecated
+ public void setLocalMigrationRule(String localMigrationRule) {
+ this.localMigrationRule = localMigrationRule;
+ }
+
@DisableInject
public void setExternalConfigMap(Map<String, String> externalConfiguration) {
if (externalConfiguration != null) {
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/status/reporter/FrameworkStatusReporter.java b/dubbo-common/src/main/java/org/apache/dubbo/common/status/reporter/FrameworkStatusReporter.java
index 6d32e9f..d122f91 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/status/reporter/FrameworkStatusReporter.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/status/reporter/FrameworkStatusReporter.java
@@ -34,6 +34,7 @@ public interface FrameworkStatusReporter {
Logger logger = LoggerFactory.getLogger(FrameworkStatusReporter.class);
String REGISTRATION_STATUS = "registration";
String ADDRESS_CONSUMPTION_STATUS = "consumption";
+ String MIGRATION_STEP_STATUS = "migrationStepStatus";
void report(String type, Object obj);
@@ -45,6 +46,14 @@ public interface FrameworkStatusReporter {
doReport(ADDRESS_CONSUMPTION_STATUS, obj);
}
+ static void reportMigrationStepStatus(Object obj) {
+ doReport(MIGRATION_STEP_STATUS, obj);
+ }
+
+ static boolean hasReporter() {
+ return ExtensionLoader.getExtensionLoader(FrameworkStatusReporter.class).getSupportedExtensions().size() > 0;
+ }
+
static void doReport(String type, Object obj) {
// TODO, report asynchronously
try {
@@ -76,4 +85,17 @@ public interface FrameworkStatusReporter {
migrationStatus.put("status", status);
return gson.toJson(migrationStatus);
}
+
+ static String createMigrationStepReport(String interfaceName, String version, String group, String originStep, String newStep, String success) {
+ HashMap<String, String> migrationStatus = new HashMap<>();
+ migrationStatus.put("type", "migrationStepStatus");
+ migrationStatus.put("application", ApplicationModel.getName());
+ migrationStatus.put("service", interfaceName);
+ migrationStatus.put("version", version);
+ migrationStatus.put("group", group);
+ migrationStatus.put("originStep", originStep);
+ migrationStatus.put("newStep", newStep);
+ migrationStatus.put("success", success);
+ return gson.toJson(migrationStatus);
+ }
}
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
index c858dd7..c01efd2 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
@@ -577,7 +577,8 @@ public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
}
// just for test
- Invoker<?> getInvoker() {
+ @Deprecated
+ public Invoker<?> getInvoker() {
return invoker;
}
}
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/utils/ConfigValidationUtils.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/utils/ConfigValidationUtils.java
index 2e3f5e6..d946356 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/utils/ConfigValidationUtils.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/utils/ConfigValidationUtils.java
@@ -241,7 +241,7 @@ public class ConfigValidationUtils {
result.add(interfaceCompatibleRegistryURL);
}
} else {
- registerMode = registryURL.getParameter(REGISTER_MODE_KEY, ConfigurationUtils.getCachedDynamicProperty(DUBBO_REGISTER_MODE_DEFAULT_KEY, DEFAULT_REGISTER_MODE_INTERFACE));
+ registerMode = registryURL.getParameter(REGISTER_MODE_KEY, ConfigurationUtils.getCachedDynamicProperty(DUBBO_REGISTER_MODE_DEFAULT_KEY, DEFAULT_REGISTER_MODE_ALL));
if (!isValidRegisterMode(registerMode)) {
registerMode = DEFAULT_REGISTER_MODE_INTERFACE;
}
diff --git a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/bootstrap/DubboBootstrapTest.java b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/bootstrap/DubboBootstrapTest.java
index b6e2943..68bda0c 100644
--- a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/bootstrap/DubboBootstrapTest.java
+++ b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/bootstrap/DubboBootstrapTest.java
@@ -24,10 +24,10 @@ import org.apache.dubbo.config.AbstractInterfaceConfigTest;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.MonitorConfig;
import org.apache.dubbo.config.ServiceConfig;
+import org.apache.dubbo.config.SysProps;
import org.apache.dubbo.config.api.DemoService;
import org.apache.dubbo.config.provider.impl.DemoServiceImpl;
import org.apache.dubbo.config.utils.ConfigValidationUtils;
-import org.apache.dubbo.config.SysProps;
import org.apache.dubbo.monitor.MonitorService;
import org.apache.dubbo.registry.RegistryService;
@@ -126,15 +126,16 @@ public class DubboBootstrapTest {
//ApplicationModel.getEnvironment().setDynamicConfiguration(new CompositeDynamicConfiguration());
List<URL> urls = ConfigValidationUtils.loadRegistries(serviceConfig, true);
- Assertions.assertEquals(1, urls.size());
- URL url = urls.get(0);
- Assertions.assertEquals("registry", url.getProtocol());
- Assertions.assertEquals("addr1:9090", url.getAddress());
- Assertions.assertEquals(RegistryService.class.getName(), url.getPath());
- Assertions.assertTrue(url.getParameters().containsKey("timestamp"));
- Assertions.assertTrue(url.getParameters().containsKey("pid"));
- Assertions.assertTrue(url.getParameters().containsKey("registry"));
- Assertions.assertTrue(url.getParameters().containsKey("dubbo"));
+ Assertions.assertEquals(2, urls.size());
+ for (URL url : urls) {
+ Assertions.assertTrue(url.getProtocol().contains("registry"));
+ Assertions.assertEquals("addr1:9090", url.getAddress());
+ Assertions.assertEquals(RegistryService.class.getName(), url.getPath());
+ Assertions.assertTrue(url.getParameters().containsKey("timestamp"));
+ Assertions.assertTrue(url.getParameters().containsKey("pid"));
+ Assertions.assertTrue(url.getParameters().containsKey("registry"));
+ Assertions.assertTrue(url.getParameters().containsKey("dubbo"));
+ }
}
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/DefaultMetadataParamsFilter.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/DefaultMetadataParamsFilter.java
deleted file mode 100644
index 667c7cd..0000000
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/DefaultMetadataParamsFilter.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.metadata;
-
-import org.apache.dubbo.common.extension.Activate;
-
-import static org.apache.dubbo.common.constants.CommonConstants.CLUSTER_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_VERSION_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.LOADBALANCE_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.RELEASE_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
-import static org.apache.dubbo.remoting.Constants.CODEC_KEY;
-import static org.apache.dubbo.remoting.Constants.CONNECTIONS_KEY;
-import static org.apache.dubbo.remoting.Constants.EXCHANGER_KEY;
-import static org.apache.dubbo.remoting.Constants.SERIALIZATION_KEY;
-import static org.apache.dubbo.rpc.Constants.DEPRECATED_KEY;
-import static org.apache.dubbo.rpc.Constants.MOCK_KEY;
-import static org.apache.dubbo.rpc.Constants.TOKEN_KEY;
-import static org.apache.dubbo.rpc.cluster.Constants.WARMUP_KEY;
-import static org.apache.dubbo.rpc.cluster.Constants.WEIGHT_KEY;
-
-@Activate
-public class DefaultMetadataParamsFilter implements MetadataParamsFilter {
- @Override
- public String[] serviceParamsIncluded() {
- return new String[]{
- CODEC_KEY, EXCHANGER_KEY, SERIALIZATION_KEY, CLUSTER_KEY, CONNECTIONS_KEY, DEPRECATED_KEY,
- GROUP_KEY, LOADBALANCE_KEY, MOCK_KEY, PATH_KEY, TIMEOUT_KEY, TOKEN_KEY, VERSION_KEY, WARMUP_KEY,
- WEIGHT_KEY, DUBBO_VERSION_KEY, RELEASE_KEY
- };
- }
-
-
- @Override
- public String[] instanceParamsIncluded() {
- return new String[0];
- }
-}
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
index afd1ec9..2410dc0 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
@@ -40,6 +40,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.dubbo.common.constants.CommonConstants.DOT_SEPARATOR;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_CHAR_SEPARATOR;
import static org.apache.dubbo.common.constants.CommonConstants.METHODS_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.MONITOR_KEY;
+import static org.apache.dubbo.common.constants.FilterConstants.VALIDATION_KEY;
+import static org.apache.dubbo.common.constants.QosConstants.ACCEPT_FOREIGN_IP;
+import static org.apache.dubbo.common.constants.QosConstants.QOS_ENABLE;
+import static org.apache.dubbo.common.constants.QosConstants.QOS_HOST;
+import static org.apache.dubbo.common.constants.QosConstants.QOS_PORT;
+import static org.apache.dubbo.remoting.Constants.BIND_IP_KEY;
+import static org.apache.dubbo.remoting.Constants.BIND_PORT_KEY;
+import static org.apache.dubbo.rpc.Constants.INTERFACES;
public class MetadataInfo implements Serializable {
public static final MetadataInfo EMPTY = new MetadataInfo();
@@ -52,7 +61,8 @@ public class MetadataInfo implements Serializable {
private transient Map<String, String> extendParams;
private transient AtomicBoolean reported = new AtomicBoolean(false);
- public MetadataInfo() {}
+ public MetadataInfo() {
+ }
public MetadataInfo(String app) {
this(app, null, null);
@@ -217,6 +227,13 @@ public class MetadataInfo implements Serializable {
this.url = url;
Map<String, String> params = new HashMap<>();
List<MetadataParamsFilter> filters = loader.getActivateExtension(url, "params-filter");
+ if (filters.size() == 0) {
+ params.putAll(
+ url.removeParameters(
+ MONITOR_KEY, BIND_IP_KEY, BIND_PORT_KEY, QOS_ENABLE,
+ QOS_HOST, QOS_PORT, ACCEPT_FOREIGN_IP, VALIDATION_KEY, INTERFACES)
+ .getParameters());
+ }
for (MetadataParamsFilter filter : filters) {
String[] paramsIncluded = filter.serviceParamsIncluded();
if (ArrayUtils.isNotEmpty(paramsIncluded)) {
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.metadata.MetadataParamsFilter b/dubbo-metadata/dubbo-metadata-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.metadata.MetadataParamsFilter
deleted file mode 100644
index 0d903f3..0000000
--- a/dubbo-metadata/dubbo-metadata-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.metadata.MetadataParamsFilter
+++ /dev/null
@@ -1 +0,0 @@
-dubbo=org.apache.dubbo.metadata.DefaultMetadataParamsFilter
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
index 42d4c3e..b7599a3 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
@@ -22,6 +22,10 @@ import org.apache.dubbo.common.extension.SPI;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.metadata.MappingChangedEvent;
+import org.apache.dubbo.metadata.MappingListener;
+import org.apache.dubbo.metadata.ServiceNameMapping;
import org.apache.dubbo.metadata.WritableMetadataService;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.Registry;
@@ -39,21 +43,31 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.TreeSet;
import java.util.stream.Collectors;
import static java.lang.String.format;
+import static java.util.Collections.emptySet;
+import static java.util.Collections.unmodifiableSet;
+import static java.util.stream.Collectors.toSet;
+import static java.util.stream.Stream.of;
import static org.apache.dubbo.common.constants.CommonConstants.CHECK_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.DUBBO;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_CHAR_SEPARATOR;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.MAPPING_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER_SIDE;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
+import static org.apache.dubbo.common.constants.RegistryConstants.PROVIDED_BY;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_CLUSTER_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_TYPE_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.SERVICE_REGISTRY_TYPE;
+import static org.apache.dubbo.common.constants.RegistryConstants.SUBSCRIBED_SERVICE_NAMES_KEY;
import static org.apache.dubbo.common.function.ThrowableAction.execute;
+import static org.apache.dubbo.common.utils.CollectionUtils.isEmpty;
+import static org.apache.dubbo.common.utils.StringUtils.isBlank;
import static org.apache.dubbo.metadata.ServiceNameMapping.toStringKeys;
import static org.apache.dubbo.registry.client.ServiceDiscoveryFactory.getExtension;
@@ -172,7 +186,7 @@ public class ServiceDiscoveryRegistry implements Registry {
}
} else {
if (logger.isWarnEnabled()) {
- logger.info(format("The URL[%s] has been registered.", url.toString()));
+ logger.warn(format("The URL[%s] has been registered.", url.toString()));
}
}
}
@@ -211,16 +225,25 @@ public class ServiceDiscoveryRegistry implements Registry {
writableMetadataService.subscribeURL(url);
boolean check = url.getParameter(CHECK_KEY, false);
+
+ Set<String> subscribedServices = Collections.emptySet();
+ try {
+ subscribedServices = getServices(registryURL, url, listener);
+ WritableMetadataService.getDefaultExtension().putCachedMapping(ServiceNameMapping.buildMappingKey(url), subscribedServices);
+ } catch (Exception e) {
+ logger.warn("Cannot find app mapping for service " + url.getServiceInterface() + ", will not migrate.", e);
+ }
+
Set<String> serviceNames = writableMetadataService.getCachedMapping(url);
- if (CollectionUtils.isEmpty(serviceNames)) {
+ if (CollectionUtils.isEmpty(subscribedServices)) {
if (check) {
throw new IllegalStateException("Should has at least one way to know which services this interface belongs to, subscription url: " + url);
}
return;
}
- subscribeURLs(url, listener, serviceNames);
+ subscribeURLs(url, listener, subscribedServices);
}
@Override
@@ -241,6 +264,7 @@ public class ServiceDiscoveryRegistry implements Registry {
}
public void doUnsubscribe(URL url, NotifyListener listener) {
+ // TODO: remove service name mapping listener
writableMetadataService.unsubscribeURL(url);
String protocolServiceKey = url.getServiceKey() + GROUP_CHAR_SEPARATOR + url.getParameter(PROTOCOL_KEY, DUBBO);
Set<String> serviceNames = writableMetadataService.getCachedMapping(url);
@@ -410,4 +434,92 @@ public class ServiceDiscoveryRegistry implements Registry {
public Map<String, ServiceInstancesChangedListener> getServiceListeners() {
return serviceListeners;
}
+
+
+ /**
+ * 1.developer explicitly specifies the application name this interface belongs to
+ * 2.check Interface-App mapping
+ * 3.use the services specified in registry url.
+ *
+ * @param subscribedURL
+ * @return
+ */
+ protected Set<String> getServices(URL registryURL, URL subscribedURL, NotifyListener listener) {
+ Set<String> subscribedServices = new TreeSet<>();
+ Set<String> globalConfiguredSubscribingServices = parseServices(registryURL.getParameter(SUBSCRIBED_SERVICE_NAMES_KEY));
+
+ String serviceNames = subscribedURL.getParameter(PROVIDED_BY);
+ if (StringUtils.isNotEmpty(serviceNames)) {
+ logger.info(subscribedURL.getServiceInterface() + " mapping to " + serviceNames + " instructed by provided-by set by user.");
+ subscribedServices.addAll(parseServices(serviceNames));
+ }
+
+ if (isEmpty(subscribedServices)) {
+ Set<String> mappedServices = findMappedServices(registryURL, subscribedURL, new DefaultMappingListener(subscribedURL, subscribedServices, listener));
+ logger.info(subscribedURL.getServiceInterface() + " mapping to " + mappedServices + " instructed by remote metadata center.");
+ subscribedServices.addAll(mappedServices);
+ if (isEmpty(subscribedServices)) {
+ logger.info(subscribedURL.getServiceInterface() + " mapping to " + globalConfiguredSubscribingServices + " by default.");
+ subscribedServices.addAll(globalConfiguredSubscribingServices);
+ }
+ }
+ return subscribedServices;
+ }
+
+ protected Set<String> findMappedServices(URL registryURL, URL subscribedURL, MappingListener listener) {
+ Set<String> result = new LinkedHashSet<>();
+ ServiceNameMapping serviceNameMapping = ServiceNameMapping.getExtension(registryURL.getParameter(MAPPING_KEY));
+ result.addAll(serviceNameMapping.getAndListen(subscribedURL, listener));
+ result.addAll(serviceNameMapping.getAndListenWithNewStore(subscribedURL, listener));
+ return result;
+ }
+
+ public static Set<String> parseServices(String literalServices) {
+ return isBlank(literalServices) ? emptySet() :
+ unmodifiableSet(of(literalServices.split(","))
+ .map(String::trim)
+ .filter(StringUtils::isNotEmpty)
+ .collect(toSet()));
+ }
+
+ private class DefaultMappingListener implements MappingListener {
+ private final Logger logger = LoggerFactory.getLogger(DefaultMappingListener.class);
+ private URL url;
+ private Set<String> oldApps;
+ private NotifyListener listener;
+
+ public DefaultMappingListener(URL subscribedURL, Set<String> serviceNames, NotifyListener listener) {
+ this.url = subscribedURL;
+ this.oldApps = serviceNames;
+ this.listener = listener;
+ }
+
+ @Override
+ public void onEvent(MappingChangedEvent event) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received mapping notification from meta server, " + event);
+ }
+ Set<String> newApps = event.getApps();
+ Set<String> tempOldApps = oldApps;
+ oldApps = newApps;
+
+ if (CollectionUtils.isEmpty(newApps)) {
+ return;
+ }
+
+ if (CollectionUtils.isEmpty(tempOldApps) && newApps.size() > 0) {
+ WritableMetadataService.getDefaultExtension().putCachedMapping(ServiceNameMapping.buildMappingKey(url), newApps);
+ subscribeURLs(url, listener, newApps);
+ return;
+ }
+
+ for (String newAppName : newApps) {
+ if (!tempOldApps.contains(newAppName)) {
+ WritableMetadataService.getDefaultExtension().putCachedMapping(ServiceNameMapping.buildMappingKey(url), newApps);
+ subscribeURLs(url, listener, newApps);
+ return;
+ }
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparator.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparator.java
index a37d55b..5223933 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparator.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparator.java
@@ -21,7 +21,6 @@ import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.StringUtils;
-import org.apache.dubbo.metadata.WritableMetadataService;
import org.apache.dubbo.registry.client.migration.model.MigrationRule;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.cluster.ClusterInvoker;
@@ -33,42 +32,40 @@ import java.util.concurrent.ConcurrentHashMap;
public class DefaultMigrationAddressComparator implements MigrationAddressComparator {
private static final Logger logger = LoggerFactory.getLogger(DefaultMigrationAddressComparator.class);
private static final String MIGRATION_THRESHOLD = "dubbo.application.migration.threshold";
- private static final String DEFAULT_THRESHOLD_STRING = "1.0";
- private static final float DEFAULT_THREAD = 1.0f;
+ private static final String DEFAULT_THRESHOLD_STRING = "0.0";
+ private static final float DEFAULT_THREAD = 0f;
public static final String OLD_ADDRESS_SIZE = "OLD_ADDRESS_SIZE";
public static final String NEW_ADDRESS_SIZE = "NEW_ADDRESS_SIZE";
- private static final WritableMetadataService localMetadataService = WritableMetadataService.getDefaultExtension();
-
private Map<String, Map<String, Integer>> serviceMigrationData = new ConcurrentHashMap<>();
@Override
- public <T> boolean shouldMigrate(ClusterInvoker<T> serviceDiscoveryInvoker, ClusterInvoker<T> invoker, MigrationRule rule) {
- Map<String, Integer> migrationData = serviceMigrationData.computeIfAbsent(invoker.getUrl().getDisplayServiceKey(), _k -> new ConcurrentHashMap<>());
+ public <T> boolean shouldMigrate(ClusterInvoker<T> newInvoker, ClusterInvoker<T> oldInvoker, MigrationRule rule) {
+ Map<String, Integer> migrationData = serviceMigrationData.computeIfAbsent(oldInvoker.getUrl().getDisplayServiceKey(), _k -> new ConcurrentHashMap<>());
- if (!serviceDiscoveryInvoker.hasProxyInvokers()) {
- migrationData.put(OLD_ADDRESS_SIZE, getAddressSize(invoker));
+ if (!newInvoker.hasProxyInvokers()) {
+ migrationData.put(OLD_ADDRESS_SIZE, getAddressSize(oldInvoker));
migrationData.put(NEW_ADDRESS_SIZE, -1);
logger.info("No instance address available, stop compare.");
return false;
}
- if (!invoker.hasProxyInvokers()) {
+ if (!oldInvoker.hasProxyInvokers()) {
migrationData.put(OLD_ADDRESS_SIZE, -1);
- migrationData.put(NEW_ADDRESS_SIZE, getAddressSize(serviceDiscoveryInvoker));
+ migrationData.put(NEW_ADDRESS_SIZE, getAddressSize(newInvoker));
logger.info("No interface address available, stop compare.");
return true;
}
- int newAddressSize = getAddressSize(serviceDiscoveryInvoker);
- int oldAddressSize = getAddressSize(invoker);
+ int newAddressSize = getAddressSize(newInvoker);
+ int oldAddressSize = getAddressSize(oldInvoker);
migrationData.put(OLD_ADDRESS_SIZE, oldAddressSize);
migrationData.put(NEW_ADDRESS_SIZE, newAddressSize);
String rawThreshold = null;
- String serviceKey = invoker.getUrl().getDisplayServiceKey();
- Float configedThreshold = rule == null ? null : rule.getThreshold(serviceKey, localMetadataService.getCachedMapping(invoker.getUrl()));
+ String serviceKey = oldInvoker.getUrl().getDisplayServiceKey();
+ Float configedThreshold = rule == null ? null : rule.getThreshold(serviceKey);
if (configedThreshold != null && configedThreshold >= 0) {
rawThreshold = String.valueOf(configedThreshold);
}
@@ -81,7 +78,7 @@ public class DefaultMigrationAddressComparator implements MigrationAddressCompar
threshold = DEFAULT_THREAD;
}
- logger.info("serviceKey:" + invoker.getUrl().getServiceKey() + " Instance address size " + newAddressSize + ", interface address size " + oldAddressSize + ", threshold " + threshold);
+ logger.info("serviceKey:" + oldInvoker.getUrl().getServiceKey() + " Instance address size " + newAddressSize + ", interface address size " + oldAddressSize + ", threshold " + threshold);
if (newAddressSize != 0 && oldAddressSize == 0) {
return true;
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationAddressComparator.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationAddressComparator.java
index 32fedab..2f8b714 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationAddressComparator.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationAddressComparator.java
@@ -24,6 +24,8 @@ import java.util.Map;
@SPI
public interface MigrationAddressComparator {
- <T> boolean shouldMigrate(ClusterInvoker<T> serviceDiscoveryInvoker, ClusterInvoker<T> invoker, MigrationRule rule);
+
+ <T> boolean shouldMigrate(ClusterInvoker<T> newInvoker, ClusterInvoker<T> oldInvoker, MigrationRule rule);
+
Map<String, Integer> getAddressSize(String displayServiceKey);
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationClusterInvoker.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationClusterInvoker.java
index 2efae4c..2de95c5 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationClusterInvoker.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationClusterInvoker.java
@@ -38,13 +38,11 @@ public interface MigrationClusterInvoker<T> extends ClusterInvoker<T> {
void setMigrationRule(MigrationRule rule);
- boolean invokersChanged();
+ boolean migrateToForceInterfaceInvoker(MigrationRule newRule);
- void fallbackToInterfaceInvoker();
+ boolean migrateToForceApplicationInvoker(MigrationRule newRule);
- void migrateToServiceDiscoveryInvoker(boolean forceMigrate);
-
- void refreshServiceDiscoveryInvokerOnMappingCallback(boolean forceMigrate);
+ void migrateToApplicationFirstInvoker(MigrationRule newRule);
void reRefer(URL newSubscribeUrl);
}
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
index be2e0f8..d903a6b 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
@@ -17,7 +17,6 @@
package org.apache.dubbo.registry.client.migration;
import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
@@ -38,17 +37,18 @@ import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.ConsumerModel;
+import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import static org.apache.dubbo.common.status.reporter.FrameworkStatusReporter.createConsumptionReport;
+import static org.apache.dubbo.registry.client.migration.model.MigrationStep.APPLICATION_FIRST;
import static org.apache.dubbo.rpc.cluster.Constants.REFER_KEY;
public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
private Logger logger = LoggerFactory.getLogger(MigrationInvoker.class);
- private static final String MIGRATION_DELAY_KEY = "dubbo.application.migration.delay";
private URL url;
private URL consumerUrl;
@@ -62,9 +62,7 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
private volatile ClusterInvoker<T> currentAvailableInvoker;
private volatile MigrationStep step;
private volatile MigrationRule rule;
- private volatile boolean migrated;
-
- private static final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+ private volatile int promotion = 100;
public MigrationInvoker(RegistryProtocol registryProtocol,
Cluster cluster,
@@ -114,6 +112,10 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
this.serviceDiscoveryInvoker = serviceDiscoveryInvoker;
}
+ public ClusterInvoker<T> getCurrentAvailableInvoker() {
+ return currentAvailableInvoker;
+ }
+
@Override
public Class<T> getInterface() {
return type;
@@ -148,130 +150,127 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
}
@Override
- public void fallbackToInterfaceInvoker() {
- migrated = false;
- refreshInterfaceInvoker();
- setListener(invoker, () -> {
- if (!migrated) {
- migrated = true;
- this.destroyServiceDiscoveryInvoker();
- FrameworkStatusReporter.reportConsumptionStatus(
- createConsumptionReport(consumerUrl.getServiceInterface(), consumerUrl.getVersion(), consumerUrl.getGroup(), "interface")
- );
- }
- });
- }
+ public boolean migrateToForceInterfaceInvoker(MigrationRule newRule) {
+ CountDownLatch latch = new CountDownLatch(1);
+ refreshInterfaceInvoker(latch);
+
+ if (serviceDiscoveryInvoker == null) {
+ // serviceDiscoveryInvoker is absent, ignore threshold check
+ this.currentAvailableInvoker = invoker;
+ return true;
+ }
- @Override
- public void migrateToServiceDiscoveryInvoker(boolean forceMigrate) {
- if (!checkMigratingConditionMatch(consumerUrl)) {
- fallbackToInterfaceInvoker();
- return;
+ // wait and compare threshold
+ waitAddressNotify(newRule, latch);
+
+ if (Boolean.TRUE.equals(newRule.getForce(consumerUrl.getDisplayServiceKey()))) {
+ // force migrate, ignore threshold check
+ this.currentAvailableInvoker = invoker;
+ this.destroyServiceDiscoveryInvoker();
+ return true;
}
- migrated = false;
- if (!forceMigrate) {
- refreshServiceDiscoveryInvoker();
- refreshInterfaceInvoker();
- // By the time the task gets scheduled, the address notifications are expected to be finished for both address
- // types. Otherwise, migration task will just pick interface invoker.
- if (!migrated) {
- scheduler.schedule(new MigrationTask(), getDelay(), TimeUnit.MILLISECONDS);
- setListener(invoker, () -> {
- this.setAvailableInvoker(serviceDiscoveryInvoker, invoker);
- });
- setListener(serviceDiscoveryInvoker, () -> {
- this.setAvailableInvoker(serviceDiscoveryInvoker, invoker);
- });
+ Set<MigrationAddressComparator> detectors = ExtensionLoader.getExtensionLoader(MigrationAddressComparator.class).getSupportedExtensionInstances();
+ if (CollectionUtils.isNotEmpty(detectors)) {
+ if (detectors.stream().allMatch(comparator -> comparator.shouldMigrate(invoker, serviceDiscoveryInvoker, newRule))) {
+ this.currentAvailableInvoker = invoker;
+ this.destroyServiceDiscoveryInvoker();
+ return true;
}
- } else {
- refreshServiceDiscoveryInvoker();
- setListener(serviceDiscoveryInvoker, () -> {
- if (!migrated) {
- migrated = true;
- this.destroyInterfaceInvoker();
- FrameworkStatusReporter.reportConsumptionStatus(
- createConsumptionReport(consumerUrl.getServiceInterface(), consumerUrl.getVersion(), consumerUrl.getGroup(), "app")
- );
- }
- });
}
- }
- private int getDelay() {
- int delay = 60000;
- String delayStr = ConfigurationUtils.getProperty(MIGRATION_DELAY_KEY);
- if (StringUtils.isEmpty(delayStr)) {
- return delay;
+ // compare failed, will not change state
+ if (step == MigrationStep.FORCE_APPLICATION) {
+ destroyInterfaceInvoker();
}
+ return false;
+ }
- try {
- delay = Integer.parseInt(delayStr);
- } catch (Exception e) {
- logger.warn("Invalid migration delay param " + delayStr);
+ @Override
+ public boolean migrateToForceApplicationInvoker(MigrationRule newRule) {
+ CountDownLatch latch = new CountDownLatch(1);
+ refreshServiceDiscoveryInvoker(latch);
+
+ if (invoker == null) {
+ // invoker is absent, ignore threshold check
+ this.currentAvailableInvoker = serviceDiscoveryInvoker;
+ return true;
}
- return delay;
- }
- private class MigrationTask implements Runnable {
- @Override
- public void run() {
- if (migrated) {
- return;
- }
+ // wait and compare threshold
+ waitAddressNotify(newRule, latch);
- Set<MigrationAddressComparator> detectors = ExtensionLoader.getExtensionLoader(MigrationAddressComparator.class).getSupportedExtensionInstances();
- if (CollectionUtils.isEmpty(detectors)) {
- migrated = true;
- destroyInterfaceInvoker();
- return;
- }
+ if (Boolean.TRUE.equals(newRule.getForce(consumerUrl.getDisplayServiceKey()))) {
+ // force migrate, ignore threshold check
+ this.currentAvailableInvoker = serviceDiscoveryInvoker;
+ this.destroyInterfaceInvoker();
+ return true;
+ }
- if (detectors.stream().allMatch(comparator -> comparator.shouldMigrate(serviceDiscoveryInvoker, invoker, rule))) {
- destroyInterfaceInvoker();
- FrameworkStatusReporter.reportConsumptionStatus(
- createConsumptionReport(consumerUrl.getServiceInterface(), consumerUrl.getVersion(), consumerUrl.getGroup(), "app_app")
- );
- } else {
- // by default
- destroyServiceDiscoveryInvoker();
- FrameworkStatusReporter.reportConsumptionStatus(
- createConsumptionReport(consumerUrl.getServiceInterface(), consumerUrl.getVersion(), consumerUrl.getGroup(), "app_interface")
- );
+ Set<MigrationAddressComparator> detectors = ExtensionLoader.getExtensionLoader(MigrationAddressComparator.class).getSupportedExtensionInstances();
+ if (CollectionUtils.isNotEmpty(detectors)) {
+ if (detectors.stream().allMatch(comparator -> comparator.shouldMigrate(serviceDiscoveryInvoker, invoker, newRule))) {
+ this.currentAvailableInvoker = serviceDiscoveryInvoker;
+ this.destroyInterfaceInvoker();
+ return true;
}
- migrated = true;
}
- }
- private boolean checkMigratingConditionMatch(URL consumerUrl) {
- Set<PreMigratingConditionChecker> checkers = ExtensionLoader.getExtensionLoader(PreMigratingConditionChecker.class).getSupportedExtensionInstances();
- if (CollectionUtils.isNotEmpty(checkers)) {
- PreMigratingConditionChecker checker = checkers.iterator().next();
- return checker.checkCondition(consumerUrl);
+ // compare failed, will not change state
+ if (step == MigrationStep.FORCE_INTERFACE) {
+ destroyServiceDiscoveryInvoker();
}
- return true;
+ return false;
}
@Override
- public void refreshServiceDiscoveryInvokerOnMappingCallback(boolean forceMigrate) {
- if (this.serviceDiscoveryInvoker != null) {
- DynamicDirectory dynamicDirectory = (DynamicDirectory) this.serviceDiscoveryInvoker.getDirectory();
- dynamicDirectory.subscribe(dynamicDirectory.getSubscribeUrl());
+ public void migrateToApplicationFirstInvoker(MigrationRule newRule) {
+ CountDownLatch latch = new CountDownLatch(0);
+ refreshInterfaceInvoker(latch);
+ refreshServiceDiscoveryInvoker(latch);
+
+ // directly calculate preferred invoker, will not wait until address notify
+ // calculation will re-occurred when address notify later
+ calcPreferredInvoker(newRule);
+ }
+
+ private void waitAddressNotify(MigrationRule newRule, CountDownLatch latch) {
+ // wait and compare threshold
+ Integer delay = newRule.getDelay(consumerUrl.getDisplayServiceKey());
+ if (delay != null) {
+ try {
+ Thread.sleep(delay * 1000);
+ } catch (InterruptedException e) {
+ logger.error("Interrupter when waiting for address notify!" + e);
+ }
} else {
- migrateToServiceDiscoveryInvoker(forceMigrate);
+ // do not wait address notify by default
+ delay = 0;
+ }
+ try {
+ latch.await(delay, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ logger.error("Interrupter when waiting for address notify!" + e);
}
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
if (currentAvailableInvoker != null) {
+ if (step == APPLICATION_FIRST) {
+ // call ratio calculation based on random value
+ if (ThreadLocalRandom.current().nextDouble(100) > promotion) {
+ return invoker.invoke(invocation);
+ }
+ }
return currentAvailableInvoker.invoke(invocation);
}
switch (step) {
case APPLICATION_FIRST:
- // FIXME, check ClusterInvoker.hasProxyInvokers() or ClusterInvoker.isAvailable()
- if (checkInvokerAvailable(invoker)) {
+ if (checkInvokerAvailable(serviceDiscoveryInvoker)) {
+ currentAvailableInvoker = serviceDiscoveryInvoker;
+ } else if (checkInvokerAvailable(invoker)) {
currentAvailableInvoker = invoker;
} else {
currentAvailableInvoker = serviceDiscoveryInvoker;
@@ -280,7 +279,7 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
case FORCE_APPLICATION:
currentAvailableInvoker = serviceDiscoveryInvoker;
break;
- case INTERFACE_FIRST:
+ case FORCE_INTERFACE:
default:
currentAvailableInvoker = invoker;
}
@@ -376,33 +375,7 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
@Override
public void setMigrationRule(MigrationRule rule) {
this.rule = rule;
- }
-
- @Override
- public boolean invokersChanged() {
- return invokersChanged;
- }
-
- private volatile boolean invokersChanged;
-
- /**
- * Set the available invoker before migration is determined. Interface invoker goes first.
- * @param serviceDiscoveryInvoker
- * @param invoker
- */
- private void setAvailableInvoker(ClusterInvoker<T> serviceDiscoveryInvoker, ClusterInvoker<T> invoker) {
- this.invokersChanged = true;
-
- if (currentAvailableInvoker == null) {
- if (invoker != null && !invoker.isDestroyed() && invoker.hasProxyInvokers()) {
- currentAvailableInvoker = invoker;
- } else if (serviceDiscoveryInvoker != null && !serviceDiscoveryInvoker.isDestroyed() && serviceDiscoveryInvoker.hasProxyInvokers()) {
- currentAvailableInvoker = serviceDiscoveryInvoker;
- }
- if (currentAvailableInvoker == null) {
- currentAvailableInvoker = invoker;
- }
- }
+ promotion = Optional.ofNullable(rule.getProportion(consumerUrl.getDisplayServiceKey())).orElse(100);
}
protected void destroyServiceDiscoveryInvoker() {
@@ -418,26 +391,66 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
}
}
- protected void refreshServiceDiscoveryInvoker() {
+ protected void refreshServiceDiscoveryInvoker(CountDownLatch latch) {
clearListener(serviceDiscoveryInvoker);
if (needRefresh(serviceDiscoveryInvoker)) {
if (logger.isDebugEnabled()) {
logger.debug("Re-subscribing instance addresses, current interface " + type.getName());
}
+
+ if (serviceDiscoveryInvoker != null) {
+ serviceDiscoveryInvoker.destroy();
+ }
serviceDiscoveryInvoker = registryProtocol.getServiceDiscoveryInvoker(cluster, registry, type, url);
}
+ setListener(serviceDiscoveryInvoker, () -> {
+ latch.countDown();
+ FrameworkStatusReporter.reportConsumptionStatus(
+ createConsumptionReport(consumerUrl.getServiceInterface(), consumerUrl.getVersion(), consumerUrl.getGroup(), "app")
+ );
+ if (step == APPLICATION_FIRST) {
+ calcPreferredInvoker(rule);
+ }
+ });
}
- protected void refreshInterfaceInvoker() {
+ protected void refreshInterfaceInvoker(CountDownLatch latch) {
clearListener(invoker);
- // FIXME invoker.destroy();
if (needRefresh(invoker)) {
if (logger.isDebugEnabled()) {
logger.debug("Re-subscribing interface addresses for interface " + type.getName());
}
+ if (invoker != null) {
+ invoker.destroy();
+ }
invoker = registryProtocol.getInvoker(cluster, registry, type, url);
}
+ setListener(invoker, () -> {
+ latch.countDown();
+ FrameworkStatusReporter.reportConsumptionStatus(
+ createConsumptionReport(consumerUrl.getServiceInterface(), consumerUrl.getVersion(), consumerUrl.getGroup(), "interface")
+ );
+ if (step == APPLICATION_FIRST) {
+ calcPreferredInvoker(rule);
+ }
+ });
+ }
+
+ private synchronized void calcPreferredInvoker(MigrationRule migrationRule) {
+ if (serviceDiscoveryInvoker == null || invoker == null) {
+ return;
+ }
+ Set<MigrationAddressComparator> detectors = ExtensionLoader.getExtensionLoader(MigrationAddressComparator.class).getSupportedExtensionInstances();
+ if (CollectionUtils.isNotEmpty(detectors)) {
+ // pick preferred invoker
+ // the real invoker choice in invocation will be affected by promotion
+ if (detectors.stream().allMatch(comparator -> comparator.shouldMigrate(serviceDiscoveryInvoker, invoker, migrationRule))) {
+ this.currentAvailableInvoker = serviceDiscoveryInvoker;
+ } else {
+ this.currentAvailableInvoker = invoker;
+ }
+ }
}
protected void destroyInterfaceInvoker() {
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandler.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandler.java
index c194b2c..2bf90d6 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandler.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandler.java
@@ -20,15 +20,13 @@ import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.utils.CollectionUtils;
-import org.apache.dubbo.metadata.WritableMetadataService;
+import org.apache.dubbo.common.status.reporter.FrameworkStatusReporter;
import org.apache.dubbo.registry.client.migration.model.MigrationRule;
import org.apache.dubbo.registry.client.migration.model.MigrationStep;
-import java.util.Set;
-
public class MigrationRuleHandler<T> {
public static final String DUBBO_SERVICEDISCOVERY_MIGRATION = "dubbo.application.service-discovery.migration";
+ public static final String MIGRATION_KEY = "migration";
private static final Logger logger = LoggerFactory.getLogger(MigrationRuleHandler.class);
private MigrationClusterInvoker<T> migrationInvoker;
@@ -36,112 +34,102 @@ public class MigrationRuleHandler<T> {
private Float currentThreshold = 0f;
private URL consumerURL;
- private final WritableMetadataService writableMetadataService;
-
public MigrationRuleHandler(MigrationClusterInvoker<T> invoker, URL url) {
this.migrationInvoker = invoker;
this.consumerURL = url;
- this.writableMetadataService = WritableMetadataService.getDefaultExtension();
}
- public synchronized void doMigrate(MigrationRule rule, boolean isCallback) {
+ public synchronized void doMigrate(MigrationRule rule) {
if (migrationInvoker instanceof ServiceDiscoveryMigrationInvoker) {
- if (!isCallback) {
- initInvoker(MigrationStep.FORCE_APPLICATION, 1.0f);
- } else {
- migrationInvoker.refreshServiceDiscoveryInvokerOnMappingCallback(true);
- }
+ refreshInvoker(MigrationStep.FORCE_APPLICATION, 1.0f, rule);
return;
}
- MigrationStep step = MigrationStep.INTERFACE_FIRST;
+ // initial step : FORCE_INTERFACE
+ MigrationStep step = MigrationStep.APPLICATION_FIRST;
Float threshold = -1f;
if (rule == MigrationRule.INIT) {
- step = Enum.valueOf(MigrationStep.class, ConfigurationUtils.getCachedDynamicProperty(DUBBO_SERVICEDISCOVERY_MIGRATION, step.name()));
+ step = Enum.valueOf(MigrationStep.class,
+ consumerURL.getParameter(MIGRATION_KEY,
+ ConfigurationUtils.getCachedDynamicProperty(DUBBO_SERVICEDISCOVERY_MIGRATION, step.name())));
} else {
try {
String serviceKey = consumerURL.getDisplayServiceKey();
- Set<String> apps = writableMetadataService.getCachedMapping(consumerURL);
- // FIXME, consumerURL.getHost() might not exactly the ip expected.
- if (CollectionUtils.isNotEmpty(apps)) { //empty only happens when meta server does not work properly
- if (CollectionUtils.isEmpty(rule.getTargetIps())) {
- setMigrationRule(rule);
- step = getMigrationStep(rule, step, serviceKey, apps);
- threshold = getMigrationThreshold(rule, threshold, serviceKey, apps);
- } else {
- if (rule.getTargetIps().contains(consumerURL.getHost())) {
- setMigrationRule(rule);
- step = getMigrationStep(rule, step, serviceKey, apps);
- threshold = getMigrationThreshold(rule, threshold, serviceKey, apps);
- } else {
- setMigrationRule(null); // clear previous rule
- logger.info("New migration rule ignored and previous migration rule cleared, new target ips " + rule.getTargetIps() + " and local ip " + consumerURL.getHost() + " do not match");
- }
- }
- }
+ step = getMigrationStep(rule, step, serviceKey);
+ threshold = getMigrationThreshold(rule, threshold, serviceKey);
} catch (Exception e) {
logger.error("Failed to get step and threshold info from rule: " + rule, e);
}
}
- if (!isCallback) {
- initInvoker(step, threshold);
- } else {
- refreshInvoker(step, threshold);
+ if (refreshInvoker(step, threshold, rule)) {
+ // refresh success, update rule
+ setMigrationRule(rule);
}
}
- private void initInvoker(MigrationStep step, Float threshold) {
+ private boolean refreshInvoker(MigrationStep step, Float threshold, MigrationRule newRule) {
if (step == null || threshold == null) {
throw new IllegalStateException("Step or threshold of migration rule cannot be null");
}
+ MigrationStep originStep = currentStep;
+
if ((currentStep == null || currentStep != step) || !currentThreshold.equals(threshold)) {
- setCurrentStepAndThreshold(step, threshold);
+ boolean success = true;
switch (step) {
case APPLICATION_FIRST:
- migrationInvoker.migrateToServiceDiscoveryInvoker(false);
+ migrationInvoker.migrateToApplicationFirstInvoker(newRule);
break;
case FORCE_APPLICATION:
- migrationInvoker.migrateToServiceDiscoveryInvoker(true);
+ success = migrationInvoker.migrateToForceApplicationInvoker(newRule);
break;
- case INTERFACE_FIRST:
+ case FORCE_INTERFACE:
default:
- migrationInvoker.fallbackToInterfaceInvoker();
+ success = migrationInvoker.migrateToForceInterfaceInvoker(newRule);
}
- }
- }
- private void refreshInvoker(MigrationStep step, Float threshold) {
- if (step == null || threshold == null) {
- throw new IllegalStateException("Step or threshold of migration rule cannot be null");
+ if (success) {
+ setCurrentStepAndThreshold(step, threshold);
+ logger.info("Succeed Migrated to " + step + " mode. Service Name: " + consumerURL.getDisplayServiceKey());
+ report(step, originStep, "true");
+ } else {
+ // migrate failed, do not save new step and rule
+ logger.warn("Migrate to " + step + " mode failed. Probably not satisfy the threshold you set "
+ + threshold + ". Please try re-publish configuration if you still after check.");
+ report(step, originStep, "false");
+ }
+
+ return success;
}
+ // ignore if step is same with previous, will continue override rule for MigrationInvoker
+ return true;
+ }
- if (step == MigrationStep.APPLICATION_FIRST) {
- setCurrentStepAndThreshold(step, threshold);
- migrationInvoker.refreshServiceDiscoveryInvokerOnMappingCallback(false);
- } else if (step == MigrationStep.FORCE_APPLICATION) {
- setCurrentStepAndThreshold(step, threshold);
- migrationInvoker.refreshServiceDiscoveryInvokerOnMappingCallback(true);
+ private void report(MigrationStep step, MigrationStep originStep, String success) {
+ if (FrameworkStatusReporter.hasReporter()) {
+ FrameworkStatusReporter.reportMigrationStepStatus(
+ FrameworkStatusReporter.createMigrationStepReport(consumerURL.getServiceInterface(), consumerURL.getVersion(),
+ consumerURL.getGroup(), String.valueOf(originStep), String.valueOf(step), success));
}
}
- public void setMigrationRule(MigrationRule rule) {
+ private void setMigrationRule(MigrationRule rule) {
this.migrationInvoker.setMigrationRule(rule);
}
- private MigrationStep getMigrationStep(MigrationRule rule, MigrationStep step, String serviceKey, Set<String> apps) {
- MigrationStep configuredStep = rule.getStep(serviceKey, apps);
+ private MigrationStep getMigrationStep(MigrationRule rule, MigrationStep step, String serviceKey) {
+ MigrationStep configuredStep = rule.getStep(serviceKey);
step = configuredStep == null ? step : configuredStep;
return step;
}
- private Float getMigrationThreshold(MigrationRule rule, Float threshold, String serviceKey, Set<String> apps) {
- Float configuredThreshold = rule.getThreshold(serviceKey, apps);
+ private Float getMigrationThreshold(MigrationRule rule, Float threshold, String serviceKey) {
+ Float configuredThreshold = rule.getThreshold(serviceKey);
threshold = configuredThreshold == null ? threshold : configuredThreshold;
return threshold;
}
- public void setCurrentStepAndThreshold(MigrationStep currentStep, Float currentThreshold) {
+ private void setCurrentStepAndThreshold(MigrationStep currentStep, Float currentThreshold) {
if (currentThreshold != null) {
this.currentThreshold = currentThreshold;
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java
index 355ab63..80193fc 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.registry.client.migration;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.config.configcenter.ConfigChangedEvent;
import org.apache.dubbo.common.config.configcenter.ConfigurationListener;
import org.apache.dubbo.common.config.configcenter.DynamicConfiguration;
@@ -24,11 +25,8 @@ import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.common.utils.StringUtils;
-import org.apache.dubbo.metadata.MappingChangedEvent;
-import org.apache.dubbo.metadata.MappingListener;
-import org.apache.dubbo.metadata.ServiceNameMapping;
-import org.apache.dubbo.metadata.WritableMetadataService;
import org.apache.dubbo.registry.client.migration.model.MigrationRule;
import org.apache.dubbo.registry.integration.RegistryProtocol;
import org.apache.dubbo.registry.integration.RegistryProtocolListener;
@@ -36,31 +34,22 @@ import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.cluster.ClusterInvoker;
import org.apache.dubbo.rpc.model.ApplicationModel;
-import java.util.LinkedHashSet;
import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
-import static java.util.Collections.emptySet;
-import static java.util.Collections.unmodifiableSet;
-import static java.util.stream.Collectors.toSet;
-import static java.util.stream.Stream.of;
-import static org.apache.dubbo.common.constants.CommonConstants.MAPPING_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.INIT;
-import static org.apache.dubbo.common.constants.RegistryConstants.PROVIDED_BY;
-import static org.apache.dubbo.common.constants.RegistryConstants.SUBSCRIBED_SERVICE_NAMES_KEY;
-import static org.apache.dubbo.common.utils.CollectionUtils.isEmpty;
-import static org.apache.dubbo.common.utils.StringUtils.isBlank;
@Activate
public class MigrationRuleListener implements RegistryProtocolListener, ConfigurationListener {
private static final Logger logger = LoggerFactory.getLogger(MigrationRuleListener.class);
- private static final String RULE_KEY = ApplicationModel.getName() + ".migration";
private static final String DUBBO_SERVICEDISCOVERY_MIGRATION = "DUBBO_SERVICEDISCOVERY_MIGRATION";
+ private static final String MIGRATION_DELAY_KEY = "dubbo.application.migration.delay";
+ private final String RULE_KEY = ApplicationModel.getName() + ".migration";
- private Map<String, MigrationRuleHandler> handlers = new ConcurrentHashMap<>();
+ private final Map<String, MigrationRuleHandler> handlers = new ConcurrentHashMap<>();
private DynamicConfiguration configuration;
private volatile String rawRule;
@@ -69,25 +58,46 @@ public class MigrationRuleListener implements RegistryProtocolListener, Configur
public MigrationRuleListener() {
this.configuration = ApplicationModel.getEnvironment().getDynamicConfiguration().orElse(null);
- String localRawRule = ApplicationModel.getEnvironment().getLocalMigrationRule();
- String defaultRawRule = StringUtils.isEmpty(localRawRule) ? INIT : localRawRule;
-
if (this.configuration != null) {
logger.info("Listening for migration rules on dataId " + RULE_KEY + ", group " + DUBBO_SERVICEDISCOVERY_MIGRATION);
configuration.addListener(RULE_KEY, DUBBO_SERVICEDISCOVERY_MIGRATION, this);
String rawRule = configuration.getConfig(RULE_KEY, DUBBO_SERVICEDISCOVERY_MIGRATION);
if (StringUtils.isEmpty(rawRule)) {
- rawRule = defaultRawRule;
+ rawRule = INIT;
}
this.rawRule = rawRule;
} else {
if (logger.isWarnEnabled()) {
logger.warn("Using default configuration rule because config center is not configured!");
}
- rawRule = defaultRawRule;
+ rawRule = INIT;
+ }
+
+ String localRawRule = ApplicationModel.getEnvironment().getLocalMigrationRule();
+ if (!StringUtils.isEmpty(localRawRule)) {
+ Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("DubboMigrationRuleDelayWorker", true))
+ .schedule(() -> {
+ if (this.rawRule.equals(INIT)) {
+ this.process(new ConfigChangedEvent(null, null, localRawRule));
+ }
+ }, getDelay(), TimeUnit.MILLISECONDS);
+ }
+ }
+
+ private int getDelay() {
+ int delay = 60000;
+ String delayStr = ConfigurationUtils.getProperty(MIGRATION_DELAY_KEY);
+ if (StringUtils.isEmpty(delayStr)) {
+ return delay;
+ }
+
+ try {
+ delay = Integer.parseInt(delayStr);
+ } catch (Exception e) {
+ logger.warn("Invalid migration delay param " + delayStr);
}
-// process(new ConfigChangedEvent(RULE_KEY, DUBBO_SERVICEDISCOVERY_MIGRATION, rawRule));
+ return delay;
}
@Override
@@ -104,7 +114,7 @@ public class MigrationRuleListener implements RegistryProtocolListener, Configur
rule = parseRule(rawRule);
if (CollectionUtils.isNotEmptyMap(handlers)) {
- handlers.forEach((_key, handler) -> handler.doMigrate(rule, false));
+ handlers.forEach((_key, handler) -> handler.doMigrate(rule));
}
}
@@ -130,111 +140,22 @@ public class MigrationRuleListener implements RegistryProtocolListener, Configur
@Override
public synchronized void onRefer(RegistryProtocol registryProtocol, ClusterInvoker<?> invoker, URL consumerUrl, URL registryURL) {
MigrationRuleHandler<?> migrationRuleHandler = handlers.computeIfAbsent(consumerUrl.getServiceKey() + consumerUrl.getParameter(TIMESTAMP_KEY), _key -> {
- return new MigrationRuleHandler<>((MigrationInvoker<?>)invoker, consumerUrl);
+ return new MigrationRuleHandler<>((MigrationInvoker<?>) invoker, consumerUrl);
});
- try {
- Set<String> subscribedServices = getServices(registryURL, consumerUrl, migrationRuleHandler);
- WritableMetadataService.getDefaultExtension().putCachedMapping(ServiceNameMapping.buildMappingKey(consumerUrl), subscribedServices);
- } catch (Exception e) {
- logger.warn("Cannot find app mapping for service " + consumerUrl.getServiceInterface() + ", will not migrate.", e);
- }
-
rule = parseRule(rawRule);
- migrationRuleHandler.doMigrate(rule, false);
+ migrationRuleHandler.doMigrate(rule);
}
@Override
public void onDestroy() {
- if (configuration != null)
+ if (configuration != null) {
configuration.removeListener(RULE_KEY, this);
- }
-
- /**
- * 1.developer explicitly specifies the application name this interface belongs to
- * 2.check Interface-App mapping
- * 3.use the services specified in registry url.
- *
- * @param subscribedURL
- * @return
- */
- protected Set<String> getServices(URL registryURL, URL subscribedURL, MigrationRuleHandler handler) {
- Set<String> subscribedServices = new TreeSet<>();
- Set<String> globalConfiguredSubscribingServices = parseServices(registryURL.getParameter(SUBSCRIBED_SERVICE_NAMES_KEY));
-
- String serviceNames = subscribedURL.getParameter(PROVIDED_BY);
- if (StringUtils.isNotEmpty(serviceNames)) {
- logger.info(subscribedURL.getServiceInterface() + " mapping to " + serviceNames + " instructed by provided-by set by user.");
- subscribedServices.addAll(parseServices(serviceNames));
}
-
- if (isEmpty(subscribedServices)) {
- Set<String> mappedServices = findMappedServices(registryURL, subscribedURL, new DefaultMappingListener(subscribedURL, subscribedServices, handler));
- logger.info(subscribedURL.getServiceInterface() + " mapping to " + mappedServices + " instructed by remote metadata center.");
- subscribedServices.addAll(mappedServices);
- if (isEmpty(subscribedServices)) {
- logger.info(subscribedURL.getServiceInterface() + " mapping to " + globalConfiguredSubscribingServices + " by default.");
- subscribedServices.addAll(globalConfiguredSubscribingServices);
- }
- }
- return subscribedServices;
- }
-
- protected Set<String> findMappedServices(URL registryURL, URL subscribedURL, MappingListener listener) {
- Set<String> result = new LinkedHashSet<>();
- ServiceNameMapping serviceNameMapping = ServiceNameMapping.getExtension(registryURL.getParameter(MAPPING_KEY));
- result.addAll(serviceNameMapping.getAndListen(subscribedURL, listener));
- result.addAll(serviceNameMapping.getAndListenWithNewStore(subscribedURL, listener));
- return result;
}
- public static Set<String> parseServices(String literalServices) {
- return isBlank(literalServices) ? emptySet() :
- unmodifiableSet(of(literalServices.split(","))
- .map(String::trim)
- .filter(StringUtils::isNotEmpty)
- .collect(toSet()));
- }
-
- private class DefaultMappingListener implements MappingListener {
- private final Logger logger = LoggerFactory.getLogger(DefaultMappingListener.class);
- private URL url;
- private Set<String> oldApps;
- private MigrationRuleHandler handler;
-
- public DefaultMappingListener(URL subscribedURL, Set<String> serviceNames, MigrationRuleHandler handler) {
- this.url = subscribedURL;
- this.oldApps = serviceNames;
- this.handler = handler;
- }
-
- @Override
- public void onEvent(MappingChangedEvent event) {
- if(logger.isDebugEnabled()) {
- logger.debug("Received mapping notification from meta server, " + event);
- }
- Set<String> newApps = event.getApps();
- Set<String> tempOldApps = oldApps;
- oldApps = newApps;
-
- if (CollectionUtils.isEmpty(newApps)) {
- return;
- }
-
- if (CollectionUtils.isEmpty(tempOldApps) && newApps.size() > 0) {
- WritableMetadataService.getDefaultExtension().putCachedMapping(ServiceNameMapping.buildMappingKey(url), newApps);
- handler.doMigrate(rule, true);
- return;
- }
-
- for (String newAppName : newApps) {
- if (!tempOldApps.contains(newAppName)) {
- WritableMetadataService.getDefaultExtension().putCachedMapping(ServiceNameMapping.buildMappingKey(url), newApps);
- handler.doMigrate(rule, true);
- return;
- }
- }
- }
+ public Map<String, MigrationRuleHandler> getHandlers() {
+ return handlers;
}
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/ServiceDiscoveryMigrationInvoker.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/ServiceDiscoveryMigrationInvoker.java
index 774073b..38a0501 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/ServiceDiscoveryMigrationInvoker.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/ServiceDiscoveryMigrationInvoker.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.registry.Registry;
+import org.apache.dubbo.registry.client.migration.model.MigrationRule;
import org.apache.dubbo.registry.integration.RegistryProtocol;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Result;
@@ -40,14 +41,9 @@ public class ServiceDiscoveryMigrationInvoker<T> extends MigrationInvoker<T> {
}
@Override
- public void fallbackToInterfaceInvoker() {
+ public boolean migrateToForceInterfaceInvoker(MigrationRule newRule) {
logger.error("Service discovery registry type does not support discovery of interface level addresses, " + getRegistryUrl());
- migrateToServiceDiscoveryInvoker(true);
- }
-
- @Override
- public void migrateToServiceDiscoveryInvoker(boolean forceMigrate) {
- refreshServiceDiscoveryInvoker();
+ return migrateToForceApplicationInvoker(newRule);
}
@Override
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/model/ApplicationMigrationRule.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/model/ApplicationMigrationRule.java
deleted file mode 100644
index 05b1483..0000000
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/model/ApplicationMigrationRule.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.registry.client.migration.model;
-
-import java.util.Map;
-
-public class ApplicationMigrationRule {
- private String name;
- private MigrationStep step;
- private Float threshold;
-
- public static ApplicationMigrationRule parseFromMap(Map<String, Object> map) {
- ApplicationMigrationRule applicationMigrationRule = new ApplicationMigrationRule();
- applicationMigrationRule.setName((String) map.get("name"));
-
- Object step = map.get("step");
- if (step != null) {
- applicationMigrationRule.setStep(MigrationStep.valueOf(step.toString()));
- }
-
- Object threshold = map.get("threshold");
- if (threshold != null) {
- applicationMigrationRule.setThreshold(Float.valueOf(threshold.toString()));
- }
-
- return applicationMigrationRule;
- }
-
- public ApplicationMigrationRule() {
- }
-
- public ApplicationMigrationRule(String name, MigrationStep step, Float threshold) {
- this.name = name;
- this.step = step;
- this.threshold = threshold;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public MigrationStep getStep() {
- return step;
- }
-
- public void setStep(MigrationStep step) {
- this.step = step;
- }
-
- public Float getThreshold() {
- return threshold;
- }
-
- public void setThreshold(Float threshold) {
- this.threshold = threshold;
- }
-}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/model/InterfaceMigrationRule.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/model/InterfaceMigrationRule.java
index 614cbbd..79f112b 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/model/InterfaceMigrationRule.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/model/InterfaceMigrationRule.java
@@ -19,14 +19,15 @@ package org.apache.dubbo.registry.client.migration.model;
import java.util.Map;
public class InterfaceMigrationRule {
- private String appName;
private String serviceKey;
private MigrationStep step;
private Float threshold;
+ private Integer proportion;
+ private Integer delay;
+ private Boolean force;
public static InterfaceMigrationRule parseFromMap(Map<String, Object> map) {
InterfaceMigrationRule interfaceMigrationRule = new InterfaceMigrationRule();
- interfaceMigrationRule.setAppName((String) map.get("appName"));
interfaceMigrationRule.setServiceKey((String) map.get("serviceKey"));
Object step = map.get("step");
@@ -39,24 +40,31 @@ public class InterfaceMigrationRule {
interfaceMigrationRule.setThreshold(Float.valueOf(threshold.toString()));
}
+ Object proportion = map.get("proportion");
+ if (proportion != null) {
+ interfaceMigrationRule.setProportion(Integer.valueOf(proportion.toString()));
+ }
+
+ Object delay = map.get("delay");
+ if (delay != null) {
+ interfaceMigrationRule.setDelay(Integer.valueOf(delay.toString()));
+ }
+
+ Object force = map.get("force");
+ if (force != null) {
+ interfaceMigrationRule.setForce(Boolean.valueOf(force.toString()));
+ }
+
return interfaceMigrationRule;
}
public InterfaceMigrationRule(){}
- public InterfaceMigrationRule(String appName, String serviceKey, MigrationStep step, Float threshold) {
- this.appName = appName;
+ public InterfaceMigrationRule(String serviceKey, MigrationStep step, Float threshold, Integer proportion) {
this.serviceKey = serviceKey;
this.step = step;
this.threshold = threshold;
- }
-
- public String getAppName() {
- return appName;
- }
-
- public void setAppName(String appName) {
- this.appName = appName;
+ this.proportion = proportion;
}
public String getServiceKey() {
@@ -82,4 +90,28 @@ public class InterfaceMigrationRule {
public void setThreshold(Float threshold) {
this.threshold = threshold;
}
+
+ public Integer getProportion() {
+ return proportion;
+ }
+
+ public void setProportion(Integer proportion) {
+ this.proportion = proportion;
+ }
+
+ public Integer getDelay() {
+ return delay;
+ }
+
+ public void setDelay(Integer delay) {
+ this.delay = delay;
+ }
+
+ public Boolean getForce() {
+ return force;
+ }
+
+ public void setForce(Boolean force) {
+ this.force = force;
+ }
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/model/MigrationRule.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/model/MigrationRule.java
index 755273f..199d702 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/model/MigrationRule.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/model/MigrationRule.java
@@ -26,7 +26,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -36,12 +35,18 @@ import java.util.stream.Collectors;
* key: demo-consumer
* step: APPLICATION_FIRST
* threshold: 1.0
+ * proportion: 60
+ * delay: 60
+ * force: false
* interfaces:
- * - serviceKey: DemoService:1.0.0
- * threshold: 1.0
- * step: APPLICATION_FIRST
- * - serviceKey: GreetingService:1.0.0
- * step: FORCE_APPLICATION
+ * - serviceKey: DemoService:1.0.0
+ * threshold: 0.5
+ * proportion: 30
+ * delay: 30
+ * force: true
+ * step: APPLICATION_FIRST
+ * - serviceKey: GreetingService:1.0.0
+ * step: FORCE_APPLICATION
*/
public class MigrationRule {
public static final MigrationRule INIT = new MigrationRule();
@@ -49,13 +54,12 @@ public class MigrationRule {
private String key;
private MigrationStep step;
private Float threshold;
- // FIXME
- private List<String> targetIps;
+ private Integer proportion;
+ private Integer delay;
+ private Boolean force;
private List<InterfaceMigrationRule> interfaces;
- private List<ApplicationMigrationRule> applications;
private transient Map<String, InterfaceMigrationRule> interfaceRules;
- private transient Map<String, ApplicationMigrationRule> applicationRules;
@SuppressWarnings("unchecked")
private static MigrationRule parseFromMap(Map<String, Object> map) {
@@ -72,10 +76,19 @@ public class MigrationRule {
migrationRule.setThreshold(Float.valueOf(threshold.toString()));
}
- Object targetIps = map.get("targetIps");
- if (targetIps != null && List.class.isAssignableFrom(targetIps.getClass())) {
- migrationRule.setTargetIps(((List<Object>) targetIps).stream()
- .map(String::valueOf).collect(Collectors.toList()));
+ Object proportion = map.get("proportion");
+ if (proportion != null) {
+ migrationRule.setProportion(Integer.valueOf(proportion.toString()));
+ }
+
+ Object delay = map.get("delay");
+ if (delay != null) {
+ migrationRule.setDelay(Integer.valueOf(delay.toString()));
+ }
+
+ Object force = map.get("force");
+ if (force != null) {
+ migrationRule.setForce(Boolean.valueOf(force.toString()));
}
Object interfaces = map.get("interfaces");
@@ -84,12 +97,6 @@ public class MigrationRule {
.map(InterfaceMigrationRule::parseFromMap).collect(Collectors.toList()));
}
- Object applications = map.get("applications");
- if (applications != null && List.class.isAssignableFrom(applications.getClass())) {
- migrationRule.setApplications(((List<Map<String, Object>>) applications).stream()
- .map(ApplicationMigrationRule::parseFromMap).collect(Collectors.toList()));
- }
-
return migrationRule;
}
@@ -108,7 +115,7 @@ public class MigrationRule {
this.key = key;
}
- public MigrationStep getStep(String serviceKey, Set<String> apps) {
+ public MigrationStep getStep(String serviceKey) {
if (interfaceRules != null) {
InterfaceMigrationRule rule = interfaceRules.get(serviceKey);
if (rule != null) {
@@ -116,16 +123,6 @@ public class MigrationRule {
}
}
- if (apps != null) {
- for (String app : apps) {
- if (applicationRules != null) {
- ApplicationMigrationRule rule = applicationRules.get(app);
- if (rule != null) {
- return rule.getStep() == null ? step : rule.getStep();
- }
- }
- }
- }
return step;
}
@@ -136,35 +133,17 @@ public class MigrationRule {
return interfaceRules.get(serviceKey);
}
- public ApplicationMigrationRule getApplicationRule(String app) {
- if (applicationRules == null) {
- return null;
- }
- return applicationRules.get(app);
- }
-
public MigrationStep getStep() {
return step;
}
- public Float getThreshold(String serviceKey, Set<String> apps) {
+ public Float getThreshold(String serviceKey) {
if (interfaceRules != null) {
InterfaceMigrationRule rule = interfaceRules.get(serviceKey);
if (rule != null) {
return rule.getThreshold() == null ? threshold : rule.getThreshold();
}
}
-
- if (apps != null) {
- for (String app : apps) {
- if (applicationRules != null) {
- ApplicationMigrationRule rule = applicationRules.get(app);
- if (rule != null) {
- return rule.getThreshold() == null ? threshold : rule.getThreshold();
- }
- }
- }
- }
return threshold;
}
@@ -176,20 +155,66 @@ public class MigrationRule {
this.threshold = threshold;
}
+ public Integer getProportion() {
+ return proportion;
+ }
+
+ public Integer getProportion(String serviceKey) {
+ if (interfaceRules != null) {
+ InterfaceMigrationRule rule = interfaceRules.get(serviceKey);
+ if (rule != null) {
+ return rule.getProportion() == null ? proportion : rule.getProportion();
+ }
+ }
+ return proportion;
+ }
+
+ public void setProportion(Integer proportion) {
+ this.proportion = proportion;
+ }
+
+ public Integer getDelay() {
+ return delay;
+ }
+
+ public Integer getDelay(String serviceKey) {
+ if (interfaceRules != null) {
+ InterfaceMigrationRule rule = interfaceRules.get(serviceKey);
+ if (rule != null) {
+ return rule.getDelay() == null ? delay : rule.getDelay();
+ }
+ }
+ return delay;
+ }
+
+ public void setDelay(Integer delay) {
+ this.delay = delay;
+ }
+
public void setStep(MigrationStep step) {
this.step = step;
}
- public List<InterfaceMigrationRule> getInterfaces() {
- return interfaces;
+ public Boolean getForce() {
+ return force;
+ }
+
+ public Boolean getForce(String serviceKey) {
+ if (interfaceRules != null) {
+ InterfaceMigrationRule rule = interfaceRules.get(serviceKey);
+ if (rule != null) {
+ return rule.getForce() == null ? force : rule.getForce();
+ }
+ }
+ return force;
}
- public List<String> getTargetIps() {
- return targetIps;
+ public void setForce(Boolean force) {
+ this.force = force;
}
- public void setTargetIps(List<String> targetIps) {
- this.targetIps = targetIps;
+ public List<InterfaceMigrationRule> getInterfaces() {
+ return interfaces;
}
public void setInterfaces(List<InterfaceMigrationRule> interfaces) {
@@ -202,29 +227,6 @@ public class MigrationRule {
}
}
- public List<ApplicationMigrationRule> getApplications() {
- return applications;
- }
-
- public void setApplications(List<ApplicationMigrationRule> applications) {
- this.applications = applications;
- if (applications != null) {
- this.applicationRules = new HashMap<>();
- applications.forEach(rule -> {
- applicationRules.put(rule.getName(), rule);
- });
- }
- }
-
- public boolean removeApplicationRule(String providerApp) {
- if (CollectionUtils.isNotEmpty(this.applications)) {
- boolean removed = this.applications.removeIf(applicationMigrationRule -> applicationMigrationRule.getName().equals(providerApp));
- this.applicationRules.remove(providerApp);
- return removed;
- }
- return false;
- }
-
public boolean removeInterfaceRule(String serviceKey) {
if (CollectionUtils.isNotEmpty(this.interfaces)) {
boolean removed = this.interfaces.removeIf(interfaceMigrationRule -> interfaceMigrationRule.getServiceKey().equals(serviceKey));
@@ -234,7 +236,7 @@ public class MigrationRule {
return false;
}
- public boolean addInterfaceRule(String providerApp, String serviceKey, MigrationStep step, Float threshold) {
+ public boolean addInterfaceRule(String serviceKey, MigrationStep step, Float threshold, Integer proportion) {
if (getInterfaceRule(serviceKey) != null) {
return false;
}
@@ -242,7 +244,7 @@ public class MigrationRule {
if (this.interfaces == null) {
this.interfaces = new ArrayList<>();
}
- InterfaceMigrationRule interfaceMigrationRule = new InterfaceMigrationRule(providerApp, serviceKey, step, threshold);
+ InterfaceMigrationRule interfaceMigrationRule = new InterfaceMigrationRule(serviceKey, step, threshold, proportion);
this.interfaces.add(interfaceMigrationRule);
if (interfaceRules == null) {
@@ -252,24 +254,6 @@ public class MigrationRule {
return true;
}
- public boolean addApplicationRule(String providerApp, MigrationStep step, Float threshold) {
- if (getApplicationRule(providerApp) != null) {
- return false;
- }
-
- if (this.applications == null) {
- this.applications = new ArrayList<>();
- }
- ApplicationMigrationRule applicationMigrationRule = new ApplicationMigrationRule(providerApp, step, threshold);
- this.applications.add(applicationMigrationRule);
-
- if (applicationRules == null) {
- this.applicationRules = new HashMap<>();
- }
- this.applicationRules.put(providerApp, applicationMigrationRule);
- return true;
- }
-
public static MigrationRule parse(String rawRule) {
Yaml yaml = new Yaml(new SafeConstructor());
Map<String, Object> map = yaml.load(rawRule);
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/model/MigrationStep.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/model/MigrationStep.java
index 1fe8ec8..6d866c5 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/model/MigrationStep.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/model/MigrationStep.java
@@ -17,7 +17,7 @@
package org.apache.dubbo.registry.client.migration.model;
public enum MigrationStep {
- INTERFACE_FIRST,
+ FORCE_INTERFACE,
APPLICATION_FIRST,
FORCE_APPLICATION
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
index 4543dea..5b6fbc0 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
@@ -287,6 +287,7 @@ public abstract class DynamicDirectory<T> extends AbstractDirectory<T> implement
invokersChanged = true;
if (invokersChangedListener != null) {
invokersChangedListener.onChange();
+ invokersChanged = false;
}
}
diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparatorTest.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparatorTest.java
new file mode 100644
index 0000000..8ae2417
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparatorTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.client.migration;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.client.migration.model.MigrationRule;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.cluster.ClusterInvoker;
+import org.apache.dubbo.rpc.cluster.Directory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.apache.dubbo.registry.client.migration.DefaultMigrationAddressComparator.NEW_ADDRESS_SIZE;
+import static org.apache.dubbo.registry.client.migration.DefaultMigrationAddressComparator.OLD_ADDRESS_SIZE;
+
+public class DefaultMigrationAddressComparatorTest {
+
+ @Test
+ public void test() {
+ DefaultMigrationAddressComparator comparator = new DefaultMigrationAddressComparator();
+
+ ClusterInvoker newInvoker = Mockito.mock(ClusterInvoker.class);
+ ClusterInvoker oldInvoker = Mockito.mock(ClusterInvoker.class);
+ Directory newDirectory = Mockito.mock(Directory.class);
+ Directory oldDirectory = Mockito.mock(Directory.class);
+ MigrationRule rule = Mockito.mock(MigrationRule.class);
+ URL url = Mockito.mock(URL.class);
+
+ Mockito.when(url.getDisplayServiceKey()).thenReturn("test");
+ Mockito.when(newInvoker.getDirectory()).thenReturn(newDirectory);
+ Mockito.when(oldInvoker.getDirectory()).thenReturn(oldDirectory);
+ Mockito.when(newInvoker.getUrl()).thenReturn(url);
+ Mockito.when(oldInvoker.getUrl()).thenReturn(url);
+
+ Mockito.when(newInvoker.hasProxyInvokers()).thenReturn(false);
+ Mockito.when(newDirectory.getAllInvokers()).thenReturn(Collections.emptyList());
+
+ Assertions.assertFalse(comparator.shouldMigrate(newInvoker, oldInvoker, rule));
+ Assertions.assertEquals(-1, comparator.getAddressSize("test").get(NEW_ADDRESS_SIZE));
+ Assertions.assertEquals(0, comparator.getAddressSize("test").get(OLD_ADDRESS_SIZE));
+
+ Mockito.when(newInvoker.hasProxyInvokers()).thenReturn(true);
+ Mockito.when(oldInvoker.hasProxyInvokers()).thenReturn(false);
+ Mockito.when(oldDirectory.getAllInvokers()).thenReturn(Collections.emptyList());
+
+ Assertions.assertTrue(comparator.shouldMigrate(newInvoker, oldInvoker, rule));
+ Assertions.assertEquals(0, comparator.getAddressSize("test").get(NEW_ADDRESS_SIZE));
+ Assertions.assertEquals(-1, comparator.getAddressSize("test").get(OLD_ADDRESS_SIZE));
+
+ Mockito.when(oldInvoker.hasProxyInvokers()).thenReturn(true);
+
+ List<Invoker> newInvokerList = new LinkedList<>();
+ newInvokerList.add(Mockito.mock(Invoker.class));
+ newInvokerList.add(Mockito.mock(Invoker.class));
+ newInvokerList.add(Mockito.mock(Invoker.class));
+ Mockito.when(newDirectory.getAllInvokers()).thenReturn(newInvokerList);
+
+ List<Invoker> oldInvokerList = new LinkedList<>();
+ oldInvokerList.add(Mockito.mock(Invoker.class));
+ oldInvokerList.add(Mockito.mock(Invoker.class));
+ Mockito.when(oldDirectory.getAllInvokers()).thenReturn(oldInvokerList);
+
+ Assertions.assertTrue(comparator.shouldMigrate(newInvoker, oldInvoker, null));
+
+ Mockito.when(rule.getThreshold("test")).thenReturn(0.5f);
+ newInvokerList.clear();
+ newInvokerList.add(Mockito.mock(Invoker.class));
+ Assertions.assertTrue(comparator.shouldMigrate(newInvoker, oldInvoker, rule));
+
+ newInvokerList.clear();
+ // hasProxyInvokers will check if invokers list is empty
+ // if hasProxyInvokers return true, comparator will directly because default threshold is 0.0
+ Assertions.assertTrue(comparator.shouldMigrate(newInvoker, oldInvoker, null));
+ Assertions.assertFalse(comparator.shouldMigrate(newInvoker, oldInvoker, rule));
+
+ Assertions.assertEquals(0, comparator.getAddressSize("test").get(NEW_ADDRESS_SIZE));
+ Assertions.assertEquals(2, comparator.getAddressSize("test").get(OLD_ADDRESS_SIZE));
+ }
+}
diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/MigrationInvokerTest.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/MigrationInvokerTest.java
new file mode 100644
index 0000000..aadb374
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/MigrationInvokerTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.client.migration;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.config.ApplicationConfig;
+import org.apache.dubbo.registry.client.migration.model.MigrationRule;
+import org.apache.dubbo.registry.client.migration.model.MigrationStep;
+import org.apache.dubbo.registry.integration.DynamicDirectory;
+import org.apache.dubbo.registry.integration.RegistryProtocol;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.cluster.ClusterInvoker;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public class MigrationInvokerTest {
+ @BeforeEach
+ public void before() {
+ ApplicationConfig applicationConfig = new ApplicationConfig();
+ applicationConfig.setName("Test");
+ ApplicationModel.getConfigManager().setApplication(applicationConfig);
+ }
+
+ @AfterEach
+ public void after() {
+ ApplicationModel.reset();
+ }
+
+ @Test
+ public void test() {
+ RegistryProtocol registryProtocol = Mockito.mock(RegistryProtocol.class);
+
+ ClusterInvoker invoker = Mockito.mock(ClusterInvoker.class);
+ ClusterInvoker serviceDiscoveryInvoker = Mockito.mock(ClusterInvoker.class);
+
+ DynamicDirectory directory = Mockito.mock(DynamicDirectory.class);
+ DynamicDirectory serviceDiscoveryDirectory = Mockito.mock(DynamicDirectory.class);
+
+ Mockito.when(invoker.getDirectory()).thenReturn(directory);
+ Mockito.when(serviceDiscoveryInvoker.getDirectory()).thenReturn(serviceDiscoveryDirectory);
+
+ Mockito.when(invoker.hasProxyInvokers()).thenReturn(true);
+ Mockito.when(serviceDiscoveryInvoker.hasProxyInvokers()).thenReturn(true);
+
+ List<Invoker> invokers = new LinkedList<>();
+ invokers.add(Mockito.mock(Invoker.class));
+ invokers.add(Mockito.mock(Invoker.class));
+ List<Invoker> serviceDiscoveryInvokers = new LinkedList<>();
+ serviceDiscoveryInvokers.add(Mockito.mock(Invoker.class));
+ serviceDiscoveryInvokers.add(Mockito.mock(Invoker.class));
+ Mockito.when(directory.getAllInvokers()).thenReturn(invokers);
+ Mockito.when(serviceDiscoveryDirectory.getAllInvokers()).thenReturn(serviceDiscoveryInvokers);
+
+ Mockito.when(registryProtocol.getInvoker(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(invoker);
+ Mockito.when(registryProtocol.getServiceDiscoveryInvoker(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(serviceDiscoveryInvoker);
+
+ URL consumerURL = Mockito.mock(URL.class);
+ Mockito.when(consumerURL.getServiceInterface()).thenReturn("Test");
+ Mockito.when(consumerURL.getGroup()).thenReturn("Group");
+ Mockito.when(consumerURL.getVersion()).thenReturn("0.0.0");
+ Mockito.when(consumerURL.getServiceKey()).thenReturn("Group/Test:0.0.0");
+ Mockito.when(consumerURL.getDisplayServiceKey()).thenReturn("Test:0.0.0");
+
+ Mockito.when(invoker.getUrl()).thenReturn(consumerURL);
+ Mockito.when(serviceDiscoveryInvoker.getUrl()).thenReturn(consumerURL);
+
+ MigrationInvoker migrationInvoker = new MigrationInvoker(registryProtocol, null, null, null, null, consumerURL);
+
+ MigrationRule migrationRule = Mockito.mock(MigrationRule.class);
+ Mockito.when(migrationRule.getForce(Mockito.any())).thenReturn(true);
+ migrationInvoker.migrateToForceInterfaceInvoker(migrationRule);
+ migrationInvoker.setMigrationStep(MigrationStep.FORCE_INTERFACE);
+ migrationInvoker.invoke(null);
+ Mockito.verify(invoker, Mockito.times(1)).invoke(null);
+
+ migrationInvoker.migrateToForceApplicationInvoker(migrationRule);
+ migrationInvoker.setMigrationStep(MigrationStep.FORCE_APPLICATION);
+ migrationInvoker.invoke(null);
+ Mockito.verify(serviceDiscoveryInvoker, Mockito.times(1)).invoke(null);
+
+ Mockito.when(migrationRule.getThreshold(Mockito.any())).thenReturn(1.0f);
+ migrationInvoker.migrateToApplicationFirstInvoker(migrationRule);
+ migrationInvoker.setMigrationStep(MigrationStep.APPLICATION_FIRST);
+ migrationInvoker.invoke(null);
+ Mockito.verify(serviceDiscoveryInvoker, Mockito.times(2)).invoke(null);
+
+ Mockito.when(migrationRule.getThreshold(Mockito.any())).thenReturn(2.0f);
+ migrationInvoker.migrateToApplicationFirstInvoker(migrationRule);
+ migrationInvoker.setMigrationStep(MigrationStep.APPLICATION_FIRST);
+ migrationInvoker.invoke(null);
+ Mockito.verify(invoker, Mockito.times(2)).invoke(null);
+
+ Mockito.when(migrationRule.getForce(Mockito.any())).thenReturn(false);
+ Mockito.when(migrationRule.getThreshold(Mockito.any())).thenReturn(1.0f);
+ migrationInvoker.migrateToForceInterfaceInvoker(migrationRule);
+ migrationInvoker.setMigrationStep(MigrationStep.FORCE_INTERFACE);
+ migrationInvoker.invoke(null);
+ Mockito.verify(invoker, Mockito.times(3)).invoke(null);
+
+ migrationInvoker.migrateToForceInterfaceInvoker(migrationRule);
+ migrationInvoker.setMigrationStep(MigrationStep.FORCE_INTERFACE);
+ migrationInvoker.invoke(null);
+ Mockito.verify(invoker, Mockito.times(4)).invoke(null);
+
+ Mockito.when(migrationRule.getThreshold(Mockito.any())).thenReturn(2.0f);
+ migrationInvoker.migrateToForceApplicationInvoker(migrationRule);
+ migrationInvoker.setMigrationStep(MigrationStep.FORCE_APPLICATION);
+ migrationInvoker.invoke(null);
+ Mockito.verify(invoker, Mockito.times(5)).invoke(null);
+
+ Mockito.when(migrationRule.getThreshold(Mockito.any())).thenReturn(1.0f);
+ migrationInvoker.migrateToForceApplicationInvoker(migrationRule);
+ migrationInvoker.setMigrationStep(MigrationStep.FORCE_APPLICATION);
+ migrationInvoker.invoke(null);
+ Mockito.verify(serviceDiscoveryInvoker, Mockito.times(3)).invoke(null);
+
+ migrationInvoker.migrateToForceApplicationInvoker(migrationRule);
+ migrationInvoker.setMigrationStep(MigrationStep.FORCE_APPLICATION);
+ migrationInvoker.invoke(null);
+ Mockito.verify(serviceDiscoveryInvoker, Mockito.times(4)).invoke(null);
+
+ Mockito.when(migrationRule.getThreshold(Mockito.any())).thenReturn(2.0f);
+ migrationInvoker.migrateToForceInterfaceInvoker(migrationRule);
+ migrationInvoker.setMigrationStep(MigrationStep.FORCE_INTERFACE);
+ migrationInvoker.invoke(null);
+ Mockito.verify(serviceDiscoveryInvoker, Mockito.times(5)).invoke(null);
+
+ Mockito.when(migrationRule.getThreshold(Mockito.any())).thenReturn(2.0f);
+ Mockito.when(migrationRule.getForce(Mockito.any())).thenReturn(true);
+ migrationInvoker.migrateToForceInterfaceInvoker(migrationRule);
+ migrationInvoker.setMigrationStep(MigrationStep.FORCE_INTERFACE);
+ migrationInvoker.invoke(null);
+ Mockito.verify(invoker, Mockito.times(6)).invoke(null);
+
+ migrationInvoker.migrateToForceApplicationInvoker(migrationRule);
+ migrationInvoker.setMigrationStep(MigrationStep.FORCE_APPLICATION);
+ migrationInvoker.invoke(null);
+ Mockito.verify(serviceDiscoveryInvoker, Mockito.times(6)).invoke(null);
+
+ Mockito.when(migrationRule.getForce(Mockito.any())).thenReturn(false);
+ migrationInvoker.migrateToForceInterfaceInvoker(migrationRule);
+ migrationInvoker.setMigrationStep(MigrationStep.FORCE_INTERFACE);
+ migrationInvoker.invoke(null);
+ Mockito.verify(serviceDiscoveryInvoker, Mockito.times(7)).invoke(null);
+ Assertions.assertNull(migrationInvoker.getInvoker());
+
+ Mockito.when(migrationRule.getForce(Mockito.any())).thenReturn(true);
+ migrationInvoker.migrateToForceInterfaceInvoker(migrationRule);
+ migrationInvoker.setMigrationStep(MigrationStep.FORCE_INTERFACE);
+
+ Mockito.when(migrationRule.getForce(Mockito.any())).thenReturn(false);
+ migrationInvoker.migrateToForceApplicationInvoker(migrationRule);
+ migrationInvoker.setMigrationStep(MigrationStep.FORCE_APPLICATION);
+ migrationInvoker.invoke(null);
+ Mockito.verify(invoker, Mockito.times(7)).invoke(null);
+ Assertions.assertNull(migrationInvoker.getServiceDiscoveryInvoker());
+
+ ArgumentCaptor<InvokersChangedListener> argument = ArgumentCaptor.forClass(InvokersChangedListener.class);
+ Mockito.verify(serviceDiscoveryDirectory, Mockito.atLeastOnce()).setInvokersChangedListener(argument.capture());
+
+ Mockito.when(migrationRule.getThreshold(Mockito.any())).thenReturn(1.0f);
+ migrationInvoker.migrateToApplicationFirstInvoker(migrationRule);
+ migrationInvoker.setMigrationStep(MigrationStep.APPLICATION_FIRST);
+ for (int i = 0; i < 20; i++) {
+ migrationInvoker.invoke(null);
+ }
+ Mockito.verify(serviceDiscoveryInvoker, Mockito.times(27)).invoke(null);
+
+ serviceDiscoveryInvokers.remove(1);
+ Mockito.when(serviceDiscoveryInvoker.hasProxyInvokers()).thenReturn(false);
+ argument.getAllValues().get(argument.getAllValues().size() - 1).onChange();
+
+ for (int i = 0; i < 20; i++) {
+ migrationInvoker.invoke(null);
+ }
+ Mockito.verify(invoker, Mockito.times(27)).invoke(null);
+
+ serviceDiscoveryInvokers.add(Mockito.mock(Invoker.class));
+ Mockito.when(serviceDiscoveryInvoker.hasProxyInvokers()).thenReturn(true);
+ argument.getAllValues().get(argument.getAllValues().size() - 1).onChange();
+
+ Mockito.when(migrationRule.getProportion(Mockito.any())).thenReturn(50);
+ migrationInvoker.setMigrationRule(migrationRule);
+ for (int i = 0; i < 1000; i++) {
+ migrationInvoker.invoke(null);
+ }
+ Mockito.verify(serviceDiscoveryInvoker, Mockito.atMost(1026)).invoke(null);
+ Mockito.verify(invoker, Mockito.atLeast(28)).invoke(null);
+
+ Mockito.when(migrationRule.getDelay(Mockito.any())).thenReturn(1);
+ long currentTimeMillis = System.currentTimeMillis();
+ migrationInvoker.migrateToForceApplicationInvoker(migrationRule);
+ Assertions.assertTrue(System.currentTimeMillis() - currentTimeMillis >= 2000);
+ }
+}
diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandlerTest.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandlerTest.java
new file mode 100644
index 0000000..83b72907
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandlerTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.client.migration;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.client.migration.model.MigrationRule;
+import org.apache.dubbo.registry.client.migration.model.MigrationStep;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+public class MigrationRuleHandlerTest {
+ @Test
+ public void test() {
+ MigrationClusterInvoker invoker = Mockito.mock(MigrationClusterInvoker.class);
+ URL url = Mockito.mock(URL.class);
+ Mockito.when(url.getDisplayServiceKey()).thenReturn("test");
+ Mockito.when(url.getParameter((String) Mockito.any(), (String) Mockito.any())).thenAnswer(i->i.getArgument(1));
+ MigrationRuleHandler handler = new MigrationRuleHandler(invoker, url);
+
+ Mockito.when(invoker.migrateToForceApplicationInvoker(Mockito.any())).thenReturn(true);
+ Mockito.when(invoker.migrateToForceInterfaceInvoker(Mockito.any())).thenReturn(true);
+
+ handler.doMigrate(MigrationRule.INIT);
+ Mockito.verify(invoker, Mockito.times(1)).migrateToApplicationFirstInvoker(MigrationRule.INIT);
+
+ MigrationRule rule = Mockito.mock(MigrationRule.class);
+ Mockito.when(rule.getStep("test")).thenReturn(MigrationStep.FORCE_APPLICATION);
+ handler.doMigrate(rule);
+ Mockito.verify(invoker, Mockito.times(1)).migrateToForceApplicationInvoker(rule);
+
+ Mockito.when(rule.getStep("test")).thenReturn(MigrationStep.APPLICATION_FIRST);
+ handler.doMigrate(rule);
+ Mockito.verify(invoker, Mockito.times(1)).migrateToApplicationFirstInvoker(rule);
+
+ Mockito.when(rule.getStep("test")).thenReturn(MigrationStep.FORCE_INTERFACE);
+ handler.doMigrate(rule);
+ Mockito.verify(invoker, Mockito.times(1)).migrateToForceInterfaceInvoker(rule);
+ }
+}
diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/MigrationRuleListenerTest.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/MigrationRuleListenerTest.java
new file mode 100644
index 0000000..ef15826
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/MigrationRuleListenerTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.client.migration;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.configcenter.DynamicConfiguration;
+import org.apache.dubbo.config.ApplicationConfig;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+public class MigrationRuleListenerTest {
+ @Test
+ public void test() throws InterruptedException {
+ String rule = "key: demo-consumer\n" +
+ "step: APPLICATION_FIRST\n" +
+ "threshold: 1.0\n" +
+ "proportion: 60\n" +
+ "delay: 60\n" +
+ "force: false\n" +
+ "interfaces:\n" +
+ " - serviceKey: DemoService:1.0.0\n" +
+ " threshold: 0.5\n" +
+ " proportion: 30\n" +
+ " delay: 30\n" +
+ " force: true\n" +
+ " step: APPLICATION_FIRST\n" +
+ " - serviceKey: GreetingService:1.0.0\n" +
+ " step: FORCE_APPLICATION";
+
+ DynamicConfiguration dynamicConfiguration = Mockito.mock(DynamicConfiguration.class);
+
+ ApplicationModel.getEnvironment().setDynamicConfiguration(dynamicConfiguration);
+ ApplicationModel.getEnvironment().setLocalMigrationRule(rule);
+ ApplicationConfig applicationConfig = new ApplicationConfig();
+ applicationConfig.setName("demo-consumer");
+ ApplicationModel.getConfigManager().setApplication(applicationConfig);
+
+ URL consumerURL = Mockito.mock(URL.class);
+ Mockito.when(consumerURL.getServiceKey()).thenReturn("Test");
+ Mockito.when(consumerURL.getParameter("timestamp")).thenReturn("1");
+
+ System.setProperty("dubbo.application.migration.delay", "100");
+ MigrationRuleHandler handler = Mockito.mock(MigrationRuleHandler.class);
+
+ MigrationRuleListener migrationRuleListener = new MigrationRuleListener();
+ migrationRuleListener.getHandlers().put("Test1", handler);
+
+ Mockito.verify(handler, Mockito.timeout(5000)).doMigrate(Mockito.any());
+
+ migrationRuleListener.onRefer(null, null, consumerURL, null);
+ Mockito.verify(handler, Mockito.times(2)).doMigrate(Mockito.any());
+
+ ApplicationModel.reset();
+ }
+}
diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/model/MigrationRuleTest.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/model/MigrationRuleTest.java
index 1e2afe2..0b40e16 100644
--- a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/model/MigrationRuleTest.java
+++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/model/MigrationRuleTest.java
@@ -20,7 +20,6 @@ import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
public class MigrationRuleTest {
@@ -29,20 +28,41 @@ public class MigrationRuleTest {
String rule = "key: demo-consumer\n" +
"step: APPLICATION_FIRST\n" +
"threshold: 1.0\n" +
+ "proportion: 60\n" +
+ "delay: 60\n" +
+ "force: false\n" +
"interfaces:\n" +
" - serviceKey: DemoService:1.0.0\n" +
- " threshold: 1.0\n" +
+ " threshold: 0.5\n" +
+ " proportion: 30\n" +
+ " delay: 30\n" +
+ " force: true\n" +
" step: APPLICATION_FIRST\n" +
" - serviceKey: GreetingService:1.0.0\n" +
" step: FORCE_APPLICATION";
MigrationRule migrationRule = MigrationRule.parse(rule);
- assertEquals(migrationRule.getKey(), "demo-consumer");
- assertEquals(migrationRule.getStep(), MigrationStep.APPLICATION_FIRST);
- assertEquals(migrationRule.getThreshold(), 1.0f);
+ assertEquals("demo-consumer", migrationRule.getKey());
+ assertEquals(MigrationStep.APPLICATION_FIRST ,migrationRule.getStep());
+ assertEquals(1.0f, migrationRule.getThreshold());
+ assertEquals(60, migrationRule.getProportion());
+ assertEquals(60, migrationRule.getDelay());
+ assertEquals(false, migrationRule.getForce());
+
assertEquals(migrationRule.getInterfaces().size(), 2);
assertNotNull(migrationRule.getInterfaceRule("DemoService:1.0.0"));
assertNotNull(migrationRule.getInterfaceRule("GreetingService:1.0.0"));
- assertNull(migrationRule.getApplications());
+
+ assertEquals(0.5f, migrationRule.getThreshold("DemoService:1.0.0"));
+ assertEquals(30, migrationRule.getProportion("DemoService:1.0.0"));
+ assertEquals(30, migrationRule.getDelay("DemoService:1.0.0"));
+ assertEquals(true, migrationRule.getForce("DemoService:1.0.0"));
+ assertEquals(MigrationStep.APPLICATION_FIRST ,migrationRule.getStep("DemoService:1.0.0"));
+
+ assertEquals(1.0f, migrationRule.getThreshold("GreetingService:1.0.0"));
+ assertEquals(60, migrationRule.getProportion("GreetingService:1.0.0"));
+ assertEquals(60, migrationRule.getDelay("GreetingService:1.0.0"));
+ assertEquals(false, migrationRule.getForce("GreetingService:1.0.0"));
+ assertEquals(MigrationStep.FORCE_APPLICATION ,migrationRule.getStep("GreetingService:1.0.0"));
}
}