You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by ji...@apache.org on 2022/10/24 23:31:54 UTC
[shardingsphere] branch master updated: Move ConsulInternalLock from internal class to independent class (#21730)
This is an automated email from the ASF dual-hosted git repository.
jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 46732a0b14f Move ConsulInternalLock from internal class to independent class (#21730)
46732a0b14f is described below
commit 46732a0b14fc25a3297fe010d390857f401beaf9
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Tue Oct 25 07:31:37 2022 +0800
Move ConsulInternalLock from internal class to independent class (#21730)
* Move ConsulInternalLock from internal class to independent class
* Move ConsulInternalLock from internal class to independent class
* Move ConsulInternalLock from internal class to independent class
---
.../cluster/consul/lock/ConsulInternalLock.java | 198 +++++++++++++++++++++
.../consul/lock/ConsulInternalLockProvider.java | 160 -----------------
.../data/pipeline/cases/base/BaseITCase.java | 2 +-
3 files changed, 199 insertions(+), 161 deletions(-)
diff --git a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulInternalLock.java b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulInternalLock.java
new file mode 100644
index 00000000000..a746081dd8c
--- /dev/null
+++ b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulInternalLock.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.repository.cluster.consul.lock;
+
+import com.ecwid.consul.ConsulException;
+import com.ecwid.consul.json.GsonFactory;
+import com.ecwid.consul.transport.RawResponse;
+import com.ecwid.consul.v1.ConsulClient;
+import com.ecwid.consul.v1.OperationException;
+import com.ecwid.consul.v1.QueryParams;
+import com.ecwid.consul.v1.Response;
+import com.ecwid.consul.v1.kv.model.GetValue;
+import com.ecwid.consul.v1.kv.model.PutParams;
+import com.ecwid.consul.v1.session.model.NewSession;
+import com.ecwid.consul.v1.session.model.Session;
+import com.google.common.base.Strings;
+import com.google.common.reflect.TypeToken;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.mode.repository.cluster.consul.ShardingSphereConsulClient;
+import org.apache.shardingsphere.mode.repository.cluster.consul.ShardingSphereQueryParams;
+import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulProperties;
+import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulPropertyKey;
+import org.apache.shardingsphere.mode.repository.cluster.lock.InternalLock;
+
+import java.util.List;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Consul internal lock holder.
+ */
+@RequiredArgsConstructor
+@Slf4j
+public final class ConsulInternalLock implements InternalLock {
+
+ private static final String CONSUL_ROOT_PATH = "sharding/lock";
+
+ private static final String CONSUL_PATH_SEPARATOR = "/";
+
+ private static final String DEFAULT_CONSUL_LOCK_VALUE = "LOCKED";
+
+ private static final String DEFAULT_CONSUL_UNLOCK_VALUE = "UNLOCKED";
+
+ private static final ScheduledThreadPoolExecutor SESSION_FLUSH_EXECUTOR = new ScheduledThreadPoolExecutor(2);
+
+ private final ConsulClient consulClient;
+
+ private final String lockName;
+
+ private final ConsulProperties consulProperties;
+
+ private final ThreadLocal<String> lockSessionMap = new ThreadLocal<>();
+
+ @Override
+ public boolean tryLock(final long timeoutMillis) {
+ if (!Strings.isNullOrEmpty(lockSessionMap.get())) {
+ return true;
+ }
+ try {
+ long lockTime = timeoutMillis;
+ PutParams putParams = new PutParams();
+ String lockPath = CONSUL_ROOT_PATH + CONSUL_PATH_SEPARATOR + lockName;
+ while (true) {
+ String sessionId = createSession(lockPath);
+ putParams.setAcquireSession(sessionId);
+ Response<Boolean> response = consulClient.setKVValue(lockPath, DEFAULT_CONSUL_LOCK_VALUE, putParams);
+ if (response.getValue()) {
+ // lock success
+ lockSessionMap.set(sessionId);
+ SESSION_FLUSH_EXECUTOR.scheduleAtFixedRate(() -> consulClient.renewSession(sessionId, QueryParams.DEFAULT), 5L, 10L, TimeUnit.SECONDS);
+ return true;
+ }
+ // lock failed because of a race with another holder, so retry
+ // wait on a blocking query, which returns when the value changes
+ consulClient.sessionDestroy(sessionId, null);
+ long waitTime = doWaitRelease(lockPath, response.getConsulIndex(), lockTime);
+ if (waitTime < lockTime) {
+ lockTime = lockTime - waitTime;
+ continue;
+ }
+ return false;
+ }
+ // CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ // CHECKSTYLE:ON
+ log.error("EtcdRepository tryLock error, lockName:{}", lockName, ex);
+ return false;
+ }
+ }
+
+ private String createSession(final String lockName) {
+ NewSession session = new NewSession();
+ session.setName(lockName);
+ // the lock is forcibly released when the session becomes invalid
+ session.setBehavior(Session.Behavior.RELEASE);
+ session.setTtl(consulProperties.getValue(ConsulPropertyKey.TIME_TO_LIVE_SECONDS));
+ return consulClient.sessionCreate(session, null).getValue();
+ }
+
+ private long doWaitRelease(final String key, final long valueIndex, final long waitTime) {
+ long currentIndex = valueIndex;
+ if (currentIndex < 0) {
+ currentIndex = 0;
+ }
+ AtomicBoolean running = new AtomicBoolean(true);
+ long waitCostTime = 0L;
+ long now = System.currentTimeMillis();
+ long deadlineWaitTime = now + waitTime;
+ long blockWaitTime = waitTime;
+ while (running.get()) {
+ long startWaitTime = System.currentTimeMillis();
+ if (startWaitTime >= deadlineWaitTime) {
+ // the maximum wait time has been reached
+ return waitTime;
+ }
+ RawResponse rawResponse = ((ShardingSphereConsulClient) consulClient).getRawClient().makeGetRequest("/v1/kv/" + key, null, new ShardingSphereQueryParams(blockWaitTime, currentIndex));
+ Response<GetValue> response = warpRawResponse(rawResponse);
+ Long index = response.getConsulIndex();
+ waitCostTime += System.currentTimeMillis() - startWaitTime;
+ blockWaitTime -= waitCostTime;
+ if (null != index && index >= currentIndex) {
+ if (currentIndex == 0) {
+ currentIndex = index;
+ continue;
+ }
+ currentIndex = index;
+ GetValue getValue = response.getValue();
+ if (null == getValue || null == getValue.getValue()) {
+ return waitCostTime;
+ }
+ if (!key.equals(getValue.getKey())) {
+ continue;
+ }
+ return waitCostTime;
+ }
+ if (null != index) {
+ currentIndex = 0;
+ }
+ }
+ return -1;
+ }
+
+ private Response<GetValue> warpRawResponse(final RawResponse rawResponse) {
+ if (200 == rawResponse.getStatusCode()) {
+ List<GetValue> value = GsonFactory.getGson().fromJson(rawResponse.getContent(), new TypeToken<List<GetValue>>() {
+
+ private static final long serialVersionUID = -5065504617907914417L;
+
+ }.getType());
+ if (value.isEmpty()) {
+ return new Response<>(null, rawResponse);
+ }
+ if (1 == value.size()) {
+ return new Response<>(value.get(0), rawResponse);
+ }
+ throw new ConsulException("Strange response (list size=" + value.size() + ")");
+ }
+ if (404 == rawResponse.getStatusCode()) {
+ return new Response<>(null, rawResponse);
+ }
+ throw new OperationException(rawResponse);
+ }
+
+ @Override
+ public void unlock() {
+ try {
+ PutParams putParams = new PutParams();
+ String sessionId = lockSessionMap.get();
+ putParams.setReleaseSession(sessionId);
+ String lockPath = CONSUL_ROOT_PATH + CONSUL_PATH_SEPARATOR + lockName;
+ consulClient.setKVValue(lockPath, DEFAULT_CONSUL_UNLOCK_VALUE, putParams);
+ consulClient.sessionDestroy(sessionId, null);
+ // CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ // CHECKSTYLE:ON
+ log.error("EtcdRepository unlock error, lockName: {}", lockName, ex);
+ } finally {
+ lockSessionMap.remove();
+ }
+ }
+}
diff --git a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulInternalLockProvider.java b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulInternalLockProvider.java
index 8610f659215..c48dcabc0e8 100644
--- a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulInternalLockProvider.java
+++ b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulInternalLockProvider.java
@@ -17,34 +17,19 @@
package org.apache.shardingsphere.mode.repository.cluster.consul.lock;
-import com.ecwid.consul.ConsulException;
-import com.ecwid.consul.json.GsonFactory;
-import com.ecwid.consul.transport.RawResponse;
import com.ecwid.consul.v1.ConsulClient;
-import com.ecwid.consul.v1.OperationException;
import com.ecwid.consul.v1.QueryParams;
-import com.ecwid.consul.v1.Response;
-import com.ecwid.consul.v1.kv.model.GetValue;
-import com.ecwid.consul.v1.kv.model.PutParams;
import com.ecwid.consul.v1.session.model.NewSession;
-import com.ecwid.consul.v1.session.model.Session;
-import com.google.common.base.Strings;
-import com.google.common.reflect.TypeToken;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.mode.repository.cluster.consul.ShardingSphereConsulClient;
-import org.apache.shardingsphere.mode.repository.cluster.consul.ShardingSphereQueryParams;
import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulProperties;
-import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulPropertyKey;
import org.apache.shardingsphere.mode.repository.cluster.lock.InternalLock;
import org.apache.shardingsphere.mode.repository.cluster.lock.InternalLockProvider;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
/**
* Consul internal lock holder.
@@ -53,14 +38,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
public class ConsulInternalLockProvider implements InternalLockProvider {
- private static final String CONSUL_ROOT_PATH = "sharding/lock";
-
- private static final String CONSUL_PATH_SEPARATOR = "/";
-
- private static final String DEFAULT_CONSUL_LOCK_VALUE = "LOCKED";
-
- private static final String DEFAULT_CONSUL_UNLOCK_VALUE = "UNLOCKED";
-
private static final ScheduledThreadPoolExecutor SESSION_FLUSH_EXECUTOR = new ScheduledThreadPoolExecutor(2);
private final Map<String, ConsulInternalLock> locks = new ConcurrentHashMap<>();
@@ -121,141 +98,4 @@ public class ConsulInternalLockProvider implements InternalLockProvider {
public void generatorFlushSessionTtlTask(final ConsulClient consulClient, final String sessionId) {
SESSION_FLUSH_EXECUTOR.scheduleAtFixedRate(() -> consulClient.renewSession(sessionId, QueryParams.DEFAULT), 5L, 10L, TimeUnit.SECONDS);
}
-
- @RequiredArgsConstructor
- private static class ConsulInternalLock implements InternalLock {
-
- private final ConsulClient consulClient;
-
- private final String lockName;
-
- private final ConsulProperties consulProperties;
-
- private final ThreadLocal<String> lockSessionMap = new ThreadLocal<>();
-
- @Override
- public boolean tryLock(final long timeoutMillis) {
- if (!Strings.isNullOrEmpty(lockSessionMap.get())) {
- return true;
- }
- try {
- long lockTime = timeoutMillis;
- PutParams putParams = new PutParams();
- String lockPath = CONSUL_ROOT_PATH + CONSUL_PATH_SEPARATOR + lockName;
- while (true) {
- String sessionId = createSession(lockPath);
- putParams.setAcquireSession(sessionId);
- Response<Boolean> response = consulClient.setKVValue(lockPath, DEFAULT_CONSUL_LOCK_VALUE, putParams);
- if (response.getValue()) {
- // lock success
- lockSessionMap.set(sessionId);
- SESSION_FLUSH_EXECUTOR.scheduleAtFixedRate(() -> consulClient.renewSession(sessionId, QueryParams.DEFAULT), 5L, 10L, TimeUnit.SECONDS);
- return true;
- }
- // lock failed,exist race so retry
- // block query if value is change so return
- consulClient.sessionDestroy(sessionId, null);
- long waitTime = doWaitRelease(lockPath, response.getConsulIndex(), lockTime);
- if (waitTime < lockTime) {
- lockTime = lockTime - waitTime;
- continue;
- }
- return false;
- }
- // CHECKSTYLE:OFF
- } catch (final Exception ex) {
- // CHECKSTYLE:ON
- log.error("EtcdRepository tryLock error, lockName:{}", lockName, ex);
- return false;
- }
- }
-
- private String createSession(final String lockName) {
- NewSession session = new NewSession();
- session.setName(lockName);
- // lock was released by force while session is invalid
- session.setBehavior(Session.Behavior.RELEASE);
- session.setTtl(consulProperties.getValue(ConsulPropertyKey.TIME_TO_LIVE_SECONDS));
- return consulClient.sessionCreate(session, null).getValue();
- }
-
- private long doWaitRelease(final String key, final long valueIndex, final long waitTime) {
- long currentIndex = valueIndex;
- if (currentIndex < 0) {
- currentIndex = 0;
- }
- AtomicBoolean running = new AtomicBoolean(true);
- long waitCostTime = 0L;
- long now = System.currentTimeMillis();
- long deadlineWaitTime = now + waitTime;
- long blockWaitTime = waitTime;
- while (running.get()) {
- long startWaitTime = System.currentTimeMillis();
- if (startWaitTime >= deadlineWaitTime) {
- // wait time is reached max
- return waitTime;
- }
- RawResponse rawResponse = ((ShardingSphereConsulClient) consulClient).getRawClient().makeGetRequest("/v1/kv/" + key, null, new ShardingSphereQueryParams(blockWaitTime, currentIndex));
- Response<GetValue> response = warpRawResponse(rawResponse);
- Long index = response.getConsulIndex();
- waitCostTime += System.currentTimeMillis() - startWaitTime;
- blockWaitTime -= waitCostTime;
- if (null != index && index >= currentIndex) {
- if (currentIndex == 0) {
- currentIndex = index;
- continue;
- }
- currentIndex = index;
- GetValue getValue = response.getValue();
- if (null == getValue || null == getValue.getValue()) {
- return waitCostTime;
- }
- if (!key.equals(getValue.getKey())) {
- continue;
- }
- return waitCostTime;
- }
- if (null != index) {
- currentIndex = 0;
- }
- }
- return -1;
- }
-
- private Response<GetValue> warpRawResponse(final RawResponse rawResponse) {
- if (200 == rawResponse.getStatusCode()) {
- List<GetValue> value = GsonFactory.getGson().fromJson(rawResponse.getContent(), new TypeToken<List<GetValue>>() {
- }.getType());
- if (value.isEmpty()) {
- return new Response<>(null, rawResponse);
- }
- if (1 == value.size()) {
- return new Response<>(value.get(0), rawResponse);
- }
- throw new ConsulException("Strange response (list size=" + value.size() + ")");
- }
- if (404 == rawResponse.getStatusCode()) {
- return new Response<>(null, rawResponse);
- }
- throw new OperationException(rawResponse);
- }
-
- @Override
- public void unlock() {
- try {
- PutParams putParams = new PutParams();
- String sessionId = lockSessionMap.get();
- putParams.setReleaseSession(sessionId);
- String lockPath = CONSUL_ROOT_PATH + CONSUL_PATH_SEPARATOR + lockName;
- consulClient.setKVValue(lockPath, DEFAULT_CONSUL_UNLOCK_VALUE, putParams);
- consulClient.sessionDestroy(sessionId, null);
- // CHECKSTYLE:OFF
- } catch (final Exception ex) {
- // CHECKSTYLE:ON
- log.error("EtcdRepository unlock error, lockName: {}", lockName, ex);
- } finally {
- lockSessionMap.remove();
- }
- }
- }
}
diff --git a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index dd5c14db202..085c7f1f790 100644
--- a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -301,7 +301,7 @@ public abstract class BaseITCase {
}
protected void assertProxyOrderRecordExist(final Object id) throws SQLException {
- // must refresh firstly, otherwise proxy can't get schema and table info
+ // must refresh firstly, otherwise proxy can't get schema and table info
boolean recordExist = false;
proxyExecuteWithLog("REFRESH TABLE METADATA;", 2);
for (int i = 0; i < 5; i++) {