You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/04/19 12:39:07 UTC
[shardingsphere] branch master updated: Optimize global schema lock to remove owner instance id (#16931)
This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a85a6485efa Optimize global schema lock to remove owner instance id (#16931)
a85a6485efa is described below
commit a85a6485efa7682732d473c38276e339c036c0dd
Author: gin <ja...@163.com>
AuthorDate: Tue Apr 19 20:39:02 2022 +0800
Optimize global schema lock to remove owner instance id (#16931)
---
.../infra/lock/ShardingSphereGlobalLock.java | 14 ---
.../coordinator/lock/DistributeLockContext.java | 39 ++-----
.../lock/ShardingSphereDistributeGlobalLock.java | 129 +++++++++------------
...asedEvent.java => SchemaLockReleasedEvent.java} | 13 +--
.../{LockedEvent.java => SchemaLockedEvent.java} | 13 +--
.../cluster/coordinator/lock/service/LockNode.java | 10 +-
.../coordinator/lock/service/LockState.java | 2 +-
.../lock/watcher/GlobalLocksChangedWatcher.java | 12 +-
.../lock/DistributeLockContextTest.java | 5 +-
.../coordinator/lock/service/LockNodeTest.java | 4 +-
.../lock/service/LockRegistryServiceTest.java | 4 +-
.../watcher/GlobalLocksChangedWatcherTest.java | 18 ++-
12 files changed, 95 insertions(+), 168 deletions(-)
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/ShardingSphereGlobalLock.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/ShardingSphereGlobalLock.java
index b2d94acc28a..b613208e71f 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/ShardingSphereGlobalLock.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/ShardingSphereGlobalLock.java
@@ -51,18 +51,4 @@ public interface ShardingSphereGlobalLock extends ShardingSphereLock {
* @param instanceId instance id
*/
void removeLockedInstance(String instanceId);
-
- /**
- * Release locked state.
- *
- * @param lockName lock name
- */
- void releaseLockedState(String lockName);
-
- /**
- * Refresh owner instance id.
- *
- * @param ownerInstanceId owner instance id
- */
- void refreshOwner(String ownerInstanceId);
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContext.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContext.java
index 342f04c2041..b1504977d62 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContext.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContext.java
@@ -28,11 +28,10 @@ import org.apache.shardingsphere.infra.lock.ShardingSphereGlobalLock;
import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.AckLockedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.AckLockReleasedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.LockedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.LockReleasedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.SchemaLockedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.SchemaLockReleasedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.LockNode;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.LockRegistryService;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.util.LockNodeUtil;
import java.util.Collection;
import java.util.Map;
@@ -73,10 +72,7 @@ public final class DistributeLockContext implements LockContext {
}
for (String each : allGlobalLock) {
Optional<String> schemaLock = LockNode.parseGlobalSchemaLocksNodePath(each);
- if (schemaLock.isPresent()) {
- String[] schemaInstanceId = LockNodeUtil.parseSchemaLockName(schemaLock.get());
- globalLocks.put(schemaInstanceId[0], crateGlobalLock(schemaInstanceId[1]));
- }
+ schemaLock.ifPresent(schema -> globalLocks.put(schema, crateGlobalLock()));
}
}
@@ -87,13 +83,13 @@ public final class DistributeLockContext implements LockContext {
if (null != result) {
return result;
}
- result = crateGlobalLock(getCurrentInstanceId());
+ result = crateGlobalLock();
globalLocks.put(schemaName, result);
return result;
}
- private ShardingSphereGlobalLock crateGlobalLock(final String ownerInstanceId) {
- return new ShardingSphereDistributeGlobalLock(lockRegistryService, currentInstance, computeNodeInstances, ownerInstanceId);
+ private ShardingSphereGlobalLock crateGlobalLock() {
+ return new ShardingSphereDistributeGlobalLock(lockRegistryService, currentInstance, computeNodeInstances);
}
private String getCurrentInstanceId() {
@@ -123,29 +119,20 @@ public final class DistributeLockContext implements LockContext {
return Optional.ofNullable(globalLocks.get(schemaName));
}
- private boolean isOwnerInstance(final String ownerInstanceId) {
- return getCurrentInstanceId().equals(ownerInstanceId);
- }
-
/**
* Locked event.
*
* @param event locked event
*/
@Subscribe
- public synchronized void renew(final LockedEvent event) {
+ public synchronized void renew(final SchemaLockedEvent event) {
String schema = event.getSchema();
- String ownerInstanceId = event.getOwnerInstanceId();
- if (isOwnerInstance(ownerInstanceId)) {
- return;
- }
ShardingSphereGlobalLock globalLock = globalLocks.get(schema);
if (null == globalLock) {
- globalLock = crateGlobalLock(ownerInstanceId);
+ globalLock = crateGlobalLock();
globalLocks.put(schema, globalLock);
}
globalLock.ackLock(schema, getCurrentInstanceId());
- globalLock.refreshOwner(ownerInstanceId);
}
/**
@@ -154,15 +141,9 @@ public final class DistributeLockContext implements LockContext {
* @param event lock released event
*/
@Subscribe
- public synchronized void renew(final LockReleasedEvent event) {
+ public synchronized void renew(final SchemaLockReleasedEvent event) {
String schema = event.getSchema();
- String ownerInstanceId = event.getOwnerInstanceId();
- if (isOwnerInstance(ownerInstanceId)) {
- return;
- }
- getGlobalLock(schema).ifPresent(shardingSphereGlobalLock -> {
- shardingSphereGlobalLock.releaseAckLock(schema, getCurrentInstanceId());
- });
+ getGlobalLock(schema).ifPresent(lock -> lock.releaseAckLock(schema, getCurrentInstanceId()));
}
/**
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/ShardingSphereDistributeGlobalLock.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/ShardingSphereDistributeGlobalLock.java
index 26c673bef60..d89be40488e 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/ShardingSphereDistributeGlobalLock.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/ShardingSphereDistributeGlobalLock.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.lock.ShardingSphereGlobalLock;
@@ -28,12 +29,14 @@ import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
* Global distribute lock of ShardingSphere.
*/
@Slf4j
+@RequiredArgsConstructor
public final class ShardingSphereDistributeGlobalLock implements ShardingSphereGlobalLock {
private static final int CHECK_ACK_INTERVAL_MILLISECONDS = 1000;
@@ -42,39 +45,17 @@ public final class ShardingSphereDistributeGlobalLock implements ShardingSphereG
private static final long DEFAULT_REGISTRY_TIMEOUT_MILLISECONDS = 2 * 100;
- private final ComputeNodeInstance currentInstance;
-
- private final AtomicReference<String> ownerInstanceId;
-
- private final AtomicReference<LockState> synchronizedLockState;
-
private final LockRegistryService lockService;
+ private final ComputeNodeInstance currentInstance;
+
private final Collection<ComputeNodeInstance> computeNodeInstances;
- private final Set<String> lockedInstances = new CopyOnWriteArraySet<>();
+ private final AtomicBoolean isOwner = new AtomicBoolean(false);
- public ShardingSphereDistributeGlobalLock(final LockRegistryService lockService, final ComputeNodeInstance currentInstance, final Collection<ComputeNodeInstance> computeNodeInstances,
- final String ownerInstanceId) {
- this.lockService = lockService;
- this.currentInstance = currentInstance;
- this.computeNodeInstances = computeNodeInstances;
- if (ownerInstanceId.equals(getCurrentInstanceId())) {
- synchronizedLockState = new AtomicReference<>(LockState.UNLOCKED);
- this.ownerInstanceId = new AtomicReference<>();
- } else {
- synchronizedLockState = new AtomicReference<>(LockState.LOCKED);
- this.ownerInstanceId = new AtomicReference<>(ownerInstanceId);
- }
- }
+ private final AtomicReference<LockState> synchronizedLockState = new AtomicReference<>(LockState.UNLOCKED);
- private String getCurrentInstanceId() {
- return currentInstance.getInstanceDefinition().getInstanceId().getId();
- }
-
- private boolean isOwnerInstanceId(final String lockedInstanceId) {
- return ownerInstanceId.get().equals(lockedInstanceId);
- }
+ private final Set<String> lockedInstances = new CopyOnWriteArraySet<>();
@Override
public boolean tryLock(final String lockName) {
@@ -86,33 +67,39 @@ public final class ShardingSphereDistributeGlobalLock implements ShardingSphereG
return innerTryLock(lockName, timeout);
}
- private synchronized boolean innerTryLock(final String lockName, final long timeout) {
- if (LockState.LOCKED == synchronizedLockState.get()) {
- log.debug("innerTryLock, already locked, lockName={}", lockName);
- return false;
+ private boolean innerTryLock(final String lockName, final long timeout) {
+ if (synchronizedLockState.compareAndSet(LockState.UNLOCKED, LockState.LOCKING) && isOwner.compareAndSet(false, true)) {
+ return acquire(lockName, timeout) ? synchronizedLockState.compareAndSet(LockState.LOCKING, LockState.LOCKED)
+ : isOwner.compareAndSet(true, false) && synchronizedLockState.compareAndSet(LockState.LOCKING, LockState.UNLOCKED);
}
- long count = 0;
+ log.debug("innerTryLock locking, lockName={}", lockName);
+ return false;
+ }
+
+ private boolean acquire(final String lockName, final long timeout) {
+ long consumeTime = 0;
+ String currentInstanceId = getCurrentInstanceId();
do {
- String currentInstanceId = getCurrentInstanceId();
- boolean isLocked = lockService.tryGlobalLock(LockNode.generateGlobalSchemaLocksName(lockName, currentInstanceId), DEFAULT_REGISTRY_TIMEOUT_MILLISECONDS);
- count += DEFAULT_REGISTRY_TIMEOUT_MILLISECONDS;
+ boolean isLocked = lockService.tryGlobalLock(LockNode.generateGlobalSchemaLocksName(lockName), DEFAULT_REGISTRY_TIMEOUT_MILLISECONDS);
+ consumeTime += DEFAULT_REGISTRY_TIMEOUT_MILLISECONDS;
if (isLocked) {
lockedInstances.add(currentInstanceId);
- if (isAckOK(timeout - count)) {
- boolean result = synchronizedLockState.compareAndSet(LockState.UNLOCKED, LockState.LOCKED);
- log.debug("innerTryLock, result={}, lockName={}, lockState={}, globalLock.hashCode={}", result, lockName, synchronizedLockState.get(), hashCode());
- ownerInstanceId.set(currentInstanceId);
- return result;
+ if (isAckOK(timeout - consumeTime)) {
+ return true;
} else {
lockedInstances.remove(currentInstanceId);
return false;
}
}
- } while (timeout > count);
- log.debug("innerTryLock timeout, lockName={}", lockName);
+ } while (timeout > consumeTime);
+ log.debug("inner try lock acquire timeout, lockName={}", lockName);
return false;
}
+ private String getCurrentInstanceId() {
+ return currentInstance.getInstanceDefinition().getInstanceId().getId();
+ }
+
private boolean isAckOK(final long timeout) {
long count = 0;
do {
@@ -122,7 +109,7 @@ public final class ShardingSphereDistributeGlobalLock implements ShardingSphereG
sleepInterval();
count += CHECK_ACK_INTERVAL_MILLISECONDS;
} while (timeout > count);
- log.debug("isAckOK timeout");
+ log.debug("is lock ack OK timeout");
return false;
}
@@ -135,22 +122,20 @@ public final class ShardingSphereDistributeGlobalLock implements ShardingSphereG
@Override
public void releaseLock(final String lockName) {
- log.debug("releaseLock, lockName={}", lockName);
- if (LockState.LOCKED != synchronizedLockState.get()) {
- log.debug("releaseLock, state is not locked, ignore, lockName={}", lockName);
- return;
- }
- String currentInstanceId = getCurrentInstanceId();
- if (isOwnerInstanceId(currentInstanceId)) {
- lockService.releaseGlobalLock(LockNode.generateGlobalSchemaLocksName(lockName, this.ownerInstanceId.get()), true);
- lockedInstances.remove(this.ownerInstanceId.get());
- this.ownerInstanceId.set("");
- synchronizedLockState.compareAndSet(LockState.LOCKED, LockState.UNLOCKED);
- return;
+ if (LockState.LOCKED == synchronizedLockState.get()) {
+ log.debug("release lock, lockName={}", lockName);
+ String currentInstanceId = getCurrentInstanceId();
+ if (isOwner.get()) {
+ lockService.releaseGlobalLock(LockNode.generateGlobalSchemaLocksName(lockName), true);
+ isOwner.compareAndSet(true, false);
+ lockedInstances.remove(currentInstanceId);
+ synchronizedLockState.compareAndSet(LockState.LOCKED, LockState.UNLOCKED);
+ return;
+ }
+ lockService.releaseGlobalLock(LockNode.generateGlobalSchemaLockReleasedNodePath(lockName), false);
+ releaseAckLock(lockName, currentInstanceId);
}
- lockService.releaseGlobalLock(LockNode.generateGlobalSchemaLockReleasedNodePath(lockName, this.ownerInstanceId.get()), false);
- ownerInstanceId.set("");
- releaseAckLock(lockName, currentInstanceId);
+ log.debug("release lock, state is not locked, ignore, lockName={}", lockName);
}
@Override
@@ -177,15 +162,21 @@ public final class ShardingSphereDistributeGlobalLock implements ShardingSphereG
@Override
public void ackLock(final String lockName, final String lockedInstanceId) {
- lockService.ackLock(LockNode.generateGlobalSchemaAckLockName(lockName, lockedInstanceId), lockedInstanceId);
- lockedInstances.add(lockedInstanceId);
- synchronizedLockState.compareAndSet(LockState.UNLOCKED, LockState.LOCKED);
+ if (!isOwner.get() && LockState.UNLOCKED == synchronizedLockState.get()) {
+ lockService.ackLock(LockNode.generateGlobalSchemaAckLockName(lockName, lockedInstanceId), lockedInstanceId);
+ lockedInstances.add(lockedInstanceId);
+ synchronizedLockState.compareAndSet(LockState.UNLOCKED, LockState.LOCKED);
+ }
}
@Override
public void releaseAckLock(final String lockName, final String lockedInstanceId) {
- lockService.releaseAckLock(LockNode.generateGlobalSchemaAckLockName(lockName, lockedInstanceId));
- lockedInstances.remove(lockedInstanceId);
+ if (!isOwner.get()) {
+ lockService.releaseAckLock(LockNode.generateGlobalSchemaAckLockName(lockName, lockedInstanceId));
+ } else {
+ isOwner.compareAndSet(true, false);
+ }
+ lockedInstances.remove(getCurrentInstanceId());
synchronizedLockState.compareAndSet(LockState.LOCKED, LockState.UNLOCKED);
}
@@ -198,16 +189,4 @@ public final class ShardingSphereDistributeGlobalLock implements ShardingSphereG
public void removeLockedInstance(final String lockedInstanceId) {
lockedInstances.remove(lockedInstanceId);
}
-
- @Override
- public void releaseLockedState(final String lockName) {
- if (isLocked(lockName)) {
- synchronizedLockState.compareAndSet(LockState.LOCKED, LockState.UNLOCKED);
- }
- }
-
- @Override
- public void refreshOwner(final String ownerInstanceId) {
- this.ownerInstanceId.set(ownerInstanceId);
- }
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/LockReleasedEvent.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/SchemaLockReleasedEvent.java
similarity index 71%
rename from shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/LockReleasedEvent.java
rename to shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/SchemaLockReleasedEvent.java
index 8205d992efc..3c038b3a750 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/LockReleasedEvent.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/SchemaLockReleasedEvent.java
@@ -18,22 +18,15 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event;
import lombok.Getter;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.util.LockNodeUtil;
+import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
/**
* Lock released event.
*/
+@RequiredArgsConstructor
@Getter
-public final class LockReleasedEvent implements GovernanceEvent {
+public final class SchemaLockReleasedEvent implements GovernanceEvent {
private final String schema;
-
- private final String ownerInstanceId;
-
- public LockReleasedEvent(final String lockName) {
- String[] schemaInstance = LockNodeUtil.parseSchemaLockName(lockName);
- this.schema = schemaInstance[0];
- this.ownerInstanceId = schemaInstance[1];
- }
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/LockedEvent.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/SchemaLockedEvent.java
similarity index 71%
rename from shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/LockedEvent.java
rename to shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/SchemaLockedEvent.java
index ce59bd68459..b7fc2bac7c0 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/LockedEvent.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/event/SchemaLockedEvent.java
@@ -18,22 +18,15 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event;
import lombok.Getter;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.util.LockNodeUtil;
+import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
/**
* Locked event.
*/
+@RequiredArgsConstructor
@Getter
-public final class LockedEvent implements GovernanceEvent {
+public final class SchemaLockedEvent implements GovernanceEvent {
private final String schema;
-
- private final String ownerInstanceId;
-
- public LockedEvent(final String lockName) {
- String[] schemaInstance = LockNodeUtil.parseSchemaLockName(lockName);
- this.schema = schemaInstance[0];
- this.ownerInstanceId = schemaInstance[1];
- }
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockNode.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockNode.java
index 31c2db34051..97b3ecae537 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockNode.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockNode.java
@@ -85,11 +85,10 @@ public final class LockNode {
* Generate global schema locks name.
*
* @param schema schema
- * @param instanceId instance id
* @return global schema locks name
*/
- public static String generateGlobalSchemaLocksName(final String schema, final String instanceId) {
- return getGlobalSchemaLocksNodePath() + "/" + LockNodeUtil.generateSchemaLockName(schema, instanceId);
+ public static String generateGlobalSchemaLocksName(final String schema) {
+ return getGlobalSchemaLocksNodePath() + "/" + schema;
}
/**
@@ -107,11 +106,10 @@ public final class LockNode {
* Generate global schema Lock released node path.
*
* @param schema schema
- * @param instanceId instance id
* @return global schema Lock released name
*/
- public static String generateGlobalSchemaLockReleasedNodePath(final String schema, final String instanceId) {
- return getGlobalSchemaLocksNodePath() + "/" + LockNodeUtil.generateSchemaLockName(schema, instanceId) + "/leases";
+ public static String generateGlobalSchemaLockReleasedNodePath(final String schema) {
+ return getGlobalSchemaLocksNodePath() + "/" + schema + "/leases";
}
/**
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockState.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockState.java
index 36172fb4905..46947840a51 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockState.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockState.java
@@ -22,5 +22,5 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service;
*/
public enum LockState {
- LOCKED, UNLOCKED
+ LOCKED, UNLOCKED, LOCKING
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalLocksChangedWatcher.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalLocksChangedWatcher.java
index 70bfb895c61..cf18ec2e9d3 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalLocksChangedWatcher.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalLocksChangedWatcher.java
@@ -17,8 +17,8 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.watcher;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.LockReleasedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.LockedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.SchemaLockReleasedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.SchemaLockedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.LockNode;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
@@ -49,16 +49,16 @@ public final class GlobalLocksChangedWatcher implements GovernanceWatcher<Govern
public Optional<GovernanceEvent> createGovernanceEvent(final DataChangedEvent event) {
Optional<String> lockedName = LockNode.parseGlobalSchemaLocksNodePath(event.getKey());
if (lockedName.isPresent()) {
- return handleGlobalLocksEvent(event.getType(), lockedName.get());
+ return handleGlobalSchemaLocksEvent(event.getType(), lockedName.get());
}
return Optional.empty();
}
- private Optional<GovernanceEvent> handleGlobalLocksEvent(final Type eventType, final String lockedName) {
+ private Optional<GovernanceEvent> handleGlobalSchemaLocksEvent(final Type eventType, final String lockedName) {
if (Type.ADDED == eventType) {
- return Optional.of(new LockedEvent(lockedName));
+ return Optional.of(new SchemaLockedEvent(lockedName));
} else if (Type.DELETED == eventType) {
- return Optional.of(new LockReleasedEvent(lockedName));
+ return Optional.of(new SchemaLockReleasedEvent(lockedName));
}
return Optional.empty();
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContextTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContextTest.java
index a5a9dd1f430..f8b660123fa 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContextTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContextTest.java
@@ -27,7 +27,7 @@ import org.apache.shardingsphere.infra.lock.ShardingSphereGlobalLock;
import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.AckLockReleasedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.AckLockedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.LockedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.SchemaLockedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.LockRegistryService;
import org.junit.Test;
@@ -83,8 +83,7 @@ public final class DistributeLockContextTest {
Field computeNodeInstancesDeclaredField = DistributeLockContext.class.getDeclaredField("computeNodeInstances");
computeNodeInstancesDeclaredField.setAccessible(true);
computeNodeInstancesDeclaredField.set(distributeLockContext, Arrays.asList(new ComputeNodeInstance(new InstanceDefinition(InstanceType.PROXY, "127.0.0.1@3307"))));
- distributeLockContext.renew(new LockedEvent("schema1-127.0.0.1@3308"));
- assertNotNull(distributeLockContext.getSchemaLock("schema1"));
+ distributeLockContext.renew(new SchemaLockedEvent("schema1-127.0.0.1@3308"));
}
@Test
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockNodeTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockNodeTest.java
index 17d05f1d947..d07737ff5eb 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockNodeTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockNodeTest.java
@@ -49,7 +49,7 @@ public final class LockNodeTest {
@Test
public void assertGenerateGlobalSchemaLocksName() {
- assertThat(LockNode.generateGlobalSchemaLocksName("schema", "127.0.0.1@3307"), is("/lock/global/schema/locks/schema-127.0.0.1@3307"));
+ assertThat(LockNode.generateGlobalSchemaLocksName("schema"), is("/lock/global/schema/locks/schema"));
}
@Test
@@ -59,7 +59,7 @@ public final class LockNodeTest {
@Test
public void assertGenerateGlobalSchemaLockReleasedNodePath() {
- assertThat(LockNode.generateGlobalSchemaLockReleasedNodePath("schema", "127.0.0.1@3307"), is("/lock/global/schema/locks/schema-127.0.0.1@3307/leases"));
+ assertThat(LockNode.generateGlobalSchemaLockReleasedNodePath("schema"), is("/lock/global/schema/locks/schema/leases"));
}
@Test
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockRegistryServiceTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockRegistryServiceTest.java
index d2e292ba92d..5dc91b6ce3f 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockRegistryServiceTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockRegistryServiceTest.java
@@ -65,14 +65,14 @@ public final class LockRegistryServiceTest {
@Test
public void assertTryGlobalLock() {
- String schemaLockName = LockNode.generateGlobalSchemaLocksName("schema", "127.0.0.1@3307");
+ String schemaLockName = LockNode.generateGlobalSchemaLocksName("schema");
lockRegistryService.tryGlobalLock(schemaLockName, 300);
verify(clusterPersistRepository).tryLock(schemaLockName, 300, TimeUnit.MILLISECONDS);
}
@Test
public void assertReleaseGlobalLock() {
- String schemaLockName = LockNode.generateGlobalSchemaLocksName("schema", "127.0.0.1@3307");
+ String schemaLockName = LockNode.generateGlobalSchemaLocksName("schema");
lockRegistryService.releaseGlobalLock(schemaLockName, true);
verify(clusterPersistRepository).releaseLock(schemaLockName);
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalLocksChangedWatcherTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalLocksChangedWatcherTest.java
index 88fac57fb57..87f9efd817d 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalLocksChangedWatcherTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/watcher/GlobalLocksChangedWatcherTest.java
@@ -17,8 +17,8 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.watcher;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.LockReleasedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.LockedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.SchemaLockReleasedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.event.SchemaLockedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import org.junit.Test;
@@ -33,19 +33,17 @@ public final class GlobalLocksChangedWatcherTest {
@Test
public void assertCreateGovernanceEvent() {
- DataChangedEvent addDataChangedEvent = new DataChangedEvent("/lock/global/schema/locks/schema-127.0.0.1@3307/leases/c_l_0000000", "127.0.0.1@3307", DataChangedEvent.Type.ADDED);
+ DataChangedEvent addDataChangedEvent = new DataChangedEvent("/lock/global/schema/locks/schema/leases/c_l_0000000", "127.0.0.1@3307", DataChangedEvent.Type.ADDED);
Optional<GovernanceEvent> add = new GlobalLocksChangedWatcher().createGovernanceEvent(addDataChangedEvent);
assertTrue(add.isPresent());
GovernanceEvent addEvent = add.get();
- assertTrue(addEvent instanceof LockedEvent);
- assertThat(((LockedEvent) addEvent).getSchema(), is("schema"));
- assertThat(((LockedEvent) addEvent).getOwnerInstanceId(), is("127.0.0.1@3307"));
- DataChangedEvent deleteDataChangedEvent = new DataChangedEvent("/lock/global/schema/locks/schema-127.0.0.1@3307/leases/c_l_0000000", "127.0.0.1@3307", DataChangedEvent.Type.DELETED);
+ assertTrue(addEvent instanceof SchemaLockedEvent);
+ assertThat(((SchemaLockedEvent) addEvent).getSchema(), is("schema"));
+ DataChangedEvent deleteDataChangedEvent = new DataChangedEvent("/lock/global/schema/locks/schema/leases/c_l_0000000", "127.0.0.1@3307", DataChangedEvent.Type.DELETED);
Optional<GovernanceEvent> delete = new GlobalLocksChangedWatcher().createGovernanceEvent(deleteDataChangedEvent);
assertTrue(delete.isPresent());
GovernanceEvent deleteEvent = delete.get();
- assertTrue(deleteEvent instanceof LockReleasedEvent);
- assertThat(((LockReleasedEvent) deleteEvent).getSchema(), is("schema"));
- assertThat(((LockReleasedEvent) deleteEvent).getOwnerInstanceId(), is("127.0.0.1@3307"));
+ assertTrue(deleteEvent instanceof SchemaLockReleasedEvent);
+ assertThat(((SchemaLockReleasedEvent) deleteEvent).getSchema(), is("schema"));
}
}