You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/01/26 10:54:59 UTC

[shardingsphere] branch master updated: For #14722:Support only one scaling job could be executed at the same time for one schema (#14991)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new e14fcb7  For #14722:Support only one scaling job could be executed at the same time for one schema (#14991)
e14fcb7 is described below

commit e14fcb7b5bb88022068875e0a731652b2daa2367
Author: ReyYang <yi...@163.com>
AuthorDate: Wed Jan 26 18:54:09 2022 +0800

    For #14722:Support only one scaling job could be executed at the same time for one schema (#14991)
    
    * For #14722:Support only one scaling job could be executed at the same time for one schema
    
    * For #14722:fix review
    
    * For #14722:fix review,add ScalingJobReleaseSchemaNameLockEvent
    
    * For #14722:fix review,delete ScalingJobReleaseSchemaNameLockEvent
---
 .../subscriber/ScalingRegistrySubscriber.java      | 22 ++++++++++++++++++++++
 .../zookeeper/CuratorZookeeperRepository.java      |  4 ++--
 2 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java
index c23b570..d286517 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java
@@ -17,11 +17,13 @@
 
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache.subscriber;
 
+import com.google.common.collect.Maps;
 import com.google.common.eventbus.Subscribe;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
 import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
 import org.apache.shardingsphere.infra.yaml.config.swapper.YamlDataSourceConfigurationSwapper;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.LockRegistryService;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache.RegistryCacheManager;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache.event.StartScalingEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ClusterSwitchConfigurationEvent;
@@ -56,11 +58,16 @@ public final class ScalingRegistrySubscriber {
     
     private final RegistryCacheManager registryCacheManager;
     
+    private final LockRegistryService lockRegistryService;
+    
+    private final Map<String, Boolean> schemaNameLockedMap = Maps.newConcurrentMap();
+    
     public ScalingRegistrySubscriber(final ClusterPersistRepository repository) {
         this.repository = repository;
         this.persistService = new SchemaRulePersistService(repository);
         dataSourcePersistService = new DataSourcePersistService(repository);
         registryCacheManager = new RegistryCacheManager(repository);
+        lockRegistryService = new LockRegistryService(repository);
         ShardingSphereEventBus.getInstance().register(this);
     }
     
@@ -71,10 +78,16 @@ public final class ScalingRegistrySubscriber {
      */
     @Subscribe
     public void ruleConfigurationCached(final RuleConfigurationCachedEvent event) {
+        boolean locked = lockRegistryService.tryLock(decorateLockName(event.getSchemaName()), 1000);
+        if (!locked) {
+            return;
+        }
+        schemaNameLockedMap.put(event.getSchemaName(), true);
         String sourceDataSource = repository.get(SchemaMetaDataNode.getMetaDataDataSourcePath(event.getSchemaName()));
         String sourceRule = repository.get(SchemaMetaDataNode.getRulePath(event.getSchemaName()));
         String targetRule = registryCacheManager.loadCache(SchemaMetaDataNode.getRulePath(event.getSchemaName()), event.getCacheId());
         String ruleCacheId = event.getCacheId();
+        log.info("start scaling job, locked the schema name, event={}", event);
         StartScalingEvent startScalingEvent = new StartScalingEvent(event.getSchemaName(), sourceDataSource, sourceRule, targetRule, ruleCacheId);
         ShardingSphereEventBus.getInstance().post(startScalingEvent);
     }
@@ -98,6 +111,11 @@ public final class ScalingRegistrySubscriber {
             log.info("start to delete cache, ruleCacheId={}", ruleCacheId);
             registryCacheManager.deleteCache(SchemaMetaDataNode.getRulePath(event.getTargetSchemaName()), ruleCacheId);
         }
+        if (schemaNameLockedMap.getOrDefault(event.getTargetSchemaName(), false)) {
+            log.info("scaling job finished, release schema name lock, event = {}", event);
+            schemaNameLockedMap.remove(event.getTargetSchemaName());
+            lockRegistryService.releaseLock(decorateLockName(event.getTargetSchemaName()));
+        }
     }
     
     /**
@@ -112,4 +130,8 @@ public final class ScalingRegistrySubscriber {
         dataSourcePersistService.persist(schemaName, event.getTargetDataSourcePropertiesMap());
         persistService.persist(schemaName, event.getTargetRuleConfigs());
     }
+    
+    private String decorateLockName(final String schemaName) {
+        return "Scaling-" + schemaName;
+    }
 }
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
index 8b11302..6a948ba 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
@@ -29,7 +29,7 @@ import org.apache.curator.framework.recipes.cache.CuratorCache;
 import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
 import org.apache.curator.framework.recipes.locks.InterProcessLock;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -323,7 +323,7 @@ public final class CuratorZookeeperRepository implements ClusterPersistRepositor
         if (availableLock(key)) {
             return locks.get(key);
         }
-        InterProcessLock lock = new InterProcessMutex(client, key);
+        InterProcessLock lock = new InterProcessSemaphoreMutex(client, key);
         locks.put(key, lock);
         return lock;
     }