You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by ji...@apache.org on 2022/10/24 23:31:54 UTC

[shardingsphere] branch master updated: Move ConsulInternalLock from internal class to independent class (#21730)

This is an automated email from the ASF dual-hosted git repository.

jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 46732a0b14f Move ConsulInternalLock from internal class to independent class (#21730)
46732a0b14f is described below

commit 46732a0b14fc25a3297fe010d390857f401beaf9
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Tue Oct 25 07:31:37 2022 +0800

    Move ConsulInternalLock from internal class to independent class (#21730)
    
    * Move ConsulInternalLock from internal class to independent class
    
    * Move ConsulInternalLock from internal class to independent class
    
    * Move ConsulInternalLock from internal class to independent class
---
 .../cluster/consul/lock/ConsulInternalLock.java    | 198 +++++++++++++++++++++
 .../consul/lock/ConsulInternalLockProvider.java    | 160 -----------------
 .../data/pipeline/cases/base/BaseITCase.java       |   2 +-
 3 files changed, 199 insertions(+), 161 deletions(-)

diff --git a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulInternalLock.java b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulInternalLock.java
new file mode 100644
index 00000000000..a746081dd8c
--- /dev/null
+++ b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulInternalLock.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.repository.cluster.consul.lock;
+
+import com.ecwid.consul.ConsulException;
+import com.ecwid.consul.json.GsonFactory;
+import com.ecwid.consul.transport.RawResponse;
+import com.ecwid.consul.v1.ConsulClient;
+import com.ecwid.consul.v1.OperationException;
+import com.ecwid.consul.v1.QueryParams;
+import com.ecwid.consul.v1.Response;
+import com.ecwid.consul.v1.kv.model.GetValue;
+import com.ecwid.consul.v1.kv.model.PutParams;
+import com.ecwid.consul.v1.session.model.NewSession;
+import com.ecwid.consul.v1.session.model.Session;
+import com.google.common.base.Strings;
+import com.google.common.reflect.TypeToken;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.mode.repository.cluster.consul.ShardingSphereConsulClient;
+import org.apache.shardingsphere.mode.repository.cluster.consul.ShardingSphereQueryParams;
+import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulProperties;
+import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulPropertyKey;
+import org.apache.shardingsphere.mode.repository.cluster.lock.InternalLock;
+
+import java.util.List;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Consul internal lock holder.
+ */
+@RequiredArgsConstructor
+@Slf4j
+public final class ConsulInternalLock implements InternalLock {
+    
+    private static final String CONSUL_ROOT_PATH = "sharding/lock";
+    
+    private static final String CONSUL_PATH_SEPARATOR = "/";
+    
+    private static final String DEFAULT_CONSUL_LOCK_VALUE = "LOCKED";
+    
+    private static final String DEFAULT_CONSUL_UNLOCK_VALUE = "UNLOCKED";
+    
+    private static final ScheduledThreadPoolExecutor SESSION_FLUSH_EXECUTOR = new ScheduledThreadPoolExecutor(2);
+    
+    private final ConsulClient consulClient;
+    
+    private final String lockName;
+    
+    private final ConsulProperties consulProperties;
+    
+    private final ThreadLocal<String> lockSessionMap = new ThreadLocal<>();
+    
+    @Override
+    public boolean tryLock(final long timeoutMillis) {
+        if (!Strings.isNullOrEmpty(lockSessionMap.get())) {
+            return true;
+        }
+        try {
+            long lockTime = timeoutMillis;
+            PutParams putParams = new PutParams();
+            String lockPath = CONSUL_ROOT_PATH + CONSUL_PATH_SEPARATOR + lockName;
+            while (true) {
+                String sessionId = createSession(lockPath);
+                putParams.setAcquireSession(sessionId);
+                Response<Boolean> response = consulClient.setKVValue(lockPath, DEFAULT_CONSUL_LOCK_VALUE, putParams);
+                if (response.getValue()) {
+                    // lock success
+                    lockSessionMap.set(sessionId);
+                    SESSION_FLUSH_EXECUTOR.scheduleAtFixedRate(() -> consulClient.renewSession(sessionId, QueryParams.DEFAULT), 5L, 10L, TimeUnit.SECONDS);
+                    return true;
+                }
+                // lock failed,exist race so retry
+                // block query if value is change so return
+                consulClient.sessionDestroy(sessionId, null);
+                long waitTime = doWaitRelease(lockPath, response.getConsulIndex(), lockTime);
+                if (waitTime < lockTime) {
+                    lockTime = lockTime - waitTime;
+                    continue;
+                }
+                return false;
+            }
+            // CHECKSTYLE:OFF
+        } catch (final Exception ex) {
+            // CHECKSTYLE:ON
+            log.error("EtcdRepository tryLock error, lockName:{}", lockName, ex);
+            return false;
+        }
+    }
+    
+    private String createSession(final String lockName) {
+        NewSession session = new NewSession();
+        session.setName(lockName);
+        // lock was released by force while session is invalid
+        session.setBehavior(Session.Behavior.RELEASE);
+        session.setTtl(consulProperties.getValue(ConsulPropertyKey.TIME_TO_LIVE_SECONDS));
+        return consulClient.sessionCreate(session, null).getValue();
+    }
+    
+    private long doWaitRelease(final String key, final long valueIndex, final long waitTime) {
+        long currentIndex = valueIndex;
+        if (currentIndex < 0) {
+            currentIndex = 0;
+        }
+        AtomicBoolean running = new AtomicBoolean(true);
+        long waitCostTime = 0L;
+        long now = System.currentTimeMillis();
+        long deadlineWaitTime = now + waitTime;
+        long blockWaitTime = waitTime;
+        while (running.get()) {
+            long startWaitTime = System.currentTimeMillis();
+            if (startWaitTime >= deadlineWaitTime) {
+                // wait time is reached max
+                return waitTime;
+            }
+            RawResponse rawResponse = ((ShardingSphereConsulClient) consulClient).getRawClient().makeGetRequest("/v1/kv/" + key, null, new ShardingSphereQueryParams(blockWaitTime, currentIndex));
+            Response<GetValue> response = warpRawResponse(rawResponse);
+            Long index = response.getConsulIndex();
+            waitCostTime += System.currentTimeMillis() - startWaitTime;
+            blockWaitTime -= waitCostTime;
+            if (null != index && index >= currentIndex) {
+                if (currentIndex == 0) {
+                    currentIndex = index;
+                    continue;
+                }
+                currentIndex = index;
+                GetValue getValue = response.getValue();
+                if (null == getValue || null == getValue.getValue()) {
+                    return waitCostTime;
+                }
+                if (!key.equals(getValue.getKey())) {
+                    continue;
+                }
+                return waitCostTime;
+            }
+            if (null != index) {
+                currentIndex = 0;
+            }
+        }
+        return -1;
+    }
+    
+    private Response<GetValue> warpRawResponse(final RawResponse rawResponse) {
+        if (200 == rawResponse.getStatusCode()) {
+            List<GetValue> value = GsonFactory.getGson().fromJson(rawResponse.getContent(), new TypeToken<List<GetValue>>() {
+                
+                private static final long serialVersionUID = -5065504617907914417L;
+                
+            }.getType());
+            if (value.isEmpty()) {
+                return new Response<>(null, rawResponse);
+            }
+            if (1 == value.size()) {
+                return new Response<>(value.get(0), rawResponse);
+            }
+            throw new ConsulException("Strange response (list size=" + value.size() + ")");
+        }
+        if (404 == rawResponse.getStatusCode()) {
+            return new Response<>(null, rawResponse);
+        }
+        throw new OperationException(rawResponse);
+    }
+    
+    @Override
+    public void unlock() {
+        try {
+            PutParams putParams = new PutParams();
+            String sessionId = lockSessionMap.get();
+            putParams.setReleaseSession(sessionId);
+            String lockPath = CONSUL_ROOT_PATH + CONSUL_PATH_SEPARATOR + lockName;
+            consulClient.setKVValue(lockPath, DEFAULT_CONSUL_UNLOCK_VALUE, putParams);
+            consulClient.sessionDestroy(sessionId, null);
+            // CHECKSTYLE:OFF
+        } catch (final Exception ex) {
+            // CHECKSTYLE:ON
+            log.error("EtcdRepository unlock error, lockName: {}", lockName, ex);
+        } finally {
+            lockSessionMap.remove();
+        }
+    }
+}
diff --git a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulInternalLockProvider.java b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulInternalLockProvider.java
index 8610f659215..c48dcabc0e8 100644
--- a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulInternalLockProvider.java
+++ b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulInternalLockProvider.java
@@ -17,34 +17,19 @@
 
 package org.apache.shardingsphere.mode.repository.cluster.consul.lock;
 
-import com.ecwid.consul.ConsulException;
-import com.ecwid.consul.json.GsonFactory;
-import com.ecwid.consul.transport.RawResponse;
 import com.ecwid.consul.v1.ConsulClient;
-import com.ecwid.consul.v1.OperationException;
 import com.ecwid.consul.v1.QueryParams;
-import com.ecwid.consul.v1.Response;
-import com.ecwid.consul.v1.kv.model.GetValue;
-import com.ecwid.consul.v1.kv.model.PutParams;
 import com.ecwid.consul.v1.session.model.NewSession;
-import com.ecwid.consul.v1.session.model.Session;
-import com.google.common.base.Strings;
-import com.google.common.reflect.TypeToken;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.mode.repository.cluster.consul.ShardingSphereConsulClient;
-import org.apache.shardingsphere.mode.repository.cluster.consul.ShardingSphereQueryParams;
 import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulProperties;
-import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulPropertyKey;
 import org.apache.shardingsphere.mode.repository.cluster.lock.InternalLock;
 import org.apache.shardingsphere.mode.repository.cluster.lock.InternalLockProvider;
 
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Consul internal lock holder.
@@ -53,14 +38,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 @Slf4j
 public class ConsulInternalLockProvider implements InternalLockProvider {
     
-    private static final String CONSUL_ROOT_PATH = "sharding/lock";
-    
-    private static final String CONSUL_PATH_SEPARATOR = "/";
-    
-    private static final String DEFAULT_CONSUL_LOCK_VALUE = "LOCKED";
-    
-    private static final String DEFAULT_CONSUL_UNLOCK_VALUE = "UNLOCKED";
-    
     private static final ScheduledThreadPoolExecutor SESSION_FLUSH_EXECUTOR = new ScheduledThreadPoolExecutor(2);
     
     private final Map<String, ConsulInternalLock> locks = new ConcurrentHashMap<>();
@@ -121,141 +98,4 @@ public class ConsulInternalLockProvider implements InternalLockProvider {
     public void generatorFlushSessionTtlTask(final ConsulClient consulClient, final String sessionId) {
         SESSION_FLUSH_EXECUTOR.scheduleAtFixedRate(() -> consulClient.renewSession(sessionId, QueryParams.DEFAULT), 5L, 10L, TimeUnit.SECONDS);
     }
-    
-    @RequiredArgsConstructor
-    private static class ConsulInternalLock implements InternalLock {
-        
-        private final ConsulClient consulClient;
-        
-        private final String lockName;
-        
-        private final ConsulProperties consulProperties;
-        
-        private final ThreadLocal<String> lockSessionMap = new ThreadLocal<>();
-        
-        @Override
-        public boolean tryLock(final long timeoutMillis) {
-            if (!Strings.isNullOrEmpty(lockSessionMap.get())) {
-                return true;
-            }
-            try {
-                long lockTime = timeoutMillis;
-                PutParams putParams = new PutParams();
-                String lockPath = CONSUL_ROOT_PATH + CONSUL_PATH_SEPARATOR + lockName;
-                while (true) {
-                    String sessionId = createSession(lockPath);
-                    putParams.setAcquireSession(sessionId);
-                    Response<Boolean> response = consulClient.setKVValue(lockPath, DEFAULT_CONSUL_LOCK_VALUE, putParams);
-                    if (response.getValue()) {
-                        // lock success
-                        lockSessionMap.set(sessionId);
-                        SESSION_FLUSH_EXECUTOR.scheduleAtFixedRate(() -> consulClient.renewSession(sessionId, QueryParams.DEFAULT), 5L, 10L, TimeUnit.SECONDS);
-                        return true;
-                    }
-                    // lock failed,exist race so retry
-                    // block query if value is change so return
-                    consulClient.sessionDestroy(sessionId, null);
-                    long waitTime = doWaitRelease(lockPath, response.getConsulIndex(), lockTime);
-                    if (waitTime < lockTime) {
-                        lockTime = lockTime - waitTime;
-                        continue;
-                    }
-                    return false;
-                }
-                // CHECKSTYLE:OFF
-            } catch (final Exception ex) {
-                // CHECKSTYLE:ON
-                log.error("EtcdRepository tryLock error, lockName:{}", lockName, ex);
-                return false;
-            }
-        }
-        
-        private String createSession(final String lockName) {
-            NewSession session = new NewSession();
-            session.setName(lockName);
-            // lock was released by force while session is invalid
-            session.setBehavior(Session.Behavior.RELEASE);
-            session.setTtl(consulProperties.getValue(ConsulPropertyKey.TIME_TO_LIVE_SECONDS));
-            return consulClient.sessionCreate(session, null).getValue();
-        }
-        
-        private long doWaitRelease(final String key, final long valueIndex, final long waitTime) {
-            long currentIndex = valueIndex;
-            if (currentIndex < 0) {
-                currentIndex = 0;
-            }
-            AtomicBoolean running = new AtomicBoolean(true);
-            long waitCostTime = 0L;
-            long now = System.currentTimeMillis();
-            long deadlineWaitTime = now + waitTime;
-            long blockWaitTime = waitTime;
-            while (running.get()) {
-                long startWaitTime = System.currentTimeMillis();
-                if (startWaitTime >= deadlineWaitTime) {
-                    // wait time is reached max
-                    return waitTime;
-                }
-                RawResponse rawResponse = ((ShardingSphereConsulClient) consulClient).getRawClient().makeGetRequest("/v1/kv/" + key, null, new ShardingSphereQueryParams(blockWaitTime, currentIndex));
-                Response<GetValue> response = warpRawResponse(rawResponse);
-                Long index = response.getConsulIndex();
-                waitCostTime += System.currentTimeMillis() - startWaitTime;
-                blockWaitTime -= waitCostTime;
-                if (null != index && index >= currentIndex) {
-                    if (currentIndex == 0) {
-                        currentIndex = index;
-                        continue;
-                    }
-                    currentIndex = index;
-                    GetValue getValue = response.getValue();
-                    if (null == getValue || null == getValue.getValue()) {
-                        return waitCostTime;
-                    }
-                    if (!key.equals(getValue.getKey())) {
-                        continue;
-                    }
-                    return waitCostTime;
-                }
-                if (null != index) {
-                    currentIndex = 0;
-                }
-            }
-            return -1;
-        }
-        
-        private Response<GetValue> warpRawResponse(final RawResponse rawResponse) {
-            if (200 == rawResponse.getStatusCode()) {
-                List<GetValue> value = GsonFactory.getGson().fromJson(rawResponse.getContent(), new TypeToken<List<GetValue>>() {
-                }.getType());
-                if (value.isEmpty()) {
-                    return new Response<>(null, rawResponse);
-                }
-                if (1 == value.size()) {
-                    return new Response<>(value.get(0), rawResponse);
-                }
-                throw new ConsulException("Strange response (list size=" + value.size() + ")");
-            }
-            if (404 == rawResponse.getStatusCode()) {
-                return new Response<>(null, rawResponse);
-            }
-            throw new OperationException(rawResponse);
-        }
-        
-        @Override
-        public void unlock() {
-            try {
-                PutParams putParams = new PutParams();
-                String sessionId = lockSessionMap.get();
-                putParams.setReleaseSession(sessionId);
-                String lockPath = CONSUL_ROOT_PATH + CONSUL_PATH_SEPARATOR + lockName;
-                consulClient.setKVValue(lockPath, DEFAULT_CONSUL_UNLOCK_VALUE, putParams);
-                consulClient.sessionDestroy(sessionId, null);
-                // CHECKSTYLE:OFF
-            } catch (final Exception ex) {
-                // CHECKSTYLE:ON
-                log.error("EtcdRepository unlock error, lockName: {}", lockName, ex);
-            } finally {
-                lockSessionMap.remove();
-            }
-        }
-    }
 }
diff --git a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index dd5c14db202..085c7f1f790 100644
--- a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -301,7 +301,7 @@ public abstract class BaseITCase {
     }
     
     protected void assertProxyOrderRecordExist(final Object id) throws SQLException {
-        // must refresh firstly, otherwise proxy can't get schema and table info 
+        // must refresh firstly, otherwise proxy can't get schema and table info
         boolean recordExist = false;
         proxyExecuteWithLog("REFRESH TABLE METADATA;", 2);
         for (int i = 0; i < 5; i++) {