You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/05/07 08:59:59 UTC

[shardingsphere] branch master updated: Add event handling in general lock manager (#17417)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 56f9cb6cf61 Add event handling in general lock manager (#17417)
56f9cb6cf61 is described below

commit 56f9cb6cf6191ab5886655d2c4ed851f851764ef
Author: gin <ja...@163.com>
AuthorDate: Sat May 7 16:59:43 2022 +0800

    Add event handling in general lock manager (#17417)
---
 .../mode/manager/ShardingSphereLockManager.java    |  8 ---
 .../coordinator/lock/DistributeLockContext.java    |  4 +-
 .../global/ShardingSphereDistributeGlobalLock.java | 27 +++++++--
 .../ShardingSphereDatabaseLockManager.java         | 13 ++---
 .../global/general/ShardingSphereGeneralLock.java  |  3 +-
 .../general/ShardingSphereGeneralLockManager.java  | 65 ++++++++++++++++++++--
 .../ShardingSphereStandardLockManager.java         |  5 --
 7 files changed, 88 insertions(+), 37 deletions(-)

diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ShardingSphereLockManager.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ShardingSphereLockManager.java
index e33bd5d5f91..a3d4b614e22 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ShardingSphereLockManager.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ShardingSphereLockManager.java
@@ -46,14 +46,6 @@ public interface ShardingSphereLockManager {
      */
     ShardingSphereLock getOrCreateLock(String lockName);
     
-    /**
-     * Get lock.
-     *
-     * @param lockName lock name
-     * @return lock
-     */
-    ShardingSphereLock getLock(String lockName);
-    
     /**
      * Is locked.
      *
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContext.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContext.java
index 32d85d2d0dd..06b72f131e8 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContext.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContext.java
@@ -85,13 +85,13 @@ public final class DistributeLockContext implements LockContext {
     
     @Override
     public synchronized ShardingSphereLock getGlobalLock(final String lockName) {
-        Preconditions.checkNotNull(lockName, "Get or create global lock args lock name can not be null.");
+        Preconditions.checkNotNull(lockName, "Get global lock args lock name can not be null.");
         return lockManagers.get(LockType.GENERAL).getOrCreateLock(lockName);
     }
     
     @Override
     public synchronized ShardingSphereLock getStandardLock(final String lockName) {
-        Preconditions.checkNotNull(lockName, "Get or create standard lock args lock name can not be null.");
+        Preconditions.checkNotNull(lockName, "Get standard lock args lock name can not be null.");
         return lockManagers.get(LockType.STANDARD).getOrCreateLock(lockName);
     }
 }
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/ShardingSphereDistributeGlobalLock.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/ShardingSphereDistributeGlobalLock.java
index f359fa10bed..a8e4be4b10d 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/ShardingSphereDistributeGlobalLock.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/ShardingSphereDistributeGlobalLock.java
@@ -62,11 +62,22 @@ public final class ShardingSphereDistributeGlobalLock implements ShardingSphereG
     }
     
     private boolean innerTryLock(final String lockName, final long timeout) {
-        if (synchronizedLockState.compareAndSet(LockState.UNLOCKED, LockState.LOCKING) && isOwner.compareAndSet(false, true)) {
-            return acquire(lockName, timeout) ? synchronizedLockState.compareAndSet(LockState.LOCKING, LockState.LOCKED)
-                    : isOwner.compareAndSet(true, false) && synchronizedLockState.compareAndSet(LockState.LOCKING, LockState.UNLOCKED);
+        if (!synchronizedLockState.compareAndSet(LockState.UNLOCKED, LockState.LOCKING)) {
+            log.debug("Inner try Lock failed, lockName: {} is locking", lockName);
+            return false;
+        }
+        if (!isOwner.compareAndSet(false, true)) {
+            log.debug("Inner try Lock set owner failed, lockName: {}", lockName);
+            return false;
+        }
+        if (acquire(lockName, timeout)) {
+            if (synchronizedLockState.compareAndSet(LockState.LOCKING, LockState.LOCKED)) {
+                log.debug("Inner try Lock locked success, lockName: {}", lockName);
+                return true;
+            }
         }
-        log.debug("innerTryLock locking, lockName={}", lockName);
+        reSetLock();
+        log.debug("Inner try Lock locked failed, lockName: {}", lockName);
         return false;
     }
     
@@ -90,6 +101,11 @@ public final class ShardingSphereDistributeGlobalLock implements ShardingSphereG
         return false;
     }
     
+    private void reSetLock() {
+        isOwner.set(false);
+        synchronizedLockState.set(LockState.UNLOCKED);
+    }
+    
     private String getCurrentInstanceId() {
         return currentInstance.getInstanceDefinition().getInstanceId().getId();
     }
@@ -121,9 +137,8 @@ public final class ShardingSphereDistributeGlobalLock implements ShardingSphereG
             String currentInstanceId = getCurrentInstanceId();
             if (isOwner.get()) {
                 lockService.releaseLock(lockName);
-                isOwner.compareAndSet(true, false);
                 lockedInstances.remove(currentInstanceId);
-                synchronizedLockState.compareAndSet(LockState.LOCKED, LockState.UNLOCKED);
+                reSetLock();
                 return;
             }
             lockService.removeLock(lockName);
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/database/ShardingSphereDatabaseLockManager.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/database/ShardingSphereDatabaseLockManager.java
index 41cb9fd664c..f81821b7ec8 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/database/ShardingSphereDatabaseLockManager.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/database/ShardingSphereDatabaseLockManager.java
@@ -95,11 +95,6 @@ public final class ShardingSphereDatabaseLockManager implements ShardingSphereLo
         return result;
     }
     
-    @Override
-    public ShardingSphereLock getLock(final String lockName) {
-        return locks.get(lockName);
-    }
-    
     @Override
     public boolean isLocked(final String lockName) {
         if (locks.isEmpty()) {
@@ -113,7 +108,7 @@ public final class ShardingSphereDatabaseLockManager implements ShardingSphereLo
     }
     
     /**
-     * Locked.
+     * Database locked.
      *
      * @param event database locked event
      */
@@ -129,7 +124,7 @@ public final class ShardingSphereDatabaseLockManager implements ShardingSphereLo
     }
     
     /**
-     * Lock released.
+     * Database lock released.
      *
      * @param event database lock released event
      */
@@ -140,7 +135,7 @@ public final class ShardingSphereDatabaseLockManager implements ShardingSphereLo
     }
     
     /**
-     * Ack locked.
+     * Database ack locked.
      *
      * @param event database ack locked event
      */
@@ -150,7 +145,7 @@ public final class ShardingSphereDatabaseLockManager implements ShardingSphereLo
     }
     
     /**
-     * Ack lock released.
+     * Database ack lock released.
      *
      * @param event database ack lock released event
      */
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/general/ShardingSphereGeneralLock.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/general/ShardingSphereGeneralLock.java
index 15bdad69588..dc700e9175c 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/general/ShardingSphereGeneralLock.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/general/ShardingSphereGeneralLock.java
@@ -60,8 +60,7 @@ public final class ShardingSphereGeneralLock implements ShardingSphereGlobalLock
     }
     
     private synchronized boolean innerTryLock(final String lockName, final long timeoutMillis) {
-        boolean isAcquired = sequencedSemaphoreLock.tryLock(lockName, TimeoutMilliseconds.MIN_TRY_LOCK);
-        if (!isAcquired) {
+        if (!sequencedSemaphoreLock.tryLock(lockName, TimeoutMilliseconds.MIN_TRY_LOCK)) {
             return false;
         }
         try {
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/general/ShardingSphereGeneralLockManager.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/general/ShardingSphereGeneralLockManager.java
index 7bdd3902ccd..a85038baf65 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/general/ShardingSphereGeneralLockManager.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/general/ShardingSphereGeneralLockManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.general;
 
+import com.google.common.eventbus.Subscribe;
 import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
 import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
 import org.apache.shardingsphere.infra.lock.LockType;
@@ -25,6 +26,10 @@ import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
 import org.apache.shardingsphere.mode.manager.ShardingSphereLockManager;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.LockNodeService;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.LockNodeServiceFactory;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.general.event.GeneralAckLockReleasedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.general.event.GeneralAckLockedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.general.event.GeneralLockReleasedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.general.event.GeneralLockedEvent;
 import org.apache.shardingsphere.mode.persist.PersistRepository;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 
@@ -90,11 +95,6 @@ public final class ShardingSphereGeneralLockManager implements ShardingSphereLoc
         return result;
     }
     
-    @Override
-    public ShardingSphereLock getLock(final String lockName) {
-        return locks.get(lockName);
-    }
-    
     @Override
     public boolean isLocked(final String lockName) {
         if (locks.isEmpty()) {
@@ -107,6 +107,61 @@ public final class ShardingSphereGeneralLockManager implements ShardingSphereLoc
         return false;
     }
     
+    /**
+     * General locked.
+     *
+     * @param event general locked event
+     */
+    @Subscribe
+    public synchronized void locked(final GeneralLockedEvent event) {
+        String lockName = event.getLockName();
+        ShardingSphereGeneralLock lock = locks.get(lockName);
+        if (null == lock) {
+            lock = createGeneralLock();
+            locks.put(lockName, lock);
+        }
+        lock.ackLock(lockName, getCurrentInstanceId());
+    }
+    
+    /**
+     * General lock released.
+     *
+     * @param event general lock released event
+     */
+    @Subscribe
+    public synchronized void lockReleased(final GeneralLockReleasedEvent event) {
+        String lockName = event.getLockName();
+        getOptionalLock(lockName).ifPresent(optional -> optional.releaseAckLock(lockName, getCurrentInstanceId()));
+    }
+    
+    /**
+     * General ack locked.
+     *
+     * @param event general ack locked event
+     */
+    @Subscribe
+    public synchronized void ackLocked(final GeneralAckLockedEvent event) {
+        getOptionalLock(event.getLockName()).ifPresent(optional -> optional.addLockedInstance(event.getLockedInstance()));
+    }
+    
+    /**
+     * General ack lock released.
+     *
+     * @param event general ack lock released event
+     */
+    @Subscribe
+    public synchronized void ackLockReleased(final GeneralAckLockReleasedEvent event) {
+        getOptionalLock(event.getLockName()).ifPresent(optional -> optional.removeLockedInstance(event.getLockedInstance()));
+    }
+    
+    private String getCurrentInstanceId() {
+        return currentInstance.getInstanceDefinition().getInstanceId().getId();
+    }
+    
+    private Optional<ShardingSphereGeneralLock> getOptionalLock(final String lockName) {
+        return Optional.ofNullable(locks.get(lockName));
+    }
+    
     @Override
     public LockType getLockType() {
         return LockType.GENERAL;
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/standard/ShardingSphereStandardLockManager.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/standard/ShardingSphereStandardLockManager.java
index f96a559e33e..70249eb60b1 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/standard/ShardingSphereStandardLockManager.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/standard/ShardingSphereStandardLockManager.java
@@ -64,11 +64,6 @@ public final class ShardingSphereStandardLockManager implements ShardingSphereLo
         return result;
     }
     
-    @Override
-    public ShardingSphereLock getLock(final String lockName) {
-        return locks.get(lockName);
-    }
-    
     @Override
     public boolean isLocked(final String lockName) {
         if (locks.isEmpty()) {