You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2022/07/14 09:43:32 UTC
[dubbo] branch 3.1 updated: Fix wildcard match (#10256)
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.1 by this push:
new 8dec7d411d Fix wildcard match (#10256)
8dec7d411d is described below
commit 8dec7d411d0f045537516f3db2f916fd11486a72
Author: Albumen Kevin <jh...@gmail.com>
AuthorDate: Thu Jul 14 17:43:26 2022 +0800
Fix wildcard match (#10256)
---
.../apache/dubbo/common/ProtocolServiceKey.java | 124 ++++++++++++
.../java/org/apache/dubbo/common/ServiceKey.java | 146 +++++++++++++
.../common/ProtocolServiceKeyMatcherTest.java | 107 ++++++++++
.../dubbo/common/ProtocolServiceKeyTest.java | 76 +++++++
.../apache/dubbo/common/ServiceKeyMatcherTest.java | 224 ++++++++++++++++++++
.../org/apache/dubbo/common/ServiceKeyTest.java | 54 +++++
.../org/apache/dubbo/metadata/MetadataInfo.java | 29 ++-
.../registry/client/DefaultServiceInstance.java | 6 +
.../dubbo/registry/client/InstanceAddressURL.java | 21 ++
.../registry/client/ServiceDiscoveryRegistry.java | 8 +-
.../client/ServiceDiscoveryRegistryDirectory.java | 208 ++++++++++++++++---
.../listener/ServiceInstancesChangedListener.java | 213 ++++++++-----------
.../client/ServiceDiscoveryRegistryTest.java | 10 +-
.../MockServiceInstancesChangedListener.java | 5 +-
.../ServiceInstancesChangedListenerTest.java | 225 +++++++++++++++------
15 files changed, 1212 insertions(+), 244 deletions(-)
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/ProtocolServiceKey.java b/dubbo-common/src/main/java/org/apache/dubbo/common/ProtocolServiceKey.java
new file mode 100644
index 0000000000..0cc8b203d4
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/ProtocolServiceKey.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common;
+
+import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.utils.StringUtils;
+
+import java.util.Objects;
+
+ /**
+  * A {@link ServiceKey} (interface name, version, group) that additionally carries a protocol.
+  */
+ public class ProtocolServiceKey extends ServiceKey {
+     private final String protocol;
+
+     /**
+      * @param interfaceName interface name, handed to {@link ServiceKey}
+      * @param version       version, handed to {@link ServiceKey}
+      * @param group         group, handed to {@link ServiceKey}
+      * @param protocol      protocol of this key; stored as given
+      */
+     public ProtocolServiceKey(String interfaceName, String version, String group, String protocol) {
+         super(interfaceName, version, group);
+         this.protocol = protocol;
+     }
+
+     public String getProtocol() {
+         return protocol;
+     }
+
+     /**
+      * @return the string form of the parent {@link ServiceKey}, i.e. without the protocol suffix
+      */
+     public String getServiceKeyString() {
+         return super.toString();
+     }
+
+     /**
+      * Lenient comparison against another key.
+      *
+      * @param protocolServiceKey the key to compare with
+      * @return {@code false} when interface / version / group differ or when this key's protocol is
+      * the wildcard; otherwise {@code true} when either protocol is empty or both protocols are equal
+      */
+     public boolean isSameWith(ProtocolServiceKey protocolServiceKey) {
+         // interface, version and group must be equal, and a wildcard protocol on this key never counts as "same"
+         if (!super.equals(protocolServiceKey) || CommonConstants.ANY_VALUE.equals(protocol)) {
+             return false;
+         }
+
+         // an empty protocol on either side is compatible with anything; otherwise the protocols must be equal
+         return StringUtils.isEmpty(protocol)
+             || StringUtils.isEmpty(protocolServiceKey.getProtocol())
+             || Objects.equals(protocol, protocolServiceKey.getProtocol());
+     }
+
+     @Override
+     public boolean equals(Object o) {
+         if (this == o) {
+             return true;
+         }
+         if (o == null || getClass() != o.getClass() || !super.equals(o)) {
+             return false;
+         }
+         return Objects.equals(protocol, ((ProtocolServiceKey) o).protocol);
+     }
+
+     @Override
+     public int hashCode() {
+         return Objects.hash(super.hashCode(), protocol);
+     }
+
+     @Override
+     public String toString() {
+         return super.toString() + CommonConstants.GROUP_CHAR_SEPARATOR + protocol;
+     }
+
+     /**
+      * Rule-based matching of {@link ProtocolServiceKey}s.
+      */
+     public static class Matcher {
+         /**
+          * Checks whether {@code target} satisfies {@code rule}.
+          *
+          * @param rule   the pattern; its protocol may be the wildcard, empty, a single value or a
+          *               comma-separated list
+          * @param target the concrete key being tested
+          * @return {@code true} when interface / version / group match and the protocol is accepted
+          */
+         public static boolean isMatch(ProtocolServiceKey rule, ProtocolServiceKey target) {
+             // 1. 2. 3. interface / version / group are decided by the parent matcher
+             if (!ServiceKey.Matcher.isMatch(rule, target)) {
+                 return false;
+             }
+
+             // 4. protocol
+             String ruleProtocol = rule.getProtocol();
+
+             // 4.1. rule protocol is *  -> accept any target protocol
+             // 4.2. rule protocol is null or empty -> accept any target protocol
+             if (CommonConstants.ANY_VALUE.equals(ruleProtocol) || StringUtils.isEmpty(ruleProtocol)) {
+                 return true;
+             }
+
+             // 4.3. rule protocol has no ',' -> compare directly
+             if (!ruleProtocol.contains(CommonConstants.COMMA_SEPARATOR)) {
+                 return Objects.equals(ruleProtocol, target.getProtocol());
+             }
+
+             // 4.4. rule protocol is a list -> any entry may match; limit -1 keeps trailing empty entries
+             for (String entry : ruleProtocol.split("\\" + CommonConstants.COMMA_SEPARATOR, -1)) {
+                 String candidate = entry.trim();
+                 // an empty entry stands for "no protocol" and matches a null or empty target protocol
+                 if (StringUtils.isEmpty(candidate) && StringUtils.isEmpty(target.getProtocol())) {
+                     return true;
+                 }
+                 if (candidate.equals(target.getProtocol())) {
+                     return true;
+                 }
+             }
+             return false;
+         }
+     }
+ }
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/ServiceKey.java b/dubbo-common/src/main/java/org/apache/dubbo/common/ServiceKey.java
new file mode 100644
index 0000000000..c8d8f846cc
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/ServiceKey.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common;
+
+import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.utils.StringUtils;
+
+import java.util.Objects;
+
+ /**
+  * Value object identifying a service by interface name, group and version.
+  */
+ public class ServiceKey {
+     private final String interfaceName;
+     private final String group;
+     private final String version;
+
+     /**
+      * Note the argument order: version comes before group.
+      */
+     public ServiceKey(String interfaceName, String version, String group) {
+         this.interfaceName = interfaceName;
+         this.group = group;
+         this.version = version;
+     }
+
+     public String getInterfaceName() {
+         return interfaceName;
+     }
+
+     public String getGroup() {
+         return group;
+     }
+
+     public String getVersion() {
+         return version;
+     }
+
+     @Override
+     public boolean equals(Object o) {
+         if (this == o) {
+             return true;
+         }
+         if (o == null || getClass() != o.getClass()) {
+             return false;
+         }
+         ServiceKey other = (ServiceKey) o;
+         return Objects.equals(interfaceName, other.interfaceName)
+             && Objects.equals(group, other.group)
+             && Objects.equals(version, other.version);
+     }
+
+     @Override
+     public int hashCode() {
+         return Objects.hash(interfaceName, group, version);
+     }
+
+     @Override
+     public String toString() {
+         return BaseServiceMetadata.buildServiceKey(interfaceName, group, version);
+     }
+
+
+     /**
+      * Rule-based matching of {@link ServiceKey}s.
+      */
+     public static class Matcher {
+         /**
+          * Checks whether {@code target} satisfies {@code rule}.
+          *
+          * @param rule   the pattern; version and group may each be the wildcard, empty, a single
+          *               value or a comma-separated list
+          * @param target the concrete key being tested
+          * @return {@code true} when interface, version and group all match
+          */
+         public static boolean isMatch(ServiceKey rule, ServiceKey target) {
+             // 1. interface: exact comparison only, no wildcard handling
+             // 2. version, 3. group: both follow the same pattern rules
+             return Objects.equals(rule.getInterfaceName(), target.getInterfaceName())
+                 && matchesPattern(rule.getVersion(), target.getVersion())
+                 && matchesPattern(rule.getGroup(), target.getGroup());
+         }
+
+         /**
+          * Applies the pattern rules shared by version and group to one pair of values.
+          */
+         private static boolean matchesPattern(String ruleValue, String targetValue) {
+             // rule is * -> everything matches
+             if (CommonConstants.ANY_VALUE.equals(ruleValue)) {
+                 return true;
+             }
+             // rule is null or empty -> target must be null or empty as well
+             if (StringUtils.isEmpty(ruleValue)) {
+                 return StringUtils.isEmpty(targetValue);
+             }
+             // rule has no ',' -> compare directly
+             if (!ruleValue.contains(CommonConstants.COMMA_SEPARATOR)) {
+                 return Objects.equals(ruleValue, targetValue);
+             }
+             // rule is a list -> any entry may match; limit -1 keeps trailing empty entries
+             for (String entry : ruleValue.split("\\" + CommonConstants.COMMA_SEPARATOR, -1)) {
+                 String candidate = entry.trim();
+                 // an empty entry matches a null or empty target value
+                 if (StringUtils.isEmpty(candidate) && StringUtils.isEmpty(targetValue)) {
+                     return true;
+                 }
+                 if (candidate.equals(targetValue)) {
+                     return true;
+                 }
+             }
+             return false;
+         }
+     }
+ }
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/ProtocolServiceKeyMatcherTest.java b/dubbo-common/src/test/java/org/apache/dubbo/common/ProtocolServiceKeyMatcherTest.java
new file mode 100644
index 0000000000..635045c408
--- /dev/null
+++ b/dubbo-common/src/test/java/org/apache/dubbo/common/ProtocolServiceKeyMatcherTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+ /**
+ * Exercises the protocol part of ProtocolServiceKey.Matcher.isMatch(rule, target).
+ * Interface, version and group are null in every case except the interface-mismatch check.
+ */
+ public class ProtocolServiceKeyMatcherTest {
+
+ @Test
+ public void testProtocol() {
+ // identical single protocol matches
+ Assertions.assertTrue(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, "dubbo"),
+ new ProtocolServiceKey(null, null, null, "dubbo")
+ ));
+
+ // a rule with a concrete protocol does not match a target without protocol
+ Assertions.assertFalse(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, "dubbo"),
+ new ProtocolServiceKey(null, null, null, null)
+ ));
+
+ // a differing interface name fails the match even though the protocols are equal
+ Assertions.assertFalse(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, "dubbo"),
+ new ProtocolServiceKey("DemoService", null, null, "dubbo")
+ ));
+
+ // a null, empty or "*" rule protocol matches a target with a concrete protocol
+ Assertions.assertTrue(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, null),
+ new ProtocolServiceKey(null, null, null, "dubbo")
+ ));
+ Assertions.assertTrue(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, ""),
+ new ProtocolServiceKey(null, null, null, "dubbo")
+ ));
+ Assertions.assertTrue(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, "*"),
+ new ProtocolServiceKey(null, null, null, "dubbo")
+ ));
+
+ // a comma-separated rule matches only the listed protocols
+ Assertions.assertFalse(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, "dubbo1,dubbo2"),
+ new ProtocolServiceKey(null, null, null, "dubbo")
+ ));
+ Assertions.assertTrue(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, "dubbo1,dubbo2"),
+ new ProtocolServiceKey(null, null, null, "dubbo1")
+ ));
+ Assertions.assertTrue(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, "dubbo1,dubbo2"),
+ new ProtocolServiceKey(null, null, null, "dubbo2")
+ ));
+
+ // an empty entry in the middle of the list matches a null or empty target protocol
+ Assertions.assertTrue(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, "dubbo1,,dubbo2"),
+ new ProtocolServiceKey(null, null, null, null)
+ ));
+ Assertions.assertTrue(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, "dubbo1,,dubbo2"),
+ new ProtocolServiceKey(null, null, null, "")
+ ));
+
+ // same for a leading empty entry
+ Assertions.assertTrue(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, ",dubbo1,dubbo2"),
+ new ProtocolServiceKey(null, null, null, null)
+ ));
+ Assertions.assertTrue(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, ",dubbo1,dubbo2"),
+ new ProtocolServiceKey(null, null, null, "")
+ ));
+
+ // same for a trailing empty entry
+ Assertions.assertTrue(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, "dubbo1,dubbo2,"),
+ new ProtocolServiceKey(null, null, null, null)
+ ));
+ Assertions.assertTrue(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, "dubbo1,dubbo2,"),
+ new ProtocolServiceKey(null, null, null, "")
+ ));
+
+ // an empty entry does not make an unlisted concrete protocol match
+ Assertions.assertFalse(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, "dubbo1,,dubbo2"),
+ new ProtocolServiceKey(null, null, null, "dubbo")
+ ));
+ Assertions.assertFalse(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, ",dubbo1,dubbo2"),
+ new ProtocolServiceKey(null, null, null, "dubbo")
+ ));
+ Assertions.assertFalse(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, "dubbo1,dubbo2,"),
+ new ProtocolServiceKey(null, null, null, "dubbo")
+ ));
+ }
+ }
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/ProtocolServiceKeyTest.java b/dubbo-common/src/test/java/org/apache/dubbo/common/ProtocolServiceKeyTest.java
new file mode 100644
index 0000000000..09b2711aa2
--- /dev/null
+++ b/dubbo-common/src/test/java/org/apache/dubbo/common/ProtocolServiceKeyTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+ /**
+ * Covers the accessors, string forms, equals/hashCode and isSameWith of ProtocolServiceKey.
+ */
+ public class ProtocolServiceKeyTest {
+ @Test
+ public void test() {
+ // accessors return the constructor arguments (constructor order: interface, version, group, protocol)
+ ProtocolServiceKey protocolServiceKey = new ProtocolServiceKey("DemoService", "1.0.0", "group1", "protocol1");
+ Assertions.assertEquals("DemoService", protocolServiceKey.getInterfaceName());
+ Assertions.assertEquals("1.0.0", protocolServiceKey.getVersion());
+ Assertions.assertEquals("group1", protocolServiceKey.getGroup());
+ Assertions.assertEquals("protocol1", protocolServiceKey.getProtocol());
+
+ // toString appends the protocol; getServiceKeyString omits it
+ Assertions.assertEquals("group1/DemoService:1.0.0:protocol1", protocolServiceKey.toString());
+ Assertions.assertEquals("group1/DemoService:1.0.0", protocolServiceKey.getServiceKeyString());
+
+ // equals is reflexive
+ Assertions.assertEquals(protocolServiceKey, protocolServiceKey);
+
+ // keys built from equal values are equal and share a hash code
+ ProtocolServiceKey protocolServiceKey1 = new ProtocolServiceKey("DemoService", "1.0.0", "group1", "protocol1");
+ Assertions.assertEquals(protocolServiceKey, protocolServiceKey1);
+ Assertions.assertEquals(protocolServiceKey.hashCode(), protocolServiceKey1.hashCode());
+
+ // changing any single component breaks equality: protocol, group, version, interface
+ ProtocolServiceKey protocolServiceKey2 = new ProtocolServiceKey("DemoService", "1.0.0", "group1", "protocol2");
+ Assertions.assertNotEquals(protocolServiceKey, protocolServiceKey2);
+
+ ProtocolServiceKey protocolServiceKey3 = new ProtocolServiceKey("DemoService", "1.0.0", "group2", "protocol1");
+ Assertions.assertNotEquals(protocolServiceKey, protocolServiceKey3);
+
+ ProtocolServiceKey protocolServiceKey4 = new ProtocolServiceKey("DemoService", "1.0.1", "group1", "protocol1");
+ Assertions.assertNotEquals(protocolServiceKey, protocolServiceKey4);
+
+ ProtocolServiceKey protocolServiceKey5 = new ProtocolServiceKey("DemoInterface", "1.0.0", "group1", "protocol1");
+ Assertions.assertNotEquals(protocolServiceKey, protocolServiceKey5);
+
+ // a plain ServiceKey with the same interface / version / group is not equal
+ ServiceKey serviceKey = new ServiceKey("DemoService", "1.0.0", "group1");
+ Assertions.assertNotEquals(protocolServiceKey, serviceKey);
+
+ // isSameWith: same key, or other key with an empty / null protocol, counts as same
+ Assertions.assertTrue(protocolServiceKey.isSameWith(protocolServiceKey));
+ Assertions.assertTrue(protocolServiceKey.isSameWith(new ProtocolServiceKey("DemoService", "1.0.0", "group1", "")));
+ Assertions.assertTrue(protocolServiceKey.isSameWith(new ProtocolServiceKey("DemoService", "1.0.0", "group1", null)));
+
+ // isSameWith: a different group is never the same, whatever the protocol
+ Assertions.assertFalse(protocolServiceKey.isSameWith(new ProtocolServiceKey("DemoService", "1.0.0", "group2", "protocol1")));
+ Assertions.assertFalse(protocolServiceKey.isSameWith(new ProtocolServiceKey("DemoService", "1.0.0", "group2", "")));
+ Assertions.assertFalse(protocolServiceKey.isSameWith(new ProtocolServiceKey("DemoService", "1.0.0", "group2", null)));
+
+
+ // isSameWith: a key with a null protocol is the same as keys with any protocol
+ ProtocolServiceKey protocolServiceKey6 = new ProtocolServiceKey("DemoService", "1.0.0", "group1", null);
+ Assertions.assertTrue(protocolServiceKey6.isSameWith(protocolServiceKey6));
+ Assertions.assertTrue(protocolServiceKey6.isSameWith(new ProtocolServiceKey("DemoService", "1.0.0", "group1", "")));
+ Assertions.assertTrue(protocolServiceKey6.isSameWith(new ProtocolServiceKey("DemoService", "1.0.0", "group1", "protocol1")));
+ Assertions.assertTrue(protocolServiceKey6.isSameWith(new ProtocolServiceKey("DemoService", "1.0.0", "group1", "protocol2")));
+
+ // isSameWith: a key whose own protocol is "*" is never the same as another key
+ ProtocolServiceKey protocolServiceKey7 = new ProtocolServiceKey("DemoService", "1.0.0", "group1", "*");
+ Assertions.assertFalse(protocolServiceKey7.isSameWith(new ProtocolServiceKey("DemoService", "1.0.0", "group1", null)));
+ Assertions.assertFalse(protocolServiceKey7.isSameWith(new ProtocolServiceKey("DemoService", "1.0.0", "group1", "")));
+ Assertions.assertFalse(protocolServiceKey7.isSameWith(new ProtocolServiceKey("DemoService", "1.0.0", "group1", "protocol1")));
+ Assertions.assertFalse(protocolServiceKey7.isSameWith(new ProtocolServiceKey("DemoService", "1.0.0", "group1", "protocol2")));
+ }
+ }
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/ServiceKeyMatcherTest.java b/dubbo-common/src/test/java/org/apache/dubbo/common/ServiceKeyMatcherTest.java
new file mode 100644
index 0000000000..6d85c46b31
--- /dev/null
+++ b/dubbo-common/src/test/java/org/apache/dubbo/common/ServiceKeyMatcherTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+ /**
+  * Exercises ServiceKey.Matcher.isMatch(rule, target) one component at a time:
+  * interface (exact comparison), version and group (wildcard, empty and comma-separated rules).
+  */
+ public class ServiceKeyMatcherTest {
+
+     @Test
+     public void testInterface() {
+         // interface names are compared exactly; null only matches null
+         Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, null, null),
+             new ServiceKey(null, null, null)
+         ));
+         Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+             new ServiceKey("DemoService", null, null),
+             new ServiceKey(null, null, null)
+         ));
+         Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, null, null),
+             new ServiceKey("DemoService", null, null)
+         ));
+
+
+         // "*" is not a wildcard for the interface name
+         Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+             new ServiceKey("*", null, null),
+             new ServiceKey("DemoService", null, null)
+         ));
+         Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+             new ServiceKey("*", null, null),
+             new ServiceKey(null, null, null)
+         ));
+     }
+
+     @Test
+     public void testVersion() {
+         // single value or null: rule and target must agree
+         Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, "1.0.0", null),
+             new ServiceKey(null, "1.0.0", null)
+         ));
+         Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, null, null),
+             new ServiceKey(null, null, null)
+         ));
+         Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, "1.0.0", null),
+             new ServiceKey(null, null, null)
+         ));
+         Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, null, null),
+             new ServiceKey(null, "1.0.0", null)
+         ));
+
+         // comma-separated rule matches only the listed versions
+         Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, "1.0.0,1.0.1", null),
+             new ServiceKey(null, "1.0.0", null)
+         ));
+         Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, "1.0.0,1.0.1", null),
+             new ServiceKey(null, "1.0.2", null)
+         ));
+
+
+         Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, "1.0.0,1.0.1", null),
+             new ServiceKey(null, null, null)
+         ));
+
+         // an empty entry (leading, middle or trailing) matches a null or empty target version
+         Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, ",1.0.0,1.0.1", null),
+             new ServiceKey(null, null, null)
+         ));
+         Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, ",1.0.0,1.0.1", null),
+             new ServiceKey(null, "", null)
+         ));
+
+         Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, "1.0.0,,1.0.1", null),
+             new ServiceKey(null, null, null)
+         ));
+         Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, "1.0.0,,1.0.1", null),
+             new ServiceKey(null, "", null)
+         ));
+
+         Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, "1.0.0,1.0.1,", null),
+             new ServiceKey(null, null, null)
+         ));
+         Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, "1.0.0,1.0.1,", null),
+             new ServiceKey(null, "", null)
+         ));
+
+         // without an empty entry, a null or empty target version does not match
+         Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, "1.0.0,1.0.1", null),
+             new ServiceKey(null, null, null)
+         ));
+         Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, "1.0.0,1.0.1", null),
+             new ServiceKey(null, "", null)
+         ));
+
+         // an empty entry (leading, middle or trailing) does not make an unlisted version match;
+         // the original repeated the leading-entry case twice and never covered the other two
+         Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, ",1.0.0,1.0.1", null),
+             new ServiceKey(null, "1.0.2", null)
+         ));
+         Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, "1.0.0,,1.0.1", null),
+             new ServiceKey(null, "1.0.2", null)
+         ));
+         Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, "1.0.0,1.0.1,", null),
+             new ServiceKey(null, "1.0.2", null)
+         ));
+
+
+         // "*" matches null, empty and concrete target versions
+         Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, "*", null),
+             new ServiceKey(null, null, null)
+         ));
+         Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, "*", null),
+             new ServiceKey(null, "", null)
+         ));
+         Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, "*", null),
+             new ServiceKey(null, "1.0.0", null)
+         ));
+     }
+
+     @Test
+     public void testGroup() {
+         // single value or null: rule and target must agree
+         Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, null, "group1"),
+             new ServiceKey(null, null, "group1")
+         ));
+         Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, null, "group1"),
+             new ServiceKey(null, null, null)
+         ));
+         Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, null, null),
+             new ServiceKey(null, null, "group1")
+         ));
+
+         // comma-separated rule; entries are trimmed, so "group1, group2" lists group1 and group2
+         Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, null, "group1, group2"),
+             new ServiceKey(null, null, "group1")
+         ));
+
+         Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, null, "group2, group3"),
+             new ServiceKey(null, null, "group1")
+         ));
+
+         Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, null, "group2, group3"),
+             new ServiceKey(null, null, null)
+         ));
+
+         Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, null, "group2, group3"),
+             new ServiceKey(null, null, "")
+         ));
+
+         // an empty (or blank) entry matches an empty target group
+         Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, null, ",group2"),
+             new ServiceKey(null, null, "")
+         ));
+
+         Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, null, "group2,"),
+             new ServiceKey(null, null, "")
+         ));
+
+         Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, null, "group2, ,group3"),
+             new ServiceKey(null, null, "")
+         ));
+
+         // an empty entry does not make an unlisted group match
+         Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, null, ",group2"),
+             new ServiceKey(null, null, "group1")
+         ));
+
+         Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, null, "group2,"),
+             new ServiceKey(null, null, "group1")
+         ));
+
+         Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, null, "group2, ,group3"),
+             new ServiceKey(null, null, "group1")
+         ));
+
+         // "*" matches empty, null and concrete target groups
+         Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, null, "*"),
+             new ServiceKey(null, null, "")
+         ));
+
+         Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, null, "*"),
+             new ServiceKey(null, null, null)
+         ));
+
+         Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+             new ServiceKey(null, null, "*"),
+             new ServiceKey(null, null, "group1")
+         ));
+     }
+ }
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/ServiceKeyTest.java b/dubbo-common/src/test/java/org/apache/dubbo/common/ServiceKeyTest.java
new file mode 100644
index 0000000000..5643166ffa
--- /dev/null
+++ b/dubbo-common/src/test/java/org/apache/dubbo/common/ServiceKeyTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+ /**
+ * Covers the accessors, toString and equals/hashCode of ServiceKey.
+ */
+ public class ServiceKeyTest {
+ @Test
+ public void test() {
+ // constructor order is interface, version, group
+ ServiceKey serviceKey = new ServiceKey("DemoService", "1.0.0", "group1");
+
+ Assertions.assertEquals("DemoService", serviceKey.getInterfaceName());
+ Assertions.assertEquals("1.0.0", serviceKey.getVersion());
+ Assertions.assertEquals("group1", serviceKey.getGroup());
+
+ // toString renders "group/interface:version" and leaves out null group / version parts
+ Assertions.assertEquals("group1/DemoService:1.0.0", serviceKey.toString());
+ Assertions.assertEquals("DemoService", new ServiceKey("DemoService", null, null).toString());
+ Assertions.assertEquals("DemoService:1.0.0", new ServiceKey("DemoService", "1.0.0", null).toString());
+ Assertions.assertEquals("group1/DemoService", new ServiceKey("DemoService", null, "group1").toString());
+
+ // equals is reflexive
+ Assertions.assertEquals(serviceKey, serviceKey);
+
+ // keys built from equal values are equal and share a hash code
+ ServiceKey serviceKey1 = new ServiceKey("DemoService", "1.0.0", "group1");
+ Assertions.assertEquals(serviceKey, serviceKey1);
+ Assertions.assertEquals(serviceKey.hashCode(), serviceKey1.hashCode());
+
+ // changing any single component breaks equality: group, version, interface
+ ServiceKey serviceKey2 = new ServiceKey("DemoService", "1.0.0", "group2");
+ Assertions.assertNotEquals(serviceKey, serviceKey2);
+
+ ServiceKey serviceKey3 = new ServiceKey("DemoService", "1.0.1", "group1");
+ Assertions.assertNotEquals(serviceKey, serviceKey3);
+
+ ServiceKey serviceKey4 = new ServiceKey("DemoInterface", "1.0.0", "group1");
+ Assertions.assertNotEquals(serviceKey, serviceKey4);
+
+ // a ProtocolServiceKey with the same interface / version / group is not equal to a plain ServiceKey
+ ProtocolServiceKey protocolServiceKey = new ProtocolServiceKey("DemoService", "1.0.0", "group1", "protocol1");
+ Assertions.assertNotEquals(serviceKey, protocolServiceKey);
+ }
+ }
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
index 2735b931ee..0465fbb228 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
@@ -16,6 +16,7 @@
*/
package org.apache.dubbo.metadata;
+import org.apache.dubbo.common.ProtocolServiceKey;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
@@ -437,6 +438,7 @@ public class MetadataInfo implements Serializable {
private String group;
private String version;
private String protocol;
+ private int port = -1;
private String path; // most of the time, path is the same with the interface name.
private Map<String, String> params;
@@ -453,12 +455,14 @@ public class MetadataInfo implements Serializable {
// service + group + version + protocol
private volatile transient String matchKey;
+ private volatile transient ProtocolServiceKey protocolServiceKey;
+
private transient URL url;
public ServiceInfo() {}
public ServiceInfo(URL url, List<MetadataParamsFilter> filters) {
- this(url.getServiceInterface(), url.getGroup(), url.getVersion(), url.getProtocol(), url.getPath(), null);
+ this(url.getServiceInterface(), url.getGroup(), url.getVersion(), url.getProtocol(), url.getPort(), url.getPath(), null);
this.url = url;
Map<String, String> params = extractServiceParams(url, filters);
// initialize method params caches.
@@ -466,11 +470,12 @@ public class MetadataInfo implements Serializable {
this.consumerMethodParams = URLParam.initMethodParameters(consumerParams);
}
- public ServiceInfo(String name, String group, String version, String protocol, String path, Map<String, String> params) {
+ public ServiceInfo(String name, String group, String version, String protocol, int port, String path, Map<String, String> params) {
this.name = name;
this.group = group;
this.version = version;
this.protocol = protocol;
+ this.port = port;
this.path = path;
this.params = params == null ? new ConcurrentHashMap<>() : params;
@@ -577,6 +582,14 @@ public class MetadataInfo implements Serializable {
return matchKey;
}
+ public ProtocolServiceKey getProtocolServiceKey() {
+ if (protocolServiceKey != null) {
+ return protocolServiceKey;
+ }
+ protocolServiceKey = new ProtocolServiceKey(name, version, group, protocol);
+ return protocolServiceKey;
+ }
+
private String buildServiceKey(String name, String group, String version) {
this.serviceKey = URL.buildKey(name, group, version);
return this.serviceKey;
@@ -630,6 +643,14 @@ public class MetadataInfo implements Serializable {
this.protocol = protocol;
}
+ /**
+  * @return the current value of the {@code port} field
+  */
+ public int getPort() {
+     return this.port;
+ }
+
+ /**
+  * Replaces the value of the {@code port} field.
+  *
+  * @param port the new port value
+  */
+ public void setPort(final int port) {
+     this.port = port;
+ }
+
public Map<String, String> getParams() {
if (params == null) {
return Collections.emptyMap();
@@ -759,12 +780,13 @@ public class MetadataInfo implements Serializable {
&& Objects.equals(this.getGroup(), serviceInfo.getGroup())
&& Objects.equals(this.getName(), serviceInfo.getName())
&& Objects.equals(this.getProtocol(), serviceInfo.getProtocol())
+ && Objects.equals(this.getPort(), serviceInfo.getPort())
&& this.getParams().equals(serviceInfo.getParams());
}
@Override
public int hashCode() {
- return Objects.hash(getVersion(), getGroup(), getName(), getProtocol(), getParams());
+ return Objects.hash(getVersion(), getGroup(), getName(), getProtocol(), getPort(), getParams());
}
@Override
@@ -778,6 +800,7 @@ public class MetadataInfo implements Serializable {
"group='" + group + "'," +
"version='" + version + "'," +
"protocol='" + protocol + "'," +
+ "port='" + port + "'," +
"params=" + params + "," +
"}";
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
index 074811beed..432a77738a 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
@@ -245,6 +245,12 @@ public class DefaultServiceInstance implements ServiceInstance {
return copyOfInstance;
}
+ /**
+  * Copy-constructs a new instance from this one and then sets the given port on the copy.
+  *
+  * @param port the port to apply to the copy
+  * @return the newly created {@link DefaultServiceInstance}
+  */
+ public DefaultServiceInstance copyFrom(int port) {
+     final DefaultServiceInstance duplicate = new DefaultServiceInstance(this);
+     duplicate.setPort(port);
+     return duplicate;
+ }
+
@Override
public Map<String, String> getAllParams() {
if (extendParams == null) {
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java
index 8e8b2f9424..225b6e6e2e 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.registry.client;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.url.component.ServiceConfigURL;
import org.apache.dubbo.common.url.component.URLAddress;
import org.apache.dubbo.common.url.component.URLParam;
import org.apache.dubbo.common.utils.CollectionUtils;
@@ -126,6 +127,26 @@ public class InstanceAddressURL extends URL {
return RpcContext.getServiceContext().getServiceKey();
}
+ /**
+  * Builds a new {@link ServiceConfigURL} that uses the given protocol; username, password,
+  * host, port, path, parameters and attributes are taken from this URL.
+  *
+  * @param protocol the protocol of the returned URL
+  * @return a newly created {@link ServiceConfigURL}
+  */
+ @Override
+ public URL setProtocol(String protocol) {
+     return new ServiceConfigURL(
+         protocol,
+         getUsername(),
+         getPassword(),
+         getHost(),
+         getPort(),
+         getPath(),
+         getParameters(),
+         attributes);
+ }
+
+ /**
+  * Builds a new {@link ServiceConfigURL} that uses the given host; protocol, username,
+  * password, port, path, parameters and attributes are taken from this URL.
+  *
+  * @param host the host of the returned URL
+  * @return a newly created {@link ServiceConfigURL}
+  */
+ @Override
+ public URL setHost(String host) {
+     return new ServiceConfigURL(
+         getProtocol(),
+         getUsername(),
+         getPassword(),
+         host,
+         getPort(),
+         getPath(),
+         getParameters(),
+         attributes);
+ }
+
+ /**
+  * Builds a new {@link ServiceConfigURL} that uses the given port; protocol, username,
+  * password, host, path, parameters and attributes are taken from this URL.
+  *
+  * @param port the port of the returned URL
+  * @return a newly created {@link ServiceConfigURL}
+  */
+ @Override
+ public URL setPort(int port) {
+     return new ServiceConfigURL(
+         getProtocol(),
+         getUsername(),
+         getPassword(),
+         getHost(),
+         port,
+         getPath(),
+         getParameters(),
+         attributes);
+ }
+
+ /**
+  * Builds a new {@link ServiceConfigURL} that uses the given path; protocol, username,
+  * password, host, port, parameters and attributes are taken from this URL.
+  *
+  * @param path the path of the returned URL
+  * @return a newly created {@link ServiceConfigURL}
+  */
+ @Override
+ public URL setPath(String path) {
+     return new ServiceConfigURL(
+         getProtocol(),
+         getUsername(),
+         getPassword(),
+         getHost(),
+         getPort(),
+         path,
+         getParameters(),
+         attributes);
+ }
+
@Override
public String getAddress() {
return instance.getAddress();
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
index 30eb983edd..a1aaf01cfe 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
@@ -299,8 +299,8 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {
protected void subscribeURLs(URL url, NotifyListener listener, Set<String> serviceNames) {
serviceNames = toTreeSet(serviceNames);
String serviceNamesKey = toStringKeys(serviceNames);
- String protocolServiceKey = url.getProtocolServiceKey();
- logger.info(String.format("Trying to subscribe from apps %s for service key %s, ", serviceNamesKey, protocolServiceKey));
+ String serviceKey = url.getServiceKey();
+ logger.info(String.format("Trying to subscribe from apps %s for service key %s, ", serviceNamesKey, serviceKey));
// register ServiceInstancesChangedListener
Lock appSubscriptionLock = getAppSubscription(serviceNamesKey);
@@ -322,7 +322,7 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {
if (!serviceInstancesChangedListener.isDestroyed()) {
serviceInstancesChangedListener.setUrl(url);
listener.addServiceListener(serviceInstancesChangedListener);
- serviceInstancesChangedListener.addListenerAndNotify(protocolServiceKey, listener);
+ serviceInstancesChangedListener.addListenerAndNotify(url, listener);
serviceDiscovery.addServiceInstancesChangedListener(serviceInstancesChangedListener);
} else {
logger.info(String.format("Listener of %s has been destroyed by another thread.", serviceNamesKey));
@@ -398,7 +398,7 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {
Lock appSubscriptionLock = getAppSubscription(appKey);
try {
appSubscriptionLock.lock();
- oldListener.removeListener(url.getProtocolServiceKey(), listener);
+ oldListener.removeListener(url.getServiceKey(), listener);
if (!oldListener.hasListeners()) {
oldListener.destroy();
removeAppSubscriptionLock(appKey);
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
index bfe1b8b383..7bb611e3d3 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
@@ -16,8 +16,10 @@
*/
package org.apache.dubbo.registry.client;
+import org.apache.dubbo.common.ProtocolServiceKey;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.config.configcenter.DynamicConfiguration;
+import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
@@ -30,26 +32,35 @@ import org.apache.dubbo.registry.Constants;
import org.apache.dubbo.registry.ProviderFirstParams;
import org.apache.dubbo.registry.integration.AbstractConfiguratorListener;
import org.apache.dubbo.registry.integration.DynamicDirectory;
+import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Protocol;
+import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcServiceContext;
import org.apache.dubbo.rpc.cluster.Configurator;
import org.apache.dubbo.rpc.cluster.RouterChain;
+import org.apache.dubbo.rpc.cluster.directory.StaticDirectory;
import org.apache.dubbo.rpc.cluster.router.state.BitList;
import org.apache.dubbo.rpc.model.ModuleModel;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
import static org.apache.dubbo.common.constants.CommonConstants.DISABLED_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.ENABLED_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.DEFAULT_HASHMAP_LOAD_FACTOR;
import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_TYPE_KEY;
@@ -64,13 +75,15 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> {
* instance address to invoker mapping.
* The initial value is null and the midway may be assigned to null, please use the local variable reference
*/
- private volatile Map<String, Invoker<T>> urlInvokerMap;
+ private volatile Map<ProtocolServiceKeyWithAddress, Invoker<T>> urlInvokerMap;
private volatile ReferenceConfigurationListener referenceConfigurationListener;
private volatile boolean enableConfigurationListen = true;
private volatile List<URL> originalUrls = null;
private volatile Map<String, String> overrideQueryMap;
private final Set<String> providerFirstParams;
private final ModuleModel moduleModel;
+ private final ProtocolServiceKey consumerProtocolServiceKey;
+ private final Map<ProtocolServiceKey, URL> customizedConsumerUrlMap = new ConcurrentHashMap<>();
public ServiceDiscoveryRegistryDirectory(Class<T> serviceType, URL url) {
super(serviceType, url);
@@ -94,6 +107,9 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> {
}
}
+ String protocol = consumerUrl.getParameter(PROTOCOL_KEY, consumerUrl.getProtocol());
+ consumerProtocolServiceKey = new ProtocolServiceKey(consumerUrl.getServiceInterface(), consumerUrl.getVersion(), consumerUrl.getGroup(),
+ !CommonConstants.CONSUMER.equals(protocol) ? protocol : null);
}
@Override
@@ -224,15 +240,15 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> {
}
// use local reference to avoid NPE as this.urlInvokerMap will be set null concurrently at destroyAllInvokers().
- Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap;
+ Map<ProtocolServiceKeyWithAddress, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap;
// can't use local reference as oldUrlInvokerMap's mappings might be removed directly at toInvokers().
- Map<String, Invoker<T>> oldUrlInvokerMap = null;
+ Map<ProtocolServiceKeyWithAddress, Invoker<T>> oldUrlInvokerMap = null;
if (localUrlInvokerMap != null) {
// the initial capacity should be set greater than the maximum number of entries divided by the load factor to avoid resizing.
oldUrlInvokerMap = new LinkedHashMap<>(Math.round(1 + localUrlInvokerMap.size() / DEFAULT_HASHMAP_LOAD_FACTOR));
localUrlInvokerMap.forEach(oldUrlInvokerMap::put);
}
- Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(oldUrlInvokerMap, invokerUrls);// Translate url list to Invoker map
+ Map<ProtocolServiceKeyWithAddress, Invoker<T>> newUrlInvokerMap = toInvokers(oldUrlInvokerMap, invokerUrls);// Translate url list to Invoker map
logger.info("Refreshed invoker size " + newUrlInvokerMap.size());
if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
@@ -266,11 +282,12 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> {
* @param urls
* @return invokers
*/
- private Map<String, Invoker<T>> toInvokers(Map<String, Invoker<T>> oldUrlInvokerMap, List<URL> urls) {
- Map<String, Invoker<T>> newUrlInvokerMap = new ConcurrentHashMap<>(urls == null ? 1 : (int) (urls.size() / 0.75f + 1));
+ private Map<ProtocolServiceKeyWithAddress, Invoker<T>> toInvokers(Map<ProtocolServiceKeyWithAddress, Invoker<T>> oldUrlInvokerMap, List<URL> urls) {
+ Map<ProtocolServiceKeyWithAddress, Invoker<T>> newUrlInvokerMap = new ConcurrentHashMap<>(urls == null ? 1 : (int) (urls.size() / 0.75f + 1));
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
+
for (URL url : urls) {
InstanceAddressURL instanceAddressURL = (InstanceAddressURL) url;
if (EMPTY_PROTOCOL.equals(instanceAddressURL.getProtocol())) {
@@ -291,33 +308,59 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> {
instanceAddressURL = overrideWithConfigurator(instanceAddressURL);
}
- Invoker<T> invoker = oldUrlInvokerMap == null ? null : oldUrlInvokerMap.get(instanceAddressURL.getAddress());
- if (invoker == null || urlChanged(invoker, instanceAddressURL)) { // Not in the cache, refer again
- try {
- boolean enabled = true;
- if (instanceAddressURL.hasParameter(DISABLED_KEY)) {
- enabled = !instanceAddressURL.getParameter(DISABLED_KEY, false);
- } else {
- enabled = instanceAddressURL.getParameter(ENABLED_KEY, true);
+ // filter all the service available (version wildcard, group wildcard, protocol wildcard)
+ int port = instanceAddressURL.getPort();
+ List<ProtocolServiceKey> matchedProtocolServiceKeys = instanceAddressURL.getMetadataInfo()
+ .getServices().values()
+ .stream()
+ .filter(serviceInfo -> serviceInfo.getPort() <= 0 || serviceInfo.getPort() == port)
+ .map(MetadataInfo.ServiceInfo::getProtocolServiceKey)
+ .filter(key -> ProtocolServiceKey.Matcher.isMatch(consumerProtocolServiceKey, key))
+ .collect(Collectors.toList());
+
+ // see org.apache.dubbo.common.ProtocolServiceKey.isSameWith
+ // check if needed to override the consumer url
+ boolean shouldWrap = matchedProtocolServiceKeys.size() != 1 || !consumerProtocolServiceKey.isSameWith(matchedProtocolServiceKeys.get(0));
+
+ for (ProtocolServiceKey matchedProtocolServiceKey : matchedProtocolServiceKeys) {
+ ProtocolServiceKeyWithAddress protocolServiceKeyWithAddress = new ProtocolServiceKeyWithAddress(matchedProtocolServiceKey, instanceAddressURL.getAddress());
+ Invoker<T> invoker = oldUrlInvokerMap == null ? null : oldUrlInvokerMap.get(protocolServiceKeyWithAddress);
+ if (invoker == null || urlChanged(invoker, instanceAddressURL, matchedProtocolServiceKey)) { // Not in the cache, refer again
+ try {
+ boolean enabled;
+ if (instanceAddressURL.hasParameter(DISABLED_KEY)) {
+ enabled = !instanceAddressURL.getParameter(DISABLED_KEY, false);
+ } else {
+ enabled = instanceAddressURL.getParameter(ENABLED_KEY, true);
+ }
+ if (enabled) {
+ if (shouldWrap) {
+ URL newConsumerUrl = customizedConsumerUrlMap.computeIfAbsent(matchedProtocolServiceKey,
+ k -> consumerUrl.setProtocol(k.getProtocol())
+ .addParameter(CommonConstants.GROUP_KEY, k.getGroup())
+ .addParameter(CommonConstants.VERSION_KEY, k.getVersion()));
+ RpcContext.getServiceContext().setConsumerUrl(newConsumerUrl);
+ invoker = new InstanceWrappedInvoker<>(protocol.refer(serviceType, instanceAddressURL), newConsumerUrl, matchedProtocolServiceKey);
+ } else {
+ invoker = protocol.refer(serviceType, instanceAddressURL);
+ }
+ }
+ } catch (Throwable t) {
+ logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + instanceAddressURL + ")" + t.getMessage(), t);
}
- if (enabled) {
- invoker = protocol.refer(serviceType, instanceAddressURL);
+ if (invoker != null) { // Put new invoker in cache
+ newUrlInvokerMap.put(protocolServiceKeyWithAddress, invoker);
}
- } catch (Throwable t) {
- logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + instanceAddressURL + ")" + t.getMessage(), t);
+ } else {
+ newUrlInvokerMap.put(protocolServiceKeyWithAddress, invoker);
+ oldUrlInvokerMap.remove(protocolServiceKeyWithAddress, invoker);
}
- if (invoker != null) { // Put new invoker in cache
- newUrlInvokerMap.put(instanceAddressURL.getAddress(), invoker);
- }
- } else {
- newUrlInvokerMap.put(instanceAddressURL.getAddress(), invoker);
- oldUrlInvokerMap.remove(instanceAddressURL.getAddress(), invoker);
}
}
return newUrlInvokerMap;
}
- private boolean urlChanged(Invoker<T> invoker, InstanceAddressURL newURL) {
+ private boolean urlChanged(Invoker<T> invoker, InstanceAddressURL newURL, ProtocolServiceKey protocolServiceKey) {
InstanceAddressURL oldURL = (InstanceAddressURL) invoker.getUrl();
if (!newURL.getInstance().equals(oldURL.getInstance())) {
@@ -335,16 +378,35 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> {
}
}
- MetadataInfo.ServiceInfo oldServiceInfo = oldURL.getMetadataInfo().getValidServiceInfo(getConsumerUrl().getProtocolServiceKey());
+ MetadataInfo.ServiceInfo oldServiceInfo = oldURL.getMetadataInfo().getValidServiceInfo(protocolServiceKey.toString());
if (null == oldServiceInfo) {
return false;
}
- return !oldServiceInfo.equals(newURL.getMetadataInfo().getValidServiceInfo(getConsumerUrl().getProtocolServiceKey()));
+ return !oldServiceInfo.equals(newURL.getMetadataInfo().getValidServiceInfo(protocolServiceKey.toString()));
}
private List<Invoker<T>> toMergeInvokerList(List<Invoker<T>> invokers) {
- return invokers;
+ List<Invoker<T>> mergedInvokers = new ArrayList<>();
+ Map<String, List<Invoker<T>>> groupMap = new HashMap<>();
+ for (Invoker<T> invoker : invokers) {
+ String group = invoker.getUrl().getGroup("");
+ groupMap.computeIfAbsent(group, k -> new ArrayList<>());
+ groupMap.get(group).add(invoker);
+ }
+
+ if (groupMap.size() == 1) {
+ mergedInvokers.addAll(groupMap.values().iterator().next());
+ } else if (groupMap.size() > 1) {
+ for (List<Invoker<T>> groupList : groupMap.values()) {
+ StaticDirectory<T> staticDirectory = new StaticDirectory<>(groupList);
+ staticDirectory.buildRouterChain();
+ mergedInvokers.add(cluster.join(staticDirectory, false));
+ }
+ } else {
+ mergedInvokers = invokers;
+ }
+ return mergedInvokers;
}
/**
@@ -352,7 +414,7 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> {
*/
@Override
protected void destroyAllInvokers() {
- Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
+ Map<ProtocolServiceKeyWithAddress, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
if (localUrlInvokerMap != null) {
for (Invoker<T> invoker : new ArrayList<>(localUrlInvokerMap.values())) {
try {
@@ -375,7 +437,7 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> {
* @param oldUrlInvokerMap
* @param newUrlInvokerMap
*/
- private void destroyUnusedInvokers(Map<String, Invoker<T>> oldUrlInvokerMap, Map<String, Invoker<T>> newUrlInvokerMap) {
+ private void destroyUnusedInvokers(Map<ProtocolServiceKeyWithAddress, Invoker<T>> oldUrlInvokerMap, Map<ProtocolServiceKeyWithAddress, Invoker<T>> newUrlInvokerMap) {
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
destroyAllInvokers();
return;
@@ -385,7 +447,7 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> {
return;
}
- for (Map.Entry<String, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) {
+ for (Map.Entry<ProtocolServiceKeyWithAddress, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) {
Invoker<T> invoker = entry.getValue();
if (invoker != null) {
try {
@@ -461,4 +523,88 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> {
});
}
}
+
+ public static final class ProtocolServiceKeyWithAddress extends ProtocolServiceKey {
+ private final String address;
+
+ public ProtocolServiceKeyWithAddress(ProtocolServiceKey protocolServiceKey, String address) {
+ super(protocolServiceKey.getInterfaceName(), protocolServiceKey.getVersion(), protocolServiceKey.getGroup(), protocolServiceKey.getProtocol());
+ this.address = address;
+ }
+
+ public String getAddress() {
+ return address;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ ProtocolServiceKeyWithAddress that = (ProtocolServiceKeyWithAddress) o;
+ return Objects.equals(address, that.address);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), address);
+ }
+ }
+
+ /**
+  * {@link Invoker} decorator that sets a replacement consumer URL on the current
+  * {@link RpcContext} service context before delegating to the wrapped invoker.
+  * <p>
+  * On {@link #invoke(Invocation)} the invocation is additionally re-created with the
+  * wrapped {@link ProtocolServiceKey}, and its group and version attachments are overwritten.
+  *
+  * @param <T> service interface type
+  */
+ public static final class InstanceWrappedInvoker<T> implements Invoker<T> {
+     private final Invoker<T> originInvoker;
+     private final URL newConsumerUrl;
+     private final ProtocolServiceKey protocolServiceKey;
+
+     /**
+      * @param originInvoker      invoker that every call is delegated to
+      * @param newConsumerUrl     consumer URL placed into the service context before each delegation
+      * @param protocolServiceKey key used when re-creating invocations in {@link #invoke(Invocation)}
+      */
+     public InstanceWrappedInvoker(Invoker<T> originInvoker, URL newConsumerUrl, ProtocolServiceKey protocolServiceKey) {
+         this.originInvoker = originInvoker;
+         this.newConsumerUrl = newConsumerUrl;
+         this.protocolServiceKey = protocolServiceKey;
+     }
+
+     /**
+      * Puts {@code newConsumerUrl} into the current service context.
+      */
+     private void bindConsumerUrl() {
+         RpcContext.getServiceContext().setConsumerUrl(newConsumerUrl);
+     }
+
+     @Override
+     public Class<T> getInterface() {
+         return originInvoker.getInterface();
+     }
+
+     /**
+      * Delegates to the wrapped invoker using a copy of {@code invocation} that carries the
+      * wrapped protocol service key.
+      *
+      * @param invocation the incoming invocation; it is read but not modified here
+      * @return the result returned by the wrapped invoker
+      * @throws RpcException declared by {@link Invoker#invoke(Invocation)}
+      */
+     @Override
+     public Result invoke(Invocation invocation) throws RpcException {
+         // override consumer url with real protocol service key
+         bindConsumerUrl();
+
+         // the protocol service key changed, so a fresh invocation has to be assembled
+         RpcInvocation rebuilt = new RpcInvocation(
+             invocation.getTargetServiceUniqueName(),
+             invocation.getServiceModel(),
+             invocation.getMethodName(),
+             invocation.getServiceName(),
+             protocolServiceKey.toString(),
+             invocation.getParameterTypes(),
+             invocation.getArguments(),
+             invocation.getObjectAttachments(),
+             invocation.getInvoker(),
+             invocation.getAttributes(),
+             // the invoke mode is only read from RpcInvocation instances
+             invocation instanceof RpcInvocation ? ((RpcInvocation) invocation).getInvokeMode() : null);
+
+         rebuilt.setObjectAttachment(CommonConstants.GROUP_KEY, protocolServiceKey.getGroup());
+         rebuilt.setObjectAttachment(CommonConstants.VERSION_KEY, protocolServiceKey.getVersion());
+         return originInvoker.invoke(rebuilt);
+     }
+
+     @Override
+     public URL getUrl() {
+         bindConsumerUrl();
+         return originInvoker.getUrl();
+     }
+
+     @Override
+     public boolean isAvailable() {
+         bindConsumerUrl();
+         return originInvoker.isAvailable();
+     }
+
+     @Override
+     public void destroy() {
+         bindConsumerUrl();
+         originInvoker.destroy();
+     }
+ }
+
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
index 486fd90868..0ec60af5ae 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
@@ -16,14 +16,15 @@
*/
package org.apache.dubbo.registry.client.event.listener;
+import org.apache.dubbo.common.ProtocolServiceKey;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.URLBuilder;
+import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
-import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.metadata.MetadataInfo;
import org.apache.dubbo.metadata.MetadataInfo.ServiceInfo;
import org.apache.dubbo.registry.NotifyListener;
@@ -38,7 +39,6 @@ import org.apache.dubbo.rpc.model.ScopeModelUtil;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -52,8 +52,6 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import static java.util.Collections.emptySet;
-import static org.apache.dubbo.common.constants.CommonConstants.GROUP_CHAR_SEPARATOR;
import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
import static org.apache.dubbo.common.constants.RegistryConstants.ENABLE_EMPTY_PROTECTION_KEY;
@@ -77,7 +75,7 @@ public class ServiceInstancesChangedListener {
protected AtomicBoolean destroyed = new AtomicBoolean(false);
protected Map<String, List<ServiceInstance>> allInstances;
- protected Map<String, Object> serviceUrls;
+ protected Map<String, List<ProtocolServiceKeyWithUrls>> serviceUrls;
private volatile long lastRefreshTime;
private final Semaphore retryPermission;
@@ -96,8 +94,7 @@ public class ServiceInstancesChangedListener {
this.allInstances = new HashMap<>();
this.serviceUrls = new HashMap<>();
retryPermission = new Semaphore(1);
- this.scheduler = ScopeModelUtil.getApplicationModel(serviceDiscovery == null || serviceDiscovery.getUrl() == null ? null : serviceDiscovery.getUrl().getScopeModel())
- .getBeanFactory().getBean(FrameworkExecutorRepository.class).getMetadataRetryExecutor();
+ this.scheduler = ScopeModelUtil.getApplicationModel(serviceDiscovery == null || serviceDiscovery.getUrl() == null ? null : serviceDiscovery.getUrl().getScopeModel()).getBeanFactory().getBean(FrameworkExecutorRepository.class).getMetadataRetryExecutor();
}
/**
@@ -127,7 +124,7 @@ public class ServiceInstancesChangedListener {
}
Map<String, List<ServiceInstance>> revisionToInstances = new HashMap<>();
- Map<String, Map<String, Set<String>>> localServiceToRevisions = new HashMap<>();
+ Map<ServiceInfo, Set<String>> localServiceToRevisions = new HashMap<>();
// grouping all instances of this app(service name) by revision
for (Map.Entry<String, List<ServiceInstance>> entry : allInstances.entrySet()) {
@@ -179,55 +176,45 @@ public class ServiceInstancesChangedListener {
}
hasEmptyMetadata = false;
- Map<String, Map<Set<String>, Object>> protocolRevisionsToUrls = new HashMap<>();
- Map<String, Object> newServiceUrls = new HashMap<>();
- for (Map.Entry<String, Map<String, Set<String>>> entry : localServiceToRevisions.entrySet()) {
- String protocol = entry.getKey();
- entry.getValue().forEach((protocolServiceKey, revisions) -> {
- Map<Set<String>, Object> revisionsToUrls = protocolRevisionsToUrls.computeIfAbsent(protocol, k -> new HashMap<>());
- Object urls = revisionsToUrls.get(revisions);
- if (urls == null) {
- urls = getServiceUrlsCache(revisionToInstances, revisions, protocol);
- revisionsToUrls.put(revisions, urls);
- }
+ Map<String, Map<Integer, Map<Set<String>, Object>>> protocolRevisionsToUrls = new HashMap<>();
+ Map<String, List<ProtocolServiceKeyWithUrls>> newServiceUrls = new HashMap<>();
+ for (Map.Entry<ServiceInfo, Set<String>> entry : localServiceToRevisions.entrySet()) {
+ ServiceInfo serviceInfo = entry.getKey();
+ Set<String> revisions = entry.getValue();
+
+ Map<Integer, Map<Set<String>, Object>> portToRevisions = protocolRevisionsToUrls.computeIfAbsent(serviceInfo.getProtocol(), k -> new HashMap<>());
+ Map<Set<String>, Object> revisionsToUrls = portToRevisions.computeIfAbsent(serviceInfo.getPort(), k -> new HashMap<>());
+ Object urls = revisionsToUrls.get(revisions);
+ if (urls == null) {
+ urls = getServiceUrlsCache(revisionToInstances, revisions, serviceInfo.getProtocol(), serviceInfo.getPort());
+ revisionsToUrls.put(revisions, urls);
+ }
- newServiceUrls.put(protocolServiceKey, urls);
- });
+ List<ProtocolServiceKeyWithUrls> list = newServiceUrls.computeIfAbsent(serviceInfo.getPath(), k -> new LinkedList<>());
+ list.add(new ProtocolServiceKeyWithUrls(serviceInfo.getProtocolServiceKey(), (List<URL>) urls));
}
this.serviceUrls = newServiceUrls;
this.notifyAddressChanged();
}
- public synchronized void addListenerAndNotify(String serviceKey, NotifyListener listener) {
+ public synchronized void addListenerAndNotify(URL url, NotifyListener listener) {
if (destroyed.get()) {
return;
}
- Set<NotifyListenerWithKey> notifyListeners = this.listeners.computeIfAbsent(serviceKey, _k -> new ConcurrentHashSet<>());
- // {@code protocolServiceKeysToConsume} will be specific protocols configured in reference config or default protocols supported by framework.
- Set<String> protocolServiceKeysToConsume = getProtocolServiceKeyList(serviceKey, listener);
- // Add current listener to serviceKey set, there will have more than one listener when multiple references of one same service is configured.
- NotifyListenerWithKey listenerWithKey = new NotifyListenerWithKey(serviceKey, protocolServiceKeysToConsume, listener);
+ Set<NotifyListenerWithKey> notifyListeners = this.listeners.computeIfAbsent(url.getServiceKey(), _k -> new ConcurrentHashSet<>());
+ String protocol = listener.getConsumerUrl().getParameter(PROTOCOL_KEY, url.getProtocol());
+ ProtocolServiceKey protocolServiceKey = new ProtocolServiceKey(url.getServiceInterface(), url.getVersion(), url.getGroup(),
+ !CommonConstants.CONSUMER.equals(protocol) ? protocol : null);
+ NotifyListenerWithKey listenerWithKey = new NotifyListenerWithKey(protocolServiceKey, listener);
notifyListeners.add(listenerWithKey);
// Aggregate address and notify on subscription.
- List<URL> urls;
- if (protocolServiceKeysToConsume.size() > 1) {
- urls = new ArrayList<>();
- for (String protocolServiceKey : protocolServiceKeysToConsume) {
- List<URL> urlsOfProtocol = getAddresses(protocolServiceKey, listener.getConsumerUrl());
- if (CollectionUtils.isNotEmpty(urlsOfProtocol)) {
- logger.info(String.format("Found %s urls of protocol service key %s ", urlsOfProtocol.size(), protocolServiceKey));
- urls.addAll(urlsOfProtocol);
- }
- }
- } else {
- urls = getAddresses(protocolServiceKeysToConsume.iterator().next(), listener.getConsumerUrl());
- }
+ List<URL> urls = getAddresses(protocolServiceKey, listener.getConsumerUrl());
if (CollectionUtils.isNotEmpty(urls)) {
- logger.info(String.format("Notify serviceKey: %s, listener: %s with %s urls on subscription", serviceKey, listener, urls.size()));
+ logger.info(String.format("Notify serviceKey: %s, listener: %s with %s urls on subscription", protocolServiceKey, listener, urls.size()));
listener.notify(urls);
}
}
@@ -240,9 +227,7 @@ public class ServiceInstancesChangedListener {
// synchronized method, no need to use DCL
Set<NotifyListenerWithKey> notifyListeners = this.listeners.get(serviceKey);
if (notifyListeners != null) {
- NotifyListenerWithKey listenerWithKey = new NotifyListenerWithKey(serviceKey, notifyListener);
- // Remove from global listeners
- notifyListeners.remove(listenerWithKey);
+ notifyListeners.removeIf(listener -> listener.getNotifyListener().equals(notifyListener));
// ServiceKey has no listener, remove set
if (notifyListeners.size() == 0) {
@@ -341,23 +326,28 @@ public class ServiceInstancesChangedListener {
return emptyMetadataNum;
}
- protected Map<String, Map<String, Set<String>>> parseMetadata(String revision, MetadataInfo metadata, Map<String, Map<String, Set<String>>> localServiceToRevisions) {
+ protected Map<ServiceInfo, Set<String>> parseMetadata(String revision, MetadataInfo metadata, Map<ServiceInfo, Set<String>> localServiceToRevisions) {
Map<String, ServiceInfo> serviceInfos = metadata.getServices();
for (Map.Entry<String, ServiceInfo> entry : serviceInfos.entrySet()) {
- String protocol = entry.getValue().getProtocol();
- String protocolServiceKey = entry.getValue().getMatchKey();
- Map<String, Set<String>> map = localServiceToRevisions.computeIfAbsent(protocol, _p -> new HashMap<>());
- Set<String> set = map.computeIfAbsent(protocolServiceKey, _k -> new TreeSet<>());
+ Set<String> set = localServiceToRevisions.computeIfAbsent(entry.getValue(), _k -> new TreeSet<>());
set.add(revision);
}
return localServiceToRevisions;
}
- protected Object getServiceUrlsCache(Map<String, List<ServiceInstance>> revisionToInstances, Set<String> revisions, String protocol) {
+ protected Object getServiceUrlsCache(Map<String, List<ServiceInstance>> revisionToInstances, Set<String> revisions, String protocol, int port) {
List<URL> urls = new ArrayList<>();
for (String r : revisions) {
for (ServiceInstance i : revisionToInstances.get(r)) {
+ if (port > 0) {
+ if (i.getPort() == port) {
+ urls.add(i.toURL(protocol).setScopeModel(i.getApplicationModel()));
+ } else {
+ urls.add(((DefaultServiceInstance) i).copyFrom(port).toURL(protocol).setScopeModel(i.getApplicationModel()));
+ }
+ continue;
+ }
// different protocols may have ports specified in meta
if (ServiceInstanceMetadataUtils.hasEndpoints(i)) {
DefaultServiceInstance.Endpoint endpoint = ServiceInstanceMetadataUtils.getEndpoint(i, protocol);
@@ -372,8 +362,17 @@ public class ServiceInstancesChangedListener {
return urls;
}
- protected List<URL> getAddresses(String serviceProtocolKey, URL consumerURL) {
- return (List<URL>) serviceUrls.get(serviceProtocolKey);
+ protected List<URL> getAddresses(ProtocolServiceKey protocolServiceKey, URL consumerURL) {
+ List<ProtocolServiceKeyWithUrls> protocolServiceKeyWithUrlsList = serviceUrls.get(protocolServiceKey.getInterfaceName());
+ List<URL> urls = new ArrayList<>();
+ if (protocolServiceKeyWithUrlsList != null) {
+ for (ProtocolServiceKeyWithUrls protocolServiceKeyWithUrls : protocolServiceKeyWithUrlsList) {
+ if (ProtocolServiceKey.Matcher.isMatch(protocolServiceKey, protocolServiceKeyWithUrls.getProtocolServiceKey())) {
+ urls.addAll(protocolServiceKeyWithUrls.getUrls());
+ }
+ }
+ }
+ return urls;
}
/**
@@ -385,28 +384,10 @@ public class ServiceInstancesChangedListener {
// 2 multiple subscription listener of the same service
for (NotifyListenerWithKey listenerWithKey : listenerSet) {
NotifyListener notifyListener = listenerWithKey.getNotifyListener();
- if (listenerWithKey.getProtocolServiceKeys().size() == 1) {// 2.1 if one specific protocol is specified
- String protocolServiceKey = listenerWithKey.getProtocolServiceKeys().iterator().next();
- //FIXME, group wildcard match
- List<URL> urls = toUrlsWithEmpty(getAddresses(protocolServiceKey, notifyListener.getConsumerUrl()));
- logger.info("Notify service " + protocolServiceKey + " with urls " + urls.size());
- notifyListener.notify(urls);
- } else {// 2.2 multiple protocols or no protocol(using default protocols) set
- List<URL> urls = new ArrayList<>();
- int effectiveProtocolNum = 0;
- for (String protocolServiceKey : listenerWithKey.getProtocolServiceKeys()) {
- List<URL> tmpUrls = getAddresses(protocolServiceKey, notifyListener.getConsumerUrl());
- if (CollectionUtils.isNotEmpty(tmpUrls)) {
- logger.info("Found " + urls.size() + " urls of protocol service key " + protocolServiceKey);
- effectiveProtocolNum++;
- urls.addAll(tmpUrls);
- }
- }
- logger.info("Notify service " + serviceKey + " with " + urls.size() + " urls from " + effectiveProtocolNum + " different protocols");
- urls = toUrlsWithEmpty(urls);
- notifyListener.notify(urls);
- }
+ List<URL> urls = toUrlsWithEmpty(getAddresses(listenerWithKey.getProtocolServiceKey(), notifyListener.getConsumerUrl()));
+ logger.info("Notify service " + listenerWithKey.getProtocolServiceKey() + " with urls " + urls.size());
+ notifyListener.notify(urls);
}
});
}
@@ -421,9 +402,7 @@ public class ServiceInstancesChangedListener {
if (CollectionUtils.isEmpty(urls) && !emptyProtectionEnabled) {
// notice that the service of this.url may not be the same as notify listener.
- URL empty = URLBuilder.from(this.url)
- .setProtocol(EMPTY_PROTOCOL)
- .build();
+ URL empty = URLBuilder.from(this.url).setProtocol(EMPTY_PROTOCOL).build();
urls.add(empty);
}
return urls;
@@ -468,46 +447,6 @@ public class ServiceInstancesChangedListener {
return Objects.hash(getClass(), getServiceNames());
}
- /**
- * Calculate the protocol list that the consumer cares about.
- *
- * @param serviceKey possible input serviceKey includes
- * 1. {group}/{interface}:{version}, if protocol is not specified
- * 2. {group}/{interface}:{version}:{user specified protocols}
- * @param listener listener also contains the user specified protocols
- * @return protocol list with the format {group}/{interface}:{version}:{protocol}
- */
- protected Set<String> getProtocolServiceKeyList(String serviceKey, NotifyListener listener) {
- if (StringUtils.isEmpty(serviceKey)) {
- return emptySet();
- }
-
- Set<String> result = new HashSet<>();
- String protocol = listener.getConsumerUrl().getParameter(PROTOCOL_KEY);
- if (serviceKey.endsWith(CONSUMER_PROTOCOL_SUFFIX)) {
- serviceKey = serviceKey.substring(0, serviceKey.indexOf(CONSUMER_PROTOCOL_SUFFIX));
- }
-
- if (StringUtils.isNotEmpty(protocol)) {
- int protocolIndex = serviceKey.indexOf(":" + protocol);
- if (protocol.contains(",") && protocolIndex != -1) {
- serviceKey = serviceKey.substring(0, protocolIndex);
- String[] specifiedProtocols = protocol.split(",");
- for (String specifiedProtocol : specifiedProtocols) {
- result.add(serviceKey + GROUP_CHAR_SEPARATOR + specifiedProtocol);
- }
- } else {
- result.add(serviceKey);
- }
- } else {
- for (String supportedProtocol : SUPPORTED_PROTOCOLS) {
- result.add(serviceKey + GROUP_CHAR_SEPARATOR + supportedProtocol);
- }
- }
-
- return result;
- }
-
protected class AddressRefreshRetryTask implements Runnable {
private final RetryServiceInstancesChangedEvent retryEvent;
private final Semaphore retryPermission;
@@ -525,26 +464,16 @@ public class ServiceInstancesChangedListener {
}
public static class NotifyListenerWithKey {
- private final String serviceKey;
- private final Set<String> protocolServiceKeys;
+ private final ProtocolServiceKey protocolServiceKey;
private final NotifyListener notifyListener;
- public NotifyListenerWithKey(String protocolServiceKey, Set<String> protocolServiceKeys, NotifyListener notifyListener) {
- this.serviceKey = protocolServiceKey;
- this.protocolServiceKeys = (protocolServiceKeys == null ? new ConcurrentHashSet<>() : protocolServiceKeys);
+ public NotifyListenerWithKey(ProtocolServiceKey protocolServiceKey, NotifyListener notifyListener) {
+ this.protocolServiceKey = protocolServiceKey;
this.notifyListener = notifyListener;
}
- public NotifyListenerWithKey(String protocolServiceKey, NotifyListener notifyListener) {
- this(protocolServiceKey, null, notifyListener);
- }
-
- public String getServiceKey() {
- return serviceKey;
- }
-
- public Set<String> getProtocolServiceKeys() {
- return protocolServiceKeys;
+ public ProtocolServiceKey getProtocolServiceKey() {
+ return protocolServiceKey;
}
public NotifyListener getNotifyListener() {
@@ -560,12 +489,30 @@ public class ServiceInstancesChangedListener {
return false;
}
NotifyListenerWithKey that = (NotifyListenerWithKey) o;
- return Objects.equals(serviceKey, that.serviceKey) && Objects.equals(notifyListener, that.notifyListener);
+ return Objects.equals(protocolServiceKey, that.protocolServiceKey) && Objects.equals(notifyListener, that.notifyListener);
}
@Override
public int hashCode() {
- return Objects.hash(serviceKey, notifyListener);
+ return Objects.hash(protocolServiceKey, notifyListener);
+ }
+ }
+
+ public static class ProtocolServiceKeyWithUrls {
+ private final ProtocolServiceKey protocolServiceKey;
+ private final List<URL> urls;
+
+ public ProtocolServiceKeyWithUrls(ProtocolServiceKey protocolServiceKey, List<URL> urls) {
+ this.protocolServiceKey = protocolServiceKey;
+ this.urls = urls;
+ }
+
+ public ProtocolServiceKey getProtocolServiceKey() {
+ return protocolServiceKey;
+ }
+
+ public List<URL> getUrls() {
+ return urls;
}
}
}
diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryTest.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryTest.java
index 6ce0410b43..40546c73c2 100644
--- a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryTest.java
+++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.dubbo.registry.client;
+import org.apache.dubbo.common.ProtocolServiceKey;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.metadata.AbstractServiceNameMapping;
import org.apache.dubbo.metadata.ServiceNameMapping;
@@ -43,7 +44,6 @@ import java.util.concurrent.locks.ReentrantLock;
import static org.apache.dubbo.common.constants.CommonConstants.CHECK_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.DUBBO;
-import static org.apache.dubbo.common.constants.CommonConstants.GROUP_CHAR_SEPARATOR;
import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY;
import static org.apache.dubbo.metadata.ServiceNameMapping.toStringKeys;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -220,10 +220,10 @@ public class ServiceDiscoveryRegistryTest {
// check different protocol
Map<String, Set<ServiceInstancesChangedListener.NotifyListenerWithKey>> serviceListeners = multiAppsInstanceListener.getServiceListeners();
assertEquals(2, serviceListeners.size());
- assertEquals(1, serviceListeners.get(url.getProtocolServiceKey()).size());
- assertEquals(1, serviceListeners.get(url2.getProtocolServiceKey()).size());
- String protocolServiceKey = url2.getServiceKey() + GROUP_CHAR_SEPARATOR + url2.getParameter(PROTOCOL_KEY, DUBBO);
- assertTrue(serviceListeners.get(url2.getProtocolServiceKey()).contains(new ServiceInstancesChangedListener.NotifyListenerWithKey(protocolServiceKey, testServiceListener2)));
+ assertEquals(1, serviceListeners.get(url.getServiceKey()).size());
+ assertEquals(1, serviceListeners.get(url2.getServiceKey()).size());
+ ProtocolServiceKey protocolServiceKey = new ProtocolServiceKey(url2.getServiceInterface(), url2.getVersion(), url2.getGroup(), url2.getParameter(PROTOCOL_KEY, DUBBO));
+ assertTrue(serviceListeners.get(url2.getServiceKey()).contains(new ServiceInstancesChangedListener.NotifyListenerWithKey(protocolServiceKey, testServiceListener2)));
}
/**
diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/MockServiceInstancesChangedListener.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/MockServiceInstancesChangedListener.java
index 7c187eeef1..26f82b4f5b 100644
--- a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/MockServiceInstancesChangedListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/MockServiceInstancesChangedListener.java
@@ -16,6 +16,7 @@
*/
package org.apache.dubbo.registry.client.event.listener;
+import org.apache.dubbo.common.ProtocolServiceKey;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.registry.client.ServiceDiscovery;
import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
@@ -35,8 +36,8 @@ public class MockServiceInstancesChangedListener extends ServiceInstancesChanged
}
@Override
- public List<URL> getAddresses(String serviceProtocolKey, URL consumerURL) {
- return super.getAddresses(serviceProtocolKey, consumerURL);
+ public List<URL> getAddresses(ProtocolServiceKey protocolServiceKey, URL consumerURL) {
+ return super.getAddresses(protocolServiceKey, consumerURL);
}
public Map<String, Set<NotifyListenerWithKey>> getServiceListeners() {
diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java
index b8c69614c1..7bab0b8554 100644
--- a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java
+++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.dubbo.registry.client.event.listener;
+import org.apache.dubbo.common.ProtocolServiceKey;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.JsonUtils;
import org.apache.dubbo.common.utils.LRUCache;
@@ -49,7 +50,6 @@ import org.mockito.stubbing.Answer;
import java.lang.reflect.Field;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -62,7 +62,6 @@ import static org.apache.dubbo.common.utils.CollectionUtils.isEmpty;
import static org.apache.dubbo.common.utils.CollectionUtils.isNotEmpty;
import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.EXPORTED_SERVICES_REVISION_PROPERTY_NAME;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.eq;
@@ -219,7 +218,8 @@ public class ServiceInstancesChangedListenerTest {
Assertions.assertEquals(1, allInstances.size());
Assertions.assertEquals(3, allInstances.get("app1").size());
- List<URL> serviceUrls = listener.getAddresses(service1 + ":dubbo", consumerURL);
+ ProtocolServiceKey protocolServiceKey = new ProtocolServiceKey(service1, null, null, "dubbo");
+ List<URL> serviceUrls = listener.getAddresses(protocolServiceKey, consumerURL);
Assertions.assertEquals(3, serviceUrls.size());
assertTrue(serviceUrls.get(0) instanceof InstanceAddressURL);
}
@@ -240,7 +240,8 @@ public class ServiceInstancesChangedListenerTest {
Assertions.assertEquals(1, allInstances.size());
Assertions.assertEquals(3, allInstances.get("app1").size());
- List<URL> serviceUrls = listener.getAddresses(service1 + ":dubbo", consumerURL);
+ ProtocolServiceKey protocolServiceKey = new ProtocolServiceKey(service1, null, null, "dubbo");
+ List<URL> serviceUrls = listener.getAddresses(protocolServiceKey, consumerURL);
Assertions.assertEquals(3, serviceUrls.size());
assertTrue(serviceUrls.get(0) instanceof InstanceAddressURL);
@@ -271,12 +272,16 @@ public class ServiceInstancesChangedListenerTest {
Assertions.assertEquals(3, allInstances.get("app1").size());
Assertions.assertEquals(4, allInstances.get("app2").size());
- List<URL> serviceUrls = listener.getAddresses(service1 + ":dubbo", consumerURL);
+ ProtocolServiceKey protocolServiceKey1 = new ProtocolServiceKey(service1, null, null, "dubbo");
+ ProtocolServiceKey protocolServiceKey2 = new ProtocolServiceKey(service2, null, null, "dubbo");
+ ProtocolServiceKey protocolServiceKey3 = new ProtocolServiceKey(service3, null, null, "dubbo");
+
+ List<URL> serviceUrls = listener.getAddresses(protocolServiceKey1, consumerURL);
Assertions.assertEquals(7, serviceUrls.size());
- List<URL> serviceUrls2 = listener.getAddresses(service2 + ":dubbo", consumerURL);
+ List<URL> serviceUrls2 = listener.getAddresses(protocolServiceKey2, consumerURL);
Assertions.assertEquals(4, serviceUrls2.size());
assertTrue(serviceUrls2.get(0).getIp().contains("30.10."));
- List<URL> serviceUrls3 = listener.getAddresses(service3 + ":dubbo", consumerURL);
+ List<URL> serviceUrls3 = listener.getAddresses(protocolServiceKey3, consumerURL);
Assertions.assertEquals(2, serviceUrls3.size());
assertTrue(serviceUrls3.get(0).getIp().contains("30.10."));
}
@@ -308,13 +313,17 @@ public class ServiceInstancesChangedListenerTest {
Assertions.assertEquals(0, allInstances.get("app1").size());
Assertions.assertEquals(4, allInstances.get("app2").size());
- List<URL> serviceUrls = listener.getAddresses(service1 + ":dubbo", consumerURL);
+ ProtocolServiceKey protocolServiceKey1 = new ProtocolServiceKey(service1, null, null, "dubbo");
+ ProtocolServiceKey protocolServiceKey2 = new ProtocolServiceKey(service2, null, null, "dubbo");
+ ProtocolServiceKey protocolServiceKey3 = new ProtocolServiceKey(service3, null, null, "dubbo");
+
+ List<URL> serviceUrls = listener.getAddresses(protocolServiceKey1, consumerURL);
Assertions.assertEquals(4, serviceUrls.size());
assertTrue(serviceUrls.get(0).getIp().contains("30.10."));
- List<URL> serviceUrls2 = listener.getAddresses(service2 + ":dubbo", consumerURL);
+ List<URL> serviceUrls2 = listener.getAddresses(protocolServiceKey2, consumerURL);
Assertions.assertEquals(4, serviceUrls2.size());
assertTrue(serviceUrls2.get(0).getIp().contains("30.10."));
- List<URL> serviceUrls3 = listener.getAddresses(service3 + ":dubbo", consumerURL);
+ List<URL> serviceUrls3 = listener.getAddresses(protocolServiceKey3, consumerURL);
Assertions.assertEquals(2, serviceUrls3.size());
assertTrue(serviceUrls3.get(0).getIp().contains("30.10."));
@@ -328,9 +337,9 @@ public class ServiceInstancesChangedListenerTest {
Assertions.assertEquals(0, allInstances_app2.get("app1").size());
Assertions.assertEquals(0, allInstances_app2.get("app2").size());
- assertTrue(isEmpty(listener.getAddresses(service1 + ":dubbo", consumerURL)));
- assertTrue(isEmpty(listener.getAddresses(service2 + ":dubbo", consumerURL)));
- assertTrue(isEmpty(listener.getAddresses(service3 + ":dubbo", consumerURL)));
+ assertTrue(isEmpty(listener.getAddresses(protocolServiceKey1, consumerURL)));
+ assertTrue(isEmpty(listener.getAddresses(protocolServiceKey2, consumerURL)));
+ assertTrue(isEmpty(listener.getAddresses(protocolServiceKey3, consumerURL)));
}
// 正常场景。检查instance listener -> service listener(Directory)地址推送流程
@@ -345,8 +354,8 @@ public class ServiceInstancesChangedListenerTest {
when(demoServiceListener.getConsumerUrl()).thenReturn(consumerURL);
NotifyListener demoService2Listener = Mockito.mock(NotifyListener.class);
when(demoService2Listener.getConsumerUrl()).thenReturn(consumerURL2);
- listener.addListenerAndNotify(consumerURL.getProtocolServiceKey(), demoServiceListener);
- listener.addListenerAndNotify(consumerURL2.getProtocolServiceKey(), demoService2Listener);
+ listener.addListenerAndNotify(consumerURL, demoServiceListener);
+ listener.addListenerAndNotify(consumerURL2, demoService2Listener);
// notify app1 instance change
ServiceInstancesChangedEvent app1_event = new ServiceInstancesChangedEvent("app1", app1Instances);
listener.onEvent(app1_event);
@@ -378,7 +387,7 @@ public class ServiceInstancesChangedListenerTest {
// test service listener still get notified when added after instance notification.
NotifyListener demoService3Listener = Mockito.mock(NotifyListener.class);
when(demoService3Listener.getConsumerUrl()).thenReturn(consumerURL3);
- listener.addListenerAndNotify(consumerURL3.getProtocolServiceKey(), demoService3Listener);
+ listener.addListenerAndNotify(consumerURL3, demoService3Listener);
Mockito.verify(demoService3Listener, Mockito.times(1)).notify(Mockito.anyList());
}
@@ -397,10 +406,10 @@ public class ServiceInstancesChangedListenerTest {
when(demoService2Listener1.getConsumerUrl()).thenReturn(consumerURL2);
NotifyListener demoService2Listener2 = Mockito.mock(NotifyListener.class);
when(demoService2Listener2.getConsumerUrl()).thenReturn(consumerURL2);
- listener.addListenerAndNotify(consumerURL.getProtocolServiceKey(), demoServiceListener1);
- listener.addListenerAndNotify(consumerURL.getProtocolServiceKey(), demoServiceListener2);
- listener.addListenerAndNotify(consumerURL2.getProtocolServiceKey(), demoService2Listener1);
- listener.addListenerAndNotify(consumerURL2.getProtocolServiceKey(), demoService2Listener2);
+ listener.addListenerAndNotify(consumerURL, demoServiceListener1);
+ listener.addListenerAndNotify(consumerURL, demoServiceListener2);
+ listener.addListenerAndNotify(consumerURL2, demoService2Listener1);
+ listener.addListenerAndNotify(consumerURL2, demoService2Listener2);
// notify app1 instance change
ServiceInstancesChangedEvent app1_event = new ServiceInstancesChangedEvent("app1", app1Instances);
listener.onEvent(app1_event);
@@ -432,7 +441,7 @@ public class ServiceInstancesChangedListenerTest {
// test service listener still get notified when added after instance notification.
NotifyListener demoService3Listener = Mockito.mock(NotifyListener.class);
when(demoService3Listener.getConsumerUrl()).thenReturn(consumerURL3);
- listener.addListenerAndNotify(consumerURL3.getProtocolServiceKey(), demoService3Listener);
+ listener.addListenerAndNotify(consumerURL3, demoService3Listener);
Mockito.verify(demoService3Listener, Mockito.times(1)).notify(Mockito.anyList());
}
@@ -448,15 +457,15 @@ public class ServiceInstancesChangedListenerTest {
// no protocol specified, consume all instances
NotifyListener demoServiceListener1 = Mockito.mock(NotifyListener.class);
when(demoServiceListener1.getConsumerUrl()).thenReturn(noProtocolConsumerURL);
- listener.addListenerAndNotify(noProtocolConsumerURL.getProtocolServiceKey(), demoServiceListener1);
+ listener.addListenerAndNotify(noProtocolConsumerURL, demoServiceListener1);
// multiple protocols specified
NotifyListener demoServiceListener2 = Mockito.mock(NotifyListener.class);
when(demoServiceListener2.getConsumerUrl()).thenReturn(multipleProtocolsConsumerURL);
- listener.addListenerAndNotify(multipleProtocolsConsumerURL.getProtocolServiceKey(), demoServiceListener2);
+ listener.addListenerAndNotify(multipleProtocolsConsumerURL, demoServiceListener2);
// one protocol specified
NotifyListener demoServiceListener3 = Mockito.mock(NotifyListener.class);
when(demoServiceListener3.getConsumerUrl()).thenReturn(singleProtocolsConsumerURL);
- listener.addListenerAndNotify(singleProtocolsConsumerURL.getProtocolServiceKey(), demoServiceListener3);
+ listener.addListenerAndNotify(singleProtocolsConsumerURL, demoServiceListener3);
// notify app1 instance change
ServiceInstancesChangedEvent app1_event = new ServiceInstancesChangedEvent("app1", app1InstancesMultipleProtocols);
@@ -479,9 +488,119 @@ public class ServiceInstancesChangedListenerTest {
Assertions.assertEquals(1, single_protocol_notifiedUrls.size());
}
- // revision 异常场景。第一次启动,完全拿不到metadata,只能通知部分地址
+ /**
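+ * Test subscribing with multiple groups: a null, empty or "*" group, or a comma-separated list containing an empty entry, returns all 3 URLs; "group1" and "group1,group2" return none.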
+ */
@Test
@Order(8)
+ public void testSubscribeMultipleGroups() {
+ Set<String> serviceNames = new HashSet<>();
+ serviceNames.add("app1");
+ listener = new ServiceInstancesChangedListener(serviceNames, serviceDiscovery);
+
+ // notify instance change
+ ServiceInstancesChangedEvent event = new ServiceInstancesChangedEvent("app1", app1Instances);
+ listener.onEvent(event);
+
+ Map<String, List<ServiceInstance>> allInstances = listener.getAllInstances();
+ Assertions.assertEquals(1, allInstances.size());
+ Assertions.assertEquals(3, allInstances.get("app1").size());
+
+ ProtocolServiceKey protocolServiceKey = new ProtocolServiceKey(service1, null, null, "dubbo");
+ List<URL> serviceUrls = listener.getAddresses(protocolServiceKey, consumerURL);
+ Assertions.assertEquals(3, serviceUrls.size());
+ assertTrue(serviceUrls.get(0) instanceof InstanceAddressURL);
+
+ protocolServiceKey = new ProtocolServiceKey(service1, null, "", "dubbo");
+ serviceUrls = listener.getAddresses(protocolServiceKey, consumerURL);
+ Assertions.assertEquals(3, serviceUrls.size());
+ assertTrue(serviceUrls.get(0) instanceof InstanceAddressURL);
+
+ protocolServiceKey = new ProtocolServiceKey(service1, null, ",group1", "dubbo");
+ serviceUrls = listener.getAddresses(protocolServiceKey, consumerURL);
+ Assertions.assertEquals(3, serviceUrls.size());
+ assertTrue(serviceUrls.get(0) instanceof InstanceAddressURL);
+
+ protocolServiceKey = new ProtocolServiceKey(service1, null, "group1,", "dubbo");
+ serviceUrls = listener.getAddresses(protocolServiceKey, consumerURL);
+ Assertions.assertEquals(3, serviceUrls.size());
+ assertTrue(serviceUrls.get(0) instanceof InstanceAddressURL);
+
+ protocolServiceKey = new ProtocolServiceKey(service1, null, "*", "dubbo");
+ serviceUrls = listener.getAddresses(protocolServiceKey, consumerURL);
+ Assertions.assertEquals(3, serviceUrls.size());
+ assertTrue(serviceUrls.get(0) instanceof InstanceAddressURL);
+
+ protocolServiceKey = new ProtocolServiceKey(service1, null, "group1", "dubbo");
+ serviceUrls = listener.getAddresses(protocolServiceKey, consumerURL);
+ Assertions.assertEquals(0, serviceUrls.size());
+
+ protocolServiceKey = new ProtocolServiceKey(service1, null, "group1,group2", "dubbo");
+ serviceUrls = listener.getAddresses(protocolServiceKey, consumerURL);
+ Assertions.assertEquals(0, serviceUrls.size());
+
+ protocolServiceKey = new ProtocolServiceKey(service1, null, "group1,,group2", "dubbo");
+ serviceUrls = listener.getAddresses(protocolServiceKey, consumerURL);
+ Assertions.assertEquals(3, serviceUrls.size());
+ assertTrue(serviceUrls.get(0) instanceof InstanceAddressURL);
+ }
+
+ /**
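+ * Test subscribing with multiple versions: a null, empty or "*" version, or a comma-separated list containing an empty entry, returns all 3 URLs; "1.0.1,1.0.0" returns none.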
+ */
+ @Test
+ @Order(9)
+ public void testSubscribeMultipleVersions() {
+ Set<String> serviceNames = new HashSet<>();
+ serviceNames.add("app1");
+ listener = new ServiceInstancesChangedListener(serviceNames, serviceDiscovery);
+
+ // notify instance change
+ ServiceInstancesChangedEvent event = new ServiceInstancesChangedEvent("app1", app1Instances);
+ listener.onEvent(event);
+
+ Map<String, List<ServiceInstance>> allInstances = listener.getAllInstances();
+ Assertions.assertEquals(1, allInstances.size());
+ Assertions.assertEquals(3, allInstances.get("app1").size());
+
+ ProtocolServiceKey protocolServiceKey = new ProtocolServiceKey(service1, null, null, "dubbo");
+ List<URL> serviceUrls = listener.getAddresses(protocolServiceKey, consumerURL);
+ Assertions.assertEquals(3, serviceUrls.size());
+ assertTrue(serviceUrls.get(0) instanceof InstanceAddressURL);
+
+ protocolServiceKey = new ProtocolServiceKey(service1, "", null, "dubbo");
+ serviceUrls = listener.getAddresses(protocolServiceKey, consumerURL);
+ Assertions.assertEquals(3, serviceUrls.size());
+ assertTrue(serviceUrls.get(0) instanceof InstanceAddressURL);
+
+ protocolServiceKey = new ProtocolServiceKey(service1, "*", null, "dubbo");
+ serviceUrls = listener.getAddresses(protocolServiceKey, consumerURL);
+ Assertions.assertEquals(3, serviceUrls.size());
+ assertTrue(serviceUrls.get(0) instanceof InstanceAddressURL);
+
+ protocolServiceKey = new ProtocolServiceKey(service1, ",1.0.0", null, "dubbo");
+ serviceUrls = listener.getAddresses(protocolServiceKey, consumerURL);
+ Assertions.assertEquals(3, serviceUrls.size());
+ assertTrue(serviceUrls.get(0) instanceof InstanceAddressURL);
+
+ protocolServiceKey = new ProtocolServiceKey(service1, "1.0.0,", null, "dubbo");
+ serviceUrls = listener.getAddresses(protocolServiceKey, consumerURL);
+ Assertions.assertEquals(3, serviceUrls.size());
+ assertTrue(serviceUrls.get(0) instanceof InstanceAddressURL);
+
+ protocolServiceKey = new ProtocolServiceKey(service1, "1.0.0,,1.0.1", null, "dubbo");
+ serviceUrls = listener.getAddresses(protocolServiceKey, consumerURL);
+ Assertions.assertEquals(3, serviceUrls.size());
+ assertTrue(serviceUrls.get(0) instanceof InstanceAddressURL);
+
+ protocolServiceKey = new ProtocolServiceKey(service1, "1.0.1,1.0.0", null, "dubbo");
+ serviceUrls = listener.getAddresses(protocolServiceKey, consumerURL);
+ Assertions.assertEquals(0, serviceUrls.size());
+ }
+
+ // Abnormal revision scenario: on first startup the metadata cannot be fetched at all, so only part of the addresses can be notified.
+ @Test
+ @Order(10)
public void testRevisionFailureOnStartup() {
Set<String> serviceNames = new HashSet<>();
serviceNames.add("app1");
@@ -490,8 +609,12 @@ public class ServiceInstancesChangedListenerTest {
ServiceInstancesChangedEvent failed_revision_event = new ServiceInstancesChangedEvent("app1", app1FailedInstances);
listener.onEvent(failed_revision_event);
- List<URL> serviceUrls = listener.getAddresses(service1 + ":dubbo", consumerURL);
- List<URL> serviceUrls2 = listener.getAddresses(service2 + ":dubbo", consumerURL);
+
+ ProtocolServiceKey protocolServiceKey1 = new ProtocolServiceKey(service1, null, null, "dubbo");
+ ProtocolServiceKey protocolServiceKey2 = new ProtocolServiceKey(service2, null, null, "dubbo");
+
+ List<URL> serviceUrls = listener.getAddresses(protocolServiceKey1, consumerURL);
+ List<URL> serviceUrls2 = listener.getAddresses(protocolServiceKey2, consumerURL);
assertTrue(isNotEmpty(serviceUrls));
assertTrue(isNotEmpty(serviceUrls2));
@@ -499,7 +622,7 @@ public class ServiceInstancesChangedListenerTest {
// revision 异常场景。运行中地址通知,拿不到revision就用老版本revision
@Test
- @Order(9)
+ @Order(11)
public void testRevisionFailureOnNotification() {
Set<String> serviceNames = new HashSet<>();
serviceNames.add("app1");
@@ -524,8 +647,11 @@ public class ServiceInstancesChangedListenerTest {
listener.onEvent(event2);
// event2 did not really take effect
- Assertions.assertEquals(3, listener.getAddresses(service1 + ":dubbo", consumerURL).size());
- assertTrue(isEmpty(listener.getAddresses(service2 + ":dubbo", consumerURL)));
+ ProtocolServiceKey protocolServiceKey1 = new ProtocolServiceKey(service1, null, null, "dubbo");
+ ProtocolServiceKey protocolServiceKey2 = new ProtocolServiceKey(service2, null, null, "dubbo");
+
+ Assertions.assertEquals(3, listener.getAddresses(protocolServiceKey1, consumerURL).size());
+ assertTrue(isEmpty(listener.getAddresses(protocolServiceKey2, consumerURL)));
//
init();
@@ -536,16 +662,16 @@ public class ServiceInstancesChangedListenerTest {
e.printStackTrace();
}
// check recovered after retry.
- List<URL> serviceUrls_after_retry = listener.getAddresses(service1 + ":dubbo", consumerURL);
+ List<URL> serviceUrls_after_retry = listener.getAddresses(protocolServiceKey1, consumerURL);
Assertions.assertEquals(5, serviceUrls_after_retry.size());
- List<URL> serviceUrls2_after_retry = listener.getAddresses(service2 + ":dubbo", consumerURL);
+ List<URL> serviceUrls2_after_retry = listener.getAddresses(protocolServiceKey2, consumerURL);
Assertions.assertEquals(2, serviceUrls2_after_retry.size());
}
// Abnormal case. Instance does not have revision
@Test
- @Order(10)
+ @Order(12)
public void testInstanceWithoutRevision() {
Set<String> serviceNames = new HashSet<>();
serviceNames.add("app1");
@@ -559,39 +685,6 @@ public class ServiceInstancesChangedListenerTest {
assertTrue(true);
}
- /**
- * Test calculation of subscription protocols
- */
- @Test
- public void testGetProtocolServiceKeyList() {
- NotifyListener listener = Mockito.mock(NotifyListener.class);
-
- Set<String> serviceNames = new HashSet<>();
- serviceNames.add("app1");
- ServiceDiscovery serviceDiscovery = Mockito.mock(ServiceDiscovery.class);
- ServiceInstancesChangedListener instancesChangedListener = new ServiceInstancesChangedListener(serviceNames, serviceDiscovery);
-
- URL url1 = URL.valueOf("tri://localhost/Service?protocol=tri");
- when(listener.getConsumerUrl()).thenReturn(url1);
- Set<String> keyList11 = instancesChangedListener.getProtocolServiceKeyList(url1.getProtocolServiceKey(), listener);
- assertEquals(getExpectedSet(Arrays.asList("Service:tri")), keyList11);
-
- URL url2 = URL.valueOf("consumer://localhost/Service?group=group&version=1.0");
- when(listener.getConsumerUrl()).thenReturn(url2);
- Set<String> keyList12 = instancesChangedListener.getProtocolServiceKeyList(url2.getProtocolServiceKey(), listener);
- assertEquals(getExpectedSet(Arrays.asList("group/Service:1.0:tri", "group/Service:1.0:dubbo", "group/Service:1.0:rest")), keyList12);
-
- URL url3 = URL.valueOf("dubbo://localhost/Service?protocol=dubbo&group=group&version=1.0");
- when(listener.getConsumerUrl()).thenReturn(url3);
- Set<String> keyList21 = instancesChangedListener.getProtocolServiceKeyList(url3.getProtocolServiceKey(), listener);
- assertEquals(getExpectedSet(Arrays.asList("group/Service:1.0:dubbo")), keyList21);
-
- URL url4 = URL.valueOf("dubbo,tri://localhost/Service?protocol=dubbo,tri&group=group&version=1.0");
- when(listener.getConsumerUrl()).thenReturn(url4);
- Set<String> keyList23 = instancesChangedListener.getProtocolServiceKeyList(url4.getProtocolServiceKey(), listener);
- assertEquals(getExpectedSet(Arrays.asList("group/Service:1.0:dubbo", "group/Service:1.0:tri")), keyList23);
- }
-
/**
 * Copies the given list into a new {@link HashSet}.
 *
 * @param list the expected values
 * @return a new set containing every element of {@code list}
 */
Set<String> getExpectedSet(List<String> list) {
return new HashSet<>(list);
}