You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/10/03 15:19:25 UTC
[shardingsphere] branch master updated: Cluster mode repository support consul register center (#21331)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a11de40aff0 Cluster mode repository support consul register center (#21331)
a11de40aff0 is described below
commit a11de40aff052d9cb760ffbbe2b47e094905a194
Author: GavinPeng <ro...@163.com>
AuthorDate: Mon Oct 3 23:19:18 2022 +0800
Cluster mode repository support consul register center (#21331)
* support consul repository
* 1) support reentrant lock; 2) add test code
* api support 5.2.1 snapshot
* remove java doc author and date
Co-authored-by: gavin.peng <ga...@ximalaya.com>
---
.../shardingsphere-mode-type/pom.xml | 1 +
.../pom.xml | 1 +
.../pom.xml | 39 +++
.../cluster/consul/ConsulRepository.java | 200 ++++++++++++
.../cluster/consul/ShardingSphereConsulClient.java | 42 +++
.../cluster/consul/ShardingSphereQueryParams.java | 89 ++++++
.../consul/lock/ConsulInternalLockProvider.java | 336 +++++++++++++++++++++
.../cluster/consul/props/ConsulProperties.java | 31 ++
.../cluster/consul/props/ConsulPropertyKey.java | 51 ++++
...ode.repository.cluster.ClusterPersistRepository | 18 ++
.../cluster/consul/ConsulRepositoryTest.java | 223 ++++++++++++++
.../cluster/consul/props/ConsulPropertiesTest.java | 44 +++
12 files changed, 1075 insertions(+)
diff --git a/shardingsphere-mode/shardingsphere-mode-type/pom.xml b/shardingsphere-mode/shardingsphere-mode-type/pom.xml
index 07a8225577d..da2ae9a9658 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/pom.xml
+++ b/shardingsphere-mode/shardingsphere-mode-type/pom.xml
@@ -31,5 +31,6 @@
<modules>
<module>shardingsphere-standalone-mode</module>
<module>shardingsphere-cluster-mode</module>
+ <!-- <module>shardingsphere-cluster-mode-repository-consul</module>-->
</modules>
</project>
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/pom.xml b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/pom.xml
index c001e33eb34..bd9cd5cd3ad 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/pom.xml
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/pom.xml
@@ -32,5 +32,6 @@
<module>shardingsphere-cluster-mode-repository-zookeeper-curator</module>
<module>shardingsphere-cluster-mode-repository-etcd</module>
<module>shardingsphere-cluster-mode-repository-nacos</module>
+ <module>shardingsphere-cluster-mode-repository-consul</module>
</modules>
</project>
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/pom.xml b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/pom.xml
new file mode 100644
index 00000000000..4e789d80b23
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/pom.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.shardingsphere</groupId>
+ <artifactId>shardingsphere-cluster-mode-repository-provider</artifactId>
+ <version>5.2.1-SNAPSHOT</version>
+ </parent>
+ <artifactId>shardingsphere-cluster-mode-repository-consul</artifactId>
+ <name>${project.artifactId}</name>
+
+ <properties>
+ <consul.api.version>1.4.1</consul.api.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.shardingsphere</groupId>
+ <artifactId>shardingsphere-cluster-mode-repository-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.ecwid.consul</groupId>
+ <artifactId>consul-api</artifactId>
+ <version>${consul.api.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.5.5</version>
+ <scope>compile</scope>
+ </dependency>
+
+ </dependencies>
+</project>
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-clus [...]
new file mode 100644
index 00000000000..0d9c171d006
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.repository.cluster.consul;
+
+import com.ecwid.consul.v1.ConsulRawClient;
+import com.ecwid.consul.v1.QueryParams;
+import com.ecwid.consul.v1.Response;
+import com.ecwid.consul.v1.kv.model.GetValue;
+import com.ecwid.consul.v1.kv.model.PutParams;
+import com.ecwid.consul.v1.session.model.NewSession;
+import com.ecwid.consul.v1.session.model.Session;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
+import org.apache.shardingsphere.mode.repository.cluster.consul.lock.ConsulInternalLockProvider;
+import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulProperties;
+import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulPropertyKey;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+import org.apache.shardingsphere.mode.repository.cluster.lock.InternalLock;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Map;
+import java.util.List;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Registry repository of Consul.
+ */
+public class ConsulRepository implements ClusterPersistRepository {
+
+ private ShardingSphereConsulClient consulClient;
+
+ private ConsulInternalLockProvider consulInternalLockProvider;
+
+ private ConsulProperties consulProperties;
+
+ private Map<String, Set<String>> watchKeyMap;
+
+ @Override
+ public void init(final ClusterPersistRepositoryConfiguration config) {
+     // NOTE(review): getServerLists() is handed to ConsulRawClient unchanged — confirm the expected "host[:port]" format.
+     consulClient = new ShardingSphereConsulClient(new ConsulRawClient(config.getServerLists()));
+     consulProperties = new ConsulProperties(config.getProps());
+     consulInternalLockProvider = new ConsulInternalLockProvider(consulClient, consulProperties);
+     // Every watch() call starts its own thread and each of them writes to this map, so it has to be safe for concurrent access.
+     watchKeyMap = Collections.synchronizedMap(new HashMap<String, Set<String>>(6));
+ }
+
+ @Override
+ public String get(final String key) {
+     Response<GetValue> response = consulClient.getKVValue(key);
+     // The response can carry no value (presumably when the key does not exist), so guard before dereferencing it.
+     if (null == response || null == response.getValue()) {
+         return null;
+     }
+     // NOTE(review): GetValue.getValue() may be the Base64 form delivered by the Consul HTTP API — confirm whether getDecodedValue() is intended.
+     return response.getValue().getValue();
+ }
+
+ @Override
+ public List<String> getChildrenKeys(final String key) {
+     Response<List<String>> response = consulClient.getKVKeysOnly(key);
+     // Always hand back a list: an absent response or an absent key list both mean "no children".
+     if (null == response || null == response.getValue()) {
+         return Collections.emptyList();
+     }
+     return response.getValue();
+ }
+
+ @Override
+ public void persist(final String key, final String value) {
+     // Plain KV write, no session attached.
+     consulClient.setKVValue(key, value);
+ }
+
+ @Override
+ public void delete(final String key) {
+     // Removes the single KV entry stored under the given key.
+     consulClient.deleteKVValue(key);
+ }
+
+ @Override
+ public void close() {
+ // Intentionally empty: nothing is released here.
+ // NOTE(review): watch threads and session renewal tasks keep running after close() — confirm whether they need to be stopped.
+ }
+
+ @Override
+ public void persistEphemeral(final String key, final String value) {
+     // Create a TTL-bound session with DELETE behavior and write the key under it.
+     NewSession session = new NewSession();
+     session.setName(key);
+     session.setBehavior(Session.Behavior.DELETE);
+     session.setTtl(consulProperties.getValue(ConsulPropertyKey.TIME_TO_LIVE_SECONDS));
+     String sessionId = consulClient.sessionCreate(session, QueryParams.DEFAULT).getValue();
+     PutParams putParams = new PutParams();
+     putParams.setAcquireSession(sessionId);
+     // NOTE(review): the result of the acquire is not checked — confirm what should happen when another session already holds the key.
+     consulClient.setKVValue(key, value, putParams);
+     // The renewal helper is static, so call it through the class instead of the provider instance.
+     ConsulInternalLockProvider.generatorFlushSessionTtlTask(consulClient, sessionId);
+ }
+
+ @Override
+ public void persistExclusiveEphemeral(final String key, final String value) {
+     // Same path as a regular ephemeral write.
+     persistEphemeral(key, value);
+ }
+
+ @Override
+ public boolean tryLock(final String lockKey, final long timeoutMillis) {
+     // Look up (or create) the lock for this key and try to take it within the timeout.
+     return consulInternalLockProvider.getInternalMutexLock(lockKey).tryLock(timeoutMillis);
+ }
+
+ @Override
+ public void unlock(final String lockKey) {
+     // Release the lock registered for this key.
+     consulInternalLockProvider.getInternalMutexLock(lockKey).unlock();
+ }
+
+ @Override
+ public void watch(final String key, final DataChangedEventListener listener) {
+     // One daemon thread per watched key runs the change-detection loop.
+     Thread watcher = new Thread(() -> watchChildKeyChangeEvent(key, listener));
+     watcher.setDaemon(true);
+     watcher.start();
+ }
+
+ /**
+  * Poll the keys under the given prefix and translate differences between rounds into data changed events.
+  *
+  * <p>The first round (and any round after the index was reset) only records the keys that exist; later rounds
+  * compare the returned keys with the recorded ones and fire ADDED, UPDATED and DELETED events.</p>
+  *
+  * @param key watched key prefix
+  * @param listener listener to notify
+  */
+ private void watchChildKeyChangeEvent(final String key, final DataChangedEventListener listener) {
+     AtomicBoolean running = new AtomicBoolean(true);
+     long currentIndex = 0;
+     while (running.get()) {
+         Response<List<GetValue>> response = consulClient.getKVValues(key,
+                 new QueryParams(consulProperties.getValue(ConsulPropertyKey.BLOCK_QUERY_TIME_TO_SECONDS), currentIndex));
+         Long index = response.getConsulIndex();
+         if (null == index) {
+             continue;
+         }
+         // The value list may be absent (no key under the prefix); treat that as an empty result instead of failing.
+         List<GetValue> values = null == response.getValue() ? Collections.<GetValue>emptyList() : response.getValue();
+         if (0 == currentIndex) {
+             currentIndex = index;
+             // Register the set in the map so the next round can compare against it.
+             Set<String> watchKeySet = watchKeyMap.computeIfAbsent(key, unused -> new HashSet<>());
+             for (GetValue each : values) {
+                 watchKeySet.add(each.getKey());
+             }
+             continue;
+         }
+         if (index > currentIndex) {
+             currentIndex = index;
+             Set<String> newKeySet = new HashSet<>(values.size());
+             Set<String> watchKeySet = watchKeyMap.computeIfAbsent(key, unused -> new HashSet<>());
+             for (GetValue each : values) {
+                 newKeySet.add(each.getKey());
+                 if (watchKeySet.add(each.getKey())) {
+                     fireDataChangeEvent(each, listener, DataChangedEvent.Type.ADDED);
+                 } else if (each.getModifyIndex() >= currentIndex) {
+                     fireDataChangeEvent(each, listener, DataChangedEvent.Type.UPDATED);
+                 }
+             }
+             // Keys known before but missing from this result have been removed.
+             for (String existKey : watchKeySet) {
+                 if (!newKeySet.contains(existKey)) {
+                     GetValue deleted = new GetValue();
+                     deleted.setKey(existKey);
+                     fireDataChangeEvent(deleted, listener, DataChangedEvent.Type.DELETED);
+                 }
+             }
+             watchKeyMap.put(key, newKeySet);
+         } else if (index < currentIndex) {
+             // The index went backwards: start over with a fresh snapshot round.
+             currentIndex = 0;
+         }
+     }
+ }
+
+ /**
+  * Wrap the key and value of a Consul entry into an event of the given type and pass it to the listener.
+  */
+ private void fireDataChangeEvent(final GetValue getValue, final DataChangedEventListener listener, final DataChangedEvent.Type type) {
+     listener.onChange(new DataChangedEvent(getValue.getKey(), getValue.getValue(), type));
+ }
+
+ @Override
+ public String getType() {
+ // Type name reported by this repository implementation.
+ return "Consul";
+ }
+
+ @Override
+ public Collection<String> getTypeAliases() {
+ // Delegates to the interface default; this class defines no aliases of its own.
+ return ClusterPersistRepository.super.getTypeAliases();
+ }
+
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ShardingSphereConsulClient.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardings [...]
new file mode 100644
index 00000000000..ec58721877f
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ShardingSphereConsulClient.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.repository.cluster.consul;
+
+import com.ecwid.consul.v1.ConsulClient;
+import com.ecwid.consul.v1.ConsulRawClient;
+
+/**
+ * Consul client that keeps a reference to the raw client it was built with, so callers can issue raw requests.
+ */
+public class ShardingSphereConsulClient extends ConsulClient {
+
+    // Assigned once in the constructor and never changed afterwards.
+    private final ConsulRawClient rawClient;
+
+    /**
+     * Create a client on top of the given raw client.
+     *
+     * @param rawClient raw consul client used for all requests
+     */
+    public ShardingSphereConsulClient(final ConsulRawClient rawClient) {
+        super(rawClient);
+        this.rawClient = rawClient;
+    }
+
+    /**
+     * Get consul raw client.
+     *
+     * @return raw consul client
+     */
+    public ConsulRawClient getRawClient() {
+        return rawClient;
+    }
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ShardingSphereQueryParams.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsp [...]
new file mode 100644
index 00000000000..d8a738878f8
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ShardingSphereQueryParams.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.repository.cluster.consul;
+
+import com.ecwid.consul.UrlParameters;
+import com.ecwid.consul.Utils;
+import com.ecwid.consul.v1.ConsistencyMode;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Consul query parameters whose wait time is rendered with millisecond precision.
+ */
+public final class ShardingSphereQueryParams implements UrlParameters {
+
+    public static final ShardingSphereQueryParams DEFAULT = new ShardingSphereQueryParams(ConsistencyMode.DEFAULT);
+
+    private final String datacenter;
+
+    private final ConsistencyMode consistencyMode;
+
+    // -1 means "no wait parameter".
+    private final long waitTime;
+
+    private final TimeUnit timeUnit;
+
+    // -1 means "no index parameter".
+    private final long index;
+
+    private final String near;
+
+    private ShardingSphereQueryParams(final String datacenter, final ConsistencyMode consistencyMode, final long waitTime, final TimeUnit timeUnit, final long index, final String near) {
+        this.datacenter = datacenter;
+        this.consistencyMode = consistencyMode;
+        this.waitTime = waitTime;
+        this.timeUnit = timeUnit;
+        this.index = index;
+        this.near = near;
+    }
+
+    private ShardingSphereQueryParams(final String datacenter, final ConsistencyMode consistencyMode, final long waitTime, final long index) {
+        this(datacenter, consistencyMode, waitTime, TimeUnit.MILLISECONDS, index, null);
+    }
+
+    /**
+     * Create parameters that carry only a consistency mode (no wait time, no index).
+     *
+     * @param consistencyMode consistency mode
+     */
+    public ShardingSphereQueryParams(final ConsistencyMode consistencyMode) {
+        this(null, consistencyMode, -1, -1);
+    }
+
+    /**
+     * Create parameters for a blocking query.
+     *
+     * @param waitTime wait time in milliseconds
+     * @param index index to pass as the {@code index} parameter
+     */
+    public ShardingSphereQueryParams(final long waitTime, final long index) {
+        this(null, ConsistencyMode.DEFAULT, waitTime, index);
+    }
+
+    @Override
+    public List<String> toUrlParameters() {
+        List<String> result = new ArrayList<>();
+        if (null != datacenter) {
+            result.add("dc=" + Utils.encodeValue(datacenter));
+        }
+        if (ConsistencyMode.DEFAULT != consistencyMode) {
+            result.add(consistencyMode.name().toLowerCase());
+        }
+        if (-1 != waitTime) {
+            result.add("wait=" + timeUnit.toMillis(waitTime) + "ms");
+        }
+        if (-1 != index) {
+            result.add("index=" + Utils.toUnsignedString(index));
+        }
+        if (null != near) {
+            result.add("near=" + Utils.encodeValue(near));
+        }
+        return result;
+    }
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulInternalLockProvider.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shar [...]
new file mode 100644
index 00000000000..c369232ba6f
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulInternalLockProvider.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.repository.cluster.consul.lock;
+
+import com.ecwid.consul.ConsulException;
+import com.ecwid.consul.json.GsonFactory;
+import com.ecwid.consul.transport.RawResponse;
+import com.ecwid.consul.v1.ConsulClient;
+import com.ecwid.consul.v1.OperationException;
+import com.ecwid.consul.v1.QueryParams;
+import com.ecwid.consul.v1.Response;
+import com.ecwid.consul.v1.kv.model.GetValue;
+import com.ecwid.consul.v1.kv.model.PutParams;
+import com.ecwid.consul.v1.session.model.NewSession;
+import com.ecwid.consul.v1.session.model.Session;
+import com.google.gson.reflect.TypeToken;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.StringUtils;
+import org.apache.shardingsphere.mode.repository.cluster.consul.ShardingSphereConsulClient;
+import org.apache.shardingsphere.mode.repository.cluster.consul.ShardingSphereQueryParams;
+import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulProperties;
+import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulPropertyKey;
+import org.apache.shardingsphere.mode.repository.cluster.lock.InternalLock;
+import org.apache.shardingsphere.mode.repository.cluster.lock.InternalLockProvider;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+
+/**
+ * Consul internal lock holder.
+ */
+@RequiredArgsConstructor
+@Slf4j
+public class ConsulInternalLockProvider implements InternalLockProvider {
+
+ private static final String CONSUL_ROOT_PATH = "sharding/lock";
+
+ private static final String CONSUL_PATH_SEPARATOR = "/";
+
+ private static final String DEFAULT_CONSUL_LOCK_VALUE = "LOCKED";
+
+ private static final String DEFAULT_CONSUL_UNLOCK_VALUE = "UNLOCKED";
+
+ private static final long DEFAULT_LOCK_WAIT_TIME = 5000L;
+
+ private static final ScheduledThreadPoolExecutor SESSION_FLUSH_EXECUTOR = new ScheduledThreadPoolExecutor(2);
+
+ private final Map<String, ConsulInternalLock> locks = new ConcurrentHashMap<String, ConsulInternalLock>();
+
+ private final ConsulClient consulClient;
+
+ private final ConsulProperties consulProps;
+
+ @Override
+ public InternalLock getInternalLock(final String lockKey) {
+ // Same lookup as getInternalMutexLock: both resolve to the per-name reentrant mutex lock.
+ return getInternalReentrantMutexLock(lockKey);
+ }
+
+ /**
+ * Get internal mutex lock.
+ * Delegates to {@link #getInternalReentrantMutexLock(String)}, so the same lock instance is returned for the same name.
+ *
+ * @param lockName lock name
+ * @return internal mutex lock
+ */
+ public InternalLock getInternalMutexLock(final String lockName) {
+ return getInternalReentrantMutexLock(lockName);
+ }
+
+ /**
+  * Get internal reentrant mutex lock.
+  * The lock is created on first use and cached per lock name.
+  *
+  * @param lockName lock name
+  * @return internal reentrant mutex lock (null only if createLock yields none)
+  */
+ public InternalLock getInternalReentrantMutexLock(final String lockName) {
+     // Atomic get-or-create: concurrent callers share one instance, and a null from createLock is simply not cached
+     // instead of being put into the ConcurrentHashMap (which rejects null values).
+     return locks.computeIfAbsent(lockName, this::createLock);
+ }
+
+ /**
+  * Create the lock object for the given name.
+  *
+  * @param lockName lock name
+  * @return new lock bound to this provider's client and properties
+  */
+ private ConsulInternalLock createLock(final String lockName) {
+     // No session is prepared here; the lock creates its own session on every acquisition attempt.
+     return new ConsulInternalLock(consulClient, lockName, consulProps);
+ }
+
+ /**
+ * flush session by update ttl.
+ * @param consulClient consul client
+ * @param sessionId session id
+ */
+ public static void generatorFlushSessionTtlTask(final ConsulClient consulClient, final String sessionId) {
+ SESSION_FLUSH_EXECUTOR.scheduleAtFixedRate(new Runnable() {
+
+ @Override
+ public void run() {
+ consulClient.renewSession(sessionId, QueryParams.DEFAULT);
+ }
+ }, 5, 10, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Consul internal lock.
+ */
+ private static class ConsulInternalLock implements InternalLock {
+
+ private final ConsulClient consulClient;
+
+ private final ConsulProperties consulProperties;
+
+ private final ThreadLocal<String> lockSessionMap;
+
+ private final String lockName;
+
+ /**
+  * Bind the lock to a client, a lock name and the Consul properties; the session id is tracked per thread.
+  */
+ ConsulInternalLock(final ConsulClient consulClient, final String lockName, final ConsulProperties consulProperties) {
+     this.lockName = lockName;
+     this.consulClient = consulClient;
+     this.consulProperties = consulProperties;
+     lockSessionMap = new ThreadLocal<>();
+ }
+
+ /**
+  * Acquire the lock, retrying until it is obtained.
+  * Returns immediately when the current thread already holds a session for this lock.
+  *
+  * @throws IllegalStateException if any step of the acquisition throws
+  */
+ public void lock() {
+     try {
+         // support reentrant lock
+         if (StringUtils.isNotEmpty(lockSessionMap.get())) {
+             return;
+         }
+         PutParams putParams = new PutParams();
+         String lockPath = CONSUL_ROOT_PATH + CONSUL_PATH_SEPARATOR + lockName;
+         while (true) {
+             String sessionId = createSession(lockName);
+             putParams.setAcquireSession(sessionId);
+             Response<Boolean> response = consulClient.setKVValue(lockPath, DEFAULT_CONSUL_LOCK_VALUE, putParams);
+             if (response.getValue()) {
+                 // lock success: remember the session for this thread and keep it alive
+                 lockSessionMap.set(sessionId);
+                 ConsulInternalLockProvider.generatorFlushSessionTtlTask(consulClient, sessionId);
+                 log.debug("Session id {} get lock {} is success", sessionId, lockName);
+                 return;
+             }
+             // lock failed: drop the unused session, wait for the key to change, then retry
+             consulClient.sessionDestroy(sessionId, null);
+             Long lockIndex = response.getConsulIndex();
+             long waitTime = doWaitRelease(lockPath, null == lockIndex ? 0L : lockIndex, DEFAULT_LOCK_WAIT_TIME);
+             // doWaitRelease already reports milliseconds, so the value is logged without conversion.
+             log.debug("Wait lock {} time {}ms found lock is by release so to retry lock", lockName, waitTime);
+         }
+         // CHECKSTYLE:OFF
+     } catch (final Exception ex) {
+         // CHECKSTYLE:ON
+         log.error("ConsulRepository lock error, lockName:{}", lockName, ex);
+         throw new IllegalStateException("Acquire consul lock failed", ex);
+     }
+ }
+
+ /**
+  * Try to acquire the lock within the given timeout.
+  * Returns true immediately when the current thread already holds a session for this lock.
+  *
+  * @param timeoutMillis maximum time to wait in milliseconds
+  * @return true if the lock was acquired, false on timeout or on any error
+  */
+ @Override
+ public boolean tryLock(final long timeoutMillis) {
+     try {
+         if (StringUtils.isNotEmpty(lockSessionMap.get())) {
+             return true;
+         }
+         long lockTime = timeoutMillis;
+         PutParams putParams = new PutParams();
+         String lockPath = CONSUL_ROOT_PATH + CONSUL_PATH_SEPARATOR + lockName;
+         while (true) {
+             String sessionId = createSession(lockPath);
+             putParams.setAcquireSession(sessionId);
+             Response<Boolean> response = consulClient.setKVValue(lockPath, DEFAULT_CONSUL_LOCK_VALUE, putParams);
+             if (response.getValue()) {
+                 // lock success
+                 lockSessionMap.set(sessionId);
+                 ConsulInternalLockProvider.generatorFlushSessionTtlTask(consulClient, sessionId);
+                 return true;
+             }
+             // lock failed: drop the unused session and wait for the key to change
+             consulClient.sessionDestroy(sessionId, null);
+             // The index may be absent; fall back to 0 as lock() does instead of failing on unboxing.
+             Long lockIndex = response.getConsulIndex();
+             long waitTime = doWaitRelease(lockPath, null == lockIndex ? 0L : lockIndex, lockTime);
+             if (waitTime >= lockTime) {
+                 return false;
+             }
+             // Only the remaining budget is available for the next attempt.
+             lockTime -= waitTime;
+         }
+         // CHECKSTYLE:OFF
+     } catch (final Exception ex) {
+         // CHECKSTYLE:ON
+         log.error("ConsulRepository tryLock error, lockName:{}", lockName, ex);
+         return false;
+     }
+ }
+
+ /**
+  * Release the lock held by the current thread and destroy its session.
+  * Does nothing when the current thread holds no session for this lock; errors are logged, not thrown.
+  */
+ @Override
+ public void unlock() {
+     String sessionId = lockSessionMap.get();
+     if (StringUtils.isEmpty(sessionId)) {
+         // Nothing to release: without this guard the lock key would be overwritten and a null session destroyed.
+         return;
+     }
+     try {
+         PutParams putParams = new PutParams();
+         putParams.setReleaseSession(sessionId);
+         String lockPath = CONSUL_ROOT_PATH + CONSUL_PATH_SEPARATOR + lockName;
+         consulClient.setKVValue(lockPath, DEFAULT_CONSUL_UNLOCK_VALUE, putParams);
+         consulClient.sessionDestroy(sessionId, null);
+         log.debug("Release lock {} with session id {} success", lockName, sessionId);
+         // CHECKSTYLE:OFF
+     } catch (final Exception ex) {
+         // CHECKSTYLE:ON
+         log.error("ConsulRepository unlock error, lockName:{}", lockName, ex);
+     } finally {
+         // Always forget the session so the thread can acquire the lock again.
+         lockSessionMap.remove();
+     }
+ }
+
+ /**
+ * Interruptible locking is not supported.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ public void lockInterruptibly() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Conditions are not supported.
+ *
+ * @return never returns normally
+ * @throws UnsupportedOperationException always
+ */
+ public Condition newCondition() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+  * Create a Consul session with the configured TTL and RELEASE behavior.
+  *
+  * @param lockName name given to the session
+  * @return id of the created session
+  */
+ private String createSession(final String lockName) {
+     NewSession newSession = new NewSession();
+     newSession.setName(lockName);
+     // lock was released by force while session is invalid
+     newSession.setBehavior(Session.Behavior.RELEASE);
+     newSession.setTtl(consulProperties.getValue(ConsulPropertyKey.TIME_TO_LIVE_SECONDS));
+     Response<String> created = consulClient.sessionCreate(newSession, null);
+     return created.getValue();
+ }
+
+ /**
+  * Block on the lock key until a query reports a usable index or the wait time is used up.
+  *
+  * @param key lock key path
+  * @param valueIndex index to start the blocking query from; negative values are treated as 0
+  * @param waitTime maximum time to wait in milliseconds
+  * @return elapsed milliseconds when the wait ended early, or {@code waitTime} when the deadline was reached
+  */
+ private long doWaitRelease(final String key, final long valueIndex, final long waitTime) {
+     long currentIndex = Math.max(valueIndex, 0L);
+     ShardingSphereConsulClient shardingSphereConsulClient = (ShardingSphereConsulClient) consulClient;
+     AtomicBoolean running = new AtomicBoolean(true);
+     long startTime = System.currentTimeMillis();
+     long deadlineWaitTime = startTime + waitTime;
+     while (running.get()) {
+         // Derive the remaining budget from the deadline on every round, so it can neither be
+         // subtracted twice nor become negative.
+         long blockWaitTime = deadlineWaitTime - System.currentTimeMillis();
+         if (blockWaitTime <= 0) {
+             // wait time is reached max
+             return waitTime;
+         }
+         RawResponse rawResponse = shardingSphereConsulClient.getRawClient().makeGetRequest("/v1/kv/" + key, null, new ShardingSphereQueryParams(blockWaitTime, currentIndex));
+         Response<GetValue> response = warpRawResponse(rawResponse);
+         Long index = response.getConsulIndex();
+         if (null == index) {
+             continue;
+         }
+         if (index < currentIndex) {
+             // The index went backwards: restart from scratch.
+             currentIndex = 0;
+             continue;
+         }
+         if (0 == currentIndex) {
+             // First answer only establishes the index to block on.
+             currentIndex = index;
+             continue;
+         }
+         currentIndex = index;
+         GetValue getValue = response.getValue();
+         boolean otherKey = null != getValue && null != getValue.getValue() && !key.equals(getValue.getKey());
+         if (otherKey) {
+             continue;
+         }
+         return System.currentTimeMillis() - startTime;
+     }
+     return -1;
+ }
+
+ /**
+  * Convert a raw KV response into a typed response holding at most one value.
+  *
+  * @param rawResponse raw HTTP response
+  * @return response with the single value, or with null for status 404 or an empty list
+  * @throws ConsulException if more than one value is returned
+  * @throws OperationException for any status other than 200 and 404
+  */
+ private Response<GetValue> warpRawResponse(final RawResponse rawResponse) {
+     int statusCode = rawResponse.getStatusCode();
+     if (404 == statusCode) {
+         return new Response<GetValue>(null, rawResponse);
+     }
+     if (200 != statusCode) {
+         throw new OperationException(rawResponse);
+     }
+     List<GetValue> values = GsonFactory.getGson().fromJson(rawResponse.getContent(), new TypeToken<List<GetValue>>() {
+     }.getType());
+     if (values.size() > 1) {
+         throw new ConsulException("Strange response (list size=" + values.size() + ")");
+     }
+     GetValue single = values.isEmpty() ? null : values.get(0);
+     return new Response<GetValue>(single, rawResponse);
+ }
+ }
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/props/ConsulProperties.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingspher [...]
new file mode 100644
index 00000000000..37df4ab43bd
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/props/ConsulProperties.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.repository.cluster.consul.props;
+
+import org.apache.shardingsphere.infra.util.props.TypedProperties;
+import java.util.Properties;
+
+/**
+ * Typed properties of Consul.
+ *
+ * <p>Pairs a {@link Properties} instance with the keys declared in {@link ConsulPropertyKey}.</p>
+ */
+public final class ConsulProperties extends TypedProperties<ConsulPropertyKey> {
+ 
+ /**
+ * Create the typed properties of Consul.
+ *
+ * @param props raw properties, passed unchanged to the parent class together with the {@link ConsulPropertyKey} enum class
+ */
+ public ConsulProperties(final Properties props) {
+ super(ConsulPropertyKey.class, props);
+ }
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/props/ConsulPropertyKey.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphe [...]
new file mode 100644
index 00000000000..779c5250451
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/props/ConsulPropertyKey.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.repository.cluster.consul.props;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.util.props.TypedPropertyKey;
+
+/**
+ * Typed property key of Consul.
+ *
+ * <p>Each constant carries its configuration key, its default value as a string and the declared value type.</p>
+ */
+@RequiredArgsConstructor
+@Getter
+public enum ConsulPropertyKey implements TypedPropertyKey {
+ 
+ /**
+ * Time to live. Declared as a string; the default {@code 30s} includes a unit suffix.
+ */
+ TIME_TO_LIVE_SECONDS("timeToLiveSeconds", "30s", String.class),
+ 
+ /**
+ * Lock delay. Declared as a string with default {@code 2}.
+ * NOTE(review): the constant and key spell the unit "Micorsends"; confirm whether microseconds or milliseconds are meant.
+ * The spelling is part of the constant name and configuration key, so it is left unchanged here.
+ */
+ LOCK_DELAY_TO_MICORSENDS("lockDelayToMicorsends", "2", String.class),
+ 
+ /**
+ * Block query time in seconds. Declared as {@code long}; default {@code 60}.
+ */
+ BLOCK_QUERY_TIME_TO_SECONDS("blockQueryTimeToSeconds", "60", long.class);
+ 
+ private final String key;
+ 
+ private final String defaultValue;
+ 
+ private final Class<?> type;
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/main/resources/META-INF.services/org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/ [...]
new file mode 100644
index 00000000000..b1215b4ba8c
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/main/resources/META-INF.services/org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.mode.repository.cluster.consul.ConsulRepository
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepositoryTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere- [...]
new file mode 100644
index 00000000000..f903c9c4067
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepositoryTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.repository.cluster.consul;
+
+import com.ecwid.consul.v1.ConsulClient;
+import com.ecwid.consul.v1.QueryParams;
+import com.ecwid.consul.v1.Response;
+import com.ecwid.consul.v1.kv.model.GetValue;
+import com.ecwid.consul.v1.kv.model.PutParams;
+import com.ecwid.consul.v1.session.model.NewSession;
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulProperties;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.internal.configuration.plugins.Plugins;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.plugins.MemberAccessor;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.atLeastOnce;
+
+/**
+ * Unit tests for {@link ConsulRepository}, run against a mocked {@link ShardingSphereConsulClient}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public final class ConsulRepositoryTest {
+
+    /** Consul index reported by the mocked list response. */
+    private static final long CONSUL_INDEX = 123456L;
+
+    private final ConsulRepository repository = new ConsulRepository();
+
+    @Mock
+    private ShardingSphereConsulClient client;
+
+    @Mock
+    private Response<GetValue> response;
+
+    @Mock
+    private Response<List<String>> responseList;
+
+    @Mock
+    private Response<List<GetValue>> responseGetValueList;
+
+    @Mock
+    private Response<Boolean> responseBoolean;
+
+    @Mock
+    private Response<String> sessionResponse;
+
+    @Mock
+    private GetValue getValue;
+
+    @Mock
+    private List<GetValue> getValueList;
+
+    /**
+     * Inject the mocked client and default properties into the repository before every test.
+     *
+     * <p>NOTE(review): with the default (strict) {@code MockitoJUnitRunner}, stubs registered here that a
+     * given test never touches may be reported as unnecessary stubbings; confirm against the Mockito
+     * version in use.</p>
+     */
+    @Before
+    public void setUp() {
+        setClient();
+        setProperties();
+    }
+
+    @SneakyThrows(ReflectiveOperationException.class)
+    private void setClient() {
+        mockClient();
+        MemberAccessor accessor = Plugins.getMemberAccessor();
+        accessor.set(repository.getClass().getDeclaredField("consulClient"), repository, client);
+    }
+
+    @SneakyThrows(ReflectiveOperationException.class)
+    private void setProperties() {
+        MemberAccessor accessor = Plugins.getMemberAccessor();
+        accessor.set(repository.getClass().getDeclaredField("consulProperties"), repository, new ConsulProperties(new Properties()));
+        accessor.set(repository.getClass().getDeclaredField("watchKeyMap"), repository, new HashMap<>(4));
+    }
+
+    /**
+     * Register the stubs shared by all tests on the mocked client and its responses.
+     */
+    @SuppressWarnings("unchecked")
+    private void mockClient() {
+        when(client.getKVValue(any(String.class))).thenReturn(response);
+        when(response.getValue()).thenReturn(getValue);
+        when(client.getKVValues(any(String.class), any(QueryParams.class))).thenReturn(responseGetValueList);
+        when(client.getKVKeysOnly(any(String.class))).thenReturn(responseList);
+        when(client.sessionCreate(any(NewSession.class), any(QueryParams.class))).thenReturn(sessionResponse);
+        when(sessionResponse.getValue()).thenReturn("12323ddsf3sss");
+        when(responseGetValueList.getConsulIndex()).thenReturn(CONSUL_INDEX);
+        when(responseGetValueList.getValue()).thenReturn(getValueList);
+        when(client.setKVValue(any(String.class), any(String.class))).thenReturn(responseBoolean);
+    }
+
+    /**
+     * Pause the test thread, presumably to give work started by the repository time to run.
+     *
+     * @param millis pause length in milliseconds
+     */
+    private static void pause(final long millis) {
+        try {
+            Thread.sleep(millis);
+        } catch (final InterruptedException ex) {
+            // Restore the interrupt flag so the runner can still see it, instead of printing and dropping it.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Test
+    public void assertGetKey() {
+        repository.get("key");
+        verify(client).getKVValue("key");
+        verify(response).getValue();
+    }
+
+    @Test
+    public void assertGetChildrenKeys() {
+        final String key = "/key";
+        String k1 = "/key/key1/key1-1";
+        String v1 = "value1";
+        client.setKVValue(k1, v1);
+        String k2 = "/key/key2";
+        String v2 = "value2";
+        client.setKVValue(k2, v2);
+        List<String> getValues = Arrays.asList(k1, k2);
+        when(responseList.getValue()).thenReturn(getValues);
+        List<String> actual = repository.getChildrenKeys(key);
+        assertThat(actual.size(), is(2));
+        Iterator<String> iterator = actual.iterator();
+        assertThat(iterator.next(), is("/key/key1/key1-1"));
+        assertThat(iterator.next(), is("/key/key2"));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void assertPersistEphemeral() {
+        repository.persistEphemeral("key1", "value1");
+        verify(client).sessionCreate(any(NewSession.class), any(QueryParams.class));
+        verify(client).setKVValue(any(String.class), any(String.class), any(PutParams.class));
+        // The session renewal is verified only after this wait.
+        pause(6000L);
+        verify(client).renewSession(any(String.class), any(QueryParams.class));
+    }
+
+    @Test
+    public void assertWatchUpdate() {
+        final String key = "sharding/key";
+        final String k1 = "sharding/key/key1";
+        final String v1 = "value1";
+        client.setKVValue(k1, v1);
+        GetValue getValue1 = new GetValue();
+        getValue1.setKey(k1);
+        getValue1.setValue(v1);
+        List<GetValue> getValues = Arrays.asList(getValue1);
+        when(responseGetValueList.getValue()).thenReturn(getValues);
+        repository.watch(key, event -> {
+        });
+        client.setKVValue(k1, "value1-1");
+        verify(client, atLeastOnce()).getKVValues(any(String.class), any(QueryParams.class));
+        pause(10000L);
+    }
+
+    @Test
+    public void assertWatchDelete() {
+        final String key = "sharding/key";
+        final String k1 = "sharding/key/key1";
+        final String v1 = "value1";
+        final String k2 = "sharding/key/key2";
+        final String v2 = "value1";
+        client.setKVValue(k1, v1);
+        client.setKVValue(k2, v2);
+        GetValue getValue1 = new GetValue();
+        getValue1.setKey(k1);
+        getValue1.setValue(v1);
+        List<GetValue> getValues = Arrays.asList(getValue1);
+        when(responseGetValueList.getValue()).thenReturn(getValues);
+        repository.watch(key, event -> {
+        });
+        client.deleteKVValue(k2);
+        verify(client, atLeastOnce()).getKVValues(any(String.class), any(QueryParams.class));
+        pause(10000L);
+    }
+
+    @Test
+    public void assertWatchIgnored() {
+        // TODO
+    }
+
+    @Test
+    public void assertDelete() {
+        repository.delete("key");
+        verify(client).deleteKVValue(any(String.class));
+    }
+
+    @Test
+    public void assertPersist() {
+        repository.persist("key1", "value1");
+        verify(client).setKVValue(any(String.class), any(String.class));
+    }
+
+    @Test
+    public void assertClose() {
+        repository.close();
+        // TODO assert the effect of close(); nothing is verified yet.
+    }
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/props/ConsulPropertiesTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardings [...]
new file mode 100644
index 00000000000..c1809ad218b
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/props/ConsulPropertiesTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.repository.cluster.consul.props;
+
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit tests for {@link ConsulProperties}.
+ */
+public final class ConsulPropertiesTest {
+
+    @Test
+    public void assertGetValue() {
+        // Read back the key that createProperties() configures, not the default of an unrelated key.
+        assertThat(new ConsulProperties(createProperties()).getValue(ConsulPropertyKey.TIME_TO_LIVE_SECONDS), is("50"));
+    }
+
+    private Properties createProperties() {
+        Properties result = new Properties();
+        result.setProperty(ConsulPropertyKey.TIME_TO_LIVE_SECONDS.getKey(), "50");
+        return result;
+    }
+
+    @Test
+    public void assertGetDefaultValue() {
+        ConsulProperties actual = new ConsulProperties(new Properties());
+        // TIME_TO_LIVE_SECONDS is declared as String with default "30s", so it cannot equal 30L.
+        assertThat(actual.getValue(ConsulPropertyKey.TIME_TO_LIVE_SECONDS), is("30s"));
+        assertThat(actual.getValue(ConsulPropertyKey.BLOCK_QUERY_TIME_TO_SECONDS), is(60L));
+    }
+}