You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/04/07 10:07:38 UTC

[shardingsphere] branch master updated: Re-register synchronization lock state in cluster mode (#16636)

This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 5aa2071bcb1 Re-register synchronization lock state in cluster mode (#16636)
5aa2071bcb1 is described below

commit 5aa2071bcb1d850b61b8e2b98ba77f2d12b683ae
Author: gin <ja...@163.com>
AuthorDate: Thu Apr 7 18:07:31 2022 +0800

    Re-register synchronization lock state in cluster mode (#16636)
---
 .../infra/instance/ComputeNodeInstance.java        |  9 ++++
 .../infra/instance/InstanceContext.java            |  7 +++
 .../shardingsphere/infra/lock/LockContext.java     |  9 ++++
 .../fixture/FixtureClusterPersistRepository.java   |  4 +-
 .../cluster/ClusterContextManagerBuilder.java      |  6 +--
 .../coordinator/lock/DistributeLockContext.java    | 54 +++++++++++-----------
 .../ClusterPersistRepositoryFactoryTest.java       |  4 +-
 .../fixture/ClusterPersistRepositoryFixture.java   |  4 +-
 .../lock/DistributeLockContextTest.java            |  6 +--
 ...ProcessListClusterPersistRepositoryFixture.java |  4 +-
 .../cluster/ClusterPersistRepository.java          |  6 +--
 .../cluster/ClusterPersistRepositoryFactory.java   |  5 +-
 .../repository/cluster/etcd/EtcdRepository.java    |  4 +-
 .../zookeeper/CuratorZookeeperRepository.java      |  6 +--
 .../listener/SessionConnectionListener.java        | 10 ++--
 .../manager/memory/lock/MemoryLockContext.java     |  6 +++
 .../standalone/lock/StandaloneLockContext.java     |  6 +++
 .../fixture/FixtureClusterPersistRepository.java   |  4 +-
 .../fixture/TestClusterPersistRepository.java      |  4 +-
 .../pipeline/core/util/PipelineContextUtil.java    |  4 +-
 20 files changed, 97 insertions(+), 65 deletions(-)

diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
index 758ad16e83c..806862fc674 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
@@ -66,4 +66,13 @@ public final class ComputeNodeInstance {
     public void switchState(final Collection<String> status) {
         state.switchState(StateType.CIRCUIT_BREAK, null != status && status.contains(StateType.CIRCUIT_BREAK.name()));
     }
+    
+    /**
+     * Get current instance id.
+     *
+     * @return current instance id
+     */
+    public String getCurrentInstanceId() {
+        return instanceDefinition.getInstanceId().getId();
+    }
 }
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
index 188df14c4d7..7d91b536f2f 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
@@ -153,4 +153,11 @@ public final class InstanceContext {
         }
         return result;
     }
+    
+    /**
+     * Init lock context.
+     */
+    public void initLockContext() {
+        lockContext.initLockState(this);
+    }
 }
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/LockContext.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/LockContext.java
index b77b778f2d2..49fd7962778 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/LockContext.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/LockContext.java
@@ -17,6 +17,8 @@
 
 package org.apache.shardingsphere.infra.lock;
 
+import org.apache.shardingsphere.infra.instance.InstanceContext;
+
 import java.util.Optional;
 
 /**
@@ -24,6 +26,13 @@ import java.util.Optional;
  */
 public interface LockContext {
     
+    /**
+     * Init lock state.
+     *
+     * @param instanceContext instance context
+     */
+    void initLockState(InstanceContext instanceContext);
+    
     /**
      * Get or create schema lock.
      *
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-spring/shardingsphere-jdbc-core-spring/shardingsphere-jdbc-core-spring-namespace/src/test/java/org/apache/shardingsphere/spring/namespace/fixture/FixtureClusterPersistRepository.java b/shardingsphere-jdbc/shardingsphere-jdbc-spring/shardingsphere-jdbc-core-spring/shardingsphere-jdbc-core-spring-namespace/src/test/java/org/apache/shardingsphere/spring/namespace/fixture/FixtureClusterPersistRepository.java
index dc5b4be8fce..ea346a81882 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-spring/shardingsphere-jdbc-core-spring/shardingsphere-jdbc-core-spring-namespace/src/test/java/org/apache/shardingsphere/spring/namespace/fixture/FixtureClusterPersistRepository.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-spring/shardingsphere-jdbc-core-spring/shardingsphere-jdbc-core-spring-namespace/src/test/java/org/apache/shardingsphere/spring/namespace/fixture/FixtureClusterPersistRepository.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.spring.namespace.fixture;
 
 import org.apache.shardingsphere.infra.database.DefaultSchema;
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
@@ -81,7 +81,7 @@ public final class FixtureClusterPersistRepository implements ClusterPersistRepo
     }
     
     @Override
-    public void watchSessionConnection(final InstanceDefinition instanceDefinition) {
+    public void watchSessionConnection(final InstanceContext instanceContext) {
     }
     
     @Override
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 7cf9e0c15f2..6da5f3e49c4 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -69,8 +69,7 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder
     @Override
     public ContextManager build(final ContextManagerBuilderParameter parameter) throws SQLException {
         ModeScheduleContextFactory.getInstance().init(parameter.getInstanceDefinition().getInstanceId().getId(), parameter.getModeConfig());
-        ClusterPersistRepository repository = ClusterPersistRepositoryFactory.newInstance((ClusterPersistRepositoryConfiguration) parameter.getModeConfig().getRepository(),
-                parameter.getInstanceDefinition());
+        ClusterPersistRepository repository = ClusterPersistRepositoryFactory.newInstance((ClusterPersistRepositoryConfiguration) parameter.getModeConfig().getRepository());
         MetaDataPersistService metaDataPersistService = new MetaDataPersistService(repository);
         persistConfigurations(metaDataPersistService, parameter);
         RegistryCenter registryCenter = new RegistryCenter(repository);
@@ -159,7 +158,8 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder
         ClusterWorkerIdGenerator clusterWorkerIdGenerator = new ClusterWorkerIdGenerator(repository, metaDataPersistService, instanceDefinition);
         DistributeLockContext distributeLockContext = new DistributeLockContext(new LockRegistryService(repository));
         InstanceContext instanceContext = new InstanceContext(computeNodeInstance, clusterWorkerIdGenerator, modeConfiguration, distributeLockContext);
-        distributeLockContext.synchronizeGlobalLock(instanceContext);
+        instanceContext.initLockContext();
+        repository.watchSessionConnection(instanceContext);
         generateTransactionConfigurationFile(instanceContext, metaDataContexts, transactionProps);
         TransactionContexts transactionContexts = new TransactionContextsBuilder(metaDataContexts.getMetaDataMap(), metaDataContexts.getGlobalRuleMetaData().getRules()).build();
         ContextManager result = new ContextManager();
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContext.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContext.java
index 1bbb78c84cc..b00d776eabb 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContext.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContext.java
@@ -46,12 +46,36 @@ public final class DistributeLockContext implements LockContext {
     
     private final Map<String, ShardingSphereGlobalLock> globalLocks = new ConcurrentHashMap<>();
     
-    private final LockRegistryService globalLockService;
+    private final LockRegistryService lockRegistryService;
     
     private volatile ComputeNodeInstance currentInstance;
     
     private volatile Collection<ComputeNodeInstance> computeNodeInstances;
     
+    @Override
+    public void initLockState(final InstanceContext instanceContext) {
+        register(instanceContext);
+        synchronizeGlobalLock();
+    }
+    
+    private void register(final InstanceContext instanceContext) {
+        currentInstance = instanceContext.getInstance();
+        computeNodeInstances = instanceContext.getComputeNodeInstances();
+        ShardingSphereEventBus.getInstance().register(this);
+    }
+    
+    private void synchronizeGlobalLock() {
+        Collection<String> allGlobalLock = lockRegistryService.getAllGlobalLock();
+        if (allGlobalLock.isEmpty()) {
+            lockRegistryService.initGlobalLockRoot();
+            return;
+        }
+        for (String each : allGlobalLock) {
+            String[] schemaInstanceId = LockNodeUtil.parseSchemaLockName(each);
+            globalLocks.put(schemaInstanceId[0], crateGlobalLock(schemaInstanceId[1]));
+        }
+    }
+    
     @Override
     public synchronized ShardingSphereLock getOrCreateSchemaLock(final String schemaName) {
         Preconditions.checkNotNull(schemaName, "Get or create schema lock args schema name can not be null.");
@@ -65,7 +89,7 @@ public final class DistributeLockContext implements LockContext {
     }
     
     private ShardingSphereGlobalLock crateGlobalLock(final String ownerInstanceId) {
-        return new ShardingSphereDistributeGlobalLock(currentInstance, ownerInstanceId, globalLockService, computeNodeInstances);
+        return new ShardingSphereDistributeGlobalLock(currentInstance, ownerInstanceId, lockRegistryService, computeNodeInstances);
     }
     
     private String getCurrentInstanceId() {
@@ -81,7 +105,7 @@ public final class DistributeLockContext implements LockContext {
     }
     
     @Override
-    public synchronized boolean isLockedSchema(final String schemaName) {
+    public boolean isLockedSchema(final String schemaName) {
         Preconditions.checkNotNull(schemaName, "Is locked schema args schema name can not be null.");
         return getGlobalLock(schemaName).map(shardingSphereGlobalLock -> shardingSphereGlobalLock.isLocked(schemaName)).orElse(false);
     }
@@ -90,30 +114,6 @@ public final class DistributeLockContext implements LockContext {
         return Optional.ofNullable(globalLocks.get(schemaName));
     }
     
-    /**
-     * Synchronize global lock.
-     *
-     * @param instanceContext instance context
-     */
-    public void synchronizeGlobalLock(final InstanceContext instanceContext) {
-        init(instanceContext);
-        Collection<String> allGlobalLock = globalLockService.getAllGlobalLock();
-        if (allGlobalLock.isEmpty()) {
-            globalLockService.initGlobalLockRoot();
-            return;
-        }
-        for (String each : allGlobalLock) {
-            String[] schemaInstanceId = LockNodeUtil.parseSchemaLockName(each);
-            globalLocks.put(schemaInstanceId[0], crateGlobalLock(schemaInstanceId[1]));
-        }
-    }
-    
-    private void init(final InstanceContext instanceContext) {
-        currentInstance = instanceContext.getInstance();
-        computeNodeInstances = instanceContext.getComputeNodeInstances();
-        ShardingSphereEventBus.getInstance().register(this);
-    }
-    
     private boolean isSameInstanceId(final String instanceId) {
         return getCurrentInstanceId().equals(instanceId);
     }
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterPersistRepositoryFactoryTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterPersistRepositoryFactoryTest.java
index 0bbe37f57b3..facf7bb5925 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterPersistRepositoryFactoryTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterPersistRepositoryFactoryTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.mode.manager.cluster.coordinator;
 
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.fixture.ClusterPersistRepositoryFixture;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
@@ -29,7 +28,6 @@ import java.util.Properties;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
 
 public final class ClusterPersistRepositoryFactoryTest {
 
@@ -40,7 +38,7 @@ public final class ClusterPersistRepositoryFactoryTest {
     @Test
     public void assertNewInstance() {
         ClusterPersistRepositoryConfiguration config = new ClusterPersistRepositoryConfiguration("TEST", "", "", new Properties());
-        ClusterPersistRepository clusterPersistRepository = ClusterPersistRepositoryFactory.newInstance(config, mock(InstanceDefinition.class));
+        ClusterPersistRepository clusterPersistRepository = ClusterPersistRepositoryFactory.newInstance(config);
         assertThat(clusterPersistRepository.getType(), is("TEST"));
     }
 }
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
index d9bb900bea2..ade212aa3b6 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.fixture;
 
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
@@ -73,7 +73,7 @@ public final class ClusterPersistRepositoryFixture implements ClusterPersistRepo
     }
     
     @Override
-    public void watchSessionConnection(final InstanceDefinition instanceDefinition) {
+    public void watchSessionConnection(final InstanceContext instanceContext) {
     }
     
     @Override
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContextTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContextTest.java
index 98ce914ca67..d06dd7164be 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContextTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContextTest.java
@@ -41,7 +41,7 @@ public final class DistributeLockContextTest {
         DistributeLockContext distributeLockContext = new DistributeLockContext(mock(LockRegistryService.class));
         ComputeNodeInstance currentInstance = new ComputeNodeInstance(new InstanceDefinition(InstanceType.PROXY, "127.0.0.1@3307"));
         InstanceContext instanceContext = new InstanceContext(currentInstance, mock(WorkerIdGenerator.class), mock(ModeConfiguration.class), distributeLockContext);
-        distributeLockContext.synchronizeGlobalLock(instanceContext);
+        instanceContext.initLockContext();
         ShardingSphereLock schemaLock = distributeLockContext.getOrCreateSchemaLock("schema");
         assertNotNull(schemaLock);
     }
@@ -51,7 +51,7 @@ public final class DistributeLockContextTest {
         DistributeLockContext distributeLockContext = new DistributeLockContext(mock(LockRegistryService.class));
         ComputeNodeInstance currentInstance = new ComputeNodeInstance(new InstanceDefinition(InstanceType.PROXY, "127.0.0.1@3307"));
         InstanceContext instanceContext = new InstanceContext(currentInstance, mock(WorkerIdGenerator.class), mock(ModeConfiguration.class), distributeLockContext);
-        distributeLockContext.synchronizeGlobalLock(instanceContext);
+        instanceContext.initLockContext();
         distributeLockContext.getOrCreateSchemaLock("schema");
         Optional<ShardingSphereLock> schemaLock = distributeLockContext.getSchemaLock("schema");
         assertTrue(schemaLock.isPresent());
@@ -63,7 +63,7 @@ public final class DistributeLockContextTest {
         DistributeLockContext distributeLockContext = new DistributeLockContext(mock(LockRegistryService.class));
         ComputeNodeInstance currentInstance = new ComputeNodeInstance(new InstanceDefinition(InstanceType.PROXY, "127.0.0.1@3307"));
         InstanceContext instanceContext = new InstanceContext(currentInstance, mock(WorkerIdGenerator.class), mock(ModeConfiguration.class), distributeLockContext);
-        distributeLockContext.synchronizeGlobalLock(instanceContext);
+        instanceContext.initLockContext();
         distributeLockContext.getOrCreateSchemaLock("schema");
         assertFalse(distributeLockContext.isLockedSchema("schema"));
     }
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
index a8feb6870fd..ab6b2128933 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process;
 
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -80,7 +80,7 @@ public final class ProcessListClusterPersistRepositoryFixture implements Cluster
     }
     
     @Override
-    public void watchSessionConnection(final InstanceDefinition instanceDefinition) {
+    public void watchSessionConnection(final InstanceContext instanceContext) {
     }
     
     @Override
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
index 6868da2b9b5..25b5f9d7a28 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.mode.repository.cluster;
 
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
 import org.apache.shardingsphere.mode.persist.PersistRepository;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
 
@@ -80,7 +80,7 @@ public interface ClusterPersistRepository extends PersistRepository {
     /**
      * Watch session connection.
      *
-     * @param instanceDefinition instance definition
+     * @param instanceContext instance context
      */
-    void watchSessionConnection(InstanceDefinition instanceDefinition);
+    void watchSessionConnection(InstanceContext instanceContext);
 }
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepositoryFactory.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepositoryFactory.java
index 8c1453e9fbc..78e6b6bc2fe 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepositoryFactory.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepositoryFactory.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.mode.repository.cluster;
 import com.google.common.base.Preconditions;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
 import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.spi.type.typed.TypedSPIRegistry;
 
@@ -38,14 +37,12 @@ public final class ClusterPersistRepositoryFactory {
      * Create new instance of cluster persist repository.
      * 
      * @param config persist repository configuration
-     * @param instanceDefinition instance definition
      * @return new instance of cluster persist repository
      */
-    public static ClusterPersistRepository newInstance(final ClusterPersistRepositoryConfiguration config, final InstanceDefinition instanceDefinition) {
+    public static ClusterPersistRepository newInstance(final ClusterPersistRepositoryConfiguration config) {
         Preconditions.checkNotNull(config, "Cluster persist repository configuration cannot be null.");
         ClusterPersistRepository result = TypedSPIRegistry.getRegisteredService(ClusterPersistRepository.class, config.getType(), config.getProps());
         result.init(config);
-        result.watchSessionConnection(instanceDefinition);
         return result;
     }
 }
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
index 6ac45201dd5..47a53ad56d8 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
@@ -33,7 +33,7 @@ import lombok.Getter;
 import lombok.Setter;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 import org.apache.shardingsphere.mode.repository.cluster.etcd.props.EtcdProperties;
 import org.apache.shardingsphere.mode.repository.cluster.etcd.props.EtcdPropertyKey;
@@ -173,7 +173,7 @@ public final class EtcdRepository implements ClusterPersistRepository {
     }
     
     @Override
-    public void watchSessionConnection(final InstanceDefinition instanceDefinition) {
+    public void watchSessionConnection(final InstanceContext instanceContext) {
     }
     
     @Override
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
index 53dac57bd8a..61ab8d042d4 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
@@ -31,7 +31,7 @@ import org.apache.curator.framework.recipes.locks.InterProcessLock;
 import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.utils.CloseableUtils;
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -335,8 +335,8 @@ public final class CuratorZookeeperRepository implements ClusterPersistRepositor
     }
     
     @Override
-    public void watchSessionConnection(final InstanceDefinition instanceDefinition) {
-        client.getConnectionStateListenable().addListener(new SessionConnectionListener(instanceDefinition, this));
+    public void watchSessionConnection(final InstanceContext instanceContext) {
+        client.getConnectionStateListenable().addListener(new SessionConnectionListener(instanceContext, this));
     }
     
     @Override
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionListener.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-reposit [...]
index f21e62dabc4..ca0b1aa3e0a 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionListener.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionListener.java
@@ -23,7 +23,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
 import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.mode.repository.cluster.zookeeper.handler.CuratorZookeeperExceptionHandler;
@@ -39,7 +39,7 @@ public final class SessionConnectionListener implements ConnectionStateListener
     
     private static final int RECONNECT_INTERVAL_SECONDS = 5;
     
-    private final InstanceDefinition instanceDefinition;
+    private final InstanceContext instanceContext;
     
     private final ClusterPersistRepository repository;
     
@@ -50,14 +50,16 @@ public final class SessionConnectionListener implements ConnectionStateListener
             do {
                 reRegistered = reRegister(client);
             } while (!reRegistered);
-            log.debug("instance re-register success instance id: {}", instanceDefinition.getInstanceId().getId());
+            log.debug("instance re-register success instance id: {}", instanceContext.getInstance().getCurrentInstanceId());
         }
     }
     
     private boolean reRegister(final CuratorFramework client) {
         try {
             if (client.getZookeeperClient().blockUntilConnectedOrTimedOut()) {
-                repository.persistEphemeral(ComputeNode.getOnlineInstanceNodePath(instanceDefinition.getInstanceId().getId(), instanceDefinition.getInstanceType()), "");
+                instanceContext.initLockContext();
+                repository.persistEphemeral(ComputeNode.getOnlineInstanceNodePath(instanceContext.getInstance().getCurrentInstanceId(),
+                        instanceContext.getInstance().getInstanceDefinition().getInstanceType()), "");
                 return true;
             }
             sleepInterval();
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/lock/MemoryLockContext.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/lock/MemoryLockContext.java
index 8f1823833cb..beec557f64e 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/lock/MemoryLockContext.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/lock/MemoryLockContext.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.mode.manager.memory.lock;
 
 import com.google.common.base.Preconditions;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
 import org.apache.shardingsphere.infra.lock.LockContext;
 import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
 
@@ -33,6 +34,11 @@ public final class MemoryLockContext implements LockContext {
     
     private final Map<String, ShardingSphereLock> locks = new ConcurrentHashMap<>();
     
+    @Override
+    public void initLockState(final InstanceContext instanceContext) {
+        throw new UnsupportedOperationException("Lock context init lock state not supported in memory mode");
+    }
+    
     @Override
     public ShardingSphereLock getOrCreateSchemaLock(final String schemaName) {
         Preconditions.checkNotNull(schemaName, "Get or create schema lock args schema name can not be null.");
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/lock/StandaloneLockContext.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/lock/StandaloneLockContext.java
index 1dfe4169134..1bdc83a543c 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/lock/StandaloneLockContext.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/lock/StandaloneLockContext.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.mode.manager.standalone.lock;
 
 import com.google.common.base.Preconditions;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
 import org.apache.shardingsphere.infra.lock.LockContext;
 import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
 
@@ -33,6 +34,11 @@ public final class StandaloneLockContext implements LockContext {
     
     private final Map<String, ShardingSphereLock> locks = new ConcurrentHashMap<>();
     
+    @Override
+    public void initLockState(final InstanceContext instanceContext) {
+        throw new UnsupportedOperationException("Lock context init lock state not supported in standalone mode");
+    }
+    
     @Override
     public ShardingSphereLock getOrCreateSchemaLock(final String schemaName) {
         Preconditions.checkNotNull(schemaName, "Get or create schema lock args schema name can not be null.");
diff --git a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/FixtureClusterPersistRepository.java b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/FixtureClusterPersistRepository.java
index 6142fd61ba0..ef6cb5de839 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/FixtureClusterPersistRepository.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/FixtureClusterPersistRepository.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.proxy.fixture;
 
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
@@ -79,7 +79,7 @@ public final class FixtureClusterPersistRepository implements ClusterPersistRepo
     }
     
     @Override
-    public void watchSessionConnection(final InstanceDefinition instanceDefinition) {
+    public void watchSessionConnection(final InstanceContext instanceContext) {
     }
     
     @Override
diff --git a/shardingsphere-test/shardingsphere-integration-driver-test/src/test/java/org/apache/shardingsphere/driver/fixture/TestClusterPersistRepository.java b/shardingsphere-test/shardingsphere-integration-driver-test/src/test/java/org/apache/shardingsphere/driver/fixture/TestClusterPersistRepository.java
index 7339866d549..e1d213791a8 100644
--- a/shardingsphere-test/shardingsphere-integration-driver-test/src/test/java/org/apache/shardingsphere/driver/fixture/TestClusterPersistRepository.java
+++ b/shardingsphere-test/shardingsphere-integration-driver-test/src/test/java/org/apache/shardingsphere/driver/fixture/TestClusterPersistRepository.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.driver.fixture;
 
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -81,7 +81,7 @@ public final class TestClusterPersistRepository implements ClusterPersistReposit
     }
     
     @Override
-    public void watchSessionConnection(final InstanceDefinition instanceDefinition) {
+    public void watchSessionConnection(final InstanceContext instanceContext) {
     }
     
     @Override
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
index 6ba5bd8d590..bb1ad78d0c7 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
@@ -30,8 +30,6 @@ import org.apache.shardingsphere.data.pipeline.core.spi.ingest.channel.MemoryPip
 import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
 import org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
-import org.apache.shardingsphere.infra.instance.definition.InstanceType;
 import org.apache.shardingsphere.infra.metadata.schema.model.ColumnMetaData;
 import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData;
 import org.apache.shardingsphere.mode.manager.ContextManager;
@@ -65,7 +63,7 @@ public final class PipelineContextUtil {
             
             @Override
             protected ClusterPersistRepository initialize() {
-                return ClusterPersistRepositoryFactory.newInstance(PERSIST_REPOSITORY_CONFIG, new InstanceDefinition(InstanceType.PROXY));
+                return ClusterPersistRepositoryFactory.newInstance(PERSIST_REPOSITORY_CONFIG);
             }
         };
     }