You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/09/25 12:14:30 UTC
[shardingsphere] branch master updated: Support Nacos to persist metadata in cluster mode (#20984)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ba081e00d57 Support Nacos to persist metadata in cluster mode (#20984)
ba081e00d57 is described below
commit ba081e00d57a37a1b88bd624bbc612c840b02a5d
Author: caqhy <75...@users.noreply.github.com>
AuthorDate: Sun Sep 25 20:14:22 2022 +0800
Support Nacos to persist metadata in cluster mode (#20984)
* Support ClusterPersistRepository: Nacos in cluster mode
* Add unit tests for NacosRepository
* Add unit tests for database based distributed lock
* Refactor example
* Fix NPE when etcd listener triggers
* Add license
* Revert commit
* Refactor NacosRepository
* Refactor NacosRepositoryTest
* Update pom
* Remove duplicate IP check
* Remove SPI
* Add NacosRepository SPI
* Refactor example
* Refactor MetadataUtil
* Fix lombok
* Add final
* Refactor nacos initialization process and watcher logic
* Refactor NacosRepositoryTest
* Remove RegisterMetadata
* Add ServiceController to manage service metadata
* Update NacosRepositoryTest
* Update clusterIp default value
* Update NacosPropertiesTest
* Remove Slf4j
* Add license
* Rename tryLock and unlock
* Fix import
Co-authored-by: qiuhaoyang <69...@users.noreply.gitlab.topviewclub.cn>
---
examples/pom.xml | 5 +
.../cluster-mode-raw-jdbc-example/pom.xml | 8 +
.../ClusterModeRawJavaConfigurationExample.java | 15 +-
.../jdbc/config/ClusterModeConfigurationUtil.java | 33 +-
.../mode/raw/jdbc/config/type/RepositoryType.java | 30 ++
.../pom.xml | 1 +
.../pom.xml | 21 +-
.../repository/cluster/nacos/NacosRepository.java | 380 +++++++++++++++++++
.../repository/cluster/nacos/entity/KeyValue.java | 35 ++
.../cluster/nacos/entity/ServiceController.java | 63 ++++
.../cluster/nacos/entity/ServiceMetadata.java | 62 ++++
.../nacos/listener/NamingEventListener.java | 131 +++++++
.../cluster/nacos/listener/WatchData.java | 39 ++
.../cluster/nacos/props/NacosProperties.java | 32 ++
.../cluster/nacos/props/NacosPropertyKey.java | 56 +++
.../cluster/nacos/utils/MetadataUtil.java | 85 +++++
...ode.repository.cluster.ClusterPersistRepository | 18 +
.../cluster/nacos/NacosRepositoryTest.java | 403 +++++++++++++++++++++
.../cluster/nacos/props/NacosPropertiesTest.java | 55 +++
.../shardingsphere-proxy-bootstrap/pom.xml | 5 +
20 files changed, 1459 insertions(+), 18 deletions(-)
diff --git a/examples/pom.xml b/examples/pom.xml
index 51b467da4ef..0a39691f482 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -142,6 +142,11 @@
<artifactId>shardingsphere-cluster-mode-repository-etcd</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere</groupId>
+ <artifactId>shardingsphere-cluster-mode-repository-nacos</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-infra-common</artifactId>
diff --git a/examples/shardingsphere-jdbc-example/single-feature-example/cluster-mode-example/cluster-mode-raw-jdbc-example/pom.xml b/examples/shardingsphere-jdbc-example/single-feature-example/cluster-mode-example/cluster-mode-raw-jdbc-example/pom.xml
index 08e7d6e4ad1..a337c657072 100644
--- a/examples/shardingsphere-jdbc-example/single-feature-example/cluster-mode-example/cluster-mode-raw-jdbc-example/pom.xml
+++ b/examples/shardingsphere-jdbc-example/single-feature-example/cluster-mode-example/cluster-mode-raw-jdbc-example/pom.xml
@@ -47,5 +47,13 @@
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-cluster-mode-repository-zookeeper-curator</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere</groupId>
+ <artifactId>shardingsphere-cluster-mode-repository-etcd</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere</groupId>
+ <artifactId>shardingsphere-cluster-mode-repository-nacos</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/examples/shardingsphere-jdbc-example/single-feature-example/cluster-mode-example/cluster-mode-raw-jdbc-example/src/main/java/org/apache/shardingsphere/example/cluster/mode/raw/jdbc/ClusterModeRawJavaConfigurationExample.java b/examples/shardingsphere-jdbc-example/single-feature-example/cluster-mode-example/cluster-mode-raw-jdbc-example/src/main/java/org/apache/shardingsphere/example/cluster/mode/raw/jdbc/ClusterModeRawJavaConfigurationExample.java
index 4bb947fb01f..0effb52a2d0 100644
--- a/examples/shardingsphere-jdbc-example/single-feature-example/cluster-mode-example/cluster-mode-raw-jdbc-example/src/main/java/org/apache/shardingsphere/example/cluster/mode/raw/jdbc/ClusterModeRawJavaConfigurationExample.java
+++ b/examples/shardingsphere-jdbc-example/single-feature-example/cluster-mode-example/cluster-mode-raw-jdbc-example/src/main/java/org/apache/shardingsphere/example/cluster/mode/raw/jdbc/ClusterModeRawJavaConfigurationExample.java
@@ -27,6 +27,7 @@ import org.apache.shardingsphere.example.cluster.mode.raw.jdbc.config.local.Loca
import org.apache.shardingsphere.example.cluster.mode.raw.jdbc.config.local.LocalReadwriteSplittingConfiguration;
import org.apache.shardingsphere.example.cluster.mode.raw.jdbc.config.local.LocalShadowConfiguration;
import org.apache.shardingsphere.example.cluster.mode.raw.jdbc.config.local.LocalShardingDatabasesAndTablesConfiguration;
+import org.apache.shardingsphere.example.cluster.mode.raw.jdbc.config.type.RepositoryType;
import org.apache.shardingsphere.example.config.ExampleConfiguration;
import org.apache.shardingsphere.example.core.api.ExampleExecuteTemplate;
import org.apache.shardingsphere.example.core.api.service.ExampleService;
@@ -51,8 +52,12 @@ public final class ClusterModeRawJavaConfigurationExample {
private static boolean loadConfigFromRegCenter = false;
// private static boolean loadConfigFromRegCenter = true;
+ private static String repositoryType = RepositoryType.ZOOKEEPER;
+// private static String repositoryType = RepositoryType.ETCD;
+// private static String repositoryType = RepositoryType.NACOS;
+
public static void main(final String[] args) throws Exception {
- DataSource dataSource = getDataSource(shardingType, loadConfigFromRegCenter);
+ DataSource dataSource = getDataSource(shardingType, loadConfigFromRegCenter, repositoryType);
try {
ExampleExecuteTemplate.run(getExampleService(dataSource));
} finally {
@@ -60,8 +65,8 @@ public final class ClusterModeRawJavaConfigurationExample {
}
}
- private static DataSource getDataSource(final ShardingType shardingType, final boolean loadConfigFromRegCenter) throws SQLException {
- ModeConfiguration modeConfig = getModeConfiguration(shardingType);
+ private static DataSource getDataSource(final ShardingType shardingType, final boolean loadConfigFromRegCenter, final String repositoryType) throws SQLException {
+ ModeConfiguration modeConfig = getModeConfiguration(shardingType, repositoryType);
ExampleConfiguration config;
switch (shardingType) {
case SHARDING_DATABASES_AND_TABLES:
@@ -83,8 +88,8 @@ public final class ClusterModeRawJavaConfigurationExample {
return config.getDataSource();
}
- private static ModeConfiguration getModeConfiguration(final ShardingType shardingType) {
- return ClusterModeConfigurationUtil.getZooKeeperConfiguration(!loadConfigFromRegCenter, shardingType);
+ private static ModeConfiguration getModeConfiguration(final ShardingType shardingType, final String repositoryType) {
+ return ClusterModeConfigurationUtil.getRepositoryConfiguration(!loadConfigFromRegCenter, shardingType, repositoryType);
}
private static ExampleService getExampleService(final DataSource dataSource) {
diff --git a/examples/shardingsphere-jdbc-example/single-feature-example/cluster-mode-example/cluster-mode-raw-jdbc-example/src/main/java/org/apache/shardingsphere/example/cluster/mode/raw/jdbc/config/ClusterModeConfigurationUtil.java b/examples/shardingsphere-jdbc-example/single-feature-example/cluster-mode-example/cluster-mode-raw-jdbc-example/src/main/java/org/apache/shardingsphere/example/cluster/mode/raw/jdbc/config/ClusterModeConfigurationUtil.java
index 129b011f069..a562e3240e2 100644
--- a/examples/shardingsphere-jdbc-example/single-feature-example/cluster-mode-example/cluster-mode-raw-jdbc-example/src/main/java/org/apache/shardingsphere/example/cluster/mode/raw/jdbc/config/ClusterModeConfigurationUtil.java
+++ b/examples/shardingsphere-jdbc-example/single-feature-example/cluster-mode-example/cluster-mode-raw-jdbc-example/src/main/java/org/apache/shardingsphere/example/cluster/mode/raw/jdbc/config/ClusterModeConfigurationUtil.java
@@ -17,30 +17,51 @@
package org.apache.shardingsphere.example.cluster.mode.raw.jdbc.config;
+import org.apache.shardingsphere.example.cluster.mode.raw.jdbc.config.type.RepositoryType;
import org.apache.shardingsphere.example.type.ShardingType;
-import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
+import org.apache.shardingsphere.mode.repository.cluster.nacos.props.NacosPropertyKey;
import java.util.Properties;
public final class ClusterModeConfigurationUtil {
+ private static final String NACOS_CONNECTION_STRING = "localhost:8848";
+
private static final String ZOOKEEPER_CONNECTION_STRING = "localhost:2181";
- public static ModeConfiguration getZooKeeperConfiguration(final boolean overwrite, final ShardingType shardingType) {
+ private static final String ETCD_CONNECTION_STRING = "http://localhost:2379";
+
+ public static ModeConfiguration getRepositoryConfiguration(final boolean overwrite, final ShardingType shardingType, final String repositoryType) {
+ Properties props = new Properties();
+ String repositoryConnection;
+ switch (repositoryType) {
+ case RepositoryType.NACOS:
+ repositoryConnection = NACOS_CONNECTION_STRING;
+ break;
+ case RepositoryType.ZOOKEEPER:
+ repositoryConnection = ZOOKEEPER_CONNECTION_STRING;
+ break;
+ case RepositoryType.ETCD:
+ repositoryConnection = ETCD_CONNECTION_STRING;
+ break;
+ default:
+ throw new UnsupportedOperationException(repositoryType);
+ }
ClusterPersistRepositoryConfiguration clusterRepositoryConfig;
switch (shardingType) {
case SHARDING_DATABASES_AND_TABLES:
- clusterRepositoryConfig = new ClusterPersistRepositoryConfiguration("ZooKeeper", "governance-sharding-data-source", ZOOKEEPER_CONNECTION_STRING, new Properties());
+ clusterRepositoryConfig = new ClusterPersistRepositoryConfiguration(repositoryType, "governance-sharding-data-source", repositoryConnection, props);
return new ModeConfiguration("Cluster", clusterRepositoryConfig, overwrite);
case READWRITE_SPLITTING:
- clusterRepositoryConfig = new ClusterPersistRepositoryConfiguration("ZooKeeper", "governance-readwrite-splitting-data-source", ZOOKEEPER_CONNECTION_STRING, new Properties());
+ clusterRepositoryConfig = new ClusterPersistRepositoryConfiguration(repositoryType, "governance-readwrite-splitting-data-source", repositoryConnection, props);
return new ModeConfiguration("Cluster", clusterRepositoryConfig, overwrite);
case ENCRYPT:
- clusterRepositoryConfig = new ClusterPersistRepositoryConfiguration("ZooKeeper", "governance-encrypt-data-source", ZOOKEEPER_CONNECTION_STRING, new Properties());
+ clusterRepositoryConfig = new ClusterPersistRepositoryConfiguration(repositoryType, "governance-encrypt-data-source", repositoryConnection, props);
return new ModeConfiguration("Cluster", clusterRepositoryConfig, overwrite);
case SHADOW:
- clusterRepositoryConfig = new ClusterPersistRepositoryConfiguration("ZooKeeper", "governance-shadow-data-source", ZOOKEEPER_CONNECTION_STRING, new Properties());
+ clusterRepositoryConfig = new ClusterPersistRepositoryConfiguration(repositoryType, "governance-shadow-data-source", repositoryConnection, props);
return new ModeConfiguration("Cluster", clusterRepositoryConfig, overwrite);
default:
throw new UnsupportedOperationException(shardingType.toString());
diff --git a/examples/shardingsphere-jdbc-example/single-feature-example/cluster-mode-example/cluster-mode-raw-jdbc-example/src/main/java/org/apache/shardingsphere/example/cluster/mode/raw/jdbc/config/type/RepositoryType.java b/examples/shardingsphere-jdbc-example/single-feature-example/cluster-mode-example/cluster-mode-raw-jdbc-example/src/main/java/org/apache/shardingsphere/example/cluster/mode/raw/jdbc/config/type/RepositoryType.java
new file mode 100644
index 00000000000..e5fb636943d
--- /dev/null
+++ b/examples/shardingsphere-jdbc-example/single-feature-example/cluster-mode-example/cluster-mode-raw-jdbc-example/src/main/java/org/apache/shardingsphere/example/cluster/mode/raw/jdbc/config/type/RepositoryType.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.example.cluster.mode.raw.jdbc.config.type;
+
+/**
+ * Repository type.
+ */
+public final class RepositoryType {
+
+ public static final String NACOS = "Nacos";
+
+ public static final String ZOOKEEPER = "ZooKeeper";
+
+ public static final String ETCD = "etcd";
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/pom.xml b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/pom.xml
index 8539c0bee9b..c001e33eb34 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/pom.xml
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/pom.xml
@@ -31,5 +31,6 @@
<modules>
<module>shardingsphere-cluster-mode-repository-zookeeper-curator</module>
<module>shardingsphere-cluster-mode-repository-etcd</module>
+ <module>shardingsphere-cluster-mode-repository-nacos</module>
</modules>
</project>
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/pom.xml b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-nacos/pom.xml
similarity index 68%
copy from shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/pom.xml
copy to shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-nacos/pom.xml
index 8539c0bee9b..e52070736a1 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/pom.xml
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-nacos/pom.xml
@@ -21,15 +21,22 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.shardingsphere</groupId>
- <artifactId>shardingsphere-cluster-mode-repository</artifactId>
+ <artifactId>shardingsphere-cluster-mode-repository-provider</artifactId>
<version>5.2.1-SNAPSHOT</version>
</parent>
- <artifactId>shardingsphere-cluster-mode-repository-provider</artifactId>
- <packaging>pom</packaging>
+ <artifactId>shardingsphere-cluster-mode-repository-nacos</artifactId>
<name>${project.artifactId}</name>
- <modules>
- <module>shardingsphere-cluster-mode-repository-zookeeper-curator</module>
- <module>shardingsphere-cluster-mode-repository-etcd</module>
- </modules>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.shardingsphere</groupId>
+ <artifactId>shardingsphere-cluster-mode-repository-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.alibaba.nacos</groupId>
+ <artifactId>nacos-client</artifactId>
+ <version>1.4.2</version>
+ </dependency>
+ </dependencies>
</project>
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepository.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster [...]
new file mode 100644
index 00000000000..764a9244fdb
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepository.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.repository.cluster.nacos;
+
+import com.alibaba.nacos.api.exception.NacosException;
+import com.alibaba.nacos.api.naming.NamingFactory;
+import com.alibaba.nacos.api.naming.NamingService;
+import com.alibaba.nacos.api.naming.PreservedMetadataKeys;
+import com.alibaba.nacos.api.naming.pojo.Instance;
+import com.alibaba.nacos.common.utils.CollectionUtils;
+import com.alibaba.nacos.common.utils.StringUtils;
+import com.google.common.base.Strings;
+import org.apache.shardingsphere.infra.instance.utils.IpUtils;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryException;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+import org.apache.shardingsphere.mode.repository.cluster.nacos.entity.KeyValue;
+import org.apache.shardingsphere.mode.repository.cluster.nacos.entity.ServiceController;
+import org.apache.shardingsphere.mode.repository.cluster.nacos.entity.ServiceMetadata;
+import org.apache.shardingsphere.mode.repository.cluster.nacos.listener.NamingEventListener;
+import org.apache.shardingsphere.mode.repository.cluster.nacos.props.NacosProperties;
+import org.apache.shardingsphere.mode.repository.cluster.nacos.props.NacosPropertyKey;
+import org.apache.shardingsphere.mode.repository.cluster.nacos.utils.MetadataUtil;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Registry repository of Nacos.
+ */
+public final class NacosRepository implements ClusterPersistRepository {
+
+ private NamingService client;
+
+ private NacosProperties nacosProps;
+
+ private ServiceController serviceController;
+
+ /**
+ * Initialize the repository: load the Nacos properties, create the naming client, then prepare the service metadata.
+ *
+ * @param config cluster persist repository configuration
+ */
+ @Override
+ public void init(final ClusterPersistRepositoryConfiguration config) {
+ // Order matters: initServiceMetadata() reads both nacosProps and client.
+ nacosProps = new NacosProperties(config.getProps());
+ initClient(config);
+ initServiceMetadata();
+ }
+
+ /**
+ * Create the Nacos naming client from the configured server lists and namespace.
+ *
+ * @param config cluster persist repository configuration
+ * @throws ClusterPersistRepositoryException if the naming service cannot be created
+ */
+ private void initClient(final ClusterPersistRepositoryConfiguration config) {
+ Properties props = new Properties();
+ // "serverAddr" and "namespace" are the property names handed to the Nacos NamingFactory.
+ props.setProperty("serverAddr", config.getServerLists());
+ props.setProperty("namespace", config.getNamespace());
+ try {
+ client = NamingFactory.createNamingService(props);
+ // CHECKSTYLE:OFF
+ } catch (Exception cause) {
+ // CHECKSTYLE:ON
+ throw new ClusterPersistRepositoryException(cause);
+ }
+ }
+
+    /**
+     * Create the service controller and assign an IP and a starting port to each of its services.
+     *
+     * <p>The IP is the configured cluster IP, or the value of {@code IpUtils.getIp()} when none is configured.
+     * The port starts at the highest port already registered under that IP for the service,
+     * or {@link Integer#MIN_VALUE} when no such instance exists.</p>
+     *
+     * @throws ClusterPersistRepositoryException if reading the properties or querying Nacos fails
+     */
+    private void initServiceMetadata() {
+        try {
+            String configuredIp = nacosProps.getValue(NacosPropertyKey.CLUSTER_IP);
+            String ip = Strings.isNullOrEmpty(configuredIp) ? IpUtils.getIp() : configuredIp;
+            serviceController = new ServiceController();
+            for (ServiceMetadata each : serviceController.getAllServices()) {
+                int highestPort = Integer.MIN_VALUE;
+                for (Instance instance : client.getAllInstances(each.getServiceName(), false)) {
+                    if (StringUtils.equals(instance.getIp(), ip) && instance.getPort() > highestPort) {
+                        highestPort = instance.getPort();
+                    }
+                }
+                each.setIp(ip);
+                each.setPort(new AtomicInteger(highestPort));
+            }
+            // CHECKSTYLE:OFF
+        } catch (Exception cause) {
+            // CHECKSTYLE:ON
+            throw new ClusterPersistRepositoryException(cause);
+        }
+    }
+
+ @Override
+ public void persistEphemeral(final String key, final String value) {
+ try {
+ if (Objects.isNull(value)) {
+ throw new IllegalArgumentException("Value cannot be null");
+ }
+ if (!findExisted(key, true).isEmpty()) {
+ delete(key);
+ }
+ put(key, value, true);
+ // CHECKSTYLE:OFF
+ } catch (Exception cause) {
+ // CHECKSTYLE:ON
+ throw new ClusterPersistRepositoryException(cause);
+ }
+ }
+
+    /**
+     * Persist ephemeral data only when no ephemeral entry exists for the key yet.
+     *
+     * @param key key of data
+     * @param value value of data, must not be null
+     * @throws ClusterPersistRepositoryException if value is null, the key already exists, or a Nacos operation fails
+     */
+    @Override
+    public void persistExclusiveEphemeral(final String key, final String value) {
+        try {
+            // Same precondition as persist() and persistEphemeral(): the availability check run after put()
+            // treats a null expected value as "the key must be absent", so a null value can never be confirmed.
+            if (Objects.isNull(value)) {
+                throw new IllegalArgumentException("Value cannot be null");
+            }
+            if (!findExisted(key, true).isEmpty()) {
+                throw new IllegalStateException("Key: " + key + " already exists");
+            }
+            put(key, value, true);
+            // CHECKSTYLE:OFF
+        } catch (Exception cause) {
+            // CHECKSTYLE:ON
+            throw new ClusterPersistRepositoryException(cause);
+        }
+    }
+
+ /**
+ * Try to acquire a lock. Not implemented yet for Nacos: no lock is taken and the call always reports failure.
+ *
+ * @param lockKey lock key
+ * @param timeoutMillis timeout in milliseconds (currently ignored)
+ * @return always {@code false}
+ */
+ @Override
+ public boolean tryLock(final String lockKey, final long timeoutMillis) {
+ // TODO
+ return false;
+ }
+
+ /**
+ * Release a lock. Not implemented yet for Nacos: the call does nothing.
+ *
+ * @param lockKey lock key
+ */
+ @Override
+ public void unlock(final String lockKey) {
+ // TODO
+ }
+
+    /**
+     * Register a data changed listener for the key on every service.
+     *
+     * <p>Each service has at most one {@code NamingEventListener}. The first watch on a service creates it and subscribes
+     * it to Nacos; later watches only add the key to the existing listener.</p>
+     *
+     * @param key key to watch
+     * @param listener data changed event listener
+     * @throws ClusterPersistRepositoryException if subscribing to Nacos fails
+     */
+    @Override
+    public void watch(final String key, final DataChangedEventListener listener) {
+        try {
+            for (ServiceMetadata each : serviceController.getAllServices()) {
+                NamingEventListener existing = each.getListener();
+                if (Objects.nonNull(existing)) {
+                    existing.put(key, listener);
+                    continue;
+                }
+                NamingEventListener created = new NamingEventListener();
+                created.put(key, listener);
+                each.setListener(created);
+                client.subscribe(each.getServiceName(), created);
+            }
+            // CHECKSTYLE:OFF
+        } catch (Exception cause) {
+            // CHECKSTYLE:ON
+            throw new ClusterPersistRepositoryException(cause);
+        }
+    }
+
+    /**
+     * Get the value stored under the key.
+     *
+     * <p>Services are searched in the order returned by the service controller. The first service holding the key wins,
+     * and within it the instance with the greatest timestamp supplies the value.</p>
+     *
+     * @param key key of data
+     * @return value of data, or {@code null} when no service holds the key
+     * @throws ClusterPersistRepositoryException if querying Nacos fails
+     */
+    @Override
+    public String get(final String key) {
+        try {
+            for (ServiceMetadata each : serviceController.getAllServices()) {
+                List<Instance> candidates = findExisted(key, each.isEphemeral());
+                Optional<Instance> latest = candidates.stream().max(Comparator.comparing(MetadataUtil::getTimestamp));
+                if (!latest.isPresent()) {
+                    continue;
+                }
+                return MetadataUtil.getValue(latest.get());
+            }
+            return null;
+            // CHECKSTYLE:OFF
+        } catch (Exception cause) {
+            // CHECKSTYLE:ON
+            throw new ClusterPersistRepositoryException(cause);
+        }
+    }
+
+ @Override
+ public List<String> getChildrenKeys(final String key) {
+ try {
+ Stream<String> concatKeys = Stream.empty();
+ for (ServiceMetadata each : serviceController.getAllServices()) {
+ Stream<String> keys = findExisted(each.isEphemeral()).stream()
+ .map(instance -> {
+ String fullPath = MetadataUtil.getKey(instance);
+ if (fullPath.startsWith(key + PATH_SEPARATOR)) {
+ String pathWithoutPrefix = fullPath.substring((key + PATH_SEPARATOR).length());
+ return pathWithoutPrefix.contains(PATH_SEPARATOR) ? pathWithoutPrefix.substring(0, pathWithoutPrefix.indexOf(PATH_SEPARATOR)) : pathWithoutPrefix;
+ }
+ return null;
+ }).filter(Objects::nonNull);
+ concatKeys = Stream.concat(concatKeys, keys);
+ }
+ return concatKeys.distinct().sorted(Comparator.reverseOrder()).collect(Collectors.toList());
+ // CHECKSTYLE:OFF
+ } catch (Exception cause) {
+ // CHECKSTYLE:ON
+ throw new ClusterPersistRepositoryException(cause);
+ }
+ }
+
+    /**
+     * Persist non-ephemeral data.
+     *
+     * <p>When persistent instances already hold the key, the one with the greatest timestamp is updated in place;
+     * otherwise a new persistent instance is registered.</p>
+     *
+     * @param key key of data
+     * @param value value of data, must not be null
+     * @throws ClusterPersistRepositoryException if value is null or a Nacos operation fails
+     */
+    @Override
+    public void persist(final String key, final String value) {
+        try {
+            if (null == value) {
+                throw new IllegalArgumentException("Value cannot be null");
+            }
+            List<Instance> existing = findExisted(key, false);
+            Optional<Instance> latest = existing.stream().max(Comparator.comparing(MetadataUtil::getTimestamp));
+            if (!latest.isPresent()) {
+                put(key, value, false);
+                return;
+            }
+            update(latest.get(), value);
+            // CHECKSTYLE:OFF
+        } catch (Exception cause) {
+            // CHECKSTYLE:ON
+            throw new ClusterPersistRepositoryException(cause);
+        }
+    }
+
+ /**
+ * Register a new instance carrying the key-value pair and wait until it can be read back.
+ *
+ * @param key key of data
+ * @param value value of data
+ * @param ephemeral whether the instance is registered as ephemeral
+ * @throws NacosException if a Nacos call fails or the value does not become available within the configured retries
+ * @throws InterruptedException if the thread is interrupted while waiting
+ */
+ private void put(final String key, final String value, final boolean ephemeral) throws NacosException, InterruptedException {
+ // Missing parent paths are registered first; their entries are waited on together with the key below.
+ final List<KeyValue> keyValues = buildParentPath(key);
+ ServiceMetadata serviceMetadata = serviceController.getService(ephemeral);
+ Instance instance = new Instance();
+ instance.setIp(serviceMetadata.getIp());
+ instance.setPort(serviceMetadata.getPort());
+ instance.setEphemeral(ephemeral);
+ // Sized for the largest case: three heartbeat entries (ephemeral only) plus the key and the timestamp.
+ Map<String, String> metadataMap = new HashMap<>(5, 1);
+ if (ephemeral) {
+ fillEphemeralMetadata(metadataMap);
+ }
+ metadataMap.put(key, value);
+ // The write timestamp is stored under the string form of MetadataUtil.UTC_ZONE_OFFSET.
+ metadataMap.put(MetadataUtil.UTC_ZONE_OFFSET.toString(), String.valueOf(MetadataUtil.getTimestamp()));
+ instance.setMetadata(metadataMap);
+ client.registerInstance(serviceMetadata.getServiceName(), instance);
+ keyValues.add(new KeyValue(key, value, ephemeral));
+ // Block until every registered entry is visible through the client.
+ waitValue(keyValues);
+ }
+
+    /**
+     * Register every missing ancestor path of the key as a persistent entry with an empty value.
+     *
+     * <p>The key is split on {@code PATH_SEPARATOR}. The segment before the first separator and the final segment
+     * (the key itself) are skipped, so only intermediate ancestors are built.</p>
+     *
+     * @param key full key whose ancestors must exist
+     * @return entries for the ancestors that were newly registered, in top-down order
+     * @throws NacosException if a Nacos call fails
+     */
+    private List<KeyValue> buildParentPath(final String key) throws NacosException {
+        List<KeyValue> result = new LinkedList<>();
+        StringBuilder parentPath = new StringBuilder();
+        String[] partPath = key.split(PATH_SEPARATOR);
+        for (int index = 1; index < partPath.length - 1; index++) {
+            String path = parentPath.append(PATH_SEPARATOR).append(partPath[index]).toString();
+            // build() checks existence itself and returns an empty list for an existing path,
+            // so a second remote lookup here would only duplicate that query.
+            result.addAll(build(path));
+        }
+        return result;
+    }
+
+    /**
+     * Register a persistent entry with an empty value for the key, unless a persistent entry already exists.
+     *
+     * @param key key to create
+     * @return a single-element list describing the new entry, or an empty list when the key already exists
+     * @throws NacosException if a Nacos call fails
+     */
+    private List<KeyValue> build(final String key) throws NacosException {
+        List<KeyValue> result = new LinkedList<>();
+        if (!findExisted(key, false).isEmpty()) {
+            return result;
+        }
+        ServiceMetadata persistentService = serviceController.getPersistentService();
+        Instance placeholder = new Instance();
+        placeholder.setIp(persistentService.getIp());
+        placeholder.setPort(persistentService.getPort());
+        placeholder.setEphemeral(false);
+        Map<String, String> metadata = new HashMap<>(2, 1);
+        metadata.put(key, MetadataUtil.EMPTY);
+        metadata.put(MetadataUtil.UTC_ZONE_OFFSET.toString(), String.valueOf(MetadataUtil.getTimestamp()));
+        placeholder.setMetadata(metadata);
+        client.registerInstance(persistentService.getServiceName(), placeholder);
+        result.add(new KeyValue(key, MetadataUtil.EMPTY, false));
+        return result;
+    }
+
+    /**
+     * Add the Nacos heartbeat settings derived from the configured time to live.
+     *
+     * <p>With the time to live in milliseconds as {@code ttl}: heartbeat interval is {@code ttl / 3},
+     * heartbeat timeout is {@code ttl * 2 / 3} and IP delete timeout is {@code ttl}.</p>
+     *
+     * @param metadataMap instance metadata to fill
+     */
+    private void fillEphemeralMetadata(final Map<String, String> metadataMap) {
+        int timeToLiveSeconds = nacosProps.getValue(NacosPropertyKey.TIME_TO_LIVE_SECONDS);
+        // Convert in long arithmetic: int multiplication overflows once the time to live exceeds roughly 12 days.
+        long timeToLiveMillis = timeToLiveSeconds * 1000L;
+        metadataMap.put(PreservedMetadataKeys.HEART_BEAT_INTERVAL, String.valueOf(timeToLiveMillis / 3));
+        metadataMap.put(PreservedMetadataKeys.HEART_BEAT_TIMEOUT, String.valueOf(timeToLiveMillis * 2 / 3));
+        metadataMap.put(PreservedMetadataKeys.IP_DELETE_TIMEOUT, String.valueOf(timeToLiveMillis));
+    }
+
+    /**
+     * Overwrite the value carried by an existing instance and wait until the new value can be read back.
+     *
+     * <p>The instance is re-registered under the persistent service with its metadata updated in place.</p>
+     *
+     * @param instance instance currently holding the key
+     * @param value new value
+     * @throws NacosException if a Nacos call fails or the value does not become available within the configured retries
+     * @throws InterruptedException if the thread is interrupted while waiting
+     */
+    private void update(final Instance instance, final String value) throws NacosException, InterruptedException {
+        String key = MetadataUtil.getKey(instance);
+        Map<String, String> metadata = instance.getMetadata();
+        metadata.put(key, value);
+        metadata.put(MetadataUtil.UTC_ZONE_OFFSET.toString(), String.valueOf(MetadataUtil.getTimestamp()));
+        instance.setMetadata(metadata);
+        client.registerInstance(serviceController.getPersistentService().getServiceName(), instance);
+        List<KeyValue> pending = new LinkedList<>();
+        pending.add(new KeyValue(key, value, instance.isEphemeral()));
+        waitValue(pending);
+    }
+
+    /**
+     * Delete the key and every key below it from all services.
+     *
+     * <p>Matching instances are deregistered in descending key order. After each service has been processed,
+     * the call waits until the removed keys are no longer visible.</p>
+     *
+     * @param key key to delete
+     * @throws ClusterPersistRepositoryException if a Nacos operation fails or the removal is not confirmed in time
+     */
+    @Override
+    public void delete(final String key) {
+        try {
+            String descendantPrefix = key + PATH_SEPARATOR;
+            for (ServiceMetadata each : serviceController.getAllServices()) {
+                List<Instance> targets = new LinkedList<>();
+                for (Instance instance : findExisted(each.isEphemeral())) {
+                    String fullPath = MetadataUtil.getKey(instance);
+                    if (fullPath.startsWith(descendantPrefix) || StringUtils.equals(fullPath, key)) {
+                        targets.add(instance);
+                    }
+                }
+                targets.sort(Comparator.comparing(MetadataUtil::getKey).reversed());
+                List<KeyValue> removed = new LinkedList<>();
+                for (Instance target : targets) {
+                    client.deregisterInstance(each.getServiceName(), target);
+                    // A null value marks the entry as "expected to be absent" for the availability check.
+                    removed.add(new KeyValue(MetadataUtil.getKey(target), null, each.isEphemeral()));
+                }
+                waitValue(removed);
+            }
+            // CHECKSTYLE:OFF
+        } catch (Exception cause) {
+            // CHECKSTYLE:ON
+            throw new ClusterPersistRepositoryException(cause);
+        }
+    }
+
+    /**
+     * Find the instances of the ephemeral or persistent service whose key equals the given key.
+     *
+     * @param key key to look up
+     * @param ephemeral whether to search the ephemeral service instead of the persistent one
+     * @return matching instances, empty when the key is not stored
+     * @throws NacosException if querying Nacos fails
+     */
+    private List<Instance> findExisted(final String key, final boolean ephemeral) throws NacosException {
+        List<Instance> candidates = findExisted(ephemeral);
+        return candidates.stream().filter(each -> Objects.equals(key, MetadataUtil.getKey(each))).collect(Collectors.toList());
+    }
+
+ /**
+ * Get all instances of the ephemeral or persistent service.
+ *
+ * @param ephemeral whether to query the ephemeral service instead of the persistent one
+ * @return all instances registered under that service
+ * @throws NacosException if querying Nacos fails
+ */
+ private List<Instance> findExisted(final boolean ephemeral) throws NacosException {
+ ServiceMetadata service = serviceController.getService(ephemeral);
+ // NOTE(review): the second argument is presumably Nacos' "subscribe" flag; false would mean a direct query - confirm against the NamingService API.
+ return client.getAllInstances(service.getServiceName(), false);
+ }
+
+    /**
+     * Block until every expected entry is visible, retrying with a randomized, growing delay.
+     *
+     * <p>Entries that have become visible are removed from {@code keyValues} by the availability check,
+     * so the list must be mutable.</p>
+     *
+     * @param keyValues expected entries; a {@code null} value means the key is expected to be absent
+     * @throws NacosException if a Nacos call fails or the entries are still not available after the configured retries
+     * @throws InterruptedException if the thread is interrupted while sleeping; the interrupt status is restored first
+     */
+    private void waitValue(final List<KeyValue> keyValues) throws NacosException, InterruptedException {
+        if (isAvailable(keyValues)) {
+            return;
+        }
+        long retryIntervalMilliseconds = nacosProps.getValue(NacosPropertyKey.RETRY_INTERVAL_MILLISECONDS);
+        int maxRetries = nacosProps.getValue(NacosPropertyKey.MAX_RETRIES);
+        for (int retry = 0; retry < maxRetries; retry++) {
+            try {
+                Thread.sleep(getSleepTimeMs(retry, retryIntervalMilliseconds));
+            } catch (final InterruptedException ex) {
+                // Callers wrap every exception into ClusterPersistRepositoryException, which would otherwise
+                // hide the interruption from code further up the stack.
+                Thread.currentThread().interrupt();
+                throw ex;
+            }
+            if (isAvailable(keyValues)) {
+                return;
+            }
+        }
+        throw new NacosException(NacosException.RESOURCE_NOT_FOUND, "Wait value availability timeout exceeded");
+    }
+
+    /**
+     * Check which expected entries are already visible and drop them from the pending list.
+     *
+     * <p>An entry with a non-null value is satisfied when some instance of its service holds the key with that value.
+     * An entry with a {@code null} value is satisfied when no instance of its service holds the key.</p>
+     *
+     * @param keyValues pending entries; satisfied entries are removed from this list
+     * @return {@code true} when no pending entry is left
+     * @throws NacosException if querying Nacos fails
+     */
+    private boolean isAvailable(final List<KeyValue> keyValues) throws NacosException {
+        Map<Boolean, List<KeyValue>> keyValueMap = keyValues.stream().collect(Collectors.groupingBy(KeyValue::isEphemeral));
+        for (Map.Entry<Boolean, List<KeyValue>> entry : keyValueMap.entrySet()) {
+            boolean ephemeral = entry.getKey();
+            ServiceMetadata service = serviceController.getService(ephemeral);
+            Map<String, List<Instance>> instanceMap = client.getAllInstances(service.getServiceName(), false).stream()
+                    .collect(Collectors.groupingBy(MetadataUtil::getKey));
+            keyValues.removeIf(keyValue -> {
+                // Only judge entries against the service they belong to. Otherwise a pending deletion in one
+                // service looks complete simply because the other service never held the key.
+                if (ephemeral != keyValue.isEphemeral()) {
+                    return false;
+                }
+                List<Instance> instances = instanceMap.get(keyValue.getKey());
+                String value = keyValue.getValue();
+                return CollectionUtils.isNotEmpty(instances) ? instances.stream().anyMatch(instance -> StringUtils.equals(MetadataUtil.getValue(instance), value)) : Objects.isNull(value);
+            });
+        }
+        return keyValues.isEmpty();
+    }
+
+    /**
+     * Compute a randomized sleep time that grows exponentially with the retry count.
+     *
+     * @param retryCount zero-based number of the current retry
+     * @param baseSleepTimeMs base sleep time in milliseconds
+     * @return {@code baseSleepTimeMs} multiplied by a random factor in {@code [1, 2^min(retryCount + 1, 30))}
+     */
+    private long getSleepTimeMs(final int retryCount, final long baseSleepTimeMs) {
+        // copied from Hadoop's RetryPolicies.java
+        // The exponent is capped at 30: 1 << 31 is negative, and Random#nextInt rejects non-positive bounds.
+        int exponent = Math.min(retryCount + 1, 30);
+        return baseSleepTimeMs * Math.max(1, new Random().nextInt(1 << exponent));
+    }
+
+    /**
+     * Shut down the Nacos naming client.
+     *
+     * <p>Does nothing when the client was never created, for example when {@code init} was not called or failed.</p>
+     *
+     * @throws ClusterPersistRepositoryException if shutting down the client fails
+     */
+    @Override
+    public void close() {
+        if (null == client) {
+            return;
+        }
+        try {
+            client.shutDown();
+            // CHECKSTYLE:OFF
+        } catch (Exception cause) {
+            // CHECKSTYLE:ON
+            throw new ClusterPersistRepositoryException(cause);
+        }
+    }
+
+ /**
+ * Get the type name of this repository.
+ *
+ * @return the constant {@code "Nacos"}
+ */
+ @Override
+ public String getType() {
+ return "Nacos";
+ }
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/entity/KeyValue.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster [...]
new file mode 100644
index 00000000000..a11b5cbc526
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/entity/KeyValue.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.repository.cluster.nacos.entity;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * Key value.
+ *
+ * <p>Immutable holder of a key, its value and an ephemeral flag. The constructor (arguments in field order)
+ * and the getters are generated by Lombok.</p>
+ */
+@Getter
+@RequiredArgsConstructor
+public final class KeyValue {
+
+    // Key of the entry.
+    private final String key;
+
+    // Value associated with the key.
+    private final String value;
+
+    // Ephemeral flag of the entry.
+    private final boolean ephemeral;
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/entity/ServiceController.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingspher [...]
new file mode 100644
index 00000000000..37addf8eac3
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/entity/ServiceController.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.repository.cluster.nacos.entity;
+
+import lombok.Getter;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Service controller.
+ *
+ * <p>Owns one persistent and one ephemeral {@link ServiceMetadata} and gives access to them
+ * either all at once or by their ephemeral flag.</p>
+ */
+public final class ServiceController {
+
+    private static final String PERSISTENT_SERVICE_NAME = "PERSISTENT";
+
+    private static final String EPHEMERAL_SERVICE_NAME = "EPHEMERAL";
+
+    @Getter
+    private final ServiceMetadata persistentService = new ServiceMetadata(PERSISTENT_SERVICE_NAME, false);
+
+    @Getter
+    private final ServiceMetadata ephemeralService = new ServiceMetadata(EPHEMERAL_SERVICE_NAME, true);
+
+    // Both services above, indexed by ServiceMetadata#isEphemeral().
+    private final Map<Boolean, ServiceMetadata> serviceMap = Stream.of(persistentService, ephemeralService)
+            .collect(Collectors.toMap(ServiceMetadata::isEphemeral, Function.identity()));
+
+    /**
+     * Get all services.
+     *
+     * @return persistent service and ephemeral service
+     */
+    public Collection<ServiceMetadata> getAllServices() {
+        return serviceMap.values();
+    }
+
+    /**
+     * Get service.
+     *
+     * @param ephemeral whether the ephemeral service is requested
+     * @return ephemeral service if {@code ephemeral} is true, otherwise persistent service
+     */
+    public ServiceMetadata getService(final boolean ephemeral) {
+        // The map holds exactly these two entries, so the lookup reduces to a direct choice.
+        return ephemeral ? ephemeralService : persistentService;
+    }
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/entity/ServiceMetadata.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere- [...]
new file mode 100644
index 00000000000..5dd69dcf90a
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/entity/ServiceMetadata.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.repository.cluster.nacos.entity;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import org.apache.shardingsphere.mode.repository.cluster.nacos.listener.NamingEventListener;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Service metadata.
+ *
+ * <p>Carries the service name and ephemeral flag given at construction, plus the ip, the port counter
+ * and the naming event listener that are assigned afterwards through setters.</p>
+ */
+@RequiredArgsConstructor
+public final class ServiceMetadata {
+
+    @Getter
+    private final String serviceName;
+
+    @Setter
+    @Getter
+    private String ip;
+
+    // Counter behind getPort(); only a setter is generated, the getter is hand-written below.
+    @Setter
+    private AtomicInteger port;
+
+    @Setter
+    @Getter
+    private NamingEventListener listener;
+
+    @Getter
+    private final boolean ephemeral;
+
+    /**
+     * Get incremental port.
+     *
+     * <p>Every call increments the port counter and returns the new value.</p>
+     *
+     * @return incremental port
+     * @throws IllegalStateException if the incremented value equals {@link Integer#MIN_VALUE}
+     */
+    public int getPort() {
+        final int nextPort = port.incrementAndGet();
+        if (Integer.MIN_VALUE != nextPort) {
+            return nextPort;
+        }
+        throw new IllegalStateException("Specified cluster ip exceeded the maximum number of persisting");
+    }
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/listener/NamingEventListener.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardings [...]
new file mode 100644
index 00000000000..c2b3e97984b
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/listener/NamingEventListener.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.repository.cluster.nacos.listener;
+
+import com.alibaba.nacos.api.naming.listener.Event;
+import com.alibaba.nacos.api.naming.listener.EventListener;
+import com.alibaba.nacos.api.naming.listener.NamingEvent;
+import com.alibaba.nacos.api.naming.pojo.Instance;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+import org.apache.shardingsphere.mode.repository.cluster.nacos.utils.MetadataUtil;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Naming event listener.
+ *
+ * <p>Compares the instances carried by each {@link NamingEvent} with the instances remembered from the
+ * previous event and forwards the resulting changes to every listener registered for a matching key prefix.
+ * All access to the remembered instances and to the registered listeners is guarded by this object's monitor.</p>
+ */
+public final class NamingEventListener implements EventListener {
+
+    // Instances remembered from the previous event, by key; replaced after every processed event.
+    private Map<String, Instance> preInstances = new HashMap<>();
+
+    // Listener to notify, by key prefix.
+    private final Map<String, DataChangedEventListener> prefixListenerMap = new HashMap<>();
+
+    @Override
+    public void onEvent(final Event event) {
+        if (!(event instanceof NamingEvent)) {
+            return;
+        }
+        List<Instance> instances = ((NamingEvent) event).getInstances().stream().sorted(Comparator.comparing(MetadataUtil::getKey)).collect(Collectors.toList());
+        List<WatchData> watchDataList = new LinkedList<>();
+        synchronized (this) {
+            instances.forEach(each -> collectPresent(each, watchDataList));
+            // Whatever is still left in preInstances is absent from the current event, i.e. deleted.
+            // Deletions are reported in reverse key order.
+            preInstances.values().stream().sorted(Comparator.comparing(MetadataUtil::getKey).reversed()).forEach(each -> collectDeleted(each, watchDataList));
+            watchDataList.forEach(this::notifyListener);
+            setPreInstances(instances);
+        }
+    }
+
+    // Pair an instance of the current event with its previous state for every listener whose prefix matches.
+    private void collectPresent(final Instance instance, final List<WatchData> watchDataList) {
+        if (prefixListenerMap.isEmpty()) {
+            return;
+        }
+        String key = MetadataUtil.getKey(instance);
+        List<DataChangedEventListener> listeners = getMatchedListeners(key);
+        if (listeners.isEmpty()) {
+            return;
+        }
+        // Take the previous instance out exactly once, so that all listeners with overlapping prefixes
+        // see the same previous state instead of only the first one.
+        Instance preInstance = preInstances.remove(key);
+        listeners.forEach(each -> watchDataList.add(new WatchData(key, preInstance, instance, each)));
+    }
+
+    // Record a deletion of a remembered instance for every listener whose prefix matches.
+    private void collectDeleted(final Instance preInstance, final List<WatchData> watchDataList) {
+        String key = MetadataUtil.getKey(preInstance);
+        getMatchedListeners(key).forEach(each -> watchDataList.add(new WatchData(key, preInstance, null, each)));
+    }
+
+    // Listeners whose registered prefix is a prefix of the given key.
+    private List<DataChangedEventListener> getMatchedListeners(final String key) {
+        return prefixListenerMap.entrySet().stream().filter(entry -> key.startsWith(entry.getKey())).map(Map.Entry::getValue).collect(Collectors.toList());
+    }
+
+    // Translate one watch data into a data changed event and hand it to its listener; ignored changes are dropped.
+    private void notifyListener(final WatchData watchData) {
+        String key = watchData.getKey();
+        Instance preInstance = watchData.getPreInstance();
+        Instance instance = watchData.getInstance();
+        DataChangedEventListener listener = watchData.getListener();
+        DataChangedEvent.Type changedType = getEventChangedType(preInstance, instance);
+        switch (changedType) {
+            case ADDED:
+            case UPDATED:
+                listener.onChange(new DataChangedEvent(key, MetadataUtil.getValue(instance), changedType));
+                break;
+            case DELETED:
+                // A deleted key has no current instance, so the last known value is reported.
+                listener.onChange(new DataChangedEvent(key, MetadataUtil.getValue(preInstance), changedType));
+                break;
+            default:
+                break;
+        }
+    }
+
+    // ADDED: only the current instance exists; DELETED: only the previous one exists;
+    // UPDATED: both exist with different timestamps; IGNORED otherwise.
+    private DataChangedEvent.Type getEventChangedType(final Instance preInstance, final Instance instance) {
+        if (Objects.isNull(preInstance)) {
+            return Objects.nonNull(instance) ? DataChangedEvent.Type.ADDED : DataChangedEvent.Type.IGNORED;
+        }
+        if (Objects.isNull(instance)) {
+            return DataChangedEvent.Type.DELETED;
+        }
+        return MetadataUtil.getTimestamp(preInstance) != MetadataUtil.getTimestamp(instance) ? DataChangedEvent.Type.UPDATED : DataChangedEvent.Type.IGNORED;
+    }
+
+    /**
+     * Update preInstances.
+     *
+     * <p>Only instances whose key starts with a registered prefix are remembered; when several instances
+     * share a key, the one with the greater timestamp is kept. Synchronized like {@link #onEvent(Event)} and
+     * {@link #put(String, DataChangedEventListener)}, because it touches the same state.</p>
+     *
+     * @param instances instances
+     */
+    public synchronized void setPreInstances(final List<Instance> instances) {
+        preInstances = instances.stream().filter(this::isWatched)
+                .collect(Collectors.toMap(MetadataUtil::getKey, Function.identity(), (a, b) -> MetadataUtil.getTimestamp(a) > MetadataUtil.getTimestamp(b) ? a : b));
+    }
+
+    // Whether the key of the instance starts with at least one registered prefix.
+    private boolean isWatched(final Instance instance) {
+        if (prefixListenerMap.isEmpty()) {
+            return false;
+        }
+        String key = MetadataUtil.getKey(instance);
+        return prefixListenerMap.keySet().stream().anyMatch(key::startsWith);
+    }
+
+    /**
+     * Put prefixPath and listener.
+     *
+     * @param prefixPath prefixPath
+     * @param listener listener
+     */
+    public synchronized void put(final String prefixPath, final DataChangedEventListener listener) {
+        prefixListenerMap.put(prefixPath, listener);
+    }
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/listener/WatchData.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-clus [...]
new file mode 100644
index 00000000000..939bb037a9c
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/listener/WatchData.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.repository.cluster.nacos.listener;
+
+import com.alibaba.nacos.api.naming.pojo.Instance;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+
+/**
+ * Watch data.
+ *
+ * <p>Immutable holder of a key, its previous and current instance, and the listener associated with them.
+ * The constructor (arguments in field order) and the getters are generated by Lombok.</p>
+ */
+@RequiredArgsConstructor
+@Getter
+public final class WatchData {
+
+    // Key shared by the two instances.
+    private final String key;
+
+    // Previous instance.
+    private final Instance preInstance;
+
+    // Current instance.
+    private final Instance instance;
+
+    // Listener associated with this data.
+    private final DataChangedEventListener listener;
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/props/NacosProperties.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-c [...]
new file mode 100644
index 00000000000..9c819117028
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/props/NacosProperties.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.repository.cluster.nacos.props;
+
+import org.apache.shardingsphere.infra.util.props.TypedProperties;
+
+import java.util.Properties;
+
+/**
+ * Typed properties of Nacos.
+ *
+ * <p>Binds {@link NacosPropertyKey} as the key type of {@link TypedProperties}.</p>
+ */
+public final class NacosProperties extends TypedProperties<NacosPropertyKey> {
+
+    /**
+     * Create typed properties of Nacos.
+     *
+     * @param props properties passed to the parent class together with the key type
+     */
+    public NacosProperties(final Properties props) {
+        super(NacosPropertyKey.class, props);
+    }
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/props/NacosPropertyKey.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere- [...]
new file mode 100644
index 00000000000..f0e310c9ab5
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/props/NacosPropertyKey.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.repository.cluster.nacos.props;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.util.props.TypedPropertyKey;
+
+/**
+ * Typed property key of Nacos.
+ *
+ * <p>Each constant declares its property key, its default value as text and its value type.</p>
+ */
+@Getter
+@RequiredArgsConstructor
+public enum NacosPropertyKey implements TypedPropertyKey {
+
+    /**
+     * Cluster ip.
+     */
+    CLUSTER_IP("clusterIp", "", String.class),
+
+    /**
+     * Retry interval in milliseconds when checking whether value is available.
+     */
+    RETRY_INTERVAL_MILLISECONDS("retryIntervalMilliseconds", "500", long.class),
+
+    /**
+     * Max retry times when checking whether value is available.
+     */
+    MAX_RETRIES("maxRetries", "3", int.class),
+
+    /**
+     * Time to live in seconds.
+     */
+    TIME_TO_LIVE_SECONDS("timeToLiveSeconds", "30", int.class);
+
+    // Property key.
+    private final String key;
+
+    // Default value in its textual form.
+    private final String defaultValue;
+
+    // Type of the property value.
+    private final Class<?> type;
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/utils/MetadataUtil.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-clus [...]
new file mode 100644
index 00000000000..c2056759861
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/utils/MetadataUtil.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.repository.cluster.nacos.utils;
+
+import com.alibaba.nacos.api.exception.NacosException;
+import com.alibaba.nacos.api.naming.PreservedMetadataKeys;
+import com.alibaba.nacos.api.naming.pojo.Instance;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.SneakyThrows;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+
+/**
+ * Metadata util.
+ *
+ * <p>Helpers to read the key, the value and the timestamp from the metadata map of an {@link Instance}.</p>
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class MetadataUtil {
+
+    public static final String EMPTY = "";
+
+    // Despite its name this is the fixed offset +08:00, not UTC.
+    // Its string form is also the metadata entry name under which the timestamp is looked up.
+    public static final ZoneOffset UTC_ZONE_OFFSET = ZoneOffset.of("+8");
+
+    /**
+     * Get timestamp.
+     *
+     * <p>Reads the metadata entry named by the string form of {@link #UTC_ZONE_OFFSET} and parses it as a long.</p>
+     *
+     * @param instance instance
+     * @return timestamp
+     */
+    @SneakyThrows
+    public static long getTimestamp(final Instance instance) {
+        return Long.parseLong(instance.getMetadata().get(UTC_ZONE_OFFSET.toString()));
+    }
+
+    /**
+     * Get timestamp.
+     *
+     * @return current time in epoch milliseconds
+     */
+    public static long getTimestamp() {
+        // Read the clock in the same offset that is used for the conversion. LocalDateTime.now() without
+        // an argument uses the system default zone, which made the result a real epoch value only on
+        // JVMs running in UTC+8 and made timestamps of nodes in different zones incomparable.
+        return LocalDateTime.now(UTC_ZONE_OFFSET).toInstant(UTC_ZONE_OFFSET).toEpochMilli();
+    }
+
+    /**
+     * Get value.
+     *
+     * @param instance instance
+     * @return metadata value stored under the key returned by {@link #getKey(Instance)}
+     */
+    public static String getValue(final Instance instance) {
+        return instance.getMetadata().get(getKey(instance));
+    }
+
+    /**
+     * Get key.
+     *
+     * <p>The key is the first metadata entry name that is neither one of the preserved heart beat or
+     * ip delete timeout names nor the timestamp entry name. If there is no such entry, a
+     * {@link NacosException} is thrown (sneakily, without being declared).</p>
+     *
+     * @param instance instance
+     * @return key
+     */
+    @SneakyThrows
+    public static String getKey(final Instance instance) {
+        return instance.getMetadata().keySet().stream()
+                .filter(entryKey -> !entryKey.equals(PreservedMetadataKeys.HEART_BEAT_INTERVAL)
+                        && !entryKey.equals(PreservedMetadataKeys.HEART_BEAT_TIMEOUT)
+                        && !entryKey.equals(PreservedMetadataKeys.IP_DELETE_TIMEOUT)
+                        && !entryKey.equals(UTC_ZONE_OFFSET.toString()))
+                .findFirst().orElseThrow(() -> new NacosException(NacosException.RESOURCE_NOT_FOUND, "Failed to find key "));
+    }
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-nacos/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/s [...]
new file mode 100644
index 00000000000..7ab9d965fcb
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-nacos/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.mode.repository.cluster.nacos.NacosRepository
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-nacos/src/test/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepositoryTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-clu [...]
new file mode 100644
index 00000000000..113cac1c176
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-nacos/src/test/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepositoryTest.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.repository.cluster.nacos;
+
+import com.alibaba.nacos.api.exception.NacosException;
+import com.alibaba.nacos.api.naming.NamingService;
+import com.alibaba.nacos.api.naming.PreservedMetadataKeys;
+import com.alibaba.nacos.api.naming.listener.Event;
+import com.alibaba.nacos.api.naming.listener.EventListener;
+import com.alibaba.nacos.api.naming.listener.NamingEvent;
+import com.alibaba.nacos.api.naming.pojo.Instance;
+import com.alibaba.nacos.common.utils.StringUtils;
+import com.google.common.util.concurrent.SettableFuture;
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.mode.persist.PersistRepository;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryException;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+import org.apache.shardingsphere.mode.repository.cluster.nacos.entity.ServiceController;
+import org.apache.shardingsphere.mode.repository.cluster.nacos.entity.ServiceMetadata;
+import org.apache.shardingsphere.mode.repository.cluster.nacos.props.NacosProperties;
+import org.apache.shardingsphere.mode.repository.cluster.nacos.props.NacosPropertyKey;
+import org.apache.shardingsphere.mode.repository.cluster.nacos.utils.MetadataUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.AdditionalAnswers;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.internal.configuration.plugins.Plugins;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.plugins.MemberAccessor;
+import org.mockito.stubbing.VoidAnswer2;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public final class NacosRepositoryTest {
+
+ // Single repository instance reused by every test; initClient() re-injects its collaborators before each test.
+ private static final NacosRepository REPOSITORY = new NacosRepository();
+
+ // Mockito mock of the Nacos naming client; injected into REPOSITORY by initClient().
+ @Mock
+ private NamingService client;
+
+ // Read back from REPOSITORY via reflection in initClient().
+ private ServiceController serviceController;
+
+ // Injects default properties and the mocked client into the shared repository via reflection,
+ // invokes its private initServiceMetadata method, then captures the resulting service controller.
+ @Before
+ @SneakyThrows(Exception.class)
+ public void initClient() {
+ final Class<?> repositoryType = REPOSITORY.getClass();
+ final MemberAccessor memberAccessor = Plugins.getMemberAccessor();
+ memberAccessor.set(repositoryType.getDeclaredField("nacosProps"), REPOSITORY, new NacosProperties(new Properties()));
+ memberAccessor.set(repositoryType.getDeclaredField("client"), REPOSITORY, client);
+ memberAccessor.invoke(repositoryType.getDeclaredMethod("initServiceMetadata"), REPOSITORY);
+ Object controller = memberAccessor.get(repositoryType.getDeclaredField("serviceController"), REPOSITORY);
+ serviceController = (ServiceController) controller;
+ }
+
+ // Two instances carry the same key with UTC_ZONE_OFFSET entries 1 and 2;
+ // the test expects get() to return the value of the instance whose entry is 2.
+ @Test
+ @SneakyThrows
+ public void assertGetLatestKey() {
+ final String key = "/test/children/keys/persistent/1";
+ final String timestampKey = MetadataUtil.UTC_ZONE_OFFSET.toString();
+ List<Instance> registered = new LinkedList<>();
+ int version = 0;
+ while (version < 2) {
+ version++;
+ Map<String, String> metadata = new HashMap<>(2, 1);
+ metadata.put(key, "value" + version);
+ metadata.put(timestampKey, String.valueOf(version));
+ Instance each = new Instance();
+ each.setMetadata(metadata);
+ registered.add(each);
+ }
+ String serviceName = serviceController.getPersistentService().getServiceName();
+ when(client.getAllInstances(serviceName, false)).thenReturn(registered);
+ assertThat(REPOSITORY.get(key), is("value2"));
+ }
+
+ // One persistent and one ephemeral instance are stubbed below the same parent path;
+ // the test expects both child names, persistent first.
+ @Test
+ @SneakyThrows
+ public void assertGetChildrenKeys() {
+ Instance persistentInstance = new Instance();
+ persistentInstance.setMetadata(Collections.singletonMap("/test/children/keys/persistent/0", "value0"));
+ String persistentServiceName = serviceController.getPersistentService().getServiceName();
+ when(client.getAllInstances(persistentServiceName, false)).thenReturn(Collections.singletonList(persistentInstance));
+ Instance ephemeralInstance = new Instance();
+ ephemeralInstance.setMetadata(Collections.singletonMap("/test/children/keys/ephemeral/0", "value0"));
+ String ephemeralServiceName = serviceController.getEphemeralService().getServiceName();
+ when(client.getAllInstances(ephemeralServiceName, false)).thenReturn(Collections.singletonList(ephemeralInstance));
+ List<String> actual = REPOSITORY.getChildrenKeys("/test/children/keys");
+ assertThat(actual.size(), is(2));
+ assertThat(actual.get(0), is("persistent"));
+ assertThat(actual.get(1), is("ephemeral"));
+ }
+
+ // Persisting a key when no instances are stubbed is expected to trigger five registrations
+ // (presumably the key itself plus its four parent paths); the last one must carry the value.
+ @Test
+ @SneakyThrows
+ public void assertPersistNotExistKey() {
+ doAnswer(AdditionalAnswers.answerVoid(getRegisterInstanceAnswer())).when(client).registerInstance(anyString(), any(Instance.class));
+ REPOSITORY.persist("/test/children/keys/persistent/1", "value4");
+ ArgumentCaptor<String> serviceNameCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<Instance> instanceCaptor = ArgumentCaptor.forClass(Instance.class);
+ verify(client, times(5)).registerInstance(serviceNameCaptor.capture(), instanceCaptor.capture());
+ // getValue() yields the arguments of the last captured registerInstance call.
+ Instance lastRegistered = instanceCaptor.getValue();
+ String lastServiceName = serviceNameCaptor.getValue();
+ assertThat(lastServiceName, is(serviceController.getPersistentService().getServiceName()));
+ assertThat(lastRegistered.isEphemeral(), is(false));
+ assertThat(MetadataUtil.getValue(lastRegistered), is("value4"));
+ }
+
+ // The key and all of its parent paths are already stubbed as persistent instances;
+ // persisting a new value is expected to register exactly once, keeping the stored instance's IP.
+ @Test
+ @SneakyThrows
+ public void assertPersistExistKey() {
+ final String key = "/test/children/keys/persistent/0";
+ final String ip = "127.0.0.1";
+ List<Instance> existing = new LinkedList<>();
+ buildParentPath(key, existing);
+ Map<String, String> storedMetadata = new HashMap<>(1, 1);
+ storedMetadata.put(key, "value0");
+ Instance stored = new Instance();
+ stored.setIp(ip);
+ stored.setEphemeral(false);
+ stored.setMetadata(storedMetadata);
+ existing.add(stored);
+ String persistentServiceName = serviceController.getPersistentService().getServiceName();
+ when(client.getAllInstances(persistentServiceName, false)).thenReturn(existing);
+ doAnswer(AdditionalAnswers.answerVoid(getRegisterInstanceAnswer())).when(client).registerInstance(anyString(), any(Instance.class));
+ REPOSITORY.persist(key, "value4");
+ ArgumentCaptor<String> serviceNameCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<Instance> instanceCaptor = ArgumentCaptor.forClass(Instance.class);
+ verify(client).registerInstance(serviceNameCaptor.capture(), instanceCaptor.capture());
+ Instance actual = instanceCaptor.getValue();
+ assertThat(serviceNameCaptor.getValue(), is(persistentServiceName));
+ assertThat(actual.getIp(), is(ip));
+ assertThat(actual.isEphemeral(), is(false));
+ assertThat(MetadataUtil.getValue(actual), is("value4"));
+ }
+
+ // An ephemeral instance for the key already exists with custom heartbeat metadata.
+ // Re-persisting is expected to deregister once and register once, and the registered instance
+ // must carry heartbeat settings derived from the default time-to-live.
+ @Test
+ @SneakyThrows
+ public void assertPersistEphemeralExistKey() {
+ final String key = "/test/children/keys/ephemeral/1";
+ List<Instance> persistentInstances = new LinkedList<>();
+ buildParentPath(key, persistentInstances);
+ String persistentServiceName = serviceController.getPersistentService().getServiceName();
+ when(client.getAllInstances(persistentServiceName, false)).thenReturn(persistentInstances);
+ Map<String, String> staleMetadata = new HashMap<>(4, 1);
+ staleMetadata.put(PreservedMetadataKeys.HEART_BEAT_INTERVAL, String.valueOf(2000));
+ staleMetadata.put(PreservedMetadataKeys.HEART_BEAT_TIMEOUT, String.valueOf(4000));
+ staleMetadata.put(PreservedMetadataKeys.IP_DELETE_TIMEOUT, String.valueOf(6000));
+ staleMetadata.put(key, "value0");
+ Instance stale = new Instance();
+ stale.setEphemeral(true);
+ stale.setMetadata(staleMetadata);
+ List<Instance> ephemeralInstances = new LinkedList<>();
+ ephemeralInstances.add(stale);
+ String ephemeralServiceName = serviceController.getEphemeralService().getServiceName();
+ when(client.getAllInstances(ephemeralServiceName, false)).thenReturn(ephemeralInstances);
+ doAnswer(AdditionalAnswers.answerVoid(getDeregisterInstanceAnswer())).when(client).deregisterInstance(anyString(), any(Instance.class));
+ doAnswer(AdditionalAnswers.answerVoid(getRegisterInstanceAnswer())).when(client).registerInstance(anyString(), any(Instance.class));
+ REPOSITORY.persistEphemeral(key, "value4");
+ verify(client).deregisterInstance(anyString(), any(Instance.class));
+ ArgumentCaptor<String> serviceNameCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<Instance> instanceCaptor = ArgumentCaptor.forClass(Instance.class);
+ verify(client).registerInstance(serviceNameCaptor.capture(), instanceCaptor.capture());
+ Instance actual = instanceCaptor.getValue();
+ assertThat(serviceNameCaptor.getValue(), is(ephemeralServiceName));
+ assertThat(actual.isEphemeral(), is(true));
+ assertThat(MetadataUtil.getValue(actual), is("value4"));
+ Map<String, String> actualMetadata = actual.getMetadata();
+ long timeToLiveMillis = Long.parseLong(NacosPropertyKey.TIME_TO_LIVE_SECONDS.getDefaultValue()) * 1000;
+ assertThat(actualMetadata.get(PreservedMetadataKeys.HEART_BEAT_INTERVAL), is(String.valueOf(timeToLiveMillis / 3)));
+ assertThat(actualMetadata.get(PreservedMetadataKeys.HEART_BEAT_TIMEOUT), is(String.valueOf(timeToLiveMillis * 2 / 3)));
+ assertThat(actualMetadata.get(PreservedMetadataKeys.IP_DELETE_TIMEOUT), is(String.valueOf(timeToLiveMillis)));
+ }
+
+ // Appends one non-ephemeral instance per ancestor path of the key (excluding the key itself),
+ // each holding that path mapped to an empty value, ordered from shallowest to deepest.
+ private void buildParentPath(final String key, final List<Instance> instances) {
+ String[] segments = key.split(PersistRepository.PATH_SEPARATOR);
+ String path = "";
+ // Segment 0 is the empty string before the leading separator; the last segment is the key's own name.
+ for (int i = 1; i + 1 < segments.length; i++) {
+ path = path + PersistRepository.PATH_SEPARATOR + segments[i];
+ Instance parent = new Instance();
+ parent.setMetadata(Collections.singletonMap(path, ""));
+ parent.setEphemeral(false);
+ instances.add(parent);
+ }
+ }
+
+ // Persisting an ephemeral key when nothing is stubbed is expected to trigger five registrations;
+ // the last one must target the ephemeral service and carry default-TTL heartbeat settings.
+ @Test
+ @SneakyThrows
+ public void assertPersistEphemeralNotExistKey() {
+ doAnswer(AdditionalAnswers.answerVoid(getRegisterInstanceAnswer())).when(client).registerInstance(anyString(), any(Instance.class));
+ REPOSITORY.persistEphemeral("/test/children/keys/ephemeral/0", "value0");
+ ArgumentCaptor<String> serviceNameCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<Instance> instanceCaptor = ArgumentCaptor.forClass(Instance.class);
+ verify(client, times(5)).registerInstance(serviceNameCaptor.capture(), instanceCaptor.capture());
+ // getValue() yields the arguments of the last captured registerInstance call.
+ Instance lastRegistered = instanceCaptor.getValue();
+ String lastServiceName = serviceNameCaptor.getValue();
+ assertThat(lastServiceName, is(serviceController.getEphemeralService().getServiceName()));
+ assertThat(lastRegistered.isEphemeral(), is(true));
+ assertThat(MetadataUtil.getValue(lastRegistered), is("value0"));
+ Map<String, String> actualMetadata = lastRegistered.getMetadata();
+ long timeToLiveMillis = Long.parseLong(NacosPropertyKey.TIME_TO_LIVE_SECONDS.getDefaultValue()) * 1000;
+ assertThat(actualMetadata.get(PreservedMetadataKeys.HEART_BEAT_INTERVAL), is(String.valueOf(timeToLiveMillis / 3)));
+ assertThat(actualMetadata.get(PreservedMetadataKeys.HEART_BEAT_TIMEOUT), is(String.valueOf(timeToLiveMillis * 2 / 3)));
+ assertThat(actualMetadata.get(PreservedMetadataKeys.IP_DELETE_TIMEOUT), is(String.valueOf(timeToLiveMillis)));
+ }
+
+ // Three ephemeral instances and one persistent instance are stubbed below the deleted path;
+ // the test expects one deregistration per instance, four in total.
+ @Test
+ @SneakyThrows
+ public void assertDeleteExistKey() {
+ List<Instance> ephemeralInstances = new LinkedList<>();
+ for (int index = 1; index < 4; index++) {
+ Instance each = new Instance();
+ each.setEphemeral(true);
+ each.setMetadata(Collections.singletonMap("/test/children/keys/ephemeral/" + index, "value" + index));
+ ephemeralInstances.add(each);
+ }
+ String ephemeralServiceName = serviceController.getEphemeralService().getServiceName();
+ when(client.getAllInstances(ephemeralServiceName, false)).thenReturn(ephemeralInstances);
+ Instance persistentInstance = new Instance();
+ persistentInstance.setEphemeral(false);
+ persistentInstance.setMetadata(Collections.singletonMap("/test/children/keys/persistent/0", "value0"));
+ List<Instance> persistentInstances = new LinkedList<>();
+ persistentInstances.add(persistentInstance);
+ String persistentServiceName = serviceController.getPersistentService().getServiceName();
+ when(client.getAllInstances(persistentServiceName, false)).thenReturn(persistentInstances);
+ doAnswer(AdditionalAnswers.answerVoid(getDeregisterInstanceAnswer())).when(client).deregisterInstance(anyString(), any(Instance.class));
+ REPOSITORY.delete("/test/children/keys");
+ verify(client, times(4)).deregisterInstance(anyString(), any(Instance.class));
+ }
+
+ // With no instances stubbed, deleting a key must not deregister anything.
+ @Test
+ @SneakyThrows
+ public void assertDeleteNotExistKey() {
+ final String missingKey = "/test/children/keys/persistent/1";
+ REPOSITORY.delete(missingKey);
+ verify(client, times(0)).deregisterInstance(anyString(), any(Instance.class));
+ }
+
+ // The stubbed subscribe() delivers an event carrying one instance while the listener has no
+ // previously known instances; the watcher is expected to receive an ADDED event for it.
+ @Test
+ @SneakyThrows
+ public void assertWatchAdded() {
+ ServiceMetadata ephemeralService = serviceController.getEphemeralService();
+ // Clear any existing listener so that watch() goes through client.subscribe().
+ ephemeralService.setListener(null);
+ String key = "key/key";
+ String value = "value2";
+ Instance instance = new Instance();
+ instance.setMetadata(Collections.singletonMap(key, value));
+ Event event = new NamingEvent(ephemeralService.getServiceName(), Collections.singletonList(instance));
+ doAnswer(AdditionalAnswers.answerVoid(getListenerAnswer(null, event))).when(client).subscribe(anyString(), any(EventListener.class));
+ SettableFuture<DataChangedEvent> settableFuture = SettableFuture.create();
+ REPOSITORY.watch(key, settableFuture::set);
+ // Bounded wait: if the listener never fires, fail with a TimeoutException instead of hanging the build.
+ DataChangedEvent dataChangedEvent = settableFuture.get(5, TimeUnit.SECONDS);
+ assertThat(dataChangedEvent.getType(), is(DataChangedEvent.Type.ADDED));
+ assertThat(dataChangedEvent.getKey(), is(key));
+ assertThat(dataChangedEvent.getValue(), is(value));
+ }
+
+ // The listener is seeded with an instance holding "value1" at timestamp t; the delivered event
+ // carries the same key with "value2" at t + 1. The watcher is expected to receive an UPDATED event.
+ @Test
+ @SneakyThrows
+ public void assertWatchUpdate() {
+ ServiceMetadata persistentService = serviceController.getPersistentService();
+ // Clear any existing listener so that watch() goes through client.subscribe().
+ persistentService.setListener(null);
+ String key = "key/key";
+ long epochMilliseconds = MetadataUtil.getTimestamp();
+ Instance preInstance = new Instance();
+ Map<String, String> metadataMap = new HashMap<>();
+ metadataMap.put(key, "value1");
+ metadataMap.put(MetadataUtil.UTC_ZONE_OFFSET.toString(), String.valueOf(epochMilliseconds));
+ preInstance.setMetadata(metadataMap);
+ final Instance instance = new Instance();
+ metadataMap = new HashMap<>();
+ metadataMap.put(key, "value2");
+ metadataMap.put(MetadataUtil.UTC_ZONE_OFFSET.toString(), String.valueOf(epochMilliseconds + 1));
+ instance.setMetadata(metadataMap);
+ Event event = new NamingEvent(persistentService.getServiceName(), Collections.singletonList(instance));
+ doAnswer(AdditionalAnswers.answerVoid(getListenerAnswer(preInstance, event))).when(client).subscribe(anyString(), any(EventListener.class));
+ SettableFuture<DataChangedEvent> settableFuture = SettableFuture.create();
+ REPOSITORY.watch(key, settableFuture::set);
+ // Bounded wait: if the listener never fires, fail with a TimeoutException instead of hanging the build.
+ DataChangedEvent dataChangedEvent = settableFuture.get(5, TimeUnit.SECONDS);
+ assertThat(dataChangedEvent.getType(), is(DataChangedEvent.Type.UPDATED));
+ assertThat(dataChangedEvent.getKey(), is(key));
+ assertThat(dataChangedEvent.getValue(), is("value2"));
+ }
+
+ // The listener is seeded with an instance holding "value1" and the delivered event carries no
+ // instances; the watcher is expected to receive a DELETED event with the previous value.
+ @Test
+ @SneakyThrows
+ public void assertWatchDelete() {
+ ServiceMetadata persistentService = serviceController.getPersistentService();
+ // Clear any existing listener so that watch() goes through client.subscribe().
+ persistentService.setListener(null);
+ String key = "key/key";
+ Instance preInstance = new Instance();
+ preInstance.setMetadata(Collections.singletonMap(key, "value1"));
+ Event event = new NamingEvent(persistentService.getServiceName(), Collections.emptyList());
+ doAnswer(AdditionalAnswers.answerVoid(getListenerAnswer(preInstance, event))).when(client).subscribe(anyString(), any(EventListener.class));
+ SettableFuture<DataChangedEvent> settableFuture = SettableFuture.create();
+ REPOSITORY.watch(key, settableFuture::set);
+ // Bounded wait: if the listener never fires, fail with a TimeoutException instead of hanging the build.
+ DataChangedEvent dataChangedEvent = settableFuture.get(5, TimeUnit.SECONDS);
+ assertThat(dataChangedEvent.getType(), is(DataChangedEvent.Type.DELETED));
+ assertThat(dataChangedEvent.getKey(), is(key));
+ assertThat(dataChangedEvent.getValue(), is("value1"));
+ }
+
+ // Closing the repository must shut down the underlying naming client exactly once.
+ @Test
+ @SneakyThrows
+ public void assertClose() {
+ REPOSITORY.close();
+ verify(client, times(1)).shutDown();
+ }
+
+ // With no stubbing on the mocked client, persist is expected to fail with a
+ // ClusterPersistRepositoryException whose cause is a NacosException; the cause is unwrapped and rethrown.
+ @Test(expected = NacosException.class)
+ @SneakyThrows
+ public void assertPersistNotAvailable() {
+ final String key = "/test/children/keys/persistent/1";
+ final String value = "value4";
+ try {
+ REPOSITORY.persist(key, value);
+ } catch (final ClusterPersistRepositoryException ex) {
+ throw ex.getCause();
+ }
+ }
+
+ // The ephemeral service's port counter is set to Integer.MAX_VALUE; persisting another ephemeral key
+ // is then expected to fail with an IllegalStateException wrapped in ClusterPersistRepositoryException.
+ @Test(expected = IllegalStateException.class)
+ @SneakyThrows
+ public void assertExceededMaximum() {
+ serviceController.getEphemeralService().setPort(new AtomicInteger(Integer.MAX_VALUE));
+ try {
+ REPOSITORY.persistEphemeral("/key2", "value");
+ } catch (final ClusterPersistRepositoryException ex) {
+ throw ex.getCause();
+ }
+ }
+
+ // Builds the answer for client.subscribe(): when a previous instance is given, it is written into the
+ // listener's "preInstances" field via reflection, then the supplied event is delivered to the listener.
+ private VoidAnswer2<String, EventListener> getListenerAnswer(final Instance preInstance, final Event event) {
+ return (serviceName, listener) -> {
+ if (Objects.nonNull(preInstance)) {
+ Map<String, Instance> knownInstances = new HashMap<>();
+ knownInstances.put(MetadataUtil.getKey(preInstance), preInstance);
+ Plugins.getMemberAccessor().set(listener.getClass().getDeclaredField("preInstances"), listener, knownInstances);
+ }
+ listener.onEvent(event);
+ };
+ }
+
+ // Builds the answer for client.registerInstance(): drops any currently stubbed instance with the same
+ // IP and port, appends the registered one, and re-stubs getAllInstances with the updated list.
+ private VoidAnswer2<String, Instance> getRegisterInstanceAnswer() {
+ return (serviceName, registered) -> {
+ List<Instance> current = client.getAllInstances(serviceName, false);
+ current.removeIf(existing -> existing.getPort() == registered.getPort() && StringUtils.equals(existing.getIp(), registered.getIp()));
+ current.add(registered);
+ when(client.getAllInstances(serviceName, false)).thenReturn(current);
+ };
+ }
+
+ // Builds the answer for client.deregisterInstance(): removes the instance from the currently stubbed
+ // list and re-stubs getAllInstances with what remains.
+ private VoidAnswer2<String, Instance> getDeregisterInstanceAnswer() {
+ return (serviceName, removed) -> {
+ List<Instance> remaining = client.getAllInstances(serviceName, false);
+ remaining.remove(removed);
+ when(client.getAllInstances(serviceName, false)).thenReturn(remaining);
+ };
+ }
+
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-nacos/src/test/java/org/apache/shardingsphere/mode/repository/cluster/nacos/props/NacosPropertiesTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphe [...]
new file mode 100644
index 00000000000..a393b24bb3c
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-nacos/src/test/java/org/apache/shardingsphere/mode/repository/cluster/nacos/props/NacosPropertiesTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.repository.cluster.nacos.props;
+
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+// Checks that NacosProperties returns explicitly configured values and otherwise falls back to each key's default.
+public final class NacosPropertiesTest {
+
+ // With an empty Properties object every key must resolve to its default value.
+ @Test
+ public void assertGetDefaultValue() {
+ final NacosProperties actual = new NacosProperties(new Properties());
+ assertThat(actual.getValue(NacosPropertyKey.TIME_TO_LIVE_SECONDS), is(30));
+ assertThat(actual.getValue(NacosPropertyKey.MAX_RETRIES), is(3));
+ assertThat(actual.getValue(NacosPropertyKey.RETRY_INTERVAL_MILLISECONDS), is(500L));
+ assertThat(actual.getValue(NacosPropertyKey.CLUSTER_IP), is(""));
+ }
+
+ // Values supplied as strings must come back converted to each key's type.
+ @Test
+ public void assertGetValue() {
+ final Properties configured = createProperties();
+ final NacosProperties actual = new NacosProperties(configured);
+ assertThat(actual.getValue(NacosPropertyKey.TIME_TO_LIVE_SECONDS), is(60));
+ assertThat(actual.getValue(NacosPropertyKey.MAX_RETRIES), is(5));
+ assertThat(actual.getValue(NacosPropertyKey.RETRY_INTERVAL_MILLISECONDS), is(1000L));
+ assertThat(actual.getValue(NacosPropertyKey.CLUSTER_IP), is("127.0.0.1"));
+ }
+
+ // Builds a Properties object that overrides all four keys read by assertGetValue.
+ private Properties createProperties() {
+ Properties result = new Properties();
+ result.setProperty(NacosPropertyKey.TIME_TO_LIVE_SECONDS.getKey(), "60");
+ result.setProperty(NacosPropertyKey.MAX_RETRIES.getKey(), "5");
+ result.setProperty(NacosPropertyKey.RETRY_INTERVAL_MILLISECONDS.getKey(), "1000");
+ result.setProperty(NacosPropertyKey.CLUSTER_IP.getKey(), "127.0.0.1");
+ return result;
+ }
+}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/pom.xml b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/pom.xml
index d0b3f8b514c..3e1086e1fe2 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/pom.xml
+++ b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/pom.xml
@@ -93,6 +93,11 @@
<artifactId>shardingsphere-cluster-mode-repository-etcd</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere</groupId>
+ <artifactId>shardingsphere-cluster-mode-repository-nacos</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-jdbc-core</artifactId>