You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by su...@apache.org on 2022/11/03 11:46:36 UTC

[shardingsphere] branch master updated: Refactor ClusterPersistRepository (#21938)

This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new defe1645330 Refactor ClusterPersistRepository (#21938)
defe1645330 is described below

commit defe16453304bf0d76f9d0e2157aa49c9d7280f4
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Thu Nov 3 19:46:29 2022 +0800

    Refactor ClusterPersistRepository (#21938)
---
 .../fixture/FixtureClusterPersistRepository.java       | 13 +++++--------
 .../cluster/coordinator/ClusterLockPersistService.java |  8 ++++----
 .../manager/cluster/coordinator/RegistryCenter.java    |  2 +-
 .../fixture/ClusterPersistRepositoryFixture.java       | 11 ++++-------
 .../ProcessListClusterPersistRepositoryFixture.java    | 13 +++++--------
 .../repository/cluster/ClusterPersistRepository.java   | 18 +++++-------------
 .../repository/cluster/consul/ConsulRepository.java    | 14 +++-----------
 .../mode/repository/cluster/etcd/EtcdRepository.java   | 12 ++----------
 .../mode/repository/cluster/nacos/NacosRepository.java | 11 +++--------
 .../cluster/zookeeper/ZookeeperRepository.java         | 12 ++----------
 .../proxy/fixture/ClusterPersistRepositoryFixture.java | 13 +++++--------
 .../driver/fixture/TestClusterPersistRepository.java   | 11 ++++-------
 12 files changed, 43 insertions(+), 95 deletions(-)

diff --git a/jdbc/spring/core/spring-namespace/src/test/java/org/apache/shardingsphere/spring/namespace/fixture/FixtureClusterPersistRepository.java b/jdbc/spring/core/spring-namespace/src/test/java/org/apache/shardingsphere/spring/namespace/fixture/FixtureClusterPersistRepository.java
index 76188a2b84c..0f662a63b4f 100644
--- a/jdbc/spring/core/spring-namespace/src/test/java/org/apache/shardingsphere/spring/namespace/fixture/FixtureClusterPersistRepository.java
+++ b/jdbc/spring/core/spring-namespace/src/test/java/org/apache/shardingsphere/spring/namespace/fixture/FixtureClusterPersistRepository.java
@@ -22,6 +22,7 @@ import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
 
 import java.util.Collections;
 import java.util.LinkedHashMap;
@@ -71,20 +72,16 @@ public final class FixtureClusterPersistRepository implements ClusterPersistRepo
     }
     
     @Override
-    public void delete(final String key) {
-    }
-    
-    @Override
-    public void watch(final String key, final DataChangedEventListener listener) {
+    public DistributedLockHolder getDistributedLockHolder() {
+        return null;
     }
     
     @Override
-    public boolean tryLock(final String lockKey, final long timeoutMillis) {
-        return false;
+    public void delete(final String key) {
     }
     
     @Override
-    public void unlock(final String lockKey) {
+    public void watch(final String key, final DataChangedEventListener listener) {
     }
     
     @Override
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterLockPersistService.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterLockPersistService.java
index a8dc7afc9de..eb9e28fc644 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterLockPersistService.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterLockPersistService.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator;
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.lock.LockDefinition;
 import org.apache.shardingsphere.mode.lock.LockPersistService;
-import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
 
 /**
  * Cluster lock persist service.
@@ -28,15 +28,15 @@ import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor
 @RequiredArgsConstructor
 public final class ClusterLockPersistService implements LockPersistService {
     
-    private final ClusterPersistRepository repository;
+    private final DistributedLockHolder distributedLockHolder;
     
     @Override
     public boolean tryLock(final LockDefinition lockDefinition, final long timeoutMillis) {
-        return repository.tryLock(lockDefinition.getLockKey(), timeoutMillis);
+        return distributedLockHolder.getDistributedLock(lockDefinition.getLockKey()).tryLock(timeoutMillis);
     }
     
     @Override
     public void unlock(final LockDefinition lockDefinition) {
-        repository.unlock(lockDefinition.getLockKey());
+        distributedLockHolder.getDistributedLock(lockDefinition.getLockKey()).unlock();
     }
 }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
index af98aac940f..2aa113a0320 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
@@ -70,7 +70,7 @@ public final class RegistryCenter {
         this.databaseConfigs = databaseConfigs;
         storageNodeStatusService = new StorageNodeStatusService(repository);
         computeNodeStatusService = new ComputeNodeStatusService(repository);
-        lockPersistService = new ClusterLockPersistService(repository);
+        lockPersistService = new ClusterLockPersistService(repository.getDistributedLockHolder());
         listenerFactory = new GovernanceWatcherFactory(repository, eventBusContext, getJDBCDatabaseName());
         createSubscribers(repository);
     }
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
index d8899a6843d..67d3a6c6edb 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
@@ -21,6 +21,7 @@ import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
 
 import java.util.Collections;
 import java.util.List;
@@ -63,16 +64,12 @@ public final class ClusterPersistRepositoryFixture implements ClusterPersistRepo
     }
     
     @Override
-    public void delete(final String key) {
-    }
-    
-    @Override
-    public boolean tryLock(final String lockKey, final long timeoutMillis) {
-        return false;
+    public DistributedLockHolder getDistributedLockHolder() {
+        return null;
     }
     
     @Override
-    public void unlock(final String lockKey) {
+    public void delete(final String key) {
     }
     
     @Override
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
index ad9b6a21b18..c6286893389 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
@@ -21,6 +21,7 @@ import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
 
 import java.util.Collections;
 import java.util.LinkedHashMap;
@@ -69,17 +70,13 @@ public final class ProcessListClusterPersistRepositoryFixture implements Cluster
     }
     
     @Override
-    public void delete(final String key) {
-        REGISTRY_DATA.remove(key);
-    }
-    
-    @Override
-    public boolean tryLock(final String lockKey, final long timeoutMillis) {
-        return false;
+    public DistributedLockHolder getDistributedLockHolder() {
+        return null;
     }
     
     @Override
-    public void unlock(final String lockKey) {
+    public void delete(final String key) {
+        REGISTRY_DATA.remove(key);
     }
     
     @Override
diff --git a/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
index 24fd2ce64e1..7d5ebd52910 100644
--- a/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
+++ b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.mode.repository.cluster;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
 import org.apache.shardingsphere.mode.persist.PersistRepository;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
 
 /**
  * Cluster persist repository.
@@ -51,20 +52,11 @@ public interface ClusterPersistRepository extends PersistRepository {
     void persistExclusiveEphemeral(String key, String value);
     
     /**
-     * Try lock.
-     *
-     * @param lockKey lock key
-     * @param timeoutMillis timeout millis
-     * @return is locked or not
-     */
-    boolean tryLock(String lockKey, long timeoutMillis);
-    
-    /**
-     * Unlock.
-     *
-     * @param lockKey lock key
+     * Get distributed lock holder.
+     * 
+     * @return distributed lock holder
      */
-    void unlock(String lockKey);
+    DistributedLockHolder getDistributedLockHolder();
     
     /**
      * Watch key or path of governance server.
diff --git a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
index 3900527e258..fef02030f79 100644
--- a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
+++ b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
@@ -26,6 +26,7 @@ import com.ecwid.consul.v1.kv.model.PutParams;
 import com.ecwid.consul.v1.session.model.NewSession;
 import com.ecwid.consul.v1.session.model.Session;
 import com.google.common.base.Strings;
+import lombok.Getter;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
@@ -48,7 +49,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 /**
  * Registry repository of Consul.
  */
-public class ConsulRepository implements ClusterPersistRepository {
+public final class ConsulRepository implements ClusterPersistRepository {
     
     private static final ScheduledThreadPoolExecutor SESSION_FLUSH_EXECUTOR = new ScheduledThreadPoolExecutor(2);
     
@@ -56,6 +57,7 @@ public class ConsulRepository implements ClusterPersistRepository {
     
     private ConsulProperties consulProps;
     
+    @Getter
     private DistributedLockHolder distributedLockHolder;
     
     private Map<String, Collection<String>> watchKeyMap;
@@ -129,16 +131,6 @@ public class ConsulRepository implements ClusterPersistRepository {
         persistEphemeral(key, value);
     }
     
-    @Override
-    public boolean tryLock(final String lockKey, final long timeoutMillis) {
-        return distributedLockHolder.getDistributedLock(lockKey).tryLock(timeoutMillis);
-    }
-    
-    @Override
-    public void unlock(final String lockKey) {
-        distributedLockHolder.getDistributedLock(lockKey).unlock();
-    }
-    
     @Override
     public void watch(final String key, final DataChangedEventListener listener) {
         Thread watchThread = new Thread(() -> watchChildKeyChangeEvent(key, listener));
diff --git a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
index cebc5f8dda8..3ad947091ed 100644
--- a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
+++ b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
@@ -31,6 +31,7 @@ import io.etcd.jetcd.options.WatchOption;
 import io.etcd.jetcd.support.Observers;
 import io.etcd.jetcd.support.Util;
 import io.etcd.jetcd.watch.WatchEvent;
+import lombok.Getter;
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -56,6 +57,7 @@ public final class EtcdRepository implements ClusterPersistRepository {
     
     private EtcdProperties etcdProps;
     
+    @Getter
     private DistributedLockHolder distributedLockHolder;
     
     @Override
@@ -172,16 +174,6 @@ public final class EtcdRepository implements ClusterPersistRepository {
         }
     }
     
-    @Override
-    public boolean tryLock(final String lockKey, final long timeoutMillis) {
-        return distributedLockHolder.getDistributedLock(lockKey).tryLock(timeoutMillis);
-    }
-    
-    @Override
-    public void unlock(final String lockKey) {
-        distributedLockHolder.getDistributedLock(lockKey).unlock();
-    }
-    
     @Override
     public void close() {
         client.close();
diff --git a/mode/type/cluster/repository/provider/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepository.java b/mode/type/cluster/repository/provider/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepository.java
index ed3094320f3..2aa69c8cc20 100644
--- a/mode/type/cluster/repository/provider/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepository.java
+++ b/mode/type/cluster/repository/provider/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepository.java
@@ -33,6 +33,7 @@ import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 import org.apache.shardingsphere.mode.repository.cluster.exception.ClusterPersistRepositoryException;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
 import org.apache.shardingsphere.mode.repository.cluster.nacos.entity.KeyValue;
 import org.apache.shardingsphere.mode.repository.cluster.nacos.entity.ServiceController;
 import org.apache.shardingsphere.mode.repository.cluster.nacos.entity.ServiceMetadata;
@@ -125,15 +126,9 @@ public final class NacosRepository implements ClusterPersistRepository {
     }
     
     @Override
-    public boolean tryLock(final String lockKey, final long timeoutMillis) {
+    public DistributedLockHolder getDistributedLockHolder() {
         // TODO
-        throw new UnsupportedOperationException("Can not support tryLock on Nacos yet.");
-    }
-    
-    @Override
-    public void unlock(final String lockKey) {
-        // TODO
-        throw new UnsupportedOperationException("Can not support unlock on Nacos yet.");
+        throw new UnsupportedOperationException("Can not support distributed lock on Nacos yet.");
     }
     
     @Override
diff --git a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
index d16e36fac9b..e7d5c2f9c15 100644
--- a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
+++ b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.mode.repository.cluster.zookeeper;
 
 import com.google.common.base.Strings;
+import lombok.Getter;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
@@ -66,6 +67,7 @@ public final class ZookeeperRepository implements ClusterPersistRepository, Inst
     
     private CuratorFramework client;
     
+    @Getter
     private DistributedLockHolder distributedLockHolder;
     
     private InstanceMetaData instanceMetaData;
@@ -273,16 +275,6 @@ public final class ZookeeperRepository implements ClusterPersistRepository, Inst
         }
     }
     
-    @Override
-    public boolean tryLock(final String lockKey, final long timeoutMillis) {
-        return distributedLockHolder.getDistributedLock(lockKey).tryLock(timeoutMillis);
-    }
-    
-    @Override
-    public void unlock(final String lockKey) {
-        distributedLockHolder.getDistributedLock(lockKey).unlock();
-    }
-    
     @Override
     public void close() {
         caches.values().forEach(CuratorCache::close);
diff --git a/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java b/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java
index b2f60c359e0..91381ef3bf9 100644
--- a/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java
+++ b/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java
@@ -21,6 +21,7 @@ import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
 
 import java.util.Collections;
 import java.util.LinkedHashMap;
@@ -69,20 +70,16 @@ public final class ClusterPersistRepositoryFixture implements ClusterPersistRepo
     }
     
     @Override
-    public void delete(final String key) {
-    }
-    
-    @Override
-    public void watch(final String key, final DataChangedEventListener listener) {
+    public DistributedLockHolder getDistributedLockHolder() {
+        return null;
     }
     
     @Override
-    public boolean tryLock(final String lockKey, final long timeoutMillis) {
-        return false;
+    public void delete(final String key) {
     }
     
     @Override
-    public void unlock(final String lockKey) {
+    public void watch(final String key, final DataChangedEventListener listener) {
     }
     
     @Override
diff --git a/test/integration-driver-test/src/test/java/org/apache/shardingsphere/driver/fixture/TestClusterPersistRepository.java b/test/integration-driver-test/src/test/java/org/apache/shardingsphere/driver/fixture/TestClusterPersistRepository.java
index d08f3105bf0..b6dd5998f75 100644
--- a/test/integration-driver-test/src/test/java/org/apache/shardingsphere/driver/fixture/TestClusterPersistRepository.java
+++ b/test/integration-driver-test/src/test/java/org/apache/shardingsphere/driver/fixture/TestClusterPersistRepository.java
@@ -22,6 +22,7 @@ import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
 
 import java.util.Collections;
 import java.util.LinkedHashMap;
@@ -71,16 +72,12 @@ public final class TestClusterPersistRepository implements ClusterPersistReposit
     }
     
     @Override
-    public void delete(final String key) {
-    }
-    
-    @Override
-    public boolean tryLock(final String lockKey, final long timeoutMillis) {
-        return false;
+    public DistributedLockHolder getDistributedLockHolder() {
+        return null;
     }
     
     @Override
-    public void unlock(final String lockKey) {
+    public void delete(final String key) {
     }
     
     @Override