You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/04/26 02:12:57 UTC
[shardingsphere] branch master updated: Refactor distribute lock context to add lock manager by SPI (#17100)
This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 9b6e00f6f32 Refactor distribute lock context to add lock manager by SPI (#17100)
9b6e00f6f32 is described below
commit 9b6e00f6f32b8ff2e5bc9d2fd598cfd7019ba3e5
Author: gin <ja...@163.com>
AuthorDate: Tue Apr 26 10:12:51 2022 +0800
Refactor distribute lock context to add lock manager by SPI (#17100)
---
.../apache/shardingsphere/infra/lock/LockType.java | 14 +-
.../mode/manager/ShardingSphereLockManager.java | 71 ++++++++
.../cluster/ClusterContextManagerBuilder.java | 3 +-
.../coordinator/lock/DistributeLockContext.java | 139 ++++-----------
.../cluster/coordinator/lock/LockNodeService.java | 73 ++++++++
.../lock/ShardingSphereDistributeGlobalLock.java | 12 +-
.../lock/database/ShardingSphereDatabaseLock.java | 82 +++++++++
.../ShardingSphereDatabaseLockManager.java | 189 +++++++++++++++++++++
.../event/DatabaseAckLockReleasedEvent.java} | 17 +-
.../event/DatabaseAckLockedEvent.java} | 17 +-
.../event/DatabaseLockReleasedEvent.java | 4 +-
.../{ => database}/event/DatabaseLockedEvent.java | 4 +-
.../database/service/DatabaseLockNodeService.java | 76 +++++++++
.../watcher/DatabaseAckChangedWatcher.java} | 28 +--
.../watcher/DatabaseLocksChangedWatcher.java} | 22 +--
...ckReleasedEvent.java => LockReleasedEvent.java} | 2 +-
.../{DatabaseLockedEvent.java => LockedEvent.java} | 2 +-
.../lock/service/LockRegistryService.java | 3 +-
.../coordinator/lock/util/LockNodeUtil.java | 10 ++
.../lock/watcher/GlobalLocksChangedWatcher.java | 8 +-
...gsphere.mode.manager.ShardingSphereLockManager} | 8 +-
....cluster.coordinator.registry.GovernanceWatcher | 4 +-
.../lock/DistributeLockContextTest.java | 60 +------
.../watcher/GlobalLocksChangedWatcherTest.java | 12 +-
24 files changed, 620 insertions(+), 240 deletions(-)
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/DatabaseLockedEvent.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/LockType.java
similarity index 67%
copy from shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/DatabaseLockedEvent.java
copy to shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/LockType.java
index 014ef43f438..2fdb3c10241 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/DatabaseLockedEvent.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/LockType.java
@@ -15,18 +15,12 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
+package org.apache.shardingsphere.infra.lock;
/**
- * Locked event.
+ * Lock type.
*/
-@RequiredArgsConstructor
-@Getter
-public final class DatabaseLockedEvent implements GovernanceEvent {
+public enum LockType {
- private final String database;
+ GENERAL, DATABASE, SCHEMA, TABLE
}
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ShardingSphereLockManager.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ShardingSphereLockManager.java
new file mode 100644
index 00000000000..e33bd5d5f91
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ShardingSphereLockManager.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager;
+
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
+import org.apache.shardingsphere.infra.lock.LockType;
+import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
+import org.apache.shardingsphere.mode.persist.PersistRepository;
+
+import java.util.Collection;
+
+/**
+ * Lock manager of ShardingSphere.
+ */
+public interface ShardingSphereLockManager {
+
+ /**
+ * Init locks state.
+ *
+ * @param repository persist repository
+ * @param instance instance
+ * @param computeNodeInstances compute node instances
+ */
+ void initLocksState(PersistRepository repository, ComputeNodeInstance instance, Collection<ComputeNodeInstance> computeNodeInstances);
+
+ /**
+ * Get or create lock.
+ *
+ * @param lockName lock name
+ * @return lock
+ */
+ ShardingSphereLock getOrCreateLock(String lockName);
+
+ /**
+ * Get lock.
+ *
+ * @param lockName lock name
+ * @return lock
+ */
+ ShardingSphereLock getLock(String lockName);
+
+ /**
+ * Is locked.
+ *
+ * @param lockName lock name
+ * @return is locked or not
+ */
+ boolean isLocked(String lockName);
+
+ /**
+ * Get lock type.
+ *
+ * @return lock type
+ */
+ LockType getLockType();
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 7ee2703ce70..6893b3d67d1 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -37,7 +37,6 @@ import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.ClusterContextManagerCoordinator;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.DistributeLockContext;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.LockRegistryService;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator.ClusterWorkerIdGenerator;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.MetaDataContextsBuilder;
@@ -164,7 +163,7 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder
final Properties transactionProps, final ModeConfiguration modeConfiguration) {
ComputeNodeInstance computeNodeInstance = metaDataPersistService.getComputeNodePersistService().loadComputeNodeInstance(instanceDefinition);
ClusterWorkerIdGenerator clusterWorkerIdGenerator = new ClusterWorkerIdGenerator(repository, metaDataPersistService, instanceDefinition);
- DistributeLockContext distributeLockContext = new DistributeLockContext(new LockRegistryService(repository));
+ DistributeLockContext distributeLockContext = new DistributeLockContext(repository, computeNodeInstance);
InstanceContext instanceContext = new InstanceContext(computeNodeInstance, clusterWorkerIdGenerator, modeConfiguration, distributeLockContext);
instanceContext.initLockContext();
repository.watchSessionConnection(instanceContext);
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContext.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContext.java
index a8493191945..34b20eadb22 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContext.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContext.java
@@ -18,151 +18,72 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock;
import com.google.common.base.Preconditions;
-import com.google.common.eventbus.Subscribe;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.lock.LockContext;
-import org.apache.shardingsphere.infra.lock.ShardingSphereGlobalLock;
+import org.apache.shardingsphere.infra.lock.LockType;
import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.AckLockedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.AckLockReleasedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.DatabaseLockedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.DatabaseLockReleasedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.LockNode;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.LockRegistryService;
+import org.apache.shardingsphere.mode.manager.ShardingSphereLockManager;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
import java.util.Collection;
+import java.util.EnumMap;
import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
/**
* Distribute lock context.
*/
-@RequiredArgsConstructor
public final class DistributeLockContext implements LockContext {
- private final Map<String, ShardingSphereGlobalLock> globalLocks = new ConcurrentHashMap<>();
+ static {
+ ShardingSphereServiceLoader.register(ShardingSphereLockManager.class);
+ }
- private final LockRegistryService lockRegistryService;
+ private final Map<LockType, ShardingSphereLockManager> lockManagers = new EnumMap<>(LockType.class);
- private volatile ComputeNodeInstance currentInstance;
+ private final ClusterPersistRepository repository;
- private volatile Collection<ComputeNodeInstance> computeNodeInstances;
+ private final ComputeNodeInstance currentInstance;
- @Override
- public void initLockState(final InstanceContext instanceContext) {
- register(instanceContext);
- synchronizeGlobalLock();
+ public DistributeLockContext(final ClusterPersistRepository repository, final ComputeNodeInstance currentInstance) {
+ this.repository = repository;
+ this.currentInstance = currentInstance;
+ loadLockManager();
}
- private void register(final InstanceContext instanceContext) {
- currentInstance = instanceContext.getInstance();
- computeNodeInstances = instanceContext.getComputeNodeInstances();
- ShardingSphereEventBus.getInstance().register(this);
+ private void loadLockManager() {
+ for (ShardingSphereLockManager each : ShardingSphereServiceLoader.getServiceInstances(ShardingSphereLockManager.class)) {
+ if (lockManagers.containsKey(each.getLockType())) {
+ continue;
+ }
+ lockManagers.put(each.getLockType(), each);
+ }
}
- private void synchronizeGlobalLock() {
- Collection<String> allGlobalLock = lockRegistryService.getAllGlobalDatabaseLocks();
- if (allGlobalLock.isEmpty()) {
- lockRegistryService.initGlobalLockRoot();
- return;
- }
- for (String each : allGlobalLock) {
- Optional<String> databaseLock = LockNode.parseGlobalDatabaseLocksNodePath(each);
- databaseLock.ifPresent(database -> globalLocks.put(database, crateGlobalLock()));
+ @Override
+ public void initLockState(final InstanceContext instanceContext) {
+ Collection<ComputeNodeInstance> computeNodeInstances = instanceContext.getComputeNodeInstances();
+ for (ShardingSphereLockManager each : lockManagers.values()) {
+ each.initLocksState(repository, currentInstance, computeNodeInstances);
}
}
@Override
public synchronized ShardingSphereLock getOrCreateDatabaseLock(final String databaseName) {
Preconditions.checkNotNull(databaseName, "Get or create database lock args database name can not be null.");
- ShardingSphereGlobalLock result = globalLocks.get(databaseName);
- if (null != result) {
- return result;
- }
- result = crateGlobalLock();
- globalLocks.put(databaseName, result);
- return result;
- }
-
- private ShardingSphereGlobalLock crateGlobalLock() {
- return new ShardingSphereDistributeGlobalLock(lockRegistryService, currentInstance, computeNodeInstances);
- }
-
- private String getCurrentInstanceId() {
- return currentInstance.getInstanceDefinition().getInstanceId().getId();
+ return lockManagers.get(LockType.DATABASE).getOrCreateLock(databaseName);
}
@Override
public ShardingSphereLock getDatabaseLock(final String databaseName) {
Preconditions.checkNotNull(databaseName, "Get database lock args database name can not be null.");
- return globalLocks.get(databaseName);
+ return lockManagers.get(LockType.DATABASE).getLock(databaseName);
}
@Override
public boolean isLockedDatabase(final String databaseName) {
Preconditions.checkNotNull(databaseName, "Is locked database args database name can not be null.");
- if (globalLocks.isEmpty()) {
- return false;
- }
- ShardingSphereGlobalLock shardingSphereGlobalLock = globalLocks.get(databaseName);
- if (null != shardingSphereGlobalLock) {
- return shardingSphereGlobalLock.isLocked(databaseName);
- }
- return false;
- }
-
- private Optional<ShardingSphereGlobalLock> getGlobalLock(final String databaseName) {
- return Optional.ofNullable(globalLocks.get(databaseName));
- }
-
- /**
- * Locked event.
- *
- * @param event locked event
- */
- @Subscribe
- public synchronized void renew(final DatabaseLockedEvent event) {
- String database = event.getDatabase();
- ShardingSphereGlobalLock globalLock = globalLocks.get(database);
- if (null == globalLock) {
- globalLock = crateGlobalLock();
- globalLocks.put(database, globalLock);
- }
- globalLock.ackLock(database, getCurrentInstanceId());
- }
-
- /**
- * Lock released event.
- *
- * @param event lock released event
- */
- @Subscribe
- public synchronized void renew(final DatabaseLockReleasedEvent event) {
- String database = event.getDatabase();
- getGlobalLock(database).ifPresent(lock -> lock.releaseAckLock(database, getCurrentInstanceId()));
- }
-
- /**
- * Ack locked event.
- *
- * @param event ack locked event
- */
- @Subscribe
- public synchronized void renew(final AckLockedEvent event) {
- getGlobalLock(event.getDatabase()).ifPresent(shardingSphereGlobalLock -> shardingSphereGlobalLock.addLockedInstance(event.getLockedInstance()));
- }
-
- /**
- * Ack lock released event.
- *
- * @param event ack lock released event.
- */
- @Subscribe
- public synchronized void renew(final AckLockReleasedEvent event) {
- getGlobalLock(event.getDatabase()).ifPresent(shardingSphereGlobalLock -> shardingSphereGlobalLock.removeLockedInstance(event.getLockedInstance()));
+ return lockManagers.get(LockType.DATABASE).isLocked(databaseName);
}
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/LockNodeService.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/LockNodeService.java
new file mode 100644
index 00000000000..c5308a9a702
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/LockNodeService.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock;
+
+import java.util.Optional;
+
+/**
+ * Lock node service.
+ */
+public interface LockNodeService {
+
+ /**
+ * Get global locks node path.
+ *
+ * @return global lock node path
+ */
+ String getGlobalLocksNodePath();
+
+ /**
+ * Get global locked ack node path.
+ *
+ * @return global locked ack node path
+ */
+ String getGlobalLockedAckNodePath();
+
+ /**
+ * Generate global locks name.
+ *
+ * @param locks locks
+ * @return global locks name
+ */
+ String generateGlobalLocksName(String locks);
+
+ /**
+ * Generate global ack lock name.
+ *
+ * @param lock lock
+ * @param lockedInstanceId locked instance id
+ * @return global ack lock name
+ */
+ String generateGlobalAckLockName(String lock, String lockedInstanceId);
+
+ /**
+ * Parse global Locks node path.
+ *
+ * @param nodePath locks node path
+ * @return global locked node path
+ */
+ Optional<String> parseGlobalLocksNodePath(String nodePath);
+
+ /**
+ * Parse global locked ack node path.
+ *
+ * @param nodePath locked ack node path
+ * @return global locked ack node path
+ */
+ Optional<String> parseGlobalLockedAckNodePath(String nodePath);
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/ShardingSphereDistributeGlobalLock.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/ShardingSphereDistributeGlobalLock.java
index a20dbb4e30a..2535171a5d9 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/ShardingSphereDistributeGlobalLock.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/ShardingSphereDistributeGlobalLock.java
@@ -21,8 +21,8 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.lock.ShardingSphereGlobalLock;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.LockRegistryService;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.LockNode;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.LockRegistryService;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.LockState;
import java.util.Collection;
@@ -99,7 +99,7 @@ public final class ShardingSphereDistributeGlobalLock implements ShardingSphereG
long consumeTime = 0;
String currentInstanceId = getCurrentInstanceId();
do {
- boolean isLocked = lockService.tryGlobalLock(LockNode.generateGlobalDatabaseLocksName(lockName), DEFAULT_REGISTRY_TIMEOUT_MILLISECONDS);
+ boolean isLocked = lockService.tryGlobalLock(lockName, DEFAULT_REGISTRY_TIMEOUT_MILLISECONDS);
consumeTime += DEFAULT_REGISTRY_TIMEOUT_MILLISECONDS;
if (isLocked) {
lockedInstances.add(currentInstanceId);
@@ -145,13 +145,13 @@ public final class ShardingSphereDistributeGlobalLock implements ShardingSphereG
log.debug("release lock, lockName={}", lockName);
String currentInstanceId = getCurrentInstanceId();
if (isOwner.get()) {
- lockService.releaseGlobalLock(LockNode.generateGlobalDatabaseLocksName(lockName), true);
+ lockService.releaseGlobalLock(lockName, true);
isOwner.compareAndSet(true, false);
lockedInstances.remove(currentInstanceId);
synchronizedLockState.compareAndSet(LockState.LOCKED, LockState.UNLOCKED);
return;
}
- lockService.releaseGlobalLock(LockNode.generateGlobalDatabaseLockReleasedNodePath(lockName), false);
+ lockService.releaseGlobalLock(lockName, false);
releaseAckLock(lockName, currentInstanceId);
}
log.debug("release lock, state is not locked, ignore, lockName={}", lockName);
@@ -177,7 +177,7 @@ public final class ShardingSphereDistributeGlobalLock implements ShardingSphereG
@Override
public void ackLock(final String lockName, final String lockedInstanceId) {
if (!isOwner.get() && LockState.UNLOCKED == synchronizedLockState.get()) {
- lockService.ackLock(LockNode.generateGlobalDatabaseAckLockName(lockName, lockedInstanceId), lockedInstanceId);
+ lockService.ackLock(lockName, lockedInstanceId);
lockedInstances.add(lockedInstanceId);
synchronizedLockState.compareAndSet(LockState.UNLOCKED, LockState.LOCKED);
}
@@ -186,7 +186,7 @@ public final class ShardingSphereDistributeGlobalLock implements ShardingSphereG
@Override
public void releaseAckLock(final String lockName, final String lockedInstanceId) {
if (!isOwner.get()) {
- lockService.releaseAckLock(LockNode.generateGlobalDatabaseAckLockName(lockName, lockedInstanceId));
+ lockService.releaseAckLock(lockName);
} else {
isOwner.compareAndSet(true, false);
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/database/ShardingSphereDatabaseLock.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/database/ShardingSphereDatabaseLock.java
new file mode 100644
index 00000000000..09412d328c7
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/database/ShardingSphereDatabaseLock.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.database;
+
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
+import org.apache.shardingsphere.infra.lock.ShardingSphereGlobalLock;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.LockNodeService;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.ShardingSphereDistributeGlobalLock;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.database.service.DatabaseLockNodeService;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.LockRegistryService;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+
+import java.util.Collection;
+
+/**
+ * Database lock of ShardingSphere.
+ */
+public final class ShardingSphereDatabaseLock implements ShardingSphereGlobalLock {
+
+ private final LockNodeService lockNodeService = new DatabaseLockNodeService();
+
+ private final ShardingSphereDistributeGlobalLock innerDistributeGlobalLock;
+
+ public ShardingSphereDatabaseLock(final ClusterPersistRepository clusterRepository, final ComputeNodeInstance currentInstance, final Collection<ComputeNodeInstance> computeNodeInstances) {
+ innerDistributeGlobalLock = new ShardingSphereDistributeGlobalLock(new LockRegistryService(clusterRepository), currentInstance, computeNodeInstances);
+ }
+
+ @Override
+ public boolean tryLock(final String lockName) {
+ return innerDistributeGlobalLock.tryLock(lockNodeService.generateGlobalLocksName(lockName));
+ }
+
+ @Override
+ public boolean tryLock(final String lockName, final long timeoutMillis) {
+ return innerDistributeGlobalLock.tryLock(lockNodeService.generateGlobalLocksName(lockName), timeoutMillis);
+ }
+
+ @Override
+ public void releaseLock(final String lockName) {
+ innerDistributeGlobalLock.releaseLock(lockNodeService.generateGlobalLocksName(lockName));
+ }
+
+ @Override
+ public boolean isLocked(final String lockName) {
+ return innerDistributeGlobalLock.isLocked(lockName);
+ }
+
+ @Override
+ public void ackLock(final String lockName, final String lockedInstanceId) {
+ innerDistributeGlobalLock.ackLock(lockNodeService.generateGlobalAckLockName(lockName, lockedInstanceId), lockedInstanceId);
+ }
+
+ @Override
+ public void releaseAckLock(final String lockName, final String lockedInstanceId) {
+ innerDistributeGlobalLock.releaseAckLock(lockNodeService.generateGlobalAckLockName(lockName, lockedInstanceId), lockedInstanceId);
+ }
+
+ @Override
+ public void addLockedInstance(final String lockedInstanceId) {
+ innerDistributeGlobalLock.addLockedInstance(lockedInstanceId);
+ }
+
+ @Override
+ public void removeLockedInstance(final String lockedInstanceId) {
+ innerDistributeGlobalLock.removeLockedInstance(lockedInstanceId);
+ }
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/database/ShardingSphereDatabaseLockManager.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/database/ShardingSphereDatabaseLockManager.java
new file mode 100644
index 00000000000..01a99e8cadf
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/database/ShardingSphereDatabaseLockManager.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.database;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
+import org.apache.shardingsphere.infra.lock.LockType;
+import org.apache.shardingsphere.infra.lock.ShardingSphereGlobalLock;
+import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
+import org.apache.shardingsphere.mode.manager.ShardingSphereLockManager;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.LockNodeService;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.database.event.DatabaseAckLockReleasedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.database.event.DatabaseAckLockedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.database.event.DatabaseLockReleasedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.database.event.DatabaseLockedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.database.service.DatabaseLockNodeService;
+import org.apache.shardingsphere.mode.persist.PersistRepository;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Database lock manager of ShardingSphere.
+ *
+ * <p>Holds one {@link ShardingSphereDatabaseLock} per database name and handles the database lock events
+ * delivered through {@link ShardingSphereEventBus}.</p>
+ */
+public final class ShardingSphereDatabaseLockManager implements ShardingSphereLockManager {
+
+ private final Map<String, ShardingSphereDatabaseLock> locks = new ConcurrentHashMap<>();
+
+ private final LockNodeService lockNodeService = new DatabaseLockNodeService();
+
+ private ClusterPersistRepository clusterRepository;
+
+ private ComputeNodeInstance currentInstance;
+
+ private Collection<ComputeNodeInstance> computeNodeInstances;
+
+ /**
+ * Store the collaborators, register this manager on the event bus and load the existing global locks.
+ *
+ * @param repository persist repository, cast to {@link ClusterPersistRepository}
+ * @param currentInstance current compute node instance
+ * @param computeNodeInstances compute node instances
+ */
+ @Override
+ public void initLocksState(final PersistRepository repository, final ComputeNodeInstance currentInstance, final Collection<ComputeNodeInstance> computeNodeInstances) {
+ clusterRepository = (ClusterPersistRepository) repository;
+ this.currentInstance = currentInstance;
+ this.computeNodeInstances = computeNodeInstances;
+ ShardingSphereEventBus.getInstance().register(this);
+ synchronizeGlobalLock();
+ }
+
+ private void synchronizeGlobalLock() {
+ Collection<String> allGlobalLock = clusterRepository.getChildrenKeys(lockNodeService.getGlobalLocksNodePath());
+ if (allGlobalLock.isEmpty()) {
+ // Nothing is recorded yet: persist the locks node and the ack node with empty values.
+ clusterRepository.persist(lockNodeService.getGlobalLocksNodePath(), "");
+ clusterRepository.persist(lockNodeService.getGlobalLockedAckNodePath(), "");
+ return;
+ }
+ // NOTE(review): each key is matched against the full "<locks path>/<name>/leases/<id>" pattern - confirm getChildrenKeys returns keys in that shape.
+ for (String each : allGlobalLock) {
+ // Lookup-or-create instead of put: this manager is already registered on the event bus, so a handler may have created the lock first.
+ lockNodeService.parseGlobalLocksNodePath(each).ifPresent(this::getOrCreateDatabaseLock);
+ }
+ }
+
+ private ShardingSphereDatabaseLock createDatabaseLock() {
+ return new ShardingSphereDatabaseLock(clusterRepository, currentInstance, computeNodeInstances);
+ }
+
+ private ShardingSphereDatabaseLock getOrCreateDatabaseLock(final String databaseName) {
+ // computeIfAbsent makes lookup-or-create atomic, so concurrent callers share one lock instance per database.
+ return locks.computeIfAbsent(databaseName, key -> createDatabaseLock());
+ }
+
+ /**
+ * Get the lock registered under the name, creating and registering it when absent.
+ *
+ * @param lockName lock name
+ * @return existing or newly created lock
+ */
+ @Override
+ public ShardingSphereLock getOrCreateLock(final String lockName) {
+ return getOrCreateDatabaseLock(lockName);
+ }
+
+ /**
+ * Get the lock registered under the name.
+ *
+ * @param lockName lock name
+ * @return registered lock, or {@code null} when none is registered
+ */
+ @Override
+ public ShardingSphereLock getLock(final String lockName) {
+ return locks.get(lockName);
+ }
+
+ /**
+ * Check whether the lock registered under the name reports itself as locked.
+ *
+ * @param lockName lock name
+ * @return {@code false} when no lock is registered under the name, otherwise the lock's own state
+ */
+ @Override
+ public boolean isLocked(final String lockName) {
+ if (locks.isEmpty()) {
+ return false;
+ }
+ ShardingSphereGlobalLock lock = locks.get(lockName);
+ return null != lock && lock.isLocked(lockName);
+ }
+
+ /**
+ * Locked.
+ *
+ * <p>Creates the database lock when it is not registered yet, then acks it for the current instance.</p>
+ *
+ * @param event database locked event
+ */
+ @Subscribe
+ public synchronized void locked(final DatabaseLockedEvent event) {
+ String database = event.getDatabase();
+ getOrCreateDatabaseLock(database).ackLock(database, getCurrentInstanceId());
+ }
+
+ /**
+ * Try lock write.
+ *
+ * <p>The lock is created on demand, so an unknown name no longer fails with a {@link NullPointerException}.</p>
+ *
+ * @param name name
+ * @param timeoutMillis timeoutMillis
+ * @return is locked or not
+ */
+ public boolean tryLockWrite(final String name, final long timeoutMillis) {
+ return getOrCreateDatabaseLock(name).tryLock(name, timeoutMillis);
+ }
+
+ /**
+ * Release lock write.
+ *
+ * <p>Does nothing when no lock is registered under the name.</p>
+ *
+ * @param name name
+ */
+ public void releaseLockWrite(final String name) {
+ getOptionalLock(name).ifPresent(lock -> lock.releaseLock(name));
+ }
+
+ /**
+ * Lock released.
+ *
+ * @param event database lock released event
+ */
+ @Subscribe
+ public synchronized void lockReleased(final DatabaseLockReleasedEvent event) {
+ String database = event.getDatabase();
+ getOptionalLock(database).ifPresent(lock -> lock.releaseAckLock(database, getCurrentInstanceId()));
+ }
+
+ /**
+ * Ack locked.
+ *
+ * @param event database ack locked event
+ */
+ @Subscribe
+ public synchronized void ackLocked(final DatabaseAckLockedEvent event) {
+ getOptionalLock(event.getDatabase()).ifPresent(lock -> lock.addLockedInstance(event.getLockedInstance()));
+ }
+
+ /**
+ * Ack lock released.
+ *
+ * @param event database ack lock released event
+ */
+ @Subscribe
+ public synchronized void ackLockReleased(final DatabaseAckLockReleasedEvent event) {
+ getOptionalLock(event.getDatabase()).ifPresent(lock -> lock.removeLockedInstance(event.getLockedInstance()));
+ }
+
+ private String getCurrentInstanceId() {
+ return currentInstance.getInstanceDefinition().getInstanceId().getId();
+ }
+
+ private Optional<ShardingSphereDatabaseLock> getOptionalLock(final String databaseName) {
+ return Optional.ofNullable(locks.get(databaseName));
+ }
+
+ @Override
+ public LockType getLockType() {
+ return LockType.DATABASE;
+ }
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/DatabaseLockedEvent.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/database/event/DatabaseAckLockReleasedEvent.java
similarity index 66%
copy from shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/DatabaseLockedEvent.java
copy to shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/database/event/DatabaseAckLockReleasedEvent.java
index 014ef43f438..dd2947aa0d8 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/DatabaseLockedEvent.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/database/event/DatabaseAckLockReleasedEvent.java
@@ -15,18 +15,25 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.database.event;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.util.LockNodeUtil;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
/**
- * Locked event.
+ * Database ack lock released event.
*/
-@RequiredArgsConstructor
@Getter
-public final class DatabaseLockedEvent implements GovernanceEvent {
+public final class DatabaseAckLockReleasedEvent implements GovernanceEvent {
private final String database;
+
+ private final String lockedInstance;
+
+ /**
+ * Build the event from a database ack lock name.
+ *
+ * @param lockName lock name, split by {@code LockNodeUtil.parseDatabaseLockName} into database (index 0) and locked instance (index 1)
+ */
+ public DatabaseAckLockReleasedEvent(final String lockName) {
+ String[] databaseInstance = LockNodeUtil.parseDatabaseLockName(lockName);
+ this.database = databaseInstance[0];
+ this.lockedInstance = databaseInstance[1];
+ }
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/DatabaseLockedEvent.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/database/event/DatabaseAckLockedEvent.java
similarity index 67%
copy from shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/DatabaseLockedEvent.java
copy to shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/database/event/DatabaseAckLockedEvent.java
index 014ef43f438..6f34f90d352 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/DatabaseLockedEvent.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/database/event/DatabaseAckLockedEvent.java
@@ -15,18 +15,25 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.database.event;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.util.LockNodeUtil;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
/**
- * Locked event.
+ * Database ack locked event.
*/
-@RequiredArgsConstructor
@Getter
-public final class DatabaseLockedEvent implements GovernanceEvent {
+public final class DatabaseAckLockedEvent implements GovernanceEvent {
private final String database;
+
+ private final String lockedInstance;
+
+ /**
+ * Build the event from a database ack lock name.
+ *
+ * @param lockName lock name, split by {@code LockNodeUtil.parseDatabaseLockName} into database (index 0) and locked instance (index 1)
+ */
+ public DatabaseAckLockedEvent(final String lockName) {
+ String[] databaseInstance = LockNodeUtil.parseDatabaseLockName(lockName);
+ this.database = databaseInstance[0];
+ this.lockedInstance = databaseInstance[1];
+ }
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/DatabaseLockReleasedEvent.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/database/event/DatabaseLockReleasedEvent.java
similarity index 95%
copy from shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/DatabaseLockReleasedEvent.java
copy to shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/database/event/DatabaseLockReleasedEvent.java
index e7dc9dc96d8..88d0b045f6f 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/DatabaseLockReleasedEvent.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/database/event/DatabaseLockReleasedEvent.java
@@ -15,14 +15,14 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.database.event;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
/**
- * Lock released event.
+ * Database lock released event.
*/
@RequiredArgsConstructor
@Getter
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/DatabaseLockedEvent.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/database/event/DatabaseLockedEvent.java
similarity index 95%
copy from shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/DatabaseLockedEvent.java
copy to shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/database/event/DatabaseLockedEvent.java
index 014ef43f438..4c6171a0f8d 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/DatabaseLockedEvent.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/database/event/DatabaseLockedEvent.java
@@ -15,14 +15,14 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.database.event;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
/**
- * Locked event.
+ * Database locked event.
*/
@RequiredArgsConstructor
@Getter
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/database/service/DatabaseLockNodeService.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/database/service/DatabaseLockNodeService.java
new file mode 100644
index 00000000000..06ad7799bf5
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/database/service/DatabaseLockNodeService.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.database.service;
+
+import com.google.common.base.Joiner;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.LockNodeService;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.util.LockNodeUtil;
+
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Database lock node service.
+ *
+ * <p>Builds and parses the node paths below {@code /lock/global/database}.</p>
+ */
+public final class DatabaseLockNodeService implements LockNodeService {
+
+ private static final String LOCK_ROOT = "lock";
+
+ private static final String LOCK_SCOPE_GLOBAL = "global";
+
+ private static final String LOCK_LEVEL_DATABASE = "database";
+
+ private static final String LOCKS_NODE = "locks";
+
+ private static final String LOCKED_ACK_NODE = "ack";
+
+ private static final String GLOBAL_LOCKS_NODE_PATH = Joiner.on("/").join("", LOCK_ROOT, LOCK_SCOPE_GLOBAL, LOCK_LEVEL_DATABASE, LOCKS_NODE);
+
+ private static final String GLOBAL_LOCKED_ACK_NODE_PATH = Joiner.on("/").join("", LOCK_ROOT, LOCK_SCOPE_GLOBAL, LOCK_LEVEL_DATABASE, LOCKED_ACK_NODE);
+
+ // Compiled once: both node paths are constants, so the patterns never change between calls.
+ private static final Pattern GLOBAL_LOCKS_PATTERN = Pattern.compile(GLOBAL_LOCKS_NODE_PATH + "/" + "(.+)/leases/(.+)$", Pattern.CASE_INSENSITIVE);
+
+ private static final Pattern GLOBAL_LOCKED_ACK_PATTERN = Pattern.compile(GLOBAL_LOCKED_ACK_NODE_PATH + "/(.+)$", Pattern.CASE_INSENSITIVE);
+
+ /**
+ * Get the global locks node path.
+ *
+ * @return {@code /lock/global/database/locks}
+ */
+ @Override
+ public String getGlobalLocksNodePath() {
+ return GLOBAL_LOCKS_NODE_PATH;
+ }
+
+ /**
+ * Get the global locked ack node path.
+ *
+ * @return {@code /lock/global/database/ack}
+ */
+ @Override
+ public String getGlobalLockedAckNodePath() {
+ return GLOBAL_LOCKED_ACK_NODE_PATH;
+ }
+
+ /**
+ * Generate the global locks name of a database.
+ *
+ * @param database database name
+ * @return global locks node path followed by {@code /} and the database name
+ */
+ @Override
+ public String generateGlobalLocksName(final String database) {
+ return GLOBAL_LOCKS_NODE_PATH + "/" + database;
+ }
+
+ /**
+ * Generate the global ack lock name of a database and a locked instance.
+ *
+ * @param database database name
+ * @param lockedInstanceId locked instance id
+ * @return global locked ack node path followed by {@code /} and the database lock name built by {@code LockNodeUtil}
+ */
+ @Override
+ public String generateGlobalAckLockName(final String database, final String lockedInstanceId) {
+ return GLOBAL_LOCKED_ACK_NODE_PATH + "/" + LockNodeUtil.generateDatabaseLockName(database, lockedInstanceId);
+ }
+
+ /**
+ * Parse a global locks node path.
+ *
+ * @param nodePath node path
+ * @return the part between the global locks node path and {@code /leases/}, or empty when the path does not match
+ */
+ @Override
+ public Optional<String> parseGlobalLocksNodePath(final String nodePath) {
+ Matcher matcher = GLOBAL_LOCKS_PATTERN.matcher(nodePath);
+ return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
+ }
+
+ /**
+ * Parse a global locked ack node path.
+ *
+ * @param nodePath node path
+ * @return the part after the global locked ack node path, or empty when the path does not match
+ */
+ @Override
+ public Optional<String> parseGlobalLockedAckNodePath(final String nodePath) {
+ Matcher matcher = GLOBAL_LOCKED_ACK_PATTERN.matcher(nodePath);
+ return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
+ }
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalLocksChangedWatcher.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/database/watcher/DatabaseAckChangedWatcher.java
similarity index 70%
copy from shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalLocksChangedWatcher.java
copy to shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/database/watcher/DatabaseAckChangedWatcher.java
index e7b71861902..790085038e6 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalLocksChangedWatcher.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/database/watcher/DatabaseAckChangedWatcher.java
@@ -15,11 +15,11 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.watcher;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.database.watcher;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.DatabaseLockReleasedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.DatabaseLockedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.LockNode;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.database.service.DatabaseLockNodeService;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.AckLockReleasedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.AckLockedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -31,13 +31,15 @@ import java.util.Collections;
import java.util.Optional;
/**
- * Global locks changed watcher.
+ * Database ack changed watcher.
*/
-public final class GlobalLocksChangedWatcher implements GovernanceWatcher<GovernanceEvent> {
+public final class DatabaseAckChangedWatcher implements GovernanceWatcher<GovernanceEvent> {
+
+ private final DatabaseLockNodeService lockNode = new DatabaseLockNodeService();
@Override
public Collection<String> getWatchingKeys() {
- return Collections.singleton(LockNode.getGlobalDatabaseLocksNodePath());
+ return Collections.singleton(lockNode.getGlobalLockedAckNodePath());
}
@Override
@@ -47,18 +49,18 @@ public final class GlobalLocksChangedWatcher implements GovernanceWatcher<Govern
@Override
public Optional<GovernanceEvent> createGovernanceEvent(final DataChangedEvent event) {
- Optional<String> lockedName = LockNode.parseGlobalDatabaseLocksNodePath(event.getKey());
- if (lockedName.isPresent()) {
- return handleGlobalSchemaLocksEvent(event.getType(), lockedName.get());
+ Optional<String> ackLockedName = lockNode.parseGlobalLockedAckNodePath(event.getKey());
+ if (ackLockedName.isPresent()) {
+ return handleGlobalAckEvent(event.getType(), ackLockedName.get());
}
return Optional.empty();
}
- private Optional<GovernanceEvent> handleGlobalSchemaLocksEvent(final Type eventType, final String lockedName) {
+ private Optional<GovernanceEvent> handleGlobalAckEvent(final Type eventType, final String lockedName) {
if (Type.ADDED == eventType) {
- return Optional.of(new DatabaseLockedEvent(lockedName));
+ return Optional.of(new AckLockedEvent(lockedName));
} else if (Type.DELETED == eventType) {
- return Optional.of(new DatabaseLockReleasedEvent(lockedName));
+ return Optional.of(new AckLockReleasedEvent(lockedName));
}
return Optional.empty();
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalLocksChangedWatcher.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/database/watcher/DatabaseLocksChangedWatcher.java
similarity index 78%
copy from shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalLocksChangedWatcher.java
copy to shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/database/watcher/DatabaseLocksChangedWatcher.java
index e7b71861902..415dee2c0b2 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalLocksChangedWatcher.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/database/watcher/DatabaseLocksChangedWatcher.java
@@ -15,11 +15,11 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.watcher;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.database.watcher;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.DatabaseLockReleasedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.DatabaseLockedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.LockNode;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.database.service.DatabaseLockNodeService;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.LockReleasedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.LockedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -31,13 +31,15 @@ import java.util.Collections;
import java.util.Optional;
/**
- * Global locks changed watcher.
+ * Database locks changed watcher.
*/
-public final class GlobalLocksChangedWatcher implements GovernanceWatcher<GovernanceEvent> {
+public final class DatabaseLocksChangedWatcher implements GovernanceWatcher<GovernanceEvent> {
+
+ private final DatabaseLockNodeService lockNode = new DatabaseLockNodeService();
@Override
public Collection<String> getWatchingKeys() {
- return Collections.singleton(LockNode.getGlobalDatabaseLocksNodePath());
+ return Collections.singleton(lockNode.getGlobalLocksNodePath());
}
@Override
@@ -47,7 +49,7 @@ public final class GlobalLocksChangedWatcher implements GovernanceWatcher<Govern
@Override
public Optional<GovernanceEvent> createGovernanceEvent(final DataChangedEvent event) {
- Optional<String> lockedName = LockNode.parseGlobalDatabaseLocksNodePath(event.getKey());
+ Optional<String> lockedName = lockNode.parseGlobalLocksNodePath(event.getKey());
if (lockedName.isPresent()) {
return handleGlobalSchemaLocksEvent(event.getType(), lockedName.get());
}
@@ -56,9 +58,9 @@ public final class GlobalLocksChangedWatcher implements GovernanceWatcher<Govern
private Optional<GovernanceEvent> handleGlobalSchemaLocksEvent(final Type eventType, final String lockedName) {
if (Type.ADDED == eventType) {
- return Optional.of(new DatabaseLockedEvent(lockedName));
+ return Optional.of(new LockedEvent(lockedName));
} else if (Type.DELETED == eventType) {
- return Optional.of(new DatabaseLockReleasedEvent(lockedName));
+ return Optional.of(new LockReleasedEvent(lockedName));
}
return Optional.empty();
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/DatabaseLockReleasedEvent.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/LockReleasedEvent.java
similarity index 93%
rename from shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/DatabaseLockReleasedEvent.java
rename to shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/LockReleasedEvent.java
index e7dc9dc96d8..0f128c674a6 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/DatabaseLockReleasedEvent.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/LockReleasedEvent.java
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.Gover
*/
@RequiredArgsConstructor
@Getter
-public final class DatabaseLockReleasedEvent implements GovernanceEvent {
+public final class LockReleasedEvent implements GovernanceEvent {
private final String database;
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/DatabaseLockedEvent.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/LockedEvent.java
similarity index 94%
rename from shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/DatabaseLockedEvent.java
rename to shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/LockedEvent.java
index 014ef43f438..1f10e6a44cf 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/DatabaseLockedEvent.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/LockedEvent.java
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.Gover
*/
@RequiredArgsConstructor
@Getter
-public final class DatabaseLockedEvent implements GovernanceEvent {
+public final class LockedEvent implements GovernanceEvent {
private final String database;
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockRegistryService.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockRegistryService.java
index 6815d3fb5fa..48a49cb44e4 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockRegistryService.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockRegistryService.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.util.LockNodeUtil;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import java.util.Collection;
@@ -73,7 +74,7 @@ public final class LockRegistryService {
repository.releaseLock(lockName);
return;
}
- repository.delete(lockName);
+ repository.delete(LockNodeUtil.generateGlobalLockReleasedNodePath(lockName));
}
/**
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/util/LockNodeUtil.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/util/LockNodeUtil.java
index 8aea107f872..2aefd85ff88 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/util/LockNodeUtil.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/util/LockNodeUtil.java
@@ -28,6 +28,16 @@ public final class LockNodeUtil {
private static final String LOCK_DELIMITER = "-";
+ /**
+ * Generate global lock leases node path.
+ *
+ * @param lockName lock name
+ * @return global lock leases node path
+ */
+ public static String generateGlobalLockReleasedNodePath(final String lockName) {
+ return lockName + "/leases";
+ }
+
/**
* Generate database ack lock name.
*
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalLocksChangedWatcher.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalLocksChangedWatcher.java
index e7b71861902..5b1048b81e8 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalLocksChangedWatcher.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalLocksChangedWatcher.java
@@ -17,8 +17,8 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.watcher;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.DatabaseLockReleasedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.DatabaseLockedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.LockReleasedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.LockedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.LockNode;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
@@ -56,9 +56,9 @@ public final class GlobalLocksChangedWatcher implements GovernanceWatcher<Govern
private Optional<GovernanceEvent> handleGlobalSchemaLocksEvent(final Type eventType, final String lockedName) {
if (Type.ADDED == eventType) {
- return Optional.of(new DatabaseLockedEvent(lockedName));
+ return Optional.of(new LockedEvent(lockedName));
} else if (Type.DELETED == eventType) {
- return Optional.of(new DatabaseLockReleasedEvent(lockedName));
+ return Optional.of(new LockReleasedEvent(lockedName));
}
return Optional.empty();
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.ShardingSphereLockManager
similarity index 54%
copy from shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
copy to shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.ShardingSphereLockManager
index 93ac983a0e2..0c80094deff 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.ShardingSphereLockManager
@@ -15,10 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.watcher.StorageNodeStateChangedWatcher
-org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.watcher.MetaDataChangedWatcher
-org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.watcher.GlobalRuleChangedWatcher
-org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.watcher.PropertiesChangedWatcher
-org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.watcher.ComputeNodeStateChangedWatcher
-org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.watcher.GlobalLocksChangedWatcher
-org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.watcher.GlobalAckChangedWatcher
+org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.database.ShardingSphereDatabaseLockManager
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
index 93ac983a0e2..a0033b2b5e8 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
@@ -20,5 +20,5 @@ org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.wat
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.watcher.GlobalRuleChangedWatcher
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.watcher.PropertiesChangedWatcher
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.watcher.ComputeNodeStateChangedWatcher
-org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.watcher.GlobalLocksChangedWatcher
-org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.watcher.GlobalAckChangedWatcher
+org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.database.watcher.DatabaseLocksChangedWatcher
+org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.database.watcher.DatabaseAckChangedWatcher
\ No newline at end of file
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContextTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContextTest.java
index 5173586ffcd..5071dc4c6d9 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContextTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContextTest.java
@@ -23,19 +23,11 @@ import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
import org.apache.shardingsphere.infra.instance.definition.InstanceType;
import org.apache.shardingsphere.infra.instance.workerid.WorkerIdGenerator;
-import org.apache.shardingsphere.infra.lock.ShardingSphereGlobalLock;
import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.AckLockReleasedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.AckLockedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.DatabaseLockedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.LockRegistryService;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.database.ShardingSphereDatabaseLock;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.junit.Test;
-import java.lang.reflect.Field;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -45,7 +37,7 @@ public final class DistributeLockContextTest {
@Test
public void assertGetOrCreateDatabaseLock() {
- DistributeLockContext distributeLockContext = new DistributeLockContext(mock(LockRegistryService.class));
+ DistributeLockContext distributeLockContext = new DistributeLockContext(mock(ClusterPersistRepository.class), mock(ComputeNodeInstance.class));
ComputeNodeInstance currentInstance = new ComputeNodeInstance(new InstanceDefinition(InstanceType.PROXY, "127.0.0.1@3307"));
InstanceContext instanceContext = new InstanceContext(currentInstance, mock(WorkerIdGenerator.class), mock(ModeConfiguration.class), distributeLockContext);
instanceContext.initLockContext();
@@ -55,62 +47,22 @@ public final class DistributeLockContextTest {
@Test
public void assertGetDatabaseLock() {
- DistributeLockContext distributeLockContext = new DistributeLockContext(mock(LockRegistryService.class));
+ DistributeLockContext distributeLockContext = new DistributeLockContext(mock(ClusterPersistRepository.class), mock(ComputeNodeInstance.class));
ComputeNodeInstance currentInstance = new ComputeNodeInstance(new InstanceDefinition(InstanceType.PROXY, "127.0.0.1@3307"));
InstanceContext instanceContext = new InstanceContext(currentInstance, mock(WorkerIdGenerator.class), mock(ModeConfiguration.class), distributeLockContext);
instanceContext.initLockContext();
distributeLockContext.getOrCreateDatabaseLock("database");
ShardingSphereLock databaseLock = distributeLockContext.getDatabaseLock("database");
- assertTrue(databaseLock instanceof ShardingSphereDistributeGlobalLock);
+ assertTrue(databaseLock instanceof ShardingSphereDatabaseLock);
}
@Test
public void assertIsLockedDatabase() {
- DistributeLockContext distributeLockContext = new DistributeLockContext(mock(LockRegistryService.class));
+ DistributeLockContext distributeLockContext = new DistributeLockContext(mock(ClusterPersistRepository.class), mock(ComputeNodeInstance.class));
ComputeNodeInstance currentInstance = new ComputeNodeInstance(new InstanceDefinition(InstanceType.PROXY, "127.0.0.1@3307"));
InstanceContext instanceContext = new InstanceContext(currentInstance, mock(WorkerIdGenerator.class), mock(ModeConfiguration.class), distributeLockContext);
instanceContext.initLockContext();
distributeLockContext.getOrCreateDatabaseLock("database");
assertFalse(distributeLockContext.isLockedDatabase("database"));
}
-
- @Test
- public void assertAddGlobalLock() throws IllegalAccessException, NoSuchFieldException {
- DistributeLockContext distributeLockContext = new DistributeLockContext(mock(LockRegistryService.class));
- Field declaredField = DistributeLockContext.class.getDeclaredField("currentInstance");
- declaredField.setAccessible(true);
- declaredField.set(distributeLockContext, new ComputeNodeInstance(new InstanceDefinition(InstanceType.PROXY, "127.0.0.1@3307")));
- Field computeNodeInstancesDeclaredField = DistributeLockContext.class.getDeclaredField("computeNodeInstances");
- computeNodeInstancesDeclaredField.setAccessible(true);
- computeNodeInstancesDeclaredField.set(distributeLockContext, Arrays.asList(new ComputeNodeInstance(new InstanceDefinition(InstanceType.PROXY, "127.0.0.1@3307"))));
- distributeLockContext.renew(new DatabaseLockedEvent("database1-127.0.0.1@3308"));
- }
-
- @Test
- public void assertRemoveGlobalLock() throws IllegalAccessException, NoSuchFieldException {
- DistributeLockContext distributeLockContext = new DistributeLockContext(mock(LockRegistryService.class));
- Field declaredField = DistributeLockContext.class.getDeclaredField("currentInstance");
- declaredField.setAccessible(true);
- declaredField.set(distributeLockContext, new ComputeNodeInstance(new InstanceDefinition(InstanceType.PROXY, "127.0.0.1@3307")));
- Map<String, ShardingSphereGlobalLock> globalLocks = new ConcurrentHashMap<>();
- globalLocks.put("database", mock(ShardingSphereGlobalLock.class));
- Field globalLocksDeclaredField = DistributeLockContext.class.getDeclaredField("globalLocks");
- globalLocksDeclaredField.setAccessible(true);
- globalLocksDeclaredField.set(distributeLockContext, globalLocks);
- assertNotNull(distributeLockContext.getDatabaseLock("database"));
- distributeLockContext.renew(new AckLockReleasedEvent("database-127.0.0.1@3307"));
- assertNotNull(distributeLockContext.getDatabaseLock("database"));
- }
-
- @Test
- public void assertRenew() throws IllegalAccessException, NoSuchFieldException {
- DistributeLockContext distributeLockContext = new DistributeLockContext(mock(LockRegistryService.class));
- Map<String, ShardingSphereGlobalLock> globalLocks = new ConcurrentHashMap<>();
- globalLocks.put("database", mock(ShardingSphereGlobalLock.class));
- Field declaredField = DistributeLockContext.class.getDeclaredField("globalLocks");
- declaredField.setAccessible(true);
- declaredField.set(distributeLockContext, globalLocks);
- AckLockedEvent event = new AckLockedEvent("database-127.0.0.1@3307");
- distributeLockContext.renew(event);
- }
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalLocksChangedWatcherTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalLocksChangedWatcherTest.java
index 8f8e285676a..a7f7bd23924 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalLocksChangedWatcherTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalLocksChangedWatcherTest.java
@@ -17,8 +17,8 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.watcher;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.DatabaseLockReleasedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.DatabaseLockedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.LockReleasedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.LockedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import org.junit.Test;
@@ -37,13 +37,13 @@ public final class GlobalLocksChangedWatcherTest {
Optional<GovernanceEvent> add = new GlobalLocksChangedWatcher().createGovernanceEvent(addDataChangedEvent);
assertTrue(add.isPresent());
GovernanceEvent addEvent = add.get();
- assertTrue(addEvent instanceof DatabaseLockedEvent);
- assertThat(((DatabaseLockedEvent) addEvent).getDatabase(), is("database"));
+ assertTrue(addEvent instanceof LockedEvent);
+ assertThat(((LockedEvent) addEvent).getDatabase(), is("database"));
DataChangedEvent deleteDataChangedEvent = new DataChangedEvent("/lock/global/database/locks/database/leases/c_l_0000000", "127.0.0.1@3307", DataChangedEvent.Type.DELETED);
Optional<GovernanceEvent> delete = new GlobalLocksChangedWatcher().createGovernanceEvent(deleteDataChangedEvent);
assertTrue(delete.isPresent());
GovernanceEvent deleteEvent = delete.get();
- assertTrue(deleteEvent instanceof DatabaseLockReleasedEvent);
- assertThat(((DatabaseLockReleasedEvent) deleteEvent).getDatabase(), is("database"));
+ assertTrue(deleteEvent instanceof LockReleasedEvent);
+ assertThat(((LockReleasedEvent) deleteEvent).getDatabase(), is("database"));
}
}