You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/06/17 06:10:01 UTC

[shardingsphere] branch master updated: Add new subscriber for new metadata structure (#26398)

This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 10b9bb9e530 Add new subscriber for new metadata structure (#26398)
10b9bb9e530 is described below

commit 10b9bb9e5307d58232143e53d0bb3249c1d9c744
Author: ChenJiaHao <Pa...@163.com>
AuthorDate: Sat Jun 17 14:09:54 2023 +0800

    Add new subscriber for new metadata structure (#26398)
---
 .../cluster/NewClusterContextManagerBuilder.java   |  14 +-
 .../cluster/coordinator/NewRegistryCenter.java     | 129 ++++++++++++++++++
 .../registry/NewGovernanceWatcherFactory.java      |  60 +++++++++
 .../NewProcessListChangedSubscriber.java           | 115 ++++++++++++++++
 .../subscriber/NewComputeNodeStatusSubscriber.java |  69 ++++++++++
 .../generator/NewClusterWorkerIdGenerator.java     |  95 ++++++++++++++
 .../NewContextManagerSubscriberFacade.java         |  10 +-
 .../subscriber/NewStateChangedSubscriber.java      | 146 +++++++++++++++++++++
 8 files changed, 626 insertions(+), 12 deletions(-)

diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/NewClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/NewClusterContextManagerBuilder.java
index 971e8d31763..27a9992051e 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/NewClusterContextManagerBuilder.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/NewClusterContextManagerBuilder.java
@@ -31,8 +31,8 @@ import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.manager.ContextManagerAware;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator.ClusterWorkerIdGenerator;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.NewRegistryCenter;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator.NewClusterWorkerIdGenerator;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber.NewContextManagerSubscriberFacade;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.mode.metadata.NewMetaDataContextsFactory;
@@ -50,7 +50,7 @@ public final class NewClusterContextManagerBuilder implements ContextManagerBuil
     @Override
     public ContextManager build(final ContextManagerBuilderParameter param) throws SQLException {
         ClusterPersistRepository repository = getClusterPersistRepository((ClusterPersistRepositoryConfiguration) param.getModeConfiguration().getRepository());
-        RegistryCenter registryCenter = new RegistryCenter(repository, new EventBusContext(), param.getInstanceMetaData(), param.getDatabaseConfigs());
+        NewRegistryCenter registryCenter = new NewRegistryCenter(repository, new EventBusContext(), param.getInstanceMetaData(), param.getDatabaseConfigs());
         InstanceContext instanceContext = buildInstanceContext(registryCenter, param);
         if (registryCenter.getRepository() instanceof InstanceContextAware) {
             ((InstanceContextAware) registryCenter.getRepository()).setInstanceContext(instanceContext);
@@ -70,8 +70,8 @@ public final class NewClusterContextManagerBuilder implements ContextManagerBuil
         return result;
     }
     
-    private InstanceContext buildInstanceContext(final RegistryCenter registryCenter, final ContextManagerBuilderParameter param) {
-        return new InstanceContext(new ComputeNodeInstance(param.getInstanceMetaData()), new ClusterWorkerIdGenerator(registryCenter, param.getInstanceMetaData()),
+    private InstanceContext buildInstanceContext(final NewRegistryCenter registryCenter, final ContextManagerBuilderParameter param) {
+        return new InstanceContext(new ComputeNodeInstance(param.getInstanceMetaData()), new NewClusterWorkerIdGenerator(registryCenter, param.getInstanceMetaData()),
                 param.getModeConfiguration(), new NewClusterModeContextManager(), new GlobalLockContext(registryCenter.getGlobalLockPersistService()), registryCenter.getEventBusContext());
     }
     
@@ -79,7 +79,7 @@ public final class NewClusterContextManagerBuilder implements ContextManagerBuil
         ((ContextManagerAware) contextManager.getInstanceContext().getModeContextManager()).setContextManagerAware(contextManager);
     }
     
-    private void registerOnline(final RegistryCenter registryCenter, final ContextManagerBuilderParameter param, final ContextManager contextManager) {
+    private void registerOnline(final NewRegistryCenter registryCenter, final ContextManagerBuilderParameter param, final ContextManager contextManager) {
         registryCenter.onlineInstance(contextManager.getInstanceContext().getInstance());
         loadClusterStatus(registryCenter, contextManager);
         contextManager.getInstanceContext().getInstance().setLabels(param.getLabels());
@@ -88,7 +88,7 @@ public final class NewClusterContextManagerBuilder implements ContextManagerBuil
         new NewContextManagerSubscriberFacade(registryCenter, contextManager);
     }
     
-    private void loadClusterStatus(final RegistryCenter registryCenter, final ContextManager contextManager) {
+    private void loadClusterStatus(final NewRegistryCenter registryCenter, final ContextManager contextManager) {
         registryCenter.persistClusterState(contextManager);
         contextManager.updateClusterState(registryCenter.getClusterStatusService().loadClusterStatus());
     }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/NewRegistryCenter.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/NewRegistryCenter.java
new file mode 100644
index 00000000000..b3ee76c8cee
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/NewRegistryCenter.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator;
+
+import com.google.common.base.Strings;
+import lombok.Getter;
+import org.apache.shardingsphere.infra.config.database.DatabaseConfiguration;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
+import org.apache.shardingsphere.infra.instance.metadata.jdbc.JDBCInstanceMetaData;
+import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.GlobalLockPersistService;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.NewGovernanceWatcherFactory;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.subscriber.ShardingSphereSchemaDataRegistrySubscriber;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber.ClusterProcessSubscriber;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.service.ClusterStatusService;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.subscriber.ClusterStatusSubscriber;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service.ComputeNodeStatusService;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.subscriber.NewComputeNodeStatusSubscriber;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.service.StorageNodeStatusService;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber.StorageNodeStatusSubscriber;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
+import org.apache.shardingsphere.mode.repository.cluster.lock.impl.props.DefaultLockTypedProperties;
+
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * TODO replace the old RegistryCenter after meta data refactor completed
+ * New registry center, which wires the cluster repository to its status services, global lock service and event subscribers.
+ */
+public final class NewRegistryCenter {
+    
+    @Getter
+    private final ClusterPersistRepository repository;
+    
+    @Getter
+    private final StorageNodeStatusService storageNodeStatusService;
+    
+    @Getter
+    private final ClusterStatusService clusterStatusService;
+    
+    @Getter
+    private final ComputeNodeStatusService computeNodeStatusService;
+    
+    @Getter
+    private final GlobalLockPersistService globalLockPersistService;
+    
+    @Getter
+    private final EventBusContext eventBusContext;
+    
+    private final InstanceMetaData instanceMetaData;
+    
+    private final Map<String, DatabaseConfiguration> databaseConfigs;
+    
+    private final NewGovernanceWatcherFactory listenerFactory;
+    
+    public NewRegistryCenter(final ClusterPersistRepository repository, final EventBusContext eventBusContext,
+                             final InstanceMetaData instanceMetaData, final Map<String, DatabaseConfiguration> databaseConfigs) {
+        this.repository = repository;
+        this.eventBusContext = eventBusContext;
+        this.instanceMetaData = instanceMetaData;
+        this.databaseConfigs = databaseConfigs;
+        storageNodeStatusService = new StorageNodeStatusService(repository);
+        clusterStatusService = new ClusterStatusService(repository);
+        computeNodeStatusService = new ComputeNodeStatusService(repository);
+        globalLockPersistService = new GlobalLockPersistService(initDistributedLockHolder(repository));
+        listenerFactory = new NewGovernanceWatcherFactory(repository, eventBusContext, getJDBCDatabaseName());
+        createSubscribers(repository);
+    }
+    
+    private DistributedLockHolder initDistributedLockHolder(final ClusterPersistRepository repository) {
+        DistributedLockHolder distributedLockHolder = repository.getDistributedLockHolder();
+        return null == distributedLockHolder ? new DistributedLockHolder("default", repository, new DefaultLockTypedProperties(new Properties())) : distributedLockHolder;
+    }
+    
+    private String getJDBCDatabaseName() {
+        return instanceMetaData instanceof JDBCInstanceMetaData ? databaseConfigs.keySet().stream().findFirst().orElse(null) : null;
+    }
+    
+    private void createSubscribers(final ClusterPersistRepository repository) {
+        new NewComputeNodeStatusSubscriber(this, repository);
+        new ClusterStatusSubscriber(repository, eventBusContext);
+        new StorageNodeStatusSubscriber(repository, eventBusContext);
+        new ClusterProcessSubscriber(repository, eventBusContext);
+        new ShardingSphereSchemaDataRegistrySubscriber(repository, globalLockPersistService, eventBusContext);
+    }
+    
+    /**
+     * Register the compute node instance online, persist its labels and state, then start the governance watchers.
+     * 
+     * @param computeNodeInstance compute node instance
+     */
+    public void onlineInstance(final ComputeNodeInstance computeNodeInstance) {
+        computeNodeStatusService.registerOnline(computeNodeInstance.getMetaData());
+        computeNodeStatusService.persistInstanceLabels(computeNodeInstance.getCurrentInstanceId(), computeNodeInstance.getLabels());
+        computeNodeStatusService.persistInstanceState(computeNodeInstance.getCurrentInstanceId(), computeNodeInstance.getState());
+        listenerFactory.watchListeners();
+    }
+    
+    /**
+     * Persist the cluster state of the context manager only when the repository holds no value at the cluster status node path.
+     *
+     * @param contextManager context manager
+     */
+    public void persistClusterState(final ContextManager contextManager) {
+        if (Strings.isNullOrEmpty(repository.getDirectly(ComputeNode.getClusterStatusNodePath()))) {
+            clusterStatusService.persistClusterState(contextManager.getClusterStateContext());
+        }
+    }
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/NewGovernanceWatcherFactory.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/NewGovernanceWatcherFactory.java
new file mode 100644
index 00000000000..8a4b535ecda
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/NewGovernanceWatcherFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+
+/**
+ * TODO replace the old GovernanceWatcherFactory after meta data refactor completed
+ * New governance watcher factory, which registers the watchers loaded by ShardingSphereServiceLoader on the cluster repository.
+ */
+@RequiredArgsConstructor
+public final class NewGovernanceWatcherFactory {
+    
+    private final ClusterPersistRepository repository;
+    
+    private final EventBusContext eventBusContext;
+    
+    private final String databaseName;
+    
+    /**
+     * Watch every key declared by each loaded governance watcher, posting to the event bus the events created for its watching types.
+     */
+    public void watchListeners() {
+        for (NewGovernanceWatcher<?> each : ShardingSphereServiceLoader.getServiceInstances(NewGovernanceWatcher.class)) {
+            watch(each);
+        }
+    }
+    
+    private void watch(final NewGovernanceWatcher<?> listener) {
+        for (String each : listener.getWatchingKeys(databaseName)) {
+            watch(each, listener);
+        }
+    }
+    
+    private void watch(final String watchingKey, final NewGovernanceWatcher<?> listener) {
+        repository.watch(watchingKey, dataChangedEventListener -> {
+            if (listener.getWatchingTypes().contains(dataChangedEventListener.getType())) {
+                listener.createGovernanceEvent(dataChangedEventListener).ifPresent(eventBusContext::post);
+            }
+        });
+    }
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/NewProcessListChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/NewProcessListChangedSubscriber.java
new file mode 100644
index 00000000000..f6aba3dd3d0
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/NewProcessListChangedSubscriber.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.infra.executor.sql.process.Process;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
+import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
+import org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
+import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.NewRegistryCenter;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessCompletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesCompletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesEvent;
+
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collection;
+
+/**
+ * TODO replace the old ProcessListChangedSubscriber after meta data refactor completed
+ * New process list changed subscriber, which reports or kills local processes on request and notifies the process operation lock registry.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public final class NewProcessListChangedSubscriber {
+    
+    private final NewRegistryCenter registryCenter;
+    
+    private final ContextManager contextManager;
+    
+    private final YamlProcessListSwapper swapper = new YamlProcessListSwapper();
+    
+    public NewProcessListChangedSubscriber(final NewRegistryCenter registryCenter, final ContextManager contextManager) {
+        this.registryCenter = registryCenter;
+        this.contextManager = contextManager;
+        contextManager.getInstanceContext().getEventBusContext().register(this);
+    }
+    
+    /**
+     * Persist the non-empty local process list of this instance, then delete the trigger node; events for other instances are ignored.
+     *
+     * @param event show process list trigger event
+     */
+    @Subscribe
+    public void reportLocalProcesses(final ReportLocalProcessesEvent event) {
+        if (!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId())) {
+            return;
+        }
+        Collection<Process> processes = ProcessRegistry.getInstance().listAll();
+        if (!processes.isEmpty()) {
+            registryCenter.getRepository().persist(
+                    ProcessNode.getProcessListInstancePath(event.getTaskId(), event.getInstanceId()), YamlEngine.marshal(swapper.swapToYamlConfiguration(processes)));
+        }
+        registryCenter.getRepository().delete(ComputeNode.getProcessTriggerInstanceNodePath(event.getInstanceId(), event.getTaskId()));
+    }
+    
+    /**
+     * Notify the process operation lock registry with the task id carried by the report completed event.
+     *
+     * @param event report local processes completed event
+     */
+    @Subscribe
+    public synchronized void completeToReportLocalProcesses(final ReportLocalProcessesCompletedEvent event) {
+        ProcessOperationLockRegistry.getInstance().notify(event.getTaskId());
+    }
+    
+    /**
+     * Cancel the statements of the local process if it is still registered, then delete the kill node; events for other instances are ignored.
+     *
+     * @param event kill local process event
+     * @throws SQLException SQL exception
+     */
+    @Subscribe
+    public synchronized void killLocalProcess(final KillLocalProcessEvent event) throws SQLException {
+        if (!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId())) {
+            return;
+        }
+        Process process = ProcessRegistry.getInstance().get(event.getProcessId());
+        if (null != process) {
+            for (Statement each : process.getProcessStatements()) {
+                each.cancel();
+            }
+        }
+        registryCenter.getRepository().delete(ComputeNode.getProcessKillInstanceIdNodePath(event.getInstanceId(), event.getProcessId()));
+    }
+    
+    /**
+     * Notify the process operation lock registry with the process id carried by the kill completed event.
+     *
+     * @param event kill local process completed event
+     */
+    @Subscribe
+    public synchronized void completeToKillLocalProcess(final KillLocalProcessCompletedEvent event) {
+        ProcessOperationLockRegistry.getInstance().notify(event.getProcessId());
+    }
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/NewComputeNodeStatusSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/NewComputeNodeStatusSubscriber.java
new file mode 100644
index 00000000000..53dee90fc41
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/NewComputeNodeStatusSubscriber.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
+import org.apache.shardingsphere.mode.event.compute.ComputeNodeStatusChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.NewRegistryCenter;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsChangedEvent;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+
+import java.util.Collections;
+
+/**
+ * TODO replace the old ComputeNodeStatusSubscriber after meta data refactor completed
+ * New compute node status subscriber, which persists compute node state and label changes received from the event bus.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public final class NewComputeNodeStatusSubscriber {
+    
+    private final NewRegistryCenter registryCenter;
+    
+    private final ClusterPersistRepository repository;
+    
+    public NewComputeNodeStatusSubscriber(final NewRegistryCenter registryCenter, final ClusterPersistRepository repository) {
+        this.registryCenter = registryCenter;
+        this.repository = repository;
+        registryCenter.getEventBusContext().register(this);
+    }
+    
+    /**
+     * Persist the changed state name of the compute node as an ephemeral value at its instance status node path.
+     *
+     * @param event compute node status changed event
+     */
+    @Subscribe
+    public void update(final ComputeNodeStatusChangedEvent event) {
+        repository.persistEphemeral(ComputeNode.getInstanceStatusNodePath(event.getInstanceId()), event.getState().name());
+    }
+    
+    /**
+     * Persist the changed labels of the compute node, or an empty list when the event carries no labels.
+     * 
+     * @param event labels changed event
+     */
+    @Subscribe
+    public void update(final LabelsChangedEvent event) {
+        if (event.getLabels().isEmpty()) {
+            registryCenter.getComputeNodeStatusService().persistInstanceLabels(event.getInstanceId(), Collections.emptyList());
+        } else {
+            registryCenter.getComputeNodeStatusService().persistInstanceLabels(event.getInstanceId(), event.getLabels());
+        }
+    }
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/NewClusterWorkerIdGenerator.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/NewClusterWorkerIdGenerator.java
new file mode 100644
index 00000000000..0dcf218d21a
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/NewClusterWorkerIdGenerator.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator;
+
+import com.google.common.base.Preconditions;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
+import org.apache.shardingsphere.infra.instance.workerid.WorkerIdGenerator;
+import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.NewRegistryCenter;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.exception.WorkIdAssignedException;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.node.WorkerIdNode;
+import org.apache.shardingsphere.mode.repository.cluster.exception.ClusterPersistRepositoryException;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.Properties;
+
+/**
+ * TODO replace the old implementation after meta data refactor completed
+ * New worker id generator for cluster mode, which reuses the persisted worker id of the instance or claims the smallest unassigned one below 1024.
+ */
+@RequiredArgsConstructor
+@Slf4j
+public final class NewClusterWorkerIdGenerator implements WorkerIdGenerator {
+    
+    private final NewRegistryCenter registryCenter;
+    
+    private final InstanceMetaData instanceMetaData;
+    
+    private volatile boolean isWarned;
+    
+    @Override
+    public int generate(final Properties props) {
+        int result = registryCenter.getComputeNodeStatusService().loadInstanceWorkerId(instanceMetaData.getId()).orElseGet(this::reGenerate);
+        checkIneffectiveConfiguration(result, props);
+        return result;
+    }
+    
+    private Integer reGenerate() {
+        Optional<Integer> result;
+        do {
+            result = generateAvailableWorkerId();
+        } while (!result.isPresent());
+        Integer generatedWorkId = result.get();
+        registryCenter.getComputeNodeStatusService().persistInstanceWorkerId(instanceMetaData.getId(), generatedWorkId);
+        return generatedWorkId;
+    }
+    
+    private Optional<Integer> generateAvailableWorkerId() {
+        Collection<Integer> assignedWorkerIds = registryCenter.getComputeNodeStatusService().getAssignedWorkerIds();
+        ShardingSpherePreconditions.checkState(assignedWorkerIds.size() <= 1024, WorkIdAssignedException::new);
+        Collection<Integer> availableWorkerIds = new LinkedList<>();
+        for (int i = 0; i < 1024; i++) {
+            availableWorkerIds.add(i);
+        }
+        PriorityQueue<Integer> priorityQueue = new PriorityQueue<>(availableWorkerIds);
+        for (Integer each : assignedWorkerIds) {
+            priorityQueue.remove(each);
+        }
+        Integer preselectedWorkerId = priorityQueue.poll();
+        Preconditions.checkState(null != preselectedWorkerId, "Preselected worker-id can not be null.");
+        try {
+            registryCenter.getRepository().persistExclusiveEphemeral(WorkerIdNode.getWorkerIdGeneratorPath(preselectedWorkerId.toString()), instanceMetaData.getId());
+            return Optional.of(preselectedWorkerId);
+        } catch (final ClusterPersistRepositoryException ignore) {
+            return Optional.empty();
+        }
+    }
+    
+    private void checkIneffectiveConfiguration(final long generatedWorkerId, final Properties props) {
+        if (!isWarned && null != props && props.containsKey(WORKER_ID_KEY)) {
+            isWarned = true;
+            log.warn("No need to configured {} in cluster mode, system assigned {} was {}", WORKER_ID_KEY, WORKER_ID_KEY, generatedWorkerId);
+        }
+    }
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/NewContextManagerSubscriberFacade.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/NewContextManagerSubscriberFacade.java
index 5159fd6c874..1672cf8390c 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/NewContextManagerSubscriberFacade.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/NewContextManagerSubscriberFacade.java
@@ -18,8 +18,8 @@
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
 
 import org.apache.shardingsphere.mode.manager.ContextManager;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber.ProcessListChangedSubscriber;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.NewRegistryCenter;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber.NewProcessListChangedSubscriber;
 
 /**
  * TODO replace the old implementation after meta data refactor completed
@@ -27,13 +27,13 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.proce
  */
 public final class NewContextManagerSubscriberFacade {
     
-    public NewContextManagerSubscriberFacade(final RegistryCenter registryCenter, final ContextManager contextManager) {
+    public NewContextManagerSubscriberFacade(final NewRegistryCenter registryCenter, final ContextManager contextManager) { // instantiates each subscriber below; no reference to any of them is kept
         new NewConfigurationChangedSubscriber(contextManager);
         new NewDataSourceChangedSubscriber(contextManager);
         new ResourceMetaDataChangedSubscriber(contextManager);
         new DatabaseChangedSubscriber(contextManager);
-        new StateChangedSubscriber(registryCenter, contextManager);
-        new ProcessListChangedSubscriber(registryCenter, contextManager);
+        new NewStateChangedSubscriber(registryCenter, contextManager); // its constructor registers the new instance on the instance context's event bus
+        new NewProcessListChangedSubscriber(registryCenter, contextManager); // NOTE(review): presumably registers itself on the event bus as well - constructor not shown here, confirm
         new CacheEvictedSubscriber(contextManager.getInstanceContext().getEventBusContext());
     }
 }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/NewStateChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/NewStateChangedSubscriber.java
new file mode 100644
index 00000000000..a64afbfd991
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/NewStateChangedSubscriber.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.infra.datasource.state.DataSourceState;
+import org.apache.shardingsphere.infra.datasource.state.DataSourceStateManager;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
+import org.apache.shardingsphere.infra.rule.identifier.type.StaticDataSourceContainedRule;
+import org.apache.shardingsphere.mode.event.storage.StorageNodeDataSourceChangedEvent;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.NewRegistryCenter;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterLockDeletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStateEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStatusChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOfflineEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOnlineEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
+
+import java.util.Optional;
+
+/**
+ * TODO replace the old StateChangedSubscriber after meta data refactor completed
+ * New state changed subscriber.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public final class NewStateChangedSubscriber {
+    
+    private final NewRegistryCenter registryCenter;
+    
+    private final ContextManager contextManager;
+    
+    public NewStateChangedSubscriber(final NewRegistryCenter registryCenter, final ContextManager contextManager) {
+        this.registryCenter = registryCenter;
+        this.contextManager = contextManager;
+        contextManager.getInstanceContext().getEventBusContext().register(this);
+    }
+    
+    /**
+     * Renew disabled data source names.
+     * 
+     * @param event Storage node changed event
+     */
+    @Subscribe
+    public synchronized void renew(final StorageNodeChangedEvent event) {
+        if (!contextManager.getMetaDataContexts().getMetaData().containsDatabase(event.getQualifiedDatabase().getDatabaseName())) {
+            return;
+        }
+        QualifiedDatabase qualifiedDatabase = event.getQualifiedDatabase();
+        Optional<StaticDataSourceContainedRule> staticDataSourceRule = contextManager.getMetaDataContexts()
+                .getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData().findSingleRule(StaticDataSourceContainedRule.class);
+        staticDataSourceRule.ifPresent(optional -> optional.updateStatus(new StorageNodeDataSourceChangedEvent(qualifiedDatabase, event.getDataSource())));
+        DataSourceStateManager.getInstance().updateState(
+                qualifiedDatabase.getDatabaseName(), qualifiedDatabase.getDataSourceName(), DataSourceState.valueOf(event.getDataSource().getStatus().name()));
+    }
+    
+    /**
+     * Reset cluster state.
+     * 
+     * @param event cluster lock deleted event
+     */
+    @Subscribe
+    public synchronized void renew(final ClusterLockDeletedEvent event) {
+        contextManager.getInstanceContext().getEventBusContext().post(new ClusterStatusChangedEvent(event.getState()));
+    }
+    
+    /**
+     * Renew cluster state.
+     * 
+     * @param event cluster state event
+     */
+    @Subscribe
+    public synchronized void renew(final ClusterStateEvent event) {
+        contextManager.updateClusterState(event.getStatus());
+    }
+    
+    /**
+     * Renew instance status.
+     * 
+     * @param event state event
+     */
+    @Subscribe
+    public synchronized void renew(final StateEvent event) {
+        contextManager.getInstanceContext().updateInstanceStatus(event.getInstanceId(), event.getStatus());
+    }
+    
+    /**
+     * Renew instance worker id.
+     * 
+     * @param event worker id event
+     */
+    @Subscribe
+    public synchronized void renew(final WorkerIdEvent event) {
+        contextManager.getInstanceContext().updateWorkerId(event.getInstanceId(), event.getWorkerId());
+    }
+    
+    /**
+     * Renew instance labels.
+     * 
+     * @param event label event
+     */
+    @Subscribe
+    public synchronized void renew(final LabelsEvent event) {
+        // TODO labels may be empty
+        contextManager.getInstanceContext().updateLabel(event.getInstanceId(), event.getLabels());
+    }
+    
+    /**
+     * Renew instance list.
+     *
+     * @param event compute node online event
+     */
+    @Subscribe
+    public synchronized void renew(final InstanceOnlineEvent event) {
+        contextManager.getInstanceContext().addComputeNodeInstance(registryCenter.getComputeNodeStatusService().loadComputeNodeInstance(event.getInstanceMetaData()));
+    }
+    
+    /**
+     * Renew instance list.
+     *
+     * @param event compute node offline event
+     */
+    @Subscribe
+    public synchronized void renew(final InstanceOfflineEvent event) {
+        contextManager.getInstanceContext().deleteComputeNodeInstance(new ComputeNodeInstance(event.getInstanceMetaData()));
+    }
+}