You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/05/26 14:06:58 UTC
[shardingsphere] branch master updated: refactor all compute status node to ephemeral (#17980)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new bc1be3f8918 refactor all compute status node to ephemeral (#17980)
bc1be3f8918 is described below
commit bc1be3f891824293c817910c9e5c08ddc7ff7820
Author: Haoran Meng <me...@gmail.com>
AuthorDate: Thu May 26 22:06:46 2022 +0800
refactor all compute status node to ephemeral (#17980)
---
.../infra/instance/InstanceContext.java | 10 ++
.../src/test/resources/logback-test.xml | 2 +-
.../metadata/persist/MetaDataPersistService.java | 19 +--
.../persist/service/ComputeNodePersistService.java | 178 ---------------------
.../persist/MetaDataPersistServiceTest.java | 17 +-
.../service/ComputeNodePersistServiceTest.java | 121 --------------
.../cluster/ClusterContextManagerBuilder.java | 29 ++--
.../ClusterContextManagerCoordinator.java | 5 +-
.../cluster/coordinator/RegistryCenter.java | 10 +-
.../compute/service/ComputeNodeStatusService.java | 145 +++++++++++++++++
.../generator/ClusterWorkerIdGenerator.java | 8 +-
.../ClusterContextManagerCoordinatorTest.java | 4 +-
.../service/ComputeNodeStatusServiceTest.java | 86 ++++++++++
.../src/test/resources/logback-test.xml | 2 +-
.../StandaloneContextManagerBuilder.java | 9 +-
.../src/test/resources/logback-test.xml | 2 +-
.../ral/common/updatable/AlterInstanceHandler.java | 16 +-
.../ral/common/updatable/LabelInstanceHandler.java | 17 +-
.../common/updatable/SetInstanceStatusHandler.java | 54 ++-----
.../common/updatable/UnlabelInstanceHandler.java | 22 +--
.../src/test/resources/logback-test.xml | 2 +-
21 files changed, 323 insertions(+), 435 deletions(-)
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
index 3334a29eba5..5d2de4472a5 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
@@ -167,6 +167,16 @@ public final class InstanceContext {
return result;
}
+ /**
+ * Get compute node instance by instance id.
+ *
+ * @param instanceId instance id
+ * @return compute node instance
+ */
+ public Optional<ComputeNodeInstance> getComputeNodeInstanceById(final String instanceId) {
+ return computeNodeInstances.stream().filter(each -> instanceId.equals(each.getCurrentInstanceId())).findFirst();
+ }
+
/**
* Init lock context.
*/
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-spring/shardingsphere-jdbc-core-spring/shardingsphere-jdbc-core-spring-namespace/src/test/resources/logback-test.xml b/shardingsphere-jdbc/shardingsphere-jdbc-spring/shardingsphere-jdbc-core-spring/shardingsphere-jdbc-core-spring-namespace/src/test/resources/logback-test.xml
index a7207c70eba..064918a0829 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-spring/shardingsphere-jdbc-core-spring/shardingsphere-jdbc-core-spring-namespace/src/test/resources/logback-test.xml
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-spring/shardingsphere-jdbc-core-spring/shardingsphere-jdbc-core-spring-namespace/src/test/resources/logback-test.xml
@@ -26,7 +26,7 @@
<appender-ref ref="console" />
</logger>
<logger name="org.apache.shardingsphere.schedule.core.api.ModeScheduleContext" level="error" />
- <logger name="org.apache.shardingsphere.mode.metadata.persist.service.ComputeNodePersistService" level="off" />
+ <logger name="org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service.ComputeNodeStatusService" level="off" />
<root>
<level value="error" />
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistService.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistService.java
index 8991d107d96..63a27d47825 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistService.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistService.java
@@ -25,13 +25,12 @@ import org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCre
import org.apache.shardingsphere.infra.datasource.pool.destroyer.DataSourcePoolDestroyer;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesCreator;
-import org.apache.shardingsphere.mode.metadata.persist.service.ComputeNodePersistService;
-import org.apache.shardingsphere.mode.metadata.persist.service.SchemaMetaDataPersistService;
import org.apache.shardingsphere.mode.metadata.persist.service.DatabaseVersionPersistService;
+import org.apache.shardingsphere.mode.metadata.persist.service.SchemaMetaDataPersistService;
import org.apache.shardingsphere.mode.metadata.persist.service.impl.DataSourcePersistService;
+import org.apache.shardingsphere.mode.metadata.persist.service.impl.DatabaseRulePersistService;
import org.apache.shardingsphere.mode.metadata.persist.service.impl.GlobalRulePersistService;
import org.apache.shardingsphere.mode.metadata.persist.service.impl.PropertiesPersistService;
-import org.apache.shardingsphere.mode.metadata.persist.service.impl.DatabaseRulePersistService;
import org.apache.shardingsphere.mode.persist.PersistRepository;
import org.apache.shardingsphere.transaction.config.TransactionRuleConfiguration;
@@ -61,8 +60,6 @@ public final class MetaDataPersistService {
private final PropertiesPersistService propsService;
- private final ComputeNodePersistService computeNodePersistService;
-
private final DatabaseVersionPersistService databaseVersionPersistService;
public MetaDataPersistService(final PersistRepository repository) {
@@ -72,7 +69,6 @@ public final class MetaDataPersistService {
databaseRulePersistService = new DatabaseRulePersistService(repository);
globalRuleService = new GlobalRulePersistService(repository);
propsService = new PropertiesPersistService(repository);
- computeNodePersistService = new ComputeNodePersistService(repository);
databaseVersionPersistService = new DatabaseVersionPersistService(repository);
}
@@ -103,17 +99,6 @@ public final class MetaDataPersistService {
return result;
}
- /**
- * Persist instance labels.
- *
- * @param instanceId instance id
- * @param labels labels
- * @param isOverwrite whether overwrite registry center's configuration if existed
- */
- public void persistInstanceLabels(final String instanceId, final Collection<String> labels, final boolean isOverwrite) {
- computeNodePersistService.persistInstanceLabels(instanceId, labels, isOverwrite);
- }
-
/**
* Get effective data sources.
*
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
deleted file mode 100644
index b89839ebbbc..00000000000
--- a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.metadata.persist.service;
-
-import com.google.common.base.Strings;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
-import org.apache.shardingsphere.infra.instance.definition.InstanceType;
-import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
-import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
-import org.apache.shardingsphere.mode.persist.PersistRepository;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
-
-/**
- * Compute node persist service.
- */
-@Slf4j
-@RequiredArgsConstructor
-public final class ComputeNodePersistService {
-
- private final PersistRepository repository;
-
- /**
- * Persist instance labels.
- *
- * @param instanceId instance id
- * @param labels collection of label
- * @param isOverwrite whether overwrite registry center's configuration if existed
- */
- public void persistInstanceLabels(final String instanceId, final Collection<String> labels, final boolean isOverwrite) {
- if (null != labels && !labels.isEmpty() && (isOverwrite || !isExisted(instanceId))) {
- repository.persist(ComputeNode.getInstanceLabelsNodePath(instanceId), YamlEngine.marshal(labels));
- }
- }
-
- /**
- * Delete instance labels.
- *
- * @param instanceId instance id
- */
- public void deleteInstanceLabels(final String instanceId) {
- if (isExisted(instanceId)) {
- repository.delete(ComputeNode.getInstanceLabelsNodePath(instanceId));
- }
- }
-
- private boolean isExisted(final String instanceId) {
- return !Strings.isNullOrEmpty(repository.get(ComputeNode.getInstanceLabelsNodePath(instanceId)));
- }
-
- /**
- * Persist instance worker id.
- *
- * @param instanceId instance id
- * @param workerId worker id
- */
- public void persistInstanceWorkerId(final String instanceId, final Long workerId) {
- repository.persist(ComputeNode.getInstanceWorkerIdNodePath(instanceId), String.valueOf(workerId));
- }
-
- /**
- * Persist instance xa recovery id.
- *
- * @param instanceId instance id
- * @param xaRecoveryId xa recovery id
- */
- public void persistInstanceXaRecoveryId(final String instanceId, final String xaRecoveryId) {
- loadXaRecoveryId(instanceId).ifPresent(each -> repository.delete(ComputeNode.getInstanceXaRecoveryIdNodePath(each, instanceId)));
- repository.persist(ComputeNode.getInstanceXaRecoveryIdNodePath(xaRecoveryId, instanceId), "");
- }
-
- /**
- * Load instance labels.
- *
- * @param instanceId instance id
- * @return labels
- */
- @SuppressWarnings("unchecked")
- public Collection<String> loadInstanceLabels(final String instanceId) {
- String yamlContent = repository.get(ComputeNode.getInstanceLabelsNodePath(instanceId));
- return Strings.isNullOrEmpty(yamlContent) ? new ArrayList<>() : YamlEngine.unmarshal(yamlContent, Collection.class);
- }
-
- /**
- * Load instance status.
- *
- * @param instanceId instance id
- * @return status
- */
- @SuppressWarnings("unchecked")
- public Collection<String> loadInstanceStatus(final String instanceId) {
- String yamlContent = repository.get(ComputeNode.getInstanceStatusNodePath(instanceId));
- return Strings.isNullOrEmpty(yamlContent) ? new ArrayList<>() : YamlEngine.unmarshal(yamlContent, Collection.class);
- }
-
- /**
- * Load instance worker id.
- *
- * @param instanceId instance id
- * @return worker id
- */
- public Optional<Long> loadInstanceWorkerId(final String instanceId) {
- try {
- String workerId = repository.get(ComputeNode.getInstanceWorkerIdNodePath(instanceId));
- return Strings.isNullOrEmpty(workerId) ? Optional.empty() : Optional.of(Long.valueOf(workerId));
- } catch (final NumberFormatException ex) {
- log.error("Invalid worker id for instance: {}", instanceId);
- }
- return Optional.empty();
- }
-
- /**
- * Load instance xa recovery id.
- *
- * @param instanceId instance id
- * @return xa recovery id
- */
- public Optional<String> loadXaRecoveryId(final String instanceId) {
- List<String> xaRecoveryIds = repository.getChildrenKeys(ComputeNode.getXaRecoveryIdNodePath());
- for (String xaRecoveryId : xaRecoveryIds) {
- if (repository.getChildrenKeys(String.join("/", ComputeNode.getXaRecoveryIdNodePath(), xaRecoveryId)).contains(instanceId)) {
- return Optional.of(xaRecoveryId);
- }
- }
- return Optional.empty();
- }
-
- /**
- * Load all compute node instances.
- *
- * @return compute node instances
- */
- public Collection<ComputeNodeInstance> loadAllComputeNodeInstances() {
- Collection<ComputeNodeInstance> result = new ArrayList<>();
- Arrays.stream(InstanceType.values()).forEach(instanceType -> {
- Collection<String> onlineComputeNodes = repository.getChildrenKeys(ComputeNode.getOnlineNodePath(instanceType));
- onlineComputeNodes.forEach(each -> result.add(loadComputeNodeInstance(new InstanceDefinition(instanceType, each))));
- });
- return result;
- }
-
- /**
- * Load compute node instance by instance definition.
- *
- * @param instanceDefinition instance definition
- * @return compute node instance
- */
- public ComputeNodeInstance loadComputeNodeInstance(final InstanceDefinition instanceDefinition) {
- ComputeNodeInstance result = new ComputeNodeInstance(instanceDefinition);
- result.setLabels(loadInstanceLabels(instanceDefinition.getInstanceId().getId()));
- result.switchState(loadInstanceStatus(instanceDefinition.getInstanceId().getId()));
- loadInstanceWorkerId(instanceDefinition.getInstanceId().getId()).ifPresent(result::setWorkerId);
- loadXaRecoveryId(instanceDefinition.getInstanceId().getId()).ifPresent(result::setXaRecoveryId);
- return result;
- }
-}
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistServiceTest.java b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistServiceTest.java
index 5cb4d668d04..6c0d0102ff7 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistServiceTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistServiceTest.java
@@ -26,11 +26,10 @@ import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesCreator;
import org.apache.shardingsphere.infra.yaml.config.swapper.YamlRuleConfigurationSwapperEngine;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
-import org.apache.shardingsphere.mode.metadata.persist.service.ComputeNodePersistService;
import org.apache.shardingsphere.mode.metadata.persist.service.impl.DataSourcePersistService;
+import org.apache.shardingsphere.mode.metadata.persist.service.impl.DatabaseRulePersistService;
import org.apache.shardingsphere.mode.metadata.persist.service.impl.GlobalRulePersistService;
import org.apache.shardingsphere.mode.metadata.persist.service.impl.PropertiesPersistService;
-import org.apache.shardingsphere.mode.metadata.persist.service.impl.DatabaseRulePersistService;
import org.apache.shardingsphere.mode.persist.PersistRepository;
import org.apache.shardingsphere.test.mock.MockedDataSource;
import org.apache.shardingsphere.transaction.config.TransactionRuleConfiguration;
@@ -52,9 +51,9 @@ import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.stream.Collectors;
-import java.util.Properties;
import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
@@ -80,9 +79,6 @@ public final class MetaDataPersistServiceTest {
@Mock
private PropertiesPersistService propsService;
- @Mock
- private ComputeNodePersistService computeNodePersistService;
-
private MetaDataPersistService metaDataPersistService;
@Before
@@ -92,7 +88,6 @@ public final class MetaDataPersistServiceTest {
setField("databaseRulePersistService", databaseRulePersistService);
setField("globalRuleService", globalRuleService);
setField("propsService", propsService);
- setField("computeNodePersistService", computeNodePersistService);
}
private void setField(final String name, final Object value) throws ReflectiveOperationException {
@@ -120,12 +115,6 @@ public final class MetaDataPersistServiceTest {
Collectors.toMap(Entry::getKey, entry -> DataSourcePropertiesCreator.create(entry.getValue()), (oldValue, currentValue) -> oldValue, LinkedHashMap::new));
}
- @Test
- public void assertPersistInstanceLabels() {
- metaDataPersistService.persistInstanceLabels("127.0.0.1@3307", Collections.singletonList("foo_label"), false);
- verify(computeNodePersistService).persistInstanceLabels("127.0.0.1@3307", Collections.singletonList("foo_label"), false);
- }
-
private Map<String, DataSource> createDataSourceMap() {
Map<String, DataSource> result = new LinkedHashMap<>(2, 1);
result.put("ds_0", createDataSource("ds_0"));
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistServiceTest.java b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistServiceTest.java
deleted file mode 100644
index eea8cff4d05..00000000000
--- a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistServiceTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.metadata.persist.service;
-
-import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
-import org.apache.shardingsphere.infra.instance.definition.InstanceType;
-import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
-import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
-import org.apache.shardingsphere.mode.persist.PersistRepository;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import java.util.Collection;
-import java.util.Collections;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@RunWith(MockitoJUnitRunner.class)
-public final class ComputeNodePersistServiceTest {
-
- @Mock
- private PersistRepository repository;
-
- @Test
- public void assertPersistInstanceLabels() {
- ComputeNodePersistService computeNodePersistService = new ComputeNodePersistService(repository);
- InstanceDefinition instanceDefinition = new InstanceDefinition(InstanceType.PROXY, 3307);
- final String instanceId = instanceDefinition.getInstanceId().getId();
- computeNodePersistService.persistInstanceLabels(instanceId, Collections.singletonList("test"), true);
- verify(repository, times(1)).persist(ComputeNode.getInstanceLabelsNodePath(instanceId), YamlEngine.marshal(Collections.singletonList("test")));
- computeNodePersistService.persistInstanceLabels(instanceId, Collections.emptyList(), true);
- verify(repository, times(0)).persist(ComputeNode.getInstanceLabelsNodePath(instanceId), YamlEngine.marshal(Collections.emptyList()));
- }
-
- @Test
- public void assertPersistInstanceWorkerId() {
- InstanceDefinition instanceDefinition = new InstanceDefinition(InstanceType.PROXY, 3307);
- final String instanceId = instanceDefinition.getInstanceId().getId();
- new ComputeNodePersistService(repository).persistInstanceWorkerId(instanceId, 100L);
- verify(repository).persist(ComputeNode.getInstanceWorkerIdNodePath(instanceId), String.valueOf(100L));
- }
-
- @Test
- public void assertPersistInstanceXaRecoveryId() {
- InstanceDefinition instanceDefinition = new InstanceDefinition(InstanceType.PROXY, 3307);
- final String instanceId = instanceDefinition.getInstanceId().getId();
- new ComputeNodePersistService(repository).persistInstanceXaRecoveryId(instanceId, instanceId);
- verify(repository).getChildrenKeys(ComputeNode.getXaRecoveryIdNodePath());
- verify(repository).persist(ComputeNode.getInstanceXaRecoveryIdNodePath(instanceId, instanceId), "");
- }
-
- @Test
- public void assertLoadInstanceLabels() {
- InstanceDefinition instanceDefinition = new InstanceDefinition(InstanceType.PROXY, 3307);
- final String instanceId = instanceDefinition.getInstanceId().getId();
- new ComputeNodePersistService(repository).loadInstanceLabels(instanceId);
- verify(repository).get(ComputeNode.getInstanceLabelsNodePath(instanceId));
- }
-
- @Test
- public void assertLoadInstanceStatus() {
- InstanceDefinition instanceDefinition = new InstanceDefinition(InstanceType.PROXY, 3307);
- final String instanceId = instanceDefinition.getInstanceId().getId();
- new ComputeNodePersistService(repository).loadInstanceStatus(instanceId);
- verify(repository).get(ComputeNode.getInstanceStatusNodePath(instanceId));
- }
-
- @Test
- public void assertLoadInstanceWorkerId() {
- InstanceDefinition instanceDefinition = new InstanceDefinition(InstanceType.PROXY, 3307);
- final String instanceId = instanceDefinition.getInstanceId().getId();
- new ComputeNodePersistService(repository).loadInstanceWorkerId(instanceId);
- verify(repository).get(ComputeNode.getInstanceWorkerIdNodePath(instanceId));
- }
-
- @Test
- public void assertLoadInstanceXaRecoveryId() {
- InstanceDefinition instanceDefinition = new InstanceDefinition(InstanceType.PROXY, 3307);
- final String instanceId = instanceDefinition.getInstanceId().getId();
- new ComputeNodePersistService(repository).loadXaRecoveryId(instanceId);
- verify(repository).getChildrenKeys(ComputeNode.getXaRecoveryIdNodePath());
-
- }
-
- @Test
- public void assertLoadAllComputeNodeInstances() {
- when(repository.getChildrenKeys("/nodes/compute_nodes/online/proxy")).thenReturn(Collections.singletonList("127.0.0.1@3307"));
- when(repository.getChildrenKeys("/nodes/compute_nodes/online/jdbc")).thenReturn(Collections.singletonList("127.0.0.1@3308"));
- Collection<ComputeNodeInstance> actual = new ComputeNodePersistService(repository).loadAllComputeNodeInstances();
- assertThat(actual.size(), is(2));
- }
-
- @Test
- public void assertLoadComputeNodeInstance() {
- InstanceDefinition instanceDefinition = new InstanceDefinition(InstanceType.PROXY, 3307);
- ComputeNodeInstance actual = new ComputeNodePersistService(repository).loadComputeNodeInstance(instanceDefinition);
- assertThat(actual.getInstanceDefinition(), is(instanceDefinition));
- }
-}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 722a6fcc4fe..219c353ab25 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -72,9 +72,9 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder
MetaDataContexts metaDataContexts = createMetaDataContextsBuilder(metaDataPersistService, parameter).build(metaDataPersistService);
persistMetaData(metaDataContexts);
Properties transactionProps = getTransactionProperties(metaDataContexts);
- persistTransactionConfiguration(parameter, metaDataPersistService, transactionProps);
- ContextManager result = createContextManager(repository, metaDataPersistService, parameter.getInstanceDefinition(), metaDataContexts, transactionProps, parameter.getModeConfig());
- registerOnline(metaDataPersistService, parameter.getInstanceDefinition(), result, registryCenter);
+ persistTransactionConfiguration(metaDataPersistService, transactionProps);
+ ContextManager result = createContextManager(repository, registryCenter, parameter.getInstanceDefinition(), metaDataContexts, transactionProps, parameter.getModeConfig());
+ registerOnline(metaDataPersistService, parameter, result, registryCenter);
return result;
}
@@ -83,7 +83,6 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder
if (!parameter.isEmpty()) {
metaDataPersistService.persistConfigurations(parameter.getDatabaseConfigs(), parameter.getGlobalRuleConfigs(), parameter.getProps(), isOverwrite);
}
- metaDataPersistService.persistInstanceLabels(parameter.getInstanceDefinition().getInstanceId().getId(), parameter.getLabels(), isOverwrite);
}
private MetaDataContextsBuilder createMetaDataContextsBuilder(final MetaDataPersistService metaDataPersistService, final ContextManagerBuilderParameter parameter) {
@@ -118,14 +117,10 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder
return result;
}
- private void persistTransactionConfiguration(final ContextManagerBuilderParameter parameter, final MetaDataPersistService metaDataPersistService, final Properties transactionProps) {
+ private void persistTransactionConfiguration(final MetaDataPersistService metaDataPersistService, final Properties transactionProps) {
if (!transactionProps.isEmpty()) {
metaDataPersistService.persistTransactionRule(transactionProps, true);
}
- String instanceId = parameter.getInstanceDefinition().getInstanceId().getId();
- if (!metaDataPersistService.getComputeNodePersistService().loadXaRecoveryId(instanceId).isPresent()) {
- metaDataPersistService.getComputeNodePersistService().persistInstanceXaRecoveryId(instanceId, instanceId);
- }
}
private Map<String, DatabaseConfiguration> getDatabaseConfigMap(final Collection<String> databaseNames, final MetaDataPersistService metaDataPersistService,
@@ -147,13 +142,12 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder
.forEach((schemaName, tables) -> metaDataContexts.getPersistService().ifPresent(optional -> optional.getSchemaMetaDataService().persistMetaData(databaseName, schemaName, tables))));
}
- private ContextManager createContextManager(final ClusterPersistRepository repository, final MetaDataPersistService metaDataPersistService,
+ private ContextManager createContextManager(final ClusterPersistRepository repository, final RegistryCenter registryCenter,
final InstanceDefinition instanceDefinition, final MetaDataContexts metaDataContexts,
final Properties transactionProps, final ModeConfiguration modeConfig) {
- ComputeNodeInstance computeNodeInstance = metaDataPersistService.getComputeNodePersistService().loadComputeNodeInstance(instanceDefinition);
- ClusterWorkerIdGenerator clusterWorkerIdGenerator = new ClusterWorkerIdGenerator(repository, metaDataPersistService, instanceDefinition);
+ ClusterWorkerIdGenerator clusterWorkerIdGenerator = new ClusterWorkerIdGenerator(repository, registryCenter, instanceDefinition);
DistributeLockContext distributeLockContext = new DistributeLockContext(repository);
- InstanceContext instanceContext = new InstanceContext(computeNodeInstance, clusterWorkerIdGenerator, modeConfig, distributeLockContext);
+ InstanceContext instanceContext = new InstanceContext(new ComputeNodeInstance(instanceDefinition), clusterWorkerIdGenerator, modeConfig, distributeLockContext);
repository.watchSessionConnection(instanceContext);
generateTransactionConfigurationFile(instanceContext, metaDataContexts, transactionProps);
TransactionContexts transactionContexts = new TransactionContextsBuilder(
@@ -170,11 +164,14 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder
}
}
- private void registerOnline(final MetaDataPersistService metaDataPersistService, final InstanceDefinition instanceDefinition, final ContextManager contextManager,
+ private void registerOnline(final MetaDataPersistService metaDataPersistService, final ContextManagerBuilderParameter parameter, final ContextManager contextManager,
final RegistryCenter registryCenter) {
+ String instanceId = contextManager.getInstanceContext().getInstance().getCurrentInstanceId();
+ contextManager.getInstanceContext().getInstance().setXaRecoveryId(instanceId);
+ contextManager.getInstanceContext().getInstance().setLabels(parameter.getLabels());
+ contextManager.getInstanceContext().getComputeNodeInstances().addAll(registryCenter.getComputeNodeStatusService().loadAllComputeNodeInstances());
new ClusterContextManagerCoordinator(metaDataPersistService, contextManager, registryCenter);
- contextManager.getInstanceContext().getComputeNodeInstances().addAll(metaDataPersistService.getComputeNodePersistService().loadAllComputeNodeInstances());
- registryCenter.onlineInstance(instanceDefinition);
+ registryCenter.onlineInstance(contextManager.getInstanceContext().getInstance());
}
@Override
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
index 36b29700913..3ea25752e9b 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
@@ -238,6 +238,7 @@ public final class ClusterContextManagerCoordinator {
*/
@Subscribe
public synchronized void renew(final LabelsEvent event) {
+ // TODO labels may be empty
contextManager.getInstanceContext().updateLabel(event.getInstanceId(), event.getLabels());
}
@@ -260,7 +261,7 @@ public final class ClusterContextManagerCoordinator {
*/
@Subscribe
public synchronized void renew(final InstanceOnlineEvent event) {
- ComputeNodeInstance instance = metaDataPersistService.getComputeNodePersistService().loadComputeNodeInstance(event.getInstanceDefinition());
+ ComputeNodeInstance instance = new ComputeNodeInstance(event.getInstanceDefinition());
contextManager.getInstanceContext().addComputeNodeInstance(instance);
}
@@ -271,7 +272,7 @@ public final class ClusterContextManagerCoordinator {
*/
@Subscribe
public synchronized void renew(final InstanceOfflineEvent event) {
- contextManager.getInstanceContext().deleteComputeNodeInstance(metaDataPersistService.getComputeNodePersistService().loadComputeNodeInstance(event.getInstanceDefinition()));
+ contextManager.getInstanceContext().deleteComputeNodeInstance(new ComputeNodeInstance(event.getInstanceDefinition()));
}
/**
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
index 8e927bcd367..edde2f881b8 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator;
import lombok.Getter;
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.LockRegistryService;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.MutexLockRegistryService;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcherFactory;
@@ -70,10 +70,12 @@ public final class RegistryCenter {
/**
* Online instance.
*
- * @param instanceDefinition instance definition
+ * @param computeNodeInstance compute node instance
*/
- public void onlineInstance(final InstanceDefinition instanceDefinition) {
- computeNodeStatusService.registerOnline(instanceDefinition);
+ public void onlineInstance(final ComputeNodeInstance computeNodeInstance) {
+ computeNodeStatusService.registerOnline(computeNodeInstance.getInstanceDefinition());
+ computeNodeStatusService.persistInstanceLabels(computeNodeInstance.getCurrentInstanceId(), computeNodeInstance.getLabels());
+ computeNodeStatusService.persistInstanceXaRecoveryId(computeNodeInstance.getCurrentInstanceId(), computeNodeInstance.getXaRecoveryId());
listenerFactory.watchListeners();
}
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
index 5715e3cef56..36178bff7e9 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
@@ -17,15 +17,27 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service;
+import com.google.common.base.Strings;
import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
+import org.apache.shardingsphere.infra.instance.definition.InstanceType;
+import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+
/**
* Compute node status service.
*/
@RequiredArgsConstructor
+@Slf4j
public final class ComputeNodeStatusService {
private final ClusterPersistRepository repository;
@@ -38,4 +50,137 @@ public final class ComputeNodeStatusService {
public void registerOnline(final InstanceDefinition instanceDefinition) {
repository.persistEphemeral(ComputeNode.getOnlineInstanceNodePath(instanceDefinition.getInstanceId().getId(), instanceDefinition.getInstanceType()), "");
}
+
+ /**
+ * Persist instance labels.
+ *
+ * @param instanceId instance id
+ * @param labels collection of label
+ */
+ public void persistInstanceLabels(final String instanceId, final Collection<String> labels) {
+ if (null != labels && !labels.isEmpty()) {
+ repository.persistEphemeral(ComputeNode.getInstanceLabelsNodePath(instanceId), YamlEngine.marshal(labels));
+ }
+ }
+
+ /**
+ * Delete instance labels.
+ *
+ * @param instanceId instance id
+ */
+ public void deleteInstanceLabels(final String instanceId) {
+ if (isExisted(instanceId)) {
+ repository.delete(ComputeNode.getInstanceLabelsNodePath(instanceId));
+ }
+ }
+
+ private boolean isExisted(final String instanceId) {
+ return !Strings.isNullOrEmpty(repository.get(ComputeNode.getInstanceLabelsNodePath(instanceId)));
+ }
+
+ /**
+ * Persist instance worker id.
+ *
+ * @param instanceId instance id
+ * @param workerId worker id
+ */
+ public void persistInstanceWorkerId(final String instanceId, final Long workerId) {
+ repository.persistEphemeral(ComputeNode.getInstanceWorkerIdNodePath(instanceId), String.valueOf(workerId));
+ }
+
+ /**
+ * Persist instance xa recovery id.
+ *
+ * @param instanceId instance id
+ * @param xaRecoveryId xa recovery id
+ */
+ public void persistInstanceXaRecoveryId(final String instanceId, final String xaRecoveryId) {
+ loadXaRecoveryId(instanceId).ifPresent(each -> repository.delete(ComputeNode.getInstanceXaRecoveryIdNodePath(each, instanceId)));
+ repository.persistEphemeral(ComputeNode.getInstanceXaRecoveryIdNodePath(xaRecoveryId, instanceId), "");
+ }
+
+ /**
+ * Load instance labels.
+ *
+ * @param instanceId instance id
+ * @return labels
+ */
+ @SuppressWarnings("unchecked")
+ public Collection<String> loadInstanceLabels(final String instanceId) {
+ String yamlContent = repository.get(ComputeNode.getInstanceLabelsNodePath(instanceId));
+ return Strings.isNullOrEmpty(yamlContent) ? new ArrayList<>() : YamlEngine.unmarshal(yamlContent, Collection.class);
+ }
+
+ /**
+ * Load instance status.
+ *
+ * @param instanceId instance id
+ * @return status
+ */
+ @SuppressWarnings("unchecked")
+ public Collection<String> loadInstanceStatus(final String instanceId) {
+ String yamlContent = repository.get(ComputeNode.getInstanceStatusNodePath(instanceId));
+ return Strings.isNullOrEmpty(yamlContent) ? new ArrayList<>() : YamlEngine.unmarshal(yamlContent, Collection.class);
+ }
+
+ /**
+ * Load instance worker id.
+ *
+ * @param instanceId instance id
+ * @return worker id
+ */
+ public Optional<Long> loadInstanceWorkerId(final String instanceId) {
+ try {
+ String workerId = repository.get(ComputeNode.getInstanceWorkerIdNodePath(instanceId));
+ return Strings.isNullOrEmpty(workerId) ? Optional.empty() : Optional.of(Long.valueOf(workerId));
+ } catch (final NumberFormatException ex) {
+ log.error("Invalid worker id for instance: {}", instanceId);
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Load instance xa recovery id.
+ *
+ * @param instanceId instance id
+ * @return xa recovery id
+ */
+ public Optional<String> loadXaRecoveryId(final String instanceId) {
+ List<String> xaRecoveryIds = repository.getChildrenKeys(ComputeNode.getXaRecoveryIdNodePath());
+ for (String xaRecoveryId : xaRecoveryIds) {
+ if (repository.getChildrenKeys(String.join("/", ComputeNode.getXaRecoveryIdNodePath(), xaRecoveryId)).contains(instanceId)) {
+ return Optional.of(xaRecoveryId);
+ }
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Load all compute node instances.
+ *
+ * @return compute node instances
+ */
+ public Collection<ComputeNodeInstance> loadAllComputeNodeInstances() {
+ Collection<ComputeNodeInstance> result = new ArrayList<>();
+ Arrays.stream(InstanceType.values()).forEach(instanceType -> {
+ Collection<String> onlineComputeNodes = repository.getChildrenKeys(ComputeNode.getOnlineNodePath(instanceType));
+ onlineComputeNodes.forEach(each -> result.add(loadComputeNodeInstance(new InstanceDefinition(instanceType, each))));
+ });
+ return result;
+ }
+
+ /**
+ * Load compute node instance by instance definition.
+ *
+ * @param instanceDefinition instance definition
+ * @return compute node instance
+ */
+ public ComputeNodeInstance loadComputeNodeInstance(final InstanceDefinition instanceDefinition) {
+ ComputeNodeInstance result = new ComputeNodeInstance(instanceDefinition);
+ result.setLabels(loadInstanceLabels(instanceDefinition.getInstanceId().getId()));
+ result.switchState(loadInstanceStatus(instanceDefinition.getInstanceId().getId()));
+ loadInstanceWorkerId(instanceDefinition.getInstanceId().getId()).ifPresent(result::setWorkerId);
+ loadXaRecoveryId(instanceDefinition.getInstanceId().getId()).ifPresent(result::setXaRecoveryId);
+ return result;
+ }
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java
index 0337c42be2d..944fb41cba3 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java
@@ -20,8 +20,8 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.work
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
import org.apache.shardingsphere.infra.instance.workerid.WorkerIdGenerator;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.node.WorkerIdNode;
-import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import java.util.Optional;
@@ -34,18 +34,18 @@ public final class ClusterWorkerIdGenerator implements WorkerIdGenerator {
private final ClusterPersistRepository repository;
- private final MetaDataPersistService metaDataPersistService;
+ private final RegistryCenter registryCenter;
private final InstanceDefinition instanceDefinition;
@Override
public long generate() {
- return metaDataPersistService.getComputeNodePersistService().loadInstanceWorkerId(instanceDefinition.getInstanceId().getId()).orElseGet(this::reGenerate);
+ return registryCenter.getComputeNodeStatusService().loadInstanceWorkerId(instanceDefinition.getInstanceId().getId()).orElseGet(this::reGenerate);
}
private Long reGenerate() {
Long result = Long.valueOf(Optional.ofNullable(repository.getSequentialId(WorkerIdNode.getWorkerIdGeneratorPath(instanceDefinition.getInstanceId().getId()), "")).orElse("0"));
- metaDataPersistService.getComputeNodePersistService().persistInstanceWorkerId(instanceDefinition.getInstanceId().getId(), result);
+ registryCenter.getComputeNodeStatusService().persistInstanceWorkerId(instanceDefinition.getInstanceId().getId(), result);
return result;
}
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
index 443b675ec23..da966e03c1a 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
@@ -335,13 +335,11 @@ public final class ClusterContextManagerCoordinatorTest {
@Test
public void assertRenewInstanceOnlineEvent() {
InstanceDefinition instanceDefinition1 = new InstanceDefinition(InstanceType.PROXY, "online_instance_id@1");
- InstanceDefinition instanceDefinition2 = new InstanceDefinition(InstanceType.PROXY, "online_instance_id@2");
- when(metaDataPersistService.getComputeNodePersistService().loadComputeNodeInstance(instanceDefinition1)).thenReturn(new ComputeNodeInstance(instanceDefinition1));
- when(metaDataPersistService.getComputeNodePersistService().loadComputeNodeInstance(instanceDefinition2)).thenReturn(new ComputeNodeInstance(instanceDefinition2));
InstanceOnlineEvent instanceOnlineEvent1 = new InstanceOnlineEvent(instanceDefinition1);
coordinator.renew(instanceOnlineEvent1);
assertThat(contextManager.getInstanceContext().getComputeNodeInstances().size(), is(1));
assertThat(((LinkedList<ComputeNodeInstance>) contextManager.getInstanceContext().getComputeNodeInstances()).get(0).getInstanceDefinition(), is(instanceDefinition1));
+ InstanceDefinition instanceDefinition2 = new InstanceDefinition(InstanceType.PROXY, "online_instance_id@2");
InstanceOnlineEvent instanceOnlineEvent2 = new InstanceOnlineEvent(instanceDefinition2);
coordinator.renew(instanceOnlineEvent2);
assertThat(contextManager.getInstanceContext().getComputeNodeInstances().size(), is(2));
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServi [...]
index 2bb5688761e..0aac987ee72 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
@@ -17,15 +17,25 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
import org.apache.shardingsphere.infra.instance.definition.InstanceType;
+import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
+import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public final class ComputeNodeStatusServiceTest {
@@ -39,4 +49,80 @@ public final class ComputeNodeStatusServiceTest {
new ComputeNodeStatusService(repository).registerOnline(instanceDefinition);
verify(repository).persistEphemeral("/nodes/compute_nodes/online/proxy/" + instanceDefinition.getInstanceId().getId(), "");
}
+
+ @Test
+ public void assertPersistInstanceLabels() {
+ ComputeNodeStatusService computeNodeStatusService = new ComputeNodeStatusService(repository);
+ InstanceDefinition instanceDefinition = new InstanceDefinition(InstanceType.PROXY, 3307);
+ final String instanceId = instanceDefinition.getInstanceId().getId();
+ computeNodeStatusService.persistInstanceLabels(instanceId, Collections.singletonList("test"));
+ verify(repository, times(1)).persistEphemeral(ComputeNode.getInstanceLabelsNodePath(instanceId), YamlEngine.marshal(Collections.singletonList("test")));
+ computeNodeStatusService.persistInstanceLabels(instanceId, Collections.emptyList());
+ verify(repository, times(0)).persistEphemeral(ComputeNode.getInstanceLabelsNodePath(instanceId), YamlEngine.marshal(Collections.emptyList()));
+ }
+
+ @Test
+ public void assertPersistInstanceWorkerId() {
+ InstanceDefinition instanceDefinition = new InstanceDefinition(InstanceType.PROXY, 3307);
+ final String instanceId = instanceDefinition.getInstanceId().getId();
+ new ComputeNodeStatusService(repository).persistInstanceWorkerId(instanceId, 100L);
+ verify(repository).persistEphemeral(ComputeNode.getInstanceWorkerIdNodePath(instanceId), String.valueOf(100L));
+ }
+
+ @Test
+ public void assertPersistInstanceXaRecoveryId() {
+ InstanceDefinition instanceDefinition = new InstanceDefinition(InstanceType.PROXY, 3307);
+ final String instanceId = instanceDefinition.getInstanceId().getId();
+ new ComputeNodeStatusService(repository).persistInstanceXaRecoveryId(instanceId, instanceId);
+ verify(repository).getChildrenKeys(ComputeNode.getXaRecoveryIdNodePath());
+ verify(repository).persistEphemeral(ComputeNode.getInstanceXaRecoveryIdNodePath(instanceId, instanceId), "");
+ }
+
+ @Test
+ public void assertLoadInstanceLabels() {
+ InstanceDefinition instanceDefinition = new InstanceDefinition(InstanceType.PROXY, 3307);
+ final String instanceId = instanceDefinition.getInstanceId().getId();
+ new ComputeNodeStatusService(repository).loadInstanceLabels(instanceId);
+ verify(repository).get(ComputeNode.getInstanceLabelsNodePath(instanceId));
+ }
+
+ @Test
+ public void assertLoadInstanceStatus() {
+ InstanceDefinition instanceDefinition = new InstanceDefinition(InstanceType.PROXY, 3307);
+ final String instanceId = instanceDefinition.getInstanceId().getId();
+ new ComputeNodeStatusService(repository).loadInstanceStatus(instanceId);
+ verify(repository).get(ComputeNode.getInstanceStatusNodePath(instanceId));
+ }
+
+ @Test
+ public void assertLoadInstanceWorkerId() {
+ InstanceDefinition instanceDefinition = new InstanceDefinition(InstanceType.PROXY, 3307);
+ final String instanceId = instanceDefinition.getInstanceId().getId();
+ new ComputeNodeStatusService(repository).loadInstanceWorkerId(instanceId);
+ verify(repository).get(ComputeNode.getInstanceWorkerIdNodePath(instanceId));
+ }
+
+ @Test
+ public void assertLoadInstanceXaRecoveryId() {
+ InstanceDefinition instanceDefinition = new InstanceDefinition(InstanceType.PROXY, 3307);
+ final String instanceId = instanceDefinition.getInstanceId().getId();
+ new ComputeNodeStatusService(repository).loadXaRecoveryId(instanceId);
+ verify(repository).getChildrenKeys(ComputeNode.getXaRecoveryIdNodePath());
+
+ }
+
+ @Test
+ public void assertLoadAllComputeNodeInstances() {
+ when(repository.getChildrenKeys("/nodes/compute_nodes/online/proxy")).thenReturn(Collections.singletonList("127.0.0.1@3307"));
+ when(repository.getChildrenKeys("/nodes/compute_nodes/online/jdbc")).thenReturn(Collections.singletonList("127.0.0.1@3308"));
+ Collection<ComputeNodeInstance> actual = new ComputeNodeStatusService(repository).loadAllComputeNodeInstances();
+ assertThat(actual.size(), is(2));
+ }
+
+ @Test
+ public void assertLoadComputeNodeInstance() {
+ InstanceDefinition instanceDefinition = new InstanceDefinition(InstanceType.PROXY, 3307);
+ ComputeNodeInstance actual = new ComputeNodeStatusService(repository).loadComputeNodeInstance(instanceDefinition);
+ assertThat(actual.getInstanceDefinition(), is(instanceDefinition));
+ }
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/resources/logback-test.xml b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/resources/logback-test.xml
index a7207c70eba..064918a0829 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/resources/logback-test.xml
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/resources/logback-test.xml
@@ -26,7 +26,7 @@
<appender-ref ref="console" />
</logger>
<logger name="org.apache.shardingsphere.schedule.core.api.ModeScheduleContext" level="error" />
- <logger name="org.apache.shardingsphere.mode.metadata.persist.service.ComputeNodePersistService" level="off" />
+ <logger name="org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service.ComputeNodeStatusService" level="off" />
<root>
<level value="error" />
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilder.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilder.java
index b88302d74b9..7ce473af11d 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilder.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilder.java
@@ -21,6 +21,7 @@ import org.apache.shardingsphere.infra.config.RuleConfiguration;
import org.apache.shardingsphere.infra.config.database.DatabaseConfiguration;
import org.apache.shardingsphere.infra.config.database.impl.DataSourceProvidedDatabaseConfiguration;
import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.instance.definition.InstanceType;
import org.apache.shardingsphere.infra.rule.identifier.type.InstanceAwareRule;
@@ -56,7 +57,7 @@ public final class StandaloneContextManagerBuilder implements ContextManagerBuil
MetaDataPersistService metaDataPersistService = new MetaDataPersistService(StandalonePersistRepositoryFactory.getInstance(parameter.getModeConfig().getRepository()));
persistConfigurations(metaDataPersistService, parameter);
MetaDataContexts metaDataContexts = createMetaDataContextsBuilder(metaDataPersistService, parameter).build(metaDataPersistService);
- return createContextManager(metaDataPersistService, parameter, metaDataContexts);
+ return createContextManager(parameter, metaDataContexts);
}
private void persistConfigurations(final MetaDataPersistService metaDataPersistService, final ContextManagerBuilderParameter parameter) {
@@ -89,9 +90,9 @@ public final class StandaloneContextManagerBuilder implements ContextManagerBuil
return new DataSourceProvidedDatabaseConfiguration(dataSources, databaseRuleConfigs);
}
- private ContextManager createContextManager(final MetaDataPersistService metaDataPersistService, final ContextManagerBuilderParameter parameter, final MetaDataContexts metaDataContexts) {
- InstanceContext instanceContext = new InstanceContext(metaDataPersistService.getComputeNodePersistService().loadComputeNodeInstance(parameter.getInstanceDefinition()),
- new StandaloneWorkerIdGenerator(), parameter.getModeConfig(), new StandaloneLockContext());
+ private ContextManager createContextManager(final ContextManagerBuilderParameter parameter, final MetaDataContexts metaDataContexts) {
+ InstanceContext instanceContext = new InstanceContext(new ComputeNodeInstance(parameter.getInstanceDefinition()), new StandaloneWorkerIdGenerator(), parameter.getModeConfig(),
+ new StandaloneLockContext());
generateTransactionConfigurationFile(instanceContext, metaDataContexts);
TransactionContexts transactionContexts = new TransactionContextsBuilder(
metaDataContexts.getMetaData().getDatabases(), metaDataContexts.getMetaData().getGlobalRuleMetaData().getRules()).build();
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/test/resources/logback-test.xml b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/test/resources/logback-test.xml
index 00559e3410c..5196f987673 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/test/resources/logback-test.xml
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/test/resources/logback-test.xml
@@ -25,7 +25,7 @@
<logger name="org.apache.shardingsphere" level="warn" additivity="false">
<appender-ref ref="console" />
</logger>
- <logger name="org.apache.shardingsphere.mode.metadata.persist.service.ComputeNodePersistService" level="off" />
+ <logger name="org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service.ComputeNodeStatusService" level="off" />
<root>
<level value="error" />
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/updatable/AlterInstanceHandler.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/updatable/AlterInstanceHandler.java
index 0bd8e33016e..0a5c7e73151 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/updatable/AlterInstanceHandler.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/updatable/AlterInstanceHandler.java
@@ -19,14 +19,13 @@ package org.apache.shardingsphere.proxy.backend.text.distsql.ral.common.updatabl
import org.apache.shardingsphere.distsql.parser.statement.ral.common.updatable.AlterInstanceStatement;
import org.apache.shardingsphere.infra.distsql.exception.DistSQLException;
-import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
+import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.XaRecoveryIdEvent;
import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
import org.apache.shardingsphere.proxy.backend.text.distsql.ral.UpdatableRALBackendHandler;
-import java.util.Collection;
import java.util.Optional;
-import java.util.stream.Collectors;
/**
* Alter instance handler.
@@ -49,15 +48,10 @@ public final class AlterInstanceHandler extends UpdatableRALBackendHandler<Alter
if (!persistService.isPresent()) {
throw new UnsupportedOperationException(String.format("No persistence configuration found, unable to set '%s'", sqlStatement.getKey()));
}
- Collection<ComputeNodeInstance> instances = persistService.get().getComputeNodePersistService().loadAllComputeNodeInstances();
- checkExisted(instances, sqlStatement);
- persistService.get().getComputeNodePersistService().persistInstanceXaRecoveryId(sqlStatement.getInstanceId(), sqlStatement.getValue());
- }
-
- private void checkExisted(final Collection<ComputeNodeInstance> instances, final AlterInstanceStatement sqlStatement) {
- Collection<String> instanceIds = instances.stream().map(each -> each.getInstanceDefinition().getInstanceId().getId()).collect(Collectors.toSet());
- if (!instanceIds.contains(sqlStatement.getInstanceId())) {
+ if (!contextManager.getInstanceContext().getComputeNodeInstanceById(sqlStatement.getInstanceId()).isPresent()) {
throw new UnsupportedOperationException(String.format("'%s' does not exist", sqlStatement.getInstanceId()));
}
+ // TODO need support standalone mode
+ ShardingSphereEventBus.getInstance().post(new XaRecoveryIdEvent(sqlStatement.getInstanceId(), sqlStatement.getValue()));
}
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/updatable/LabelInstanceHandler.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/updatable/LabelInstanceHandler.java
index d18d324f18a..563ea75a4c6 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/updatable/LabelInstanceHandler.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/updatable/LabelInstanceHandler.java
@@ -18,11 +18,11 @@
package org.apache.shardingsphere.proxy.backend.text.distsql.ral.common.updatable;
import org.apache.shardingsphere.distsql.parser.statement.ral.common.updatable.LabelInstanceStatement;
+import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
import org.apache.shardingsphere.infra.instance.definition.InstanceId;
-import org.apache.shardingsphere.infra.instance.definition.InstanceType;
import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
import org.apache.shardingsphere.mode.repository.standalone.StandalonePersistRepository;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
@@ -30,6 +30,7 @@ import org.apache.shardingsphere.proxy.backend.text.distsql.ral.UpdatableRALBack
import java.util.Collection;
import java.util.LinkedHashSet;
+import java.util.Optional;
/**
* Label instance handler.
@@ -43,11 +44,13 @@ public final class LabelInstanceHandler extends UpdatableRALBackendHandler<Label
throw new UnsupportedOperationException("Labels can only be added in cluster mode");
}
String instanceId = new InstanceId(sqlStatement.getIp(), String.valueOf(sqlStatement.getPort())).getId();
- ComputeNodeInstance instances = persistService.getComputeNodePersistService().loadComputeNodeInstance(new InstanceDefinition(InstanceType.PROXY, instanceId));
- Collection<String> labels = new LinkedHashSet<>(sqlStatement.getLabels());
- if (!sqlStatement.isOverwrite()) {
- labels.addAll(instances.getLabels());
+ Optional<ComputeNodeInstance> computeNodeInstance = contextManager.getInstanceContext().getComputeNodeInstanceById(instanceId);
+ if (computeNodeInstance.isPresent()) {
+ Collection<String> labels = new LinkedHashSet<>(sqlStatement.getLabels());
+ if (!sqlStatement.isOverwrite() && null != computeNodeInstance.get().getLabels()) {
+ labels.addAll(computeNodeInstance.get().getLabels());
+ }
+ ShardingSphereEventBus.getInstance().post(new LabelsEvent(instanceId, labels));
}
- persistService.getComputeNodePersistService().persistInstanceLabels(instanceId, labels, true);
}
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/updatable/SetInstanceStatusHandler.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/updatable/SetInstanceStatusHandler.java
index 2162c85f747..4761e85ce1e 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/updatable/SetInstanceStatusHandler.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/updatable/SetInstanceStatusHandler.java
@@ -19,21 +19,13 @@ package org.apache.shardingsphere.proxy.backend.text.distsql.ral.common.updatabl
import org.apache.shardingsphere.distsql.parser.statement.ral.common.updatable.SetInstanceStatusStatement;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
-import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
-import org.apache.shardingsphere.infra.instance.InstanceContext;
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
import org.apache.shardingsphere.infra.instance.definition.InstanceId;
import org.apache.shardingsphere.infra.state.StateType;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ComputeNodeStatusChangedEvent;
-import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
-import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.text.distsql.ral.UpdatableRALBackendHandler;
-import java.util.Collection;
-import java.util.Optional;
-
/**
* Set instance status handler.
*/
@@ -47,49 +39,29 @@ public final class SetInstanceStatusHandler extends UpdatableRALBackendHandler<S
InstanceId operationInstanceId = new InstanceId(sqlStatement.getIp(), String.valueOf(sqlStatement.getPort()));
boolean isDisable = "DISABLE".equals(sqlStatement.getStatus());
if (isDisable) {
- checkDisablingIsValid(operationInstanceId);
+ checkDisablingIsValid(contextManager, operationInstanceId);
} else {
- checkEnablingIsValid(operationInstanceId);
+ checkEnablingIsValid(contextManager, operationInstanceId);
}
ShardingSphereEventBus.getInstance().post(new ComputeNodeStatusChangedEvent(isDisable ? ComputeNodeStatus.CIRCUIT_BREAK : ComputeNodeStatus.ONLINE,
sqlStatement.getIp(), sqlStatement.getPort()));
}
- private void checkEnablingIsValid(final InstanceId operationInstanceId) {
- checkExist(operationInstanceId);
+ private void checkEnablingIsValid(final ContextManager contextManager, final InstanceId operationInstanceId) {
+ if (!contextManager.getInstanceContext().getComputeNodeInstanceById(operationInstanceId.getId()).isPresent()) {
+ throw new UnsupportedOperationException(String.format("`%s` does not exist", operationInstanceId.getId()));
+ }
}
- private void checkDisablingIsValid(final InstanceId operationInstanceId) {
- InstanceContext instanceContext = ProxyContext.getInstance().getContextManager().getInstanceContext();
- if (isIdenticalInstance(instanceContext.getInstance().getInstanceDefinition(), operationInstanceId)) {
+ private void checkDisablingIsValid(final ContextManager contextManager, final InstanceId operationInstanceId) {
+ if (contextManager.getInstanceContext().getInstance().getCurrentInstanceId().equals(operationInstanceId.getId())) {
throw new UnsupportedOperationException(String.format("`%s` is the currently in use instance and cannot be disabled", operationInstanceId.getId()));
}
- checkExist(operationInstanceId);
- checkExistDisabled(operationInstanceId);
- }
-
- private void checkExistDisabled(final InstanceId operationInstanceId) {
- Optional<MetaDataPersistService> metaDataPersistService = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getPersistService();
- if (metaDataPersistService.isPresent()) {
- metaDataPersistService.get().getComputeNodePersistService().loadAllComputeNodeInstances().forEach(each -> {
- if (StateType.CIRCUIT_BREAK == each.getState().getCurrentState() && isIdenticalInstance(each.getInstanceDefinition(), operationInstanceId)) {
- throw new UnsupportedOperationException(String.format("`%s` compute node has been disabled", operationInstanceId.getId()));
- }
- });
+ if (!contextManager.getInstanceContext().getComputeNodeInstanceById(operationInstanceId.getId()).isPresent()) {
+ throw new UnsupportedOperationException(String.format("`%s` does not exist", operationInstanceId.getId()));
+ }
+ if (contextManager.getInstanceContext().getComputeNodeInstanceById(operationInstanceId.getId()).get().getState().getCurrentState() == StateType.CIRCUIT_BREAK) {
+ throw new UnsupportedOperationException(String.format("`%s` compute node has been disabled", operationInstanceId.getId()));
}
- }
-
- private void checkExist(final InstanceId operationInstanceId) {
- Optional<MetaDataPersistService> metaDataPersistService = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getPersistService();
- metaDataPersistService.ifPresent(optional -> {
- Collection<ComputeNodeInstance> computeNodeInstances = optional.getComputeNodePersistService().loadAllComputeNodeInstances();
- if (computeNodeInstances.stream().noneMatch(each -> isIdenticalInstance(each.getInstanceDefinition(), operationInstanceId))) {
- throw new UnsupportedOperationException(String.format("`%s` does not exist", operationInstanceId.getId()));
- }
- });
- }
-
- private boolean isIdenticalInstance(final InstanceDefinition definition, final InstanceId instanceId) {
- return definition.getInstanceId().getId().equals(instanceId.getId());
}
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/updatable/UnlabelInstanceHandler.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/updatable/UnlabelInstanceHandler.java
index b584e9249d5..78294bbbbe7 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/updatable/UnlabelInstanceHandler.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/updatable/UnlabelInstanceHandler.java
@@ -19,18 +19,20 @@ package org.apache.shardingsphere.proxy.backend.text.distsql.ral.common.updatabl
import org.apache.shardingsphere.distsql.parser.statement.ral.common.updatable.UnlabelInstanceStatement;
import org.apache.shardingsphere.infra.distsql.exception.DistSQLException;
+import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
import org.apache.shardingsphere.infra.instance.definition.InstanceId;
-import org.apache.shardingsphere.infra.instance.definition.InstanceType;
import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
import org.apache.shardingsphere.mode.repository.standalone.StandalonePersistRepository;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.text.distsql.ral.UpdatableRALBackendHandler;
import java.util.Collection;
+import java.util.Collections;
import java.util.LinkedHashSet;
+import java.util.Optional;
/**
* Unlabel instance handler.
@@ -44,13 +46,15 @@ public final class UnlabelInstanceHandler extends UpdatableRALBackendHandler<Unl
throw new UnsupportedOperationException("Labels can only be removed in cluster mode");
}
String instanceId = new InstanceId(sqlStatement.getIp(), String.valueOf(sqlStatement.getPort())).getId();
- ComputeNodeInstance instances = persistService.getComputeNodePersistService().loadComputeNodeInstance(new InstanceDefinition(InstanceType.PROXY, instanceId));
- Collection<String> labels = new LinkedHashSet<>(instances.getLabels());
- if (sqlStatement.getLabels().isEmpty()) {
- persistService.getComputeNodePersistService().deleteInstanceLabels(instanceId);
- } else {
- labels.removeAll(sqlStatement.getLabels());
- persistService.getComputeNodePersistService().persistInstanceLabels(instanceId, labels, true);
+ Optional<ComputeNodeInstance> computeNodeInstance = contextManager.getInstanceContext().getComputeNodeInstanceById(instanceId);
+ if (computeNodeInstance.isPresent()) {
+ Collection<String> labels = new LinkedHashSet<>(computeNodeInstance.get().getLabels());
+ if (sqlStatement.getLabels().isEmpty()) {
+ ShardingSphereEventBus.getInstance().post(new LabelsEvent(instanceId, Collections.EMPTY_LIST));
+ } else {
+ labels.removeAll(sqlStatement.getLabels());
+ ShardingSphereEventBus.getInstance().post(new LabelsEvent(instanceId, labels));
+ }
}
}
}
diff --git a/shardingsphere-test/shardingsphere-integration-driver-test/src/test/resources/logback-test.xml b/shardingsphere-test/shardingsphere-integration-driver-test/src/test/resources/logback-test.xml
index a7207c70eba..064918a0829 100644
--- a/shardingsphere-test/shardingsphere-integration-driver-test/src/test/resources/logback-test.xml
+++ b/shardingsphere-test/shardingsphere-integration-driver-test/src/test/resources/logback-test.xml
@@ -26,7 +26,7 @@
<appender-ref ref="console" />
</logger>
<logger name="org.apache.shardingsphere.schedule.core.api.ModeScheduleContext" level="error" />
- <logger name="org.apache.shardingsphere.mode.metadata.persist.service.ComputeNodePersistService" level="off" />
+ <logger name="org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service.ComputeNodeStatusService" level="off" />
<root>
<level value="error" />