You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/10/10 03:41:09 UTC
[shardingsphere] branch master updated: collect sharding sphere data. (#21424)
This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 4eeaca05d9f collect sharding sphere data. (#21424)
4eeaca05d9f is described below
commit 4eeaca05d9f42a119e8e75ed82a408c379ac523b
Author: Chuxin Chen <ch...@qq.com>
AuthorDate: Mon Oct 10 11:41:01 2022 +0800
collect sharding sphere data. (#21424)
* collect sharding sphere data.
* collect sharding sphere data.
* collect sharding sphere data.
---
.../data/ShardingStatisticsTableCollector.java | 124 +++++++++++++++++++++
....spi.data.collector.ShardingSphereDataCollector | 2 +-
.../collector/ShardingSphereDataCollector.java | 48 ++++++++
.../ShardingSphereDataCollectorFactory.java | 46 ++++++++
.../core/execute/ShardingSphereDataJobWorker.java | 52 +++++++++
.../ShardingSphereDataScheduleCollector.java | 112 +++++++++++++++++++
...gSphereDataContextManagerLifecycleListener.java | 40 +++++++
...anager.listener.ContextManagerLifecycleListener | 1 +
8 files changed, 424 insertions(+), 1 deletion(-)
diff --git a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java
new file mode 100644
index 00000000000..b5a000f6b95
--- /dev/null
+++ b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.sharding.metadata.data;
+
+import org.apache.shardingsphere.data.pipeline.spi.data.collector.ShardingSphereDataCollector;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
+import org.apache.shardingsphere.infra.datanode.DataNode;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereData;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereRowData;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
+import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
+import org.apache.shardingsphere.sharding.rule.ShardingRule;
+import org.apache.shardingsphere.sharding.rule.TableRule;
+
+import javax.sql.DataSource;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Sharding statistics table data collector.
+ *
+ * <p>Builds one row per actual data node of every sharding table and stores the rows as the data of
+ * {@code sharding_statistics_table} in the {@code shardingsphere} schema.</p>
+ */
+public class ShardingStatisticsTableCollector implements ShardingSphereDataCollector {
+    
+    private static final String SHARDING_STATISTICS_TABLE = "sharding_statistics_table";
+    
+    private static final String SHARDING_SPHERE = "shardingsphere";
+    
+    private static final String MYSQL_TABLE_ROWS_AND_DATA_LENGTH = "SELECT TABLE_ROWS, DATA_LENGTH FROM information_schema.TABLES WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s'";
+    
+    /**
+     * Collect the sharding statistics of one database.
+     *
+     * <p>Returns without collecting when the database has no sharding rule or when the database type is none of
+     * MySQL, PostgreSQL and openGauss. The collected rows are dropped when the target schema data does not exist.</p>
+     *
+     * @param shardingSphereData ShardingSphere data that receives the collected table data
+     * @param databaseName name of the database being collected
+     * @param ruleMetaData rule meta data searched for the sharding rule
+     * @param dataSources data sources keyed by data source name
+     * @param databaseType database type that selects the dialect specific handling
+     * @throws SQLException when querying a data source fails
+     */
+    @Override
+    public void collect(final ShardingSphereData shardingSphereData, final String databaseName, final ShardingSphereRuleMetaData ruleMetaData,
+                        final Map<String, DataSource> dataSources, final DatabaseType databaseType) throws SQLException {
+        Optional<ShardingRule> shardingRule = ruleMetaData.findSingleRule(ShardingRule.class);
+        // TODO refactor by dialect database
+        boolean isMySQL = databaseType instanceof MySQLDatabaseType;
+        boolean isPostgreSQLCompatible = databaseType instanceof PostgreSQLDatabaseType || databaseType instanceof OpenGaussDatabaseType;
+        if (!shardingRule.isPresent() || !(isMySQL || isPostgreSQLCompatible)) {
+            return;
+        }
+        // MySQL keeps the table under the "shardingsphere" database, PostgreSQL and openGauss under the database being collected.
+        String targetDatabaseName = isMySQL ? SHARDING_SPHERE : databaseName;
+        ShardingSphereTableData tableData = collectForShardingStatisticTable(databaseName, dataSources, databaseType, shardingRule.get());
+        Optional.ofNullable(shardingSphereData.getDatabaseData().get(targetDatabaseName)).map(database -> database.getSchemaData().get(SHARDING_SPHERE))
+                .ifPresent(schemaData -> schemaData.getTableData().put(SHARDING_STATISTICS_TABLE, tableData));
+    }
+    
+    /**
+     * Build the table data with one row per actual data node.
+     *
+     * <p>Each row holds, in order: a running number starting at 1, the database name, the logic table name,
+     * the data source name, the actual table name, followed by the row count and the data length.</p>
+     */
+    private ShardingSphereTableData collectForShardingStatisticTable(final String databaseName, final Map<String, DataSource> dataSources,
+                                                                     final DatabaseType databaseType, final ShardingRule shardingRule) throws SQLException {
+        ShardingSphereTableData result = new ShardingSphereTableData(SHARDING_STATISTICS_TABLE);
+        int count = 1;
+        for (TableRule each : shardingRule.getTableRules().values()) {
+            for (DataNode dataNode : each.getActualDataNodes()) {
+                List<Object> row = new LinkedList<>();
+                row.add(count++);
+                row.add(databaseName);
+                row.add(each.getLogicTable());
+                row.add(dataNode.getDataSourceName());
+                row.add(dataNode.getTableName());
+                addTableRowsAndDataLength(dataSources, dataNode, row, databaseType);
+                result.getRows().add(new ShardingSphereRowData(row));
+            }
+        }
+        return result;
+    }
+    
+    /**
+     * Append the row count and the data length of the data node to the row.
+     *
+     * <p>MySQL is queried, PostgreSQL and openGauss get zero placeholders, any other database type appends nothing.</p>
+     */
+    private void addTableRowsAndDataLength(final Map<String, DataSource> dataSources, final DataNode dataNode, final List<Object> row, final DatabaseType databaseType) throws SQLException {
+        if (databaseType instanceof MySQLDatabaseType) {
+            addForMySQL(dataSources, dataNode, row);
+        } else if (databaseType instanceof PostgreSQLDatabaseType || databaseType instanceof OpenGaussDatabaseType) {
+            // TODO get postgres rows and data length
+            row.add(BigDecimal.ZERO);
+            row.add(BigDecimal.ZERO);
+        }
+    }
+    
+    /**
+     * Append TABLE_ROWS and DATA_LENGTH read from information_schema.TABLES for the actual table of the data node.
+     *
+     * <p>Zero is appended for a value that cannot be determined.</p>
+     */
+    private void addForMySQL(final Map<String, DataSource> dataSources, final DataNode dataNode, final List<Object> row) throws SQLException {
+        BigDecimal tableRows = BigDecimal.ZERO;
+        BigDecimal dataLength = BigDecimal.ZERO;
+        DataSource dataSource = dataSources.get(dataNode.getDataSourceName());
+        // Keep the zero defaults when the map has no data source of that name instead of failing with a NullPointerException.
+        if (null != dataSource) {
+            try (
+                    Connection connection = dataSource.getConnection();
+                    Statement statement = connection.createStatement()) {
+                // MySQL Connector/J reports the current database as the catalog by default; getSchema() only returns it with databaseTerm=SCHEMA.
+                String actualDatabase = connection.getCatalog();
+                if (null == actualDatabase) {
+                    actualDatabase = connection.getSchema();
+                }
+                try (ResultSet resultSet = statement.executeQuery(String.format(MYSQL_TABLE_ROWS_AND_DATA_LENGTH, actualDatabase, dataNode.getTableName()))) {
+                    if (resultSet.next()) {
+                        // getBigDecimal returns null for SQL NULL, which must not end up in the row.
+                        tableRows = Optional.ofNullable(resultSet.getBigDecimal("TABLE_ROWS")).orElse(BigDecimal.ZERO);
+                        dataLength = Optional.ofNullable(resultSet.getBigDecimal("DATA_LENGTH")).orElse(BigDecimal.ZERO);
+                    }
+                }
+            }
+        }
+        row.add(tableRows);
+        row.add(dataLength);
+    }
+    
+    @Override
+    public String getType() {
+        return SHARDING_STATISTICS_TABLE;
+    }
+}
diff --git a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener b/features/sharding/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.data.collector.ShardingSphereDataCollector
similarity index 89%
copy from kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener
copy to features/sharding/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.data.collector.ShardingSphereDataCollector
index 4f351a1a4a0..cf400ced718 100644
--- a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener
+++ b/features/sharding/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.data.collector.ShardingSphereDataCollector
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.data.pipeline.core.listener.PipelineContextManagerLifecycleListener
+org.apache.shardingsphere.sharding.metadata.data.ShardingStatisticsTableCollector
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/data/collector/ShardingSphereDataCollector.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/data/collector/ShardingSphereDataCollector.java
new file mode 100644
index 00000000000..722b61ebb23
--- /dev/null
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/data/collector/ShardingSphereDataCollector.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.spi.data.collector;
+
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereData;
+import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
+import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
+
+import javax.sql.DataSource;
+import java.sql.SQLException;
+import java.util.Map;
+
+/**
+ * ShardingSphere data collector.
+ *
+ * <p>Implementations are looked up by table name through {@code ShardingSphereDataCollectorFactory}.</p>
+ */
+@SingletonSPI
+public interface ShardingSphereDataCollector extends TypedSPI {
+ 
+ /**
+ * Collect data of one database into the ShardingSphere data.
+ *
+ * @param shardingSphereData ShardingSphere data that receives the collected data
+ * @param databaseName name of the database being collected
+ * @param ruleMetaData rule meta data of the database
+ * @param dataSources data sources of the database, keyed by data source name
+ * @param databaseType database type
+ * @throws SQLException sql exception
+ */
+ void collect(ShardingSphereData shardingSphereData, String databaseName, ShardingSphereRuleMetaData ruleMetaData,
+ Map<String, DataSource> dataSources, DatabaseType databaseType) throws SQLException;
+}
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/data/collector/ShardingSphereDataCollectorFactory.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/data/collector/ShardingSphereDataCollectorFactory.java
new file mode 100644
index 00000000000..b3554a6d57a
--- /dev/null
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/data/collector/ShardingSphereDataCollectorFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.spi.data.collector;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
+
+import java.util.Optional;
+
+/**
+ * ShardingSphere data collector factory.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class ShardingSphereDataCollectorFactory {
+ 
+ // Register the SPI once, when this class is loaded, before any lookup through findInstance.
+ static {
+ ShardingSphereServiceLoader.register(ShardingSphereDataCollector.class);
+ }
+ 
+ /**
+ * Find instance of ShardingSphere data collector.
+ *
+ * @param tableName table name, passed to the registry as the type of the collector
+ * @return found instance
+ */
+ public static Optional<ShardingSphereDataCollector> findInstance(final String tableName) {
+ return TypedSPIRegistry.findRegisteredService(ShardingSphereDataCollector.class, tableName);
+ }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataJobWorker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataJobWorker.java
new file mode 100644
index 00000000000..77543f31d6c
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataJobWorker.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.execute;
+
+import org.apache.shardingsphere.mode.manager.ContextManager;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * ShardingSphere data job worker, which starts the scheduled data collector unless it has already been started.
+ */
+public final class ShardingSphereDataJobWorker {
+    
+    private static final AtomicBoolean WORKER_INITIALIZED = new AtomicBoolean(false);
+    
+    /**
+     * Initialize job worker.
+     *
+     * @param contextManager context manager
+     */
+    public static void initialize(final ContextManager contextManager) {
+        // Fast path without locking once the collector has been started.
+        if (!WORKER_INITIALIZED.get()) {
+            startOnce(contextManager);
+        }
+    }
+    
+    private static void startOnce(final ContextManager contextManager) {
+        synchronized (WORKER_INITIALIZED) {
+            // Check again under the lock, another thread may have started the collector in the meantime.
+            if (!WORKER_INITIALIZED.get()) {
+                new ShardingSphereDataScheduleCollector(contextManager).start();
+                // Only mark as initialized after the collector has started without an exception.
+                WORKER_INITIALIZED.set(true);
+            }
+        }
+    }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataScheduleCollector.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataScheduleCollector.java
new file mode 100644
index 00000000000..60bc4e33e87
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataScheduleCollector.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.execute;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.spi.data.collector.ShardingSphereDataCollectorFactory;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
+import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereData;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+
+import javax.sql.DataSource;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * ShardingSphere data schedule collector.
+ *
+ * <p>Runs the registered ShardingSphere data collectors on a single scheduler thread, with a fixed delay of 30 seconds between runs.</p>
+ */
+@Slf4j
+@RequiredArgsConstructor
+public final class ShardingSphereDataScheduleCollector {
+    
+    private static final String SHARDING_SPHERE = "shardingsphere";
+    
+    private final ScheduledExecutorService dataCollectorExecutor = Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("data-collect-%d"));
+    
+    private final ContextManager contextManager;
+    
+    /**
+     * Start.
+     */
+    public void start() {
+        dataCollectorExecutor.scheduleWithFixedDelay(new ShardingSphereDataCollectorRunnable(contextManager), 0, 30, TimeUnit.SECONDS);
+    }
+    
+    /**
+     * One collection round over all databases of the current meta data.
+     */
+    @RequiredArgsConstructor
+    private static final class ShardingSphereDataCollectorRunnable implements Runnable {
+        
+        private final ContextManager contextManager;
+        
+        @Override
+        public void run() {
+            try {
+                collect();
+                // CHECKSTYLE:OFF
+            } catch (final RuntimeException ex) {
+                // CHECKSTYLE:ON
+                // An exception escaping a task scheduled with scheduleWithFixedDelay suppresses all its later executions, so log it and wait for the next round.
+                log.error("Collect ShardingSphere data error!", ex);
+            }
+        }
+        
+        /**
+         * Dispatch the round by the protocol type of the first database; does nothing when there is no database.
+         */
+        private void collect() {
+            ShardingSphereData shardingSphereData = contextManager.getMetaDataContexts().getShardingSphereData();
+            ShardingSphereMetaData metaData = contextManager.getMetaDataContexts().getMetaData();
+            if (metaData.getDatabases().isEmpty()) {
+                return;
+            }
+            DatabaseType databaseType = metaData.getDatabases().values().iterator().next().getProtocolType();
+            // TODO refactor by dialect database
+            if (databaseType instanceof MySQLDatabaseType) {
+                collectForMySQL(shardingSphereData, metaData, databaseType);
+            } else if (databaseType instanceof PostgreSQLDatabaseType || databaseType instanceof OpenGaussDatabaseType) {
+                collectForPostgreSQL(shardingSphereData, metaData, databaseType);
+            }
+        }
+        
+        /**
+         * For every table of the "shardingsphere" schema in the "shardingsphere" database, collect from each database
+         * whose name is not a key of the system database schema map of the database type.
+         */
+        private void collectForMySQL(final ShardingSphereData shardingSphereData, final ShardingSphereMetaData metaData, final DatabaseType databaseType) {
+            Optional<Collection<ShardingSphereTable>> shardingSphereTables = Optional.ofNullable(metaData.getDatabase(SHARDING_SPHERE))
+                    .map(database -> database.getSchema(SHARDING_SPHERE)).map(schema -> schema.getTables().values());
+            shardingSphereTables.ifPresent(tables -> tables.forEach(table -> metaData.getDatabases().forEach((name, database) -> {
+                if (!databaseType.getSystemDatabaseSchemaMap().containsKey(name)) {
+                    collectForEachDatabase(shardingSphereData, table, database, databaseType);
+                }
+            })));
+        }
+        
+        /**
+         * For each database whose name is not a key of the system database schema map of the database type,
+         * collect every table of its own "shardingsphere" schema.
+         */
+        private void collectForPostgreSQL(final ShardingSphereData shardingSphereData, final ShardingSphereMetaData metaData, final DatabaseType databaseType) {
+            metaData.getDatabases().forEach((name, database) -> {
+                if (!databaseType.getSystemDatabaseSchemaMap().containsKey(name)) {
+                    Optional<Collection<ShardingSphereTable>> shardingSphereTables = Optional.ofNullable(database.getSchema(SHARDING_SPHERE)).map(schema -> schema.getTables().values());
+                    shardingSphereTables.ifPresent(tables -> tables.forEach(table -> collectForEachDatabase(shardingSphereData, table, database, databaseType)));
+                }
+            });
+        }
+        
+        /**
+         * Run the collector registered for the table name, if any, against one database.
+         *
+         * <p>A failure is logged and does not stop the collection of the remaining tables and databases.</p>
+         */
+        private void collectForEachDatabase(final ShardingSphereData shardingSphereData, final ShardingSphereTable table, final ShardingSphereDatabase database, final DatabaseType databaseType) {
+            String databaseName = database.getName();
+            Map<String, DataSource> dataSources = database.getResource().getDataSources();
+            ShardingSphereDataCollectorFactory.findInstance(table.getName()).ifPresent(shardingSphereDataCollector -> {
+                try {
+                    shardingSphereDataCollector.collect(shardingSphereData, databaseName, database.getRuleMetaData(), dataSources, databaseType);
+                    // CHECKSTYLE:OFF
+                } catch (final SQLException | RuntimeException ex) {
+                    // CHECKSTYLE:ON
+                    log.error("Collect data for table {} of database {} error!", table.getName(), databaseName, ex);
+                }
+            });
+        }
+    }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/ShardingSphereDataContextManagerLifecycleListener.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/ShardingSphereDataContextManagerLifecycleListener.java
new file mode 100644
index 00000000000..f9f1ff1f5ce
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/ShardingSphereDataContextManagerLifecycleListener.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.listener;
+
+import org.apache.shardingsphere.data.pipeline.core.execute.ShardingSphereDataJobWorker;
+import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener;
+
+/**
+ * ShardingSphere data context manager lifecycle listener.
+ */
+public final class ShardingSphereDataContextManagerLifecycleListener implements ContextManagerLifecycleListener {
+
+ @Override
+ public void onInitialized(final ModeConfiguration modeConfig, final ContextManager contextManager) {
+ if (null == modeConfig) {
+ return;
+ }
+ if (!contextManager.getInstanceContext().isCluster()) {
+ return;
+ }
+ ShardingSphereDataJobWorker.initialize(contextManager);
+ }
+}
diff --git a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener
index 4f351a1a4a0..b60410cbef9 100644
--- a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener
+++ b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener
@@ -16,3 +16,4 @@
#
org.apache.shardingsphere.data.pipeline.core.listener.PipelineContextManagerLifecycleListener
+org.apache.shardingsphere.data.pipeline.core.listener.ShardingSphereDataContextManagerLifecycleListener