You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/04/07 10:07:38 UTC
[shardingsphere] branch master updated: Re-register synchronization lock state in cluster mode (#16636)
This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 5aa2071bcb1 Re-register synchronization lock state in cluster mode (#16636)
5aa2071bcb1 is described below
commit 5aa2071bcb1d850b61b8e2b98ba77f2d12b683ae
Author: gin <ja...@163.com>
AuthorDate: Thu Apr 7 18:07:31 2022 +0800
Re-register synchronization lock state in cluster mode (#16636)
---
.../infra/instance/ComputeNodeInstance.java | 9 ++++
.../infra/instance/InstanceContext.java | 7 +++
.../shardingsphere/infra/lock/LockContext.java | 9 ++++
.../fixture/FixtureClusterPersistRepository.java | 4 +-
.../cluster/ClusterContextManagerBuilder.java | 6 +--
.../coordinator/lock/DistributeLockContext.java | 54 +++++++++++-----------
.../ClusterPersistRepositoryFactoryTest.java | 4 +-
.../fixture/ClusterPersistRepositoryFixture.java | 4 +-
.../lock/DistributeLockContextTest.java | 6 +--
...ProcessListClusterPersistRepositoryFixture.java | 4 +-
.../cluster/ClusterPersistRepository.java | 6 +--
.../cluster/ClusterPersistRepositoryFactory.java | 5 +-
.../repository/cluster/etcd/EtcdRepository.java | 4 +-
.../zookeeper/CuratorZookeeperRepository.java | 6 +--
.../listener/SessionConnectionListener.java | 10 ++--
.../manager/memory/lock/MemoryLockContext.java | 6 +++
.../standalone/lock/StandaloneLockContext.java | 6 +++
.../fixture/FixtureClusterPersistRepository.java | 4 +-
.../fixture/TestClusterPersistRepository.java | 4 +-
.../pipeline/core/util/PipelineContextUtil.java | 4 +-
20 files changed, 97 insertions(+), 65 deletions(-)
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
index 758ad16e83c..806862fc674 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
@@ -66,4 +66,13 @@ public final class ComputeNodeInstance {
public void switchState(final Collection<String> status) {
state.switchState(StateType.CIRCUIT_BREAK, null != status && status.contains(StateType.CIRCUIT_BREAK.name()));
}
+
+ /**
+ * Get current instance id.
+ *
+ * @return current instance id
+ */
+ public String getCurrentInstanceId() {
+ return instanceDefinition.getInstanceId().getId();
+ }
}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
index 188df14c4d7..7d91b536f2f 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
@@ -153,4 +153,11 @@ public final class InstanceContext {
}
return result;
}
+
+ /**
+ * Init lock context.
+ */
+ public void initLockContext() {
+ lockContext.initLockState(this);
+ }
}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/LockContext.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/LockContext.java
index b77b778f2d2..49fd7962778 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/LockContext.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/LockContext.java
@@ -17,6 +17,8 @@
package org.apache.shardingsphere.infra.lock;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
+
import java.util.Optional;
/**
@@ -24,6 +26,13 @@ import java.util.Optional;
*/
public interface LockContext {
+ /**
+ * Init lock state.
+ *
+ * @param instanceContext instance context
+ */
+ void initLockState(InstanceContext instanceContext);
+
/**
* Get or create schema lock.
*
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-spring/shardingsphere-jdbc-core-spring/shardingsphere-jdbc-core-spring-namespace/src/test/java/org/apache/shardingsphere/spring/namespace/fixture/FixtureClusterPersistRepository.java b/shardingsphere-jdbc/shardingsphere-jdbc-spring/shardingsphere-jdbc-core-spring/shardingsphere-jdbc-core-spring-namespace/src/test/java/org/apache/shardingsphere/spring/namespace/fixture/FixtureClusterPersistRepository.java
index dc5b4be8fce..ea346a81882 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-spring/shardingsphere-jdbc-core-spring/shardingsphere-jdbc-core-spring-namespace/src/test/java/org/apache/shardingsphere/spring/namespace/fixture/FixtureClusterPersistRepository.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-spring/shardingsphere-jdbc-core-spring/shardingsphere-jdbc-core-spring-namespace/src/test/java/org/apache/shardingsphere/spring/namespace/fixture/FixtureClusterPersistRepository.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.spring.namespace.fixture;
import org.apache.shardingsphere.infra.database.DefaultSchema;
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
@@ -81,7 +81,7 @@ public final class FixtureClusterPersistRepository implements ClusterPersistRepo
}
@Override
- public void watchSessionConnection(final InstanceDefinition instanceDefinition) {
+ public void watchSessionConnection(final InstanceContext instanceContext) {
}
@Override
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 7cf9e0c15f2..6da5f3e49c4 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -69,8 +69,7 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder
@Override
public ContextManager build(final ContextManagerBuilderParameter parameter) throws SQLException {
ModeScheduleContextFactory.getInstance().init(parameter.getInstanceDefinition().getInstanceId().getId(), parameter.getModeConfig());
- ClusterPersistRepository repository = ClusterPersistRepositoryFactory.newInstance((ClusterPersistRepositoryConfiguration) parameter.getModeConfig().getRepository(),
- parameter.getInstanceDefinition());
+ ClusterPersistRepository repository = ClusterPersistRepositoryFactory.newInstance((ClusterPersistRepositoryConfiguration) parameter.getModeConfig().getRepository());
MetaDataPersistService metaDataPersistService = new MetaDataPersistService(repository);
persistConfigurations(metaDataPersistService, parameter);
RegistryCenter registryCenter = new RegistryCenter(repository);
@@ -159,7 +158,8 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder
ClusterWorkerIdGenerator clusterWorkerIdGenerator = new ClusterWorkerIdGenerator(repository, metaDataPersistService, instanceDefinition);
DistributeLockContext distributeLockContext = new DistributeLockContext(new LockRegistryService(repository));
InstanceContext instanceContext = new InstanceContext(computeNodeInstance, clusterWorkerIdGenerator, modeConfiguration, distributeLockContext);
- distributeLockContext.synchronizeGlobalLock(instanceContext);
+ instanceContext.initLockContext();
+ repository.watchSessionConnection(instanceContext);
generateTransactionConfigurationFile(instanceContext, metaDataContexts, transactionProps);
TransactionContexts transactionContexts = new TransactionContextsBuilder(metaDataContexts.getMetaDataMap(), metaDataContexts.getGlobalRuleMetaData().getRules()).build();
ContextManager result = new ContextManager();
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContext.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContext.java
index 1bbb78c84cc..b00d776eabb 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContext.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContext.java
@@ -46,12 +46,36 @@ public final class DistributeLockContext implements LockContext {
private final Map<String, ShardingSphereGlobalLock> globalLocks = new ConcurrentHashMap<>();
- private final LockRegistryService globalLockService;
+ private final LockRegistryService lockRegistryService;
private volatile ComputeNodeInstance currentInstance;
private volatile Collection<ComputeNodeInstance> computeNodeInstances;
+ @Override
+ public void initLockState(final InstanceContext instanceContext) {
+ register(instanceContext);
+ synchronizeGlobalLock();
+ }
+
+ private void register(final InstanceContext instanceContext) {
+ currentInstance = instanceContext.getInstance();
+ computeNodeInstances = instanceContext.getComputeNodeInstances();
+ ShardingSphereEventBus.getInstance().register(this);
+ }
+
+ private void synchronizeGlobalLock() {
+ Collection<String> allGlobalLock = lockRegistryService.getAllGlobalLock();
+ if (allGlobalLock.isEmpty()) {
+ lockRegistryService.initGlobalLockRoot();
+ return;
+ }
+ for (String each : allGlobalLock) {
+ String[] schemaInstanceId = LockNodeUtil.parseSchemaLockName(each);
+ globalLocks.put(schemaInstanceId[0], crateGlobalLock(schemaInstanceId[1]));
+ }
+ }
+
@Override
public synchronized ShardingSphereLock getOrCreateSchemaLock(final String schemaName) {
Preconditions.checkNotNull(schemaName, "Get or create schema lock args schema name can not be null.");
@@ -65,7 +89,7 @@ public final class DistributeLockContext implements LockContext {
}
private ShardingSphereGlobalLock crateGlobalLock(final String ownerInstanceId) {
- return new ShardingSphereDistributeGlobalLock(currentInstance, ownerInstanceId, globalLockService, computeNodeInstances);
+ return new ShardingSphereDistributeGlobalLock(currentInstance, ownerInstanceId, lockRegistryService, computeNodeInstances);
}
private String getCurrentInstanceId() {
@@ -81,7 +105,7 @@ public final class DistributeLockContext implements LockContext {
}
@Override
- public synchronized boolean isLockedSchema(final String schemaName) {
+ public boolean isLockedSchema(final String schemaName) {
Preconditions.checkNotNull(schemaName, "Is locked schema args schema name can not be null.");
return getGlobalLock(schemaName).map(shardingSphereGlobalLock -> shardingSphereGlobalLock.isLocked(schemaName)).orElse(false);
}
@@ -90,30 +114,6 @@ public final class DistributeLockContext implements LockContext {
return Optional.ofNullable(globalLocks.get(schemaName));
}
- /**
- * Synchronize global lock.
- *
- * @param instanceContext instance context
- */
- public void synchronizeGlobalLock(final InstanceContext instanceContext) {
- init(instanceContext);
- Collection<String> allGlobalLock = globalLockService.getAllGlobalLock();
- if (allGlobalLock.isEmpty()) {
- globalLockService.initGlobalLockRoot();
- return;
- }
- for (String each : allGlobalLock) {
- String[] schemaInstanceId = LockNodeUtil.parseSchemaLockName(each);
- globalLocks.put(schemaInstanceId[0], crateGlobalLock(schemaInstanceId[1]));
- }
- }
-
- private void init(final InstanceContext instanceContext) {
- currentInstance = instanceContext.getInstance();
- computeNodeInstances = instanceContext.getComputeNodeInstances();
- ShardingSphereEventBus.getInstance().register(this);
- }
-
private boolean isSameInstanceId(final String instanceId) {
return getCurrentInstanceId().equals(instanceId);
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterPersistRepositoryFactoryTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterPersistRepositoryFactoryTest.java
index 0bbe37f57b3..facf7bb5925 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterPersistRepositoryFactoryTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterPersistRepositoryFactoryTest.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator;
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.fixture.ClusterPersistRepositoryFixture;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
@@ -29,7 +28,6 @@ import java.util.Properties;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
public final class ClusterPersistRepositoryFactoryTest {
@@ -40,7 +38,7 @@ public final class ClusterPersistRepositoryFactoryTest {
@Test
public void assertNewInstance() {
ClusterPersistRepositoryConfiguration config = new ClusterPersistRepositoryConfiguration("TEST", "", "", new Properties());
- ClusterPersistRepository clusterPersistRepository = ClusterPersistRepositoryFactory.newInstance(config, mock(InstanceDefinition.class));
+ ClusterPersistRepository clusterPersistRepository = ClusterPersistRepositoryFactory.newInstance(config);
assertThat(clusterPersistRepository.getType(), is("TEST"));
}
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
index d9bb900bea2..ade212aa3b6 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.fixture;
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
@@ -73,7 +73,7 @@ public final class ClusterPersistRepositoryFixture implements ClusterPersistRepo
}
@Override
- public void watchSessionConnection(final InstanceDefinition instanceDefinition) {
+ public void watchSessionConnection(final InstanceContext instanceContext) {
}
@Override
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContextTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContextTest.java
index 98ce914ca67..d06dd7164be 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContextTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/DistributeLockContextTest.java
@@ -41,7 +41,7 @@ public final class DistributeLockContextTest {
DistributeLockContext distributeLockContext = new DistributeLockContext(mock(LockRegistryService.class));
ComputeNodeInstance currentInstance = new ComputeNodeInstance(new InstanceDefinition(InstanceType.PROXY, "127.0.0.1@3307"));
InstanceContext instanceContext = new InstanceContext(currentInstance, mock(WorkerIdGenerator.class), mock(ModeConfiguration.class), distributeLockContext);
- distributeLockContext.synchronizeGlobalLock(instanceContext);
+ instanceContext.initLockContext();
ShardingSphereLock schemaLock = distributeLockContext.getOrCreateSchemaLock("schema");
assertNotNull(schemaLock);
}
@@ -51,7 +51,7 @@ public final class DistributeLockContextTest {
DistributeLockContext distributeLockContext = new DistributeLockContext(mock(LockRegistryService.class));
ComputeNodeInstance currentInstance = new ComputeNodeInstance(new InstanceDefinition(InstanceType.PROXY, "127.0.0.1@3307"));
InstanceContext instanceContext = new InstanceContext(currentInstance, mock(WorkerIdGenerator.class), mock(ModeConfiguration.class), distributeLockContext);
- distributeLockContext.synchronizeGlobalLock(instanceContext);
+ instanceContext.initLockContext();
distributeLockContext.getOrCreateSchemaLock("schema");
Optional<ShardingSphereLock> schemaLock = distributeLockContext.getSchemaLock("schema");
assertTrue(schemaLock.isPresent());
@@ -63,7 +63,7 @@ public final class DistributeLockContextTest {
DistributeLockContext distributeLockContext = new DistributeLockContext(mock(LockRegistryService.class));
ComputeNodeInstance currentInstance = new ComputeNodeInstance(new InstanceDefinition(InstanceType.PROXY, "127.0.0.1@3307"));
InstanceContext instanceContext = new InstanceContext(currentInstance, mock(WorkerIdGenerator.class), mock(ModeConfiguration.class), distributeLockContext);
- distributeLockContext.synchronizeGlobalLock(instanceContext);
+ instanceContext.initLockContext();
distributeLockContext.getOrCreateSchemaLock("schema");
assertFalse(distributeLockContext.isLockedSchema("schema"));
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
index a8feb6870fd..ab6b2128933 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process;
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -80,7 +80,7 @@ public final class ProcessListClusterPersistRepositoryFixture implements Cluster
}
@Override
- public void watchSessionConnection(final InstanceDefinition instanceDefinition) {
+ public void watchSessionConnection(final InstanceContext instanceContext) {
}
@Override
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
index 6868da2b9b5..25b5f9d7a28 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.mode.repository.cluster;
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.mode.persist.PersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
@@ -80,7 +80,7 @@ public interface ClusterPersistRepository extends PersistRepository {
/**
* Watch session connection.
*
- * @param instanceDefinition instance definition
+ * @param instanceContext instance context
*/
- void watchSessionConnection(InstanceDefinition instanceDefinition);
+ void watchSessionConnection(InstanceContext instanceContext);
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepositoryFactory.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepositoryFactory.java
index 8c1453e9fbc..78e6b6bc2fe 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepositoryFactory.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepositoryFactory.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.mode.repository.cluster;
import com.google.common.base.Preconditions;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.spi.type.typed.TypedSPIRegistry;
@@ -38,14 +37,12 @@ public final class ClusterPersistRepositoryFactory {
* Create new instance of cluster persist repository.
*
* @param config persist repository configuration
- * @param instanceDefinition instance definition
* @return new instance of cluster persist repository
*/
- public static ClusterPersistRepository newInstance(final ClusterPersistRepositoryConfiguration config, final InstanceDefinition instanceDefinition) {
+ public static ClusterPersistRepository newInstance(final ClusterPersistRepositoryConfiguration config) {
Preconditions.checkNotNull(config, "Cluster persist repository configuration cannot be null.");
ClusterPersistRepository result = TypedSPIRegistry.getRegisteredService(ClusterPersistRepository.class, config.getType(), config.getProps());
result.init(config);
- result.watchSessionConnection(instanceDefinition);
return result;
}
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
index 6ac45201dd5..47a53ad56d8 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
@@ -33,7 +33,7 @@ import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import org.apache.shardingsphere.mode.repository.cluster.etcd.props.EtcdProperties;
import org.apache.shardingsphere.mode.repository.cluster.etcd.props.EtcdPropertyKey;
@@ -173,7 +173,7 @@ public final class EtcdRepository implements ClusterPersistRepository {
}
@Override
- public void watchSessionConnection(final InstanceDefinition instanceDefinition) {
+ public void watchSessionConnection(final InstanceContext instanceContext) {
}
@Override
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
index 53dac57bd8a..61ab8d042d4 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
@@ -31,7 +31,7 @@ import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -335,8 +335,8 @@ public final class CuratorZookeeperRepository implements ClusterPersistRepositor
}
@Override
- public void watchSessionConnection(final InstanceDefinition instanceDefinition) {
- client.getConnectionStateListenable().addListener(new SessionConnectionListener(instanceDefinition, this));
+ public void watchSessionConnection(final InstanceContext instanceContext) {
+ client.getConnectionStateListenable().addListener(new SessionConnectionListener(instanceContext, this));
}
@Override
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionListener.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionListener.java
index f21e62dabc4..ca0b1aa3e0a 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionListener.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionListener.java
@@ -23,7 +23,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.zookeeper.handler.CuratorZookeeperExceptionHandler;
@@ -39,7 +39,7 @@ public final class SessionConnectionListener implements ConnectionStateListener
private static final int RECONNECT_INTERVAL_SECONDS = 5;
- private final InstanceDefinition instanceDefinition;
+ private final InstanceContext instanceContext;
private final ClusterPersistRepository repository;
@@ -50,14 +50,16 @@ public final class SessionConnectionListener implements ConnectionStateListener
do {
reRegistered = reRegister(client);
} while (!reRegistered);
- log.debug("instance re-register success instance id: {}", instanceDefinition.getInstanceId().getId());
+ log.debug("instance re-register success instance id: {}", instanceContext.getInstance().getCurrentInstanceId());
}
}
private boolean reRegister(final CuratorFramework client) {
try {
if (client.getZookeeperClient().blockUntilConnectedOrTimedOut()) {
- repository.persistEphemeral(ComputeNode.getOnlineInstanceNodePath(instanceDefinition.getInstanceId().getId(), instanceDefinition.getInstanceType()), "");
+ instanceContext.initLockContext();
+ repository.persistEphemeral(ComputeNode.getOnlineInstanceNodePath(instanceContext.getInstance().getCurrentInstanceId(),
+ instanceContext.getInstance().getInstanceDefinition().getInstanceType()), "");
return true;
}
sleepInterval();
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/lock/MemoryLockContext.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/lock/MemoryLockContext.java
index 8f1823833cb..beec557f64e 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/lock/MemoryLockContext.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/lock/MemoryLockContext.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.mode.manager.memory.lock;
import com.google.common.base.Preconditions;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
@@ -33,6 +34,11 @@ public final class MemoryLockContext implements LockContext {
private final Map<String, ShardingSphereLock> locks = new ConcurrentHashMap<>();
+ @Override
+ public void initLockState(final InstanceContext instanceContext) {
+ throw new UnsupportedOperationException("Lock context init lock state not supported in memory mode");
+ }
+
@Override
public ShardingSphereLock getOrCreateSchemaLock(final String schemaName) {
Preconditions.checkNotNull(schemaName, "Get or create schema lock args schema name can not be null.");
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/lock/StandaloneLockContext.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/lock/StandaloneLockContext.java
index 1dfe4169134..1bdc83a543c 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/lock/StandaloneLockContext.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/lock/StandaloneLockContext.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.mode.manager.standalone.lock;
import com.google.common.base.Preconditions;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
@@ -33,6 +34,11 @@ public final class StandaloneLockContext implements LockContext {
private final Map<String, ShardingSphereLock> locks = new ConcurrentHashMap<>();
+ @Override
+ public void initLockState(final InstanceContext instanceContext) {
+ throw new UnsupportedOperationException("Lock context init lock state not supported in standalone mode");
+ }
+
@Override
public ShardingSphereLock getOrCreateSchemaLock(final String schemaName) {
Preconditions.checkNotNull(schemaName, "Get or create schema lock args schema name can not be null.");
diff --git a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/FixtureClusterPersistRepository.java b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/FixtureClusterPersistRepository.java
index 6142fd61ba0..ef6cb5de839 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/FixtureClusterPersistRepository.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/FixtureClusterPersistRepository.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.proxy.fixture;
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
@@ -79,7 +79,7 @@ public final class FixtureClusterPersistRepository implements ClusterPersistRepo
}
@Override
- public void watchSessionConnection(final InstanceDefinition instanceDefinition) {
+ public void watchSessionConnection(final InstanceContext instanceContext) {
}
@Override
diff --git a/shardingsphere-test/shardingsphere-integration-driver-test/src/test/java/org/apache/shardingsphere/driver/fixture/TestClusterPersistRepository.java b/shardingsphere-test/shardingsphere-integration-driver-test/src/test/java/org/apache/shardingsphere/driver/fixture/TestClusterPersistRepository.java
index 7339866d549..e1d213791a8 100644
--- a/shardingsphere-test/shardingsphere-integration-driver-test/src/test/java/org/apache/shardingsphere/driver/fixture/TestClusterPersistRepository.java
+++ b/shardingsphere-test/shardingsphere-integration-driver-test/src/test/java/org/apache/shardingsphere/driver/fixture/TestClusterPersistRepository.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.driver.fixture;
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -81,7 +81,7 @@ public final class TestClusterPersistRepository implements ClusterPersistReposit
}
@Override
- public void watchSessionConnection(final InstanceDefinition instanceDefinition) {
+ public void watchSessionConnection(final InstanceContext instanceContext) {
}
@Override
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
index 6ba5bd8d590..bb1ad78d0c7 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
@@ -30,8 +30,6 @@ import org.apache.shardingsphere.data.pipeline.core.spi.ingest.channel.MemoryPip
import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
import org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
-import org.apache.shardingsphere.infra.instance.definition.InstanceType;
import org.apache.shardingsphere.infra.metadata.schema.model.ColumnMetaData;
import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData;
import org.apache.shardingsphere.mode.manager.ContextManager;
@@ -65,7 +63,7 @@ public final class PipelineContextUtil {
@Override
protected ClusterPersistRepository initialize() {
- return ClusterPersistRepositoryFactory.newInstance(PERSIST_REPOSITORY_CONFIG, new InstanceDefinition(InstanceType.PROXY));
+ return ClusterPersistRepositoryFactory.newInstance(PERSIST_REPOSITORY_CONFIG);
}
};
}