You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/06/17 06:10:01 UTC
[shardingsphere] branch master updated: Add new subscriber for new metadata structure (#26398)
This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 10b9bb9e530 Add new subscriber for new metadata structure (#26398)
10b9bb9e530 is described below
commit 10b9bb9e5307d58232143e53d0bb3249c1d9c744
Author: ChenJiaHao <Pa...@163.com>
AuthorDate: Sat Jun 17 14:09:54 2023 +0800
Add new subscriber for new metadata structure (#26398)
---
.../cluster/NewClusterContextManagerBuilder.java | 14 +-
.../cluster/coordinator/NewRegistryCenter.java | 129 ++++++++++++++++++
.../registry/NewGovernanceWatcherFactory.java | 60 +++++++++
.../NewProcessListChangedSubscriber.java | 115 ++++++++++++++++
.../subscriber/NewComputeNodeStatusSubscriber.java | 69 ++++++++++
.../generator/NewClusterWorkerIdGenerator.java | 95 ++++++++++++++
.../NewContextManagerSubscriberFacade.java | 10 +-
.../subscriber/NewStateChangedSubscriber.java | 146 +++++++++++++++++++++
8 files changed, 626 insertions(+), 12 deletions(-)
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/NewClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/NewClusterContextManagerBuilder.java
index 971e8d31763..27a9992051e 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/NewClusterContextManagerBuilder.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/NewClusterContextManagerBuilder.java
@@ -31,8 +31,8 @@ import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.ContextManagerAware;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator.ClusterWorkerIdGenerator;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.NewRegistryCenter;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator.NewClusterWorkerIdGenerator;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber.NewContextManagerSubscriberFacade;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.NewMetaDataContextsFactory;
@@ -50,7 +50,7 @@ public final class NewClusterContextManagerBuilder implements ContextManagerBuil
@Override
public ContextManager build(final ContextManagerBuilderParameter param) throws SQLException {
ClusterPersistRepository repository = getClusterPersistRepository((ClusterPersistRepositoryConfiguration) param.getModeConfiguration().getRepository());
- RegistryCenter registryCenter = new RegistryCenter(repository, new EventBusContext(), param.getInstanceMetaData(), param.getDatabaseConfigs());
+ NewRegistryCenter registryCenter = new NewRegistryCenter(repository, new EventBusContext(), param.getInstanceMetaData(), param.getDatabaseConfigs());
InstanceContext instanceContext = buildInstanceContext(registryCenter, param);
if (registryCenter.getRepository() instanceof InstanceContextAware) {
((InstanceContextAware) registryCenter.getRepository()).setInstanceContext(instanceContext);
@@ -70,8 +70,8 @@ public final class NewClusterContextManagerBuilder implements ContextManagerBuil
return result;
}
- private InstanceContext buildInstanceContext(final RegistryCenter registryCenter, final ContextManagerBuilderParameter param) {
- return new InstanceContext(new ComputeNodeInstance(param.getInstanceMetaData()), new ClusterWorkerIdGenerator(registryCenter, param.getInstanceMetaData()),
+ private InstanceContext buildInstanceContext(final NewRegistryCenter registryCenter, final ContextManagerBuilderParameter param) {
+ return new InstanceContext(new ComputeNodeInstance(param.getInstanceMetaData()), new NewClusterWorkerIdGenerator(registryCenter, param.getInstanceMetaData()),
param.getModeConfiguration(), new NewClusterModeContextManager(), new GlobalLockContext(registryCenter.getGlobalLockPersistService()), registryCenter.getEventBusContext());
}
@@ -79,7 +79,7 @@ public final class NewClusterContextManagerBuilder implements ContextManagerBuil
((ContextManagerAware) contextManager.getInstanceContext().getModeContextManager()).setContextManagerAware(contextManager);
}
- private void registerOnline(final RegistryCenter registryCenter, final ContextManagerBuilderParameter param, final ContextManager contextManager) {
+ private void registerOnline(final NewRegistryCenter registryCenter, final ContextManagerBuilderParameter param, final ContextManager contextManager) {
registryCenter.onlineInstance(contextManager.getInstanceContext().getInstance());
loadClusterStatus(registryCenter, contextManager);
contextManager.getInstanceContext().getInstance().setLabels(param.getLabels());
@@ -88,7 +88,7 @@ public final class NewClusterContextManagerBuilder implements ContextManagerBuil
new NewContextManagerSubscriberFacade(registryCenter, contextManager);
}
- private void loadClusterStatus(final RegistryCenter registryCenter, final ContextManager contextManager) {
+ private void loadClusterStatus(final NewRegistryCenter registryCenter, final ContextManager contextManager) {
registryCenter.persistClusterState(contextManager);
contextManager.updateClusterState(registryCenter.getClusterStatusService().loadClusterStatus());
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/NewRegistryCenter.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/NewRegistryCenter.java
new file mode 100644
index 00000000000..b3ee76c8cee
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/NewRegistryCenter.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator;
+
+import com.google.common.base.Strings;
+import lombok.Getter;
+import org.apache.shardingsphere.infra.config.database.DatabaseConfiguration;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
+import org.apache.shardingsphere.infra.instance.metadata.jdbc.JDBCInstanceMetaData;
+import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.GlobalLockPersistService;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.NewGovernanceWatcherFactory;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.subscriber.ShardingSphereSchemaDataRegistrySubscriber;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber.ClusterProcessSubscriber;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.service.ClusterStatusService;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.subscriber.ClusterStatusSubscriber;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service.ComputeNodeStatusService;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.subscriber.NewComputeNodeStatusSubscriber;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.service.StorageNodeStatusService;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber.StorageNodeStatusSubscriber;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
+import org.apache.shardingsphere.mode.repository.cluster.lock.impl.props.DefaultLockTypedProperties;
+
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * TODO replace the old RegistryCenter after meta data refactor completed
+ * New registry center.
+ */
+public final class NewRegistryCenter {
+
+ @Getter
+ private final ClusterPersistRepository repository;
+
+ @Getter
+ private final StorageNodeStatusService storageNodeStatusService;
+
+ @Getter
+ private final ClusterStatusService clusterStatusService;
+
+ @Getter
+ private final ComputeNodeStatusService computeNodeStatusService;
+
+ @Getter
+ private final GlobalLockPersistService globalLockPersistService;
+
+ @Getter
+ private final EventBusContext eventBusContext;
+
+ private final InstanceMetaData instanceMetaData;
+
+ private final Map<String, DatabaseConfiguration> databaseConfigs;
+
+ private final NewGovernanceWatcherFactory listenerFactory;
+
+ public NewRegistryCenter(final ClusterPersistRepository repository, final EventBusContext eventBusContext,
+ final InstanceMetaData instanceMetaData, final Map<String, DatabaseConfiguration> databaseConfigs) {
+ this.repository = repository;
+ this.eventBusContext = eventBusContext;
+ this.instanceMetaData = instanceMetaData;
+ this.databaseConfigs = databaseConfigs;
+ storageNodeStatusService = new StorageNodeStatusService(repository);
+ clusterStatusService = new ClusterStatusService(repository);
+ computeNodeStatusService = new ComputeNodeStatusService(repository);
+ globalLockPersistService = new GlobalLockPersistService(initDistributedLockHolder(repository));
+ listenerFactory = new NewGovernanceWatcherFactory(repository, eventBusContext, getJDBCDatabaseName());
+ createSubscribers(repository);
+ }
+
+ private DistributedLockHolder initDistributedLockHolder(final ClusterPersistRepository repository) {
+ DistributedLockHolder distributedLockHolder = repository.getDistributedLockHolder();
+ return null == distributedLockHolder ? new DistributedLockHolder("default", repository, new DefaultLockTypedProperties(new Properties())) : distributedLockHolder;
+ }
+
+ private String getJDBCDatabaseName() {
+ return instanceMetaData instanceof JDBCInstanceMetaData ? databaseConfigs.keySet().stream().findFirst().orElse(null) : null;
+ }
+
+ private void createSubscribers(final ClusterPersistRepository repository) {
+ new NewComputeNodeStatusSubscriber(this, repository);
+ new ClusterStatusSubscriber(repository, eventBusContext);
+ new StorageNodeStatusSubscriber(repository, eventBusContext);
+ new ClusterProcessSubscriber(repository, eventBusContext);
+ new ShardingSphereSchemaDataRegistrySubscriber(repository, globalLockPersistService, eventBusContext);
+ }
+
+ /**
+ * Online instance.
+ *
+ * @param computeNodeInstance compute node instance
+ */
+ public void onlineInstance(final ComputeNodeInstance computeNodeInstance) {
+ computeNodeStatusService.registerOnline(computeNodeInstance.getMetaData());
+ computeNodeStatusService.persistInstanceLabels(computeNodeInstance.getCurrentInstanceId(), computeNodeInstance.getLabels());
+ computeNodeStatusService.persistInstanceState(computeNodeInstance.getCurrentInstanceId(), computeNodeInstance.getState());
+ listenerFactory.watchListeners();
+ }
+
+ /**
+ * Persist cluster state.
+ *
+ * @param contextManager context manager
+ */
+ public void persistClusterState(final ContextManager contextManager) {
+ if (Strings.isNullOrEmpty(repository.getDirectly(ComputeNode.getClusterStatusNodePath()))) {
+ clusterStatusService.persistClusterState(contextManager.getClusterStateContext());
+ }
+ }
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/NewGovernanceWatcherFactory.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/NewGovernanceWatcherFactory.java
new file mode 100644
index 00000000000..8a4b535ecda
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/NewGovernanceWatcherFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+
+/**
+ * TODO replace the old GovernanceWatcherFactory after meta data refactor completed
+ * New governance watcher factory.
+ */
+@RequiredArgsConstructor
+public final class NewGovernanceWatcherFactory {
+
+ private final ClusterPersistRepository repository;
+
+ private final EventBusContext eventBusContext;
+
+ private final String databaseName;
+
+ /**
+ * Watch listeners.
+ */
+ public void watchListeners() {
+ for (NewGovernanceWatcher<?> each : ShardingSphereServiceLoader.getServiceInstances(NewGovernanceWatcher.class)) {
+ watch(each);
+ }
+ }
+
+ private void watch(final NewGovernanceWatcher<?> listener) {
+ for (String each : listener.getWatchingKeys(databaseName)) {
+ watch(each, listener);
+ }
+ }
+
+ private void watch(final String watchingKey, final NewGovernanceWatcher<?> listener) {
+ repository.watch(watchingKey, dataChangedEventListener -> {
+ if (listener.getWatchingTypes().contains(dataChangedEventListener.getType())) {
+ listener.createGovernanceEvent(dataChangedEventListener).ifPresent(eventBusContext::post);
+ }
+ });
+ }
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/NewProcessListChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/NewProcessListChangedSubscriber.java
new file mode 100644
index 00000000000..f6aba3dd3d0
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/NewProcessListChangedSubscriber.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.infra.executor.sql.process.Process;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
+import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
+import org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
+import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.NewRegistryCenter;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessCompletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesCompletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesEvent;
+
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collection;
+
+/**
+ * TODO replace the old ProcessListChangedSubscriber after meta data refactor completed
+ * New process list changed subscriber.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public final class NewProcessListChangedSubscriber {
+
+ private final NewRegistryCenter registryCenter;
+
+ private final ContextManager contextManager;
+
+ private final YamlProcessListSwapper swapper = new YamlProcessListSwapper();
+
+ public NewProcessListChangedSubscriber(final NewRegistryCenter registryCenter, final ContextManager contextManager) {
+ this.registryCenter = registryCenter;
+ this.contextManager = contextManager;
+ contextManager.getInstanceContext().getEventBusContext().register(this);
+ }
+
+ /**
+ * Report local processes.
+ *
+ * @param event show process list trigger event
+ */
+ @Subscribe
+ public void reportLocalProcesses(final ReportLocalProcessesEvent event) {
+ if (!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId())) {
+ return;
+ }
+ Collection<Process> processes = ProcessRegistry.getInstance().listAll();
+ if (!processes.isEmpty()) {
+ registryCenter.getRepository().persist(
+ ProcessNode.getProcessListInstancePath(event.getTaskId(), event.getInstanceId()), YamlEngine.marshal(swapper.swapToYamlConfiguration(processes)));
+ }
+ registryCenter.getRepository().delete(ComputeNode.getProcessTriggerInstanceNodePath(event.getInstanceId(), event.getTaskId()));
+ }
+
+ /**
+ * Complete to report local processes.
+ *
+ * @param event report local processes completed event
+ */
+ @Subscribe
+ public synchronized void completeToReportLocalProcesses(final ReportLocalProcessesCompletedEvent event) {
+ ProcessOperationLockRegistry.getInstance().notify(event.getTaskId());
+ }
+
+ /**
+ * Kill local process.
+ *
+ * @param event kill local process event
+ * @throws SQLException SQL exception
+ */
+ @Subscribe
+ public synchronized void killLocalProcess(final KillLocalProcessEvent event) throws SQLException {
+ if (!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId())) {
+ return;
+ }
+ Process process = ProcessRegistry.getInstance().get(event.getProcessId());
+ if (null != process) {
+ for (Statement each : process.getProcessStatements()) {
+ each.cancel();
+ }
+ }
+ registryCenter.getRepository().delete(ComputeNode.getProcessKillInstanceIdNodePath(event.getInstanceId(), event.getProcessId()));
+ }
+
+ /**
+ * Complete to kill local process.
+ *
+ * @param event kill local process completed event
+ */
+ @Subscribe
+ public synchronized void completeToKillLocalProcess(final KillLocalProcessCompletedEvent event) {
+ ProcessOperationLockRegistry.getInstance().notify(event.getProcessId());
+ }
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/NewComputeNodeStatusSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/NewComputeNodeStatusSubscriber.java
new file mode 100644
index 00000000000..53dee90fc41
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/NewComputeNodeStatusSubscriber.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
+import org.apache.shardingsphere.mode.event.compute.ComputeNodeStatusChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.NewRegistryCenter;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsChangedEvent;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+
+import java.util.Collections;
+
+/**
+ * TODO replace the old ComputeNodeStatusSubscriber after meta data refactor completed
+ * New compute node status subscriber.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public final class NewComputeNodeStatusSubscriber {
+
+ private final NewRegistryCenter registryCenter;
+
+ private final ClusterPersistRepository repository;
+
+ public NewComputeNodeStatusSubscriber(final NewRegistryCenter registryCenter, final ClusterPersistRepository repository) {
+ this.registryCenter = registryCenter;
+ this.repository = repository;
+ registryCenter.getEventBusContext().register(this);
+ }
+
+ /**
+ * Update compute node status.
+ *
+ * @param event compute node status changed event
+ */
+ @Subscribe
+ public void update(final ComputeNodeStatusChangedEvent event) {
+ repository.persistEphemeral(ComputeNode.getInstanceStatusNodePath(event.getInstanceId()), event.getState().name());
+ }
+
+ /**
+ * Update compute node labels.
+ *
+ * @param event labels changed event
+ */
+ @Subscribe
+ public void update(final LabelsChangedEvent event) {
+ if (event.getLabels().isEmpty()) {
+ registryCenter.getComputeNodeStatusService().persistInstanceLabels(event.getInstanceId(), Collections.emptyList());
+ } else {
+ registryCenter.getComputeNodeStatusService().persistInstanceLabels(event.getInstanceId(), event.getLabels());
+ }
+ }
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/NewClusterWorkerIdGenerator.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/NewClusterWorkerIdGenerator.java
new file mode 100644
index 00000000000..0dcf218d21a
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/NewClusterWorkerIdGenerator.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator;
+
+import com.google.common.base.Preconditions;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
+import org.apache.shardingsphere.infra.instance.workerid.WorkerIdGenerator;
+import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.NewRegistryCenter;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.exception.WorkIdAssignedException;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.node.WorkerIdNode;
+import org.apache.shardingsphere.mode.repository.cluster.exception.ClusterPersistRepositoryException;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.Properties;
+
+/**
+ * TODO replace the old implementation after meta data refactor completed
+ * New worker id generator for cluster mode.
+ */
+@RequiredArgsConstructor
+@Slf4j
+public final class NewClusterWorkerIdGenerator implements WorkerIdGenerator {
+
+ private final NewRegistryCenter registryCenter;
+
+ private final InstanceMetaData instanceMetaData;
+
+ private volatile boolean isWarned;
+
+ @Override
+ public int generate(final Properties props) {
+ int result = registryCenter.getComputeNodeStatusService().loadInstanceWorkerId(instanceMetaData.getId()).orElseGet(this::reGenerate);
+ checkIneffectiveConfiguration(result, props);
+ return result;
+ }
+
+ private Integer reGenerate() {
+ Optional<Integer> result;
+ do {
+ result = generateAvailableWorkerId();
+ } while (!result.isPresent());
+ Integer generatedWorkId = result.get();
+ registryCenter.getComputeNodeStatusService().persistInstanceWorkerId(instanceMetaData.getId(), generatedWorkId);
+ return generatedWorkId;
+ }
+
+ private Optional<Integer> generateAvailableWorkerId() {
+ Collection<Integer> assignedWorkerIds = registryCenter.getComputeNodeStatusService().getAssignedWorkerIds();
+ ShardingSpherePreconditions.checkState(assignedWorkerIds.size() <= 1024, WorkIdAssignedException::new);
+ Collection<Integer> availableWorkerIds = new LinkedList<>();
+ for (int i = 0; i < 1024; i++) {
+ availableWorkerIds.add(i);
+ }
+ PriorityQueue<Integer> priorityQueue = new PriorityQueue<>(availableWorkerIds);
+ for (Integer each : assignedWorkerIds) {
+ priorityQueue.remove(each);
+ }
+ Integer preselectedWorkerId = priorityQueue.poll();
+ Preconditions.checkState(null != preselectedWorkerId, "Preselected worker-id can not be null.");
+ try {
+ registryCenter.getRepository().persistExclusiveEphemeral(WorkerIdNode.getWorkerIdGeneratorPath(preselectedWorkerId.toString()), instanceMetaData.getId());
+ return Optional.of(preselectedWorkerId);
+ } catch (final ClusterPersistRepositoryException ignore) {
+ return Optional.empty();
+ }
+ }
+
+ private void checkIneffectiveConfiguration(final long generatedWorkerId, final Properties props) {
+ if (!isWarned && null != props && props.containsKey(WORKER_ID_KEY)) {
+ isWarned = true;
+ log.warn("No need to configured {} in cluster mode, system assigned {} was {}", WORKER_ID_KEY, WORKER_ID_KEY, generatedWorkerId);
+ }
+ }
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/NewContextManagerSubscriberFacade.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/NewContextManagerSubscriberFacade.java
index 5159fd6c874..1672cf8390c 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/NewContextManagerSubscriberFacade.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/NewContextManagerSubscriberFacade.java
@@ -18,8 +18,8 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
import org.apache.shardingsphere.mode.manager.ContextManager;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber.ProcessListChangedSubscriber;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.NewRegistryCenter;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber.NewProcessListChangedSubscriber;
/**
* TODO replace the old implementation after meta data refactor completed
@@ -27,13 +27,13 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.proce
*/
public final class NewContextManagerSubscriberFacade {
- public NewContextManagerSubscriberFacade(final RegistryCenter registryCenter, final ContextManager contextManager) {
+ public NewContextManagerSubscriberFacade(final NewRegistryCenter registryCenter, final ContextManager contextManager) {
new NewConfigurationChangedSubscriber(contextManager);
new NewDataSourceChangedSubscriber(contextManager);
new ResourceMetaDataChangedSubscriber(contextManager);
new DatabaseChangedSubscriber(contextManager);
- new StateChangedSubscriber(registryCenter, contextManager);
- new ProcessListChangedSubscriber(registryCenter, contextManager);
+ new NewStateChangedSubscriber(registryCenter, contextManager);
+ new NewProcessListChangedSubscriber(registryCenter, contextManager);
new CacheEvictedSubscriber(contextManager.getInstanceContext().getEventBusContext());
}
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/NewStateChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/NewStateChangedSubscriber.java
new file mode 100644
index 00000000000..a64afbfd991
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/NewStateChangedSubscriber.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.infra.datasource.state.DataSourceState;
+import org.apache.shardingsphere.infra.datasource.state.DataSourceStateManager;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
+import org.apache.shardingsphere.infra.rule.identifier.type.StaticDataSourceContainedRule;
+import org.apache.shardingsphere.mode.event.storage.StorageNodeDataSourceChangedEvent;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.NewRegistryCenter;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterLockDeletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStateEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStatusChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOfflineEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOnlineEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
+
+import java.util.Optional;
+
+/**
+ * TODO replace the old StateChangedSubscriber after meta data refactor completed
+ * New state changed subscriber.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public final class NewStateChangedSubscriber {
+
+ private final NewRegistryCenter registryCenter;
+
+ private final ContextManager contextManager;
+
+ public NewStateChangedSubscriber(final NewRegistryCenter registryCenter, final ContextManager contextManager) {
+ this.registryCenter = registryCenter;
+ this.contextManager = contextManager;
+ contextManager.getInstanceContext().getEventBusContext().register(this);
+ }
+
+ /**
+ * Renew disabled data source names.
+ *
+ * @param event Storage node changed event
+ */
+ @Subscribe
+ public synchronized void renew(final StorageNodeChangedEvent event) {
+ if (!contextManager.getMetaDataContexts().getMetaData().containsDatabase(event.getQualifiedDatabase().getDatabaseName())) {
+ return;
+ }
+ QualifiedDatabase qualifiedDatabase = event.getQualifiedDatabase();
+ Optional<StaticDataSourceContainedRule> staticDataSourceRule = contextManager.getMetaDataContexts()
+ .getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData().findSingleRule(StaticDataSourceContainedRule.class);
+ staticDataSourceRule.ifPresent(optional -> optional.updateStatus(new StorageNodeDataSourceChangedEvent(qualifiedDatabase, event.getDataSource())));
+ DataSourceStateManager.getInstance().updateState(
+ qualifiedDatabase.getDatabaseName(), qualifiedDatabase.getDataSourceName(), DataSourceState.valueOf(event.getDataSource().getStatus().name()));
+ }
+
+ /**
+ * Reset cluster state.
+ *
+ * @param event cluster lock deleted event
+ */
+ @Subscribe
+ public synchronized void renew(final ClusterLockDeletedEvent event) {
+ contextManager.getInstanceContext().getEventBusContext().post(new ClusterStatusChangedEvent(event.getState()));
+ }
+
+ /**
+ * Renew cluster state.
+ *
+ * @param event cluster state event
+ */
+ @Subscribe
+ public synchronized void renew(final ClusterStateEvent event) {
+ contextManager.updateClusterState(event.getStatus());
+ }
+
+ /**
+ * Renew instance status.
+ *
+ * @param event state event
+ */
+ @Subscribe
+ public synchronized void renew(final StateEvent event) {
+ contextManager.getInstanceContext().updateInstanceStatus(event.getInstanceId(), event.getStatus());
+ }
+
+ /**
+ * Renew instance worker id.
+ *
+ * @param event worker id event
+ */
+ @Subscribe
+ public synchronized void renew(final WorkerIdEvent event) {
+ contextManager.getInstanceContext().updateWorkerId(event.getInstanceId(), event.getWorkerId());
+ }
+
+ /**
+ * Renew instance labels.
+ *
+ * @param event label event
+ */
+ @Subscribe
+ public synchronized void renew(final LabelsEvent event) {
+ // TODO labels may be empty
+ contextManager.getInstanceContext().updateLabel(event.getInstanceId(), event.getLabels());
+ }
+
+ /**
+ * Renew instance list.
+ *
+ * @param event compute node online event
+ */
+ @Subscribe
+ public synchronized void renew(final InstanceOnlineEvent event) {
+ contextManager.getInstanceContext().addComputeNodeInstance(registryCenter.getComputeNodeStatusService().loadComputeNodeInstance(event.getInstanceMetaData()));
+ }
+
+ /**
+ * Renew instance list.
+ *
+ * @param event compute node offline event
+ */
+ @Subscribe
+ public synchronized void renew(final InstanceOfflineEvent event) {
+ contextManager.getInstanceContext().deleteComputeNodeInstance(new ComputeNodeInstance(event.getInstanceMetaData()));
+ }
+}