You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/01/12 09:12:13 UTC
[shardingsphere] branch master updated: Add instance context to manage compute node instance and status (#14710)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 22c15aa Add instance context to manage compute node instance and status (#14710)
22c15aa is described below
commit 22c15aab1af00eb914f7f1530759110fd98cb341
Author: Haoran Meng <me...@gmail.com>
AuthorDate: Wed Jan 12 17:11:15 2022 +0800
Add instance context to manage compute node instance and status (#14710)
---
.../collector/ProxyInfoCollectorTest.java | 7 +++
.../service/PrometheusPluginBootServiceTest.java | 7 +++
.../infra/instance/InstanceContext.java | 54 ++++++++++++++++++++++
.../shardingsphere/infra/state/StateContext.java | 16 +++----
.../shardingsphere/infra/state/StateEvent.java | 6 ++-
.../infra/state/StateContextTest.java | 12 ++---
.../driver/state/DriverStateContext.java | 2 +-
.../datasource/ShardingSphereDataSourceTest.java | 4 +-
.../driver/state/DriverStateContextTest.java | 2 +-
.../mode/manager/ContextManager.java | 8 ++--
.../persist/service/ComputeNodePersistService.java | 22 ++++++---
.../mode/manager/ContextManagerTest.java | 6 ++-
.../cluster/ClusterContextManagerBuilder.java | 6 ++-
.../ClusterContextManagerCoordinator.java | 13 ++++++
.../watcher/ComputeNodeStateChangedWatcher.java | 4 +-
.../ComputeNodeStateChangedWatcherTest.java | 9 ++--
.../memory/MemoryContextManagerBuilder.java | 11 ++++-
.../StandaloneContextManagerBuilder.java | 3 +-
.../proxy/backend/context/ProxyContext.java | 2 +-
19 files changed, 151 insertions(+), 43 deletions(-)
diff --git a/shardingsphere-agent/shardingsphere-agent-plugins/shardingsphere-agent-plugin-metrics/shardingsphere-agent-metrics-prometheus/src/test/java/org/apache/shardingsphere/agent/metrics/prometheus/collector/ProxyInfoCollectorTest.java b/shardingsphere-agent/shardingsphere-agent-plugins/shardingsphere-agent-plugin-metrics/shardingsphere-agent-metrics-prometheus/src/test/java/org/apache/shardingsphere/agent/metrics/prometheus/collector/ProxyInfoCollectorTest.java
index 1513174..773a766 100644
--- a/shardingsphere-agent/shardingsphere-agent-plugins/shardingsphere-agent-plugin-metrics/shardingsphere-agent-metrics-prometheus/src/test/java/org/apache/shardingsphere/agent/metrics/prometheus/collector/ProxyInfoCollectorTest.java
+++ b/shardingsphere-agent/shardingsphere-agent-plugins/shardingsphere-agent-plugin-metrics/shardingsphere-agent-metrics-prometheus/src/test/java/org/apache/shardingsphere/agent/metrics/prometheus/collector/ProxyInfoCollectorTest.java
@@ -18,16 +18,23 @@
package org.apache.shardingsphere.agent.metrics.prometheus.collector;
import io.prometheus.client.Collector;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
+import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
+import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import org.apache.shardingsphere.transaction.context.TransactionContexts;
import org.junit.Test;
import java.util.List;
import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.mock;
public final class ProxyInfoCollectorTest {
@Test
public void assertCollect() {
+ ProxyContext.getInstance().getContextManager().init(mock(MetaDataContexts.class), mock(TransactionContexts.class), new InstanceContext(new ComputeNodeInstance()));
ProxyInfoCollector proxyInfoCollector = new ProxyInfoCollector();
List<Collector.MetricFamilySamples> metricFamilySamples = proxyInfoCollector.collect();
assertFalse(metricFamilySamples.isEmpty());
diff --git a/shardingsphere-agent/shardingsphere-agent-plugins/shardingsphere-agent-plugin-metrics/shardingsphere-agent-metrics-prometheus/src/test/java/org/apache/shardingsphere/agent/metrics/prometheus/service/PrometheusPluginBootServiceTest.java b/shardingsphere-agent/shardingsphere-agent-plugins/shardingsphere-agent-plugin-metrics/shardingsphere-agent-metrics-prometheus/src/test/java/org/apache/shardingsphere/agent/metrics/prometheus/service/PrometheusPluginBootServiceTest.java
index 928adf4..d4cd5c6 100644
--- a/shardingsphere-agent/shardingsphere-agent-plugins/shardingsphere-agent-plugin-metrics/shardingsphere-agent-metrics-prometheus/src/test/java/org/apache/shardingsphere/agent/metrics/prometheus/service/PrometheusPluginBootServiceTest.java
+++ b/shardingsphere-agent/shardingsphere-agent-plugins/shardingsphere-agent-plugin-metrics/shardingsphere-agent-metrics-prometheus/src/test/java/org/apache/shardingsphere/agent/metrics/prometheus/service/PrometheusPluginBootServiceTest.java
@@ -20,6 +20,11 @@ package org.apache.shardingsphere.agent.metrics.prometheus.service;
import io.prometheus.client.exporter.HTTPServer;
import lombok.SneakyThrows;
import org.apache.shardingsphere.agent.config.PluginConfiguration;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
+import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
+import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import org.apache.shardingsphere.transaction.context.TransactionContexts;
import org.junit.AfterClass;
import org.junit.Test;
@@ -29,6 +34,7 @@ import java.util.Properties;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
public final class PrometheusPluginBootServiceTest {
@@ -37,6 +43,7 @@ public final class PrometheusPluginBootServiceTest {
@SneakyThrows
@Test
public void assertStart() {
+ ProxyContext.getInstance().getContextManager().init(mock(MetaDataContexts.class), mock(TransactionContexts.class), new InstanceContext(new ComputeNodeInstance()));
Properties props = new Properties();
props.setProperty("JVM_INFORMATION_COLLECTOR_ENABLED", "true");
PluginConfiguration configuration = new PluginConfiguration("localhost", 8090, "", props);
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
new file mode 100644
index 0000000..0e3fa0f
--- /dev/null
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.instance;
+
+import lombok.Getter;
+import org.apache.shardingsphere.infra.state.StateContext;
+import org.apache.shardingsphere.infra.state.StateType;
+
+import java.util.Collection;
+
+/**
+ * Instance context, holding the compute node instance and its state context.
+ */
+@Getter
+public final class InstanceContext {
+ 
+ private final ComputeNodeInstance instance;
+ 
+ private final StateContext state = new StateContext();
+ 
+ public InstanceContext(final ComputeNodeInstance instance) {
+ this.instance = instance;
+ switchInstanceState(instance.getStatus());
+ }
+ 
+ /**
+ * Update instance status and switch the circuit break state accordingly.
+ *
+ * @param status collection of status names, circuit break is on only if it contains CIRCUIT_BREAK
+ */
+ public void updateInstanceStatus(final Collection<String> status) {
+ instance.setStatus(status);
+ switchInstanceState(status);
+ }
+ 
+ private void switchInstanceState(final Collection<String> status) {
+ state.switchState(StateType.CIRCUIT_BREAK, null != status && status.contains(StateType.CIRCUIT_BREAK.name()));
+ }
+}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/state/StateContext.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/state/StateContext.java
index addff4a..f1f4216 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/state/StateContext.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/state/StateContext.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.infra.state;
import com.google.common.eventbus.Subscribe;
-import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import java.util.Collections;
import java.util.Deque;
@@ -32,21 +31,18 @@ public final class StateContext {
private final Deque<StateType> currentState = new ConcurrentLinkedDeque<>(Collections.singleton(StateType.OK));
- public StateContext() {
- ShardingSphereEventBus.getInstance().register(this);
- }
-
/**
* Switch state.
*
- * @param event state event
+ * @param type state type
+ * @param on true to switch to the state type, false to recover from it if it is the current state
*/
@Subscribe
- public void switchState(final StateEvent event) {
- if (event.isOn()) {
- currentState.push(event.getType());
+ public void switchState(final StateType type, final boolean on) {
+ if (on) {
+ currentState.push(type);
} else {
- if (getCurrentState().equals(event.getType())) {
+ if (getCurrentState().equals(type)) {
recoverState();
}
}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/state/StateEvent.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/state/StateEvent.java
index d58e49a..d2f9dff 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/state/StateEvent.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/state/StateEvent.java
@@ -20,6 +20,8 @@ package org.apache.shardingsphere.infra.state;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import java.util.Collection;
+
/**
* State event.
*/
@@ -27,7 +29,7 @@ import lombok.RequiredArgsConstructor;
@Getter
public final class StateEvent {
- private final StateType type;
+ private final String instanceId;
- private final boolean on;
+ private final Collection<String> status;
}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/state/StateContextTest.java b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/state/StateContextTest.java
index 78f652b..dbc13a2 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/state/StateContextTest.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/state/StateContextTest.java
@@ -28,23 +28,23 @@ public final class StateContextTest {
@Test
public void assertSwitchStateWithCircuitBreakOn() {
- stateContext.switchState(new StateEvent(StateType.CIRCUIT_BREAK, true));
+ stateContext.switchState(StateType.CIRCUIT_BREAK, true);
assertThat(stateContext.getCurrentState(), is(StateType.CIRCUIT_BREAK));
- stateContext.switchState(new StateEvent(StateType.CIRCUIT_BREAK, false));
+ stateContext.switchState(StateType.CIRCUIT_BREAK, false);
}
@Test
public void assertSwitchStateWithCircuitBreakOff() {
- stateContext.switchState(new StateEvent(StateType.CIRCUIT_BREAK, false));
+ stateContext.switchState(StateType.CIRCUIT_BREAK, false);
assertThat(stateContext.getCurrentState(), is(StateType.OK));
}
@Test
public void assertSwitchStateWithMultiState() {
- stateContext.switchState(new StateEvent(StateType.CIRCUIT_BREAK, true));
- stateContext.switchState(new StateEvent(StateType.LOCK, true));
+ stateContext.switchState(StateType.CIRCUIT_BREAK, true);
+ stateContext.switchState(StateType.LOCK, true);
assertThat(stateContext.getCurrentState(), is(StateType.LOCK));
- stateContext.switchState(new StateEvent(StateType.LOCK, false));
+ stateContext.switchState(StateType.LOCK, false);
assertThat(stateContext.getCurrentState(), is(StateType.CIRCUIT_BREAK));
}
}
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/state/DriverStateContext.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/state/DriverStateContext.java
index 4bcd1d1..342b803 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/state/DriverStateContext.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/state/DriverStateContext.java
@@ -45,6 +45,6 @@ public final class DriverStateContext {
* @return connection
*/
public static Connection getConnection(final String schemaName, final ContextManager contextManager) {
- return STATES.get(contextManager.getStateContext().getCurrentState().name()).getConnection(schemaName, contextManager);
+ return STATES.get(contextManager.getInstanceContext().getState().getCurrentState().name()).getConnection(schemaName, contextManager);
}
}
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/datasource/ShardingSphereDataSourceTest.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/datasource/ShardingSphereDataSourceTest.java
index dd03929..91f0d68 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/datasource/ShardingSphereDataSourceTest.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/datasource/ShardingSphereDataSourceTest.java
@@ -60,7 +60,7 @@ public final class ShardingSphereDataSourceTest {
assertNotNull(actual.getContextManager());
assertTrue(actual.getContextManager().getMetaDataContexts().getMetaDataMap().containsKey(DefaultSchema.LOGIC_NAME));
assertTrue(actual.getContextManager().getTransactionContexts().getEngines().containsKey(DefaultSchema.LOGIC_NAME));
- assertThat(actual.getContextManager().getStateContext().getCurrentState(), is(StateType.OK));
+ assertThat(actual.getContextManager().getInstanceContext().getState().getCurrentState(), is(StateType.OK));
assertThat(actual.getContextManager().getDataSourceMap(DefaultSchema.LOGIC_NAME).size(), is(0));
}
@@ -71,7 +71,7 @@ public final class ShardingSphereDataSourceTest {
assertNotNull(actual.getContextManager());
assertTrue(actual.getContextManager().getMetaDataContexts().getMetaDataMap().containsKey(DefaultSchema.LOGIC_NAME));
assertTrue(actual.getContextManager().getTransactionContexts().getEngines().containsKey(DefaultSchema.LOGIC_NAME));
- assertThat(actual.getContextManager().getStateContext().getCurrentState(), is(StateType.OK));
+ assertThat(actual.getContextManager().getInstanceContext().getState().getCurrentState(), is(StateType.OK));
assertThat(actual.getContextManager().getDataSourceMap(DefaultSchema.LOGIC_NAME).size(), is(1));
DataSource ds = actual.getContextManager().getDataSourceMap(DefaultSchema.LOGIC_NAME).get("ds");
assertNotNull(ds);
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/state/DriverStateContextTest.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/state/DriverStateContextTest.java
index 1acbe6b..864a096 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/state/DriverStateContextTest.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/state/DriverStateContextTest.java
@@ -46,7 +46,7 @@ public final class DriverStateContextTest {
@Before
public void setUp() {
when(contextManager.getMetaDataContexts()).thenReturn(new MetaDataContexts(mock(MetaDataPersistService.class)));
- when(contextManager.getStateContext()).thenReturn(new StateContext());
+ when(contextManager.getInstanceContext().getState()).thenReturn(new StateContext());
}
@Test
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
index ad84a95..1cd3dc2 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
@@ -27,6 +27,7 @@ import org.apache.shardingsphere.infra.config.properties.ConfigurationProperties
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.federation.optimizer.context.planner.OptimizerPlannerContextFactory;
import org.apache.shardingsphere.infra.federation.optimizer.metadata.FederationSchemaMetaData;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.resource.ShardingSphereResource;
import org.apache.shardingsphere.infra.metadata.rule.ShardingSphereRuleMetaData;
@@ -38,7 +39,6 @@ import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.infra.rule.builder.global.GlobalRulesBuilder;
import org.apache.shardingsphere.infra.rule.builder.schema.SchemaRulesBuilder;
-import org.apache.shardingsphere.infra.state.StateContext;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.MetaDataContextsBuilder;
import org.apache.shardingsphere.transaction.ShardingSphereTransactionManagerEngine;
@@ -69,17 +69,19 @@ public final class ContextManager implements AutoCloseable {
private volatile TransactionContexts transactionContexts = new TransactionContexts();
- private final StateContext stateContext = new StateContext();
+ private volatile InstanceContext instanceContext;
/**
* Initialize context manager.
*
* @param metaDataContexts meta data contexts
* @param transactionContexts transaction contexts
+ * @param instanceContext instance context
*/
- public void init(final MetaDataContexts metaDataContexts, final TransactionContexts transactionContexts) {
+ public void init(final MetaDataContexts metaDataContexts, final TransactionContexts transactionContexts, final InstanceContext instanceContext) {
this.metaDataContexts = metaDataContexts;
this.transactionContexts = transactionContexts;
+ this.instanceContext = instanceContext;
}
/**
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
index 1fa2e27..483af04 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
@@ -103,14 +103,22 @@ public final class ComputeNodePersistService {
Collection<ComputeNodeInstance> result = new ArrayList<>();
Arrays.stream(InstanceType.values()).forEach(instanceType -> {
Collection<String> onlineComputeNodes = repository.getChildrenKeys(ComputeNode.getOnlineNodePath(instanceType));
- onlineComputeNodes.forEach(each -> {
- ComputeNodeInstance instance = new ComputeNodeInstance();
- instance.setInstanceDefinition(new InstanceDefinition(instanceType, each));
- instance.setLabels(loadInstanceLabels(each));
- instance.setStatus(loadInstanceStatus(each));
- result.add(instance);
- });
+ onlineComputeNodes.forEach(each -> result.add(loadComputeNodeInstance(new InstanceDefinition(instanceType, each))));
});
return result;
}
+
+ /**
+ * Load compute node instance with its labels and status by instance definition.
+ *
+ * @param instanceDefinition instance definition whose instance id is used to load labels and status
+ * @return compute node instance holding the definition, labels and status
+ */
+ public ComputeNodeInstance loadComputeNodeInstance(final InstanceDefinition instanceDefinition) {
+ ComputeNodeInstance result = new ComputeNodeInstance();
+ result.setInstanceDefinition(instanceDefinition);
+ result.setLabels(loadInstanceLabels(instanceDefinition.getInstanceId().getId()));
+ result.setStatus(loadInstanceStatus(instanceDefinition.getInstanceId().getId()));
+ return result;
+ }
}
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/ContextManagerTest.java b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/ContextManagerTest.java
index b08876c..437bad4 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/ContextManagerTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/ContextManagerTest.java
@@ -27,6 +27,7 @@ import org.apache.shardingsphere.infra.federation.optimizer.context.parser.Optim
import org.apache.shardingsphere.infra.federation.optimizer.context.planner.OptimizerPlannerContext;
import org.apache.shardingsphere.infra.federation.optimizer.metadata.FederationMetaData;
import org.apache.shardingsphere.infra.federation.optimizer.metadata.FederationSchemaMetaData;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.resource.CachedDatabaseMetaData;
import org.apache.shardingsphere.infra.metadata.resource.DataSourcesMetaData;
@@ -87,12 +88,15 @@ public final class ContextManagerTest {
@Mock
private TransactionContexts transactionContexts;
+ @Mock
+ private InstanceContext instanceContext;
+
private ContextManager contextManager;
@Before
public void setUp() throws SQLException {
contextManager = new ContextManager();
- contextManager.init(metaDataContexts, transactionContexts);
+ contextManager.init(metaDataContexts, transactionContexts, instanceContext);
dataSourceMap = new HashMap<>(2, 1);
DataSource primaryDataSource = mock(DataSource.class);
DataSource replicaDataSource = mock(DataSource.class);
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 94d6317..cce6a67 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.infra.config.RuleConfiguration;
import org.apache.shardingsphere.infra.config.datasource.DataSourceProperties;
import org.apache.shardingsphere.infra.config.datasource.pool.creator.DataSourcePoolCreatorUtil;
import org.apache.shardingsphere.infra.config.datasource.pool.destroyer.DataSourcePoolDestroyerFactory;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.instance.InstanceDefinition;
import org.apache.shardingsphere.infra.metadata.schema.QualifiedSchema;
import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
@@ -77,13 +78,15 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder
private TransactionContexts transactionContexts;
+ private InstanceContext instanceContext;
+
private ContextManager contextManager;
@Override
public ContextManager build(final ContextManagerBuilderParameter parameter) throws SQLException {
beforeBuildContextManager(parameter);
contextManager = new ContextManager();
- contextManager.init(metaDataContexts, transactionContexts);
+ contextManager.init(metaDataContexts, transactionContexts, instanceContext);
afterBuildContextManager(parameter);
return contextManager;
}
@@ -106,6 +109,7 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder
metaDataContexts = new MetaDataContextsBuilder(clusterDataSources, clusterSchemaRuleConfigs, metaDataPersistService.getGlobalRuleService().load(), schemas, rules, clusterProps)
.build(metaDataPersistService);
transactionContexts = new TransactionContextsBuilder(metaDataContexts.getMetaDataMap(), metaDataContexts.getGlobalRuleMetaData().getRules()).build();
+ instanceContext = new InstanceContext(metaDataPersistService.getComputeNodePersistService().loadComputeNodeInstance(parameter.getInstanceDefinition()));
}
private void afterBuildContextManager(final ContextManagerBuilderParameter parameter) {
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
index 1e2e3d0..a37965e 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.infra.metadata.schema.QualifiedSchema;
import org.apache.shardingsphere.infra.rule.event.impl.DataSourceNameDisabledEvent;
import org.apache.shardingsphere.infra.rule.event.impl.PrimaryDataSourceChangedEvent;
import org.apache.shardingsphere.infra.rule.identifier.type.StatusContainedRule;
+import org.apache.shardingsphere.infra.state.StateEvent;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.authority.event.AuthorityChangedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.datasource.DataSourceChangedEvent;
@@ -171,6 +172,18 @@ public final class ClusterContextManagerCoordinator {
contextManager.alterGlobalRuleConfiguration(event.getRuleConfigurations());
}
+ /**
+ * Renew instance status if the event's instance id matches the current instance.
+ *
+ * @param event state event carrying instance id and status
+ */
+ @Subscribe
+ public synchronized void renew(final StateEvent event) {
+ if (contextManager.getInstanceContext().getInstance().getInstanceDefinition().getInstanceId().getId().equals(event.getInstanceId())) {
+ contextManager.getInstanceContext().updateInstanceStatus(event.getStatus());
+ }
+ }
+
private void persistSchema(final String schemaName) {
if (!metaDataPersistService.getDataSourceService().isExisted(schemaName)) {
metaDataPersistService.getDataSourceService().persist(schemaName, new LinkedHashMap<>());
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChan [...]
index 771aa39..03dde36 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
@@ -19,10 +19,8 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.stat
import com.google.common.base.Strings;
import org.apache.shardingsphere.infra.state.StateEvent;
-import org.apache.shardingsphere.infra.state.StateType;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
@@ -53,7 +51,7 @@ public final class ComputeNodeStateChangedWatcher implements GovernanceWatcher<S
String instanceId = ComputeNode.getInstanceIdByStatus(event.getKey());
if (!Strings.isNullOrEmpty(instanceId)) {
Collection<String> status = Strings.isNullOrEmpty(event.getValue()) ? new ArrayList<>() : YamlEngine.unmarshal(event.getValue(), Collection.class);
- return Optional.of(new StateEvent(StateType.CIRCUIT_BREAK, status.contains(ComputeNodeStatus.CIRCUIT_BREAK.name())));
+ return Optional.of(new StateEvent(instanceId, status));
}
return Optional.empty();
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeState [...]
index 925b1fb..c12c494 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
@@ -27,7 +27,8 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.Optional;
-import static org.junit.Assert.assertFalse;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public final class ComputeNodeStateChangedWatcherTest {
@@ -37,13 +38,15 @@ public final class ComputeNodeStateChangedWatcherTest {
Optional<StateEvent> actual = new ComputeNodeStateChangedWatcher().createGovernanceEvent(new DataChangedEvent("/nodes/compute_nodes/attributes/127.0.0.1@3307/status",
YamlEngine.marshal(Arrays.asList(ComputeNodeStatus.CIRCUIT_BREAK.name())), Type.ADDED));
assertTrue(actual.isPresent());
- assertTrue(actual.get().isOn());
+ assertThat(actual.get().getStatus(), is(Arrays.asList(ComputeNodeStatus.CIRCUIT_BREAK.name())));
+ assertThat(actual.get().getInstanceId(), is("127.0.0.1@3307"));
}
@Test
public void assertCreateEventWhenDisabled() {
Optional<StateEvent> actual = new ComputeNodeStateChangedWatcher().createGovernanceEvent(new DataChangedEvent("/nodes/compute_nodes/attributes/127.0.0.1@3307/status", "", Type.UPDATED));
assertTrue(actual.isPresent());
- assertFalse(actual.get().isOn());
+ assertTrue(actual.get().getStatus().isEmpty());
+ assertThat(actual.get().getInstanceId(), is("127.0.0.1@3307"));
}
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/MemoryContextManagerBuilder.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/MemoryContextManagerBuilder.java
index 7999af2..5080281 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/MemoryContextManagerBuilder.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/MemoryContextManagerBuilder.java
@@ -17,6 +17,8 @@
package org.apache.shardingsphere.mode.manager.memory;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
import org.apache.shardingsphere.infra.metadata.schema.loader.SchemaLoader;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
@@ -46,10 +48,17 @@ public final class MemoryContextManagerBuilder implements ContextManagerBuilder
schemas, rules, parameter.getProps()).build(null);
TransactionContexts transactionContexts = new TransactionContextsBuilder(metaDataContexts.getMetaDataMap(), metaDataContexts.getGlobalRuleMetaData().getRules()).build();
ContextManager result = new ContextManager();
- result.init(metaDataContexts, transactionContexts);
+ result.init(metaDataContexts, transactionContexts, buildInstanceContext(parameter));
return result;
}
+ private InstanceContext buildInstanceContext(final ContextManagerBuilderParameter parameter) {
+ ComputeNodeInstance instance = new ComputeNodeInstance();
+ instance.setInstanceDefinition(parameter.getInstanceDefinition());
+ instance.setLabels(parameter.getLabels());
+ return new InstanceContext(instance);
+ }
+
@Override
public String getType() {
return "Memory";
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilder.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilder.java
index 30a8d1f..e4fff3f 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilder.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilder.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.infra.config.datasource.DataSourceProperties;
import org.apache.shardingsphere.infra.config.datasource.pool.creator.DataSourcePoolCreatorUtil;
import org.apache.shardingsphere.infra.config.datasource.pool.destroyer.DataSourcePoolDestroyerFactory;
import org.apache.shardingsphere.infra.config.mode.PersistRepositoryConfiguration;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
import org.apache.shardingsphere.infra.metadata.schema.loader.SchemaLoader;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
@@ -79,7 +80,7 @@ public final class StandaloneContextManagerBuilder implements ContextManagerBuil
rules, standaloneProps).build(metaDataPersistService);
TransactionContexts transactionContexts = new TransactionContextsBuilder(metaDataContexts.getMetaDataMap(), metaDataContexts.getGlobalRuleMetaData().getRules()).build();
ContextManager result = new ContextManager();
- result.init(metaDataContexts, transactionContexts);
+ result.init(metaDataContexts, transactionContexts, new InstanceContext(metaDataPersistService.getComputeNodePersistService().loadComputeNodeInstance(parameter.getInstanceDefinition())));
return result;
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/context/ProxyContext.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/context/ProxyContext.java
index 9ac18a1..5d02e89 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/context/ProxyContext.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/context/ProxyContext.java
@@ -100,7 +100,7 @@ public final class ProxyContext {
* @return state context
*/
public StateContext getStateContext() {
- return contextManager.getStateContext();
+ return contextManager.getInstanceContext().getState();
}
/**