You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/04/27 13:10:05 UTC

[shardingsphere] branch master updated: Optimize lock node service by SPI (#17154)

This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 956cbd178ba Optimize lock node service by SPI (#17154)
956cbd178ba is described below

commit 956cbd178bacc6c00ba0a0391e23ffed3c5b7059
Author: gin <ja...@163.com>
AuthorDate: Wed Apr 27 21:09:46 2022 +0800

    Optimize lock node service by SPI (#17154)
---
 .../cluster/coordinator/lock/LockNodeService.java  |  9 +++
 .../coordinator/lock/LockNodeServiceFactory.java   | 70 ++++++++++++++++
 .../ShardingSphereDatabaseLockManager.java         | 11 ++-
 .../watcher/DatabaseAckChangedWatcher.java         |  5 +-
 .../watcher/DatabaseLocksChangedWatcher.java       |  5 +-
 .../general/ShardingSphereGeneralLockManager.java  | 11 ++-
 .../general/watcher/GeneralAckChangedWatcher.java  |  5 +-
 .../watcher/GeneralLocksChangedWatcher.java        |  5 +-
 .../global/service/DatabaseLockNodeService.java    |  7 ++
 .../global/service/GeneralLockNodeService.java     |  7 ++
 ...anager.cluster.coordinator.lock.LockNodeService | 19 +++++
 .../watcher/DatabaseAckChangedWatcherTest.java     | 82 +++++++++++++++++++
 .../watcher/DatabaseLocksChangedWatcherTest.java   | 80 ++++++++++++++++++
 .../watcher/GeneralAckChangedWatcherTest.java      | 94 ++++++++++++++++++++++
 .../watcher/GeneralLocksChangedWatcherTest.java    | 92 +++++++++++++++++++++
 15 files changed, 488 insertions(+), 14 deletions(-)

diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/LockNodeService.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/LockNodeService.java
index 7fd827d997c..6b870d166d5 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/LockNodeService.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/LockNodeService.java
@@ -17,6 +17,8 @@
 
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock;
 
+import org.apache.shardingsphere.infra.lock.LockType;
+
 import java.util.Optional;
 
 /**
@@ -83,4 +85,11 @@ public interface LockNodeService {
      * @return global locked ack node path
      */
     Optional<String> parseGlobalLockedAckNodePath(String nodePath);
+    
+    /**
+     * Get lock type.
+     *
+     * @return lock type
+     */
+    LockType getLockType();
 }
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/LockNodeServiceFactory.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/LockNodeServiceFactory.java
new file mode 100644
index 00000000000..0832398e0ec
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/LockNodeServiceFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock;
+
+import org.apache.shardingsphere.infra.lock.LockType;
+import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
+
+import java.util.EnumMap;
+import java.util.Map;
+
+/**
+ * Lock node service factory.
+ */
+public final class LockNodeServiceFactory {
+    
+    static {
+        ShardingSphereServiceLoader.register(LockNodeService.class);
+    }
+    
+    private static final Map<LockType, LockNodeService> SERVICES = new EnumMap<>(LockType.class);
+    
+    private static final LockNodeServiceFactory INSTANCE = new LockNodeServiceFactory();
+    
+    private LockNodeServiceFactory() {
+        loadLockNodeService();
+    }
+    
+    private void loadLockNodeService() {
+        for (LockNodeService each : ShardingSphereServiceLoader.getServiceInstances(LockNodeService.class)) {
+            if (SERVICES.containsKey(each.getLockType())) {
+                continue;
+            }
+            SERVICES.put(each.getLockType(), each);
+        }
+    }
+    
+    /**
+     * Get instance.
+     *
+     * @return instance
+     */
+    public static LockNodeServiceFactory getInstance() {
+        return INSTANCE;
+    }
+    
+    /**
+     * Get lock node service.
+     *
+     * @param lockType lock type
+     * @return lock node service
+     */
+    public LockNodeService getLockNodeService(final LockType lockType) {
+        return SERVICES.get(lockType);
+    }
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/database/ShardingSphereDatabaseLockManager.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/database/ShardingSphereDatabaseLockManager.java
index 4ac1d10b1dc..0ec16ec895e 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/database/ShardingSphereDatabaseLockManager.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/database/ShardingSphereDatabaseLockManager.java
@@ -25,11 +25,11 @@ import org.apache.shardingsphere.infra.lock.ShardingSphereGlobalLock;
 import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
 import org.apache.shardingsphere.mode.manager.ShardingSphereLockManager;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.LockNodeService;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.LockNodeServiceFactory;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.database.event.DatabaseAckLockReleasedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.database.event.DatabaseAckLockedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.database.event.DatabaseLockReleasedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.database.event.DatabaseLockedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.service.DatabaseLockNodeService;
 import org.apache.shardingsphere.mode.persist.PersistRepository;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 
@@ -43,9 +43,9 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 public final class ShardingSphereDatabaseLockManager implements ShardingSphereLockManager {
     
-    private final Map<String, ShardingSphereDatabaseLock> locks = new ConcurrentHashMap<>();
+    private final Map<String, ShardingSphereDatabaseLock> locks;
     
-    private final LockNodeService lockNodeService = new DatabaseLockNodeService();
+    private final LockNodeService lockNodeService;
     
     private ClusterPersistRepository clusterRepository;
     
@@ -53,6 +53,11 @@ public final class ShardingSphereDatabaseLockManager implements ShardingSphereLo
     
     private Collection<ComputeNodeInstance> computeNodeInstances;
     
+    public ShardingSphereDatabaseLockManager() {
+        locks = new ConcurrentHashMap<>();
+        lockNodeService = LockNodeServiceFactory.getInstance().getLockNodeService(getLockType());
+    }
+    
     @Override
     public void initLocksState(final PersistRepository repository, final ComputeNodeInstance currentInstance, final Collection<ComputeNodeInstance> computeNodeInstances) {
         clusterRepository = (ClusterPersistRepository) repository;
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/database/watcher/DatabaseAckChangedWatcher.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/database/watcher/DatabaseAckChangedWatcher.java
index c8a0b014378..7c0e5365abb 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/database/watcher/DatabaseAckChangedWatcher.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/database/watcher/DatabaseAckChangedWatcher.java
@@ -17,10 +17,11 @@
 
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.database.watcher;
 
+import org.apache.shardingsphere.infra.lock.LockType;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.LockNodeService;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.LockNodeServiceFactory;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.database.event.DatabaseAckLockReleasedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.database.event.DatabaseAckLockedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.service.DatabaseLockNodeService;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -36,7 +37,7 @@ import java.util.Optional;
  */
 public final class DatabaseAckChangedWatcher implements GovernanceWatcher<GovernanceEvent> {
     
-    private final LockNodeService lockNode = new DatabaseLockNodeService();
+    private final LockNodeService lockNode = LockNodeServiceFactory.getInstance().getLockNodeService(LockType.DATABASE);
     
     @Override
     public Collection<String> getWatchingKeys() {
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/database/watcher/DatabaseLocksChangedWatcher.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/database/watcher/DatabaseLocksChangedWatcher.java
index dba63128f4f..7c80d864657 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/database/watcher/DatabaseLocksChangedWatcher.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/database/watcher/DatabaseLocksChangedWatcher.java
@@ -17,10 +17,11 @@
 
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.database.watcher;
 
+import org.apache.shardingsphere.infra.lock.LockType;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.LockNodeService;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.LockNodeServiceFactory;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.database.event.DatabaseLockReleasedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.database.event.DatabaseLockedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.service.DatabaseLockNodeService;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -36,7 +37,7 @@ import java.util.Optional;
  */
 public final class DatabaseLocksChangedWatcher implements GovernanceWatcher<GovernanceEvent> {
     
-    private final LockNodeService lockNode = new DatabaseLockNodeService();
+    private final LockNodeService lockNode = LockNodeServiceFactory.getInstance().getLockNodeService(LockType.DATABASE);
     
     @Override
     public Collection<String> getWatchingKeys() {
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/general/ShardingSphereGeneralLockManager.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/general/ShardingSphereGeneralLockManager.java
index 880bf441b64..b66bf2d5277 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/general/ShardingSphereGeneralLockManager.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/general/ShardingSphereGeneralLockManager.java
@@ -24,7 +24,7 @@ import org.apache.shardingsphere.infra.lock.ShardingSphereGlobalLock;
 import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
 import org.apache.shardingsphere.mode.manager.ShardingSphereLockManager;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.LockNodeService;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.service.GeneralLockNodeService;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.LockNodeServiceFactory;
 import org.apache.shardingsphere.mode.persist.PersistRepository;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 
@@ -38,9 +38,9 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 public final class ShardingSphereGeneralLockManager implements ShardingSphereLockManager {
     
-    private final Map<String, ShardingSphereGeneralLock> locks = new ConcurrentHashMap<>();
+    private final Map<String, ShardingSphereGeneralLock> locks;
     
-    private final LockNodeService lockNodeService = new GeneralLockNodeService();
+    private final LockNodeService lockNodeService;
     
     private ClusterPersistRepository clusterRepository;
     
@@ -48,6 +48,11 @@ public final class ShardingSphereGeneralLockManager implements ShardingSphereLoc
     
     private Collection<ComputeNodeInstance> computeNodeInstances;
     
+    public ShardingSphereGeneralLockManager() {
+        locks = new ConcurrentHashMap<>();
+        lockNodeService = LockNodeServiceFactory.getInstance().getLockNodeService(getLockType());
+    }
+    
     @Override
     public void initLocksState(final PersistRepository repository, final ComputeNodeInstance instance, final Collection<ComputeNodeInstance> computeNodeInstances) {
         clusterRepository = (ClusterPersistRepository) repository;
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/general/watcher/GeneralAckChangedWatcher.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/general/watcher/GeneralAckChangedWatcher.java
index 4da0457a7de..cb6e134a97d 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/general/watcher/GeneralAckChangedWatcher.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/general/watcher/GeneralAckChangedWatcher.java
@@ -17,10 +17,11 @@
 
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.general.watcher;
 
+import org.apache.shardingsphere.infra.lock.LockType;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.LockNodeService;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.LockNodeServiceFactory;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.general.event.GeneralAckLockReleasedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.general.event.GeneralAckLockedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.service.GeneralLockNodeService;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -36,7 +37,7 @@ import java.util.Optional;
  */
 public final class GeneralAckChangedWatcher implements GovernanceWatcher<GovernanceEvent> {
     
-    private final LockNodeService lockNode = new GeneralLockNodeService();
+    private final LockNodeService lockNode = LockNodeServiceFactory.getInstance().getLockNodeService(LockType.GENERAL);
     
     @Override
     public Collection<String> getWatchingKeys() {
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/general/watcher/GeneralLocksChangedWatcher.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/general/watcher/GeneralLocksChangedWatcher.java
index df3b95619c9..1a9ba9b3574 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/general/watcher/GeneralLocksChangedWatcher.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/general/watcher/GeneralLocksChangedWatcher.java
@@ -17,10 +17,11 @@
 
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.general.watcher;
 
+import org.apache.shardingsphere.infra.lock.LockType;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.LockNodeService;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.LockNodeServiceFactory;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.general.event.GeneralLockReleasedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.general.event.GeneralLockedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.service.GeneralLockNodeService;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -36,7 +37,7 @@ import java.util.Optional;
  */
 public final class GeneralLocksChangedWatcher implements GovernanceWatcher<GovernanceEvent> {
     
-    private final LockNodeService lockNode = new GeneralLockNodeService();
+    private final LockNodeService lockNode = LockNodeServiceFactory.getInstance().getLockNodeService(LockType.GENERAL);
     
     @Override
     public Collection<String> getWatchingKeys() {
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/service/DatabaseLockNodeService.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/service/DatabaseLockNodeService.java
index 95b3db9a95a..3f1aeafd54e 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/service/DatabaseLockNodeService.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/service/DatabaseLockNodeService.java
@@ -17,6 +17,8 @@
 
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.service;
 
+import org.apache.shardingsphere.infra.lock.LockType;
+
 /**
  * Database lock node service.
  */
@@ -31,4 +33,9 @@ public final class DatabaseLockNodeService extends AbstractGlobalLockNodeService
     protected String getLockLevel() {
         return "database";
     }
+    
+    @Override
+    public LockType getLockType() {
+        return LockType.DATABASE;
+    }
 }
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/service/GeneralLockNodeService.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/service/GeneralLockNodeService.java
index d14f66d9902..968eab7ce32 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/service/GeneralLockNodeService.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/service/GeneralLockNodeService.java
@@ -17,6 +17,8 @@
 
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.service;
 
+import org.apache.shardingsphere.infra.lock.LockType;
+
 /**
  * General lock node service.
  */
@@ -31,4 +33,9 @@ public final class GeneralLockNodeService extends AbstractGlobalLockNodeService
     protected String getLockLevel() {
         return "general";
     }
+    
+    @Override
+    public LockType getLockType() {
+        return LockType.GENERAL;
+    }
 }
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.LockNodeService b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.LockNodeService
new file mode 100644
index 00000000000..18c63116f20
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.LockNodeService
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.service.GeneralLockNodeService
+org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.service.DatabaseLockNodeService
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/database/watcher/DatabaseAckChangedWatcherTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/database/watcher/DatabaseAckChangedWatcherTest.java
new file mode 100644
index 00000000000..191c1a7be4c
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/database/watcher/DatabaseAckChangedWatcherTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.database.watcher;
+
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.database.event.DatabaseAckLockReleasedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.database.event.DatabaseAckLockedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Optional;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public final class DatabaseAckChangedWatcherTest {
+    
+    private DatabaseAckChangedWatcher watcher;
+    
+    @Before
+    public void init() {
+        watcher = new DatabaseAckChangedWatcher();
+    }
+    
+    @Test
+    public void assertGetWatchingKeys() {
+        Collection<String> keys = watcher.getWatchingKeys();
+        assertThat(keys.size(), is(1));
+        assertThat("/lock/global/database/ack", is(keys.iterator().next()));
+    }
+    
+    @Test
+    public void assertGetWatchingTypes() {
+        Collection<DataChangedEvent.Type> types = watcher.getWatchingTypes();
+        assertThat(types.size(), is(2));
+        Iterator<DataChangedEvent.Type> iterator = types.iterator();
+        assertThat(iterator.next(), is(DataChangedEvent.Type.ADDED));
+        assertThat(iterator.next(), is(DataChangedEvent.Type.DELETED));
+    }
+    
+    @Test
+    public void assertCreateGovernanceEvent() {
+        DataChangedEvent addDataChangedEvent = new DataChangedEvent("/lock/global/database/ack/sharding_db#@#127.0.0.1@3307", "127.0.0.1@3307", DataChangedEvent.Type.ADDED);
+        Optional<GovernanceEvent> addGovernanceEvent = watcher.createGovernanceEvent(addDataChangedEvent);
+        assertTrue(addGovernanceEvent.isPresent());
+        assertTrue(addGovernanceEvent.get() instanceof DatabaseAckLockedEvent);
+        assertThat(((DatabaseAckLockedEvent) addGovernanceEvent.get()).getDatabase(), is("sharding_db"));
+        assertThat(((DatabaseAckLockedEvent) addGovernanceEvent.get()).getLockedInstance(), is("127.0.0.1@3307"));
+        DataChangedEvent deleteDataChangedEvent = new DataChangedEvent("/lock/global/database/ack/sharding_db#@#127.0.0.1@3307", "127.0.0.1@3307", DataChangedEvent.Type.DELETED);
+        Optional<GovernanceEvent> deleteGovernanceEvent = watcher.createGovernanceEvent(deleteDataChangedEvent);
+        assertTrue(deleteGovernanceEvent.isPresent());
+        assertTrue(deleteGovernanceEvent.get() instanceof DatabaseAckLockReleasedEvent);
+        assertThat(((DatabaseAckLockReleasedEvent) deleteGovernanceEvent.get()).getDatabase(), is("sharding_db"));
+        assertThat(((DatabaseAckLockReleasedEvent) deleteGovernanceEvent.get()).getLockedInstance(), is("127.0.0.1@3307"));
+        DataChangedEvent updateDataChangedEvent = new DataChangedEvent("/lock/global/database/ack/sharding_db#@#127.0.0.1@3307", "127.0.0.1@3307", DataChangedEvent.Type.UPDATED);
+        Optional<GovernanceEvent> updateGovernanceEvent = watcher.createGovernanceEvent(updateDataChangedEvent);
+        assertFalse(updateGovernanceEvent.isPresent());
+        DataChangedEvent ignoredDataChangedEvent = new DataChangedEvent("/lock/global/database/ack/sharding_db#@#127.0.0.1@3307", "127.0.0.1@3307", DataChangedEvent.Type.IGNORED);
+        Optional<GovernanceEvent> ignoredGovernanceEvent = watcher.createGovernanceEvent(ignoredDataChangedEvent);
+        assertFalse(ignoredGovernanceEvent.isPresent());
+    }
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/database/watcher/DatabaseLocksChangedWatcherTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/database/watcher/DatabaseLocksChangedWatcherTest.java
new file mode 100644
index 00000000000..2f6d71d12c8
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/database/watcher/DatabaseLocksChangedWatcherTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.database.watcher;
+
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.database.event.DatabaseLockReleasedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.database.event.DatabaseLockedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Optional;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link DatabaseLocksChangedWatcher}: its watched key, its watched event types and
+ * the governance events it creates from data changed events.
+ */
+public final class DatabaseLocksChangedWatcherTest {
+    
+    private static final String LOCKS_KEY = "/lock/global/database/locks/sharding_db/leases/c_l_0000000";
+    
+    private DatabaseLocksChangedWatcher watcher;
+    
+    @Before
+    public void init() {
+        watcher = new DatabaseLocksChangedWatcher();
+    }
+    
+    /**
+     * Expects the watcher to watch exactly one key, {@code /lock/global/database/locks}.
+     */
+    @Test
+    public void assertGetWatchingKeys() {
+        Collection<String> keys = watcher.getWatchingKeys();
+        assertThat(keys.size(), is(1));
+        // Actual value first, matcher on the expected value second, so a failure is reported the right way round.
+        assertThat(keys.iterator().next(), is("/lock/global/database/locks"));
+    }
+    
+    /**
+     * Expects the watcher to watch two event types, iterated as ADDED then DELETED.
+     */
+    @Test
+    public void assertGetWatchingTypes() {
+        Collection<DataChangedEvent.Type> types = watcher.getWatchingTypes();
+        assertThat(types.size(), is(2));
+        Iterator<DataChangedEvent.Type> iterator = types.iterator();
+        assertThat(iterator.next(), is(DataChangedEvent.Type.ADDED));
+        assertThat(iterator.next(), is(DataChangedEvent.Type.DELETED));
+    }
+    
+    /**
+     * Expects ADDED and DELETED events on the locks key to create a {@link DatabaseLockedEvent} and a
+     * {@link DatabaseLockReleasedEvent} for database {@code sharding_db}, and UPDATED and IGNORED events
+     * on the same key to create no event.
+     */
+    @Test
+    public void assertCreateGovernanceEvent() {
+        DataChangedEvent addDataChangedEvent = new DataChangedEvent(LOCKS_KEY, "000000000", DataChangedEvent.Type.ADDED);
+        Optional<GovernanceEvent> addGovernanceEvent = watcher.createGovernanceEvent(addDataChangedEvent);
+        assertTrue(addGovernanceEvent.isPresent());
+        assertTrue(addGovernanceEvent.get() instanceof DatabaseLockedEvent);
+        assertThat(((DatabaseLockedEvent) addGovernanceEvent.get()).getDatabase(), is("sharding_db"));
+        DataChangedEvent deleteDataChangedEvent = new DataChangedEvent(LOCKS_KEY, "000000000", DataChangedEvent.Type.DELETED);
+        Optional<GovernanceEvent> deleteGovernanceEvent = watcher.createGovernanceEvent(deleteDataChangedEvent);
+        assertTrue(deleteGovernanceEvent.isPresent());
+        assertTrue(deleteGovernanceEvent.get() instanceof DatabaseLockReleasedEvent);
+        assertThat(((DatabaseLockReleasedEvent) deleteGovernanceEvent.get()).getDatabase(), is("sharding_db"));
+        // UPDATED and IGNORED use the same locks key and value as above, so these cases differ from the ones above only in event type.
+        DataChangedEvent updateDataChangedEvent = new DataChangedEvent(LOCKS_KEY, "000000000", DataChangedEvent.Type.UPDATED);
+        Optional<GovernanceEvent> updateGovernanceEvent = watcher.createGovernanceEvent(updateDataChangedEvent);
+        assertFalse(updateGovernanceEvent.isPresent());
+        DataChangedEvent ignoredDataChangedEvent = new DataChangedEvent(LOCKS_KEY, "000000000", DataChangedEvent.Type.IGNORED);
+        Optional<GovernanceEvent> ignoredGovernanceEvent = watcher.createGovernanceEvent(ignoredDataChangedEvent);
+        assertFalse(ignoredGovernanceEvent.isPresent());
+    }
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/general/watcher/GeneralAckChangedWatcherTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/general/watcher/GeneralAckChangedWatcherTest.java
new file mode 100644
index 00000000000..8d7ce97ac45
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/general/watcher/GeneralAckChangedWatcherTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.general.watcher;
+
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.general.event.GeneralAckLockReleasedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.general.event.GeneralAckLockedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Optional;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link GeneralAckChangedWatcher}: its watched key, its watched event types and
+ * the governance events it creates from data changed events.
+ */
+public final class GeneralAckChangedWatcherTest {
+    
+    private static final String ACK_KEY = "/lock/global/general/ack/lock_name#@#127.0.0.1@3307";
+    
+    private GeneralAckChangedWatcher watcher;
+    
+    @Before
+    public void init() {
+        watcher = new GeneralAckChangedWatcher();
+    }
+    
+    /**
+     * Expects the watcher to watch exactly one key, {@code /lock/global/general/ack}.
+     */
+    @Test
+    public void assertGetWatchingKeys() {
+        Collection<String> keys = watcher.getWatchingKeys();
+        assertThat(keys.size(), is(1));
+        // Actual value first, matcher on the expected value second, so a failure is reported the right way round.
+        assertThat(keys.iterator().next(), is("/lock/global/general/ack"));
+    }
+    
+    /**
+     * Expects the watcher to watch two event types, iterated as ADDED then DELETED.
+     */
+    @Test
+    public void assertGetWatchingTypes() {
+        Collection<DataChangedEvent.Type> types = watcher.getWatchingTypes();
+        assertThat(types.size(), is(2));
+        Iterator<DataChangedEvent.Type> iterator = types.iterator();
+        assertThat(iterator.next(), is(DataChangedEvent.Type.ADDED));
+        assertThat(iterator.next(), is(DataChangedEvent.Type.DELETED));
+    }
+    
+    /**
+     * Expects an ADDED event on the ack key to create a {@link GeneralAckLockedEvent} with
+     * lock name {@code lock_name} and locked instance {@code 127.0.0.1@3307}.
+     */
+    @Test
+    public void assertAddCreateGovernanceEvent() {
+        DataChangedEvent addDataChangedEvent = new DataChangedEvent(ACK_KEY, "127.0.0.1@3307", DataChangedEvent.Type.ADDED);
+        Optional<GovernanceEvent> addGovernanceEvent = watcher.createGovernanceEvent(addDataChangedEvent);
+        assertTrue(addGovernanceEvent.isPresent());
+        assertTrue(addGovernanceEvent.get() instanceof GeneralAckLockedEvent);
+        assertThat(((GeneralAckLockedEvent) addGovernanceEvent.get()).getLockName(), is("lock_name"));
+        assertThat(((GeneralAckLockedEvent) addGovernanceEvent.get()).getLockedInstance(), is("127.0.0.1@3307"));
+    }
+    
+    /**
+     * Expects a DELETED event on the ack key to create a {@link GeneralAckLockReleasedEvent} with
+     * lock name {@code lock_name} and locked instance {@code 127.0.0.1@3307}.
+     */
+    @Test
+    public void assertDeleteCreateGovernanceEvent() {
+        DataChangedEvent deleteDataChangedEvent = new DataChangedEvent(ACK_KEY, "127.0.0.1@3307", DataChangedEvent.Type.DELETED);
+        Optional<GovernanceEvent> deleteGovernanceEvent = watcher.createGovernanceEvent(deleteDataChangedEvent);
+        assertTrue(deleteGovernanceEvent.isPresent());
+        assertTrue(deleteGovernanceEvent.get() instanceof GeneralAckLockReleasedEvent);
+        assertThat(((GeneralAckLockReleasedEvent) deleteGovernanceEvent.get()).getLockName(), is("lock_name"));
+        assertThat(((GeneralAckLockReleasedEvent) deleteGovernanceEvent.get()).getLockedInstance(), is("127.0.0.1@3307"));
+    }
+    
+    /**
+     * Expects an UPDATED event on the ack key to create no governance event.
+     */
+    @Test
+    public void assertUpdateCreateGovernanceEvent() {
+        DataChangedEvent updateDataChangedEvent = new DataChangedEvent(ACK_KEY, "127.0.0.1@3307", DataChangedEvent.Type.UPDATED);
+        Optional<GovernanceEvent> updateGovernanceEvent = watcher.createGovernanceEvent(updateDataChangedEvent);
+        assertFalse(updateGovernanceEvent.isPresent());
+    }
+    
+    /**
+     * Expects an IGNORED event on the ack key to create no governance event.
+     */
+    @Test
+    public void assertIgnoredCreateGovernanceEvent() {
+        DataChangedEvent ignoredDataChangedEvent = new DataChangedEvent(ACK_KEY, "127.0.0.1@3307", DataChangedEvent.Type.IGNORED);
+        Optional<GovernanceEvent> ignoredGovernanceEvent = watcher.createGovernanceEvent(ignoredDataChangedEvent);
+        assertFalse(ignoredGovernanceEvent.isPresent());
+    }
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/general/watcher/GeneralLocksChangedWatcherTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/general/watcher/GeneralLocksChangedWatcherTest.java
new file mode 100644
index 00000000000..39ab0fe03aa
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/global/general/watcher/GeneralLocksChangedWatcherTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.general.watcher;
+
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.general.event.GeneralLockReleasedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.global.general.event.GeneralLockedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Optional;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link GeneralLocksChangedWatcher}: its watched key, its watched event types and
+ * the governance events it creates from data changed events.
+ */
+public final class GeneralLocksChangedWatcherTest {
+    
+    private static final String LOCKS_KEY = "/lock/global/general/locks/lock_name/leases/c_l_0000000";
+    
+    private GeneralLocksChangedWatcher watcher;
+    
+    @Before
+    public void init() {
+        watcher = new GeneralLocksChangedWatcher();
+    }
+    
+    /**
+     * Expects the watcher to watch exactly one key, {@code /lock/global/general/locks}.
+     */
+    @Test
+    public void assertGetWatchingKeys() {
+        Collection<String> keys = watcher.getWatchingKeys();
+        assertThat(keys.size(), is(1));
+        // Actual value first, matcher on the expected value second, so a failure is reported the right way round.
+        assertThat(keys.iterator().next(), is("/lock/global/general/locks"));
+    }
+    
+    /**
+     * Expects the watcher to watch two event types, iterated as ADDED then DELETED.
+     */
+    @Test
+    public void assertGetWatchingTypes() {
+        Collection<DataChangedEvent.Type> types = watcher.getWatchingTypes();
+        assertThat(types.size(), is(2));
+        Iterator<DataChangedEvent.Type> iterator = types.iterator();
+        assertThat(iterator.next(), is(DataChangedEvent.Type.ADDED));
+        assertThat(iterator.next(), is(DataChangedEvent.Type.DELETED));
+    }
+    
+    /**
+     * Expects an ADDED event on the locks key to create a {@link GeneralLockedEvent} with
+     * lock name {@code lock_name}.
+     */
+    @Test
+    public void assertAddCreateGovernanceEvent() {
+        DataChangedEvent addDataChangedEvent = new DataChangedEvent(LOCKS_KEY, "0000000000", DataChangedEvent.Type.ADDED);
+        Optional<GovernanceEvent> addGovernanceEvent = watcher.createGovernanceEvent(addDataChangedEvent);
+        assertTrue(addGovernanceEvent.isPresent());
+        assertTrue(addGovernanceEvent.get() instanceof GeneralLockedEvent);
+        assertThat(((GeneralLockedEvent) addGovernanceEvent.get()).getLockName(), is("lock_name"));
+    }
+    
+    /**
+     * Expects a DELETED event on the locks key to create a {@link GeneralLockReleasedEvent} with
+     * lock name {@code lock_name}.
+     */
+    @Test
+    public void assertDeleteCreateGovernanceEvent() {
+        DataChangedEvent deleteDataChangedEvent = new DataChangedEvent(LOCKS_KEY, "0000000000", DataChangedEvent.Type.DELETED);
+        Optional<GovernanceEvent> deleteGovernanceEvent = watcher.createGovernanceEvent(deleteDataChangedEvent);
+        assertTrue(deleteGovernanceEvent.isPresent());
+        assertTrue(deleteGovernanceEvent.get() instanceof GeneralLockReleasedEvent);
+        assertThat(((GeneralLockReleasedEvent) deleteGovernanceEvent.get()).getLockName(), is("lock_name"));
+    }
+    
+    /**
+     * Expects an UPDATED event on the locks key to create no governance event.
+     */
+    @Test
+    public void assertUpdateCreateGovernanceEvent() {
+        DataChangedEvent updateDataChangedEvent = new DataChangedEvent(LOCKS_KEY, "0000000000", DataChangedEvent.Type.UPDATED);
+        Optional<GovernanceEvent> updateGovernanceEvent = watcher.createGovernanceEvent(updateDataChangedEvent);
+        assertFalse(updateGovernanceEvent.isPresent());
+    }
+    
+    /**
+     * Expects an IGNORED event on the locks key to create no governance event.
+     */
+    @Test
+    public void assertIgnoredCreateGovernanceEvent() {
+        DataChangedEvent ignoredDataChangedEvent = new DataChangedEvent(LOCKS_KEY, "0000000000", DataChangedEvent.Type.IGNORED);
+        Optional<GovernanceEvent> ignoredGovernanceEvent = watcher.createGovernanceEvent(ignoredDataChangedEvent);
+        assertFalse(ignoredGovernanceEvent.isPresent());
+    }
+}