You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/04/06 13:10:39 UTC
[shardingsphere] branch master updated: Refactor lock registry service (#16616)
This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 07050022a78 Refactor lock registry service (#16616)
07050022a78 is described below
commit 07050022a78588e7c0042f0b2a29bd704c97d1b9
Author: gin <ja...@163.com>
AuthorDate: Wed Apr 6 21:10:24 2022 +0800
Refactor lock registry service (#16616)
---
.../cluster/ClusterContextManagerBuilder.java | 4 +-
.../coordinator/lock/DistributeLockContext.java | 57 ++++++-----
.../lock/ShardingSphereDistributeGlobalLock.java | 16 ++--
...icationEvent.java => AckLockReleasedEvent.java} | 4 +-
...obalAckLockedEvent.java => AckLockedEvent.java} | 6 +-
.../lock/event/GlobalAckLockReleasedEvent.java | 34 -------
.../lock/event/GlobalLockReleasedEvent.java | 34 -------
.../lock/event/InnerLockReleasedEvent.java | 31 ------
.../{GlobalLockedEvent.java => LockedEvent.java} | 6 +-
.../coordinator/lock/service/GlobalLockNode.java | 106 ---------------------
.../lock/service/GlobalLockRegistryService.java | 95 ------------------
.../cluster/coordinator/lock/service/LockNode.java | 81 ++++++++++++++--
.../lock/service/LockRegistryService.java | 66 ++++++++++++-
.../coordinator/lock/util/LockNodeUtil.java | 8 +-
.../lock/watcher/GlobalAckChangedWatcher.java | 20 ++--
.../lock/watcher/GlobalLocksChangedWatcher.java | 20 ++--
.../lock/watcher/LockChangedWatcher.java | 59 ------------
....cluster.coordinator.registry.GovernanceWatcher | 1 -
.../lock/DistributeLockContextTest.java | 70 ++++++++++++++
.../lock/service/GlobalLockNodeTest.java | 63 ------------
.../coordinator/lock/service/LockNodeTest.java | 37 +++++++
.../lock/service/LockRegistryServiceTest.java | 27 ++++++
.../coordinator/lock/util/LockNodeUtilTest.java | 4 +-
.../lock/watcher/GlobalAckChangedWatcherTest.java | 49 ++++++++++
.../watcher/GlobalLocksChangedWatcherTest.java | 49 ++++++++++
.../lock/watcher/LockChangedWatcherTest.java | 36 -------
26 files changed, 436 insertions(+), 547 deletions(-)
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 09beec76290..7cf9e0c15f2 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -37,7 +37,7 @@ import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.ClusterContextManagerCoordinator;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.DistributeLockContext;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.GlobalLockRegistryService;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.LockRegistryService;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator.ClusterWorkerIdGenerator;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.MetaDataContextsBuilder;
@@ -157,7 +157,7 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder
final Properties transactionProps, final ModeConfiguration modeConfiguration) {
ComputeNodeInstance computeNodeInstance = metaDataPersistService.getComputeNodePersistService().loadComputeNodeInstance(instanceDefinition);
ClusterWorkerIdGenerator clusterWorkerIdGenerator = new ClusterWorkerIdGenerator(repository, metaDataPersistService, instanceDefinition);
- DistributeLockContext distributeLockContext = new DistributeLockContext(new GlobalLockRegistryService(repository));
+ DistributeLockContext distributeLockContext = new DistributeLockContext(new LockRegistryService(repository));
InstanceContext instanceContext = new InstanceContext(computeNodeInstance, clusterWorkerIdGenerator, modeConfiguration, distributeLockContext);
distributeLockContext.synchronizeGlobalLock(instanceContext);
generateTransactionConfigurationFile(instanceContext, metaDataContexts, transactionProps);
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContext.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContext.java
index ab74e085ee8..1bbb78c84cc 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContext.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContext.java
@@ -26,11 +26,11 @@ import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.lock.ShardingSphereGlobalLock;
import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.GlobalAckLockedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.GlobalAckLockReleasedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.GlobalLockedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.GlobalLockReleasedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.GlobalLockRegistryService;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.AckLockedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.AckLockReleasedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.LockedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.LockReleasedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.LockRegistryService;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.util.LockNodeUtil;
import java.util.Collection;
@@ -46,7 +46,7 @@ public final class DistributeLockContext implements LockContext {
private final Map<String, ShardingSphereGlobalLock> globalLocks = new ConcurrentHashMap<>();
- private final GlobalLockRegistryService globalLockService;
+ private final LockRegistryService globalLockService;
private volatile ComputeNodeInstance currentInstance;
@@ -97,13 +97,13 @@ public final class DistributeLockContext implements LockContext {
*/
public void synchronizeGlobalLock(final InstanceContext instanceContext) {
init(instanceContext);
- Collection<String> allGlobalLock = globalLockService.synchronizeAllGlobalLock();
+ Collection<String> allGlobalLock = globalLockService.getAllGlobalLock();
if (allGlobalLock.isEmpty()) {
globalLockService.initGlobalLockRoot();
return;
}
for (String each : allGlobalLock) {
- String[] schemaInstanceId = LockNodeUtil.parseLockName(each);
+ String[] schemaInstanceId = LockNodeUtil.parseSchemaLockName(each);
globalLocks.put(schemaInstanceId[0], crateGlobalLock(schemaInstanceId[1]));
}
}
@@ -124,21 +124,20 @@ public final class DistributeLockContext implements LockContext {
* @param event locked event
*/
@Subscribe
- public synchronized void renew(final GlobalLockedEvent event) {
- String schema = event.getSchema();
- String ownerInstanceId = event.getOwnerInstanceId();
- if (isSameInstanceId(ownerInstanceId)) {
+ public synchronized void renew(final LockedEvent event) {
+ String[] schemaInstance = LockNodeUtil.parseSchemaLockName(event.getLockName());
+ if (isSameInstanceId(schemaInstance[1])) {
return;
}
- Optional<ShardingSphereGlobalLock> globalLock = getGlobalLock(schema);
+ Optional<ShardingSphereGlobalLock> globalLock = getGlobalLock(schemaInstance[0]);
ShardingSphereGlobalLock lock;
if (globalLock.isPresent()) {
lock = globalLock.get();
} else {
- lock = crateGlobalLock(ownerInstanceId);
- globalLocks.put(schema, lock);
+ lock = crateGlobalLock(schemaInstance[1]);
+ globalLocks.put(schemaInstance[0], lock);
}
- lock.ackLock(schema, getCurrentInstanceId());
+ lock.ackLock(schemaInstance[0], getCurrentInstanceId());
}
/**
@@ -147,10 +146,10 @@ public final class DistributeLockContext implements LockContext {
* @param event lock released event
*/
@Subscribe
- public synchronized void renew(final GlobalLockReleasedEvent event) {
- String schema = event.getSchema();
- String ownerInstanceId = event.getOwnerInstanceId();
- if (isSameInstanceId(ownerInstanceId)) {
+ public synchronized void renew(final LockReleasedEvent event) {
+ String[] schemaInstance = LockNodeUtil.parseSchemaLockName(event.getLockName());
+ String schema = schemaInstance[0];
+ if (isSameInstanceId(schemaInstance[1])) {
ShardingSphereGlobalLock shardingSphereGlobalLock = globalLocks.get(schema);
if (null == shardingSphereGlobalLock) {
return;
@@ -171,10 +170,9 @@ public final class DistributeLockContext implements LockContext {
* @param event ack locked event
*/
@Subscribe
- public synchronized void renew(final GlobalAckLockedEvent event) {
- String schema = event.getSchema();
- String lockedInstanceId = event.getLockedInstanceId();
- getGlobalLock(schema).ifPresent(shardingSphereGlobalLock -> shardingSphereGlobalLock.addLockedInstance(lockedInstanceId));
+ public synchronized void renew(final AckLockedEvent event) {
+ String[] schemaInstance = LockNodeUtil.parseSchemaLockName(event.getLockName());
+ getGlobalLock(schemaInstance[0]).ifPresent(shardingSphereGlobalLock -> shardingSphereGlobalLock.addLockedInstance(schemaInstance[1]));
}
/**
@@ -183,13 +181,12 @@ public final class DistributeLockContext implements LockContext {
* @param event ack lock released event.
*/
@Subscribe
- public synchronized void renew(final GlobalAckLockReleasedEvent event) {
- String schema = event.getSchema();
- String lockedInstanceId = event.getLockedInstanceId();
- if (isSameInstanceId(lockedInstanceId)) {
- globalLocks.remove(schema);
+ public synchronized void renew(final AckLockReleasedEvent event) {
+ String[] schemaInstance = LockNodeUtil.parseSchemaLockName(event.getLockName());
+ if (isSameInstanceId(schemaInstance[1])) {
+ globalLocks.remove(schemaInstance[0]);
return;
}
- getGlobalLock(schema).ifPresent(shardingSphereGlobalLock -> shardingSphereGlobalLock.addLockedInstance(lockedInstanceId));
+ getGlobalLock(schemaInstance[0]).ifPresent(shardingSphereGlobalLock -> shardingSphereGlobalLock.addLockedInstance(schemaInstance[1]));
}
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/ShardingSphereDistributeGlobalLock.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/ShardingSphereDistributeGlobalLock.java
index 52c3c796441..a4cc84cfd1d 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/ShardingSphereDistributeGlobalLock.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/ShardingSphereDistributeGlobalLock.java
@@ -20,8 +20,8 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.lock.ShardingSphereGlobalLock;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.GlobalLockRegistryService;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.GlobalLockNode;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.LockRegistryService;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.LockNode;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.LockState;
import java.util.Collection;
@@ -48,13 +48,13 @@ public final class ShardingSphereDistributeGlobalLock implements ShardingSphereG
private final AtomicReference<LockState> synchronizedLockState;
- private final GlobalLockRegistryService lockService;
+ private final LockRegistryService lockService;
private final Collection<ComputeNodeInstance> computeNodeInstances;
private final Set<String> lockedInstances = new CopyOnWriteArraySet<>();
- public ShardingSphereDistributeGlobalLock(final ComputeNodeInstance currentInstance, final String ownerInstanceId, final GlobalLockRegistryService lockService,
+ public ShardingSphereDistributeGlobalLock(final ComputeNodeInstance currentInstance, final String ownerInstanceId, final LockRegistryService lockService,
final Collection<ComputeNodeInstance> computeNodeInstances) {
this.currentInstance = currentInstance;
this.ownerInstanceId = ownerInstanceId;
@@ -93,7 +93,7 @@ public final class ShardingSphereDistributeGlobalLock implements ShardingSphereG
}
long count = 0;
do {
- if (lockService.tryLock(GlobalLockNode.generateSchemaLockName(lockName, ownerInstanceId))) {
+ if (lockService.tryGlobalLock(LockNode.generateSchemaLockName(lockName, ownerInstanceId))) {
if (isAckOK(timeout - count)) {
boolean result = synchronizedLockState.compareAndSet(LockState.UNLOCKED, LockState.LOCKED);
log.info("innerTryLock, result={}, lockName={}, lockState={}, globalLock.hashCode={}", result, lockName, synchronizedLockState.get(), hashCode());
@@ -134,7 +134,7 @@ public final class ShardingSphereDistributeGlobalLock implements ShardingSphereG
log.info("releaseLock, state is not locked, ignore, lockName={}", lockName);
return;
}
- lockService.releaseLock(GlobalLockNode.generateSchemaLockName(lockName, ownerInstanceId));
+ lockService.releaseGlobalLock(LockNode.generateSchemaLockName(lockName, ownerInstanceId));
String currentInstanceId = getCurrentInstanceId();
if (isOwnerInstanceId(currentInstanceId)) {
lockedInstances.remove(ownerInstanceId);
@@ -169,13 +169,13 @@ public final class ShardingSphereDistributeGlobalLock implements ShardingSphereG
@Override
public void ackLock(final String lockName, final String lockedInstanceId) {
- lockService.ackLock(GlobalLockNode.generateSchemaAckLockName(lockName, lockedInstanceId), lockedInstanceId);
+ lockService.ackLock(LockNode.generateSchemaAckLockName(lockName, lockedInstanceId), lockedInstanceId);
lockedInstances.add(lockedInstanceId);
}
@Override
public void releaseAckLock(final String lockName, final String lockedInstanceId) {
- lockService.releaseAckLock(GlobalLockNode.generateSchemaAckLockName(lockName, lockedInstanceId));
+ lockService.releaseAckLock(LockNode.generateSchemaAckLockName(lockName, lockedInstanceId));
lockedInstances.remove(lockedInstanceId);
synchronizedLockState.compareAndSet(LockState.LOCKED, LockState.UNLOCKED);
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/LockNotificationEvent.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/AckLockReleasedEvent.java
similarity index 91%
rename from shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/LockNotificationEvent.java
rename to shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/AckLockReleasedEvent.java
index 09dd49bb039..1560a9c3994 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/LockNotificationEvent.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/AckLockReleasedEvent.java
@@ -22,11 +22,11 @@ import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
/**
- * Lock notification event.
+ * Ack released Lock event.
*/
@RequiredArgsConstructor
@Getter
-public final class LockNotificationEvent implements GovernanceEvent {
+public final class AckLockReleasedEvent implements GovernanceEvent {
private final String lockName;
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/GlobalAckLockedEvent.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/AckLockedEvent.java
similarity index 88%
rename from shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/GlobalAckLockedEvent.java
rename to shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/AckLockedEvent.java
index a944d395d25..21ee5300f71 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/GlobalAckLockedEvent.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/AckLockedEvent.java
@@ -26,9 +26,7 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.Gover
*/
@RequiredArgsConstructor
@Getter
-public final class GlobalAckLockedEvent implements GovernanceEvent {
+public final class AckLockedEvent implements GovernanceEvent {
- private final String schema;
-
- private final String lockedInstanceId;
+ private final String lockName;
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/GlobalAckLockReleasedEvent.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/GlobalAckLockReleasedEvent.java
deleted file mode 100644
index 2cf548a386b..00000000000
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/GlobalAckLockReleasedEvent.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
-
-/**
- * Ack released Lock event.
- */
-@RequiredArgsConstructor
-@Getter
-public final class GlobalAckLockReleasedEvent implements GovernanceEvent {
-
- private final String schema;
-
- private final String lockedInstanceId;
-}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/GlobalLockReleasedEvent.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/GlobalLockReleasedEvent.java
deleted file mode 100644
index dadd92b1c88..00000000000
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/GlobalLockReleasedEvent.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
-
-/**
- * Lock released event.
- */
-@RequiredArgsConstructor
-@Getter
-public final class GlobalLockReleasedEvent implements GovernanceEvent {
-
- private final String schema;
-
- private final String ownerInstanceId;
-}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/InnerLockReleasedEvent.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/InnerLockReleasedEvent.java
deleted file mode 100644
index 0492e5d2b40..00000000000
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/InnerLockReleasedEvent.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-
-/**
- * Inner lock released event.
- */
-@RequiredArgsConstructor
-@Getter
-public final class InnerLockReleasedEvent {
-
- private final String lockName;
-}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/GlobalLockedEvent.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/LockedEvent.java
similarity index 88%
rename from shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/GlobalLockedEvent.java
rename to shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/LockedEvent.java
index 3e33667a810..e8f596b1d08 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/GlobalLockedEvent.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/LockedEvent.java
@@ -26,9 +26,7 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.Gover
*/
@RequiredArgsConstructor
@Getter
-public final class GlobalLockedEvent implements GovernanceEvent {
+public final class LockedEvent implements GovernanceEvent {
- private final String schema;
-
- private final String ownerInstanceId;
+ private final String lockName;
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/GlobalLockNode.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/GlobalLockNode.java
deleted file mode 100644
index d31117bf595..00000000000
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/GlobalLockNode.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service;
-
-import com.google.common.base.Joiner;
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.util.LockNodeUtil;
-
-import java.util.Optional;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * Global lock node.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class GlobalLockNode {
-
- private static final String LOCK_ROOT = "lock";
-
- private static final String LOCK_SCOPE = "global";
-
- private static final String LOCKS_NODE = "locks";
-
- private static final String LOCKED_ACK_NODE = "ack";
-
- /**
- * Get locks node path.
- *
- * @return locks node path
- */
- public static String getGlobalLocksNodePath() {
- return Joiner.on("/").join("", LOCK_ROOT, LOCK_SCOPE, LOCKS_NODE);
- }
-
- /**
- * Get ack node path.
- *
- * @return ack node path
- */
- public static String getGlobalAckNodePath() {
- return Joiner.on("/").join("", LOCK_ROOT, LOCK_SCOPE, LOCKED_ACK_NODE);
- }
-
- /**
- * Generate schema lock name.
- *
- * @param schema schema
- * @param instanceId instance id
- * @return schema lock name
- */
- public static String generateSchemaLockName(final String schema, final String instanceId) {
- return getGlobalLocksNodePath() + "/" + LockNodeUtil.generateLockName(schema, instanceId);
- }
-
- /**
- * Generate schema ack lock name.
- *
- * @param schema schema
- * @param lockedInstanceId locked instance id
- * @return schema ack lock name
- */
- public static String generateSchemaAckLockName(final String schema, final String lockedInstanceId) {
- return getGlobalAckNodePath() + "/" + LockNodeUtil.generateLockName(schema, lockedInstanceId);
- }
-
- /**
- * Get locked key name by locks node path.
- *
- * @param locksNodePath locks node path
- * @return schema name
- */
- public static Optional<String> getLockedKey(final String locksNodePath) {
- Pattern pattern = Pattern.compile(getGlobalLocksNodePath() + "/(.+)$", Pattern.CASE_INSENSITIVE);
- Matcher matcher = pattern.matcher(locksNodePath);
- return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
- }
-
- /**
- * Get ack locked key name by ack node path.
- *
- * @param ackNodePath ack node path
- * @return locked instance id
- */
- public static Optional<String> getAckLockedKey(final String ackNodePath) {
- Pattern pattern = Pattern.compile(getGlobalAckNodePath() + "/(.+)$", Pattern.CASE_INSENSITIVE);
- Matcher matcher = pattern.matcher(ackNodePath);
- return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
- }
-}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/GlobalLockRegistryService.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/GlobalLockRegistryService.java
deleted file mode 100644
index 0c87b1b0368..00000000000
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/GlobalLockRegistryService.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service;
-
-import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryException;
-
-import java.util.Collection;
-
-/**
- * Global lock registry service.
- */
-public final class GlobalLockRegistryService {
-
- private final ClusterPersistRepository repository;
-
- public GlobalLockRegistryService(final ClusterPersistRepository repository) {
- this.repository = repository;
- }
-
- /**
- * Init global lock root patch.
- */
- public void initGlobalLockRoot() {
- repository.persist(GlobalLockNode.getGlobalLocksNodePath(), "");
- repository.persist(GlobalLockNode.getGlobalAckNodePath(), "");
- }
-
- /**
- * Synchronize all global locks.
- *
- * @return all global locks
- */
- public Collection<String> synchronizeAllGlobalLock() {
- return repository.getChildrenKeys(GlobalLockNode.getGlobalLocksNodePath());
- }
-
- /**
- * Try to get lock.
- *
- * @param lockName lock name
- * @return true if get the lock, false if not
- */
- public boolean tryLock(final String lockName) {
- try {
- repository.persistEphemeral(lockName, LockState.LOCKED.name());
- return true;
- } catch (final ClusterPersistRepositoryException ignored) {
- return false;
- }
- }
-
- /**
- * Release lock.
- *
- * @param lockName lock name
- */
- public void releaseLock(final String lockName) {
- repository.delete(lockName);
- }
-
- /**
- * Ack lock.
- *
- * @param lockName lock name
- * @param lockValue lock value
- */
- public void ackLock(final String lockName, final String lockValue) {
- repository.persistEphemeral(lockName, lockValue);
- }
-
- /**
- * Release ack lock.
- *
- * @param lockName lock name
- */
- public void releaseAckLock(final String lockName) {
- repository.delete(lockName);
- }
-}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockNode.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockNode.java
index 0b395c94c81..b1c50563c28 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockNode.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockNode.java
@@ -20,38 +20,83 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service;
import com.google.common.base.Joiner;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.util.LockNodeUtil;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
- * Lock node.
+ * Global lock node.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class LockNode {
- private static final String LOCK_NODE_ROOT = "lock";
+ private static final String LOCK_ROOT = "lock";
+
+ private static final String LOCK_SCOPE = "global";
private static final String LOCKS_NODE = "locks";
+ private static final String LOCKED_ACK_NODE = "ack";
+
/**
* Get lock root node path.
- *
+ *
* @return lock root node path
*/
public static String getLockRootNodePath() {
- return Joiner.on("/").join("", LOCK_NODE_ROOT, LOCKS_NODE);
+ return Joiner.on("/").join("", LOCK_ROOT, LOCKS_NODE);
}
/**
* Get lock node path.
- *
+ *
* @param lockName lock name
* @return lock node path
*/
public static String getLockNodePath(final String lockName) {
- return Joiner.on("/").join("", LOCK_NODE_ROOT, LOCKS_NODE, lockName);
+ return Joiner.on("/").join("", LOCK_ROOT, LOCKS_NODE, lockName);
+ }
+
+ /**
+ * Get global locks node path.
+ *
+ * @return locks node path
+ */
+ public static String getGlobalLocksNodePath() {
+ return Joiner.on("/").join("", LOCK_ROOT, LOCK_SCOPE, LOCKS_NODE);
+ }
+
+ /**
+ * Get global ack node path.
+ *
+ * @return ack node path
+ */
+ public static String getGlobalAckNodePath() {
+ return Joiner.on("/").join("", LOCK_ROOT, LOCK_SCOPE, LOCKED_ACK_NODE);
+ }
+
+ /**
+ * Generate schema lock name.
+ *
+ * @param schema schema
+ * @param instanceId instance id
+ * @return schema lock name
+ */
+ public static String generateSchemaLockName(final String schema, final String instanceId) {
+ return getGlobalLocksNodePath() + "/" + LockNodeUtil.generateSchemaLockName(schema, instanceId);
+ }
+
+ /**
+ * Generate schema ack lock name.
+ *
+ * @param schema schema
+ * @param lockedInstanceId locked instance id
+ * @return schema ack lock name
+ */
+ public static String generateSchemaAckLockName(final String schema, final String lockedInstanceId) {
+ return getGlobalAckNodePath() + "/" + LockNodeUtil.generateSchemaLockName(schema, lockedInstanceId);
}
/**
@@ -65,4 +110,28 @@ public final class LockNode {
Matcher matcher = pattern.matcher(lockNodePath);
return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
}
+
+ /**
+ * Get locked key name by locks node path.
+ *
+ * @param locksNodePath locks node path
+ * @return locked name, the part of the path after the global locks node path
+ */
+ public static Optional<String> getLockedName(final String locksNodePath) {
+ Pattern pattern = Pattern.compile(getGlobalLocksNodePath() + "/(.+)$", Pattern.CASE_INSENSITIVE);
+ Matcher matcher = pattern.matcher(locksNodePath);
+ return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
+ }
+
+ /**
+ * Get ack locked key name by ack node path.
+ *
+ * @param ackNodePath ack node path
+ * @return ack locked name, the part of the path after the global ack node path
+ */
+ public static Optional<String> getAckLockedName(final String ackNodePath) {
+ Pattern pattern = Pattern.compile(getGlobalAckNodePath() + "/(.+)$", Pattern.CASE_INSENSITIVE);
+ Matcher matcher = pattern.matcher(ackNodePath);
+ return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
+ }
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockRegistryService.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockRegistryService.java
index 5bf31fe3e81..2b03322d075 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockRegistryService.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockRegistryService.java
@@ -18,11 +18,13 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryException;
+import java.util.Collection;
import java.util.concurrent.TimeUnit;
/**
- * Lock registry service.
+ * Global lock registry service.
*/
public final class LockRegistryService {
@@ -30,11 +32,67 @@ public final class LockRegistryService {
public LockRegistryService(final ClusterPersistRepository repository) {
this.repository = repository;
- initLockNode();
}
- private void initLockNode() {
+ /**
+ * Init global lock root path.
+ */
+ public void initGlobalLockRoot() {
repository.persist(LockNode.getLockRootNodePath(), "");
+ repository.persist(LockNode.getGlobalLocksNodePath(), "");
+ repository.persist(LockNode.getGlobalAckNodePath(), "");
+ }
+
+ /**
+ * Get all global locks.
+ *
+ * @return all global locks
+ */
+ public Collection<String> getAllGlobalLock() {
+ return repository.getChildrenKeys(LockNode.getGlobalLocksNodePath());
+ }
+
+ /**
+ * Try to get global lock.
+ *
+ * @param lockName lock name
+ * @return true if get the lock, false if not
+ */
+ public boolean tryGlobalLock(final String lockName) {
+ try {
+ repository.persistEphemeral(lockName, LockState.LOCKED.name());
+ return true;
+ } catch (final ClusterPersistRepositoryException ignored) {
+ return false;
+ }
+ }
+
+ /**
+ * Release global lock.
+ *
+ * @param lockName lock name
+ */
+ public void releaseGlobalLock(final String lockName) {
+ repository.delete(lockName);
+ }
+
+ /**
+ * Ack lock.
+ *
+ * @param lockName lock name
+ * @param lockValue lock value
+ */
+ public void ackLock(final String lockName, final String lockValue) {
+ repository.persistEphemeral(lockName, lockValue);
+ }
+
+ /**
+ * Release ack lock.
+ *
+ * @param lockName lock name
+ */
+ public void releaseAckLock(final String lockName) {
+ repository.delete(lockName);
}
/**
@@ -50,7 +108,7 @@ public final class LockRegistryService {
/**
* Release lock.
- *
+ *
* @param lockName lock name
*/
public void releaseLock(final String lockName) {
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/util/LockNodeUtil.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/util/LockNodeUtil.java
index 0f90489f85d..03a4f66eb0e 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/util/LockNodeUtil.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/util/LockNodeUtil.java
@@ -29,23 +29,23 @@ public final class LockNodeUtil {
private static final String LOCK_DELIMITER = "-";
/**
- * Generate lock name.
+ * Generate schema lock name.
*
* @param schema schema name
* @param instanceId instance id
* @return lock name
*/
- public static String generateLockName(final String schema, final String instanceId) {
+ public static String generateSchemaLockName(final String schema, final String instanceId) {
return schema + LOCK_DELIMITER + instanceId;
}
/**
- * Parse lock name.
+ * Parse schema lock name.
*
* @param lockedName locked name
* @return string array of schema name and instance id
*/
- public static String[] parseLockName(final String lockedName) {
+ public static String[] parseSchemaLockName(final String lockedName) {
return lockedName.trim().split(LOCK_DELIMITER);
}
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalAckChangedWatcher.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalAckChangedWatcher.java
index cb75baeba58..9b77fb1eaf6 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalAckChangedWatcher.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalAckChangedWatcher.java
@@ -17,10 +17,9 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.watcher;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.GlobalAckLockedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.GlobalAckLockReleasedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.GlobalLockNode;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.util.LockNodeUtil;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.AckLockedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.AckLockReleasedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.LockNode;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -38,7 +37,7 @@ public final class GlobalAckChangedWatcher implements GovernanceWatcher<Governan
@Override
public Collection<String> getWatchingKeys() {
- return Collections.singleton(GlobalLockNode.getGlobalAckNodePath());
+ return Collections.singleton(LockNode.getGlobalAckNodePath());
}
@Override
@@ -49,19 +48,18 @@ public final class GlobalAckChangedWatcher implements GovernanceWatcher<Governan
@Override
public Optional<GovernanceEvent> createGovernanceEvent(final DataChangedEvent event) {
String key = event.getKey();
- Optional<String> ackLockedName = GlobalLockNode.getAckLockedKey(key);
+ Optional<String> ackLockedName = LockNode.getAckLockedName(key);
if (ackLockedName.isPresent()) {
- String[] schemaInstance = LockNodeUtil.parseLockName(ackLockedName.get());
- return handleGlobalAckEvent(event.getType(), schemaInstance[0], schemaInstance[1]);
+ return handleGlobalAckEvent(event.getType(), ackLockedName.get());
}
return Optional.empty();
}
- private Optional<GovernanceEvent> handleGlobalAckEvent(final Type eventType, final String schema, final String lockedInstanceId) {
+ private Optional<GovernanceEvent> handleGlobalAckEvent(final Type eventType, final String lockedName) {
if (Type.ADDED == eventType) {
- return Optional.of(new GlobalAckLockedEvent(schema, lockedInstanceId));
+ return Optional.of(new AckLockedEvent(lockedName));
} else if (Type.DELETED == eventType) {
- return Optional.of(new GlobalAckLockReleasedEvent(schema, lockedInstanceId));
+ return Optional.of(new AckLockReleasedEvent(lockedName));
}
return Optional.empty();
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalLocksChangedWatcher.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalLocksChangedWatcher.java
index 657a3d8ab9c..33b9791704b 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalLocksChangedWatcher.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalLocksChangedWatcher.java
@@ -17,10 +17,9 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.watcher;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.GlobalLockReleasedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.GlobalLockedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.GlobalLockNode;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.util.LockNodeUtil;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.LockReleasedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.LockedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.LockNode;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -38,7 +37,7 @@ public final class GlobalLocksChangedWatcher implements GovernanceWatcher<Govern
@Override
public Collection<String> getWatchingKeys() {
- return Collections.singleton(GlobalLockNode.getGlobalLocksNodePath());
+ return Collections.singleton(LockNode.getGlobalLocksNodePath());
}
@Override
@@ -48,19 +47,18 @@ public final class GlobalLocksChangedWatcher implements GovernanceWatcher<Govern
@Override
public Optional<GovernanceEvent> createGovernanceEvent(final DataChangedEvent event) {
- Optional<String> lockedName = GlobalLockNode.getLockedKey(event.getKey());
+ Optional<String> lockedName = LockNode.getLockedName(event.getKey());
if (lockedName.isPresent()) {
- String[] schemaInstance = LockNodeUtil.parseLockName(lockedName.get());
- return handleGlobalLocksEvent(event.getType(), schemaInstance[0], schemaInstance[1]);
+ return handleGlobalLocksEvent(event.getType(), lockedName.get());
}
return Optional.empty();
}
- private Optional<GovernanceEvent> handleGlobalLocksEvent(final Type eventType, final String schema, final String instanceId) {
+ private Optional<GovernanceEvent> handleGlobalLocksEvent(final Type eventType, final String lockedName) {
if (Type.ADDED == eventType) {
- return Optional.of(new GlobalLockedEvent(schema, instanceId));
+ return Optional.of(new LockedEvent(lockedName));
} else if (Type.DELETED == eventType) {
- return Optional.of(new GlobalLockReleasedEvent(schema, instanceId));
+ return Optional.of(new LockReleasedEvent(lockedName));
} else {
return Optional.empty();
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/LockChangedWatcher.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/LockChangedWatcher.java
deleted file mode 100644
index bad518e1b5f..00000000000
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/LockChangedWatcher.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.watcher;
-
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.LockNode;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.LockNotificationEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.LockReleasedEvent;
-import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
-import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Optional;
-
-/**
- * Lock changed watcher.
- */
-public final class LockChangedWatcher implements GovernanceWatcher<GovernanceEvent> {
-
- @Override
- public Collection<String> getWatchingKeys() {
- return Collections.singleton(LockNode.getLockRootNodePath());
- }
-
- @Override
- public Collection<Type> getWatchingTypes() {
- return Arrays.asList(Type.ADDED, Type.DELETED);
- }
-
- @Override
- public Optional<GovernanceEvent> createGovernanceEvent(final DataChangedEvent event) {
- if (!event.getKey().equals(LockNode.getLockRootNodePath()) && LockNode.getLockName(event.getKey()).isPresent()) {
- if (Type.ADDED == event.getType()) {
- return Optional.of(new LockNotificationEvent(LockNode.getLockName(event.getKey()).get()));
- } else if (Type.DELETED == event.getType()) {
- return Optional.of(new LockReleasedEvent(LockNode.getLockName(event.getKey()).get()));
- }
- }
- return Optional.empty();
- }
-}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
index cbe19f25895..93ac983a0e2 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
@@ -20,6 +20,5 @@ org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.wat
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.watcher.GlobalRuleChangedWatcher
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.watcher.PropertiesChangedWatcher
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.watcher.ComputeNodeStateChangedWatcher
-org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.watcher.LockChangedWatcher
org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.watcher.GlobalLocksChangedWatcher
org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.watcher.GlobalAckChangedWatcher
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContextTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContextTest.java
new file mode 100644
index 00000000000..98ce914ca67
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContextTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock;
+
+import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
+import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
+import org.apache.shardingsphere.infra.instance.definition.InstanceType;
+import org.apache.shardingsphere.infra.instance.workerid.WorkerIdGenerator;
+import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.LockRegistryService;
+import org.junit.Test;
+
+import java.util.Optional;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public final class DistributeLockContextTest {
+
+ @Test
+ public void assertGetOrCreateSchemaLock() {
+ DistributeLockContext distributeLockContext = new DistributeLockContext(mock(LockRegistryService.class));
+ ComputeNodeInstance currentInstance = new ComputeNodeInstance(new InstanceDefinition(InstanceType.PROXY, "127.0.0.1@3307"));
+ InstanceContext instanceContext = new InstanceContext(currentInstance, mock(WorkerIdGenerator.class), mock(ModeConfiguration.class), distributeLockContext);
+ distributeLockContext.synchronizeGlobalLock(instanceContext);
+ ShardingSphereLock schemaLock = distributeLockContext.getOrCreateSchemaLock("schema");
+ assertNotNull(schemaLock);
+ }
+
+ @Test
+ public void assertGetSchemaLock() {
+ DistributeLockContext distributeLockContext = new DistributeLockContext(mock(LockRegistryService.class));
+ ComputeNodeInstance currentInstance = new ComputeNodeInstance(new InstanceDefinition(InstanceType.PROXY, "127.0.0.1@3307"));
+ InstanceContext instanceContext = new InstanceContext(currentInstance, mock(WorkerIdGenerator.class), mock(ModeConfiguration.class), distributeLockContext);
+ distributeLockContext.synchronizeGlobalLock(instanceContext);
+ distributeLockContext.getOrCreateSchemaLock("schema");
+ Optional<ShardingSphereLock> schemaLock = distributeLockContext.getSchemaLock("schema");
+ assertTrue(schemaLock.isPresent());
+ assertTrue(schemaLock.get() instanceof ShardingSphereDistributeGlobalLock);
+ }
+
+ @Test
+ public void assertIsLockedSchema() {
+ DistributeLockContext distributeLockContext = new DistributeLockContext(mock(LockRegistryService.class));
+ ComputeNodeInstance currentInstance = new ComputeNodeInstance(new InstanceDefinition(InstanceType.PROXY, "127.0.0.1@3307"));
+ InstanceContext instanceContext = new InstanceContext(currentInstance, mock(WorkerIdGenerator.class), mock(ModeConfiguration.class), distributeLockContext);
+ distributeLockContext.synchronizeGlobalLock(instanceContext);
+ distributeLockContext.getOrCreateSchemaLock("schema");
+ assertFalse(distributeLockContext.isLockedSchema("schema"));
+ }
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/GlobalLockNodeTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/GlobalLockNodeTest.java
deleted file mode 100644
index 9a2a685934f..00000000000
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/GlobalLockNodeTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service;
-
-import org.junit.Test;
-
-import java.util.Optional;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-public final class GlobalLockNodeTest {
-
- @Test
- public void assertGetLockedKey() {
- Optional<String> lockName = GlobalLockNode.getLockedKey("key/lock/global/locks/schema-127.0.0.1@3307");
- assertTrue(lockName.isPresent());
- assertThat(lockName.get(), is("schema-127.0.0.1@3307"));
- }
-
- @Test
- public void assertGetAckLockedKey() {
- Optional<String> lockName = GlobalLockNode.getAckLockedKey("/lock/global/ack/schema-127.0.0.1@3308");
- assertTrue(lockName.isPresent());
- assertThat(lockName.get(), is("schema-127.0.0.1@3308"));
- }
-
- @Test
- public void assertGetGlobalLocksNodePath() {
- assertThat(GlobalLockNode.getGlobalLocksNodePath(), is("/lock/global/locks"));
- }
-
- @Test
- public void assertGenerateSchemaAckLockName() {
- assertThat(GlobalLockNode.generateSchemaAckLockName("schema", "lockedInstanceId"), is("/lock/global/ack/schema-lockedInstanceId"));
- }
-
- @Test
- public void assertGetGlobalAckNodePath() {
- assertThat(GlobalLockNode.getGlobalAckNodePath(), is("/lock/global/ack"));
- }
-
- @Test
- public void assertGenerateSchemaLockName() {
- assertThat(GlobalLockNode.generateSchemaLockName("schema", "instanceId"), is("/lock/global/locks/schema-instanceId"));
- }
-}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockNodeTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockNodeTest.java
index 26648289c57..be8fa720bed 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockNodeTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockNodeTest.java
@@ -19,8 +19,11 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service;
import org.junit.Test;
+import java.util.Optional;
+
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
public final class LockNodeTest {
@@ -33,4 +36,38 @@ public final class LockNodeTest {
public void assertGetLockName() {
assertThat(LockNode.getLockName("/lock/locks/sharding_db.test/_c_c2d-lock-00000").orElse(null), is("sharding_db.test"));
}
+
+ @Test
+ public void assertGetLockedKey() {
+ Optional<String> lockName = LockNode.getLockedName("key/lock/global/locks/schema-127.0.0.1@3307");
+ assertTrue(lockName.isPresent());
+ assertThat(lockName.get(), is("schema-127.0.0.1@3307"));
+ }
+
+ @Test
+ public void assertGetAckLockedKey() {
+ Optional<String> lockName = LockNode.getAckLockedName("/lock/global/ack/schema-127.0.0.1@3308");
+ assertTrue(lockName.isPresent());
+ assertThat(lockName.get(), is("schema-127.0.0.1@3308"));
+ }
+
+ @Test
+ public void assertGetGlobalLocksNodePath() {
+ assertThat(LockNode.getGlobalLocksNodePath(), is("/lock/global/locks"));
+ }
+
+ @Test
+ public void assertGenerateSchemaAckLockName() {
+ assertThat(LockNode.generateSchemaAckLockName("schema", "lockedInstanceId"), is("/lock/global/ack/schema-lockedInstanceId"));
+ }
+
+ @Test
+ public void assertGetGlobalAckNodePath() {
+ assertThat(LockNode.getGlobalAckNodePath(), is("/lock/global/ack"));
+ }
+
+ @Test
+ public void assertGenerateSchemaLockName() {
+ assertThat(LockNode.generateSchemaLockName("schema", "instanceId"), is("/lock/global/locks/schema-instanceId"));
+ }
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockRegistryServiceTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockRegistryServiceTest.java
index 4668306343b..aa70715106c 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockRegistryServiceTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockRegistryServiceTest.java
@@ -56,4 +56,31 @@ public final class LockRegistryServiceTest {
lockRegistryService.releaseLock("test");
verify(clusterPersistRepository).releaseLock(LockNode.getLockNodePath("test"));
}
+
+ @Test
+ public void assertGetAllGlobalLock() {
+ lockRegistryService.getAllGlobalLock();
+ verify(clusterPersistRepository).getChildrenKeys(LockNode.getGlobalLocksNodePath());
+ }
+
+ @Test
+ public void assertReleaseGlobalLock() {
+ String schemaLockName = LockNode.generateSchemaLockName("schema", "127.0.0.1@3307");
+ lockRegistryService.releaseGlobalLock(schemaLockName);
+ verify(clusterPersistRepository).delete(schemaLockName);
+ }
+
+ @Test
+ public void assertAckLock() {
+ String schemaAckLock = LockNode.generateSchemaAckLockName("schema", "127.0.0.1@3307");
+ lockRegistryService.ackLock(schemaAckLock, "127.0.0.1@3307");
+ verify(clusterPersistRepository).persistEphemeral(schemaAckLock, "127.0.0.1@3307");
+ }
+
+ @Test
+ public void assertReleaseAckLock() {
+ String schemaAckLock = LockNode.generateSchemaAckLockName("schema", "127.0.0.1@3307");
+ lockRegistryService.releaseAckLock(schemaAckLock);
+ verify(clusterPersistRepository).delete(schemaAckLock);
+ }
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/util/LockNodeUtilTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/util/LockNodeUtilTest.java
index e6a669e961b..24cc01a4cac 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/util/LockNodeUtilTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/util/LockNodeUtilTest.java
@@ -26,12 +26,12 @@ public final class LockNodeUtilTest {
@Test
public void assertGenerateLockName() {
- assertThat(LockNodeUtil.generateLockName("schema", "127.0.0.1@3307"), is("schema-127.0.0.1@3307"));
+ assertThat(LockNodeUtil.generateSchemaLockName("schema", "127.0.0.1@3307"), is("schema-127.0.0.1@3307"));
}
@Test
public void assertParseLockName() {
- String[] lockName = LockNodeUtil.parseLockName("schema-127.0.0.1@3307");
+ String[] lockName = LockNodeUtil.parseSchemaLockName("schema-127.0.0.1@3307");
assertThat(lockName.length, is(2));
assertThat(lockName[0], is("schema"));
assertThat(lockName[1], is("127.0.0.1@3307"));
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalAckChangedWatcherTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalAckChangedWatcherTest.java
new file mode 100644
index 00000000000..d1b3d6ea80a
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalAckChangedWatcherTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.watcher;
+
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.AckLockReleasedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.AckLockedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+import org.junit.Test;
+
+import java.util.Optional;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public final class GlobalAckChangedWatcherTest {
+
+ @Test
+ public void assertCreateGovernanceEvent() {
+ DataChangedEvent addDataChangedEvent = new DataChangedEvent("/lock/global/ack/schema-127.0.0.1@3307", "127.0.0.1@3307", DataChangedEvent.Type.ADDED);
+ Optional<GovernanceEvent> add = new GlobalAckChangedWatcher().createGovernanceEvent(addDataChangedEvent);
+ assertTrue(add.isPresent());
+ GovernanceEvent addEvent = add.get();
+ assertTrue(addEvent instanceof AckLockedEvent);
+ assertThat(((AckLockedEvent) addEvent).getLockName(), is("schema-127.0.0.1@3307"));
+ DataChangedEvent deleteDataChangedEvent = new DataChangedEvent("/lock/global/ack/schema-127.0.0.1@3307", "127.0.0.1@3307", DataChangedEvent.Type.DELETED);
+ Optional<GovernanceEvent> delete = new GlobalAckChangedWatcher().createGovernanceEvent(deleteDataChangedEvent);
+ assertTrue(delete.isPresent());
+ GovernanceEvent deleteEvent = delete.get();
+ assertTrue(deleteEvent instanceof AckLockReleasedEvent);
+ assertThat(((AckLockReleasedEvent) deleteEvent).getLockName(), is("schema-127.0.0.1@3307"));
+ }
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalLocksChangedWatcherTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalLocksChangedWatcherTest.java
new file mode 100644
index 00000000000..298cc4a93dd
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalLocksChangedWatcherTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.watcher;
+
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.LockReleasedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.LockedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+import org.junit.Test;
+
+import java.util.Optional;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public final class GlobalLocksChangedWatcherTest {
+
+ @Test
+ public void assertCreateGovernanceEvent() {
+ DataChangedEvent addDataChangedEvent = new DataChangedEvent("/lock/global/locks/schema-127.0.0.1@3307", "127.0.0.1@3307", DataChangedEvent.Type.ADDED);
+ Optional<GovernanceEvent> add = new GlobalLocksChangedWatcher().createGovernanceEvent(addDataChangedEvent);
+ assertTrue(add.isPresent());
+ GovernanceEvent addEvent = add.get();
+ assertTrue(addEvent instanceof LockedEvent);
+ assertThat(((LockedEvent) addEvent).getLockName(), is("schema-127.0.0.1@3307"));
+ DataChangedEvent deleteDataChangedEvent = new DataChangedEvent("/lock/global/locks/schema-127.0.0.1@3307", "127.0.0.1@3307", DataChangedEvent.Type.DELETED);
+ Optional<GovernanceEvent> delete = new GlobalLocksChangedWatcher().createGovernanceEvent(deleteDataChangedEvent);
+ assertTrue(delete.isPresent());
+ GovernanceEvent deleteEvent = delete.get();
+ assertTrue(deleteEvent instanceof LockReleasedEvent);
+ assertThat(((LockReleasedEvent) deleteEvent).getLockName(), is("schema-127.0.0.1@3307"));
+ }
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/LockChangedWatcherTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/LockChangedWatcherTest.java
deleted file mode 100644
index 7c142fdc491..00000000000
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/LockChangedWatcherTest.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.watcher;
-
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
-import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
-import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
-import org.junit.Test;
-
-import java.util.Optional;
-
-import static org.junit.Assert.assertFalse;
-
-public final class LockChangedWatcherTest {
-
- @Test
- public void assertCreateEventWithInvalidPath() {
- Optional<GovernanceEvent> actual = new LockChangedWatcher().createGovernanceEvent(new DataChangedEvent("/lock/glock", "", Type.ADDED));
- assertFalse(actual.isPresent());
- }
-}