You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2018/10/17 06:07:57 UTC
[incubator-dubbo] 01/03: Improvements for dynamic config and
router: 1. support route in StaticDirectory,
multi-registry & group merger 2. tolerance of abnormal configs from
configserver. 3. add check policy when failed to connect to configserver
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch dev-metadata
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git
commit 91fddde79a718b3223b5caa421c4112bf5744f69
Author: ken.lj <ke...@gmail.com>
AuthorDate: Wed Oct 17 14:05:47 2018 +0800
Improvements for dynamic config and router:
1. support route in StaticDirectory, multi-registry & group merger
2. tolerance of abnormal configs from configserver.
3. add check policy when failed to connect to configserver
---
.../java/org/apache/dubbo/rpc/cluster/Router.java | 4 +-
.../org/apache/dubbo/rpc/cluster/RouterChain.java | 14 +-
.../rpc/cluster/directory/AbstractDirectory.java | 11 +-
.../rpc/cluster/directory/StaticDirectory.java | 15 +-
.../dubbo/rpc/cluster/router/AbstractRouter.java | 5 +
.../condition/config/ConfigConditionRouter.java | 30 ++-
.../router/{ => mock}/MockInvokersSelector.java | 229 +++++++++------------
.../rpc/cluster/router/mock/MockRouterFactory.java | 32 ++-
.../dubbo/rpc/cluster/router/tag/TagRouter.java | 13 +-
.../cluster/support/AbstractClusterInvoker.java | 6 +-
.../dubbo/rpc/cluster/support/ClusterUtils.java | 9 +-
.../cluster/support/MergeableClusterInvoker.java | 29 ++-
.../rpc/cluster/support/RegistryAwareCluster.java | 30 +--
.../support/RegistryAwareClusterInvoker.java | 59 ++++++
.../internal/org.apache.dubbo.rpc.cluster.Cluster | 3 +-
.../org.apache.dubbo.rpc.cluster.RouterFactory | 3 +-
.../java/org/apache/dubbo/common/Constants.java | 1 +
.../dubbo/common/extension/ExtensionLoader.java | 4 +
.../org/apache/dubbo/config/ReferenceConfig.java | 9 +-
dubbo-config/dubbo-config-dynamic/pom.xml | 2 +-
.../dynamic/AbstractDynamicConfiguration.java | 4 +-
.../support/apollo/ApolloDynamicConfiguration.java | 49 ++---
.../archaius/ArchaiusDynamicConfiguration.java | 3 +-
.../sources/ZooKeeperConfigurationSource.java | 16 +-
.../support/nop/NopDynamicConfiguration.java | 2 +-
.../registry/integration/RegistryDirectory.java | 86 +++++---
.../java/org/apache/dubbo/rpc/RpcException.java | 5 +
27 files changed, 375 insertions(+), 298 deletions(-)
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Router.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Router.java
index 65cf69f..3aaf298 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Router.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Router.java
@@ -63,9 +63,7 @@ public interface Router extends Comparable<Router> {
return "";
}
- default boolean isRuntime() {
- return true;
- }
+ boolean isRuntime();
default String getKey() {
return TreeNode.FAILOVER_KEY;
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/RouterChain.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/RouterChain.java
index a1cb9d5..d5db399 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/RouterChain.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/RouterChain.java
@@ -75,7 +75,7 @@ public class RouterChain<T> {
/**
* @param methodInvokers
* @param url
- * @param invocation TODO has no been used yet
+ * @param invocation TODO has not being used yet
*/
public void preRoute(Map<String, List<Invoker<T>>> methodInvokers, URL url, Invocation invocation) {
if (CollectionUtils.isEmpty(routers)) {
@@ -103,7 +103,7 @@ public class RouterChain<T> {
});
}
- public List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) {
+ public List<Invoker<T>> route(URL url, Invocation invocation) {
List<Invoker<T>> finalInvokers = treeCache.getInvokers(treeCache.getTree(), url, invocation);
for (Router router : routers) {
if (router.isRuntime()) {
@@ -113,6 +113,16 @@ public class RouterChain<T> {
return finalInvokers;
}
+ public List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) {
+ List<Invoker<T>> finalInvokers = invokers;
+ for (Router router : routers) {
+ if (router.isRuntime()) {
+ finalInvokers = router.route(invokers, url, invocation);
+ }
+ }
+ return finalInvokers;
+ }
+
public void notifyRuleChanged() {
preRoute(this.fullMethodInvokers, url, null);
}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java
index 07e6dd3..2cee248 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java
@@ -68,16 +68,7 @@ public abstract class AbstractDirectory<T> implements Directory<T> {
throw new RpcException("Directory already destroyed .url: " + getUrl());
}
- List<Invoker<T>> invokers = doList(invocation);
-
- try {
- // Get invokers from cache, only runtime routers will be executed.
- return routerChain.route(invokers, getConsumerUrl(), invocation);
- } catch (Throwable t) {
- logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
- }
-
- return invokers;
+ return doList(invocation);
}
@Override
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/StaticDirectory.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/StaticDirectory.java
index 9367bdf..154fafe 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/StaticDirectory.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/StaticDirectory.java
@@ -17,6 +17,8 @@
package org.apache.dubbo.rpc.cluster.directory;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
@@ -31,6 +33,7 @@ import java.util.List;
*
*/
public class StaticDirectory<T> extends AbstractDirectory<T> {
+ private static final Logger logger = LoggerFactory.getLogger(StaticDirectory.class);
private final List<Invoker<T>> invokers;
@@ -85,8 +88,16 @@ public class StaticDirectory<T> extends AbstractDirectory<T> {
@Override
protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
-
- return invokers;
+ List<Invoker<T>> finalInvokers = invokers;
+ if (routerChain != null) {
+ try {
+ // Get invokers from cache, only runtime routers will be executed.
+ finalInvokers = routerChain.route(getConsumerUrl(), invocation);
+ } catch (Throwable t) {
+ logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
+ }
+ }
+ return finalInvokers;
}
}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/AbstractRouter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/AbstractRouter.java
index f814336..bb76236 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/AbstractRouter.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/AbstractRouter.java
@@ -74,6 +74,11 @@ public abstract class AbstractRouter implements Router {
}
@Override
+ public boolean isRuntime() {
+ return true;
+ }
+
+ @Override
public boolean isForce() {
return force;
}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/ConfigConditionRouter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/ConfigConditionRouter.java
index 8605afe..1d60ee3 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/ConfigConditionRouter.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/ConfigConditionRouter.java
@@ -72,23 +72,23 @@ public class ConfigConditionRouter extends AbstractRouter implements Configurati
generateAppConditions();
}
} catch (Exception e) {
- throw new IllegalStateException(e.getMessage(), e);
+ throw new IllegalStateException("Failed to init the condition router.", e);
}
}
@Override
public void process(ConfigChangeEvent event) {
- try {
- if (event.getChangeType().equals(ConfigChangeType.DELETED)) {
- // Now, we can only recognize if it's a app level or service level change by try to match event key.
- if (event.getKey().endsWith(this.url.getParameter(Constants.APPLICATION_KEY) + Constants.ROUTERS_SUFFIX)) {
- appRouterRule = null;
- conditionRouters.clear();
- } else {
- routerRule = null;
- appConditionRouters.clear();
- }
+ if (event.getChangeType().equals(ConfigChangeType.DELETED)) {
+ // Now, we can only recognize if it's a app level or service level change by try to match event key.
+ if (event.getKey().endsWith(this.url.getParameter(Constants.APPLICATION_KEY) + Constants.ROUTERS_SUFFIX)) {
+ appRouterRule = null;
+ conditionRouters.clear();
} else {
+ routerRule = null;
+ appConditionRouters.clear();
+ }
+ } else {
+ try {
if (event.getKey().endsWith(this.url.getParameter(Constants.APPLICATION_KEY) + Constants.ROUTERS_SUFFIX)) {
appRouterRule = ConditionRuleParser.parse(event.getNewValue());
generateAppConditions();
@@ -96,12 +96,11 @@ public class ConfigConditionRouter extends AbstractRouter implements Configurati
routerRule = ConditionRuleParser.parse(event.getNewValue());
generateConditions();
}
+ } catch (Exception e) {
+ logger.error("Failed to parse the raw condition rule and it will not take effect, please check if the condition rule matches with the template, the raw rule is:\n " + event.getNewValue(), e);
}
- routerChain.notifyRuleChanged();
- } catch (Exception e) {
- logger.error(e);
- // TODO
}
+ routerChain.notifyRuleChanged();
}
@Override
@@ -139,7 +138,6 @@ public class ConfigConditionRouter extends AbstractRouter implements Configurati
return invokers;
}
-
if (isAppRuleEnabled()) {
for (Router router : appConditionRouters) {
invokers = router.route(invokers, url, invocation);
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/MockInvokersSelector.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/mock/MockInvokersSelector.java
similarity index 76%
rename from dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/MockInvokersSelector.java
rename to dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/mock/MockInvokersSelector.java
index f775f89..5dc954a 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/MockInvokersSelector.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/mock/MockInvokersSelector.java
@@ -1,133 +1,96 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.cluster.router;
-
-import org.apache.dubbo.common.Constants;
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.utils.CollectionUtils;
-import org.apache.dubbo.rpc.Invocation;
-import org.apache.dubbo.rpc.Invoker;
-import org.apache.dubbo.rpc.RpcException;
-import org.apache.dubbo.rpc.cluster.Router;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A specific Router designed to realize mock feature.
- * If a request is configured to use mock, then this router guarantees that only the invokers with protocol MOCK appear in final the invoker list, all other invokers will be excluded.
- *
- */
-public class MockInvokersSelector implements Router {
-
- @Override
- public <T> List<Invoker<T>> route(final List<Invoker<T>> invokers,
- URL url, final Invocation invocation) throws RpcException {
- if (invocation.getAttachments() == null) {
- return getNormalInvokers(invokers);
- } else {
- String value = invocation.getAttachments().get(Constants.INVOCATION_NEED_MOCK);
- if (value == null)
- return getNormalInvokers(invokers);
- else if (Boolean.TRUE.toString().equalsIgnoreCase(value)) {
- return getMockedInvokers(invokers);
- }
- }
- return invokers;
- }
-
- @Override
- public <T> Map<String, List<Invoker<T>>> preRoute(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
- Map<String, List<Invoker<T>>> map = new HashMap<>();
-
- if (CollectionUtils.isEmpty(invokers)) {
- return map;
- }
-
- if (isRuntime()) {
- map.put(TreeNode.FAILOVER_KEY, invokers);
- return map;
- }
- return map;
- }
-
- @Override
- public boolean isRuntime() {
- return true;
- }
-
- @Override
- public String getKey() {
- return TreeNode.FAILOVER_KEY;
- }
-
- @Override
- public boolean isForce() {
- return false;
- }
-
- private <T> List<Invoker<T>> getMockedInvokers(final List<Invoker<T>> invokers) {
- if (!hasMockProviders(invokers)) {
- return null;
- }
- List<Invoker<T>> sInvokers = new ArrayList<Invoker<T>>(1);
- for (Invoker<T> invoker : invokers) {
- if (invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)) {
- sInvokers.add(invoker);
- }
- }
- return sInvokers;
- }
-
- private <T> List<Invoker<T>> getNormalInvokers(final List<Invoker<T>> invokers) {
- if (!hasMockProviders(invokers)) {
- return invokers;
- } else {
- List<Invoker<T>> sInvokers = new ArrayList<Invoker<T>>(invokers.size());
- for (Invoker<T> invoker : invokers) {
- if (!invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)) {
- sInvokers.add(invoker);
- }
- }
- return sInvokers;
- }
- }
-
- private <T> boolean hasMockProviders(final List<Invoker<T>> invokers) {
- boolean hasMockProvider = false;
- for (Invoker<T> invoker : invokers) {
- if (invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)) {
- hasMockProvider = true;
- break;
- }
- }
- return hasMockProvider;
- }
-
- @Override
- public URL getUrl() {
- return null;
- }
-
- @Override
- public int compareTo(Router o) {
- return 1;
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.router.mock;
+
+import org.apache.dubbo.common.Constants;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.cluster.Router;
+import org.apache.dubbo.rpc.cluster.router.AbstractRouter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A specific Router designed to realize mock feature.
+ * If a request is configured to use mock, then this router guarantees that only the invokers with protocol MOCK appear in final the invoker list, all other invokers will be excluded.
+ *
+ */
+public class MockInvokersSelector extends AbstractRouter {
+
+ @Override
+ public <T> List<Invoker<T>> route(final List<Invoker<T>> invokers,
+ URL url, final Invocation invocation) throws RpcException {
+ if (invocation.getAttachments() == null) {
+ return getNormalInvokers(invokers);
+ } else {
+ String value = invocation.getAttachments().get(Constants.INVOCATION_NEED_MOCK);
+ if (value == null)
+ return getNormalInvokers(invokers);
+ else if (Boolean.TRUE.toString().equalsIgnoreCase(value)) {
+ return getMockedInvokers(invokers);
+ }
+ }
+ return invokers;
+ }
+
+ private <T> List<Invoker<T>> getMockedInvokers(final List<Invoker<T>> invokers) {
+ if (!hasMockProviders(invokers)) {
+ return null;
+ }
+ List<Invoker<T>> sInvokers = new ArrayList<Invoker<T>>(1);
+ for (Invoker<T> invoker : invokers) {
+ if (invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)) {
+ sInvokers.add(invoker);
+ }
+ }
+ return sInvokers;
+ }
+
+ private <T> List<Invoker<T>> getNormalInvokers(final List<Invoker<T>> invokers) {
+ if (!hasMockProviders(invokers)) {
+ return invokers;
+ } else {
+ List<Invoker<T>> sInvokers = new ArrayList<Invoker<T>>(invokers.size());
+ for (Invoker<T> invoker : invokers) {
+ if (!invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)) {
+ sInvokers.add(invoker);
+ }
+ }
+ return sInvokers;
+ }
+ }
+
+ private <T> boolean hasMockProviders(final List<Invoker<T>> invokers) {
+ boolean hasMockProvider = false;
+ for (Invoker<T> invoker : invokers) {
+ if (invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)) {
+ hasMockProvider = true;
+ break;
+ }
+ }
+ return hasMockProvider;
+ }
+
+ @Override
+ public int compareTo(Router o) {
+ return 1;
+ }
+
+}
diff --git a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/nop/NopDynamicConfiguration.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/mock/MockRouterFactory.java
similarity index 57%
copy from dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/nop/NopDynamicConfiguration.java
copy to dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/mock/MockRouterFactory.java
index 75e8bf9..79b9b99 100644
--- a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/nop/NopDynamicConfiguration.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/mock/MockRouterFactory.java
@@ -14,33 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.config.dynamic.support.nop;
+package org.apache.dubbo.rpc.cluster.router.mock;
-import org.apache.dubbo.config.dynamic.AbstractDynamicConfiguration;
-import org.apache.dubbo.config.dynamic.ConfigurationListener;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.config.dynamic.DynamicConfiguration;
+import org.apache.dubbo.rpc.cluster.Router;
+import org.apache.dubbo.rpc.cluster.RouterFactory;
/**
*
*/
-public class NopDynamicConfiguration extends AbstractDynamicConfiguration {
+@Activate
+public class MockRouterFactory implements RouterFactory {
@Override
- public void init() {
-
- }
-
- @Override
- protected String getInternalProperty(String key, String group, long timeout, ConfigurationListener listener) {
- return null;
- }
-
- @Override
- protected void addTargetListener(String key, Object o) {
-
+ public Router getRouter(URL url) {
+ return new MockInvokersSelector();
}
@Override
- protected Object createTargetConfigListener(String key, ConfigurationListener listener) {
- return null;
+ public Router getRouter(DynamicConfiguration dynamicConfiguration, URL url) {
+ return getRouter(url);
}
-}
+}
\ No newline at end of file
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouter.java
index c970974..4aadc82 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouter.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouter.java
@@ -77,9 +77,13 @@ public class TagRouter extends AbstractRouter implements Comparable<Router>, Con
if (StringUtils.isEmpty(application)) {
logger.error("TagRouter must getConfig from or subscribe to a specific application, but the application in this TagRouter is not specified.");
}
- String rawRule = this.configuration.getConfig(application + TAGROUTERRULES_DATAID, "dubbo", this);
- if (StringUtils.isNotEmpty(rawRule)) {
- this.tagRouterRule = TagRuleParser.parse(rawRule);
+ try {
+ String rawRule = this.configuration.getConfig(application + TAGROUTERRULES_DATAID, "dubbo", this);
+ if (StringUtils.isNotEmpty(rawRule)) {
+ this.tagRouterRule = TagRuleParser.parse(rawRule);
+ }
+ } catch (Exception e) {
+ logger.error("Failed to parse the raw tag router rule and it will not take effect, please check if the rule matches with the template, the raw rule is:\n ", e);
}
}
@@ -93,8 +97,7 @@ public class TagRouter extends AbstractRouter implements Comparable<Router>, Con
}
routerChain.notifyRuleChanged();
} catch (Exception e) {
- // TODO
- logger.error(e);
+ logger.error("Failed to parse the raw tag router rule and it will not take effect, please check if the rule matches with the template, the raw rule is:\n ", e);
}
}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvoker.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvoker.java
index 51fe092..92d2ee7 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvoker.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvoker.java
@@ -26,10 +26,10 @@ import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
-import org.apache.dubbo.rpc.RpcContext;
-import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.LoadBalance;
import org.apache.dubbo.rpc.support.RpcUtils;
@@ -254,7 +254,7 @@ public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
protected void checkInvokers(List<Invoker<T>> invokers, Invocation invocation) {
if (invokers == null || invokers.isEmpty()) {
- throw new RpcException("Failed to invoke the method "
+ throw new RpcException(RpcException.NO_INVOKER_AVAILABLE_AFTER_FILTER, "Failed to invoke the method "
+ invocation.getMethodName() + " in the service " + getInterface().getName()
+ ". No provider available for the service " + directory.getUrl().getServiceKey()
+ " from registry " + directory.getUrl().getAddress()
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ClusterUtils.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ClusterUtils.java
index 1c62d9f..5b4b589 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ClusterUtils.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ClusterUtils.java
@@ -35,7 +35,6 @@ public class ClusterUtils {
Map<String, String> map = new HashMap<String, String>();
Map<String, String> remoteMap = remoteUrl.getParameters();
-
if (remoteMap != null && remoteMap.size() > 0) {
map.putAll(remoteMap);
@@ -66,6 +65,10 @@ public class ClusterUtils {
}
if (localMap != null && localMap.size() > 0) {
+ // All providers come to here have been filtered by group, which means only those providers that have the exact same group value with the consumer could come to here.
+ // So, generally, we don't need to care about the group value here.
+ // But when comes to group merger, there is an exception, the consumer group may be '*' while the provider group can be empty or any other values.
+ localMap.remove(Constants.GROUP_KEY);
map.putAll(localMap);
}
if (remoteMap != null && remoteMap.size() > 0) {
@@ -78,10 +81,6 @@ public class ClusterUtils {
if (version != null && version.length() > 0) {
map.put(Constants.VERSION_KEY, version);
}
- String group = remoteMap.get(Constants.GROUP_KEY);
- if (group != null && group.length() > 0) {
- map.put(Constants.GROUP_KEY, group);
- }
String methods = remoteMap.get(Constants.METHODS_KEY);
if (methods != null && methods.length() > 0) {
map.put(Constants.METHODS_KEY, methods);
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/MergeableClusterInvoker.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/MergeableClusterInvoker.java
index 0c68eab..82a0e8a 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/MergeableClusterInvoker.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/MergeableClusterInvoker.java
@@ -30,6 +30,7 @@ import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;
+import org.apache.dubbo.rpc.cluster.LoadBalance;
import org.apache.dubbo.rpc.cluster.Merger;
import org.apache.dubbo.rpc.cluster.merger.MergerFactory;
@@ -47,26 +48,31 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@SuppressWarnings("unchecked")
-public class MergeableClusterInvoker<T> implements Invoker<T> {
+public class MergeableClusterInvoker<T> extends AbstractClusterInvoker<T> {
private static final Logger log = LoggerFactory.getLogger(MergeableClusterInvoker.class);
- private final Directory<T> directory;
private ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("mergeable-cluster-executor", true));
public MergeableClusterInvoker(Directory<T> directory) {
- this.directory = directory;
+ super(directory);
}
@Override
- @SuppressWarnings("rawtypes")
- public Result invoke(final Invocation invocation) throws RpcException {
- List<Invoker<T>> invokers = directory.list(invocation);
-
+ protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
+ checkInvokers(invokers, invocation);
String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY);
if (ConfigUtils.isEmpty(merger)) { // If a method doesn't have a merger, only invoke one Group
for (final Invoker<T> invoker : invokers) {
if (invoker.isAvailable()) {
- return invoker.invoke(invocation);
+ try {
+ return invoker.invoke(invocation);
+ } catch (RpcException e) {
+ if (e.isNoInvokerAvailableAfterFilter()) {
+ log.debug("No available provider for service" + directory.getUrl().getServiceKey() + " on group " + invoker.getUrl().getParameter(Constants.GROUP_KEY) + ", will continue to try another group.");
+ } else {
+ throw e;
+ }
+ }
}
}
return invokers.iterator().next().invoke(invocation);
@@ -101,8 +107,8 @@ public class MergeableClusterInvoker<T> implements Invoker<T> {
try {
Result r = future.get(timeout, TimeUnit.MILLISECONDS);
if (r.hasException()) {
- log.error("Invoke " + getGroupDescFromServiceKey(entry.getKey()) +
- " failed: " + r.getException().getMessage(),
+ log.error("Invoke " + getGroupDescFromServiceKey(entry.getKey()) +
+ " failed: " + r.getException().getMessage(),
r.getException());
} else {
resultList.add(r);
@@ -128,7 +134,7 @@ public class MergeableClusterInvoker<T> implements Invoker<T> {
try {
method = returnType.getMethod(merger, returnType);
} catch (NoSuchMethodException e) {
- throw new RpcException("Can not merge result because missing method [ " + merger + " ] in class [ " +
+ throw new RpcException("Can not merge result because missing method [ " + merger + " ] in class [ " +
returnType.getClass().getName() + " ]");
}
if (!Modifier.isPublic(method.getModifiers())) {
@@ -170,6 +176,7 @@ public class MergeableClusterInvoker<T> implements Invoker<T> {
return new RpcResult(result);
}
+
@Override
public Class<T> getInterface() {
return directory.getInterface();
diff --git a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/nop/NopDynamicConfiguration.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/RegistryAwareCluster.java
similarity index 55%
copy from dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/nop/NopDynamicConfiguration.java
copy to dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/RegistryAwareCluster.java
index 75e8bf9..81b21a3 100644
--- a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/nop/NopDynamicConfiguration.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/RegistryAwareCluster.java
@@ -14,33 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.config.dynamic.support.nop;
+package org.apache.dubbo.rpc.cluster.support;
-import org.apache.dubbo.config.dynamic.AbstractDynamicConfiguration;
-import org.apache.dubbo.config.dynamic.ConfigurationListener;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.cluster.Cluster;
+import org.apache.dubbo.rpc.cluster.Directory;
/**
*
*/
-public class NopDynamicConfiguration extends AbstractDynamicConfiguration {
+public class RegistryAwareCluster implements Cluster {
- @Override
- public void init() {
-
- }
+ public final static String NAME = "registryaware";
@Override
- protected String getInternalProperty(String key, String group, long timeout, ConfigurationListener listener) {
- return null;
+ public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
+ return new RegistryAwareClusterInvoker<T>(directory);
}
- @Override
- protected void addTargetListener(String key, Object o) {
-
- }
-
- @Override
- protected Object createTargetConfigListener(String key, ConfigurationListener listener) {
- return null;
- }
-}
+}
\ No newline at end of file
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/RegistryAwareClusterInvoker.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/RegistryAwareClusterInvoker.java
new file mode 100644
index 0000000..dfd8156
--- /dev/null
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/RegistryAwareClusterInvoker.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.support;
+
+import org.apache.dubbo.common.Constants;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.cluster.Directory;
+import org.apache.dubbo.rpc.cluster.LoadBalance;
+
+import java.util.List;
+
+/**
+ *
+ */
+public class RegistryAwareClusterInvoker<T> extends AbstractClusterInvoker<T> {
+
+ private static final Logger logger = LoggerFactory.getLogger(RegistryAwareClusterInvoker.class);
+
+ public RegistryAwareClusterInvoker(Directory<T> directory) {
+ super(directory);
+ }
+
+ @Override
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
+ // First, pick the invoker (XXXClusterInvoker) that comes from the local registry, distinguish by a 'default' key.
+ for (Invoker<T> invoker : invokers) {
+ if (invoker.getUrl().getParameter(Constants.REGISTRY_KEY + "." + Constants.DEFAULT_KEY, false)) {
+ return invoker.invoke(invocation);
+ }
+ }
+ // If none of the invokers has a local signal, pick the first one available.
+ for (Invoker<T> invoker : invokers) {
+ if (invoker.isAvailable()) {
+ return invoker.invoke(invocation);
+ }
+ }
+ throw new RpcException("No provider available in " + invokers);
+ }
+}
diff --git a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.Cluster b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.Cluster
index ef212d7..ba30247 100644
--- a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.Cluster
+++ b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.Cluster
@@ -6,4 +6,5 @@ failback=org.apache.dubbo.rpc.cluster.support.FailbackCluster
forking=org.apache.dubbo.rpc.cluster.support.ForkingCluster
available=org.apache.dubbo.rpc.cluster.support.AvailableCluster
mergeable=org.apache.dubbo.rpc.cluster.support.MergeableCluster
-broadcast=org.apache.dubbo.rpc.cluster.support.BroadcastCluster
\ No newline at end of file
+broadcast=org.apache.dubbo.rpc.cluster.support.BroadcastCluster
+registryaware=org.apache.dubbo.rpc.cluster.support.RegistryAwareCluster
\ No newline at end of file
diff --git a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.RouterFactory b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.RouterFactory
index 3c36311..4a27434 100644
--- a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.RouterFactory
+++ b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.RouterFactory
@@ -2,4 +2,5 @@ file=org.apache.dubbo.rpc.cluster.router.file.FileRouterFactory
script=org.apache.dubbo.rpc.cluster.router.script.ScriptRouterFactory
condition=org.apache.dubbo.rpc.cluster.router.condition.ConditionRouterFactory
configcondition=org.apache.dubbo.rpc.cluster.router.condition.config.ConfigConditionRouterFactory
-tag=org.apache.dubbo.rpc.cluster.router.tag.TagRouterFactory
\ No newline at end of file
+tag=org.apache.dubbo.rpc.cluster.router.tag.TagRouterFactory
+mock=org.apache.dubbo.rpc.cluster.router.mock.MockRouterFactory
\ No newline at end of file
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java
index 49b0bd1..8b99c9d 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java
@@ -61,6 +61,7 @@ public class Constants {
public static final String CONFIG_ENV_KEY = "config.env";
public static final String CONFIG_CLUSTER_KEY = "config.cluster";
public static final String CONFIG_NAMESPACE_KEY = "config.namespace";
+ public static final String CONFIG_CHECK_KEY = "config.check";
public static final String DEFAULT_CATEGORY = PROVIDERS_CATEGORY;
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/extension/ExtensionLoader.java b/dubbo-common/src/main/java/org/apache/dubbo/common/extension/ExtensionLoader.java
index 10e9fbb..e402a21 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/extension/ExtensionLoader.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/extension/ExtensionLoader.java
@@ -24,6 +24,7 @@ import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.common.utils.ConfigUtils;
import org.apache.dubbo.common.utils.Holder;
+import org.apache.dubbo.common.utils.ReflectUtils;
import org.apache.dubbo.common.utils.StringUtils;
import java.io.BufferedReader;
@@ -528,6 +529,9 @@ public class ExtensionLoader<T> {
&& method.getParameterTypes().length == 1
&& Modifier.isPublic(method.getModifiers())) {
Class<?> pt = method.getParameterTypes()[0];
+ if (ReflectUtils.isPrimitives(pt)) {
+ continue;
+ }
try {
String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
Object object = objectFactory.getExtension(pt, property);
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
index 43e3ca0..b6cea16 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
@@ -36,8 +36,8 @@ import org.apache.dubbo.rpc.ProxyFactory;
import org.apache.dubbo.rpc.StaticContext;
import org.apache.dubbo.rpc.cluster.Cluster;
import org.apache.dubbo.rpc.cluster.directory.StaticDirectory;
-import org.apache.dubbo.rpc.cluster.support.AvailableCluster;
import org.apache.dubbo.rpc.cluster.support.ClusterUtils;
+import org.apache.dubbo.rpc.cluster.support.RegistryAwareCluster;
import org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol;
import org.apache.dubbo.rpc.service.GenericService;
import org.apache.dubbo.rpc.support.ProtocolUtils;
@@ -404,10 +404,11 @@ public class ReferenceConfig<T> extends AbstractReferenceConfig {
}
}
if (registryURL != null) { // registry url is available
- // use AvailableCluster only when register's cluster is available
- URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
+ // use RegistryAwareCluster only when register's cluster is available
+ URL u = registryURL.addParameter(Constants.CLUSTER_KEY, RegistryAwareCluster.NAME);
+ // The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
invoker = cluster.join(new StaticDirectory(u, invokers));
- } else { // not a registry url
+ } else { // not a registry url, must be direct invoke.
invoker = cluster.join(new StaticDirectory(invokers));
}
}
diff --git a/dubbo-config/dubbo-config-dynamic/pom.xml b/dubbo-config/dubbo-config-dynamic/pom.xml
index 3243be6..b08bc84 100644
--- a/dubbo-config/dubbo-config-dynamic/pom.xml
+++ b/dubbo-config/dubbo-config-dynamic/pom.xml
@@ -43,7 +43,7 @@
<dependency>
<groupId>com.ctrip.framework.apollo</groupId>
<artifactId>apollo-client</artifactId>
- <version>1.0.0</version>
+ <version>1.1.1</version>
</dependency>
<dependency>
<groupId>com.netflix.archaius</groupId>
diff --git a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/AbstractDynamicConfiguration.java b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/AbstractDynamicConfiguration.java
index 5b8a189..a75db46 100644
--- a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/AbstractDynamicConfiguration.java
+++ b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/AbstractDynamicConfiguration.java
@@ -57,7 +57,7 @@ public abstract class AbstractDynamicConfiguration<TargetConfigListener> impleme
if (listener != null) {
this.addListener(key, listener);
}
- return getInternalProperty(key, group, timeout, listener);
+ return getInternalProperty(key, group, timeout);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
@@ -71,7 +71,7 @@ public abstract class AbstractDynamicConfiguration<TargetConfigListener> impleme
this.url = url;
}
- protected abstract String getInternalProperty(String key, String group, long timeout, ConfigurationListener listener);
+ protected abstract String getInternalProperty(String key, String group, long timeout);
protected abstract void addTargetListener(String key, TargetConfigListener listener);
diff --git a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/apollo/ApolloDynamicConfiguration.java b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/apollo/ApolloDynamicConfiguration.java
index 489bf47..a70b113 100644
--- a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/apollo/ApolloDynamicConfiguration.java
+++ b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/apollo/ApolloDynamicConfiguration.java
@@ -19,11 +19,14 @@ package org.apache.dubbo.config.dynamic.support.apollo;
import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigChangeListener;
import com.ctrip.framework.apollo.ConfigService;
+import com.ctrip.framework.apollo.enums.ConfigSourceType;
import com.ctrip.framework.apollo.enums.PropertyChangeType;
import com.ctrip.framework.apollo.model.ConfigChange;
import com.ctrip.framework.apollo.model.ConfigChangeEvent;
import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.config.dynamic.AbstractDynamicConfiguration;
import org.apache.dubbo.config.dynamic.ConfigChangeType;
@@ -37,15 +40,13 @@ import java.util.Set;
*
*/
public class ApolloDynamicConfiguration extends AbstractDynamicConfiguration<ConfigChangeListener> {
+ private static final Logger logger = LoggerFactory.getLogger(ApolloDynamicConfiguration.class);
private static final String APOLLO_ENV_KEY = "env";
private static final String APOLLO_ADDR_KEY = "apollo.meta";
private static final String APOLLO_CLUSTER_KEY = "apollo.cluster";
private static final String APPLO_DEFAULT_NAMESPACE = "dubbo";
- /**
- * support two namespaces: application -> dubbo
- */
+
private Config dubboConfig;
- private Config appConfig;
public ApolloDynamicConfiguration() {
@@ -72,33 +73,28 @@ public class ApolloDynamicConfiguration extends AbstractDynamicConfiguration<Con
}
dubboConfig = ConfigService.getConfig(url.getParameter(Constants.CONFIG_NAMESPACE_KEY, APPLO_DEFAULT_NAMESPACE));
- appConfig = ConfigService.getAppConfig();
- }
-
- @Override
- public void addListener(String key, ConfigurationListener listener) {
- Set<String> keys = new HashSet<>(1);
- keys.add(key);
- this.appConfig.addChangeListener(new ApolloListener(listener), keys);
- this.dubboConfig.addChangeListener(new ApolloListener(listener), keys);
+ // Decide to fail or to continue when failed to connect to remote server.
+ boolean check = url.getParameter(Constants.CONFIG_CHECK_KEY, false);
+ if (dubboConfig.getSourceType() != ConfigSourceType.REMOTE) {
+ if (check) {
+ throw new IllegalStateException("Failed to connect to ConfigCenter, the ConfigCenter is Apollo, the address is: " + (StringUtils.isNotEmpty(configAddr) ? configAddr : configEnv));
+ } else {
+ logger.warn("Failed to connect to ConfigCenter, the ConfigCenter is Apollo, " +
+ "the address is: " + (StringUtils.isNotEmpty(configAddr) ? configAddr : configEnv) +
+ ". will use the local cache value instead before finally connected.");
+ }
+ }
}
@Override
- protected String getInternalProperty(String key, String group, long timeout, ConfigurationListener listener) {
- // FIXME According to Apollo, if it fails to get a value from one namespace, it will keep logging warning msg. They are working to improve it.
- String value = appConfig.getProperty(key, null);
- if (value == null) {
- value = dubboConfig.getProperty(key, null);
- }
-
- return value;
+ protected String getInternalProperty(String key, String group, long timeout) {
+ return dubboConfig.getProperty(key, null);
}
@Override
protected void addTargetListener(String key, ConfigChangeListener listener) {
Set<String> keys = new HashSet<>(1);
keys.add(key);
- this.appConfig.addChangeListener(listener, keys);
this.dubboConfig.addChangeListener(listener, keys);
}
@@ -107,8 +103,8 @@ public class ApolloDynamicConfiguration extends AbstractDynamicConfiguration<Con
return new ApolloListener(listener);
}
- public ConfigChangeType getChangeType(PropertyChangeType changeType) {
- if (changeType.equals(PropertyChangeType.DELETED)) {
+ public ConfigChangeType getChangeType(ConfigChange change) {
+ if (change.getChangeType() == PropertyChangeType.DELETED || StringUtils.isEmpty(change.getNewValue())) {
return ConfigChangeType.DELETED;
}
return ConfigChangeType.MODIFIED;
@@ -127,16 +123,15 @@ public class ApolloDynamicConfiguration extends AbstractDynamicConfiguration<Con
this.listener = listener;
}
- // FIXME will Apollo consider an empty value "" as deleted?
@Override
public void onChange(ConfigChangeEvent changeEvent) {
for (String key : changeEvent.changedKeys()) {
ConfigChange change = changeEvent.getChange(key);
// TODO Maybe we no longer need to identify the type of change. Because there's no scenario that a callback will subscribe for both configurators and routers
if (change.getPropertyName().endsWith(Constants.CONFIGURATORS_SUFFIX)) {
- listener.process(new org.apache.dubbo.config.dynamic.ConfigChangeEvent(key, change.getNewValue(), ConfigType.CONFIGURATORS, getChangeType(change.getChangeType())));
+ listener.process(new org.apache.dubbo.config.dynamic.ConfigChangeEvent(key, change.getNewValue(), ConfigType.CONFIGURATORS, getChangeType(change)));
} else {
- listener.process(new org.apache.dubbo.config.dynamic.ConfigChangeEvent(key, change.getNewValue(), ConfigType.ROUTERS, getChangeType(change.getChangeType())));
+ listener.process(new org.apache.dubbo.config.dynamic.ConfigChangeEvent(key, change.getNewValue(), ConfigType.ROUTERS, getChangeType(change)));
}
}
}
diff --git a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/ArchaiusDynamicConfiguration.java b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/ArchaiusDynamicConfiguration.java
index feb177a..6639584 100644
--- a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/ArchaiusDynamicConfiguration.java
+++ b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/ArchaiusDynamicConfiguration.java
@@ -48,6 +48,7 @@ public class ArchaiusDynamicConfiguration extends AbstractDynamicConfiguration<R
System.setProperty(ZooKeeperConfigurationSource.ARCHAIUS_SOURCE_ADDRESS_KEY, address);
}
System.setProperty(ZooKeeperConfigurationSource.ARCHAIUS_CONFIG_ROOT_PATH_KEY, url.getParameter(Constants.CONFIG_NAMESPACE_KEY, ZooKeeperConfigurationSource.DEFAULT_CONFIG_ROOT_PATH));
+ System.setProperty(ZooKeeperConfigurationSource.ARCHAIUS_CONFIG_CHECK_KEY, url.getParameter(Constants.CONFIG_CHECK_KEY, "false"));
try {
ZooKeeperConfigurationSource zkConfigSource = new ZooKeeperConfigurationSource();
@@ -63,7 +64,7 @@ public class ArchaiusDynamicConfiguration extends AbstractDynamicConfiguration<R
}
@Override
- protected String getInternalProperty(String key, String group, long timeout, ConfigurationListener listener) {
+ protected String getInternalProperty(String key, String group, long timeout) {
return DynamicPropertyFactory.getInstance()
.getStringProperty(key, null)
.get();
diff --git a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/sources/ZooKeeperConfigurationSource.java b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/sources/ZooKeeperConfigurationSource.java
index 41d6861..fdfcba7 100644
--- a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/sources/ZooKeeperConfigurationSource.java
+++ b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/sources/ZooKeeperConfigurationSource.java
@@ -48,6 +48,7 @@ import java.util.concurrent.TimeUnit;
public class ZooKeeperConfigurationSource implements WatchedConfigurationSource, Closeable {
public static final String ARCHAIUS_SOURCE_ADDRESS_KEY = "archaius.zk.address";
public static final String ARCHAIUS_CONFIG_ROOT_PATH_KEY = "archaius.zk.rootpath";
+ public static final String ARCHAIUS_CONFIG_CHECK_KEY = "archaius.zk.check";
public static final String DEFAULT_CONFIG_ROOT_PATH = "/dubbo/config";
private static final Logger logger = LoggerFactory.getLogger(com.netflix.config.source.ZooKeeperConfigurationSource.class);
@@ -63,7 +64,7 @@ public class ZooKeeperConfigurationSource implements WatchedConfigurationSource,
private List<WatchedUpdateListener> listeners = new CopyOnWriteArrayList<WatchedUpdateListener>();
public ZooKeeperConfigurationSource() {
- this(System.getProperty(ARCHAIUS_SOURCE_ADDRESS_KEY), 60 * 1000, 60 * 1000, System.getProperty(ARCHAIUS_CONFIG_ROOT_PATH_KEY, DEFAULT_CONFIG_ROOT_PATH));
+ this(System.getProperty(ARCHAIUS_SOURCE_ADDRESS_KEY), 60 * 1000, 10000, System.getProperty(ARCHAIUS_CONFIG_ROOT_PATH_KEY, DEFAULT_CONFIG_ROOT_PATH));
}
public ZooKeeperConfigurationSource(int sessionTimeout, int connectTimeout, String configRootPath) {
@@ -80,12 +81,17 @@ public class ZooKeeperConfigurationSource implements WatchedConfigurationSource,
new ExponentialBackoffRetry(1000, 3));
client.start();
try {
- connected = client.blockUntilConnected(connectTimeout * 4, TimeUnit.MILLISECONDS);
+ connected = client.blockUntilConnected(connectTimeout, TimeUnit.MILLISECONDS);
if (!connected) {
- logger.warn("Cannot connect to ConfigCenter at zookeeper " + connectString + " in " + connectTimeout * 4 + "ms");
+ boolean check = Boolean.parseBoolean(System.getProperty(ARCHAIUS_CONFIG_CHECK_KEY, "false"));
+ if (check) {
+ throw new IllegalStateException("Failed to connect to ConfigCenter Zookeeper : " + connectString + " in " + connectTimeout + "ms.");
+ } else {
+ logger.warn("Cannot connect to ConfigCenter at zookeeper " + connectString + " in " + connectTimeout + "ms");
+ }
}
} catch (InterruptedException e) {
- logger.error("The thread was interrupted unexpectedly when try connecting to zookeeper " + connectString + " as ConfigCenter, ", e);
+ throw new IllegalStateException("The thread was interrupted unexpectedly when try connecting to zookeeper " + connectString + " as ConfigCenter, ", e);
}
this.client = client;
this.configRootPath = configRootPath;
@@ -181,7 +187,7 @@ public class ZooKeeperConfigurationSource implements WatchedConfigurationSource,
Map<String, Object> all = new HashMap<>();
if (!connected) {
- logger.warn("ConfigServer is not connected yet, zookeeper don't support local snapshot yet, so there's no old data to use!");
+ logger.warn("ConfigCenter is not connected yet, zookeeper don't support local snapshot yet, so there's no old data to use!");
return all;
}
diff --git a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/nop/NopDynamicConfiguration.java b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/nop/NopDynamicConfiguration.java
index 75e8bf9..83f475e 100644
--- a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/nop/NopDynamicConfiguration.java
+++ b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/nop/NopDynamicConfiguration.java
@@ -30,7 +30,7 @@ public class NopDynamicConfiguration extends AbstractDynamicConfiguration {
}
@Override
- protected String getInternalProperty(String key, String group, long timeout, ConfigurationListener listener) {
+ protected String getInternalProperty(String key, String group, long timeout) {
return null;
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
index c3403df..0270e96 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
@@ -53,7 +53,6 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -116,13 +115,22 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
this.serviceType = serviceType;
this.serviceKey = url.getServiceKey();
this.queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
- this.overrideDirectoryUrl = this.directoryUrl = url.setPath(url.getServiceInterface()).clearParameters().addParameters(queryMap).removeParameter(Constants.MONITOR_KEY);
+ this.overrideDirectoryUrl = this.directoryUrl = turnRegistryUrlToConsumerUrl(url);
String group = directoryUrl.getParameter(Constants.GROUP_KEY, "");
this.multiGroup = group != null && ("*".equals(group) || group.contains(","));
String methods = queryMap.get(Constants.METHODS_KEY);
this.serviceMethods = methods == null ? null : Constants.COMMA_SPLIT_PATTERN.split(methods);
}
+ private URL turnRegistryUrlToConsumerUrl(URL url) {
+ // save any parameter in registry that will be useful to the new url.
+ String isDefault = url.getParameter(Constants.DEFAULT_KEY);
+ if (StringUtils.isNotEmpty(isDefault)) {
+ queryMap.put(Constants.REGISTRY_KEY + "." + Constants.DEFAULT_KEY, isDefault);
+ }
+ return url.setPath(url.getServiceInterface()).clearParameters().addParameters(queryMap).removeParameter(Constants.MONITOR_KEY);
+ }
+
/**
* Convert override urls to map for use when re-refer.
* Send all rules every time, the urls will be reassembled and calculated
@@ -176,14 +184,24 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
public void subscribe(URL url) {
setConsumerUrl(url);
- String rawConfig = dynamicConfiguration.getConfig(url.getServiceKey() + Constants.CONFIGURATORS_SUFFIX, "dubbo", this);
- String rawConfigApp = dynamicConfiguration.getConfig(url.getParameter(Constants.APPLICATION_KEY) + Constants.CONFIGURATORS_SUFFIX, "dubbo", this);
-
- if (StringUtils.isNotEmpty(rawConfig)) {
- this.dynamicConfigurators = configToConfiguratiors(rawConfig);
+ String rawConfig = null;
+ try {
+ rawConfig = dynamicConfiguration.getConfig(url.getServiceKey() + Constants.CONFIGURATORS_SUFFIX, "dubbo", this);
+ if (StringUtils.isNotEmpty(rawConfig)) {
+ this.dynamicConfigurators = configToConfiguratiors(rawConfig);
+ }
+ } catch (Exception e) {
+ logger.error("Failed to load or parse dynamic config (service level), the raw config is: " + rawConfig, e);
}
- if (StringUtils.isNotEmpty(rawConfigApp)) {
- this.appDynamicConfigurators = configToConfiguratiors(rawConfigApp);
+
+ String rawConfigApp = null;
+ try {
+ rawConfigApp = dynamicConfiguration.getConfig(url.getParameter(Constants.APPLICATION_KEY) + Constants.CONFIGURATORS_SUFFIX, "dubbo", this);
+ if (StringUtils.isNotEmpty(rawConfigApp)) {
+ this.appDynamicConfigurators = configToConfiguratiors(rawConfigApp);
+ }
+ } catch (Exception e) {
+ logger.error("Failed to load or parse dynamic config (application level), the raw config is: " + rawConfigApp, e);
}
registry.subscribe(url, this);
@@ -306,10 +324,12 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
return;
}
+ // pre-route and build cache, notice that route cache should build on original Invoker list.
+ // toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers should be routed.
+ routerChain.notifyFullInvokers(newMethodInvokerMap, getConsumerUrl());
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
- // Route and build cache
- routerChain.notifyFullInvokers(methodInvokerMap, getConsumerUrl());
+
try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
@@ -338,7 +358,9 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
} else if (groupMap.size() > 1) {
List<Invoker<T>> groupInvokers = new ArrayList<Invoker<T>>();
for (List<Invoker<T>> groupList : groupMap.values()) {
- groupInvokers.add(cluster.join(new StaticDirectory<T>(groupList)));
+ StaticDirectory<T> staticDirectory = new StaticDirectory<>(groupList);
+ staticDirectory.setRouterChain(routerChain);
+ groupInvokers.add(cluster.join(staticDirectory));
}
result.put(method, groupInvokers);
} else {
@@ -624,18 +646,27 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
"No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " + NetUtils.getLocalHost()
+ " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist).");
}
+
+ if (multiGroup) {
+ Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
+ String methodName = RpcUtils.getMethodName(invocation);
+ return localMethodInvokerMap.get(methodName);
+ }
+
List<Invoker<T>> invokers = null;
- Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
+ try {
+ // Get invokers from cache, only runtime routers will be executed.
+ invokers = routerChain.route(getConsumerUrl(), invocation);
+ } catch (Throwable t) {
+ logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
+ }
+
+
+ // FIXME Is there any need of failing back to Constants.ANY_VALUE or the first available method invokers when invokers is null?
+ /*Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
String methodName = RpcUtils.getMethodName(invocation);
- Object[] args = RpcUtils.getArguments(invocation);
- if (args != null && args.length > 0 && args[0] != null
- && (args[0] instanceof String || args[0].getClass().isEnum())) {
- invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter
- }
- if (invokers == null) {
- invokers = localMethodInvokerMap.get(methodName);
- }
+ invokers = localMethodInvokerMap.get(methodName);
if (invokers == null) {
invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
}
@@ -645,8 +676,8 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
invokers = iterator.next();
}
}
- }
- return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
+ }*/
+ return invokers == null ? new ArrayList<>(0) : invokers;
}
@Override
@@ -656,7 +687,7 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
@Override
public void process(ConfigChangeEvent event) {
- List<URL> urls;
+ List<URL> urls = new ArrayList<>();
if (event.getChangeType().equals(ConfigChangeType.DELETED)) {
URL url = getConsumerUrl().clearParameters().setProtocol(Constants.EMPTY_PROTOCOL);
if (event.getKey().endsWith(this.queryMap.get(APPLICATION_KEY) + CONFIGURATORS_SUFFIX)) {
@@ -664,10 +695,13 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
} else {
url = url.addParameter(Constants.CATEGORY_KEY, Constants.DYNAMIC_CONFIGURATORS_CATEGORY);
}
- urls = new ArrayList<>();
urls.add(url);
} else {
- urls = ConfigParser.parseConfigurators(event.getNewValue());
+ try {
+ urls = ConfigParser.parseConfigurators(event.getNewValue());
+ } catch (Exception e) {
+ logger.error("Failed to parse raw dynamic config and it will not take effect, the raw config is: " + event.getNewValue(), e);
+ }
}
notify(urls);
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcException.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcException.java
index 2596912..deb37ca 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcException.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcException.java
@@ -32,6 +32,7 @@ public /**final**/ class RpcException extends RuntimeException {
public static final int BIZ_EXCEPTION = 3;
public static final int FORBIDDEN_EXCEPTION = 4;
public static final int SERIALIZATION_EXCEPTION = 5;
+ public static final int NO_INVOKER_AVAILABLE_AFTER_FILTER = 6;
private static final long serialVersionUID = 7815426752583648734L;
private int code; // RpcException cannot be extended, use error code for exception type to keep compatibility
@@ -98,4 +99,8 @@ public /**final**/ class RpcException extends RuntimeException {
public boolean isSerialization() {
return code == SERIALIZATION_EXCEPTION;
}
+
+ public boolean isNoInvokerAvailableAfterFilter() {
+ return code == NO_INVOKER_AVAILABLE_AFTER_FILTER;
+ }
}
\ No newline at end of file