You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/01/26 10:54:59 UTC
[shardingsphere] branch master updated: For #14722: Support executing only one scaling job at the same time for one schema (#14991)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new e14fcb7 For #14722: Support executing only one scaling job at the same time for one schema (#14991)
e14fcb7 is described below
commit e14fcb7b5bb88022068875e0a731652b2daa2367
Author: ReyYang <yi...@163.com>
AuthorDate: Wed Jan 26 18:54:09 2022 +0800
For #14722: Support executing only one scaling job at the same time for one schema (#14991)
* For #14722: Support executing only one scaling job at the same time for one schema
* For #14722: fix review
* For #14722: fix review, add ScalingJobReleaseSchemaNameLockEvent
* For #14722: fix review, delete ScalingJobReleaseSchemaNameLockEvent
---
.../subscriber/ScalingRegistrySubscriber.java | 22 ++++++++++++++++++++++
.../zookeeper/CuratorZookeeperRepository.java | 4 ++--
2 files changed, 24 insertions(+), 2 deletions(-)
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java
index c23b570..d286517 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java
@@ -17,11 +17,13 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache.subscriber;
+import com.google.common.collect.Maps;
import com.google.common.eventbus.Subscribe;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
import org.apache.shardingsphere.infra.yaml.config.swapper.YamlDataSourceConfigurationSwapper;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.LockRegistryService;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache.RegistryCacheManager;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache.event.StartScalingEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ClusterSwitchConfigurationEvent;
@@ -56,11 +58,16 @@ public final class ScalingRegistrySubscriber {
private final RegistryCacheManager registryCacheManager;
+ private final LockRegistryService lockRegistryService;
+
+ private final Map<String, Boolean> schemaNameLockedMap = Maps.newConcurrentMap();
+
public ScalingRegistrySubscriber(final ClusterPersistRepository repository) {
this.repository = repository;
this.persistService = new SchemaRulePersistService(repository);
dataSourcePersistService = new DataSourcePersistService(repository);
registryCacheManager = new RegistryCacheManager(repository);
+ lockRegistryService = new LockRegistryService(repository);
ShardingSphereEventBus.getInstance().register(this);
}
@@ -71,10 +78,16 @@ public final class ScalingRegistrySubscriber {
*/
@Subscribe
public void ruleConfigurationCached(final RuleConfigurationCachedEvent event) {
+ boolean locked = lockRegistryService.tryLock(decorateLockName(event.getSchemaName()), 1000);
+ if (!locked) {
+ return;
+ }
+ schemaNameLockedMap.put(event.getSchemaName(), true);
String sourceDataSource = repository.get(SchemaMetaDataNode.getMetaDataDataSourcePath(event.getSchemaName()));
String sourceRule = repository.get(SchemaMetaDataNode.getRulePath(event.getSchemaName()));
String targetRule = registryCacheManager.loadCache(SchemaMetaDataNode.getRulePath(event.getSchemaName()), event.getCacheId());
String ruleCacheId = event.getCacheId();
+ log.info("start scaling job, locked the schema name, event={}", event);
StartScalingEvent startScalingEvent = new StartScalingEvent(event.getSchemaName(), sourceDataSource, sourceRule, targetRule, ruleCacheId);
ShardingSphereEventBus.getInstance().post(startScalingEvent);
}
@@ -98,6 +111,11 @@ public final class ScalingRegistrySubscriber {
log.info("start to delete cache, ruleCacheId={}", ruleCacheId);
registryCacheManager.deleteCache(SchemaMetaDataNode.getRulePath(event.getTargetSchemaName()), ruleCacheId);
}
+ if (schemaNameLockedMap.getOrDefault(event.getTargetSchemaName(), false)) {
+ log.info("scaling job finished, release schema name lock, event = {}", event);
+ schemaNameLockedMap.remove(event.getTargetSchemaName());
+ lockRegistryService.releaseLock(decorateLockName(event.getTargetSchemaName()));
+ }
}
/**
@@ -112,4 +130,8 @@ public final class ScalingRegistrySubscriber {
dataSourcePersistService.persist(schemaName, event.getTargetDataSourcePropertiesMap());
persistService.persist(schemaName, event.getTargetRuleConfigs());
}
+
+ private String decorateLockName(final String schemaName) {
+ return "Scaling-" + schemaName;
+ }
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
index 8b11302..6a948ba 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
@@ -29,7 +29,7 @@ import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -323,7 +323,7 @@ public final class CuratorZookeeperRepository implements ClusterPersistRepositor
if (availableLock(key)) {
return locks.get(key);
}
- InterProcessLock lock = new InterProcessMutex(client, key);
+ InterProcessLock lock = new InterProcessSemaphoreMutex(client, key);
locks.put(key, lock);
return lock;
}