You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/10/10 03:41:09 UTC

[shardingsphere] branch master updated: collect sharding sphere data. (#21424)

This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 4eeaca05d9f collect sharding sphere data. (#21424)
4eeaca05d9f is described below

commit 4eeaca05d9f42a119e8e75ed82a408c379ac523b
Author: Chuxin Chen <ch...@qq.com>
AuthorDate: Mon Oct 10 11:41:01 2022 +0800

    collect sharding sphere data. (#21424)
    
    * collect sharding sphere data.
    
    * collect sharding sphere data.
    
    * collect sharding sphere data.
---
 .../data/ShardingStatisticsTableCollector.java     | 124 +++++++++++++++++++++
 ....spi.data.collector.ShardingSphereDataCollector |   2 +-
 .../collector/ShardingSphereDataCollector.java     |  48 ++++++++
 .../ShardingSphereDataCollectorFactory.java        |  46 ++++++++
 .../core/execute/ShardingSphereDataJobWorker.java  |  52 +++++++++
 .../ShardingSphereDataScheduleCollector.java       | 112 +++++++++++++++++++
 ...gSphereDataContextManagerLifecycleListener.java |  40 +++++++
 ...anager.listener.ContextManagerLifecycleListener |   1 +
 8 files changed, 424 insertions(+), 1 deletion(-)

diff --git a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java
new file mode 100644
index 00000000000..b5a000f6b95
--- /dev/null
+++ b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.sharding.metadata.data;
+
+import org.apache.shardingsphere.data.pipeline.spi.data.collector.ShardingSphereDataCollector;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
+import org.apache.shardingsphere.infra.datanode.DataNode;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereData;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereRowData;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
+import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
+import org.apache.shardingsphere.sharding.rule.ShardingRule;
+import org.apache.shardingsphere.sharding.rule.TableRule;
+
+import javax.sql.DataSource;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Sharding statistics table data collector, which builds one row per actual data node of every sharding table rule.
+ */
+public class ShardingStatisticsTableCollector implements ShardingSphereDataCollector {
+    
+    private static final String SHARDING_STATISTICS_TABLE = "sharding_statistics_table";
+    
+    private static final String SHARDING_SPHERE = "shardingsphere";
+    
+    private static final String MYSQL_TABLE_ROWS_AND_DATA_LENGTH = "SELECT TABLE_ROWS, DATA_LENGTH FROM information_schema.TABLES WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s'";
+    
+    @Override
+    public void collect(final ShardingSphereData shardingSphereData, final String databaseName, final ShardingSphereRuleMetaData ruleMetaData,
+                        final Map<String, DataSource> dataSources, final DatabaseType databaseType) throws SQLException {
+        Optional<ShardingRule> shardingRule = ruleMetaData.findSingleRule(ShardingRule.class);
+        if (!shardingRule.isPresent()) {
+            return;
+        }
+        ShardingSphereTableData tableData = collectForShardingStatisticTable(databaseName, dataSources, databaseType, shardingRule.get());
+        // TODO refactor by dialect database: MySQL keeps the table in the shared shardingsphere database, PostgreSQL and openGauss keep it in the collected database
+        String storageDatabaseName = databaseType instanceof MySQLDatabaseType ? SHARDING_SPHERE : databaseName;
+        if (databaseType instanceof MySQLDatabaseType || databaseType instanceof PostgreSQLDatabaseType || databaseType instanceof OpenGaussDatabaseType) {
+            Optional.ofNullable(shardingSphereData.getDatabaseData().get(storageDatabaseName)).map(database -> database.getSchemaData().get(SHARDING_SPHERE))
+                    .ifPresent(shardingSphereSchemaData -> shardingSphereSchemaData.getTableData().put(SHARDING_STATISTICS_TABLE, tableData));
+        }
+    }
+    
+    private ShardingSphereTableData collectForShardingStatisticTable(final String databaseName, final Map<String, DataSource> dataSources,
+                                                                     final DatabaseType databaseType, final ShardingRule shardingRule) throws SQLException {
+        ShardingSphereTableData result = new ShardingSphereTableData(SHARDING_STATISTICS_TABLE);
+        int count = 1;
+        for (TableRule each : shardingRule.getTableRules().values()) {
+            for (DataNode dataNode : each.getActualDataNodes()) {
+                // Row layout: sequence number, database name, logic table name, data source name, actual table name, table rows, data length.
+                List<Object> row = new LinkedList<>();
+                row.add(count++);
+                row.add(databaseName);
+                row.add(each.getLogicTable());
+                row.add(dataNode.getDataSourceName());
+                row.add(dataNode.getTableName());
+                addTableRowsAndDataLength(dataSources, dataNode, row, databaseType);
+                result.getRows().add(new ShardingSphereRowData(row));
+            }
+        }
+        return result;
+    }
+    
+    private void addTableRowsAndDataLength(final Map<String, DataSource> dataSources, final DataNode dataNode, final List<Object> row, final DatabaseType databaseType) throws SQLException {
+        if (databaseType instanceof MySQLDatabaseType) {
+            addForMySQL(dataSources, dataNode, row);
+        } else if (databaseType instanceof PostgreSQLDatabaseType || databaseType instanceof OpenGaussDatabaseType) {
+            // TODO get postgres rows and data length
+            row.add(BigDecimal.ZERO);
+            row.add(BigDecimal.ZERO);
+        }
+    }
+    
+    private void addForMySQL(final Map<String, DataSource> dataSources, final DataNode dataNode, final List<Object> row) throws SQLException {
+        DataSource dataSource = dataSources.get(dataNode.getDataSourceName());
+        BigDecimal tableRows = BigDecimal.ZERO;
+        BigDecimal dataLength = BigDecimal.ZERO;
+        try (
+                Connection connection = dataSource.getConnection();
+                Statement statement = connection.createStatement();
+                // MySQL Connector/J reports the current database as the JDBC catalog; getSchema() is null unless databaseTerm=SCHEMA is configured.
+                ResultSet resultSet = statement.executeQuery(String.format(MYSQL_TABLE_ROWS_AND_DATA_LENGTH, connection.getCatalog(), dataNode.getTableName()))) {
+            if (resultSet.next()) {
+                // Both columns may be NULL in information_schema.TABLES (for example for views), so fall back to zero instead of storing null.
+                tableRows = Optional.ofNullable(resultSet.getBigDecimal("TABLE_ROWS")).orElse(BigDecimal.ZERO);
+                dataLength = Optional.ofNullable(resultSet.getBigDecimal("DATA_LENGTH")).orElse(BigDecimal.ZERO);
+            }
+        }
+        row.add(tableRows);
+        row.add(dataLength);
+    }
+    
+    @Override
+    public String getType() {
+        return SHARDING_STATISTICS_TABLE;
+    }
+}
diff --git a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener b/features/sharding/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.data.collector.ShardingSphereDataCollector
similarity index 89%
copy from kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener
copy to features/sharding/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.data.collector.ShardingSphereDataCollector
index 4f351a1a4a0..cf400ced718 100644
--- a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener
+++ b/features/sharding/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.data.collector.ShardingSphereDataCollector
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.core.listener.PipelineContextManagerLifecycleListener
+org.apache.shardingsphere.sharding.metadata.data.ShardingStatisticsTableCollector
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/data/collector/ShardingSphereDataCollector.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/data/collector/ShardingSphereDataCollector.java
new file mode 100644
index 00000000000..722b61ebb23
--- /dev/null
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/data/collector/ShardingSphereDataCollector.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.spi.data.collector;
+
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereData;
+import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
+import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
+
+import javax.sql.DataSource;
+import java.sql.SQLException;
+import java.util.Map;
+
+/**
+ * ShardingSphere data collector, a typed singleton SPI.
+ */
+@SingletonSPI
+public interface ShardingSphereDataCollector extends TypedSPI {
+    
+    /**
+     * Collect data of one database into the given ShardingSphere data.
+     * 
+     * @param shardingSphereData ShardingSphere data which receives the collected table data
+     * @param databaseName name of the database to collect for
+     * @param ruleMetaData rule meta data of the database
+     * @param dataSources data sources of the database, keyed by data source name
+     * @param databaseType database type
+     * @throws SQLException when reading from a data source fails
+     */
+    void collect(ShardingSphereData shardingSphereData, String databaseName, ShardingSphereRuleMetaData ruleMetaData,
+                 Map<String, DataSource> dataSources, DatabaseType databaseType) throws SQLException;
+}
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/data/collector/ShardingSphereDataCollectorFactory.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/data/collector/ShardingSphereDataCollectorFactory.java
new file mode 100644
index 00000000000..b3554a6d57a
--- /dev/null
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/data/collector/ShardingSphereDataCollectorFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.spi.data.collector;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
+
+import java.util.Optional;
+
+/**
+ * ShardingSphere data collector factory.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class ShardingSphereDataCollectorFactory {
+    
+    static {
+        ShardingSphereServiceLoader.register(ShardingSphereDataCollector.class);
+    }
+    
+    /**
+     * Find instance of ShardingSphere data collector.
+     *
+     * @param tableName table name, used as the SPI type of the collector to look up
+     * @return found instance
+     */
+    public static Optional<ShardingSphereDataCollector> findInstance(final String tableName) {
+        return TypedSPIRegistry.findRegisteredService(ShardingSphereDataCollector.class, tableName);
+    }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataJobWorker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataJobWorker.java
new file mode 100644
index 00000000000..77543f31d6c
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataJobWorker.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.execute;
+
+import org.apache.shardingsphere.mode.manager.ContextManager;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * ShardingSphere data job worker.
+ *
+ * <p>Starts the ShardingSphere data schedule collector on the first call of initialize.
+ * The initialized flag is only set after the collector has been started.</p>
+ */
+public final class ShardingSphereDataJobWorker {
+    
+    private static final AtomicBoolean WORKER_INITIALIZED = new AtomicBoolean(false);
+    
+    /**
+     * Initialize job worker.
+     *
+     * <p>Once the initialized flag is set, further calls return without doing anything.</p>
+     *
+     * @param contextManager context manager
+     */
+    public static void initialize(final ContextManager contextManager) {
+        if (!WORKER_INITIALIZED.get()) {
+            synchronized (WORKER_INITIALIZED) {
+                // Check again under the lock, another thread may have started the collector in the meantime.
+                if (!WORKER_INITIALIZED.get()) {
+                    new ShardingSphereDataScheduleCollector(contextManager).start();
+                    WORKER_INITIALIZED.set(true);
+                }
+            }
+        }
+    }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataScheduleCollector.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataScheduleCollector.java
new file mode 100644
index 00000000000..60bc4e33e87
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataScheduleCollector.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.execute;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.spi.data.collector.ShardingSphereDataCollectorFactory;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
+import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereData;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+
+import javax.sql.DataSource;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * ShardingSphere data schedule collector.
+ */
+@Slf4j
+@RequiredArgsConstructor
+public final class ShardingSphereDataScheduleCollector {
+    
+    private static final String SHARDING_SPHERE = "shardingsphere";
+    
+    private final ScheduledExecutorService dataCollectorExecutor = Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("data-collect-%d"));
+    
+    private final ContextManager contextManager;
+    
+    /**
+     * Start.
+     */
+    public void start() {
+        dataCollectorExecutor.scheduleWithFixedDelay(new ShardingSphereDataCollectorRunnable(contextManager), 0, 30, TimeUnit.SECONDS);
+    }
+    
+    @RequiredArgsConstructor
+    private static final class ShardingSphereDataCollectorRunnable implements Runnable {
+        
+        private final ContextManager contextManager;
+        
+        @Override
+        public void run() {
+            ShardingSphereData shardingSphereData = contextManager.getMetaDataContexts().getShardingSphereData();
+            ShardingSphereMetaData metaData = contextManager.getMetaDataContexts().getMetaData();
+            DatabaseType databaseType = metaData.getDatabases().values().iterator().next().getProtocolType();
+            // TODO refactor by dialect database
+            if (databaseType instanceof MySQLDatabaseType) {
+                collectForMySQL(shardingSphereData, metaData, databaseType);
+            } else if (databaseType instanceof PostgreSQLDatabaseType || databaseType instanceof OpenGaussDatabaseType) {
+                collectForPostgreSQL(shardingSphereData, metaData, databaseType);
+            }
+        }
+        
+        private void collectForMySQL(final ShardingSphereData shardingSphereData, final ShardingSphereMetaData metaData, final DatabaseType databaseType) {
+            Optional<Collection<ShardingSphereTable>> shardingSphereTables = Optional.ofNullable(metaData.getDatabase(SHARDING_SPHERE))
+                    .map(database -> database.getSchema(SHARDING_SPHERE)).map(schema -> schema.getTables().values());
+            shardingSphereTables.ifPresent(tables -> tables.forEach(table -> metaData.getDatabases().forEach((key, value) -> {
+                if (!databaseType.getSystemDatabaseSchemaMap().containsKey(key)) {
+                    collectForEachDatabase(shardingSphereData, table, value, databaseType);
+                }
+            })));
+        }
+        
+        private void collectForPostgreSQL(final ShardingSphereData shardingSphereData, final ShardingSphereMetaData metaData, final DatabaseType databaseType) {
+            metaData.getDatabases().forEach((key, value) -> {
+                if (!databaseType.getSystemDatabaseSchemaMap().containsKey(key)) {
+                    Optional<Collection<ShardingSphereTable>> shardingSphereTables = Optional.ofNullable(value.getSchema(SHARDING_SPHERE)).map(schema -> schema.getTables().values());
+                    shardingSphereTables.ifPresent(tables -> tables.forEach(table -> collectForEachDatabase(shardingSphereData, table, value, databaseType)));
+                }
+            });
+        }
+        
+        private void collectForEachDatabase(final ShardingSphereData shardingSphereData, final ShardingSphereTable table, final ShardingSphereDatabase database, final DatabaseType databaseType) {
+            String databaseName = database.getName();
+            Map<String, DataSource> dataSources = database.getResource().getDataSources();
+            ShardingSphereDataCollectorFactory.findInstance(table.getName()).ifPresent(shardingSphereDataCollector -> {
+                try {
+                    shardingSphereDataCollector.collect(shardingSphereData, databaseName, database.getRuleMetaData(), dataSources, databaseType);
+                } catch (SQLException ex) {
+                    log.error("Collect data for sharding_table_statistics error!", ex);
+                }
+            });
+        }
+    }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/ShardingSphereDataContextManagerLifecycleListener.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/ShardingSphereDataContextManagerLifecycleListener.java
new file mode 100644
index 00000000000..f9f1ff1f5ce
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/ShardingSphereDataContextManagerLifecycleListener.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.listener;
+
+import org.apache.shardingsphere.data.pipeline.core.execute.ShardingSphereDataJobWorker;
+import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener;
+
+/**
+ * ShardingSphere data context manager lifecycle listener, which initializes the ShardingSphere data job worker for cluster instances.
+ */
+public final class ShardingSphereDataContextManagerLifecycleListener implements ContextManagerLifecycleListener {
+    
+    @Override
+    public void onInitialized(final ModeConfiguration modeConfig, final ContextManager contextManager) {
+        // The worker is only initialized when a mode configuration exists and the instance runs in cluster mode.
+        // The short-circuit keeps the instance context untouched when no mode configuration is given.
+        if (null == modeConfig || !contextManager.getInstanceContext().isCluster()) {
+            return;
+        }
+        ShardingSphereDataJobWorker.initialize(contextManager);
+    }
+}
diff --git a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener
index 4f351a1a4a0..b60410cbef9 100644
--- a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener
+++ b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener
@@ -16,3 +16,4 @@
 #
 
 org.apache.shardingsphere.data.pipeline.core.listener.PipelineContextManagerLifecycleListener
+org.apache.shardingsphere.data.pipeline.core.listener.ShardingSphereDataContextManagerLifecycleListener