You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by su...@apache.org on 2022/10/30 10:26:24 UTC
[shardingsphere] branch master updated: Refactor ConsulDistributedLock (#21850)
This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d6b92154cc9 Refactor ConsulDistributedLock (#21850)
d6b92154cc9 is described below
commit d6b92154cc91a87d60b69f1055fced858d15a451
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Sun Oct 30 18:26:18 2022 +0800
Refactor ConsulDistributedLock (#21850)
---
.../cluster/consul/lock/ConsulDistributedLock.java | 22 +++++++++++-----------
.../consul/lock/ConsulDistributedLockHolder.java | 7 +++----
2 files changed, 14 insertions(+), 15 deletions(-)
diff --git a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLock.java b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLock.java
index 050ee7fe7be..90d9bf54a08 100644
--- a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLock.java
+++ b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLock.java
@@ -58,11 +58,11 @@ public final class ConsulDistributedLock implements DistributedLock {
private static final ScheduledThreadPoolExecutor SESSION_FLUSH_EXECUTOR = new ScheduledThreadPoolExecutor(2);
- private final ConsulClient consulClient;
-
private final String lockName;
- private final ConsulProperties consulProperties;
+ private final ConsulClient client;
+
+ private final ConsulProperties props;
private final ThreadLocal<String> lockSessionMap = new ThreadLocal<>();
@@ -78,16 +78,16 @@ public final class ConsulDistributedLock implements DistributedLock {
while (true) {
String sessionId = createSession(lockPath);
putParams.setAcquireSession(sessionId);
- Response<Boolean> response = consulClient.setKVValue(lockPath, DEFAULT_CONSUL_LOCK_VALUE, putParams);
+ Response<Boolean> response = client.setKVValue(lockPath, DEFAULT_CONSUL_LOCK_VALUE, putParams);
if (response.getValue()) {
// lock success
lockSessionMap.set(sessionId);
- SESSION_FLUSH_EXECUTOR.scheduleAtFixedRate(() -> consulClient.renewSession(sessionId, QueryParams.DEFAULT), 5L, 10L, TimeUnit.SECONDS);
+ SESSION_FLUSH_EXECUTOR.scheduleAtFixedRate(() -> client.renewSession(sessionId, QueryParams.DEFAULT), 5L, 10L, TimeUnit.SECONDS);
return true;
}
// lock failed,exist race so retry
// block query if value is change so return
- consulClient.sessionDestroy(sessionId, null);
+ client.sessionDestroy(sessionId, null);
long waitTime = doWaitRelease(lockPath, response.getConsulIndex(), lockTime);
if (waitTime < lockTime) {
lockTime = lockTime - waitTime;
@@ -107,8 +107,8 @@ public final class ConsulDistributedLock implements DistributedLock {
session.setName(lockName);
// lock was released by force while session is invalid
session.setBehavior(Session.Behavior.RELEASE);
- session.setTtl(consulProperties.getValue(ConsulPropertyKey.TIME_TO_LIVE_SECONDS));
- return consulClient.sessionCreate(session, null).getValue();
+ session.setTtl(props.getValue(ConsulPropertyKey.TIME_TO_LIVE_SECONDS));
+ return client.sessionCreate(session, null).getValue();
}
private long doWaitRelease(final String key, final long valueIndex, final long waitTime) {
@@ -127,7 +127,7 @@ public final class ConsulDistributedLock implements DistributedLock {
// wait time is reached max
return waitTime;
}
- RawResponse rawResponse = ((ShardingSphereConsulClient) consulClient).getRawClient().makeGetRequest("/v1/kv/" + key, null, new ShardingSphereQueryParams(blockWaitTime, currentIndex));
+ RawResponse rawResponse = ((ShardingSphereConsulClient) client).getRawClient().makeGetRequest("/v1/kv/" + key, null, new ShardingSphereQueryParams(blockWaitTime, currentIndex));
Response<GetValue> response = warpRawResponse(rawResponse);
Long index = response.getConsulIndex();
waitCostTime += System.currentTimeMillis() - startWaitTime;
@@ -182,8 +182,8 @@ public final class ConsulDistributedLock implements DistributedLock {
String sessionId = lockSessionMap.get();
putParams.setReleaseSession(sessionId);
String lockPath = CONSUL_ROOT_PATH + CONSUL_PATH_SEPARATOR + lockName;
- consulClient.setKVValue(lockPath, DEFAULT_CONSUL_UNLOCK_VALUE, putParams);
- consulClient.sessionDestroy(sessionId, null);
+ client.setKVValue(lockPath, DEFAULT_CONSUL_UNLOCK_VALUE, putParams);
+ client.sessionDestroy(sessionId, null);
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
diff --git a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLockHolder.java b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLockHolder.java
index 3cc65df7797..c77561a012f 100644
--- a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLockHolder.java
+++ b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLockHolder.java
@@ -36,9 +36,9 @@ public class ConsulDistributedLockHolder implements DistributedLockHolder {
private final Map<String, ConsulDistributedLock> locks = new ConcurrentHashMap<>();
- private final ConsulClient consulClient;
+ private final ConsulClient client;
- private final ConsulProperties consulProps;
+ private final ConsulProperties props;
@Override
public DistributedLock getDistributedLock(final String lockKey) {
@@ -54,7 +54,7 @@ public class ConsulDistributedLockHolder implements DistributedLockHolder {
try {
NewSession session = new NewSession();
session.setName(lockName);
- return new ConsulDistributedLock(consulClient, lockName, consulProps);
+ return new ConsulDistributedLock(lockName, client, props);
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
@@ -62,5 +62,4 @@ public class ConsulDistributedLockHolder implements DistributedLockHolder {
}
return null;
}
-
}