You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/10/03 15:19:25 UTC

[shardingsphere] branch master updated: Cluster mode repository support consul register center (#21331)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a11de40aff0 Cluster mode repository support consul register center (#21331)
a11de40aff0 is described below

commit a11de40aff052d9cb760ffbbe2b47e094905a194
Author: GavinPeng <ro...@163.com>
AuthorDate: Mon Oct 3 23:19:18 2022 +0800

    Cluster mode repository support consul register center (#21331)
    
    * support consul repository
    
    * one: support reentrant lock,two :add test code
    
    * api support 5.2.1 snapshot
    
    * remove java doc author and date
    
    Co-authored-by: gavin.peng <ga...@ximalaya.com>
---
 .../shardingsphere-mode-type/pom.xml               |   1 +
 .../pom.xml                                        |   1 +
 .../pom.xml                                        |  39 +++
 .../cluster/consul/ConsulRepository.java           | 200 ++++++++++++
 .../cluster/consul/ShardingSphereConsulClient.java |  42 +++
 .../cluster/consul/ShardingSphereQueryParams.java  |  89 ++++++
 .../consul/lock/ConsulInternalLockProvider.java    | 336 +++++++++++++++++++++
 .../cluster/consul/props/ConsulProperties.java     |  31 ++
 .../cluster/consul/props/ConsulPropertyKey.java    |  51 ++++
 ...ode.repository.cluster.ClusterPersistRepository |  18 ++
 .../cluster/consul/ConsulRepositoryTest.java       | 223 ++++++++++++++
 .../cluster/consul/props/ConsulPropertiesTest.java |  44 +++
 12 files changed, 1075 insertions(+)

diff --git a/shardingsphere-mode/shardingsphere-mode-type/pom.xml b/shardingsphere-mode/shardingsphere-mode-type/pom.xml
index 07a8225577d..da2ae9a9658 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/pom.xml
+++ b/shardingsphere-mode/shardingsphere-mode-type/pom.xml
@@ -31,5 +31,6 @@
     <modules>
         <module>shardingsphere-standalone-mode</module>
         <module>shardingsphere-cluster-mode</module>
+        <!--        <module>shardingsphere-cluster-mode-repository-consul</module>-->
     </modules>
 </project>
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/pom.xml b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/pom.xml
index c001e33eb34..bd9cd5cd3ad 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/pom.xml
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/pom.xml
@@ -32,5 +32,6 @@
         <module>shardingsphere-cluster-mode-repository-zookeeper-curator</module>
         <module>shardingsphere-cluster-mode-repository-etcd</module>
         <module>shardingsphere-cluster-mode-repository-nacos</module>
+        <module>shardingsphere-cluster-mode-repository-consul</module>
     </modules>
 </project>
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/pom.xml b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/pom.xml
new file mode 100644
index 00000000000..4e789d80b23
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/pom.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.shardingsphere</groupId>
+        <artifactId>shardingsphere-cluster-mode-repository-provider</artifactId>
+        <version>5.2.1-SNAPSHOT</version>
+    </parent>
+    <artifactId>shardingsphere-cluster-mode-repository-consul</artifactId>
+    <name>${project.artifactId}</name>
+    
+    <properties>
+        <consul.api.version>1.4.1</consul.api.version>
+    </properties>
+    
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.shardingsphere</groupId>
+            <artifactId>shardingsphere-cluster-mode-repository-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        
+        <dependency>
+            <groupId>com.ecwid.consul</groupId>
+            <artifactId>consul-api</artifactId>
+            <version>${consul.api.version}</version>
+        </dependency>
+        
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.5.5</version>
+            <scope>compile</scope>
+        </dependency>
+        
+    </dependencies>
+</project>
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-clus [...]
new file mode 100644
index 00000000000..0d9c171d006
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.repository.cluster.consul;
+
+import com.ecwid.consul.v1.ConsulRawClient;
+import com.ecwid.consul.v1.QueryParams;
+import com.ecwid.consul.v1.Response;
+import com.ecwid.consul.v1.kv.model.GetValue;
+import com.ecwid.consul.v1.kv.model.PutParams;
+import com.ecwid.consul.v1.session.model.NewSession;
+import com.ecwid.consul.v1.session.model.Session;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
+import org.apache.shardingsphere.mode.repository.cluster.consul.lock.ConsulInternalLockProvider;
+import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulProperties;
+import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulPropertyKey;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+import org.apache.shardingsphere.mode.repository.cluster.lock.InternalLock;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Map;
+import java.util.List;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Registry repository of Consul.
+ */
+public class ConsulRepository implements ClusterPersistRepository {
+    
+    private ShardingSphereConsulClient consulClient;
+    
+    private ConsulInternalLockProvider consulInternalLockProvider;
+    
+    private ConsulProperties consulProperties;
+    
+    private Map<String, Set<String>> watchKeyMap;
+    
+    @Override
+    public void init(final ClusterPersistRepositoryConfiguration config) {
+        this.consulClient = new ShardingSphereConsulClient(new ConsulRawClient(config.getServerLists()));
+        this.consulProperties = new ConsulProperties(config.getProps());
+        this.consulInternalLockProvider = new ConsulInternalLockProvider(this.consulClient, this.consulProperties);
+        this.watchKeyMap = new HashMap<String, Set<String>>(6);
+    }
+    
+    @Override
+    public String get(final String key) {
+        Response<GetValue> response = this.consulClient.getKVValue(key);
+        return response != null ? response.getValue().getValue() : null;
+    }
+    
+    @Override
+    public List<String> getChildrenKeys(final String key) {
+        Response<List<String>> response = this.consulClient.getKVKeysOnly(key);
+        return response != null ? response.getValue() : Collections.EMPTY_LIST;
+    }
+    
+    @Override
+    public void persist(final String key, final String value) {
+        this.consulClient.setKVValue(key, value);
+    }
+    
+    @Override
+    public void delete(final String key) {
+        this.consulClient.deleteKVValue(key);
+    }
+    
+    @Override
+    public void close() {
+        // this.consulClien
+        // this.consulClient.
+    }
+    
+    @Override
+    public void persistEphemeral(final String key, final String value) {
+        NewSession session = new NewSession();
+        session.setName(key);
+        session.setBehavior(Session.Behavior.DELETE);
+        session.setTtl(this.consulProperties.getValue(ConsulPropertyKey.TIME_TO_LIVE_SECONDS));
+        Response<String> response = this.consulClient.sessionCreate(session, QueryParams.DEFAULT);
+        final String sessionId = response.getValue();
+        PutParams putParams = new PutParams();
+        putParams.setAcquireSession(sessionId);
+        this.consulClient.setKVValue(key, value, putParams);
+        this.consulInternalLockProvider.generatorFlushSessionTtlTask(this.consulClient, sessionId);
+    }
+    
+    @Override
+    public void persistExclusiveEphemeral(final String key, final String value) {
+        this.persistEphemeral(key, value);
+    }
+    
+    @Override
+    public boolean tryLock(final String lockKey, final long timeoutMillis) {
+        InternalLock lock = this.consulInternalLockProvider.getInternalMutexLock(lockKey);
+        return lock.tryLock(timeoutMillis);
+    }
+    
+    @Override
+    public void unlock(final String lockKey) {
+        InternalLock lock = this.consulInternalLockProvider.getInternalMutexLock(lockKey);
+        lock.unlock();
+    }
+    
+    @Override
+    public void watch(final String key, final DataChangedEventListener listener) {
+        Thread watchThread = new Thread(new Runnable() {
+            
+            @Override
+            public void run() {
+                watchChildKeyChangeEvent(key, listener);
+            }
+        });
+        watchThread.setDaemon(true);
+        watchThread.start();
+    }
+    
+    private void watchChildKeyChangeEvent(final String key, final DataChangedEventListener listener) {
+        AtomicBoolean running = new AtomicBoolean(true);
+        long currentIndex = 0;
+        while (running.get()) {
+            Response<List<GetValue>> response = consulClient.getKVValues(key,
+                    new QueryParams(consulProperties.getValue(ConsulPropertyKey.BLOCK_QUERY_TIME_TO_SECONDS), currentIndex));
+            Long index = response.getConsulIndex();
+            if (index != null && currentIndex == 0) {
+                currentIndex = index;
+                Set<String> watchKeySet = watchKeyMap.get(key);
+                if (watchKeySet == null) {
+                    watchKeySet = new HashSet<>();
+                }
+                for (GetValue getValue : response.getValue()) {
+                    if (!watchKeySet.contains(getValue.getKey())) {
+                        watchKeySet.add(getValue.getKey());
+                    }
+                }
+                continue;
+            }
+            if (index != null && index > currentIndex) {
+                currentIndex = index;
+                Set<String> newKeySet = new HashSet<>(response.getValue().size());
+                Set<String> watchKeySet = watchKeyMap.get(key);
+                for (GetValue getValue : response.getValue()) {
+                    newKeySet.add(getValue.getKey());
+                    if (!watchKeySet.contains(getValue.getKey())) {
+                        watchKeySet.add(getValue.getKey());
+                        fireDataChangeEvent(getValue, listener, DataChangedEvent.Type.ADDED);
+                    } else if (watchKeySet.contains(getValue.getKey()) && getValue.getModifyIndex() >= currentIndex) {
+                        fireDataChangeEvent(getValue, listener, DataChangedEvent.Type.UPDATED);
+                    }
+                }
+                for (String existKey : watchKeySet) {
+                    if (!newKeySet.contains(existKey)) {
+                        GetValue getValue = new GetValue();
+                        getValue.setKey(existKey);
+                        fireDataChangeEvent(getValue, listener, DataChangedEvent.Type.DELETED);
+                    }
+                }
+                this.watchKeyMap.put(key, newKeySet);
+            } else if (index != null && index < currentIndex) {
+                currentIndex = 0;
+            }
+        }
+    }
+    
+    private void fireDataChangeEvent(final GetValue getValue, final DataChangedEventListener listener, final DataChangedEvent.Type type) {
+        DataChangedEvent event = new DataChangedEvent(getValue.getKey(), getValue.getValue(), type);
+        listener.onChange(event);
+    }
+    
+    @Override
+    public String getType() {
+        return "Consul";
+    }
+    
+    @Override
+    public Collection<String> getTypeAliases() {
+        return ClusterPersistRepository.super.getTypeAliases();
+    }
+    
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ShardingSphereConsulClient.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardings [...]
new file mode 100644
index 00000000000..ec58721877f
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ShardingSphereConsulClient.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.repository.cluster.consul;
+
+import com.ecwid.consul.v1.ConsulClient;
+import com.ecwid.consul.v1.ConsulRawClient;
+
+/**
+ * ShardingSphere consul client support use raw client.
+ */
+public class ShardingSphereConsulClient extends ConsulClient {
+    
+    private ConsulRawClient rawClient;
+    
+    public ShardingSphereConsulClient(final ConsulRawClient rawClient) {
+        super(rawClient);
+        this.rawClient = rawClient;
+    }
+    
+    /**
+     * Get consul raw client.
+     * @return raw consul client
+     */
+    public ConsulRawClient getRawClient() {
+        return rawClient;
+    }
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ShardingSphereQueryParams.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsp [...]
new file mode 100644
index 00000000000..d8a738878f8
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ShardingSphereQueryParams.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.repository.cluster.consul;
+
+import com.ecwid.consul.UrlParameters;
+import com.ecwid.consul.Utils;
+import com.ecwid.consul.v1.ConsistencyMode;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * ShardingConsul Query Params support wait time MILLISECONDS level.
+ */
+public final class ShardingSphereQueryParams implements UrlParameters {
+    
+    public static final ShardingSphereQueryParams DEFAULT = new ShardingSphereQueryParams(ConsistencyMode.DEFAULT);
+    
+    private final String datacenter;
+    
+    private final ConsistencyMode consistencyMode;
+    
+    private final long waitTime;
+    
+    private TimeUnit timeUnit;
+    
+    private final long index;
+    
+    private final String near;
+    
+    private ShardingSphereQueryParams(final String datacenter, final ConsistencyMode consistencyMode, final long waitTime, final TimeUnit timeUnit, final long index, final String near) {
+        this.datacenter = datacenter;
+        this.consistencyMode = consistencyMode;
+        this.waitTime = waitTime;
+        this.timeUnit = timeUnit;
+        this.index = index;
+        this.near = near;
+    }
+    
+    private ShardingSphereQueryParams(final String datacenter, final ConsistencyMode consistencyMode, final long waitTime, final long index) {
+        this(datacenter, consistencyMode, waitTime, TimeUnit.MILLISECONDS, index, null);
+    }
+    
+    public ShardingSphereQueryParams(final ConsistencyMode consistencyMode) {
+        this(null, consistencyMode, -1, -1);
+    }
+    
+    public ShardingSphereQueryParams(final long waitTime, final long index) {
+        this(null, ConsistencyMode.DEFAULT, waitTime, index);
+    }
+    
+    @Override
+    public List<String> toUrlParameters() {
+        List<String> params = new ArrayList<String>();
+        if (datacenter != null) {
+            params.add("dc=" + Utils.encodeValue(datacenter));
+        }
+        if (consistencyMode != ConsistencyMode.DEFAULT) {
+            params.add(consistencyMode.name().toLowerCase());
+        }
+        if (waitTime != -1) {
+            String waitStr = String.valueOf(timeUnit.toMillis(waitTime)) + "ms";
+            params.add("wait=" + waitStr);
+        }
+        if (index != -1) {
+            params.add("index=" + Utils.toUnsignedString(index));
+        }
+        if (near != null) {
+            params.add("near=" + Utils.encodeValue(near));
+        }
+        return params;
+    }
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulInternalLockProvider.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shar [...]
new file mode 100644
index 00000000000..c369232ba6f
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulInternalLockProvider.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.repository.cluster.consul.lock;
+
+import com.ecwid.consul.ConsulException;
+import com.ecwid.consul.json.GsonFactory;
+import com.ecwid.consul.transport.RawResponse;
+import com.ecwid.consul.v1.ConsulClient;
+import com.ecwid.consul.v1.OperationException;
+import com.ecwid.consul.v1.QueryParams;
+import com.ecwid.consul.v1.Response;
+import com.ecwid.consul.v1.kv.model.GetValue;
+import com.ecwid.consul.v1.kv.model.PutParams;
+import com.ecwid.consul.v1.session.model.NewSession;
+import com.ecwid.consul.v1.session.model.Session;
+import com.google.gson.reflect.TypeToken;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.StringUtils;
+import org.apache.shardingsphere.mode.repository.cluster.consul.ShardingSphereConsulClient;
+import org.apache.shardingsphere.mode.repository.cluster.consul.ShardingSphereQueryParams;
+import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulProperties;
+import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulPropertyKey;
+import org.apache.shardingsphere.mode.repository.cluster.lock.InternalLock;
+import org.apache.shardingsphere.mode.repository.cluster.lock.InternalLockProvider;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+
+/**
+ * Consul internal lock holder.
+ */
+@RequiredArgsConstructor
+@Slf4j
+public class ConsulInternalLockProvider implements InternalLockProvider {
+    
+    private static final String CONSUL_ROOT_PATH = "sharding/lock";
+    
+    private static final String CONSUL_PATH_SEPARATOR = "/";
+    
+    private static final String DEFAULT_CONSUL_LOCK_VALUE = "LOCKED";
+    
+    private static final String DEFAULT_CONSUL_UNLOCK_VALUE = "UNLOCKED";
+    
+    private static final long DEFAULT_LOCK_WAIT_TIME = 5000L;
+    
+    private static final ScheduledThreadPoolExecutor SESSION_FLUSH_EXECUTOR = new ScheduledThreadPoolExecutor(2);
+    
+    private final Map<String, ConsulInternalLock> locks = new ConcurrentHashMap<String, ConsulInternalLock>();
+    
+    private final ConsulClient consulClient;
+    
+    private final ConsulProperties consulProps;
+    
+    @Override
+    public InternalLock getInternalLock(final String lockKey) {
+        return getInternalReentrantMutexLock(lockKey);
+    }
+    
+    /**
+     * Get internal mutex lock.
+     *
+     * @param lockName lock name
+     * @return internal mutex lock
+     */
+    public InternalLock getInternalMutexLock(final String lockName) {
+        return getInternalReentrantMutexLock(lockName);
+    }
+    
+    /**
+     * Get internal reentrant mutex lock.
+     *
+     * @param lockName lock name
+     * @return internal reentrant mutex lock
+     */
+    public InternalLock getInternalReentrantMutexLock(final String lockName) {
+        ConsulInternalLock result = locks.get(lockName);
+        if (result == null) {
+            result = createLock(lockName);
+            locks.put(lockName, result);
+        }
+        return result;
+    }
+    
+    private ConsulInternalLock createLock(final String lockName) {
+        try {
+            NewSession session = new NewSession();
+            session.setName(lockName);
+            return new ConsulInternalLock(consulClient, lockName, consulProps);
+            // CHECKSTYLE:OFF
+        } catch (final Exception ex) {
+            // CHECKSTYLE:ON
+            log.error("ConsulRepository tryLock error, lockName:{}", lockName, ex);
+        }
+        return null;
+    }
+    
+    /**
+     * flush session by update ttl.
+     * @param consulClient consul client
+     * @param sessionId session id
+     */
+    public static void generatorFlushSessionTtlTask(final ConsulClient consulClient, final String sessionId) {
+        SESSION_FLUSH_EXECUTOR.scheduleAtFixedRate(new Runnable() {
+            
+            @Override
+            public void run() {
+                consulClient.renewSession(sessionId, QueryParams.DEFAULT);
+            }
+        }, 5, 10, TimeUnit.SECONDS);
+    }
+    
+    /**
+     * Consul internal lock.
+     */
+    private static class ConsulInternalLock implements InternalLock {
+        
+        private final ConsulClient consulClient;
+        
+        private final ConsulProperties consulProperties;
+        
+        private final ThreadLocal<String> lockSessionMap;
+        
+        private final String lockName;
+        
+        ConsulInternalLock(final ConsulClient consulClient, final String lockName, final ConsulProperties consulProperties) {
+            this.consulClient = consulClient;
+            this.lockName = lockName;
+            this.consulProperties = consulProperties;
+            this.lockSessionMap = new ThreadLocal<String>();
+        }
+        
+        // @Override
+        public void lock() {
+            try {
+                // support reentrant lock
+                if (StringUtils.isNotEmpty(lockSessionMap.get())) {
+                    return;
+                }
+                PutParams putParams = new PutParams();
+                String lockPath = CONSUL_ROOT_PATH + CONSUL_PATH_SEPARATOR + lockName;
+                while (true) {
+                    String sessionId = createSession(lockName);
+                    putParams.setAcquireSession(sessionId);
+                    Response<Boolean> response = consulClient.setKVValue(lockPath, DEFAULT_CONSUL_LOCK_VALUE, putParams);
+                    if (response.getValue()) {
+                        // lock success
+                        lockSessionMap.set(sessionId);
+                        ConsulInternalLockProvider.generatorFlushSessionTtlTask(consulClient, sessionId);
+                        if (log.isDebugEnabled()) {
+                            log.debug("Session id {} get lock {} is success", sessionId, lockName);
+                        }
+                        return;
+                    } else {
+                        // lock failed,exist race so retry
+                        // block query if value is change so return
+                        consulClient.sessionDestroy(sessionId, null);
+                        Long lockIndex = response.getConsulIndex();
+                        if (lockIndex == null) {
+                            lockIndex = 0L;
+                        }
+                        long waitTime = doWaitRelease(lockPath, lockIndex, DEFAULT_LOCK_WAIT_TIME);
+                        if (log.isDebugEnabled()) {
+                            log.debug("Wait lock {} time {}ms found lock is by release so to retry lock", lockName, TimeUnit.NANOSECONDS.toMillis(waitTime));
+                        }
+                    }
+                }
+                // CHECKSTYLE:OFF
+            } catch (final Exception ex) {
+                // CHECKSTYLE:ON
+                log.error("ConsulRepository tryLock error, lockName:{}", lockName, ex);
+                throw new IllegalStateException("Acquire consul lock failed", ex);
+            }
+        }
+        
+        @Override
+        public boolean tryLock(final long timeoutMillis) {
+            try {
+                if (StringUtils.isNotEmpty(lockSessionMap.get())) {
+                    return true;
+                }
+                long lockTime = timeoutMillis;
+                PutParams putParams = new PutParams();
+                String lockPath = CONSUL_ROOT_PATH + CONSUL_PATH_SEPARATOR + lockName;
+                while (true) {
+                    String sessionId = createSession(lockPath);
+                    putParams.setAcquireSession(sessionId);
+                    Response<Boolean> response = consulClient.setKVValue(lockPath, DEFAULT_CONSUL_LOCK_VALUE, putParams);
+                    if (response.getValue()) {
+                        // lock success
+                        lockSessionMap.set(sessionId);
+                        ConsulInternalLockProvider.generatorFlushSessionTtlTask(this.consulClient, sessionId);
+                        return true;
+                    } else {
+                        // lock failed,exist race so retry
+                        // block query if value is change so return
+                        consulClient.sessionDestroy(sessionId, null);
+                        long waitTime = doWaitRelease(lockPath, response.getConsulIndex(), lockTime);
+                        if (waitTime < lockTime) {
+                            lockTime = lockTime - waitTime;
+                            continue;
+                        }
+                        return false;
+                    }
+                }
+                // CHECKSTYLE:OFF
+            } catch (final Exception ex) {
+                // CHECKSTYLE:ON
+                log.error("EtcdRepository tryLock error, lockName:{}", lockName, ex);
+                return false;
+            }
+        }
+        
+        @Override
+        public void unlock() {
+            try {
+                PutParams putParams = new PutParams();
+                String sessionId = lockSessionMap.get();
+                putParams.setReleaseSession(sessionId);
+                String lockPath = CONSUL_ROOT_PATH + CONSUL_PATH_SEPARATOR + lockName;
+                this.consulClient.setKVValue(lockPath, DEFAULT_CONSUL_UNLOCK_VALUE, putParams).getValue();
+                this.consulClient.sessionDestroy(sessionId, null);
+                if (log.isDebugEnabled()) {
+                    log.debug("Release lock {} with session id {} success", lockName, sessionId);
+                }
+                // CHECKSTYLE:OFF
+            } catch (final Exception ex) {
+                // CHECKSTYLE:ON
+                log.error("EtcdRepository unlock error, lockName:{}", lockName, ex);
+            } finally {
+                lockSessionMap.remove();
+            }
+        }
+        
+        // @Override
+        public void lockInterruptibly() {
+            throw new UnsupportedOperationException();
+        }
+        
+        // @Override
+        public Condition newCondition() {
+            throw new UnsupportedOperationException();
+        }
+        
+        private String createSession(final String lockName) {
+            NewSession session = new NewSession();
+            session.setName(lockName);
+            // lock was released by force while session is invalid
+            session.setBehavior(Session.Behavior.RELEASE);
+            session.setTtl(consulProperties.getValue(ConsulPropertyKey.TIME_TO_LIVE_SECONDS));
+            return this.consulClient.sessionCreate(session, null).getValue();
+        }
+        
+        private long doWaitRelease(final String key, final long valueIndex, final long waitTime) {
+            long currentIndex = valueIndex;
+            if (currentIndex < 0) {
+                currentIndex = 0;
+            }
+            ShardingSphereConsulClient shardingSphereConsulClient = (ShardingSphereConsulClient) consulClient;
+            AtomicBoolean running = new AtomicBoolean(true);
+            long waitCostTime = 0L;
+            long now = System.currentTimeMillis();
+            long deadlineWaitTime = now + waitTime;
+            long blockWaitTime = waitTime;
+            while (running.get()) {
+                long startWaitTime = System.currentTimeMillis();
+                if (startWaitTime >= deadlineWaitTime) {
+                    // wait time is reached max
+                    return waitTime;
+                }
+                RawResponse rawResponse = shardingSphereConsulClient.getRawClient().makeGetRequest("/v1/kv/" + key, null, new ShardingSphereQueryParams(blockWaitTime, currentIndex));
+                Response<GetValue> response = warpRawResponse(rawResponse);
+                Long index = response.getConsulIndex();
+                waitCostTime += System.currentTimeMillis() - startWaitTime;
+                blockWaitTime -= waitCostTime;
+                if (index != null && index >= currentIndex) {
+                    if (currentIndex == 0) {
+                        currentIndex = index;
+                        continue;
+                    }
+                    currentIndex = index;
+                    GetValue getValue = response.getValue();
+                    if (getValue == null || getValue.getValue() == null) {
+                        return waitCostTime;
+                    }
+                    if (!key.equals(getValue.getKey())) {
+                        continue;
+                    }
+                    return waitCostTime;
+                } else if (index != null && index < currentIndex) {
+                    currentIndex = 0;
+                }
+            }
+            return -1;
+        }
+        
+        private Response<GetValue> warpRawResponse(final RawResponse rawResponse) {
+            if (rawResponse.getStatusCode() == 200) {
+                List<GetValue> value = GsonFactory.getGson().fromJson(rawResponse.getContent(), new TypeToken<List<GetValue>>() {
+                }.getType());
+                
+                if (value.size() == 0) {
+                    return new Response<GetValue>(null, rawResponse);
+                } else if (value.size() == 1) {
+                    return new Response<GetValue>(value.get(0), rawResponse);
+                } else {
+                    throw new ConsulException("Strange response (list size=" + value.size() + ")");
+                }
+            } else if (rawResponse.getStatusCode() == 404) {
+                return new Response<GetValue>(null, rawResponse);
+            } else {
+                throw new OperationException(rawResponse);
+            }
+        }
+    }
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/props/ConsulProperties.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingspher [...]
new file mode 100644
index 00000000000..37df4ab43bd
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/props/ConsulProperties.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.repository.cluster.consul.props;
+
+import org.apache.shardingsphere.infra.util.props.TypedProperties;
+import java.util.Properties;
+
+/**
+ * Typed properties of Consul.
+ */
+public final class ConsulProperties extends TypedProperties<ConsulPropertyKey> {
+    
+    public ConsulProperties(final Properties props) {
+        super(ConsulPropertyKey.class, props);
+    }
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/props/ConsulPropertyKey.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphe [...]
new file mode 100644
index 00000000000..779c5250451
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/props/ConsulPropertyKey.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.repository.cluster.consul.props;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.util.props.TypedPropertyKey;
+
+/**
+ * Typed property key of Consul.
+ */
+@RequiredArgsConstructor
+@Getter
+public enum ConsulPropertyKey implements TypedPropertyKey {
+    
+    /**
+     * Time to live seconds.
+     */
+    TIME_TO_LIVE_SECONDS("timeToLiveSeconds", "30s", String.class),
+    
+    /**
+     *Time to live seconds.
+     */
+    LOCK_DELAY_TO_MICORSENDS("lockDelayToMicorsends", "2", String.class),
+    
+    /**
+     *Block query time seconds.
+     */
+    BLOCK_QUERY_TIME_TO_SECONDS("blockQueryTimeToSeconds", "60", long.class);
+    
+    private final String key;
+    
+    private final String defaultValue;
+    
+    private final Class<?> type;
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/main/resources/META-INF.services/org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/ [...]
new file mode 100644
index 00000000000..b1215b4ba8c
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/main/resources/META-INF.services/org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.mode.repository.cluster.etcd.ConsulRepository
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepositoryTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere- [...]
new file mode 100644
index 00000000000..f903c9c4067
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepositoryTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.repository.cluster.consul;
+
+import com.ecwid.consul.v1.ConsulClient;
+import com.ecwid.consul.v1.QueryParams;
+import com.ecwid.consul.v1.Response;
+import com.ecwid.consul.v1.kv.model.GetValue;
+import com.ecwid.consul.v1.kv.model.PutParams;
+import com.ecwid.consul.v1.session.model.NewSession;
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulProperties;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.internal.configuration.plugins.Plugins;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.plugins.MemberAccessor;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.atLeastOnce;
+
+@RunWith(MockitoJUnitRunner.class)
+public final class ConsulRepositoryTest {
+    
+    private final ConsulRepository repository = new ConsulRepository();
+    
+    @Mock
+    private ShardingSphereConsulClient client;
+    
+    @Mock
+    private Response<GetValue> response;
+    
+    @Mock
+    private Response<List<String>> responseList;
+    
+    @Mock
+    private Response<List<GetValue>> responseGetValueList;
+    
+    @Mock
+    private Response<Boolean> responseBoolean;
+    
+    @Mock
+    private Response<String> sessionResponse;
+    
+    @Mock
+    private GetValue getValue;
+    
+    @Mock
+    private List<GetValue> getValueList;
+    
+    private long index = 123456L;
+    
+    @Before
+    public void setUp() {
+        setClient();
+        setProperties();
+    }
+    
+    @SneakyThrows(ReflectiveOperationException.class)
+    private void setClient() {
+        mockClient();
+        MemberAccessor accessor = Plugins.getMemberAccessor();
+        accessor.set(repository.getClass().getDeclaredField("consulClient"), repository, client);
+    }
+    
+    @SneakyThrows(ReflectiveOperationException.class)
+    private void setProperties() {
+        MemberAccessor accessor = Plugins.getMemberAccessor();
+        accessor.set(repository.getClass().getDeclaredField("consulProperties"), repository, new ConsulProperties(new Properties()));
+        accessor.set(repository.getClass().getDeclaredField("watchKeyMap"), repository, new HashMap<>(4));
+    }
+    
+    @SuppressWarnings("unchecked")
+    // @SneakyThrows({InterruptedException.class, ExecutionException.class})
+    private ConsulClient mockClient() {
+        when(client.getKVValue(any(String.class))).thenReturn(response);
+        when(response.getValue()).thenReturn(getValue);
+        when(client.getKVValues(any(String.class), any(QueryParams.class))).thenReturn(responseGetValueList);
+        when(client.getKVKeysOnly(any(String.class))).thenReturn(responseList);
+        when(client.sessionCreate(any(NewSession.class), any(QueryParams.class))).thenReturn(sessionResponse);
+        when(sessionResponse.getValue()).thenReturn("12323ddsf3sss");
+        when(responseGetValueList.getConsulIndex()).thenReturn(index++);
+        when(responseGetValueList.getValue()).thenReturn(getValueList);
+        when(client.setKVValue(any(String.class), any(String.class))).thenReturn(responseBoolean);
+        return client;
+    }
+    
+    @Test
+    public void assertGetKey() {
+        repository.get("key");
+        verify(client).getKVValue("key");
+        verify(response).getValue();
+    }
+    
+    @Test
+    public void assertGetChildrenKeys() {
+        final String key = "/key";
+        String k1 = "/key/key1/key1-1";
+        String v1 = "value1";
+        client.setKVValue(k1, v1);
+        String k2 = "/key/key2";
+        String v2 = "value2";
+        client.setKVValue(k2, v2);
+        List<String> getValues = Arrays.asList(k1, k2);
+        when(responseList.getValue()).thenReturn(getValues);
+        List<String> actual = repository.getChildrenKeys(key);
+        assertThat(actual.size(), is(2));
+        Iterator<String> iterator = actual.iterator();
+        assertThat(iterator.next(), is("/key/key1/key1-1"));
+        assertThat(iterator.next(), is("/key/key2"));
+    }
+    
+    @Test
+    @SuppressWarnings("unchecked")
+    public void assertPersistEphemeral() {
+        repository.persistEphemeral("key1", "value1");
+        verify(client).sessionCreate(any(NewSession.class), any(QueryParams.class));
+        verify(client).setKVValue(any(String.class), any(String.class), any(PutParams.class));
+        try {
+            Thread.sleep(6000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+        verify(client).renewSession(any(String.class), any(QueryParams.class));
+    }
+    
+    @Test
+    public void assertWatchUpdate() {
+        final String key = "sharding/key";
+        final String k1 = "sharding/key/key1";
+        final String v1 = "value1";
+        client.setKVValue(k1, v1);
+        GetValue getValue1 = new GetValue();
+        getValue1.setKey(k1);
+        getValue1.setValue(v1);
+        List<GetValue> getValues = Arrays.asList(getValue1);
+        when(responseGetValueList.getValue()).thenReturn(getValues);
+        repository.watch(key, event -> {
+        });
+        client.setKVValue(k1, "value1-1");
+        verify(client, atLeastOnce()).getKVValues(any(String.class), any(QueryParams.class));
+        try {
+            Thread.sleep(10000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+        
+    }
+    
+    @Test
+    public void assertWatchDelete() {
+        final String key = "sharding/key";
+        final String k1 = "sharding/key/key1";
+        final String v1 = "value1";
+        final String k2 = "sharding/key/key2";
+        final String v2 = "value1";
+        client.setKVValue(k1, v1);
+        client.setKVValue(k2, v2);
+        GetValue getValue1 = new GetValue();
+        getValue1.setKey(k1);
+        getValue1.setValue(v1);
+        List<GetValue> getValues = Arrays.asList(getValue1);
+        when(responseGetValueList.getValue()).thenReturn(getValues);
+        repository.watch(key, event -> {
+        });
+        client.deleteKVValue(k2);
+        verify(client, atLeastOnce()).getKVValues(any(String.class), any(QueryParams.class));
+        try {
+            Thread.sleep(10000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+    
+    @Test
+    public void assertWatchIgnored() {
+        // TODO
+    }
+    
+    @Test
+    public void assertDelete() {
+        repository.delete("key");
+        verify(client).deleteKVValue(any(String.class));
+    }
+    
+    @Test
+    public void assertPersist() {
+        repository.persist("key1", "value1");
+        verify(client).setKVValue(any(String.class), any(String.class));
+    }
+    
+    @Test
+    public void assertClose() {
+        repository.close();
+        // verify(client).close();
+    }
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/props/ConsulPropertiesTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardings [...]
new file mode 100644
index 00000000000..c1809ad218b
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/props/ConsulPropertiesTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.repository.cluster.consul.props;
+
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public final class ConsulPropertiesTest {
+    
+    @Test
+    public void assertGetValue() {
+        assertThat(new ConsulProperties(createProperties()).getValue(ConsulPropertyKey.BLOCK_QUERY_TIME_TO_SECONDS), is(60L));
+    }
+    
+    private Properties createProperties() {
+        Properties result = new Properties();
+        result.setProperty(ConsulPropertyKey.TIME_TO_LIVE_SECONDS.getKey(), "50");
+        return result;
+    }
+    
+    @Test
+    public void assertGetDefaultValue() {
+        assertThat(new ConsulProperties(new Properties()).getValue(ConsulPropertyKey.TIME_TO_LIVE_SECONDS), is(30L));
+    }
+}