You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by yx...@apache.org on 2022/10/30 13:08:05 UTC

[shardingsphere] branch master updated: Refactor ConsulDistributedLock (#21853)

This is an automated email from the ASF dual-hosted git repository.

yx9o pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b1a40213f2 Refactor ConsulDistributedLock (#21853)
0b1a40213f2 is described below

commit 0b1a40213f28442fdfd5de91627cdd8f678ca7d3
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Sun Oct 30 21:07:59 2022 +0800

    Refactor ConsulDistributedLock (#21853)
    
    * Refactor ConsulDistributedLock
    
    * Fix checkstyle
---
 .../cluster/consul/ShardingSphereQueryParams.java  |   6 +-
 .../cluster/consul/lock/ConsulDistributedLock.java | 110 +++++++++------------
 .../consul/lock/ConsulDistributedLockHolder.java   |  19 +---
 3 files changed, 50 insertions(+), 85 deletions(-)

diff --git a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ShardingSphereQueryParams.java b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ShardingSphereQueryParams.java
index 65d19da64bd..f1110840f0b 100644
--- a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ShardingSphereQueryParams.java
+++ b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ShardingSphereQueryParams.java
@@ -31,15 +31,15 @@ import java.util.concurrent.TimeUnit;
 @RequiredArgsConstructor
 public final class ShardingSphereQueryParams implements UrlParameters {
     
-    private final long waitTime;
+    private final long waitMillis;
     
     private final long index;
     
     @Override
     public List<String> toUrlParameters() {
         List<String> result = new ArrayList<>(2);
-        if (-1 != waitTime) {
-            result.add(String.format("wait=%dms", TimeUnit.MILLISECONDS.toMillis(waitTime)));
+        if (-1 != waitMillis) {
+            result.add(String.format("wait=%dms", TimeUnit.MILLISECONDS.toMillis(waitMillis)));
         }
         if (-1 != index) {
             result.add(String.format("index=%s", Utils.toUnsignedString(index)));
diff --git a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLock.java b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLock.java
index eb6204d6193..af071595747 100644
--- a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLock.java
+++ b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLock.java
@@ -27,10 +27,9 @@ import com.ecwid.consul.v1.Response;
 import com.ecwid.consul.v1.kv.model.GetValue;
 import com.ecwid.consul.v1.kv.model.PutParams;
 import com.ecwid.consul.v1.session.model.NewSession;
-import com.ecwid.consul.v1.session.model.Session;
+import com.ecwid.consul.v1.session.model.Session.Behavior;
 import com.google.common.base.Strings;
 import com.google.common.reflect.TypeToken;
-import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.mode.repository.cluster.consul.ShardingSphereConsulClient;
 import org.apache.shardingsphere.mode.repository.cluster.consul.ShardingSphereQueryParams;
 import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulProperties;
@@ -40,31 +39,34 @@ import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock;
 import java.util.List;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Consul distributed lock.
  */
-@RequiredArgsConstructor
 public final class ConsulDistributedLock implements DistributedLock {
     
-    private static final String CONSUL_ROOT_PATH = "sharding/lock";
+    private static final String LOCK_PATH_PATTERN = "lock/%s";
     
-    private static final String CONSUL_PATH_SEPARATOR = "/";
+    private static final String LOCK_VALUE = "LOCKED";
     
-    private static final String DEFAULT_CONSUL_LOCK_VALUE = "LOCKED";
-    
-    private static final String DEFAULT_CONSUL_UNLOCK_VALUE = "UNLOCKED";
+    private static final String UNLOCK_VALUE = "UNLOCKED";
     
     private static final ScheduledThreadPoolExecutor SESSION_FLUSH_EXECUTOR = new ScheduledThreadPoolExecutor(2);
     
-    private final String lockKey;
+    private final String lockPath;
     
     private final ConsulClient client;
     
-    private final ConsulProperties props;
+    private final String timeToLiveSeconds;
+    
+    private final ThreadLocal<String> lockSessionMap;
     
-    private final ThreadLocal<String> lockSessionMap = new ThreadLocal<>();
+    public ConsulDistributedLock(final String lockKey, final ConsulClient client, final ConsulProperties props) {
+        lockPath = String.format(LOCK_PATH_PATTERN, lockKey);
+        this.client = client;
+        timeToLiveSeconds = props.getValue(ConsulPropertyKey.TIME_TO_LIVE_SECONDS);
+        lockSessionMap = new ThreadLocal<>();
+    }
     
     @Override
     public boolean tryLock(final long timeoutMillis) {
@@ -72,89 +74,68 @@ public final class ConsulDistributedLock implements DistributedLock {
             return true;
         }
         try {
-            long lockTime = timeoutMillis;
             PutParams putParams = new PutParams();
-            String lockPath = CONSUL_ROOT_PATH + CONSUL_PATH_SEPARATOR + lockKey;
+            long remainingMillis = timeoutMillis;
             while (true) {
-                String sessionId = createSession(lockPath);
+                String sessionId = createSessionId();
                 putParams.setAcquireSession(sessionId);
-                Response<Boolean> response = client.setKVValue(lockPath, DEFAULT_CONSUL_LOCK_VALUE, putParams);
+                Response<Boolean> response = client.setKVValue(lockPath, LOCK_VALUE, putParams);
                 if (response.getValue()) {
-                    // lock success
                     lockSessionMap.set(sessionId);
                     SESSION_FLUSH_EXECUTOR.scheduleAtFixedRate(() -> client.renewSession(sessionId, QueryParams.DEFAULT), 5L, 10L, TimeUnit.SECONDS);
                     return true;
                 }
-                // lock failed,exist race so retry
-                // block query if value is change so return
                 client.sessionDestroy(sessionId, null);
-                long waitTime = doWaitRelease(lockPath, response.getConsulIndex(), lockTime);
-                if (waitTime < lockTime) {
-                    lockTime = lockTime - waitTime;
-                    continue;
+                long waitingMillis = waitUntilRelease(response.getConsulIndex(), remainingMillis);
+                if (waitingMillis >= remainingMillis) {
+                    return false;
                 }
-                return false;
+                remainingMillis -= waitingMillis;
             }
             // CHECKSTYLE:OFF
-        } catch (final Exception ex) {
+        } catch (final Exception ignored) {
             // CHECKSTYLE:ON
             return false;
         }
     }
     
-    private String createSession(final String lockName) {
+    private String createSessionId() {
         NewSession session = new NewSession();
-        session.setName(lockName);
-        // lock was released by force while session is invalid
-        session.setBehavior(Session.Behavior.RELEASE);
-        session.setTtl(props.getValue(ConsulPropertyKey.TIME_TO_LIVE_SECONDS));
+        session.setName(lockPath);
+        session.setTtl(timeToLiveSeconds);
+        session.setBehavior(Behavior.RELEASE);
         return client.sessionCreate(session, null).getValue();
     }
     
-    private long doWaitRelease(final String key, final long valueIndex, final long waitTime) {
-        long currentIndex = valueIndex;
-        if (currentIndex < 0) {
-            currentIndex = 0;
-        }
-        AtomicBoolean running = new AtomicBoolean(true);
-        long waitCostTime = 0L;
-        long now = System.currentTimeMillis();
-        long deadlineWaitTime = now + waitTime;
-        long blockWaitTime = waitTime;
-        while (running.get()) {
-            long startWaitTime = System.currentTimeMillis();
-            if (startWaitTime >= deadlineWaitTime) {
-                // wait time is reached max
-                return waitTime;
+    private long waitUntilRelease(final long valueIndex, final long timeoutMillis) {
+        long currentIndex = valueIndex < 0 ? 0 : valueIndex;
+        long spentMillis = 0L;
+        long timeoutTime = System.currentTimeMillis() + timeoutMillis;
+        long remainingMillis = timeoutMillis;
+        while (true) {
+            long startTime = System.currentTimeMillis();
+            if (startTime >= timeoutTime) {
+                return timeoutMillis;
             }
-            RawResponse rawResponse = ((ShardingSphereConsulClient) client).getRawClient().makeGetRequest("/v1/kv/" + key, null, new ShardingSphereQueryParams(blockWaitTime, currentIndex));
-            Response<GetValue> response = warpRawResponse(rawResponse);
+            Response<GetValue> response = getResponse(
+                    ((ShardingSphereConsulClient) client).getRawClient().makeGetRequest("/v1/kv/" + lockPath, null, new ShardingSphereQueryParams(remainingMillis, currentIndex)));
+            spentMillis += System.currentTimeMillis() - startTime;
+            remainingMillis -= spentMillis;
             Long index = response.getConsulIndex();
-            waitCostTime += System.currentTimeMillis() - startWaitTime;
-            blockWaitTime -= waitCostTime;
             if (null != index && index >= currentIndex) {
-                if (currentIndex == 0) {
-                    currentIndex = index;
-                    continue;
+                if (0 != currentIndex && (null == response.getValue() || null == response.getValue().getValue() || lockPath.equals(response.getValue().getKey()))) {
+                    return spentMillis;
                 }
                 currentIndex = index;
-                GetValue getValue = response.getValue();
-                if (null == getValue || null == getValue.getValue()) {
-                    return waitCostTime;
-                }
-                if (!key.equals(getValue.getKey())) {
-                    continue;
-                }
-                return waitCostTime;
+                continue;
             }
             if (null != index) {
                 currentIndex = 0;
             }
         }
-        return -1;
     }
     
-    private Response<GetValue> warpRawResponse(final RawResponse rawResponse) {
+    private Response<GetValue> getResponse(final RawResponse rawResponse) {
         if (200 == rawResponse.getStatusCode()) {
             List<GetValue> value = GsonFactory.getGson().fromJson(rawResponse.getContent(), new TypeToken<List<GetValue>>() {
                 
@@ -181,11 +162,10 @@ public final class ConsulDistributedLock implements DistributedLock {
             PutParams putParams = new PutParams();
             String sessionId = lockSessionMap.get();
             putParams.setReleaseSession(sessionId);
-            String lockPath = CONSUL_ROOT_PATH + CONSUL_PATH_SEPARATOR + lockKey;
-            client.setKVValue(lockPath, DEFAULT_CONSUL_UNLOCK_VALUE, putParams);
+            client.setKVValue(lockPath, UNLOCK_VALUE, putParams);
             client.sessionDestroy(sessionId, null);
             // CHECKSTYLE:OFF
-        } catch (final Exception ex) {
+        } catch (final Exception ignored) {
             // CHECKSTYLE:ON
         } finally {
             lockSessionMap.remove();
diff --git a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLockHolder.java b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLockHolder.java
index c77561a012f..0cc8b45d827 100644
--- a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLockHolder.java
+++ b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLockHolder.java
@@ -18,12 +18,11 @@
 package org.apache.shardingsphere.mode.repository.cluster.consul.lock;
 
 import com.ecwid.consul.v1.ConsulClient;
-import com.ecwid.consul.v1.session.model.NewSession;
 import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulProperties;
 import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock;
 import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLockHolder;
+
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -31,7 +30,6 @@ import java.util.concurrent.ConcurrentHashMap;
  * Consul distributed lock holder.
  */
 @RequiredArgsConstructor
-@Slf4j
 public class ConsulDistributedLockHolder implements DistributedLockHolder {
     
     private final Map<String, ConsulDistributedLock> locks = new ConcurrentHashMap<>();
@@ -44,22 +42,9 @@ public class ConsulDistributedLockHolder implements DistributedLockHolder {
     public DistributedLock getDistributedLock(final String lockKey) {
         ConsulDistributedLock result = locks.get(lockKey);
         if (null == result) {
-            result = createLock(lockKey);
+            result = new ConsulDistributedLock(lockKey, client, props);
             locks.put(lockKey, result);
         }
         return result;
     }
-    
-    private ConsulDistributedLock createLock(final String lockName) {
-        try {
-            NewSession session = new NewSession();
-            session.setName(lockName);
-            return new ConsulDistributedLock(lockName, client, props);
-            // CHECKSTYLE:OFF
-        } catch (final Exception ex) {
-            // CHECKSTYLE:ON
-            log.error("ConsulRepository tryLock error, lockName:{}", lockName, ex);
-        }
-        return null;
-    }
 }