You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/01/13 11:36:58 UTC

[shardingsphere] branch master updated: Extract PipelineContext from RuleAlteredContext (#14742)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new e70adcd  Extract PipelineContext from RuleAlteredContext (#14742)
e70adcd is described below

commit e70adcd66d9055a7e8828c69223fb5bc822d8afe
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Thu Jan 13 19:36:09 2022 +0800

    Extract PipelineContext from RuleAlteredContext (#14742)
    
    * Extract PipelineContext from RuleAlteredContext
    
    * Rename RuleAlteredContextUtil to PipelineContextUtil
    
    * Checkstyle
    
    * Fix package typo
---
 .../data/pipeline/core/api/PipelineAPIFactory.java | 10 ++--
 .../consistency/DataConsistencyCheckerImpl.java    |  4 +-
 .../pipeline/core/context/PipelineContext.java     | 67 ++++++++++++++++++++++
 .../scenario/rulealtered/RuleAlteredContext.java   | 42 --------------
 .../distsql/DistSQLBackendHandlerFactoryTest.java  |  4 +-
 .../proxy/initializer/BootstrapInitializer.java    |  8 +--
 .../api/impl/GovernanceRepositoryAPIImplTest.java  |  8 +--
 .../pipeline/api/impl/PipelineJobAPIImplTest.java  |  8 +--
 .../DataConsistencyCheckerImplTest.java            |  4 +-
 .../pipeline/core/job/FinishedCheckJobTest.java    |  4 +-
 .../core/prepare/InventoryTaskSplitterTest.java    |  6 +-
 .../pipeline/core/task/IncrementalTaskTest.java    |  6 +-
 .../data/pipeline/core/task/InventoryTaskTest.java | 10 ++--
 ...edContextUtil.java => PipelineContextUtil.java} |  8 +--
 .../scenario/rulealtered/RuleAlteredJobTest.java   |  4 +-
 15 files changed, 109 insertions(+), 84 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java
index 7b18bf6..1725871 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java
@@ -24,7 +24,7 @@ import lombok.Getter;
 import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.data.pipeline.core.api.impl.GovernanceRepositoryAPIImpl;
 import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobAPIFactory;
 import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobConfigurationAPI;
 import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobOperateAPI;
@@ -93,7 +93,7 @@ public final class PipelineAPIFactory {
     }
     
     private static void checkServerConfig() {
-        ModeConfiguration modeConfig = RuleAlteredContext.getModeConfig();
+        ModeConfiguration modeConfig = PipelineContext.getModeConfig();
         Preconditions.checkNotNull(modeConfig, "Mode configuration is required.");
         Preconditions.checkArgument("Cluster".equals(modeConfig.getType()), "Mode must be `Cluster`.");
     }
@@ -119,7 +119,7 @@ public final class PipelineAPIFactory {
         
         private static GovernanceRepositoryAPI createGovernanceRepositoryAPI() {
             checkServerConfig();
-            ClusterPersistRepositoryConfiguration repositoryConfig = (ClusterPersistRepositoryConfiguration) RuleAlteredContext.getModeConfig().getRepository();
+            ClusterPersistRepositoryConfiguration repositoryConfig = (ClusterPersistRepositoryConfiguration) PipelineContext.getModeConfig().getRepository();
             ClusterPersistRepository repository = TypedSPIRegistry.getRegisteredService(ClusterPersistRepository.class, repositoryConfig.getType(), repositoryConfig.getProps());
             repository.init(repositoryConfig);
             return new GovernanceRepositoryAPIImpl(repository);
@@ -139,7 +139,7 @@ public final class PipelineAPIFactory {
         
         private ElasticJobAPIHolder() {
             checkServerConfig();
-            ClusterPersistRepositoryConfiguration repositoryConfig = (ClusterPersistRepositoryConfiguration) RuleAlteredContext.getModeConfig().getRepository();
+            ClusterPersistRepositoryConfiguration repositoryConfig = (ClusterPersistRepositoryConfiguration) PipelineContext.getModeConfig().getRepository();
             String namespace = repositoryConfig.getNamespace() + DataPipelineConstants.DATA_PIPELINE_ROOT;
             jobStatisticsAPI = JobAPIFactory.createJobStatisticsAPI(repositoryConfig.getServerLists(), namespace, null);
             jobConfigurationAPI = JobAPIFactory.createJobConfigurationAPI(repositoryConfig.getServerLists(), namespace, null);
@@ -181,7 +181,7 @@ public final class PipelineAPIFactory {
         
         private static ZookeeperConfiguration getZookeeperConfig() {
             checkServerConfig();
-            ClusterPersistRepositoryConfiguration repositoryConfig = (ClusterPersistRepositoryConfiguration) RuleAlteredContext.getModeConfig().getRepository();
+            ClusterPersistRepositoryConfiguration repositoryConfig = (ClusterPersistRepositoryConfiguration) PipelineContext.getModeConfig().getRepository();
             ZookeeperConfiguration result = new ZookeeperConfiguration(repositoryConfig.getServerLists(), repositoryConfig.getNamespace() + DataPipelineConstants.DATA_PIPELINE_ROOT);
             Properties props = repositoryConfig.getProps();
             result.setMaxSleepTimeMilliseconds(getProperty(props, "max.sleep.time.milliseconds", result.getMaxSleepTimeMilliseconds()));
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
index d4c3885..cd81878 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
@@ -27,9 +27,9 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator;
@@ -199,7 +199,7 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
     }
     
     private Map<String, TableMetaData> getTableMetaDataMap(final String schemaName) {
-        ContextManager contextManager = RuleAlteredContext.getContextManager();
+        ContextManager contextManager = PipelineContext.getContextManager();
         Preconditions.checkNotNull(contextManager, "contextManager null");
         ShardingSphereMetaData metaData = contextManager.getMetaDataContexts().getMetaData(schemaName);
         return metaData.getSchema().getTables();
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContext.java
new file mode 100644
index 0000000..8f19051
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContext.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.context;
+
+import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+
+/**
+ * Pipeline context.
+ */
+public final class PipelineContext {
+    
+    private static volatile ModeConfiguration modeConfig;
+    
+    private static volatile ContextManager contextManager;
+    
+    /**
+     * Get mode configuration.
+     *
+     * @return mode configuration
+     */
+    public static ModeConfiguration getModeConfig() {
+        return modeConfig;
+    }
+    
+    /**
+     * Initialize mode configuration.
+     *
+     * @param modeConfig configuration
+     */
+    public static void initModeConfig(final ModeConfiguration modeConfig) {
+        PipelineContext.modeConfig = modeConfig;
+    }
+    
+    /**
+     * Get context manager.
+     *
+     * @return context manager
+     */
+    public static ContextManager getContextManager() {
+        return contextManager;
+    }
+    
+    /**
+     * Initialize context manager.
+     *
+     * @param contextManager context manager
+     */
+    public static void initContextManager(final ContextManager contextManager) {
+        PipelineContext.contextManager = contextManager;
+    }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
index dd275ac..874ca43 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
@@ -30,7 +30,6 @@ import org.apache.shardingsphere.data.pipeline.spi.lock.RuleBasedJobLockAlgorith
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
 import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmFactory;
-import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
 import org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration;
 import org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration.InputConfiguration;
 import org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration.OutputConfiguration;
@@ -38,7 +37,6 @@ import org.apache.shardingsphere.infra.yaml.config.pojo.rulealtered.YamlOnRuleAl
 import org.apache.shardingsphere.infra.yaml.config.pojo.rulealtered.YamlOnRuleAlteredActionConfiguration.YamlOutputConfiguration;
 import org.apache.shardingsphere.infra.yaml.config.swapper.rulealtered.OnRuleAlteredActionConfigurationYamlSwapper.InputConfigurationSwapper;
 import org.apache.shardingsphere.infra.yaml.config.swapper.rulealtered.OnRuleAlteredActionConfigurationYamlSwapper.OutputConfigurationSwapper;
-import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
 
 import java.util.Properties;
@@ -60,10 +58,6 @@ public final class RuleAlteredContext {
         ShardingSphereServiceLoader.register(RuleBasedJobLockAlgorithm.class);
     }
     
-    private static volatile ModeConfiguration modeConfig;
-    
-    private static volatile ContextManager contextManager;
-    
     private final OnRuleAlteredActionConfiguration onRuleAlteredActionConfig;
     
     private final JobRateLimitAlgorithm inputRateLimitAlgorithm;
@@ -115,40 +109,4 @@ public final class RuleAlteredContext {
         incrementalDumperExecuteEngine = ExecuteEngine.newCachedThreadInstance();
         importerExecuteEngine = ExecuteEngine.newFixedThreadInstance(outputConfig.getWorkerThread());
     }
-    
-    /**
-     * Get mode configuration.
-     *
-     * @return mode configuration
-     */
-    public static ModeConfiguration getModeConfig() {
-        return modeConfig;
-    }
-    
-    /**
-     * Initialize mode configuration.
-     *
-     * @param modeConfig configuration
-     */
-    public static void initModeConfig(final ModeConfiguration modeConfig) {
-        RuleAlteredContext.modeConfig = modeConfig;
-    }
-    
-    /**
-     * Get context manager.
-     *
-     * @return context manager
-     */
-    public static ContextManager getContextManager() {
-        return contextManager;
-    }
-    
-    /**
-     * Initialize context manager.
-     *
-     * @param contextManager context manager
-     */
-    public static void initContextManager(final ContextManager contextManager) {
-        RuleAlteredContext.contextManager = contextManager;
-    }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/distsql/DistSQLBackendHandlerFactoryTest.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/distsql/DistSQLBackendHandlerFactoryTest.java
index c2c95ea..085ea62 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/distsql/DistSQLBackendHandlerFactoryTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/distsql/DistSQLBackendHandlerFactoryTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.proxy.backend.text.distsql;
 
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.distsql.parser.statement.rdl.alter.AlterResourceStatement;
 import org.apache.shardingsphere.distsql.parser.statement.rdl.create.AddResourceStatement;
 import org.apache.shardingsphere.distsql.parser.statement.rdl.drop.DropResourceStatement;
@@ -282,7 +282,7 @@ public final class DistSQLBackendHandlerFactoryTest {
     private void mockScalingContext() {
         ModeConfiguration modeConfiguration = mock(ModeConfiguration.class);
         when(modeConfiguration.getType()).thenReturn("Cluster");
-        RuleAlteredContext.initModeConfig(modeConfiguration);
+        PipelineContext.initModeConfig(modeConfiguration);
     }
     
     @After
diff --git a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java
index bc7cdd8..60c0716 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java
@@ -20,13 +20,12 @@ package org.apache.shardingsphere.proxy.initializer;
 import com.google.common.base.Strings;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
 import org.apache.shardingsphere.db.protocol.CommonConstants;
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLServerInfo;
 import org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLServerInfo;
 import org.apache.shardingsphere.infra.autogen.version.ShardingSphereVersion;
-import org.apache.shardingsphere.proxy.config.resource.ProxyResourceConfiguration;
 import org.apache.shardingsphere.infra.config.datasource.creator.DataSourcePoolCreatorUtil;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
 import org.apache.shardingsphere.infra.instance.InstanceDefinition;
@@ -40,6 +39,7 @@ import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.config.ProxyConfiguration;
 import org.apache.shardingsphere.proxy.config.YamlProxyConfiguration;
+import org.apache.shardingsphere.proxy.config.resource.ProxyResourceConfiguration;
 import org.apache.shardingsphere.proxy.config.resource.ProxyResourceConfigurationConverter;
 import org.apache.shardingsphere.proxy.config.yaml.swapper.YamlProxyConfigurationSwapper;
 import org.apache.shardingsphere.proxy.database.DatabaseServerInfo;
@@ -106,8 +106,8 @@ public final class BootstrapInitializer {
             log.info("mode type is not Cluster, ignore initRuleAlteredJobWorker");
             return;
         }
-        RuleAlteredContext.initModeConfig(modeConfig);
-        RuleAlteredContext.initContextManager(contextManager);
+        PipelineContext.initModeConfig(modeConfig);
+        PipelineContext.initContextManager(contextManager);
         // TODO init worker only if necessary, e.g. 1) rule altered action configured, 2) enabled job exists, 3) stopped job restarted
         RuleAlteredJobWorker.initWorkerIfNecessary();
     }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index 78dfafc..3013e6e 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -29,8 +29,8 @@ import org.apache.shardingsphere.data.pipeline.core.fixture.EmbedTestingServer;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.JobProgressYamlSwapper;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
 import org.apache.shardingsphere.data.pipeline.core.util.ResourceUtil;
-import org.apache.shardingsphere.data.pipeline.core.util.RuleAlteredContextUtil;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
 import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -61,7 +61,7 @@ public final class GovernanceRepositoryAPIImplTest {
     @BeforeClass
     public static void beforeClass() {
         EmbedTestingServer.start();
-        RuleAlteredContextUtil.mockModeConfig();
+        PipelineContextUtil.mockModeConfig();
         governanceRepositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
     }
     
@@ -129,12 +129,12 @@ public final class GovernanceRepositoryAPIImplTest {
         dumperConfig.setTableName("t_order");
         dumperConfig.setPrimaryKey("order_id");
         dumperConfig.setShardingItem(0);
-        return new InventoryTask(dumperConfig, taskConfig.getImporterConfig(), RuleAlteredContextUtil.getPipelineChannelFactory(), RuleAlteredContextUtil.getExecuteEngine());
+        return new InventoryTask(dumperConfig, taskConfig.getImporterConfig(), PipelineContextUtil.getPipelineChannelFactory(), PipelineContextUtil.getExecuteEngine());
     }
     
     private IncrementalTask mockIncrementalTask(final TaskConfiguration taskConfig) {
         DumperConfiguration dumperConfig = taskConfig.getDumperConfig();
         dumperConfig.setPosition(new PlaceholderPosition());
-        return new IncrementalTask(3, dumperConfig, taskConfig.getImporterConfig(), RuleAlteredContextUtil.getPipelineChannelFactory(), RuleAlteredContextUtil.getExecuteEngine());
+        return new IncrementalTask(3, dumperConfig, taskConfig.getImporterConfig(), PipelineContextUtil.getPipelineChannelFactory(), PipelineContextUtil.getExecuteEngine());
     }
 }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/PipelineJobAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/PipelineJobAPIImplTest.java
index fd74b07..0072871 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/PipelineJobAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/PipelineJobAPIImplTest.java
@@ -32,8 +32,8 @@ import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
 import org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
 import org.apache.shardingsphere.data.pipeline.core.fixture.EmbedTestingServer;
 import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureDataConsistencyCheckAlgorithm;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
 import org.apache.shardingsphere.data.pipeline.core.util.ResourceUtil;
-import org.apache.shardingsphere.data.pipeline.core.util.RuleAlteredContextUtil;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -60,7 +60,7 @@ public final class PipelineJobAPIImplTest {
     @BeforeClass
     public static void beforeClass() {
         EmbedTestingServer.start();
-        RuleAlteredContextUtil.mockModeConfig();
+        PipelineContextUtil.mockModeConfig();
         pipelineJobAPI = PipelineJobAPIFactory.getPipelineJobAPI();
     }
     
@@ -141,7 +141,7 @@ public final class PipelineJobAPIImplTest {
         assertTrue(jobId.isPresent());
         JobConfiguration jobConfig = pipelineJobAPI.getJobConfig(jobId.get());
         initTableData(jobConfig.getPipelineConfig());
-        RuleAlteredContextUtil.mockContextManager();
+        PipelineContextUtil.mockContextManager();
         Map<String, DataConsistencyCheckResult> checkResultMap = pipelineJobAPI.dataConsistencyCheck(jobId.get());
         assertThat(checkResultMap.size(), is(1));
     }
@@ -152,7 +152,7 @@ public final class PipelineJobAPIImplTest {
         assertTrue(jobId.isPresent());
         JobConfiguration jobConfig = pipelineJobAPI.getJobConfig(jobId.get());
         initTableData(jobConfig.getPipelineConfig());
-        RuleAlteredContextUtil.mockContextManager();
+        PipelineContextUtil.mockContextManager();
         Map<String, DataConsistencyCheckResult> checkResultMap = pipelineJobAPI.dataConsistencyCheck(jobId.get(), FixtureDataConsistencyCheckAlgorithm.TYPE);
         assertThat(checkResultMap.size(), is(1));
         assertTrue(checkResultMap.get("t_order").isRecordsCountMatched());
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImplTest.java
index a9063a3..79a0029 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImplTest.java
@@ -22,8 +22,8 @@ import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureDataConsistencyCheckAlgorithm;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
 import org.apache.shardingsphere.data.pipeline.core.util.ResourceUtil;
-import org.apache.shardingsphere.data.pipeline.core.util.RuleAlteredContextUtil;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
 import org.apache.shardingsphere.scaling.core.job.check.EnvironmentCheckerFactory;
 import org.junit.Test;
@@ -49,7 +49,7 @@ public final class DataConsistencyCheckerImplTest {
         RuleAlteredJobContext jobContext = new RuleAlteredJobContext(ResourceUtil.mockJobConfig());
         initTableData(jobContext.getTaskConfigs().iterator().next().getDumperConfig().getDataSourceConfig());
         initTableData(jobContext.getTaskConfigs().iterator().next().getImporterConfig().getDataSourceConfig());
-        RuleAlteredContextUtil.mockContextManager();
+        PipelineContextUtil.mockContextManager();
         DataConsistencyChecker dataConsistencyChecker = EnvironmentCheckerFactory.newInstance(jobContext);
         Map<String, DataConsistencyCheckResult> resultMap = dataConsistencyChecker.checkRecordsCount();
         assertTrue(resultMap.get("t_order").isRecordsCountMatched());
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java
index 42b596a..cd72dea 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java
@@ -22,9 +22,9 @@ import org.apache.shardingsphere.data.pipeline.api.PipelineJobAPI;
 import org.apache.shardingsphere.data.pipeline.api.PipelineJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
 import org.apache.shardingsphere.data.pipeline.core.fixture.EmbedTestingServer;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
 import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
 import org.apache.shardingsphere.data.pipeline.core.util.ResourceUtil;
-import org.apache.shardingsphere.data.pipeline.core.util.RuleAlteredContextUtil;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -49,7 +49,7 @@ public final class FinishedCheckJobTest {
     @BeforeClass
     public static void beforeClass() {
         EmbedTestingServer.start();
-        RuleAlteredContextUtil.mockModeConfig();
+        PipelineContextUtil.mockModeConfig();
         finishedCheckJob = new FinishedCheckJob();
     }
     
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
index be7c39c..5e8368f 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
@@ -23,8 +23,8 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.position.PrimaryKeyPos
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
 import org.apache.shardingsphere.data.pipeline.core.util.ResourceUtil;
-import org.apache.shardingsphere.data.pipeline.core.util.RuleAlteredContextUtil;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
 import org.junit.After;
@@ -43,9 +43,9 @@ import static org.junit.Assert.assertThat;
 
 public final class InventoryTaskSplitterTest {
     
-    private static final ExecuteEngine EXECUTE_ENGINE = RuleAlteredContextUtil.getExecuteEngine();
+    private static final ExecuteEngine EXECUTE_ENGINE = PipelineContextUtil.getExecuteEngine();
     
-    private static final PipelineChannelFactory PIPELINE_CHANNEL_FACTORY = RuleAlteredContextUtil.getPipelineChannelFactory();
+    private static final PipelineChannelFactory PIPELINE_CHANNEL_FACTORY = PipelineContextUtil.getPipelineChannelFactory();
     
     private RuleAlteredJobContext jobContext;
     
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index c684563..fefdd1b 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -19,8 +19,8 @@ package org.apache.shardingsphere.data.pipeline.core.task;
 
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
 import org.apache.shardingsphere.data.pipeline.core.util.ResourceUtil;
-import org.apache.shardingsphere.data.pipeline.core.util.RuleAlteredContextUtil;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
 import org.junit.After;
 import org.junit.Before;
@@ -37,7 +37,7 @@ public final class IncrementalTaskTest {
     
     @BeforeClass
     public static void beforeClass() {
-        RuleAlteredContextUtil.mockModeConfig();
+        PipelineContextUtil.mockModeConfig();
     }
     
     @Before
@@ -45,7 +45,7 @@ public final class IncrementalTaskTest {
         TaskConfiguration taskConfig = new RuleAlteredJobContext(ResourceUtil.mockJobConfig()).getTaskConfigs().iterator().next();
         taskConfig.getDumperConfig().setPosition(new PlaceholderPosition());
         incrementalTask = new IncrementalTask(3, taskConfig.getDumperConfig(), taskConfig.getImporterConfig(),
-                RuleAlteredContextUtil.getPipelineChannelFactory(), RuleAlteredContextUtil.getExecuteEngine());
+                PipelineContextUtil.getPipelineChannelFactory(), PipelineContextUtil.getExecuteEngine());
     }
     
     @Test
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index 3072163..32ea26a 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -20,14 +20,14 @@ package org.apache.shardingsphere.data.pipeline.core.task;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PrimaryKeyPosition;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
 import org.apache.shardingsphere.data.pipeline.core.util.ResourceUtil;
-import org.apache.shardingsphere.data.pipeline.core.util.RuleAlteredContextUtil;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -44,7 +44,7 @@ public final class InventoryTaskTest {
     
     @BeforeClass
     public static void beforeClass() {
-        RuleAlteredContextUtil.mockModeConfig();
+        PipelineContextUtil.mockModeConfig();
         taskConfig = new RuleAlteredJobContext(ResourceUtil.mockJobConfig()).getTaskConfigs().iterator().next();
     }
     
@@ -58,7 +58,7 @@ public final class InventoryTaskTest {
         }
         inventoryDumperConfig.setPosition(position);
         try (InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
-                RuleAlteredContextUtil.getPipelineChannelFactory(), RuleAlteredContextUtil.getExecuteEngine())) {
+                PipelineContextUtil.getPipelineChannelFactory(), PipelineContextUtil.getExecuteEngine())) {
             inventoryTask.start();
         }
     }
@@ -74,7 +74,7 @@ public final class InventoryTaskTest {
         }
         inventoryDumperConfig.setPosition(position);
         try (InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
-                RuleAlteredContextUtil.getPipelineChannelFactory(), RuleAlteredContextUtil.getExecuteEngine())) {
+                PipelineContextUtil.getPipelineChannelFactory(), PipelineContextUtil.getExecuteEngine())) {
             inventoryTask.start();
             assertFalse(inventoryTask.getProgress().getPosition() instanceof FinishedPosition);
         }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/RuleAlteredContextUtil.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
similarity index 93%
rename from shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/RuleAlteredContextUtil.java
rename to shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
index cf81516..8e7b912 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/RuleAlteredContextUtil.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
@@ -20,11 +20,11 @@ package org.apache.shardingsphere.data.pipeline.core.util;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.core.fixture.EmbedTestingServer;
 import org.apache.shardingsphere.data.pipeline.core.spi.ingest.channel.MemoryPipelineChannelFactory;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
 import org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
@@ -34,7 +34,7 @@ import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor
 import java.util.Properties;
 
 @Slf4j
-public final class RuleAlteredContextUtil {
+public final class PipelineContextUtil {
     
     private static final ExecuteEngine EXECUTE_ENGINE = ExecuteEngine.newCachedThreadInstance();
     
@@ -45,7 +45,7 @@ public final class RuleAlteredContextUtil {
      */
     @SneakyThrows
     public static void mockModeConfig() {
-        RuleAlteredContext.initModeConfig(createModeConfig());
+        PipelineContext.initModeConfig(createModeConfig());
     }
     
     private static ModeConfiguration createModeConfig() {
@@ -60,7 +60,7 @@ public final class RuleAlteredContextUtil {
                 ResourceUtil.readFileToString("/config_sharding_sphere_jdbc_source.yaml"));
         ShardingSphereDataSource shardingSphereDataSource = (ShardingSphereDataSource) new PipelineDataSourceFactory().newInstance(pipelineDataSourceConfig).getDataSource();
         ContextManager contextManager = shardingSphereDataSource.getContextManager();
-        RuleAlteredContext.initContextManager(contextManager);
+        PipelineContext.initContextManager(contextManager);
         try {
             shardingSphereDataSource.close();
             // CHECKSTYLE:OFF
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java
index 1341353..a320d48 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java
@@ -19,9 +19,9 @@ package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.data.pipeline.core.fixture.EmbedTestingServer;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
 import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
 import org.apache.shardingsphere.data.pipeline.core.util.ResourceUtil;
-import org.apache.shardingsphere.data.pipeline.core.util.RuleAlteredContextUtil;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
 import org.junit.BeforeClass;
@@ -37,7 +37,7 @@ public final class RuleAlteredJobTest {
     @BeforeClass
     public static void beforeClass() throws Exception {
         EmbedTestingServer.start();
-        RuleAlteredContextUtil.mockModeConfig();
+        PipelineContextUtil.mockModeConfig();
     }
     
     @Test