You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/05/07 08:59:59 UTC
[shardingsphere] branch master updated: Add event handling in general lock manager (#17417)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 56f9cb6cf61 Add event handling in general lock manager (#17417)
56f9cb6cf61 is described below
commit 56f9cb6cf6191ab5886655d2c4ed851f851764ef
Author: gin <ja...@163.com>
AuthorDate: Sat May 7 16:59:43 2022 +0800
Add event handling in general lock manager (#17417)
---
.../mode/manager/ShardingSphereLockManager.java | 8 ---
.../coordinator/lock/DistributeLockContext.java | 4 +-
.../global/ShardingSphereDistributeGlobalLock.java | 27 +++++++--
.../ShardingSphereDatabaseLockManager.java | 13 ++---
.../global/general/ShardingSphereGeneralLock.java | 3 +-
.../general/ShardingSphereGeneralLockManager.java | 65 ++++++++++++++++++++--
.../ShardingSphereStandardLockManager.java | 5 --
7 files changed, 88 insertions(+), 37 deletions(-)
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ShardingSphereLockManager.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ShardingSphereLockManager.java
index e33bd5d5f91..a3d4b614e22 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ShardingSphereLockManager.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ShardingSphereLockManager.java
@@ -46,14 +46,6 @@ public interface ShardingSphereLockManager {
*/
ShardingSphereLock getOrCreateLock(String lockName);
- /**
- * Get lock.
- *
- * @param lockName lock name
- * @return lock
- */
- ShardingSphereLock getLock(String lockName);
-
/**
* Is locked.
*
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContext.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContext.java
index 32d85d2d0dd..06b72f131e8 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContext.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContext.java
@@ -85,13 +85,13 @@ public final class DistributeLockContext implements LockContext {
@Override
public synchronized ShardingSphereLock getGlobalLock(final String lockName) {
- Preconditions.checkNotNull(lockName, "Get or create global lock args lock name can not be null.");
+ Preconditions.checkNotNull(lockName, "Get global lock args lock name can not be null.");
return lockManagers.get(LockType.GENERAL).getOrCreateLock(lockName);
}
@Override
public synchronized ShardingSphereLock getStandardLock(final String lockName) {
- Preconditions.checkNotNull(lockName, "Get or create standard lock args lock name can not be null.");
+ Preconditions.checkNotNull(lockName, "Get standard lock args lock name can not be null.");
return lockManagers.get(LockType.STANDARD).getOrCreateLock(lockName);
}
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/ShardingSphereDistributeGlobalLock.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/ShardingSphereDistributeGlobalLock.java
index f359fa10bed..a8e4be4b10d 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/ShardingSphereDistributeGlobalLock.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/ShardingSphereDistributeGlobalLock.java
@@ -62,11 +62,22 @@ public final class ShardingSphereDistributeGlobalLock implements ShardingSphereG
}
private boolean innerTryLock(final String lockName, final long timeout) {
- if (synchronizedLockState.compareAndSet(LockState.UNLOCKED, LockState.LOCKING) && isOwner.compareAndSet(false, true)) {
- return acquire(lockName, timeout) ? synchronizedLockState.compareAndSet(LockState.LOCKING, LockState.LOCKED)
- : isOwner.compareAndSet(true, false) && synchronizedLockState.compareAndSet(LockState.LOCKING, LockState.UNLOCKED);
+ if (!synchronizedLockState.compareAndSet(LockState.UNLOCKED, LockState.LOCKING)) {
+ log.debug("Inner try Lock failed, lockName: {} is locking", lockName);
+ return false;
+ }
+ if (!isOwner.compareAndSet(false, true)) {
+ log.debug("Inner try Lock set owner failed, lockName: {}", lockName);
+ return false;
+ }
+ if (acquire(lockName, timeout)) {
+ if (synchronizedLockState.compareAndSet(LockState.LOCKING, LockState.LOCKED)) {
+ log.debug("Inner try Lock locked success, lockName: {}", lockName);
+ return true;
+ }
}
- log.debug("innerTryLock locking, lockName={}", lockName);
+ reSetLock();
+ log.debug("Inner try Lock locked failed, lockName: {}", lockName);
return false;
}
@@ -90,6 +101,11 @@ public final class ShardingSphereDistributeGlobalLock implements ShardingSphereG
return false;
}
+ private void reSetLock() {
+ isOwner.set(false);
+ synchronizedLockState.set(LockState.UNLOCKED);
+ }
+
private String getCurrentInstanceId() {
return currentInstance.getInstanceDefinition().getInstanceId().getId();
}
@@ -121,9 +137,8 @@ public final class ShardingSphereDistributeGlobalLock implements ShardingSphereG
String currentInstanceId = getCurrentInstanceId();
if (isOwner.get()) {
lockService.releaseLock(lockName);
- isOwner.compareAndSet(true, false);
lockedInstances.remove(currentInstanceId);
- synchronizedLockState.compareAndSet(LockState.LOCKED, LockState.UNLOCKED);
+ reSetLock();
return;
}
lockService.removeLock(lockName);
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/database/ShardingSphereDatabaseLockManager.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/database/ShardingSphereDatabaseLockManager.java
index 41cb9fd664c..f81821b7ec8 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/database/ShardingSphereDatabaseLockManager.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/database/ShardingSphereDatabaseLockManager.java
@@ -95,11 +95,6 @@ public final class ShardingSphereDatabaseLockManager implements ShardingSphereLo
return result;
}
- @Override
- public ShardingSphereLock getLock(final String lockName) {
- return locks.get(lockName);
- }
-
@Override
public boolean isLocked(final String lockName) {
if (locks.isEmpty()) {
@@ -113,7 +108,7 @@ public final class ShardingSphereDatabaseLockManager implements ShardingSphereLo
}
/**
- * Locked.
+ * Database locked.
*
* @param event database locked event
*/
@@ -129,7 +124,7 @@ public final class ShardingSphereDatabaseLockManager implements ShardingSphereLo
}
/**
- * Lock released.
+ * Database lock released.
*
* @param event database lock released event
*/
@@ -140,7 +135,7 @@ public final class ShardingSphereDatabaseLockManager implements ShardingSphereLo
}
/**
- * Ack locked.
+ * Database ack locked.
*
* @param event database ack locked event
*/
@@ -150,7 +145,7 @@ public final class ShardingSphereDatabaseLockManager implements ShardingSphereLo
}
/**
- * Ack lock released.
+ * Database ack lock released.
*
* @param event database ack lock released event
*/
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/general/ShardingSphereGeneralLock.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/general/ShardingSphereGeneralLock.java
index 15bdad69588..dc700e9175c 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/general/ShardingSphereGeneralLock.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/general/ShardingSphereGeneralLock.java
@@ -60,8 +60,7 @@ public final class ShardingSphereGeneralLock implements ShardingSphereGlobalLock
}
private synchronized boolean innerTryLock(final String lockName, final long timeoutMillis) {
- boolean isAcquired = sequencedSemaphoreLock.tryLock(lockName, TimeoutMilliseconds.MIN_TRY_LOCK);
- if (!isAcquired) {
+ if (!sequencedSemaphoreLock.tryLock(lockName, TimeoutMilliseconds.MIN_TRY_LOCK)) {
return false;
}
try {
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/general/ShardingSphereGeneralLockManager.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/general/ShardingSphereGeneralLockManager.java
index 7bdd3902ccd..a85038baf65 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/general/ShardingSphereGeneralLockManager.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/general/ShardingSphereGeneralLockManager.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.general;
+import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.lock.LockType;
@@ -25,6 +26,10 @@ import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
import org.apache.shardingsphere.mode.manager.ShardingSphereLockManager;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.LockNodeService;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.LockNodeServiceFactory;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.general.event.GeneralAckLockReleasedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.general.event.GeneralAckLockedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.general.event.GeneralLockReleasedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.general.event.GeneralLockedEvent;
import org.apache.shardingsphere.mode.persist.PersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -90,11 +95,6 @@ public final class ShardingSphereGeneralLockManager implements ShardingSphereLoc
return result;
}
- @Override
- public ShardingSphereLock getLock(final String lockName) {
- return locks.get(lockName);
- }
-
@Override
public boolean isLocked(final String lockName) {
if (locks.isEmpty()) {
@@ -107,6 +107,61 @@ public final class ShardingSphereGeneralLockManager implements ShardingSphereLoc
return false;
}
+ /**
+ * General locked.
+ *
+ * @param event general locked event
+ */
+ @Subscribe
+ public synchronized void locked(final GeneralLockedEvent event) {
+ String lockName = event.getLockName();
+ ShardingSphereGeneralLock lock = locks.get(lockName);
+ if (null == lock) {
+ lock = createGeneralLock();
+ locks.put(lockName, lock);
+ }
+ lock.ackLock(lockName, getCurrentInstanceId());
+ }
+
+ /**
+ * General lock released.
+ *
+ * @param event general lock released event
+ */
+ @Subscribe
+ public synchronized void lockReleased(final GeneralLockReleasedEvent event) {
+ String lockName = event.getLockName();
+ getOptionalLock(lockName).ifPresent(optional -> optional.releaseAckLock(lockName, getCurrentInstanceId()));
+ }
+
+ /**
+ * General ack locked.
+ *
+ * @param event general ack locked event
+ */
+ @Subscribe
+ public synchronized void ackLocked(final GeneralAckLockedEvent event) {
+ getOptionalLock(event.getLockName()).ifPresent(optional -> optional.addLockedInstance(event.getLockedInstance()));
+ }
+
+ /**
+ * General ack lock released.
+ *
+ * @param event general ack lock released event
+ */
+ @Subscribe
+ public synchronized void ackLockReleased(final GeneralAckLockReleasedEvent event) {
+ getOptionalLock(event.getLockName()).ifPresent(optional -> optional.removeLockedInstance(event.getLockedInstance()));
+ }
+
+ private String getCurrentInstanceId() {
+ return currentInstance.getInstanceDefinition().getInstanceId().getId();
+ }
+
+ private Optional<ShardingSphereGeneralLock> getOptionalLock(final String lockName) {
+ return Optional.ofNullable(locks.get(lockName));
+ }
+
@Override
public LockType getLockType() {
return LockType.GENERAL;
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/standard/ShardingSphereStandardLockManager.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/standard/ShardingSphereStandardLockManager.java
index f96a559e33e..70249eb60b1 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/standard/ShardingSphereStandardLockManager.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/standard/ShardingSphereStandardLockManager.java
@@ -64,11 +64,6 @@ public final class ShardingSphereStandardLockManager implements ShardingSphereLo
return result;
}
- @Override
- public ShardingSphereLock getLock(final String lockName) {
- return locks.get(lockName);
- }
-
@Override
public boolean isLocked(final String lockName) {
if (locks.isEmpty()) {