You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/01/13 11:36:58 UTC
[shardingsphere] branch master updated: Extract PipelineContext from RuleAlteredContext (#14742)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new e70adcd Extract PipelineContext from RuleAlteredContext (#14742)
e70adcd is described below
commit e70adcd66d9055a7e8828c69223fb5bc822d8afe
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Thu Jan 13 19:36:09 2022 +0800
Extract PipelineContext from RuleAlteredContext (#14742)
* Extract PipelineContext from RuleAlteredContext
* Rename RuleAlteredContextUtil to PipelineContextUtil
* Checkstyle
* Fix package typo
---
.../data/pipeline/core/api/PipelineAPIFactory.java | 10 ++--
.../consistency/DataConsistencyCheckerImpl.java | 4 +-
.../pipeline/core/context/PipelineContext.java | 67 ++++++++++++++++++++++
.../scenario/rulealtered/RuleAlteredContext.java | 42 --------------
.../distsql/DistSQLBackendHandlerFactoryTest.java | 4 +-
.../proxy/initializer/BootstrapInitializer.java | 8 +--
.../api/impl/GovernanceRepositoryAPIImplTest.java | 8 +--
.../pipeline/api/impl/PipelineJobAPIImplTest.java | 8 +--
.../DataConsistencyCheckerImplTest.java | 4 +-
.../pipeline/core/job/FinishedCheckJobTest.java | 4 +-
.../core/prepare/InventoryTaskSplitterTest.java | 6 +-
.../pipeline/core/task/IncrementalTaskTest.java | 6 +-
.../data/pipeline/core/task/InventoryTaskTest.java | 10 ++--
...edContextUtil.java => PipelineContextUtil.java} | 8 +--
.../scenario/rulealtered/RuleAlteredJobTest.java | 4 +-
15 files changed, 109 insertions(+), 84 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java
index 7b18bf6..1725871 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java
@@ -24,7 +24,7 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.data.pipeline.core.api.impl.GovernanceRepositoryAPIImpl;
import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobAPIFactory;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobConfigurationAPI;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobOperateAPI;
@@ -93,7 +93,7 @@ public final class PipelineAPIFactory {
}
private static void checkServerConfig() {
- ModeConfiguration modeConfig = RuleAlteredContext.getModeConfig();
+ ModeConfiguration modeConfig = PipelineContext.getModeConfig();
Preconditions.checkNotNull(modeConfig, "Mode configuration is required.");
Preconditions.checkArgument("Cluster".equals(modeConfig.getType()), "Mode must be `Cluster`.");
}
@@ -119,7 +119,7 @@ public final class PipelineAPIFactory {
private static GovernanceRepositoryAPI createGovernanceRepositoryAPI() {
checkServerConfig();
- ClusterPersistRepositoryConfiguration repositoryConfig = (ClusterPersistRepositoryConfiguration) RuleAlteredContext.getModeConfig().getRepository();
+ ClusterPersistRepositoryConfiguration repositoryConfig = (ClusterPersistRepositoryConfiguration) PipelineContext.getModeConfig().getRepository();
ClusterPersistRepository repository = TypedSPIRegistry.getRegisteredService(ClusterPersistRepository.class, repositoryConfig.getType(), repositoryConfig.getProps());
repository.init(repositoryConfig);
return new GovernanceRepositoryAPIImpl(repository);
@@ -139,7 +139,7 @@ public final class PipelineAPIFactory {
private ElasticJobAPIHolder() {
checkServerConfig();
- ClusterPersistRepositoryConfiguration repositoryConfig = (ClusterPersistRepositoryConfiguration) RuleAlteredContext.getModeConfig().getRepository();
+ ClusterPersistRepositoryConfiguration repositoryConfig = (ClusterPersistRepositoryConfiguration) PipelineContext.getModeConfig().getRepository();
String namespace = repositoryConfig.getNamespace() + DataPipelineConstants.DATA_PIPELINE_ROOT;
jobStatisticsAPI = JobAPIFactory.createJobStatisticsAPI(repositoryConfig.getServerLists(), namespace, null);
jobConfigurationAPI = JobAPIFactory.createJobConfigurationAPI(repositoryConfig.getServerLists(), namespace, null);
@@ -181,7 +181,7 @@ public final class PipelineAPIFactory {
private static ZookeeperConfiguration getZookeeperConfig() {
checkServerConfig();
- ClusterPersistRepositoryConfiguration repositoryConfig = (ClusterPersistRepositoryConfiguration) RuleAlteredContext.getModeConfig().getRepository();
+ ClusterPersistRepositoryConfiguration repositoryConfig = (ClusterPersistRepositoryConfiguration) PipelineContext.getModeConfig().getRepository();
ZookeeperConfiguration result = new ZookeeperConfiguration(repositoryConfig.getServerLists(), repositoryConfig.getNamespace() + DataPipelineConstants.DATA_PIPELINE_ROOT);
Properties props = repositoryConfig.getProps();
result.setMaxSleepTimeMilliseconds(getProperty(props, "max.sleep.time.milliseconds", result.getMaxSleepTimeMilliseconds()));
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
index d4c3885..cd81878 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
@@ -27,9 +27,9 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator;
@@ -199,7 +199,7 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
}
private Map<String, TableMetaData> getTableMetaDataMap(final String schemaName) {
- ContextManager contextManager = RuleAlteredContext.getContextManager();
+ ContextManager contextManager = PipelineContext.getContextManager();
Preconditions.checkNotNull(contextManager, "contextManager null");
ShardingSphereMetaData metaData = contextManager.getMetaDataContexts().getMetaData(schemaName);
return metaData.getSchema().getTables();
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContext.java
new file mode 100644
index 0000000..8f19051
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContext.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.context;
+
+import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+
+/**
+ * Pipeline context.
+ */
+public final class PipelineContext {
+
+ private static volatile ModeConfiguration modeConfig;
+
+ private static volatile ContextManager contextManager;
+
+ /**
+ * Get mode configuration.
+ *
+ * @return mode configuration
+ */
+ public static ModeConfiguration getModeConfig() {
+ return modeConfig;
+ }
+
+ /**
+ * Initialize mode configuration.
+ *
+ * @param modeConfig configuration
+ */
+ public static void initModeConfig(final ModeConfiguration modeConfig) {
+ PipelineContext.modeConfig = modeConfig;
+ }
+
+ /**
+ * Get context manager.
+ *
+ * @return context manager
+ */
+ public static ContextManager getContextManager() {
+ return contextManager;
+ }
+
+ /**
+ * Initialize context manager.
+ *
+ * @param contextManager context manager
+ */
+ public static void initContextManager(final ContextManager contextManager) {
+ PipelineContext.contextManager = contextManager;
+ }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
index dd275ac..874ca43 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
@@ -30,7 +30,6 @@ import org.apache.shardingsphere.data.pipeline.spi.lock.RuleBasedJobLockAlgorith
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmFactory;
-import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration;
import org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration.InputConfiguration;
import org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration.OutputConfiguration;
@@ -38,7 +37,6 @@ import org.apache.shardingsphere.infra.yaml.config.pojo.rulealtered.YamlOnRuleAl
import org.apache.shardingsphere.infra.yaml.config.pojo.rulealtered.YamlOnRuleAlteredActionConfiguration.YamlOutputConfiguration;
import org.apache.shardingsphere.infra.yaml.config.swapper.rulealtered.OnRuleAlteredActionConfigurationYamlSwapper.InputConfigurationSwapper;
import org.apache.shardingsphere.infra.yaml.config.swapper.rulealtered.OnRuleAlteredActionConfigurationYamlSwapper.OutputConfigurationSwapper;
-import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
import java.util.Properties;
@@ -60,10 +58,6 @@ public final class RuleAlteredContext {
ShardingSphereServiceLoader.register(RuleBasedJobLockAlgorithm.class);
}
- private static volatile ModeConfiguration modeConfig;
-
- private static volatile ContextManager contextManager;
-
private final OnRuleAlteredActionConfiguration onRuleAlteredActionConfig;
private final JobRateLimitAlgorithm inputRateLimitAlgorithm;
@@ -115,40 +109,4 @@ public final class RuleAlteredContext {
incrementalDumperExecuteEngine = ExecuteEngine.newCachedThreadInstance();
importerExecuteEngine = ExecuteEngine.newFixedThreadInstance(outputConfig.getWorkerThread());
}
-
- /**
- * Get mode configuration.
- *
- * @return mode configuration
- */
- public static ModeConfiguration getModeConfig() {
- return modeConfig;
- }
-
- /**
- * Initialize mode configuration.
- *
- * @param modeConfig configuration
- */
- public static void initModeConfig(final ModeConfiguration modeConfig) {
- RuleAlteredContext.modeConfig = modeConfig;
- }
-
- /**
- * Get context manager.
- *
- * @return context manager
- */
- public static ContextManager getContextManager() {
- return contextManager;
- }
-
- /**
- * Initialize context manager.
- *
- * @param contextManager context manager
- */
- public static void initContextManager(final ContextManager contextManager) {
- RuleAlteredContext.contextManager = contextManager;
- }
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/distsql/DistSQLBackendHandlerFactoryTest.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/distsql/DistSQLBackendHandlerFactoryTest.java
index c2c95ea..085ea62 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/distsql/DistSQLBackendHandlerFactoryTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/distsql/DistSQLBackendHandlerFactoryTest.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.proxy.backend.text.distsql;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.distsql.parser.statement.rdl.alter.AlterResourceStatement;
import org.apache.shardingsphere.distsql.parser.statement.rdl.create.AddResourceStatement;
import org.apache.shardingsphere.distsql.parser.statement.rdl.drop.DropResourceStatement;
@@ -282,7 +282,7 @@ public final class DistSQLBackendHandlerFactoryTest {
private void mockScalingContext() {
ModeConfiguration modeConfiguration = mock(ModeConfiguration.class);
when(modeConfiguration.getType()).thenReturn("Cluster");
- RuleAlteredContext.initModeConfig(modeConfiguration);
+ PipelineContext.initModeConfig(modeConfiguration);
}
@After
diff --git a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java
index bc7cdd8..60c0716 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java
@@ -20,13 +20,12 @@ package org.apache.shardingsphere.proxy.initializer;
import com.google.common.base.Strings;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
import org.apache.shardingsphere.db.protocol.CommonConstants;
import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLServerInfo;
import org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLServerInfo;
import org.apache.shardingsphere.infra.autogen.version.ShardingSphereVersion;
-import org.apache.shardingsphere.proxy.config.resource.ProxyResourceConfiguration;
import org.apache.shardingsphere.infra.config.datasource.creator.DataSourcePoolCreatorUtil;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.infra.instance.InstanceDefinition;
@@ -40,6 +39,7 @@ import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.config.ProxyConfiguration;
import org.apache.shardingsphere.proxy.config.YamlProxyConfiguration;
+import org.apache.shardingsphere.proxy.config.resource.ProxyResourceConfiguration;
import org.apache.shardingsphere.proxy.config.resource.ProxyResourceConfigurationConverter;
import org.apache.shardingsphere.proxy.config.yaml.swapper.YamlProxyConfigurationSwapper;
import org.apache.shardingsphere.proxy.database.DatabaseServerInfo;
@@ -106,8 +106,8 @@ public final class BootstrapInitializer {
log.info("mode type is not Cluster, ignore initRuleAlteredJobWorker");
return;
}
- RuleAlteredContext.initModeConfig(modeConfig);
- RuleAlteredContext.initContextManager(contextManager);
+ PipelineContext.initModeConfig(modeConfig);
+ PipelineContext.initContextManager(contextManager);
// TODO init worker only if necessary, e.g. 1) rule altered action configured, 2) enabled job exists, 3) stopped job restarted
RuleAlteredJobWorker.initWorkerIfNecessary();
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index 78dfafc..3013e6e 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -29,8 +29,8 @@ import org.apache.shardingsphere.data.pipeline.core.fixture.EmbedTestingServer;
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.JobProgressYamlSwapper;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
import org.apache.shardingsphere.data.pipeline.core.util.ResourceUtil;
-import org.apache.shardingsphere.data.pipeline.core.util.RuleAlteredContextUtil;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -61,7 +61,7 @@ public final class GovernanceRepositoryAPIImplTest {
@BeforeClass
public static void beforeClass() {
EmbedTestingServer.start();
- RuleAlteredContextUtil.mockModeConfig();
+ PipelineContextUtil.mockModeConfig();
governanceRepositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
}
@@ -129,12 +129,12 @@ public final class GovernanceRepositoryAPIImplTest {
dumperConfig.setTableName("t_order");
dumperConfig.setPrimaryKey("order_id");
dumperConfig.setShardingItem(0);
- return new InventoryTask(dumperConfig, taskConfig.getImporterConfig(), RuleAlteredContextUtil.getPipelineChannelFactory(), RuleAlteredContextUtil.getExecuteEngine());
+ return new InventoryTask(dumperConfig, taskConfig.getImporterConfig(), PipelineContextUtil.getPipelineChannelFactory(), PipelineContextUtil.getExecuteEngine());
}
private IncrementalTask mockIncrementalTask(final TaskConfiguration taskConfig) {
DumperConfiguration dumperConfig = taskConfig.getDumperConfig();
dumperConfig.setPosition(new PlaceholderPosition());
- return new IncrementalTask(3, dumperConfig, taskConfig.getImporterConfig(), RuleAlteredContextUtil.getPipelineChannelFactory(), RuleAlteredContextUtil.getExecuteEngine());
+ return new IncrementalTask(3, dumperConfig, taskConfig.getImporterConfig(), PipelineContextUtil.getPipelineChannelFactory(), PipelineContextUtil.getExecuteEngine());
}
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/PipelineJobAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/PipelineJobAPIImplTest.java
index fd74b07..0072871 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/PipelineJobAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/PipelineJobAPIImplTest.java
@@ -32,8 +32,8 @@ import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
import org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
import org.apache.shardingsphere.data.pipeline.core.fixture.EmbedTestingServer;
import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureDataConsistencyCheckAlgorithm;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
import org.apache.shardingsphere.data.pipeline.core.util.ResourceUtil;
-import org.apache.shardingsphere.data.pipeline.core.util.RuleAlteredContextUtil;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -60,7 +60,7 @@ public final class PipelineJobAPIImplTest {
@BeforeClass
public static void beforeClass() {
EmbedTestingServer.start();
- RuleAlteredContextUtil.mockModeConfig();
+ PipelineContextUtil.mockModeConfig();
pipelineJobAPI = PipelineJobAPIFactory.getPipelineJobAPI();
}
@@ -141,7 +141,7 @@ public final class PipelineJobAPIImplTest {
assertTrue(jobId.isPresent());
JobConfiguration jobConfig = pipelineJobAPI.getJobConfig(jobId.get());
initTableData(jobConfig.getPipelineConfig());
- RuleAlteredContextUtil.mockContextManager();
+ PipelineContextUtil.mockContextManager();
Map<String, DataConsistencyCheckResult> checkResultMap = pipelineJobAPI.dataConsistencyCheck(jobId.get());
assertThat(checkResultMap.size(), is(1));
}
@@ -152,7 +152,7 @@ public final class PipelineJobAPIImplTest {
assertTrue(jobId.isPresent());
JobConfiguration jobConfig = pipelineJobAPI.getJobConfig(jobId.get());
initTableData(jobConfig.getPipelineConfig());
- RuleAlteredContextUtil.mockContextManager();
+ PipelineContextUtil.mockContextManager();
Map<String, DataConsistencyCheckResult> checkResultMap = pipelineJobAPI.dataConsistencyCheck(jobId.get(), FixtureDataConsistencyCheckAlgorithm.TYPE);
assertThat(checkResultMap.size(), is(1));
assertTrue(checkResultMap.get("t_order").isRecordsCountMatched());
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImplTest.java
index a9063a3..79a0029 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImplTest.java
@@ -22,8 +22,8 @@ import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureDataConsistencyCheckAlgorithm;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
import org.apache.shardingsphere.data.pipeline.core.util.ResourceUtil;
-import org.apache.shardingsphere.data.pipeline.core.util.RuleAlteredContextUtil;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
import org.apache.shardingsphere.scaling.core.job.check.EnvironmentCheckerFactory;
import org.junit.Test;
@@ -49,7 +49,7 @@ public final class DataConsistencyCheckerImplTest {
RuleAlteredJobContext jobContext = new RuleAlteredJobContext(ResourceUtil.mockJobConfig());
initTableData(jobContext.getTaskConfigs().iterator().next().getDumperConfig().getDataSourceConfig());
initTableData(jobContext.getTaskConfigs().iterator().next().getImporterConfig().getDataSourceConfig());
- RuleAlteredContextUtil.mockContextManager();
+ PipelineContextUtil.mockContextManager();
DataConsistencyChecker dataConsistencyChecker = EnvironmentCheckerFactory.newInstance(jobContext);
Map<String, DataConsistencyCheckResult> resultMap = dataConsistencyChecker.checkRecordsCount();
assertTrue(resultMap.get("t_order").isRecordsCountMatched());
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java
index 42b596a..cd72dea 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java
@@ -22,9 +22,9 @@ import org.apache.shardingsphere.data.pipeline.api.PipelineJobAPI;
import org.apache.shardingsphere.data.pipeline.api.PipelineJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
import org.apache.shardingsphere.data.pipeline.core.fixture.EmbedTestingServer;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
import org.apache.shardingsphere.data.pipeline.core.util.ResourceUtil;
-import org.apache.shardingsphere.data.pipeline.core.util.RuleAlteredContextUtil;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -49,7 +49,7 @@ public final class FinishedCheckJobTest {
@BeforeClass
public static void beforeClass() {
EmbedTestingServer.start();
- RuleAlteredContextUtil.mockModeConfig();
+ PipelineContextUtil.mockModeConfig();
finishedCheckJob = new FinishedCheckJob();
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
index be7c39c..5e8368f 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
@@ -23,8 +23,8 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.position.PrimaryKeyPos
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
import org.apache.shardingsphere.data.pipeline.core.util.ResourceUtil;
-import org.apache.shardingsphere.data.pipeline.core.util.RuleAlteredContextUtil;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
import org.junit.After;
@@ -43,9 +43,9 @@ import static org.junit.Assert.assertThat;
public final class InventoryTaskSplitterTest {
- private static final ExecuteEngine EXECUTE_ENGINE = RuleAlteredContextUtil.getExecuteEngine();
+ private static final ExecuteEngine EXECUTE_ENGINE = PipelineContextUtil.getExecuteEngine();
- private static final PipelineChannelFactory PIPELINE_CHANNEL_FACTORY = RuleAlteredContextUtil.getPipelineChannelFactory();
+ private static final PipelineChannelFactory PIPELINE_CHANNEL_FACTORY = PipelineContextUtil.getPipelineChannelFactory();
private RuleAlteredJobContext jobContext;
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index c684563..fefdd1b 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -19,8 +19,8 @@ package org.apache.shardingsphere.data.pipeline.core.task;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
import org.apache.shardingsphere.data.pipeline.core.util.ResourceUtil;
-import org.apache.shardingsphere.data.pipeline.core.util.RuleAlteredContextUtil;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
import org.junit.After;
import org.junit.Before;
@@ -37,7 +37,7 @@ public final class IncrementalTaskTest {
@BeforeClass
public static void beforeClass() {
- RuleAlteredContextUtil.mockModeConfig();
+ PipelineContextUtil.mockModeConfig();
}
@Before
@@ -45,7 +45,7 @@ public final class IncrementalTaskTest {
TaskConfiguration taskConfig = new RuleAlteredJobContext(ResourceUtil.mockJobConfig()).getTaskConfigs().iterator().next();
taskConfig.getDumperConfig().setPosition(new PlaceholderPosition());
incrementalTask = new IncrementalTask(3, taskConfig.getDumperConfig(), taskConfig.getImporterConfig(),
- RuleAlteredContextUtil.getPipelineChannelFactory(), RuleAlteredContextUtil.getExecuteEngine());
+ PipelineContextUtil.getPipelineChannelFactory(), PipelineContextUtil.getExecuteEngine());
}
@Test
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index 3072163..32ea26a 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -20,14 +20,14 @@ package org.apache.shardingsphere.data.pipeline.core.task;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PrimaryKeyPosition;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
import org.apache.shardingsphere.data.pipeline.core.util.ResourceUtil;
-import org.apache.shardingsphere.data.pipeline.core.util.RuleAlteredContextUtil;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -44,7 +44,7 @@ public final class InventoryTaskTest {
@BeforeClass
public static void beforeClass() {
- RuleAlteredContextUtil.mockModeConfig();
+ PipelineContextUtil.mockModeConfig();
taskConfig = new RuleAlteredJobContext(ResourceUtil.mockJobConfig()).getTaskConfigs().iterator().next();
}
@@ -58,7 +58,7 @@ public final class InventoryTaskTest {
}
inventoryDumperConfig.setPosition(position);
try (InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
- RuleAlteredContextUtil.getPipelineChannelFactory(), RuleAlteredContextUtil.getExecuteEngine())) {
+ PipelineContextUtil.getPipelineChannelFactory(), PipelineContextUtil.getExecuteEngine())) {
inventoryTask.start();
}
}
@@ -74,7 +74,7 @@ public final class InventoryTaskTest {
}
inventoryDumperConfig.setPosition(position);
try (InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
- RuleAlteredContextUtil.getPipelineChannelFactory(), RuleAlteredContextUtil.getExecuteEngine())) {
+ PipelineContextUtil.getPipelineChannelFactory(), PipelineContextUtil.getExecuteEngine())) {
inventoryTask.start();
assertFalse(inventoryTask.getProgress().getPosition() instanceof FinishedPosition);
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/RuleAlteredContextUtil.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
similarity index 93%
rename from shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/RuleAlteredContextUtil.java
rename to shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
index cf81516..8e7b912 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/RuleAlteredContextUtil.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
@@ -20,11 +20,11 @@ package org.apache.shardingsphere.data.pipeline.core.util;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.fixture.EmbedTestingServer;
import org.apache.shardingsphere.data.pipeline.core.spi.ingest.channel.MemoryPipelineChannelFactory;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
import org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
@@ -34,7 +34,7 @@ import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor
import java.util.Properties;
@Slf4j
-public final class RuleAlteredContextUtil {
+public final class PipelineContextUtil {
private static final ExecuteEngine EXECUTE_ENGINE = ExecuteEngine.newCachedThreadInstance();
@@ -45,7 +45,7 @@ public final class RuleAlteredContextUtil {
*/
@SneakyThrows
public static void mockModeConfig() {
- RuleAlteredContext.initModeConfig(createModeConfig());
+ PipelineContext.initModeConfig(createModeConfig());
}
private static ModeConfiguration createModeConfig() {
@@ -60,7 +60,7 @@ public final class RuleAlteredContextUtil {
ResourceUtil.readFileToString("/config_sharding_sphere_jdbc_source.yaml"));
ShardingSphereDataSource shardingSphereDataSource = (ShardingSphereDataSource) new PipelineDataSourceFactory().newInstance(pipelineDataSourceConfig).getDataSource();
ContextManager contextManager = shardingSphereDataSource.getContextManager();
- RuleAlteredContext.initContextManager(contextManager);
+ PipelineContext.initContextManager(contextManager);
try {
shardingSphereDataSource.close();
// CHECKSTYLE:OFF
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java
index 1341353..a320d48 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java
@@ -19,9 +19,9 @@ package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
import lombok.SneakyThrows;
import org.apache.shardingsphere.data.pipeline.core.fixture.EmbedTestingServer;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
import org.apache.shardingsphere.data.pipeline.core.util.ResourceUtil;
-import org.apache.shardingsphere.data.pipeline.core.util.RuleAlteredContextUtil;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.junit.BeforeClass;
@@ -37,7 +37,7 @@ public final class RuleAlteredJobTest {
@BeforeClass
public static void beforeClass() throws Exception {
EmbedTestingServer.start();
- RuleAlteredContextUtil.mockModeConfig();
+ PipelineContextUtil.mockModeConfig();
}
@Test