You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by yx...@apache.org on 2022/10/30 13:08:05 UTC
[shardingsphere] branch master updated: Refactor ConsulDistributedLock (#21853)
This is an automated email from the ASF dual-hosted git repository.
yx9o pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 0b1a40213f2 Refactor ConsulDistributedLock (#21853)
0b1a40213f2 is described below
commit 0b1a40213f28442fdfd5de91627cdd8f678ca7d3
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Sun Oct 30 21:07:59 2022 +0800
Refactor ConsulDistributedLock (#21853)
* Refactor ConsulDistributedLock
* Fix checkstyle
---
.../cluster/consul/ShardingSphereQueryParams.java | 6 +-
.../cluster/consul/lock/ConsulDistributedLock.java | 110 +++++++++------------
.../consul/lock/ConsulDistributedLockHolder.java | 19 +---
3 files changed, 50 insertions(+), 85 deletions(-)
diff --git a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ShardingSphereQueryParams.java b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ShardingSphereQueryParams.java
index 65d19da64bd..f1110840f0b 100644
--- a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ShardingSphereQueryParams.java
+++ b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ShardingSphereQueryParams.java
@@ -31,15 +31,15 @@ import java.util.concurrent.TimeUnit;
@RequiredArgsConstructor
public final class ShardingSphereQueryParams implements UrlParameters {
- private final long waitTime;
+ private final long waitMillis;
private final long index;
@Override
public List<String> toUrlParameters() {
List<String> result = new ArrayList<>(2);
- if (-1 != waitTime) {
- result.add(String.format("wait=%dms", TimeUnit.MILLISECONDS.toMillis(waitTime)));
+ if (-1 != waitMillis) {
+ result.add(String.format("wait=%dms", TimeUnit.MILLISECONDS.toMillis(waitMillis)));
}
if (-1 != index) {
result.add(String.format("index=%s", Utils.toUnsignedString(index)));
diff --git a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLock.java b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLock.java
index eb6204d6193..af071595747 100644
--- a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLock.java
+++ b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLock.java
@@ -27,10 +27,9 @@ import com.ecwid.consul.v1.Response;
import com.ecwid.consul.v1.kv.model.GetValue;
import com.ecwid.consul.v1.kv.model.PutParams;
import com.ecwid.consul.v1.session.model.NewSession;
-import com.ecwid.consul.v1.session.model.Session;
+import com.ecwid.consul.v1.session.model.Session.Behavior;
import com.google.common.base.Strings;
import com.google.common.reflect.TypeToken;
-import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.mode.repository.cluster.consul.ShardingSphereConsulClient;
import org.apache.shardingsphere.mode.repository.cluster.consul.ShardingSphereQueryParams;
import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulProperties;
@@ -40,31 +39,34 @@ import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
/**
* Consul distributed lock.
*/
-@RequiredArgsConstructor
public final class ConsulDistributedLock implements DistributedLock {
- private static final String CONSUL_ROOT_PATH = "sharding/lock";
+ private static final String LOCK_PATH_PATTERN = "lock/%s";
- private static final String CONSUL_PATH_SEPARATOR = "/";
+ private static final String LOCK_VALUE = "LOCKED";
- private static final String DEFAULT_CONSUL_LOCK_VALUE = "LOCKED";
-
- private static final String DEFAULT_CONSUL_UNLOCK_VALUE = "UNLOCKED";
+ private static final String UNLOCK_VALUE = "UNLOCKED";
private static final ScheduledThreadPoolExecutor SESSION_FLUSH_EXECUTOR = new ScheduledThreadPoolExecutor(2);
- private final String lockKey;
+ private final String lockPath;
private final ConsulClient client;
- private final ConsulProperties props;
+ private final String timeToLiveSeconds;
+
+ private final ThreadLocal<String> lockSessionMap;
- private final ThreadLocal<String> lockSessionMap = new ThreadLocal<>();
+ public ConsulDistributedLock(final String lockKey, final ConsulClient client, final ConsulProperties props) {
+ lockPath = String.format(LOCK_PATH_PATTERN, lockKey);
+ this.client = client;
+ timeToLiveSeconds = props.getValue(ConsulPropertyKey.TIME_TO_LIVE_SECONDS);
+ lockSessionMap = new ThreadLocal<>();
+ }
@Override
public boolean tryLock(final long timeoutMillis) {
@@ -72,89 +74,68 @@ public final class ConsulDistributedLock implements DistributedLock {
return true;
}
try {
- long lockTime = timeoutMillis;
PutParams putParams = new PutParams();
- String lockPath = CONSUL_ROOT_PATH + CONSUL_PATH_SEPARATOR + lockKey;
+ long remainingMillis = timeoutMillis;
while (true) {
- String sessionId = createSession(lockPath);
+ String sessionId = createSessionId();
putParams.setAcquireSession(sessionId);
- Response<Boolean> response = client.setKVValue(lockPath, DEFAULT_CONSUL_LOCK_VALUE, putParams);
+ Response<Boolean> response = client.setKVValue(lockPath, LOCK_VALUE, putParams);
if (response.getValue()) {
- // lock success
lockSessionMap.set(sessionId);
SESSION_FLUSH_EXECUTOR.scheduleAtFixedRate(() -> client.renewSession(sessionId, QueryParams.DEFAULT), 5L, 10L, TimeUnit.SECONDS);
return true;
}
- // lock failed,exist race so retry
- // block query if value is change so return
client.sessionDestroy(sessionId, null);
- long waitTime = doWaitRelease(lockPath, response.getConsulIndex(), lockTime);
- if (waitTime < lockTime) {
- lockTime = lockTime - waitTime;
- continue;
+ long waitingMillis = waitUntilRelease(response.getConsulIndex(), remainingMillis);
+ if (waitingMillis >= remainingMillis) {
+ return false;
}
- return false;
+ remainingMillis -= waitingMillis;
}
// CHECKSTYLE:OFF
- } catch (final Exception ex) {
+ } catch (final Exception ignored) {
// CHECKSTYLE:ON
return false;
}
}
- private String createSession(final String lockName) {
+ private String createSessionId() {
NewSession session = new NewSession();
- session.setName(lockName);
- // lock was released by force while session is invalid
- session.setBehavior(Session.Behavior.RELEASE);
- session.setTtl(props.getValue(ConsulPropertyKey.TIME_TO_LIVE_SECONDS));
+ session.setName(lockPath);
+ session.setTtl(timeToLiveSeconds);
+ session.setBehavior(Behavior.RELEASE);
return client.sessionCreate(session, null).getValue();
}
- private long doWaitRelease(final String key, final long valueIndex, final long waitTime) {
- long currentIndex = valueIndex;
- if (currentIndex < 0) {
- currentIndex = 0;
- }
- AtomicBoolean running = new AtomicBoolean(true);
- long waitCostTime = 0L;
- long now = System.currentTimeMillis();
- long deadlineWaitTime = now + waitTime;
- long blockWaitTime = waitTime;
- while (running.get()) {
- long startWaitTime = System.currentTimeMillis();
- if (startWaitTime >= deadlineWaitTime) {
- // wait time is reached max
- return waitTime;
+ private long waitUntilRelease(final long valueIndex, final long timeoutMillis) {
+ long currentIndex = valueIndex < 0 ? 0 : valueIndex;
+ long spentMillis = 0L;
+ long timeoutTime = System.currentTimeMillis() + timeoutMillis;
+ long remainingMillis = timeoutMillis;
+ while (true) {
+ long startTime = System.currentTimeMillis();
+ if (startTime >= timeoutTime) {
+ return timeoutMillis;
}
- RawResponse rawResponse = ((ShardingSphereConsulClient) client).getRawClient().makeGetRequest("/v1/kv/" + key, null, new ShardingSphereQueryParams(blockWaitTime, currentIndex));
- Response<GetValue> response = warpRawResponse(rawResponse);
+ Response<GetValue> response = getResponse(
+ ((ShardingSphereConsulClient) client).getRawClient().makeGetRequest("/v1/kv/" + lockPath, null, new ShardingSphereQueryParams(remainingMillis, currentIndex)));
+ spentMillis += System.currentTimeMillis() - startTime;
+ remainingMillis -= spentMillis;
Long index = response.getConsulIndex();
- waitCostTime += System.currentTimeMillis() - startWaitTime;
- blockWaitTime -= waitCostTime;
if (null != index && index >= currentIndex) {
- if (currentIndex == 0) {
- currentIndex = index;
- continue;
+ if (0 != currentIndex && (null == response.getValue() || null == response.getValue().getValue() || lockPath.equals(response.getValue().getKey()))) {
+ return spentMillis;
}
currentIndex = index;
- GetValue getValue = response.getValue();
- if (null == getValue || null == getValue.getValue()) {
- return waitCostTime;
- }
- if (!key.equals(getValue.getKey())) {
- continue;
- }
- return waitCostTime;
+ continue;
}
if (null != index) {
currentIndex = 0;
}
}
- return -1;
}
- private Response<GetValue> warpRawResponse(final RawResponse rawResponse) {
+ private Response<GetValue> getResponse(final RawResponse rawResponse) {
if (200 == rawResponse.getStatusCode()) {
List<GetValue> value = GsonFactory.getGson().fromJson(rawResponse.getContent(), new TypeToken<List<GetValue>>() {
@@ -181,11 +162,10 @@ public final class ConsulDistributedLock implements DistributedLock {
PutParams putParams = new PutParams();
String sessionId = lockSessionMap.get();
putParams.setReleaseSession(sessionId);
- String lockPath = CONSUL_ROOT_PATH + CONSUL_PATH_SEPARATOR + lockKey;
- client.setKVValue(lockPath, DEFAULT_CONSUL_UNLOCK_VALUE, putParams);
+ client.setKVValue(lockPath, UNLOCK_VALUE, putParams);
client.sessionDestroy(sessionId, null);
// CHECKSTYLE:OFF
- } catch (final Exception ex) {
+ } catch (final Exception ignored) {
// CHECKSTYLE:ON
} finally {
lockSessionMap.remove();
diff --git a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLockHolder.java b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLockHolder.java
index c77561a012f..0cc8b45d827 100644
--- a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLockHolder.java
+++ b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLockHolder.java
@@ -18,12 +18,11 @@
package org.apache.shardingsphere.mode.repository.cluster.consul.lock;
import com.ecwid.consul.v1.ConsulClient;
-import com.ecwid.consul.v1.session.model.NewSession;
import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulProperties;
import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock;
import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLockHolder;
+
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -31,7 +30,6 @@ import java.util.concurrent.ConcurrentHashMap;
* Consul distributed lock holder.
*/
@RequiredArgsConstructor
-@Slf4j
public class ConsulDistributedLockHolder implements DistributedLockHolder {
private final Map<String, ConsulDistributedLock> locks = new ConcurrentHashMap<>();
@@ -44,22 +42,9 @@ public class ConsulDistributedLockHolder implements DistributedLockHolder {
public DistributedLock getDistributedLock(final String lockKey) {
ConsulDistributedLock result = locks.get(lockKey);
if (null == result) {
- result = createLock(lockKey);
+ result = new ConsulDistributedLock(lockKey, client, props);
locks.put(lockKey, result);
}
return result;
}
-
- private ConsulDistributedLock createLock(final String lockName) {
- try {
- NewSession session = new NewSession();
- session.setName(lockName);
- return new ConsulDistributedLock(lockName, client, props);
- // CHECKSTYLE:OFF
- } catch (final Exception ex) {
- // CHECKSTYLE:ON
- log.error("ConsulRepository tryLock error, lockName:{}", lockName, ex);
- }
- return null;
- }
}