You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/12/22 04:39:27 UTC
[shardingsphere] branch master updated: Refactor global lock (#8706)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new aeef163 Refactor global lock (#8706)
aeef163 is described below
commit aeef1633bfece1739fd3f76abe4ce534e52c80ee
Author: Haoran Meng <me...@gmail.com>
AuthorDate: Tue Dec 22 12:39:13 2020 +0800
Refactor global lock (#8706)
* Refactor governance lock for code review
* Refactor global lock
* Add final for the variable of RegistryCenter
---
.../metadata/GovernanceMetaDataContexts.java | 5 +-
.../metadata/GovernanceMetaDataContextsTest.java | 2 -
.../core/event/model/lock/LockNoticeEvent.java | 32 -----
.../governance/core/facade/GovernanceFacade.java | 7 --
.../facade/listener/GovernanceListenerManager.java | 5 -
.../governance/core/lock/LockCenter.java | 137 ---------------------
.../core/lock/listener/LockListenerManager.java | 40 ------
.../core/lock/strategy/GovernanceLockStrategy.java | 52 +++++++-
.../governance/core/registry/RegistryCenter.java | 63 +++++++++-
.../listener/GlobalLockChangedListener.java | 2 +-
.../registry/listener/RegistryListenerManager.java | 4 +
.../core/state/GovernedStateContext.java | 76 ------------
.../governance/core/lock/LockCenterTest.java | 76 ------------
.../lock/listener/LockListenerManagerTest.java | 55 ---------
.../lock/strategy/GovernanceLockStrategyTest.java | 14 ++-
.../core/registry/RegistryCenterTest.java | 14 +++
.../listener/GlobalLockChangedListenerTest.java | 2 +-
.../shardingsphere/infra/lock/LockContext.java | 13 +-
.../shardingsphere/infra/lock/LockStrategy.java | 5 +-
.../infra/lock/StandardLockStrategy.java | 4 +-
.../shardingsphere/infra/lock/LockContextTest.java | 9 +-
.../infra/lock/StandardLockStrategyTest.java | 6 +-
.../infra/lock/fixture/FixtureLockStrategy.java | 4 +-
.../communication/DatabaseCommunicationEngine.java | 3 +-
.../impl/AbstractBootstrapInitializer.java | 13 ++
.../impl/GovernanceBootstrapInitializer.java | 5 +-
.../impl/StandardBootstrapInitializer.java | 2 +-
27 files changed, 183 insertions(+), 467 deletions(-)
diff --git a/shardingsphere-governance/shardingsphere-governance-context/src/main/java/org/apache/shardingsphere/governance/context/metadata/GovernanceMetaDataContexts.java b/shardingsphere-governance/shardingsphere-governance-context/src/main/java/org/apache/shardingsphere/governance/context/metadata/GovernanceMetaDataContexts.java
index 8c9c95a..2af684c 100644
--- a/shardingsphere-governance/shardingsphere-governance-context/src/main/java/org/apache/shardingsphere/governance/context/metadata/GovernanceMetaDataContexts.java
+++ b/shardingsphere-governance/shardingsphere-governance-context/src/main/java/org/apache/shardingsphere/governance/context/metadata/GovernanceMetaDataContexts.java
@@ -30,7 +30,6 @@ import org.apache.shardingsphere.governance.core.event.model.schema.SchemaChange
import org.apache.shardingsphere.governance.core.facade.GovernanceFacade;
import org.apache.shardingsphere.governance.core.registry.event.DisabledStateChangedEvent;
import org.apache.shardingsphere.governance.core.registry.schema.GovernanceSchema;
-import org.apache.shardingsphere.governance.core.state.GovernedStateContext;
import org.apache.shardingsphere.infra.auth.Authentication;
import org.apache.shardingsphere.infra.config.RuleConfiguration;
import org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
@@ -48,6 +47,7 @@ import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.infra.rule.event.impl.DataSourceNameDisabledEvent;
import org.apache.shardingsphere.infra.rule.type.StatusContainedRule;
import org.apache.shardingsphere.infra.state.StateContext;
+import org.apache.shardingsphere.infra.state.StateEvent;
import org.apache.shardingsphere.infra.state.StateType;
import javax.sql.DataSource;
@@ -205,7 +205,8 @@ public final class GovernanceMetaDataContexts implements MetaDataContexts {
metaDataContexts = new StandardMetaDataContexts(newMetaDataMap, metaDataContexts.getExecutorEngine(), metaDataContexts.getAuthentication(), metaDataContexts.getProps());
} finally {
if (StateContext.getCurrentState() == StateType.LOCK) {
- GovernedStateContext.unlock();
+ StateContext.switchState(new StateEvent(StateType.LOCK, false));
+ governanceFacade.getRegistryCenter().persistInstanceData("");
}
}
}
diff --git a/shardingsphere-governance/shardingsphere-governance-context/src/test/java/org/apache/shardingsphere/governance/context/metadata/GovernanceMetaDataContextsTest.java b/shardingsphere-governance/shardingsphere-governance-context/src/test/java/org/apache/shardingsphere/governance/context/metadata/GovernanceMetaDataContextsTest.java
index 49cd8ed..8693f50 100644
--- a/shardingsphere-governance/shardingsphere-governance-context/src/test/java/org/apache/shardingsphere/governance/context/metadata/GovernanceMetaDataContextsTest.java
+++ b/shardingsphere-governance/shardingsphere-governance-context/src/test/java/org/apache/shardingsphere/governance/context/metadata/GovernanceMetaDataContextsTest.java
@@ -29,7 +29,6 @@ import org.apache.shardingsphere.governance.core.facade.GovernanceFacade;
import org.apache.shardingsphere.governance.core.registry.RegistryCenter;
import org.apache.shardingsphere.governance.core.registry.event.DisabledStateChangedEvent;
import org.apache.shardingsphere.governance.core.registry.schema.GovernanceSchema;
-import org.apache.shardingsphere.governance.core.state.GovernedStateContext;
import org.apache.shardingsphere.infra.auth.builtin.DefaultAuthentication;
import org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
import org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
@@ -104,7 +103,6 @@ public final class GovernanceMetaDataContextsTest {
when(governanceFacade.getConfigCenter()).thenReturn(configCenter);
when(registryCenter.loadDisabledDataSources("schema")).thenReturn(Collections.singletonList("schema.ds_1"));
governanceMetaDataContexts = new GovernanceMetaDataContexts(new StandardMetaDataContexts(createMetaDataMap(), mock(ExecutorEngine.class), authentication, props), governanceFacade);
- GovernedStateContext.startUp();
}
private Map<String, ShardingSphereMetaData> createMetaDataMap() {
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/event/model/lock/LockNoticeEvent.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/event/model/lock/LockNoticeEvent.java
deleted file mode 100644
index 5860a99..0000000
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/event/model/lock/LockNoticeEvent.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.governance.core.event.model.lock;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.governance.core.event.model.GovernanceEvent;
-
-/**
- * Lock notice event.
- */
-@RequiredArgsConstructor
-@Getter
-public final class LockNoticeEvent implements GovernanceEvent {
-
- private final boolean locked;
-}
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/facade/GovernanceFacade.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/facade/GovernanceFacade.java
index a23020f..5116f1b 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/facade/GovernanceFacade.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/facade/GovernanceFacade.java
@@ -21,9 +21,7 @@ import lombok.Getter;
import org.apache.shardingsphere.governance.core.config.ConfigCenter;
import org.apache.shardingsphere.governance.core.facade.listener.GovernanceListenerManager;
import org.apache.shardingsphere.governance.core.facade.repository.GovernanceRepositoryFacade;
-import org.apache.shardingsphere.governance.core.lock.LockCenter;
import org.apache.shardingsphere.governance.core.registry.RegistryCenter;
-import org.apache.shardingsphere.governance.core.state.GovernedStateContext;
import org.apache.shardingsphere.governance.repository.api.config.GovernanceConfiguration;
import org.apache.shardingsphere.infra.auth.builtin.DefaultAuthentication;
import org.apache.shardingsphere.infra.config.RuleConfiguration;
@@ -51,9 +49,6 @@ public final class GovernanceFacade implements AutoCloseable {
private GovernanceListenerManager listenerManager;
- @Getter
- private LockCenter lockCenter = LockCenter.getInstance();
-
/**
* Initialize governance facade.
*
@@ -65,10 +60,8 @@ public final class GovernanceFacade implements AutoCloseable {
repositoryFacade = new GovernanceRepositoryFacade(config);
registryCenter = new RegistryCenter(repositoryFacade.getRegistryRepository());
configCenter = new ConfigCenter(repositoryFacade.getConfigurationRepository());
- lockCenter.init(repositoryFacade.getRegistryRepository(), registryCenter);
listenerManager = new GovernanceListenerManager(repositoryFacade.getRegistryRepository(),
repositoryFacade.getConfigurationRepository(), schemaNames.isEmpty() ? configCenter.getAllSchemaNames() : schemaNames);
- GovernedStateContext.startUp();
}
/**
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/facade/listener/GovernanceListenerManager.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/facade/listener/GovernanceListenerManager.java
index 75eea29..ad03308 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/facade/listener/GovernanceListenerManager.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/facade/listener/GovernanceListenerManager.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.governance.core.facade.listener;
import org.apache.shardingsphere.governance.core.config.listener.ConfigurationListenerManager;
-import org.apache.shardingsphere.governance.core.lock.listener.LockListenerManager;
import org.apache.shardingsphere.governance.core.registry.listener.RegistryListenerManager;
import org.apache.shardingsphere.governance.repository.api.ConfigurationRepository;
import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
@@ -34,12 +33,9 @@ public final class GovernanceListenerManager {
private final RegistryListenerManager registryListenerManager;
- private final LockListenerManager lockListenerManager;
-
public GovernanceListenerManager(final RegistryRepository registryRepository, final ConfigurationRepository configurationRepository, final Collection<String> schemaNames) {
configurationListenerManager = new ConfigurationListenerManager(configurationRepository, schemaNames);
registryListenerManager = new RegistryListenerManager(registryRepository, schemaNames);
- lockListenerManager = new LockListenerManager(registryRepository);
}
/**
@@ -48,6 +44,5 @@ public final class GovernanceListenerManager {
public void init() {
configurationListenerManager.initListeners();
registryListenerManager.initListeners();
- lockListenerManager.initListeners();
}
}
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/LockCenter.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/LockCenter.java
deleted file mode 100644
index 1224f7b..0000000
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/LockCenter.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.governance.core.lock;
-
-import com.google.common.eventbus.Subscribe;
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.governance.core.event.model.lock.LockNoticeEvent;
-import org.apache.shardingsphere.governance.core.lock.node.LockNode;
-import org.apache.shardingsphere.governance.core.registry.RegistryCenter;
-import org.apache.shardingsphere.governance.core.registry.RegistryCenterNodeStatus;
-import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
-import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
-
-import java.util.Collection;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Lock center.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class LockCenter {
-
- private static final int CHECK_RETRY_MAXIMUM = 5;
-
- private static final int CHECK_RETRY_INTERVAL_SECONDS = 3;
-
- private static final LockCenter INSTANCE = new LockCenter();
-
- private RegistryRepository registryRepository;
-
- private RegistryCenter registryCenter;
-
- private final LockNode lockNode = new LockNode();
-
- /**
- * Get lock center instance.
- *
- * @return lock center instance
- */
- public static LockCenter getInstance() {
- return INSTANCE;
- }
-
- /**
- * Initialize lock center.
- *
- * @param registryRepository registry repository
- * @param registryCenter registry center
- */
- public void init(final RegistryRepository registryRepository, final RegistryCenter registryCenter) {
- this.registryRepository = registryRepository;
- this.registryCenter = registryCenter;
- this.registryRepository.initLock(lockNode.getGlobalLockNodePath());
- ShardingSphereEventBus.getInstance().register(this);
- }
-
- /**
- * Try to get global lock.
- *
- * @param timeout the maximum time in milliseconds to acquire lock
- * @return true if get the lock, false if not
- */
- public boolean tryGlobalLock(final Long timeout) {
- return registryRepository.tryLock(timeout, TimeUnit.MILLISECONDS);
- }
-
- /**
- * Release global lock.
- */
- public void releaseGlobalLock() {
- registryRepository.releaseLock();
- registryRepository.delete(lockNode.getGlobalLockNodePath());
- }
-
- /**
- * Check lock state.
- *
- * @return true if all instances were locked, else false
- */
- public boolean checkLock() {
- Collection<String> instanceIds = registryCenter.loadAllInstances();
- if (instanceIds.isEmpty()) {
- return true;
- }
- return checkOrRetry(instanceIds);
- }
-
- private boolean checkOrRetry(final Collection<String> instanceIds) {
- for (int i = 0; i < CHECK_RETRY_MAXIMUM; i++) {
- if (check(instanceIds)) {
- return true;
- }
- try {
- Thread.sleep(CHECK_RETRY_INTERVAL_SECONDS * 1000L);
- // CHECKSTYLE:OFF
- } catch (final InterruptedException ex) {
- // CHECKSTYLE:ON
- }
- }
- return false;
- }
-
- private boolean check(final Collection<String> instanceIds) {
- for (String each : instanceIds) {
- if (!RegistryCenterNodeStatus.LOCKED.toString().equalsIgnoreCase(registryCenter.loadInstanceData(each))) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * Notifies the locking state of the current instance.
- *
- * @param event lock notice event
- */
- @Subscribe
- public synchronized void lockNotice(final LockNoticeEvent event) {
- registryCenter.persistInstanceData(event.isLocked() ? RegistryCenterNodeStatus.LOCKED.toString() : "");
- }
-}
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/listener/LockListenerManager.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/listener/LockListenerManager.java
deleted file mode 100644
index ef3b87c..0000000
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/listener/LockListenerManager.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.governance.core.lock.listener;
-
-import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
-import org.apache.shardingsphere.governance.repository.api.listener.DataChangedEvent.Type;
-
-/**
- * Lock listener manager.
- */
-public final class LockListenerManager {
-
- private final GlobalLockChangedListener globalLockChangedListener;
-
- public LockListenerManager(final RegistryRepository registryRepository) {
- globalLockChangedListener = new GlobalLockChangedListener(registryRepository);
- }
-
- /**
- * Initialize all distributed lock listeners.
- */
- public void initListeners() {
- globalLockChangedListener.watch(Type.ADDED);
- }
-}
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/strategy/GovernanceLockStrategy.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/strategy/GovernanceLockStrategy.java
index 04fb739..af3a6aa 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/strategy/GovernanceLockStrategy.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/strategy/GovernanceLockStrategy.java
@@ -17,34 +17,74 @@
package org.apache.shardingsphere.governance.core.lock.strategy;
-import org.apache.shardingsphere.governance.core.lock.LockCenter;
+import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.governance.core.event.model.lock.GlobalLockAddedEvent;
+import org.apache.shardingsphere.governance.core.registry.RegistryCenter;
+import org.apache.shardingsphere.governance.core.registry.RegistryCenterNodeStatus;
+import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.infra.lock.LockStrategy;
import org.apache.shardingsphere.infra.lock.LockStrategyType;
+import org.apache.shardingsphere.infra.state.StateContext;
+import org.apache.shardingsphere.infra.state.StateEvent;
+import org.apache.shardingsphere.infra.state.StateType;
+
+import java.util.concurrent.TimeUnit;
/**
* Governance lock strategy.
*/
public final class GovernanceLockStrategy implements LockStrategy {
- private final LockCenter lockCenter = LockCenter.getInstance();
+ private RegistryCenter registryCenter;
+
+ /**
+ * Init governance lock strategy.
+ *
+ * @param registryCenter registry center
+ */
+ public void init(final RegistryCenter registryCenter) {
+ this.registryCenter = registryCenter;
+ ShardingSphereEventBus.getInstance().register(this);
+ }
@Override
- public boolean tryLock(final Long timeout) {
- return lockCenter.tryGlobalLock(timeout);
+ public boolean tryLock(final long timeout, final TimeUnit timeUnit) {
+ return registryCenter.tryGlobalLock(timeout, timeUnit);
}
@Override
public void releaseLock() {
- lockCenter.releaseGlobalLock();
+ registryCenter.releaseGlobalLock();
}
@Override
public boolean checkLock() {
- return lockCenter.checkLock();
+ return registryCenter.checkLock();
}
@Override
public String getType() {
return LockStrategyType.GOVERNANCE.name();
}
+
+ /**
+ * Switch state.
+ *
+ * @param event state event
+ */
+ @Subscribe
+ public void switchState(final StateEvent event) {
+ StateContext.switchState(event);
+ }
+
+ /**
+ * Lock instance after global lock added.
+ *
+ * @param event global lock added event
+ */
+ @Subscribe
+ public void doLock(final GlobalLockAddedEvent event) {
+ StateContext.switchState(new StateEvent(StateType.LOCK, true));
+ registryCenter.persistInstanceData(RegistryCenterNodeStatus.LOCKED.toString());
+ }
}
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
index a79111f..7b0e8d6 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
@@ -18,11 +18,13 @@
package org.apache.shardingsphere.governance.core.registry;
import com.google.common.base.Strings;
-import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
+import org.apache.shardingsphere.governance.core.lock.node.LockNode;
import org.apache.shardingsphere.governance.core.registry.instance.GovernanceInstance;
import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
+import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import java.util.Collection;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
@@ -30,16 +32,24 @@ import java.util.stream.Collectors;
*/
public final class RegistryCenter {
+ private static final int CHECK_RETRY_MAXIMUM = 5;
+
+ private static final int CHECK_RETRY_INTERVAL_SECONDS = 3;
+
private final RegistryCenterNode node;
private final RegistryRepository repository;
private final GovernanceInstance instance;
+ private final LockNode lockNode;
+
public RegistryCenter(final RegistryRepository registryRepository) {
node = new RegistryCenterNode();
repository = registryRepository;
instance = GovernanceInstance.getInstance();
+ lockNode = new LockNode();
+ registryRepository.initLock(lockNode.getGlobalLockNodePath());
ShardingSphereEventBus.getInstance().register(this);
}
@@ -112,4 +122,55 @@ public final class RegistryCenter {
private String getDataSourceNodeData(final String schemaName, final String dataSourceName) {
return repository.get(node.getDataSourcePath(schemaName, dataSourceName));
}
+
+ /**
+ * Try to get global lock.
+ *
+ * @param timeout the maximum time in milliseconds to acquire lock
+ * @param timeUnit time unit
+ * @return true if get the lock, false if not
+ */
+ public boolean tryGlobalLock(final long timeout, final TimeUnit timeUnit) {
+ return repository.tryLock(timeout, timeUnit);
+ }
+
+ /**
+ * Release global lock.
+ */
+ public void releaseGlobalLock() {
+ repository.releaseLock();
+ }
+
+ /**
+ * Check lock state.
+ *
+ * @return true if all instances were locked, else false
+ */
+ public boolean checkLock() {
+ return checkOrRetry(this.loadAllInstances());
+ }
+
+ private boolean checkOrRetry(final Collection<String> instanceIds) {
+ for (int i = 0; i < CHECK_RETRY_MAXIMUM; i++) {
+ if (check(instanceIds)) {
+ return true;
+ }
+ try {
+ Thread.sleep(CHECK_RETRY_INTERVAL_SECONDS * 1000L);
+ // CHECKSTYLE:OFF
+ } catch (final InterruptedException ex) {
+ // CHECKSTYLE:ON
+ }
+ }
+ return false;
+ }
+
+ private boolean check(final Collection<String> instanceIds) {
+ for (String each : instanceIds) {
+ if (!RegistryCenterNodeStatus.LOCKED.toString().equalsIgnoreCase(this.loadInstanceData(each))) {
+ return false;
+ }
+ }
+ return true;
+ }
}
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/listener/GlobalLockChangedListener.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/listener/GlobalLockChangedListener.java
similarity index 96%
rename from shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/listener/GlobalLockChangedListener.java
rename to shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/listener/GlobalLockChangedListener.java
index fedacf7..4fad8eb 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/listener/GlobalLockChangedListener.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/listener/GlobalLockChangedListener.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.governance.core.lock.listener;
+package org.apache.shardingsphere.governance.core.registry.listener;
import org.apache.shardingsphere.governance.core.event.listener.PostGovernanceRepositoryEventListener;
import org.apache.shardingsphere.governance.core.event.model.GovernanceEvent;
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/listener/RegistryListenerManager.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/listener/RegistryListenerManager.java
index 5033679..4172580 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/listener/RegistryListenerManager.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/listener/RegistryListenerManager.java
@@ -31,9 +31,12 @@ public final class RegistryListenerManager {
private final DataSourceStateChangedListener dataSourceStateChangedListener;
+ private final GlobalLockChangedListener globalLockChangedListener;
+
public RegistryListenerManager(final RegistryRepository registryRepository, final Collection<String> schemaNames) {
terminalStateChangedListener = new TerminalStateChangedListener(registryRepository);
dataSourceStateChangedListener = new DataSourceStateChangedListener(registryRepository, schemaNames);
+ globalLockChangedListener = new GlobalLockChangedListener(registryRepository);
}
/**
@@ -42,5 +45,6 @@ public final class RegistryListenerManager {
public void initListeners() {
terminalStateChangedListener.watch(Type.UPDATED);
dataSourceStateChangedListener.watch(Type.UPDATED, Type.DELETED, Type.ADDED);
+ globalLockChangedListener.watch(Type.ADDED);
}
}
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/state/GovernedStateContext.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/state/GovernedStateContext.java
deleted file mode 100644
index 19c357b..0000000
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/state/GovernedStateContext.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.governance.core.state;
-
-import com.google.common.eventbus.Subscribe;
-import org.apache.shardingsphere.governance.core.event.model.lock.GlobalLockAddedEvent;
-import org.apache.shardingsphere.governance.core.event.model.lock.LockNoticeEvent;
-import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
-import org.apache.shardingsphere.infra.state.StateContext;
-import org.apache.shardingsphere.infra.state.StateEvent;
-import org.apache.shardingsphere.infra.state.StateType;
-
-import java.util.Optional;
-
-/**
- * Governed state machine.
- */
-public final class GovernedStateContext {
-
- /**
- * Start up governed state machine.
- */
- public static void startUp() {
- ShardingSphereEventBus.getInstance().register(new GovernedStateContext());
- }
-
- /**
- * Switch state.
- *
- * @param event state event
- */
- @Subscribe
- public void switchState(final StateEvent event) {
- StateContext.switchState(event);
- }
-
- /**
- * Lock instance after global lock added.
- *
- * @param event global lock added event
- */
- @Subscribe
- public void lock(final GlobalLockAddedEvent event) {
- if (Optional.of(event).isPresent()) {
- StateContext.switchState(new StateEvent(StateType.LOCK, true));
- notice(true);
- }
- }
-
- /**
- * Unlock instance.
- */
- public static void unlock() {
- StateContext.switchState(new StateEvent(StateType.LOCK, false));
- notice(false);
- }
-
- private static void notice(final boolean locked) {
- ShardingSphereEventBus.getInstance().post(new LockNoticeEvent(locked));
- }
-}
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/LockCenterTest.java b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/LockCenterTest.java
deleted file mode 100644
index 4189a28..0000000
--- a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/LockCenterTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.governance.core.lock;
-
-import org.apache.shardingsphere.governance.core.event.model.lock.LockNoticeEvent;
-import org.apache.shardingsphere.governance.core.lock.node.LockNode;
-import org.apache.shardingsphere.governance.core.registry.RegistryCenter;
-import org.apache.shardingsphere.governance.core.registry.RegistryCenterNodeStatus;
-import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import java.util.concurrent.TimeUnit;
-
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.verify;
-
-@RunWith(MockitoJUnitRunner.class)
-public final class LockCenterTest {
-
- @Mock
- private RegistryRepository registryRepository;
-
- @Mock
- private RegistryCenter registryCenter;
-
- private LockCenter lockCenter = LockCenter.getInstance();
-
- @Before
- public void setUp() {
- lockCenter.init(registryRepository, registryCenter);
- }
-
- @Test
- public void assertTryGlobalLock() {
- lockCenter.tryGlobalLock(50L);
- verify(registryRepository).tryLock(eq(50L), eq(TimeUnit.MILLISECONDS));
- }
-
- @Test
- public void assertReleaseGlobalLock() {
- lockCenter.releaseGlobalLock();
- verify(registryRepository).releaseLock();
- verify(registryRepository).delete(eq(new LockNode().getGlobalLockNodePath()));
- }
-
- @Test
- public void assertLockedLockNotice() {
- lockCenter.lockNotice(new LockNoticeEvent(true));
- verify(registryCenter).persistInstanceData(RegistryCenterNodeStatus.LOCKED.toString());
- }
-
- @Test
- public void assertUnLockedLockNotice() {
- lockCenter.lockNotice(new LockNoticeEvent(false));
- verify(registryCenter).persistInstanceData("");
- }
-}
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/listener/LockListenerManagerTest.java b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/listener/LockListenerManagerTest.java
deleted file mode 100644
index 5397c99..0000000
--- a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/listener/LockListenerManagerTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.governance.core.lock.listener;
-
-import lombok.SneakyThrows;
-import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
-import org.apache.shardingsphere.governance.repository.api.listener.DataChangedEvent.Type;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import java.lang.reflect.Field;
-
-import static org.mockito.Mockito.verify;
-
-@RunWith(MockitoJUnitRunner.class)
-public final class LockListenerManagerTest {
-
- @Mock
- private RegistryRepository registryRepository;
-
- @Mock
- private GlobalLockChangedListener globalLockChangedListener;
-
- @Test
- public void assertInitListeners() {
- LockListenerManager actual = new LockListenerManager(registryRepository);
- setField(actual, "globalLockChangedListener", globalLockChangedListener);
- actual.initListeners();
- verify(globalLockChangedListener).watch(Type.ADDED);
- }
-
- @SneakyThrows(ReflectiveOperationException.class)
- private static void setField(final Object target, final String fieldName, final Object fieldValue) {
- Field field = target.getClass().getDeclaredField(fieldName);
- field.setAccessible(true);
- field.set(target, fieldValue);
- }
-}
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/strategy/GovernanceLockStrategyTest.java b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/strategy/GovernanceLockStrategyTest.java
index 7bfb7a6..a25925f 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/strategy/GovernanceLockStrategyTest.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/strategy/GovernanceLockStrategyTest.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.governance.core.lock.strategy;
-import org.apache.shardingsphere.governance.core.lock.LockCenter;
+import org.apache.shardingsphere.governance.core.registry.RegistryCenter;
import org.apache.shardingsphere.governance.core.registry.util.FieldUtil;
import org.apache.shardingsphere.infra.lock.LockStrategy;
import org.junit.Before;
@@ -26,6 +26,8 @@ import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
+import java.util.concurrent.TimeUnit;
+
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
@@ -33,25 +35,25 @@ import static org.mockito.Mockito.verify;
public final class GovernanceLockStrategyTest {
@Mock
- private LockCenter lockCenter;
+ private RegistryCenter registryCenter;
private LockStrategy lockStrategy;
@Before
public void setUp() {
lockStrategy = new GovernanceLockStrategy();
- FieldUtil.setField(lockStrategy, "lockCenter", lockCenter);
+ FieldUtil.setField(lockStrategy, "registryCenter", registryCenter);
}
@Test
public void assertTryLock() {
- lockStrategy.tryLock(50L);
- verify(lockCenter).tryGlobalLock(eq(50L));
+ lockStrategy.tryLock(50L, TimeUnit.MILLISECONDS);
+ verify(registryCenter).tryGlobalLock(eq(50L), eq(TimeUnit.MILLISECONDS));
}
@Test
public void assertReleaseLock() {
lockStrategy.releaseLock();
- verify(lockCenter).releaseGlobalLock();
+ verify(registryCenter).releaseGlobalLock();
}
}
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterTest.java b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterTest.java
index ce9aa65..ae9b92f 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterTest.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterTest.java
@@ -27,8 +27,10 @@ import org.mockito.junit.MockitoJUnitRunner;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -80,4 +82,16 @@ public final class RegistryCenterTest {
verify(registryRepository).getChildrenKeys(anyString());
verify(registryRepository).get(anyString());
}
+
+ @Test
+ public void assertTryGlobalLock() {
+ registryCenter.tryGlobalLock(50L, TimeUnit.MILLISECONDS);
+ verify(registryRepository).tryLock(eq(50L), eq(TimeUnit.MILLISECONDS));
+ }
+
+ @Test
+ public void assertReleaseGlobalLock() {
+ registryCenter.releaseGlobalLock();
+ verify(registryRepository).releaseLock();
+ }
}
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/listener/GlobalLockChangedListenerTest.java b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/listener/GlobalLockChangedListenerTest.java
similarity index 96%
rename from shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/listener/GlobalLockChangedListenerTest.java
rename to shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/listener/GlobalLockChangedListenerTest.java
index de90e1a..889115a 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/listener/GlobalLockChangedListenerTest.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/listener/GlobalLockChangedListenerTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.governance.core.lock.listener;
+package org.apache.shardingsphere.governance.core.registry.listener;
import org.apache.shardingsphere.governance.core.event.model.GovernanceEvent;
import org.apache.shardingsphere.governance.core.event.model.lock.GlobalLockAddedEvent;
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/LockContext.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/LockContext.java
index 16a0f9b..1846c2b 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/LockContext.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/LockContext.java
@@ -19,10 +19,7 @@ package org.apache.shardingsphere.infra.lock;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
-import org.apache.shardingsphere.infra.spi.typed.TypedSPIRegistry;
-import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
@@ -41,17 +38,13 @@ public final class LockContext {
private static final Condition CONDITION = LOCK.newCondition();
- static {
- ShardingSphereServiceLoader.register(LockStrategy.class);
- }
-
/**
* Init lock strategy.
*
- * @param lockStrategyType lock strategy type
+ * @param lockStrategy lock strategy
*/
- public static void init(final LockStrategyType lockStrategyType) {
- LOCK_STRATEGY.set(TypedSPIRegistry.getRegisteredService(LockStrategy.class, lockStrategyType.name(), new Properties()));
+ public static void init(final LockStrategy lockStrategy) {
+ LOCK_STRATEGY.set(lockStrategy);
}
/**
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/LockStrategy.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/LockStrategy.java
index 474221b..9011b0c 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/LockStrategy.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/LockStrategy.java
@@ -19,6 +19,8 @@ package org.apache.shardingsphere.infra.lock;
import org.apache.shardingsphere.infra.spi.typed.TypedSPI;
+import java.util.concurrent.TimeUnit;
+
/**
* Lock strategy.
*/
@@ -28,9 +30,10 @@ public interface LockStrategy extends TypedSPI {
* Try to get lock.
*
* @param timeout the maximum time in milliseconds to acquire lock
+ * @param timeUnit time unit
* @return true if get the lock, false if not
*/
- boolean tryLock(Long timeout);
+ boolean tryLock(long timeout, TimeUnit timeUnit);
/**
* Release lock.
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/StandardLockStrategy.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/StandardLockStrategy.java
index d962058..b3a5019 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/StandardLockStrategy.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/StandardLockStrategy.java
@@ -32,10 +32,10 @@ public final class StandardLockStrategy implements LockStrategy {
private final ReentrantLock lock = new ReentrantLock();
@Override
- public boolean tryLock(final Long timeout) {
+ public boolean tryLock(final long timeout, final TimeUnit timeUnit) {
boolean result = false;
try {
- result = lock.tryLock(timeout, TimeUnit.MILLISECONDS);
+ result = lock.tryLock(timeout, timeUnit);
// CHECKSTYLE:OFF
} catch (final InterruptedException e) {
// CHECKSTYLE:ON
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/lock/LockContextTest.java b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/lock/LockContextTest.java
index 8b9f871..fce1e2c 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/lock/LockContextTest.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/lock/LockContextTest.java
@@ -17,9 +17,12 @@
package org.apache.shardingsphere.infra.lock;
+import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.infra.spi.typed.TypedSPIRegistry;
import org.junit.Before;
import org.junit.Test;
+import java.util.Properties;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertNotNull;
@@ -27,9 +30,13 @@ import static org.junit.Assert.assertTrue;
public final class LockContextTest {
+ static {
+ ShardingSphereServiceLoader.register(LockStrategy.class);
+ }
+
@Before
public void init() {
- LockContext.init(LockStrategyType.STANDARD);
+ LockContext.init(TypedSPIRegistry.getRegisteredService(LockStrategy.class, LockStrategyType.STANDARD.name(), new Properties()));
}
@Test
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/lock/StandardLockStrategyTest.java b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/lock/StandardLockStrategyTest.java
index 90fa920..c3e3852 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/lock/StandardLockStrategyTest.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/lock/StandardLockStrategyTest.java
@@ -21,6 +21,8 @@ import org.apache.shardingsphere.infra.state.StateContext;
import org.apache.shardingsphere.infra.state.StateType;
import org.junit.Test;
+import java.util.concurrent.TimeUnit;
+
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -31,14 +33,14 @@ public final class StandardLockStrategyTest {
@Test
public void assertTryLock() {
- assertTrue(lockStrategy.tryLock(50L));
+ assertTrue(lockStrategy.tryLock(50L, TimeUnit.MILLISECONDS));
assertThat(StateContext.getCurrentState(), is(StateType.LOCK));
lockStrategy.releaseLock();
}
@Test
public void assertReleaseLock() {
- assertTrue(lockStrategy.tryLock(50L));
+ assertTrue(lockStrategy.tryLock(50L, TimeUnit.MILLISECONDS));
assertThat(StateContext.getCurrentState(), is(StateType.LOCK));
lockStrategy.releaseLock();
assertThat(StateContext.getCurrentState(), is(StateType.OK));
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/lock/fixture/FixtureLockStrategy.java b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/lock/fixture/FixtureLockStrategy.java
index 92267df..8c8619d 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/lock/fixture/FixtureLockStrategy.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/lock/fixture/FixtureLockStrategy.java
@@ -20,9 +20,11 @@ package org.apache.shardingsphere.infra.lock.fixture;
import org.apache.shardingsphere.infra.lock.LockStrategy;
import org.apache.shardingsphere.infra.lock.LockStrategyType;
+import java.util.concurrent.TimeUnit;
+
public final class FixtureLockStrategy implements LockStrategy {
@Override
- public boolean tryLock(final Long timeout) {
+ public boolean tryLock(final long timeout, final TimeUnit timeUnit) {
return false;
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
index b40f58c..236f86a 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
@@ -60,6 +60,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
@@ -119,7 +120,7 @@ public final class DatabaseCommunicationEngine {
private void lockForDDL(final ExecutionContext executionContext, final Long lockTimeoutMilliseconds) {
if (needLock(executionContext)) {
- if (!LockContext.getLockStrategy().tryLock(lockTimeoutMilliseconds)) {
+ if (!LockContext.getLockStrategy().tryLock(lockTimeoutMilliseconds, TimeUnit.MILLISECONDS)) {
throw new LockWaitTimeoutException(lockTimeoutMilliseconds);
}
checkLock(lockTimeoutMilliseconds);
diff --git a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/impl/AbstractBootstrapInitializer.java b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/impl/AbstractBootstrapInitializer.java
index 622e9bb..f41e040 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/impl/AbstractBootstrapInitializer.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/impl/AbstractBootstrapInitializer.java
@@ -23,7 +23,11 @@ import org.apache.shardingsphere.infra.config.datasource.DataSourceParameter;
import org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.context.metadata.MetaDataContexts;
import org.apache.shardingsphere.infra.context.metadata.MetaDataContextsBuilder;
+import org.apache.shardingsphere.infra.lock.LockStrategy;
+import org.apache.shardingsphere.infra.lock.LockStrategyType;
import org.apache.shardingsphere.infra.metadata.resource.ShardingSphereResource;
+import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.infra.spi.typed.TypedSPIRegistry;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.datasource.factory.JDBCRawBackendDataSourceFactory;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.config.ProxyConfiguration;
@@ -43,6 +47,7 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
+import java.util.Properties;
import java.util.stream.Collectors;
/**
@@ -53,6 +58,10 @@ public abstract class AbstractBootstrapInitializer implements BootstrapInitializ
private final ShardingSphereProxy shardingSphereProxy = new ShardingSphereProxy();
+ static {
+ ShardingSphereServiceLoader.register(LockStrategy.class);
+ }
+
@Override
public final void init(final YamlProxyConfiguration yamlConfig, final int port) throws SQLException {
ProxyConfiguration proxyConfig = getProxyConfiguration(yamlConfig);
@@ -118,4 +127,8 @@ public abstract class AbstractBootstrapInitializer implements BootstrapInitializ
protected abstract TransactionContexts decorateTransactionContexts(TransactionContexts transactionContexts, String xaTransactionMangerType);
protected abstract void initLockContext();
+
+ protected LockStrategy loadLockStrategy(final LockStrategyType lockStrategyType) {
+ return TypedSPIRegistry.getRegisteredService(LockStrategy.class, lockStrategyType.name(), new Properties());
+ }
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/impl/GovernanceBootstrapInitializer.java b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/impl/GovernanceBootstrapInitializer.java
index 62087a0..85eba9e 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/impl/GovernanceBootstrapInitializer.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/impl/GovernanceBootstrapInitializer.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.proxy.initializer.impl;
import org.apache.shardingsphere.governance.context.metadata.GovernanceMetaDataContexts;
import org.apache.shardingsphere.governance.context.transaction.GovernanceTransactionContexts;
import org.apache.shardingsphere.governance.core.facade.GovernanceFacade;
+import org.apache.shardingsphere.governance.core.lock.strategy.GovernanceLockStrategy;
import org.apache.shardingsphere.governance.core.yaml.swapper.GovernanceConfigurationYamlSwapper;
import org.apache.shardingsphere.infra.auth.builtin.DefaultAuthentication;
import org.apache.shardingsphere.infra.auth.builtin.yaml.config.YamlAuthenticationConfiguration;
@@ -126,6 +127,8 @@ public final class GovernanceBootstrapInitializer extends AbstractBootstrapIniti
@Override
protected void initLockContext() {
- LockContext.init(LockStrategyType.GOVERNANCE);
+ GovernanceLockStrategy governanceLockStrategy = (GovernanceLockStrategy) loadLockStrategy(LockStrategyType.GOVERNANCE);
+ governanceLockStrategy.init(governanceFacade.getRegistryCenter());
+ LockContext.init(governanceLockStrategy);
}
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/impl/StandardBootstrapInitializer.java b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/impl/StandardBootstrapInitializer.java
index 6ecc167..d9fda32 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/impl/StandardBootstrapInitializer.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/impl/StandardBootstrapInitializer.java
@@ -47,6 +47,6 @@ public final class StandardBootstrapInitializer extends AbstractBootstrapInitial
@Override
protected void initLockContext() {
- LockContext.init(LockStrategyType.STANDARD);
+ LockContext.init(loadLockStrategy(LockStrategyType.STANDARD));
}
}