You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by az...@apache.org on 2023/03/31 02:20:50 UTC
[shardingsphere] branch master updated: Refactor PipelineContext to support JDBC mode in pipeline jobs (#24883)
This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new bd0a45b9b47 Refactor PipelineContext to support JDBC mode in pipeline jobs (#24883)
bd0a45b9b47 is described below
commit bd0a45b9b47e7d6471bf2a166d0dbccee96d6cc0
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Fri Mar 31 10:20:42 2023 +0800
Refactor PipelineContext to support JDBC mode in pipeline jobs (#24883)
* Move EVENT_LISTENER_EXECUTOR from PipelineContext to PipelineMetaDataNodeWatcher
* Refactor ContextManagerLifecycleListener, add instanceType and databaseName in onInitialized
* Add PipelineContextManager
* Add ContextManagerLifecycleListener.onDestroyed
* Add context manager lifecycle callback in ShardingSphereDataSource
* Remove PipelineContext.initModeConfig and initContextManager
* Update ContextManagerLifecycleListener.onInitialized
* Rename YamlPipelineJobConfiguration.getTargetDatabaseName to getDatabaseName
* Add getInstanceType() and getDatabaseName() in PipelineJobConfiguration
* Remove checkModeConfig()
* Refactor PipelineAPIFactory.ElasticJobAPIHolder and PipelineAPIFactory methods
* Refactor PipelineJobAPI.list and update related invocation
* Refactor PipelineJobId.getJobTypeCode
* Add getInstanceType() and getDatabaseName() in PipelineJobId
* Refactor BasePipelineJobId and PipelineJobIdUtils
* Add getContextKey() in PipelineJobId
* Refactor ConsistencyCheckJobId and invocations
* Refactor MigrationJobAPI and invocations
* Refactor CDCJobAPI and invocations
* Revert a81b34e679d (Add getInstanceType() and getDatabaseName() in PipelineJobConfiguration)
* Temp work around for compile error
* Simplify parseJobId to parseContextKey in PipelineJobIdUtils
* Refactor PipelineAPIFactory.getJobOperateAPI and invocations
* Add InstanceTypeUtilTest; Update PipelineJobIdUtilsTest
* Simplify PipelineJobAPI.list parameter
* Refactor PipelineAPIFactory.getJobConfigurationAPI and invocations
* Refactor CoordinatorRegistryCenterInitializer
* Refactor RegistryCenterHolder.getInstance, PipelineAPIFactory.getRegistryCenter and invocations
* Ignore database name persistence to job id in proxy mode
* Refactor PipelineContextKey hashCode equals and simplify PipelineContextManager
* Refactor PipelineContextKey construction
* Update PipelineContext.getContextManager invocations in CDC
* Update PipelineContext.getContextManager invocations in migration
* Update PipelineContext.getContextManager invocations in PipelineJobPreparer
* Refactor PipelineDistributedBarrier and invocations
* Refactor PipelineMetaDataChangedEventHandler.handle, add jobId param
* Refactor PipelineAPIFactory.getGovernanceRepositoryAPI and invocations
* Refactor PipelineMetaDataNodeWatcher
* Refactor InventoryIncrementalJobAPI alterProcessConfiguration and showProcessConfiguration
* Refactor PipelineMetaDataPersistService.persist
* Replace PipelineContextKey.buildForProxy() to PipelineContextUtil.getContextKey in unit tests
* Refactor PipelineMetaDataPersistService.load
* Remove PipelineContext.getContextManager() static method
* Refactor PipelineJobIdUtils: not encode databaseName in job id for proxy mode
* Update code for style
* Update unit test
* Update renamed classes references
* Add H2 IT
* Ignore incremental stage for H2 in IT
* Remove PipelineJobWorker
* Clean exception stack trace in IT
* Self review
* Update code style
* Refactor PipelineJobAPI.extendYamlJobConfiguration
* Refactor PipelineJobId impls constructor
* Update code style
* Update code style
* Avoid possible NPE in AbstractPipelineJob.innerStop
---
.../core/datasource/ShardingSphereDataSource.java | 29 ++++++
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 37 ++++---
.../data/pipeline/cdc/core/job/CDCJobId.java | 10 +-
.../cdc/yaml/job/YamlCDCJobConfiguration.java | 5 -
.../data/pipeline/cdc/core/job/CDCJobIdTest.java | 5 +-
.../job/yaml/YamlPipelineJobConfiguration.java | 2 +-
.../data/pipeline/api/job/PipelineJobId.java | 14 ++-
.../core/api/InventoryIncrementalJobAPI.java | 7 +-
.../data/pipeline/core/api/PipelineAPIFactory.java | 116 +++++++++++++--------
.../data/pipeline/core/api/PipelineJobAPI.java | 7 +-
.../core/api/PipelineMetaDataPersistService.java | 7 +-
.../AbstractInventoryIncrementalJobAPIImpl.java | 23 ++--
.../core/api/impl/AbstractPipelineJobAPIImpl.java | 43 +++-----
.../api/impl/PipelineDataSourcePersistService.java | 9 +-
...PipelineProcessConfigurationPersistService.java | 9 +-
.../pipeline/core/context/PipelineContext.java | 59 ++---------
.../pipeline/core/context/PipelineContextKey.java | 88 ++++++++++++++++
.../core/context/PipelineContextManager.java | 67 ++++++++++++
.../pipeline/core/execute/PipelineJobWorker.java | 50 ---------
.../pipeline/core/job/AbstractPipelineJob.java | 16 +--
.../pipeline/core/job/AbstractPipelineJobId.java | 15 +--
.../data/pipeline/core/job/PipelineJobIdUtils.java | 41 +++++++-
.../pipeline/core/job/util/InstanceTypeUtil.java | 65 ++++++++++++
.../PipelineContextManagerLifecycleListener.java | 23 +++-
...gSphereDataContextManagerLifecycleListener.java | 10 +-
.../metadata/node/PipelineMetaDataNodeWatcher.java | 38 +++++--
.../AbstractChangedJobConfigurationProcessor.java | 9 +-
.../PipelineMetaDataChangedEventHandler.java | 3 +-
.../impl/BarrierMetaDataChangedEventHandler.java | 5 +-
.../impl/ConfigMetaDataChangedEventHandler.java | 2 +-
.../core/prepare/PipelineJobPreparerUtils.java | 10 +-
.../CoordinatorRegistryCenterInitializer.java | 8 +-
.../core/util/PipelineDistributedBarrier.java | 41 +++++---
.../core/context/PipelineContextKeyTest.java | 46 ++++++++
.../core/job/util/InstanceTypeUtilTest.java} | 27 +++--
.../handler/query/ShowStreamingListExecutor.java | 3 +-
.../handler/query/ShowMigrationListExecutor.java | 3 +-
.../ShowMigrationSourceStorageUnitsExecutor.java | 3 +-
.../handler/update/MigrateTableUpdater.java | 3 +-
.../RegisterMigrationSourceStorageUnitUpdater.java | 3 +-
...nregisterMigrationSourceStorageUnitUpdater.java | 3 +-
.../consistencycheck/ConsistencyCheckJobId.java | 15 ++-
.../api/impl/ConsistencyCheckJobAPI.java | 45 ++++----
.../pojo/CreateConsistencyCheckJobParameter.java | 2 +-
.../yaml/YamlConsistencyCheckJobConfiguration.java | 2 +-
.../task/ConsistencyCheckTasksRunner.java | 2 +-
.../scenario/migration/MigrationJobId.java | 10 +-
.../migration/api/impl/MigrationJobAPI.java | 60 ++++++-----
.../migration/context/MigrationJobItemContext.java | 1 +
.../migration/prepare/MigrationJobPreparer.java | 24 +++--
.../yaml/job/YamlMigrationJobConfiguration.java | 5 +
.../job/YamlMigrationJobConfigurationSwapper.java | 2 +-
.../pipeline/core/job/PipelineJobIdUtilsTest.java | 24 +++--
.../listener/ContextManagerLifecycleListener.java | 18 +++-
.../backend/handler/cdc/CDCBackendHandler.java | 20 ++--
.../ral/queryable/ShowMigrationRuleExecutor.java | 4 +-
.../AlterInventoryIncrementalRuleUpdater.java | 3 +-
.../proxy/initializer/BootstrapInitializer.java | 3 +-
test/it/pipeline/pom.xml | 6 ++
.../api/impl/GovernanceRepositoryAPIImplTest.java | 13 +--
...lineProcessConfigurationPersistServiceTest.java | 4 +-
.../fixture/FixtureIncrementalDumperCreator.java | 11 --
.../core/fixture/H2CreateTableSQLGenerator.java | 46 ++++++++
.../core/fixture/H2DataSourcePreparer.java | 48 +++++++++
.../core/fixture/H2PositionInitializer.java | 36 +++----
.../pipeline/core/task/IncrementalTaskTest.java | 4 +-
.../core/util/JobConfigurationBuilder.java | 35 +++++--
.../pipeline/core/util/PipelineContextUtils.java | 41 +++++++-
.../core/util/PipelineDistributedBarrierTest.java | 17 +--
.../consistencycheck/ConsistencyCheckJobTest.java | 10 +-
.../api/impl/ConsistencyCheckJobAPITest.java | 39 +++----
.../migration/api/impl/MigrationJobAPITest.java | 30 +++---
.../MigrationDataConsistencyCheckerTest.java | 6 +-
...ine.core.prepare.datasource.DataSourcePreparer} | 13 +--
...eline.spi.ddlgenerator.CreateTableSQLGenerator} | 13 +--
...peline.spi.ingest.position.PositionInitializer} | 13 +--
.../migration_sharding_sphere_jdbc_target.yaml | 4 +-
.../resources/migration_standard_jdbc_source.yaml | 2 +-
78 files changed, 1036 insertions(+), 559 deletions(-)
diff --git a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/datasource/ShardingSphereDataSource.java b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/datasource/ShardingSphereDataSource.java
index c8b15e49ea8..3172ff044cf 100644
--- a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/datasource/ShardingSphereDataSource.java
+++ b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/datasource/ShardingSphereDataSource.java
@@ -26,10 +26,13 @@ import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
import org.apache.shardingsphere.infra.config.rule.scope.GlobalRuleConfiguration;
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaDataBuilder;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
+import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
+import org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -57,6 +60,7 @@ public final class ShardingSphereDataSource extends AbstractDataSourceAdapter im
this.databaseName = databaseName;
contextManager = createContextManager(databaseName, modeConfig, new HashMap<>(), new LinkedList<>(), new Properties());
jdbcContext = new JDBCContext(contextManager.getDataSourceMap(databaseName));
+ contextManagerInitializedCallback(modeConfig, contextManager, databaseName);
}
public ShardingSphereDataSource(final String databaseName, final ModeConfiguration modeConfig, final Map<String, DataSource> dataSourceMap,
@@ -64,6 +68,7 @@ public final class ShardingSphereDataSource extends AbstractDataSourceAdapter im
this.databaseName = databaseName;
contextManager = createContextManager(databaseName, modeConfig, dataSourceMap, ruleConfigs, null == props ? new Properties() : props);
jdbcContext = new JDBCContext(contextManager.getDataSourceMap(databaseName));
+ contextManagerInitializedCallback(modeConfig, contextManager, databaseName);
}
private ContextManager createContextManager(final String databaseName, final ModeConfiguration modeConfig, final Map<String, DataSource> dataSourceMap,
@@ -77,6 +82,17 @@ public final class ShardingSphereDataSource extends AbstractDataSourceAdapter im
return TypedSPILoader.getService(ContextManagerBuilder.class, null == modeConfig ? null : modeConfig.getType()).build(param);
}
+ private void contextManagerInitializedCallback(final ModeConfiguration modeConfig, final ContextManager contextManager, final String databaseName) {
+ for (ContextManagerLifecycleListener each : ShardingSphereServiceLoader.getServiceInstances(ContextManagerLifecycleListener.class)) {
+ try {
+ each.onInitialized(InstanceType.JDBC, databaseName, modeConfig, contextManager);
+ // CHECKSTYLE:OFF
+ } catch (final Exception ignored) {
+ // CHECKSTYLE:ON
+ }
+ }
+ }
+
@Override
public Connection getConnection() {
return DriverStateContext.getConnection(databaseName, contextManager, jdbcContext);
@@ -93,6 +109,7 @@ public final class ShardingSphereDataSource extends AbstractDataSourceAdapter im
* @param dataSourceNames data source names to be closed
* @throws Exception exception
*/
+ // TODO Replace public to private?
public void close(final Collection<String> dataSourceNames) throws Exception {
Map<String, DataSource> dataSourceMap = contextManager.getDataSourceMap(databaseName);
for (String each : dataSourceNames) {
@@ -109,9 +126,21 @@ public final class ShardingSphereDataSource extends AbstractDataSourceAdapter im
@Override
public void close() throws Exception {
+ contextManagerDestroyedCallback(databaseName);
close(contextManager.getDataSourceMap(databaseName).keySet());
}
+ private void contextManagerDestroyedCallback(final String databaseName) {
+ for (ContextManagerLifecycleListener each : ShardingSphereServiceLoader.getServiceInstances(ContextManagerLifecycleListener.class)) {
+ try {
+ each.onDestroyed(InstanceType.JDBC, databaseName);
+ // CHECKSTYLE:OFF
+ } catch (final Exception ignored) {
+ // CHECKSTYLE:ON
+ }
+ }
+ }
+
@Override
public int getLoginTimeout() throws SQLException {
Map<String, DataSource> dataSourceMap = contextManager.getDataSourceMap(databaseName);
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 841513c66fc..2d11a35d2c7 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -60,10 +60,12 @@ import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractInventoryIncrementalJobAPIImpl;
import org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext;
import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.prepare.PipelineJobPreparerUtils;
import org.apache.shardingsphere.data.pipeline.core.sharding.ShardingColumnsExtractor;
@@ -123,30 +125,32 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
sinkConfig.setSinkType(sinkType.name());
sinkConfig.setProps(sinkProps);
yamlJobConfig.setSinkConfig(sinkConfig);
- ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getDatabaseName());
+ PipelineContextKey contextKey = PipelineContextKey.buildForProxy(param.getDatabaseName());
+ ShardingSphereDatabase database = PipelineContextManager.getContext(contextKey).getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getDatabaseName());
yamlJobConfig.setDataSourceConfiguration(pipelineDataSourceConfigSwapper.swapToYamlConfiguration(getDataSourceConfiguration(database)));
List<JobDataNodeLine> jobDataNodeLines = JobDataNodeLineConvertUtils.convertDataNodesToLines(param.getDataNodesMap());
yamlJobConfig.setJobShardingDataNodes(jobDataNodeLines.stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
JobDataNodeLine tableFirstDataNodes = new JobDataNodeLine(param.getDataNodesMap().entrySet().stream().map(each -> new JobDataNodeEntry(each.getKey(), each.getValue().subList(0, 1)))
.collect(Collectors.toList()));
yamlJobConfig.setTablesFirstDataNodes(tableFirstDataNodes.marshal());
- extendYamlJobConfiguration(yamlJobConfig);
+ extendYamlJobConfiguration(contextKey, yamlJobConfig);
CDCJobConfiguration jobConfig = new YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
- ShardingSpherePreconditions.checkState(0 != jobConfig.getJobShardingCount(), () -> new PipelineJobCreationWithInvalidShardingCountException(jobConfig.getJobId()));
- GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
- String jobConfigKey = PipelineMetaDataNode.getJobConfigPath(jobConfig.getJobId());
+ String jobId = jobConfig.getJobId();
+ ShardingSpherePreconditions.checkState(0 != jobConfig.getJobShardingCount(), () -> new PipelineJobCreationWithInvalidShardingCountException(jobId));
+ GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId));
+ String jobConfigKey = PipelineMetaDataNode.getJobConfigPath(jobId);
if (repositoryAPI.isExisted(jobConfigKey)) {
log.warn("CDC job already exists in registry center, ignore, jobConfigKey={}", jobConfigKey);
- return jobConfig.getJobId();
+ return jobId;
}
- repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()), getJobClassName());
+ repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), getJobClassName());
JobConfigurationPOJO jobConfigPOJO = convertJobConfiguration(jobConfig);
jobConfigPOJO.setDisabled(true);
repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(jobConfigPOJO));
if (!param.isFull()) {
initIncrementalPosition(jobConfig);
}
- return jobConfig.getJobId();
+ return jobId;
}
private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
@@ -165,7 +169,8 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress();
incrementalTaskProgress.setPosition(PipelineJobPreparerUtils.getIncrementalPosition(null, dumperConfig, dataSourceManager));
jobItemProgress.setIncremental(new JobItemIncrementalTasksProgress(incrementalTaskProgress));
- PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, i, YamlEngine.marshal(getJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
+ PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress(jobId, i,
+ YamlEngine.marshal(getJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
}
} catch (final SQLException ex) {
throw new PrepareJobWithGetBinlogPositionException(jobConfig.getJobId(), ex);
@@ -188,10 +193,10 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
}
@Override
- public void extendYamlJobConfiguration(final YamlPipelineJobConfiguration yamlJobConfig) {
+ public void extendYamlJobConfiguration(final PipelineContextKey contextKey, final YamlPipelineJobConfiguration yamlJobConfig) {
YamlCDCJobConfiguration config = (YamlCDCJobConfiguration) yamlJobConfig;
if (null == yamlJobConfig.getJobId()) {
- config.setJobId(generateJobId(config));
+ config.setJobId(generateJobId(contextKey, config));
}
if (Strings.isNullOrEmpty(config.getSourceDatabaseType())) {
PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(config.getDataSourceConfiguration().getType(),
@@ -200,16 +205,16 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
}
}
- private String generateJobId(final YamlCDCJobConfiguration config) {
+ private String generateJobId(final PipelineContextKey contextKey, final YamlCDCJobConfiguration config) {
// TODO generate parameter add sink type
- CDCJobId jobId = new CDCJobId(config.getDatabaseName(), config.getSchemaTableNames(), config.isFull());
+ CDCJobId jobId = new CDCJobId(contextKey, config.getSchemaTableNames(), config.isFull());
return marshalJobId(jobId);
}
@Override
protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
CDCJobId jobId = (CDCJobId) pipelineJobId;
- String text = Joiner.on('|').join(jobId.getDatabaseName(), jobId.getSchemaTableNames(), jobId.isFull());
+ String text = Joiner.on('|').join(jobId.getContextKey().getDatabaseName(), jobId.getSchemaTableNames(), jobId.isFull());
return DigestUtils.md5Hex(text.getBytes(StandardCharsets.UTF_8));
}
@@ -265,7 +270,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
@Override
public CDCProcessContext buildPipelineProcessContext(final PipelineJobConfiguration pipelineJobConfig) {
- return new CDCProcessContext(pipelineJobConfig.getJobId(), showProcessConfiguration());
+ return new CDCProcessContext(pipelineJobConfig.getJobId(), showProcessConfiguration(PipelineJobIdUtils.parseContextKey(pipelineJobConfig.getJobId())));
}
@Override
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
index 1a6590b8f5c..ec62190399a 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.cdc.core.job;
import lombok.Getter;
import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJobId;
import java.util.List;
@@ -31,17 +32,12 @@ import java.util.List;
@ToString(callSuper = true)
public final class CDCJobId extends AbstractPipelineJobId {
- public static final String CURRENT_VERSION = "01";
-
- private final String databaseName;
-
private final List<String> schemaTableNames;
private final boolean full;
- public CDCJobId(final String databaseName, final List<String> schemaTableNames, final boolean full) {
- super(new CDCJobType(), CURRENT_VERSION);
- this.databaseName = databaseName;
+ public CDCJobId(final PipelineContextKey contextKey, final List<String> schemaTableNames, final boolean full) {
+ super(new CDCJobType(), contextKey);
this.schemaTableNames = schemaTableNames;
this.full = full;
}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
index 5559dc2e9ec..934b4d175ff 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
@@ -56,11 +56,6 @@ public final class YamlCDCJobConfiguration implements YamlPipelineJobConfigurati
private int retryTimes;
- @Override
- public String getTargetDatabaseName() {
- throw new UnsupportedOperationException();
- }
-
/**
* Sink configuration for YAML.
*/
diff --git a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
index eb040ec720f..9d6b1aade72 100644
--- a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
+++ b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
@@ -18,8 +18,10 @@
package org.apache.shardingsphere.data.pipeline.cdc.core.job;
import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
@@ -31,7 +33,8 @@ class CDCJobIdTest {
@Test
void parseJobType() {
- CDCJobId pipelineJobId = new CDCJobId("sharding_db", Arrays.asList("test", "t_order"), false);
+ PipelineContextKey contextKey = PipelineContextKey.build(InstanceType.PROXY, "sharding_db");
+ CDCJobId pipelineJobId = new CDCJobId(contextKey, Arrays.asList("test", "t_order"), false);
String jobId = PipelineJobIdUtils.marshalJobIdCommonPrefix(pipelineJobId) + "abcd";
JobType actualJobType = PipelineJobIdUtils.parseJobType(jobId);
assertThat(actualJobType, instanceOf(CDCJobType.class));
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlPipelineJobConfiguration.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlPipelineJobConfiguration.java
index 89b0c9716dd..c0ab0ca640b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlPipelineJobConfiguration.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlPipelineJobConfiguration.java
@@ -36,5 +36,5 @@ public interface YamlPipelineJobConfiguration extends YamlConfiguration {
*
* @return database name
*/
- String getTargetDatabaseName();
+ String getDatabaseName();
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJobId.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJobId.java
index 127933a3086..4a36c00c9e0 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJobId.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJobId.java
@@ -17,17 +17,20 @@
package org.apache.shardingsphere.data.pipeline.api.job;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
+
/**
* Pipeline job id.
*/
public interface PipelineJobId {
/**
- * Get job type code.
+ * Get job type.
*
* @return type
*/
- String getJobTypeCode();
+ JobType getJobType();
/**
* Get format version.
@@ -35,4 +38,11 @@ public interface PipelineJobId {
* @return format version
*/
String getFormatVersion();
+
+ /**
+ * Get pipeline context key.
+ *
+ * @return context key
+ */
+ PipelineContextKey getContextKey();
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
index 7ed1701a46f..420ad67ddf7 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
@@ -25,6 +25,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.progress.JobOffsetInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobItemInfo;
import org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import java.sql.SQLException;
@@ -42,16 +43,18 @@ public interface InventoryIncrementalJobAPI extends PipelineJobAPI {
/**
* Alter process configuration.
*
+ * @param contextKey context key
* @param processConfig process configuration
*/
- void alterProcessConfiguration(PipelineProcessConfiguration processConfig);
+ void alterProcessConfiguration(PipelineContextKey contextKey, PipelineProcessConfiguration processConfig);
/**
* Show process configuration.
*
+ * @param contextKey context key
* @return process configuration, non-null
*/
- PipelineProcessConfiguration showProcessConfiguration();
+ PipelineProcessConfiguration showProcessConfiguration(PipelineContextKey contextKey);
/**
* Persist job offset info.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java
index b6b8afb5c79..20edaa8f642 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java
@@ -25,16 +25,23 @@ import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;
import org.apache.shardingsphere.data.pipeline.core.api.impl.GovernanceRepositoryAPIImpl;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.registry.CoordinatorRegistryCenterInitializer;
-import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobAPIFactory;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobConfigurationAPI;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobOperateAPI;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobStatisticsAPI;
+import org.apache.shardingsphere.elasticjob.lite.lifecycle.internal.operate.JobOperateAPIImpl;
+import org.apache.shardingsphere.elasticjob.lite.lifecycle.internal.settings.JobConfigurationAPIImpl;
+import org.apache.shardingsphere.elasticjob.lite.lifecycle.internal.statistics.JobStatisticsAPIImpl;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
* Pipeline API factory.
@@ -42,64 +49,70 @@ import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class PipelineAPIFactory {
- private static final LazyInitializer<GovernanceRepositoryAPI> REPOSITORY_API_LAZY_INITIALIZER = new LazyInitializer<GovernanceRepositoryAPI>() {
-
- @Override
- protected GovernanceRepositoryAPI initialize() {
- return new GovernanceRepositoryAPIImpl((ClusterPersistRepository) PipelineContext.getContextManager().getMetaDataContexts().getPersistService().getRepository());
- }
- };
+ private static final Map<PipelineContextKey, LazyInitializer<GovernanceRepositoryAPI>> GOVERNANCE_REPOSITORY_API_MAP = new ConcurrentHashMap<>();
/**
* Get governance repository API.
*
+ * @param contextKey context key
* @return governance repository API
*/
@SneakyThrows(ConcurrentException.class)
- public static GovernanceRepositoryAPI getGovernanceRepositoryAPI() {
- return REPOSITORY_API_LAZY_INITIALIZER.get();
+ public static GovernanceRepositoryAPI getGovernanceRepositoryAPI(final PipelineContextKey contextKey) {
+ return GOVERNANCE_REPOSITORY_API_MAP.computeIfAbsent(contextKey, key -> new LazyInitializer<GovernanceRepositoryAPI>() {
+
+ @Override
+ protected GovernanceRepositoryAPI initialize() {
+ ContextManager contextManager = PipelineContextManager.getContext(contextKey).getContextManager();
+ return new GovernanceRepositoryAPIImpl((ClusterPersistRepository) contextManager.getMetaDataContexts().getPersistService().getRepository());
+ }
+ }).get();
}
/**
* Get job statistics API.
*
+ * @param contextKey context key
* @return job statistics API
*/
- public static JobStatisticsAPI getJobStatisticsAPI() {
- return ElasticJobAPIHolder.getInstance().getJobStatisticsAPI();
+ public static JobStatisticsAPI getJobStatisticsAPI(final PipelineContextKey contextKey) {
+ return ElasticJobAPIHolder.getInstance(contextKey).getJobStatisticsAPI();
}
/**
* Get job configuration API.
*
+ * @param contextKey context key
* @return job configuration API
*/
- public static JobConfigurationAPI getJobConfigurationAPI() {
- return ElasticJobAPIHolder.getInstance().getJobConfigurationAPI();
+ public static JobConfigurationAPI getJobConfigurationAPI(final PipelineContextKey contextKey) {
+ return ElasticJobAPIHolder.getInstance(contextKey).getJobConfigurationAPI();
}
/**
* Get job operate API.
*
+ * @param contextKey context key
* @return job operate API
*/
- public static JobOperateAPI getJobOperateAPI() {
- return ElasticJobAPIHolder.getInstance().getJobOperateAPI();
+ public static JobOperateAPI getJobOperateAPI(final PipelineContextKey contextKey) {
+ return ElasticJobAPIHolder.getInstance(contextKey).getJobOperateAPI();
}
/**
* Get registry center.
*
+ * @param contextKey context key
* @return Coordinator registry center
*/
- public static CoordinatorRegistryCenter getRegistryCenter() {
- return RegistryCenterHolder.getInstance();
+ public static CoordinatorRegistryCenter getRegistryCenter(final PipelineContextKey contextKey) {
+ return RegistryCenterHolder.getInstance(contextKey);
}
@Getter
private static final class ElasticJobAPIHolder {
- private static volatile ElasticJobAPIHolder instance;
+ private static final Map<PipelineContextKey, ElasticJobAPIHolder> INSTANCE_MAP = new ConcurrentHashMap<>();
private final JobStatisticsAPI jobStatisticsAPI;
@@ -107,45 +120,60 @@ public final class PipelineAPIFactory {
private final JobOperateAPI jobOperateAPI;
- private ElasticJobAPIHolder() {
- ClusterPersistRepositoryConfiguration repositoryConfig = (ClusterPersistRepositoryConfiguration) PipelineContext.getModeConfig().getRepository();
- String namespace = repositoryConfig.getNamespace() + PipelineMetaDataNode.getElasticJobNamespace();
- jobStatisticsAPI = JobAPIFactory.createJobStatisticsAPI(repositoryConfig.getServerLists(), namespace, null);
- jobConfigurationAPI = JobAPIFactory.createJobConfigurationAPI(repositoryConfig.getServerLists(), namespace, null);
- jobOperateAPI = JobAPIFactory.createJobOperateAPI(repositoryConfig.getServerLists(), namespace, null);
+ private ElasticJobAPIHolder(final PipelineContextKey contextKey) {
+ CoordinatorRegistryCenter registryCenter = getRegistryCenter(contextKey);
+ jobStatisticsAPI = new JobStatisticsAPIImpl(registryCenter);
+ jobConfigurationAPI = new JobConfigurationAPIImpl(registryCenter);
+ jobOperateAPI = new JobOperateAPIImpl(registryCenter);
}
- public static ElasticJobAPIHolder getInstance() {
- if (null == instance) {
- synchronized (PipelineAPIFactory.class) {
- if (null == instance) {
- instance = new ElasticJobAPIHolder();
- }
+ public static ElasticJobAPIHolder getInstance(final PipelineContextKey contextKey) {
+ ElasticJobAPIHolder result = INSTANCE_MAP.get(contextKey);
+ if (null != result) {
+ return result;
+ }
+ synchronized (INSTANCE_MAP) {
+ result = INSTANCE_MAP.get(contextKey);
+ if (null == result) {
+ result = new ElasticJobAPIHolder(contextKey);
+ INSTANCE_MAP.put(contextKey, result);
}
}
- return instance;
+ return result;
}
}
private static final class RegistryCenterHolder {
- private static volatile CoordinatorRegistryCenter instance;
+ private static final Map<PipelineContextKey, CoordinatorRegistryCenter> INSTANCE_MAP = new ConcurrentHashMap<>();
- public static CoordinatorRegistryCenter getInstance() {
- if (null == instance) {
- synchronized (PipelineAPIFactory.class) {
- if (null == instance) {
- instance = createRegistryCenter();
- }
+ public static CoordinatorRegistryCenter getInstance(final PipelineContextKey contextKey) {
+ // TODO Extract common method; Reduce lock time
+ CoordinatorRegistryCenter result = INSTANCE_MAP.get(contextKey);
+ if (null != result) {
+ return result;
+ }
+ synchronized (INSTANCE_MAP) {
+ result = INSTANCE_MAP.get(contextKey);
+ if (null == result) {
+ result = createRegistryCenter(contextKey);
+ INSTANCE_MAP.put(contextKey, result);
}
}
- return instance;
+ return result;
}
- private static CoordinatorRegistryCenter createRegistryCenter() {
+ private static CoordinatorRegistryCenter createRegistryCenter(final PipelineContextKey contextKey) {
CoordinatorRegistryCenterInitializer registryCenterInitializer = new CoordinatorRegistryCenterInitializer();
- ModeConfiguration modeConfig = PipelineContext.getModeConfig();
- return registryCenterInitializer.createRegistryCenter(modeConfig, PipelineMetaDataNode.getElasticJobNamespace());
+ PipelineContext pipelineContext = PipelineContextManager.getContext(contextKey);
+ ModeConfiguration modeConfig = pipelineContext.getModeConfig();
+ String elasticJobNamespace = PipelineMetaDataNode.getElasticJobNamespace();
+ String clusterType = modeConfig.getRepository().getType();
+ if ("ZooKeeper".equals(clusterType)) {
+ return registryCenterInitializer.createZookeeperRegistryCenter(modeConfig, elasticJobNamespace);
+ } else {
+ throw new IllegalArgumentException("Unsupported cluster type: " + clusterType);
+ }
}
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
index 6e11e1a439f..252a8f80df4 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
@@ -27,6 +27,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
@@ -59,9 +60,10 @@ public interface PipelineJobAPI extends TypedSPI {
/**
* Extend YAML job configuration.
*
+ * @param contextKey context key
* @param yamlJobConfig YAML job configuration
*/
- void extendYamlJobConfiguration(YamlPipelineJobConfiguration yamlJobConfig);
+ void extendYamlJobConfiguration(PipelineContextKey contextKey, YamlPipelineJobConfiguration yamlJobConfig);
/**
* Build task configuration.
@@ -115,9 +117,10 @@ public interface PipelineJobAPI extends TypedSPI {
/**
* Get pipeline job info.
*
+ * @param contextKey context key
* @return job info list
*/
- List<? extends PipelineJobInfo> list();
+ List<? extends PipelineJobInfo> list(PipelineContextKey contextKey);
/**
* Persist job item progress.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java
index 18dd0e68a9d..f12948dc365 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.api;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
/**
@@ -29,16 +30,18 @@ public interface PipelineMetaDataPersistService<T> {
/**
* Load meta data.
*
+ * @param contextKey context key
* @param jobType job type, nullable
* @return configurations
*/
- T load(JobType jobType);
+ T load(PipelineContextKey contextKey, JobType jobType);
/**
* Persist meta data.
*
+ * @param contextKey context key
* @param jobType job type, nullable
* @param configs configurations
*/
- void persist(JobType jobType, T configs);
+ void persist(PipelineContextKey contextKey, JobType jobType, T configs);
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 9c9ddf03e8f..8199e850b03 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -41,6 +41,8 @@ import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsis
import org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProcessConfigurationUtils;
import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper;
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobOffsetInfo;
@@ -87,14 +89,14 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
public abstract InventoryIncrementalProcessContext buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
@Override
- public void alterProcessConfiguration(final PipelineProcessConfiguration processConfig) {
+ public void alterProcessConfiguration(final PipelineContextKey contextKey, final PipelineProcessConfiguration processConfig) {
// TODO check rateLimiter type match or not
- processConfigPersistService.persist(getJobType(), processConfig);
+ processConfigPersistService.persist(contextKey, getJobType(), processConfig);
}
@Override
- public PipelineProcessConfiguration showProcessConfiguration() {
- PipelineProcessConfiguration result = processConfigPersistService.load(getJobType());
+ public PipelineProcessConfiguration showProcessConfiguration(final PipelineContextKey contextKey) {
+ PipelineProcessConfiguration result = processConfigPersistService.load(contextKey, getJobType());
result = PipelineProcessConfigurationUtils.convertWithDefaultValue(result);
return result;
}
@@ -151,7 +153,8 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
jobItemProgress.setProcessedRecordsCount(context.getProcessedRecordsCount());
jobItemProgress.setInventoryRecordsCount(context.getInventoryRecordsCount());
String value = YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration(jobItemProgress));
- PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(context.getJobId(), context.getShardingItem(), value);
+ String jobId = context.getJobId();
+ PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress(jobId, context.getShardingItem(), value);
}
private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final Collection<IncrementalTask> incrementalTasks) {
@@ -170,12 +173,12 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
@Override
public void persistJobOffsetInfo(final String jobId, final JobOffsetInfo jobOffsetInfo) {
String value = YamlEngine.marshal(jobOffsetInfoSwapper.swapToYamlConfiguration(jobOffsetInfo));
- PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobOffsetInfo(jobId, value);
+ PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobOffsetInfo(jobId, value);
}
@Override
public JobOffsetInfo getJobOffsetInfo(final String jobId) {
- Optional<String> offsetInfo = PipelineAPIFactory.getGovernanceRepositoryAPI().getJobOffsetInfo(jobId);
+ Optional<String> offsetInfo = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetInfo(jobId);
if (offsetInfo.isPresent()) {
YamlJobOffsetInfo info = YamlEngine.unmarshal(offsetInfo.get(), YamlJobOffsetInfo.class);
return jobOffsetInfoSwapper.swapToObject(info);
@@ -185,7 +188,7 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
@Override
public Optional<InventoryIncrementalJobItemProgress> getJobItemProgress(final String jobId, final int shardingItem) {
- Optional<String> progress = PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemProgress(jobId, shardingItem);
+ Optional<String> progress = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProgress(jobId, shardingItem);
return progress.map(s -> jobItemProgressSwapper.swapToObject(YamlEngine.unmarshal(s, YamlInventoryIncrementalJobItemProgress.class)));
}
@@ -196,12 +199,12 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
return;
}
jobItemProgress.get().setStatus(status);
- PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, shardingItem, YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration(jobItemProgress.get())));
+ PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress(jobId, shardingItem,
+ YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration(jobItemProgress.get())));
}
@Override
public Collection<DataConsistencyCheckAlgorithmInfo> listDataConsistencyCheckAlgorithms() {
- checkModeConfig();
Collection<DataConsistencyCheckAlgorithmInfo> result = new LinkedList<>();
for (DataConsistencyCalculateAlgorithm each : ShardingSphereServiceLoader.getServiceInstances(DataConsistencyCalculateAlgorithm.class)) {
SPIDescription description = each.getClass().getAnnotation(SPIDescription.class);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index a158ba9ea1a..e60dced8736 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -27,19 +27,16 @@ import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobMetaData;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
-import org.apache.shardingsphere.data.pipeline.core.exception.job.ModeConfigNotFoundException;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyStartedException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
-import org.apache.shardingsphere.data.pipeline.core.exception.job.UnsupportedModeTypeException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.listener.PipelineElasticJobListener;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
-import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
@@ -68,19 +65,12 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
protected abstract String marshalJobIdLeftPart(PipelineJobId pipelineJobId);
@Override
- public List<? extends PipelineJobInfo> list() {
- checkModeConfig();
- return getJobBriefInfos().map(each -> getJobInfo(each.getJobName())).collect(Collectors.toList());
+ public List<? extends PipelineJobInfo> list(final PipelineContextKey contextKey) {
+ return getJobBriefInfos(contextKey).map(each -> getJobInfo(each.getJobName())).collect(Collectors.toList());
}
- protected void checkModeConfig() {
- ModeConfiguration modeConfig = PipelineContext.getModeConfig();
- ShardingSpherePreconditions.checkNotNull(modeConfig, ModeConfigNotFoundException::new);
- ShardingSpherePreconditions.checkState("Cluster".equals(modeConfig.getType()), () -> new UnsupportedModeTypeException(modeConfig.getType()));
- }
-
- private Stream<JobBriefInfo> getJobBriefInfos() {
- return PipelineAPIFactory.getJobStatisticsAPI().getAllJobsBriefInfo().stream().filter(each -> !each.getJobName().startsWith("_"))
+ private Stream<JobBriefInfo> getJobBriefInfos(final PipelineContextKey contextKey) {
+ return PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream().filter(each -> !each.getJobName().startsWith("_"))
.filter(each -> PipelineJobIdUtils.parseJobType(each.getJobName()).getTypeCode().equals(getJobType().getTypeCode()));
}
@@ -96,7 +86,7 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
public Optional<String> start(final PipelineJobConfiguration jobConfig) {
String jobId = jobConfig.getJobId();
ShardingSpherePreconditions.checkState(0 != jobConfig.getJobShardingCount(), () -> new PipelineJobCreationWithInvalidShardingCountException(jobId));
- GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
+ GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId));
String jobConfigKey = PipelineMetaDataNode.getJobConfigPath(jobId);
if (repositoryAPI.isExisted(jobConfigKey)) {
log.warn("jobId already exists in registry center, ignore, jobConfigKey={}", jobConfigKey);
@@ -127,7 +117,7 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
@Override
public void startDisabledJob(final String jobId) {
- PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance();
+ PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId));
pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierDisablePath(jobId));
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), () -> new PipelineJobHasAlreadyStartedException(jobId));
@@ -137,13 +127,13 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
jobConfigPOJO.getProps().remove("stop_time_millis");
String barrierEnablePath = PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
pipelineDistributedBarrier.register(barrierEnablePath, jobConfigPOJO.getShardingTotalCount());
- PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
+ PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobConfiguration(jobConfigPOJO);
pipelineDistributedBarrier.await(barrierEnablePath, 5, TimeUnit.SECONDS);
}
@Override
public void stop(final String jobId) {
- PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance();
+ PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId));
pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierEnablePath(jobId));
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
if (jobConfigPOJO.isDisabled()) {
@@ -154,17 +144,18 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
jobConfigPOJO.getProps().setProperty("stop_time_millis", System.currentTimeMillis() + "");
String barrierPath = PipelineMetaDataNode.getJobBarrierDisablePath(jobId);
pipelineDistributedBarrier.register(barrierPath, jobConfigPOJO.getShardingTotalCount());
- PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
+ PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobConfiguration(jobConfigPOJO);
pipelineDistributedBarrier.await(barrierPath, 5, TimeUnit.SECONDS);
}
protected void dropJob(final String jobId) {
- PipelineAPIFactory.getJobOperateAPI().remove(String.valueOf(jobId), null);
- PipelineAPIFactory.getGovernanceRepositoryAPI().deleteJob(jobId);
+ PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(jobId);
+ PipelineAPIFactory.getJobOperateAPI(contextKey).remove(String.valueOf(jobId), null);
+ PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).deleteJob(jobId);
}
protected final JobConfigurationPOJO getElasticJobConfigPOJO(final String jobId) throws PipelineJobNotFoundException {
- JobConfigurationPOJO result = PipelineAPIFactory.getJobConfigurationAPI().getJobConfiguration(jobId);
+ JobConfigurationPOJO result = PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobConfiguration(jobId);
ShardingSpherePreconditions.checkNotNull(result, () -> new PipelineJobNotFoundException(jobId));
return result;
}
@@ -176,7 +167,7 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
@Override
public String getJobItemErrorMessage(final String jobId, final int shardingItem) {
- return Optional.ofNullable(PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemErrorMessage(jobId, shardingItem)).orElse("");
+ return Optional.ofNullable(PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemErrorMessage(jobId, shardingItem)).orElse("");
}
@Override
@@ -186,11 +177,11 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
if (null != error) {
value = error instanceof Throwable ? ExceptionUtils.getStackTrace((Throwable) error) : error.toString();
}
- PipelineAPIFactory.getGovernanceRepositoryAPI().persist(key, value);
+ PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persist(key, value);
}
@Override
public void cleanJobItemErrorMessage(final String jobId, final int shardingItem) {
- PipelineAPIFactory.getGovernanceRepositoryAPI().cleanJobItemErrorMessage(jobId, shardingItem);
+ PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).cleanJobItemErrorMessage(jobId, shardingItem);
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineDataSourcePersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineDataSourcePersistService.java
index d7132d74318..67d7d62e838 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineDataSourcePersistService.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineDataSourcePersistService.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.api.impl;
import com.google.common.base.Strings;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineMetaDataPersistService;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
@@ -39,8 +40,8 @@ public final class PipelineDataSourcePersistService implements PipelineMetaDataP
@Override
@SuppressWarnings("unchecked")
- public Map<String, DataSourceProperties> load(final JobType jobType) {
- String dataSourcesProps = PipelineAPIFactory.getGovernanceRepositoryAPI().getMetaDataDataSources(jobType);
+ public Map<String, DataSourceProperties> load(final PipelineContextKey contextKey, final JobType jobType) {
+ String dataSourcesProps = PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getMetaDataDataSources(jobType);
if (Strings.isNullOrEmpty(dataSourcesProps)) {
return Collections.emptyMap();
}
@@ -51,11 +52,11 @@ public final class PipelineDataSourcePersistService implements PipelineMetaDataP
}
@Override
- public void persist(final JobType jobType, final Map<String, DataSourceProperties> dataSourcePropsMap) {
+ public void persist(final PipelineContextKey contextKey, final JobType jobType, final Map<String, DataSourceProperties> dataSourcePropsMap) {
Map<String, Map<String, Object>> dataSourceMap = new LinkedHashMap<>(dataSourcePropsMap.size());
for (Entry<String, DataSourceProperties> entry : dataSourcePropsMap.entrySet()) {
dataSourceMap.put(entry.getKey(), swapper.swapToMap(entry.getValue()));
}
- PipelineAPIFactory.getGovernanceRepositoryAPI().persistMetaDataDataSources(jobType, YamlEngine.marshal(dataSourceMap));
+ PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).persistMetaDataDataSources(jobType, YamlEngine.marshal(dataSourceMap));
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistService.java
index b70655a18c4..11170f6b210 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistService.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistService.java
@@ -23,6 +23,7 @@ import org.apache.shardingsphere.data.pipeline.api.config.process.yaml.YamlPipel
import org.apache.shardingsphere.data.pipeline.api.config.process.yaml.swapper.YamlPipelineProcessConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineMetaDataPersistService;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
@@ -34,8 +35,8 @@ public final class PipelineProcessConfigurationPersistService implements Pipelin
private final YamlPipelineProcessConfigurationSwapper swapper = new YamlPipelineProcessConfigurationSwapper();
@Override
- public PipelineProcessConfiguration load(final JobType jobType) {
- String yamlText = PipelineAPIFactory.getGovernanceRepositoryAPI().getMetaDataProcessConfiguration(jobType);
+ public PipelineProcessConfiguration load(final PipelineContextKey contextKey, final JobType jobType) {
+ String yamlText = PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getMetaDataProcessConfiguration(jobType);
if (Strings.isNullOrEmpty(yamlText)) {
return null;
}
@@ -44,8 +45,8 @@ public final class PipelineProcessConfigurationPersistService implements Pipelin
}
@Override
- public void persist(final JobType jobType, final PipelineProcessConfiguration processConfig) {
+ public void persist(final PipelineContextKey contextKey, final JobType jobType, final PipelineProcessConfiguration processConfig) {
String yamlText = YamlEngine.marshal(swapper.swapToYamlConfiguration(processConfig));
- PipelineAPIFactory.getGovernanceRepositoryAPI().persistMetaDataProcessConfiguration(jobType, yamlText);
+ PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).persistMetaDataProcessConfiguration(jobType, yamlText);
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContext.java
index 5673a99a0e0..8f76f79b8fc 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContext.java
@@ -17,66 +17,19 @@
package org.apache.shardingsphere.data.pipeline.core.context;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.mode.manager.ContextManager;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
/**
* Pipeline context.
*/
+@RequiredArgsConstructor
+@Getter
public final class PipelineContext {
- private static volatile ModeConfiguration modeConfig;
-
- private static volatile ContextManager contextManager;
-
- private static final ExecutorService EVENT_LISTENER_EXECUTOR = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Pipeline-EventListener-%d").build());
-
- /**
- * Get mode configuration.
- *
- * @return mode configuration
- */
- public static ModeConfiguration getModeConfig() {
- return modeConfig;
- }
-
- /**
- * Initialize mode configuration.
- *
- * @param modeConfig configuration
- */
- public static void initModeConfig(final ModeConfiguration modeConfig) {
- PipelineContext.modeConfig = modeConfig;
- }
-
- /**
- * Get context manager.
- *
- * @return context manager
- */
- public static ContextManager getContextManager() {
- return contextManager;
- }
-
- /**
- * Initialize context manager.
- *
- * @param contextManager context manager
- */
- public static void initContextManager(final ContextManager contextManager) {
- PipelineContext.contextManager = contextManager;
- }
+ private final ModeConfiguration modeConfig;
- /**
- * Get pipeline executor.
- *
- * @return pipeline executor
- */
- public static ExecutorService getEventListenerExecutor() {
- return EVENT_LISTENER_EXECUTOR;
- }
+ private final ContextManager contextManager;
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContextKey.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContextKey.java
new file mode 100644
index 00000000000..238eb25013a
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContextKey.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.context;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
+
+import java.util.Objects;
+
+/**
+ * Pipeline context key.
+ */
+@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
+@Getter
+public final class PipelineContextKey {
+
+ private final InstanceType instanceType;
+
+ private final String databaseName;
+
+ /**
+ * Build context key.
+ *
+ * @param instanceType instance type
+ * @param databaseName database name
+ * @return context key
+ */
+ public static PipelineContextKey build(final InstanceType instanceType, final String databaseName) {
+ return new PipelineContextKey(instanceType, databaseName);
+ }
+
+ /**
+ * Build context key for proxy.
+ *
+ * @return context key
+ */
+ public static PipelineContextKey buildForProxy() {
+ return new PipelineContextKey(InstanceType.PROXY, "");
+ }
+
+ /**
+ * Build context key for proxy.
+ *
+ * @param databaseName database name
+ * @return context key
+ */
+ public static PipelineContextKey buildForProxy(final String databaseName) {
+ return new PipelineContextKey(InstanceType.PROXY, databaseName);
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final PipelineContextKey that = (PipelineContextKey) o;
+ return instanceType == that.instanceType && Objects.equals(filterDatabaseName(this), filterDatabaseName(that));
+ }
+
+ private static String filterDatabaseName(final PipelineContextKey contextKey) {
+ return contextKey.getInstanceType() == InstanceType.PROXY ? "" : contextKey.getDatabaseName();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(instanceType, filterDatabaseName(this));
+ }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContextManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContextManager.java
new file mode 100644
index 00000000000..2f5f0d87528
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContextManager.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.context;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Pipeline context manager.
+ */
+public final class PipelineContextManager {
+
+ private static final Map<PipelineContextKey, PipelineContext> CONTEXT_MAP = new ConcurrentHashMap<>();
+
+ /**
+ * Get context.
+ *
+ * @param key key
+ * @return context
+ */
+ public static PipelineContext getContext(final PipelineContextKey key) {
+ return CONTEXT_MAP.get(key);
+ }
+
+ /**
+ * Get context.
+ *
+ * @return context
+ */
+ public static PipelineContext getProxyContext() {
+ return CONTEXT_MAP.get(PipelineContextKey.buildForProxy());
+ }
+
+ /**
+ * Put context.
+ *
+ * @param key key
+ * @param context context
+ */
+ public static void putContext(final PipelineContextKey key, final PipelineContext context) {
+ CONTEXT_MAP.put(key, context);
+ }
+
+ /**
+ * Remove context.
+ *
+ * @param key key
+ */
+ public static void removeContext(final PipelineContextKey key) {
+ CONTEXT_MAP.remove(key);
+ }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java
deleted file mode 100644
index 6a9061539b1..00000000000
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.execute;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNodeWatcher;
-
-/**
- * Pipeline job worker.
- */
-@Slf4j
-public final class PipelineJobWorker {
-
- private static final AtomicBoolean WORKER_INITIALIZED = new AtomicBoolean(false);
-
- /**
- * Initialize job worker.
- */
- public static void initialize() {
- if (WORKER_INITIALIZED.get()) {
- return;
- }
- synchronized (WORKER_INITIALIZED) {
- if (WORKER_INITIALIZED.get()) {
- return;
- }
- log.info("start worker initialization");
- PipelineMetaDataNodeWatcher.getInstance();
- WORKER_INITIALIZED.set(true);
- log.info("worker initialization done");
- }
- }
-}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 5fcf64b5522..966f32c3138 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -100,8 +100,9 @@ public abstract class AbstractPipelineJob implements PipelineJob {
log.warn("shardingItem {} tasks runner exists, ignore", shardingItem);
return false;
}
- PipelineJobProgressPersistService.addJobProgressPersistContext(getJobId(), shardingItem);
- PipelineDistributedBarrier.getInstance().persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierEnablePath(getJobId()), shardingItem);
+ String jobId = tasksRunner.getJobItemContext().getJobId();
+ PipelineJobProgressPersistService.addJobProgressPersistContext(jobId, shardingItem);
+ PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId)).persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierEnablePath(jobId), shardingItem);
return true;
}
@@ -121,12 +122,13 @@ public abstract class AbstractPipelineJob implements PipelineJob {
for (PipelineTasksRunner each : tasksRunnerMap.values()) {
each.stop();
}
- Optional<ElasticJobListener> pipelineJobListener = ElasticJobServiceLoader.getCachedTypedServiceInstance(ElasticJobListener.class, PipelineElasticJobListener.class.getName());
- pipelineJobListener.ifPresent(jobListener -> awaitJobStopped((PipelineElasticJobListener) jobListener, jobId, TimeUnit.SECONDS.toMillis(2)));
- if (null == jobBootstrap) {
- return;
+ if (null != jobId) {
+ Optional<ElasticJobListener> pipelineJobListener = ElasticJobServiceLoader.getCachedTypedServiceInstance(ElasticJobListener.class, PipelineElasticJobListener.class.getName());
+ pipelineJobListener.ifPresent(jobListener -> awaitJobStopped((PipelineElasticJobListener) jobListener, jobId, TimeUnit.SECONDS.toMillis(2)));
+ }
+ if (null != jobBootstrap) {
+ jobBootstrap.shutdown();
}
- jobBootstrap.shutdown();
}
private void awaitJobStopped(final PipelineElasticJobListener jobListener, final String jobId, final long timeoutMillis) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJobId.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJobId.java
index 96a75df0559..926402a2574 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJobId.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJobId.java
@@ -17,9 +17,9 @@
package org.apache.shardingsphere.data.pipeline.core.job;
-import com.google.common.base.Preconditions;
import lombok.Getter;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
/**
@@ -28,18 +28,19 @@ import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
@Getter
public abstract class AbstractPipelineJobId implements PipelineJobId {
+ public static final String CURRENT_VERSION = "02";
+
private final JobType jobType;
- private final String formatVersion;
+ private final PipelineContextKey contextKey;
- public AbstractPipelineJobId(final JobType jobType, final String formatVersion) {
+ public AbstractPipelineJobId(final JobType jobType, final PipelineContextKey contextKey) {
this.jobType = jobType;
- Preconditions.checkArgument(2 == formatVersion.length(), "formatVersion length is not 2");
- this.formatVersion = formatVersion;
+ this.contextKey = contextKey;
}
@Override
- public final String getJobTypeCode() {
- return jobType.getTypeCode();
+ public String getFormatVersion() {
+ return CURRENT_VERSION;
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
index e82549caec3..32c87b92605 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
@@ -18,11 +18,20 @@
package org.apache.shardingsphere.data.pipeline.core.job;
import com.google.common.base.Preconditions;
+import com.google.common.primitives.Shorts;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
+import lombok.SneakyThrows;
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
+import org.apache.shardingsphere.data.pipeline.core.job.util.InstanceTypeUtil;
import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
import org.apache.shardingsphere.data.pipeline.spi.job.JobTypeFactory;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
+
+import java.nio.charset.StandardCharsets;
/**
* Pipeline job id utility class.
@@ -37,7 +46,12 @@ public final class PipelineJobIdUtils {
* @return job id common prefix
*/
public static String marshalJobIdCommonPrefix(final PipelineJobId pipelineJobId) {
- return 'j' + pipelineJobId.getJobTypeCode() + pipelineJobId.getFormatVersion();
+ InstanceType instanceType = pipelineJobId.getContextKey().getInstanceType();
+ String databaseName = instanceType == InstanceType.PROXY ? "" : pipelineJobId.getContextKey().getDatabaseName();
+ String databaseNameHex = Hex.encodeHexString(databaseName.getBytes(StandardCharsets.UTF_8), true);
+ String databaseNameLengthHex = Hex.encodeHexString(Shorts.toByteArray((short) databaseNameHex.length()), true);
+ char encodedInstanceType = InstanceTypeUtil.encode(instanceType);
+ return 'j' + pipelineJobId.getJobType().getTypeCode() + pipelineJobId.getFormatVersion() + encodedInstanceType + databaseNameLengthHex + databaseNameHex;
}
/**
@@ -47,9 +61,30 @@ public final class PipelineJobIdUtils {
* @return job type
*/
public static JobType parseJobType(final String jobId) {
- Preconditions.checkArgument(jobId.length() > 3, "Invalid jobId length, jobId=%s", jobId);
- Preconditions.checkArgument('j' == jobId.charAt(0), "Invalid jobId, first char=%s", jobId.charAt(0));
+ verifyJobId(jobId);
String typeCode = jobId.substring(1, 3);
return JobTypeFactory.getInstance(typeCode);
}
+
+ private static void verifyJobId(final String jobId) {
+ Preconditions.checkArgument(jobId.length() > 10, "Invalid job id length, job id: `%s`", jobId);
+ Preconditions.checkArgument('j' == jobId.charAt(0), "Invalid job id, first char: `%s`", jobId.charAt(0));
+ }
+
+ /**
+ * Parse context key.
+ *
+ * @param jobId job id
+ * @return pipeline context key
+ */
+ @SneakyThrows(DecoderException.class)
+ public static PipelineContextKey parseContextKey(final String jobId) {
+ verifyJobId(jobId);
+ String formatVersion = jobId.substring(3, 5);
+ Preconditions.checkArgument(AbstractPipelineJobId.CURRENT_VERSION.equals(formatVersion), "Format version doesn't match, format version: " + formatVersion);
+ char instanceType = jobId.charAt(5);
+ short databaseNameLength = Shorts.fromByteArray(Hex.decodeHex(jobId.substring(6, 10)));
+ String databaseName = new String(Hex.decodeHex(jobId.substring(10, 10 + databaseNameLength)), StandardCharsets.UTF_8);
+ return PipelineContextKey.build(InstanceTypeUtil.decode(instanceType), databaseName);
+ }
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/util/InstanceTypeUtil.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/util/InstanceTypeUtil.java
new file mode 100644
index 00000000000..d79405b1ce0
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/util/InstanceTypeUtil.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.job.util;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
+
+/**
+ * Instance type util.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class InstanceTypeUtil {
+
+ /**
+ * Encode instance type.
+ *
+ * @param instanceType instance type
+ * @return encoded instance type
+ * @throws UnsupportedOperationException if instance type is unknown
+ */
+ public static char encode(final InstanceType instanceType) {
+ switch (instanceType) {
+ case PROXY:
+ return 'p';
+ case JDBC:
+ return 'j';
+ default:
+ throw new UnsupportedOperationException("Unknown instance type: " + instanceType);
+ }
+ }
+
+ /**
+ * Decode instance type.
+ *
+ * @param instanceType instance type
+ * @return decoded instance type
+ * @throws UnsupportedOperationException if instance type is unknown
+ */
+ public static InstanceType decode(final char instanceType) {
+ switch (instanceType) {
+ case 'p':
+ return InstanceType.PROXY;
+ case 'j':
+ return InstanceType.JDBC;
+ default:
+ throw new UnsupportedOperationException("Unknown instance type: " + instanceType);
+ }
+ }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
index 864c16f0aa8..b0191bf3101 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
@@ -19,10 +19,14 @@ package org.apache.shardingsphere.data.pipeline.core.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
-import org.apache.shardingsphere.data.pipeline.core.execute.PipelineJobWorker;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNodeWatcher;
import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.infra.database.DefaultDatabase;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener;
@@ -33,7 +37,7 @@ import org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleLi
public final class PipelineContextManagerLifecycleListener implements ContextManagerLifecycleListener {
@Override
- public void onInitialized(final ModeConfiguration modeConfig, final ContextManager contextManager) {
+ public void onInitialized(final InstanceType instanceType, final String databaseName, final ModeConfiguration modeConfig, final ContextManager contextManager) {
if (null == modeConfig) {
return;
}
@@ -41,9 +45,18 @@ public final class PipelineContextManagerLifecycleListener implements ContextMan
log.info("mode type is not Cluster, mode type='{}', ignore", modeConfig.getType());
return;
}
- PipelineContext.initModeConfig(modeConfig);
- PipelineContext.initContextManager(contextManager);
- PipelineJobWorker.initialize();
+ // TODO When StandalonePersistRepository is equivalent with ClusterPersistRepository, use STANDALONE mode in pipeline IT and remove this check.
+ if (DefaultDatabase.LOGIC_NAME.equals(databaseName)) {
+ return;
+ }
+ PipelineContextKey contextKey = PipelineContextKey.build(instanceType, databaseName);
+ PipelineContextManager.putContext(contextKey, new PipelineContext(modeConfig, contextManager));
+ PipelineMetaDataNodeWatcher.getInstance(contextKey);
ElasticJobServiceLoader.registerTypedService(ElasticJobListener.class);
}
+
+ @Override
+ public void onDestroyed(final InstanceType instanceType, final String databaseName) {
+ PipelineContextManager.removeContext(PipelineContextKey.build(instanceType, databaseName));
+ }
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/ShardingSphereDataContextManagerLifecycleListener.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/ShardingSphereDataContextManagerLifecycleListener.java
index f9f1ff1f5ce..eb310cff02e 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/ShardingSphereDataContextManagerLifecycleListener.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/ShardingSphereDataContextManagerLifecycleListener.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.listener;
import org.apache.shardingsphere.data.pipeline.core.execute.ShardingSphereDataJobWorker;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener;
@@ -28,13 +29,20 @@ import org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleLi
public final class ShardingSphereDataContextManagerLifecycleListener implements ContextManagerLifecycleListener {
@Override
- public void onInitialized(final ModeConfiguration modeConfig, final ContextManager contextManager) {
+ public void onInitialized(final InstanceType instanceType, final String databaseName, final ModeConfiguration modeConfig, final ContextManager contextManager) {
if (null == modeConfig) {
return;
}
if (!contextManager.getInstanceContext().isCluster()) {
return;
}
+ if (instanceType != InstanceType.PROXY) {
+ return;
+ }
ShardingSphereDataJobWorker.initialize(contextManager);
}
+
+ @Override
+ public void onDestroyed(final InstanceType instanceType, final String databaseName) {
+ }
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeWatcher.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeWatcher.java
index 05f836f406d..e3ec347c8d4 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeWatcher.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeWatcher.java
@@ -17,10 +17,11 @@
package org.apache.shardingsphere.data.pipeline.core.metadata.node;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineMetaDataChangedEventHandler;
import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -29,6 +30,9 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -38,18 +42,20 @@ import java.util.stream.Collectors;
@Slf4j
public final class PipelineMetaDataNodeWatcher {
- private static final PipelineMetaDataNodeWatcher INSTANCE = new PipelineMetaDataNodeWatcher();
+ private static final Map<PipelineContextKey, PipelineMetaDataNodeWatcher> INSTANCE_MAP = new ConcurrentHashMap<>();
+
+ private static final ExecutorService EVENT_LISTENER_EXECUTOR = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Pipeline-EventListener-%d").build());
private final Map<Pattern, PipelineMetaDataChangedEventHandler> listenerMap = new ConcurrentHashMap<>();
- private PipelineMetaDataNodeWatcher() {
+ private PipelineMetaDataNodeWatcher(final PipelineContextKey contextKey) {
listenerMap.putAll(ShardingSphereServiceLoader.getServiceInstances(PipelineMetaDataChangedEventHandler.class)
.stream().collect(Collectors.toMap(PipelineMetaDataChangedEventHandler::getKeyPattern, each -> each, (key, value) -> value)));
- PipelineAPIFactory.getGovernanceRepositoryAPI().watch(DataPipelineConstants.DATA_PIPELINE_ROOT, this::dispatchEvent);
+ PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).watch(DataPipelineConstants.DATA_PIPELINE_ROOT, this::dispatchEvent);
}
private void dispatchEvent(final DataChangedEvent event) {
- CompletableFuture.runAsync(() -> dispatchEvent0(event), PipelineContext.getEventListenerExecutor()).whenComplete((unused, throwable) -> {
+ CompletableFuture.runAsync(() -> dispatchEvent0(event), EVENT_LISTENER_EXECUTOR).whenComplete((unused, throwable) -> {
if (null != throwable) {
log.error("dispatch event failed", throwable);
}
@@ -58,8 +64,10 @@ public final class PipelineMetaDataNodeWatcher {
private void dispatchEvent0(final DataChangedEvent event) {
for (Entry<Pattern, PipelineMetaDataChangedEventHandler> entry : listenerMap.entrySet()) {
- if (entry.getKey().matcher(event.getKey()).matches()) {
- entry.getValue().handle(event);
+ Matcher matcher = entry.getKey().matcher(event.getKey());
+ if (matcher.matches()) {
+ String jobId = matcher.group(1);
+ entry.getValue().handle(jobId, event);
return;
}
}
@@ -68,9 +76,21 @@ public final class PipelineMetaDataNodeWatcher {
/**
* Get instance.
*
+ * @param contextKey context key
* @return instance
*/
- public static PipelineMetaDataNodeWatcher getInstance() {
- return INSTANCE;
+ public static PipelineMetaDataNodeWatcher getInstance(final PipelineContextKey contextKey) {
+ PipelineMetaDataNodeWatcher result = INSTANCE_MAP.get(contextKey);
+ if (null != result) {
+ return result;
+ }
+ synchronized (INSTANCE_MAP) {
+ result = INSTANCE_MAP.get(contextKey);
+ if (null == result) {
+ result = new PipelineMetaDataNodeWatcher(contextKey);
+ INSTANCE_MAP.put(contextKey, result);
+ }
+ }
+ return result;
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractChangedJobConfigurationProcessor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractChangedJobConfigurationProcessor.java
index 3c0a749f191..223a73b2aa8 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractChangedJobConfigurationProcessor.java
@@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.config.processor.ChangedJobConfigurationProcessor;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
@@ -69,8 +70,9 @@ public abstract class AbstractChangedJobConfigurationProcessor implements Change
protected void onDisabled(final JobConfiguration jobConfig, final Collection<Integer> jobItems) {
String jobId = jobConfig.getJobName();
+ PipelineDistributedBarrier distributedBarrier = PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId));
for (Integer each : jobItems) {
- PipelineDistributedBarrier.getInstance().persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId), each);
+ distributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId), each);
}
}
@@ -78,8 +80,9 @@ public abstract class AbstractChangedJobConfigurationProcessor implements Change
protected void executeJob(final JobConfiguration jobConfig) {
AbstractPipelineJob job = buildPipelineJob();
- PipelineJobCenter.addJob(jobConfig.getJobName(), job);
- OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfig);
+ String jobId = jobConfig.getJobName();
+ PipelineJobCenter.addJob(jobId, job);
+ OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)), job, jobConfig);
job.setJobBootstrap(oneOffJobBootstrap);
oneOffJobBootstrap.execute();
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineMetaDataChangedEventHandler.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineMetaDataChangedEventHandler.java
index 4b8bde727c7..7b372e910ae 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineMetaDataChangedEventHandler.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineMetaDataChangedEventHandler.java
@@ -38,7 +38,8 @@ public interface PipelineMetaDataChangedEventHandler {
/**
* Handle meta data changed event.
*
+ * @param jobId job id
* @param event changed event
*/
- void handle(DataChangedEvent event);
+ void handle(String jobId, DataChangedEvent event);
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/BarrierMetaDataChangedEventHandler.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/BarrierMetaDataChangedEventHandler.java
index 8752b3b3170..e2551b72d36 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/BarrierMetaDataChangedEventHandler.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/BarrierMetaDataChangedEventHandler.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.impl;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineMetaDataChangedEventHandler;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
@@ -36,9 +37,9 @@ public final class BarrierMetaDataChangedEventHandler implements PipelineMetaDat
}
@Override
- public void handle(final DataChangedEvent event) {
+ public void handle(final String jobId, final DataChangedEvent event) {
if (event.getType() == Type.ADDED) {
- PipelineDistributedBarrier.getInstance().notifyChildrenNodeCountCheck(event.getKey());
+ PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId)).notifyChildrenNodeCountCheck(event.getKey());
}
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/ConfigMetaDataChangedEventHandler.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/ConfigMetaDataChangedEventHandler.java
index 670cff12971..a97a3bbe673 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/ConfigMetaDataChangedEventHandler.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/ConfigMetaDataChangedEventHandler.java
@@ -42,7 +42,7 @@ public final class ConfigMetaDataChangedEventHandler implements PipelineMetaData
}
@Override
- public void handle(final DataChangedEvent event) {
+ public void handle(final String jobId, final DataChangedEvent event) {
JobConfiguration jobConfig;
try {
jobConfig = YamlEngine.unmarshal(event.getValue(), JobConfigurationPOJO.class, true).toJobConfiguration();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
index 0ce58b42331..d7739c3e3fc 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
@@ -28,7 +28,6 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.Standa
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
import org.apache.shardingsphere.data.pipeline.core.check.datasource.BasicDataSourceChecker;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparer;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetSchemasParameter;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
@@ -43,6 +42,7 @@ import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.parser.SQLParserEngine;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
import org.apache.shardingsphere.parser.rule.SQLParserRule;
@@ -64,6 +64,10 @@ public final class PipelineJobPreparerUtils {
* @return true if supported, otherwise false
*/
public static boolean isIncrementalSupported(final String databaseType) {
+ // TODO H2 doesn't support incremental, but H2DatabaseType.getTrunkDatabaseType() is MySQL. Ignore trunk database type for H2 for now.
+ if ("H2".equalsIgnoreCase(databaseType)) {
+ return TypedSPILoader.findService(IncrementalDumperCreator.class, databaseType).isPresent();
+ }
return PipelineTypedSPILoader.findDatabaseTypedService(IncrementalDumperCreator.class, databaseType).isPresent();
}
@@ -86,11 +90,11 @@ public final class PipelineJobPreparerUtils {
/**
* Get SQL parser engine.
*
+ * @param metaData meta data
* @param targetDatabaseName target database name
* @return SQL parser engine
*/
- public static SQLParserEngine getSQLParserEngine(final String targetDatabaseName) {
- ShardingSphereMetaData metaData = PipelineContext.getContextManager().getMetaDataContexts().getMetaData();
+ public static SQLParserEngine getSQLParserEngine(final ShardingSphereMetaData metaData, final String targetDatabaseName) {
ShardingSphereDatabase database = metaData.getDatabase(targetDatabaseName);
DatabaseType databaseType = database.getProtocolType();
if (databaseType instanceof BranchDatabaseType) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/registry/CoordinatorRegistryCenterInitializer.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/registry/CoordinatorRegistryCenterInitializer.java
index cb44d79c55c..36db76a53f1 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/registry/CoordinatorRegistryCenterInitializer.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/registry/CoordinatorRegistryCenterInitializer.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.core.registry;
-import com.google.common.base.Preconditions;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
@@ -34,16 +33,15 @@ import java.util.Properties;
public final class CoordinatorRegistryCenterInitializer {
/**
- * Create registry center instance.
+ * Create ZooKeeper registry center instance.
*
* @param modeConfig mode configuration
* @param namespaceRelativePath namespace relative path
* @return registry center instance
*/
- public CoordinatorRegistryCenter createRegistryCenter(final ModeConfiguration modeConfig, final String namespaceRelativePath) {
+ public CoordinatorRegistryCenter createZookeeperRegistryCenter(final ModeConfiguration modeConfig, final String namespaceRelativePath) {
ClusterPersistRepositoryConfiguration repositoryConfig = (ClusterPersistRepositoryConfiguration) modeConfig.getRepository();
- String clusterType = modeConfig.getRepository().getType();
- Preconditions.checkArgument("ZooKeeper".equals(clusterType), "Unsupported cluster type `%s`", clusterType);
+ // TODO Add registry center cache. Refer to RegistryCenterFactory.createCoordinatorRegistryCenter
CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(getZookeeperConfig(repositoryConfig, namespaceRelativePath));
result.init();
return result;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
index 90aca859bb9..16998486400 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
@@ -20,13 +20,13 @@ package org.apache.shardingsphere.data.pipeline.core.util;
import com.google.common.base.Strings;
import lombok.AccessLevel;
import lombok.Getter;
-import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import java.util.Map;
@@ -37,34 +37,36 @@ import java.util.concurrent.TimeUnit;
/**
* Pipeline distributed barrier.
*/
+@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
@Slf4j
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class PipelineDistributedBarrier {
- private static final PipelineDistributedBarrier INSTANCE = new PipelineDistributedBarrier();
+ private static final Map<PipelineContextKey, PipelineDistributedBarrier> INSTANCE_MAP = new ConcurrentHashMap<>();
- private static final LazyInitializer<ClusterPersistRepository> REPOSITORY_LAZY_INITIALIZER = new LazyInitializer<ClusterPersistRepository>() {
-
- @Override
- protected ClusterPersistRepository initialize() {
- return (ClusterPersistRepository) PipelineContext.getContextManager().getMetaDataContexts().getPersistService().getRepository();
- }
- };
+ private final PipelineContextKey contextKey;
+
+ private final LazyInitializer<ClusterPersistRepository> repositoryLazyInitializer = new PersistRepositoryLazyInitializer();
private final Map<String, InnerCountDownLatchHolder> countDownLatchHolders = new ConcurrentHashMap<>();
/**
* Get instance.
*
+ * @param contextKey context key
* @return instance
*/
- public static PipelineDistributedBarrier getInstance() {
- return INSTANCE;
+ public static PipelineDistributedBarrier getInstance(final PipelineContextKey contextKey) {
+ PipelineDistributedBarrier result = INSTANCE_MAP.get(contextKey);
+ if (null != result) {
+ return result;
+ }
+ INSTANCE_MAP.computeIfAbsent(contextKey, PipelineDistributedBarrier::new);
+ return INSTANCE_MAP.get(contextKey);
}
@SneakyThrows(ConcurrentException.class)
- private static ClusterPersistRepository getRepository() {
- return REPOSITORY_LAZY_INITIALIZER.get();
+ private ClusterPersistRepository getRepository() {
+ return repositoryLazyInitializer.get();
}
/**
@@ -154,4 +156,13 @@ public final class PipelineDistributedBarrier {
private final CountDownLatch countDownLatch;
}
+
+ @RequiredArgsConstructor
+ private final class PersistRepositoryLazyInitializer extends LazyInitializer<ClusterPersistRepository> {
+
+ @Override
+ protected ClusterPersistRepository initialize() {
+ return (ClusterPersistRepository) PipelineContextManager.getContext(contextKey).getContextManager().getMetaDataContexts().getPersistService().getRepository();
+ }
+ }
}
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContextKeyTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContextKeyTest.java
new file mode 100644
index 00000000000..3424a591cf8
--- /dev/null
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContextKeyTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.context;
+
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+final class PipelineContextKeyTest {
+
+ @Test
+ void assertHashCodeEqualsForProxyMode() {
+ PipelineContextKey contextKey1 = PipelineContextKey.build(InstanceType.PROXY, null);
+ PipelineContextKey contextKey2 = PipelineContextKey.build(InstanceType.PROXY, "sharding_db");
+ assertThat(contextKey1.hashCode(), is(contextKey2.hashCode()));
+ assertEquals(contextKey1, contextKey2);
+ }
+
+ @Test
+ void assertHashCodeEqualsForJdbcMode() {
+ PipelineContextKey contextKey1 = PipelineContextKey.build(InstanceType.JDBC, "logic_db");
+ PipelineContextKey contextKey2 = PipelineContextKey.build(InstanceType.JDBC, "sharding_db");
+ assertTrue(contextKey1.hashCode() != contextKey2.hashCode());
+ assertNotEquals(contextKey1, contextKey2);
+ }
+}
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/pojo/CreateConsistencyCheckJobParameter.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/util/InstanceTypeUtilTest.java
similarity index 61%
copy from kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/pojo/CreateConsistencyCheckJobParameter.java
copy to kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/util/InstanceTypeUtilTest.java
index abcbdf584c5..27e6325604d 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/pojo/CreateConsistencyCheckJobParameter.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/util/InstanceTypeUtilTest.java
@@ -15,23 +15,20 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.pojo;
+package org.apache.shardingsphere.data.pipeline.core.job.util;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
+import org.junit.jupiter.api.Test;
-import java.util.Properties;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
-/**
- * Create consistency check job parameter.
- */
-@RequiredArgsConstructor
-@Getter
-public final class CreateConsistencyCheckJobParameter {
-
- private final String jobId;
-
- private final String algorithmTypeName;
+final class InstanceTypeUtilTest {
- private final Properties algorithmProps;
+ @Test
+ void assertEncodeAndDecode() {
+ for (InstanceType each : InstanceType.values()) {
+ assertThat(InstanceTypeUtil.decode(InstanceTypeUtil.encode(each)), is(each));
+ }
+ }
}
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
index 33587a16c32..7af5edb8400 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.cdc.distsql.handler.query;
import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingListStatement;
import org.apache.shardingsphere.data.pipeline.api.pojo.TableBasedPipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
@@ -37,7 +38,7 @@ public final class ShowStreamingListExecutor implements QueryableRALExecutor<Sho
@Override
public Collection<LocalDataQueryResultRow> getRows(final ShowStreamingListStatement sqlStatement) {
- return jobAPI.list().stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
+ return jobAPI.list(PipelineContextKey.buildForProxy()).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
((TableBasedPipelineJobInfo) each).getDatabaseName(), ((TableBasedPipelineJobInfo) each).getTable(),
each.getJobMetaData().getJobItemCount(), each.getJobMetaData().isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString(),
each.getJobMetaData().getCreateTime(), Optional.ofNullable(each.getJobMetaData().getStopTime()).orElse(""))).collect(Collectors.toList());
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java
index 17818d37b24..7824fbbc67b 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.migration.distsql.handler.query;
import org.apache.shardingsphere.data.pipeline.api.pojo.TableBasedPipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
@@ -36,7 +37,7 @@ public final class ShowMigrationListExecutor implements QueryableRALExecutor<Sho
@Override
public Collection<LocalDataQueryResultRow> getRows(final ShowMigrationListStatement sqlStatement) {
- return jobAPI.list().stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
+ return jobAPI.list(PipelineContextKey.buildForProxy()).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
((TableBasedPipelineJobInfo) each).getTable(), each.getJobMetaData().getJobItemCount(),
each.getJobMetaData().isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString(),
each.getJobMetaData().getCreateTime(), each.getJobMetaData().getStopTime())).collect(Collectors.toList());
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationSourceStorageUnitsExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationSourceStorageUnitsExecutor.java
index 2e89e65a3a3..b8d9c01a7a0 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationSourceStorageUnitsExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationSourceStorageUnitsExecutor.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.migration.distsql.handler.query;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
@@ -37,7 +38,7 @@ public final class ShowMigrationSourceStorageUnitsExecutor implements QueryableR
@Override
public Collection<LocalDataQueryResultRow> getRows(final ShowMigrationSourceStorageUnitsStatement sqlStatement) {
- Iterator<Collection<Object>> data = jobAPI.listMigrationSourceResources().iterator();
+ Iterator<Collection<Object>> data = jobAPI.listMigrationSourceResources(PipelineContextKey.buildForProxy()).iterator();
Collection<LocalDataQueryResultRow> result = new LinkedList<>();
while (data.hasNext()) {
result.add(new LocalDataQueryResultRow((List<Object>) data.next()));
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
index b0f84811ca0..bb367b44820 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.migration.distsql.handler.update;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.exception.job.MissingRequiredTargetDatabaseException;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
@@ -36,7 +37,7 @@ public final class MigrateTableUpdater implements RALUpdater<MigrateTableStateme
public void executeUpdate(final String databaseName, final MigrateTableStatement sqlStatement) {
String targetDatabaseName = null == sqlStatement.getTargetDatabaseName() ? databaseName : sqlStatement.getTargetDatabaseName();
ShardingSpherePreconditions.checkNotNull(targetDatabaseName, MissingRequiredTargetDatabaseException::new);
- jobAPI.createJobAndStart(new MigrateTableStatement(sqlStatement.getSourceTargetEntries(), targetDatabaseName));
+ jobAPI.createJobAndStart(PipelineContextKey.buildForProxy(), new MigrateTableStatement(sqlStatement.getSourceTargetEntries(), targetDatabaseName));
}
@Override
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitUpdater.java
index b18b28afa5a..19207f135da 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitUpdater.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.migration.distsql.handler.update;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
import org.apache.shardingsphere.distsql.handler.validate.DataSourcePropertiesValidateHandler;
@@ -53,7 +54,7 @@ public final class RegisterMigrationSourceStorageUnitUpdater implements RALUpdat
DatabaseType databaseType = DatabaseTypeEngine.getDatabaseType(urlBasedDataSourceSegment.getUrl());
Map<String, DataSourceProperties> sourcePropertiesMap = DataSourceSegmentsConverter.convert(databaseType, dataSources);
validateHandler.validate(sourcePropertiesMap);
- jobAPI.addMigrationSourceResources(sourcePropertiesMap);
+ jobAPI.addMigrationSourceResources(PipelineContextKey.buildForProxy(), sourcePropertiesMap);
}
@Override
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/UnregisterMigrationSourceStorageUnitUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/UnregisterMigrationSourceStorageUnitUpdater.java
index 9ea31556d53..91b4f2d89bc 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/UnregisterMigrationSourceStorageUnitUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/UnregisterMigrationSourceStorageUnitUpdater.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.migration.distsql.handler.update;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
import org.apache.shardingsphere.migration.distsql.statement.UnregisterMigrationSourceStorageUnitStatement;
@@ -30,7 +31,7 @@ public final class UnregisterMigrationSourceStorageUnitUpdater implements RALUpd
@Override
public void executeUpdate(final String databaseName, final UnregisterMigrationSourceStorageUnitStatement sqlStatement) {
- jobAPI.dropMigrationSourceResources(sqlStatement.getNames());
+ jobAPI.dropMigrationSourceResources(PipelineContextKey.buildForProxy(), sqlStatement.getNames());
}
@Override
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
index 085917dd0aa..f8c163cedb2 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
import lombok.Getter;
import lombok.ToString;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJobId;
import org.apache.shardingsphere.data.pipeline.core.job.type.ConsistencyCheckJobType;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.util.ConsistencyCheckSequence;
@@ -30,22 +31,20 @@ import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.util.Co
@ToString(callSuper = true)
public final class ConsistencyCheckJobId extends AbstractPipelineJobId {
- public static final String CURRENT_VERSION = "01";
-
private final String parentJobId;
private final int sequence;
- public ConsistencyCheckJobId(final String parentJobId) {
- this(parentJobId, ConsistencyCheckSequence.MIN_SEQUENCE);
+ public ConsistencyCheckJobId(final PipelineContextKey contextKey, final String parentJobId) {
+ this(contextKey, parentJobId, ConsistencyCheckSequence.MIN_SEQUENCE);
}
- public ConsistencyCheckJobId(final String parentJobId, final String latestCheckJobId) {
- this(parentJobId, ConsistencyCheckSequence.getNextSequence(parseSequence(latestCheckJobId)));
+ public ConsistencyCheckJobId(final PipelineContextKey contextKey, final String parentJobId, final String latestCheckJobId) {
+ this(contextKey, parentJobId, ConsistencyCheckSequence.getNextSequence(parseSequence(latestCheckJobId)));
}
- public ConsistencyCheckJobId(final String parentJobId, final int sequence) {
- super(new ConsistencyCheckJobType(), CURRENT_VERSION);
+ public ConsistencyCheckJobId(final PipelineContextKey contextKey, final String parentJobId, final int sequence) {
+ super(new ConsistencyCheckJobType(), contextKey);
this.parentJobId = parentJobId;
this.sequence = sequence > ConsistencyCheckSequence.MAX_SEQUENCE ? ConsistencyCheckSequence.MIN_SEQUENCE : sequence;
}
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 2429e01439d..cd1c4de219a 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -37,6 +37,7 @@ import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJobAPIImpl;
import org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.exception.job.ConsistencyCheckJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedConsistencyCheckJobExistsException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
@@ -97,8 +98,8 @@ public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
* @throws UncompletedConsistencyCheckJobExistsException uncompleted consistency check job exists exception
*/
public String createJobAndStart(final CreateConsistencyCheckJobParameter param) {
- GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
- String parentJobId = param.getJobId();
+ String parentJobId = param.getParentJobId();
+ GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId));
Optional<String> latestCheckJobId = repositoryAPI.getLatestCheckJobId(parentJobId);
if (latestCheckJobId.isPresent()) {
Optional<ConsistencyCheckJobItemProgress> progress = getJobItemProgress(latestCheckJobId.get(), 0);
@@ -107,7 +108,8 @@ public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
throw new UncompletedConsistencyCheckJobExistsException(latestCheckJobId.get());
}
}
- String result = marshalJobId(latestCheckJobId.map(s -> new ConsistencyCheckJobId(parentJobId, s)).orElseGet(() -> new ConsistencyCheckJobId(parentJobId)));
+ PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(parentJobId);
+ String result = marshalJobId(latestCheckJobId.map(s -> new ConsistencyCheckJobId(contextKey, parentJobId, s)).orElseGet(() -> new ConsistencyCheckJobId(contextKey, parentJobId)));
repositoryAPI.persistLatestCheckJobId(parentJobId, result);
repositoryAPI.deleteCheckJobResult(parentJobId, result);
dropJob(result);
@@ -123,15 +125,16 @@ public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
/**
* Get latest data consistency check result.
*
- * @param jobId job id
+ * @param parentJobId parent job id
* @return latest data consistency check result
*/
- public Map<String, DataConsistencyCheckResult> getLatestDataConsistencyCheckResult(final String jobId) {
- Optional<String> latestCheckJobId = PipelineAPIFactory.getGovernanceRepositoryAPI().getLatestCheckJobId(jobId);
+ public Map<String, DataConsistencyCheckResult> getLatestDataConsistencyCheckResult(final String parentJobId) {
+ GovernanceRepositoryAPI governanceRepositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId));
+ Optional<String> latestCheckJobId = governanceRepositoryAPI.getLatestCheckJobId(parentJobId);
if (!latestCheckJobId.isPresent()) {
return Collections.emptyMap();
}
- return PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckJobResult(jobId, latestCheckJobId.get());
+ return governanceRepositoryAPI.getCheckJobResult(parentJobId, latestCheckJobId.get());
}
@Override
@@ -144,12 +147,13 @@ public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
progressContext.getRecordsCount(), progressContext.getCheckBeginTimeMillis(), progressContext.getCheckEndTimeMillis(), progressContext.getTableCheckPositions());
jobItemProgress.setStatus(context.getStatus());
YamlConsistencyCheckJobItemProgress yamlJobProgress = swapper.swapToYamlConfiguration(jobItemProgress);
- PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(context.getJobId(), context.getShardingItem(), YamlEngine.marshal(yamlJobProgress));
+ String jobId = context.getJobId();
+ PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress(jobId, context.getShardingItem(), YamlEngine.marshal(yamlJobProgress));
}
@Override
public Optional<ConsistencyCheckJobItemProgress> getJobItemProgress(final String jobId, final int shardingItem) {
- Optional<String> progress = PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemProgress(jobId, shardingItem);
+ Optional<String> progress = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProgress(jobId, shardingItem);
return progress.map(s -> swapper.swapToObject(YamlEngine.unmarshal(s, YamlConsistencyCheckJobItemProgress.class, true)));
}
@@ -161,7 +165,8 @@ public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
return;
}
jobItemProgress.get().setStatus(status);
- PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, shardingItem, YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress.get())));
+ PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress(jobId, shardingItem,
+ YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress.get())));
}
@Override
@@ -184,7 +189,7 @@ public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
}
private String getLatestCheckJobId(final String parentJobId) {
- Optional<String> result = PipelineAPIFactory.getGovernanceRepositoryAPI().getLatestCheckJobId(parentJobId);
+ Optional<String> result = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId)).getLatestCheckJobId(parentJobId);
ShardingSpherePreconditions.checkState(result.isPresent(), () -> new ConsistencyCheckJobNotFoundException(parentJobId));
return result.get();
}
@@ -206,12 +211,13 @@ public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
public void dropByParentJobId(final String parentJobId) {
String latestCheckJobId = getLatestCheckJobId(parentJobId);
stop(latestCheckJobId);
- GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
+ PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(parentJobId);
+ GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey);
Collection<String> checkJobIds = repositoryAPI.listCheckJobIds(parentJobId);
Optional<Integer> previousSequence = ConsistencyCheckSequence.getPreviousSequence(
checkJobIds.stream().map(ConsistencyCheckJobId::parseSequence).collect(Collectors.toList()), ConsistencyCheckJobId.parseSequence(latestCheckJobId));
if (previousSequence.isPresent()) {
- String checkJobId = marshalJobId(new ConsistencyCheckJobId(parentJobId, previousSequence.get()));
+ String checkJobId = marshalJobId(new ConsistencyCheckJobId(contextKey, parentJobId, previousSequence.get()));
repositoryAPI.persistLatestCheckJobId(parentJobId, checkJobId);
} else {
repositoryAPI.deleteLatestCheckJobId(parentJobId);
@@ -227,7 +233,8 @@ public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
* @return consistency job item infos
*/
public List<ConsistencyCheckJobItemInfo> getJobItemInfos(final String parentJobId) {
- Optional<String> latestCheckJobId = PipelineAPIFactory.getGovernanceRepositoryAPI().getLatestCheckJobId(parentJobId);
+ GovernanceRepositoryAPI governanceRepositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId));
+ Optional<String> latestCheckJobId = governanceRepositoryAPI.getLatestCheckJobId(parentJobId);
ShardingSpherePreconditions.checkState(latestCheckJobId.isPresent(), () -> new ConsistencyCheckJobNotFoundException(parentJobId));
String checkJobId = latestCheckJobId.get();
Optional<ConsistencyCheckJobItemProgress> progressOptional = getJobItemProgress(checkJobId, 0);
@@ -237,7 +244,7 @@ public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
List<ConsistencyCheckJobItemInfo> result = new LinkedList<>();
ConsistencyCheckJobItemProgress jobItemProgress = progressOptional.get();
if (!Strings.isNullOrEmpty(jobItemProgress.getIgnoredTableNames())) {
- Map<String, DataConsistencyCheckResult> checkJobResult = PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckJobResult(parentJobId, latestCheckJobId.get());
+ Map<String, DataConsistencyCheckResult> checkJobResult = governanceRepositoryAPI.getCheckJobResult(parentJobId, latestCheckJobId.get());
result.addAll(buildIgnoredTableInfo(jobItemProgress.getIgnoredTableNames().split(","), checkJobResult));
}
if (Objects.equals(jobItemProgress.getIgnoredTableNames(), jobItemProgress.getTableNames())) {
@@ -266,7 +273,8 @@ public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
}
private ConsistencyCheckJobItemInfo getJobItemInfo(final String parentJobId) {
- Optional<String> latestCheckJobId = PipelineAPIFactory.getGovernanceRepositoryAPI().getLatestCheckJobId(parentJobId);
+ GovernanceRepositoryAPI governanceRepositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId));
+ Optional<String> latestCheckJobId = governanceRepositoryAPI.getLatestCheckJobId(parentJobId);
ShardingSpherePreconditions.checkState(latestCheckJobId.isPresent(), () -> new ConsistencyCheckJobNotFoundException(parentJobId));
String checkJobId = latestCheckJobId.get();
Optional<ConsistencyCheckJobItemProgress> progressOptional = getJobItemProgress(checkJobId, 0);
@@ -306,7 +314,7 @@ public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
result.setTableNames(Optional.ofNullable(tableNames).orElse(""));
result.setCheckBeginTime(DATE_TIME_FORMATTER.format(checkBeginTime));
result.setErrorMessage(getJobItemErrorMessage(checkJobId, 0));
- Map<String, DataConsistencyCheckResult> checkJobResult = PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckJobResult(parentJobId, checkJobId);
+ Map<String, DataConsistencyCheckResult> checkJobResult = governanceRepositoryAPI.getCheckJobResult(parentJobId, checkJobId);
InventoryIncrementalJobAPI inventoryIncrementalJobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService(
PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(parentJobId).getTypeName());
result.setCheckSuccess(inventoryIncrementalJobAPI.aggregateDataConsistencyCheckResults(parentJobId, checkJobResult));
@@ -331,8 +339,7 @@ public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
}
@Override
- public void extendYamlJobConfiguration(final YamlPipelineJobConfiguration yamlJobConfig) {
- throw new UnsupportedOperationException();
+ public void extendYamlJobConfiguration(final PipelineContextKey contextKey, final YamlPipelineJobConfiguration yamlJobConfig) {
}
@Override
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/pojo/CreateConsistencyCheckJobParameter.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/pojo/CreateConsistencyCheckJobParameter.java
index abcbdf584c5..f48442b2ada 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/pojo/CreateConsistencyCheckJobParameter.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/pojo/CreateConsistencyCheckJobParameter.java
@@ -29,7 +29,7 @@ import java.util.Properties;
@Getter
public final class CreateConsistencyCheckJobParameter {
- private final String jobId;
+ private final String parentJobId;
private final String algorithmTypeName;
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/yaml/YamlConsistencyCheckJobConfiguration.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/yaml/YamlConsistencyCheckJobConfiguration.java
index 2a64cc5b2e4..e2520560201 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/yaml/YamlConsistencyCheckJobConfiguration.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/yaml/YamlConsistencyCheckJobConfiguration.java
@@ -41,7 +41,7 @@ public final class YamlConsistencyCheckJobConfiguration implements YamlPipelineJ
private Properties algorithmProps;
@Override
- public String getTargetDatabaseName() {
+ public String getDatabaseName() {
throw new UnsupportedOperationException("");
}
}
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 0850dbbbb2d..483912c3a77 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -106,7 +106,7 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult;
try {
dataConsistencyCheckResult = jobAPI.dataConsistencyCheck(parentJobConfig, calculateAlgorithm, jobItemContext.getProgressContext());
- PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckJobResult(parentJobId, checkJobId, dataConsistencyCheckResult);
+ PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId)).persistCheckJobResult(parentJobId, checkJobId, dataConsistencyCheckResult);
} finally {
jobItemContext.getProgressContext().setCheckEndTimeMillis(System.currentTimeMillis());
}
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
index c8c1ab2f5b3..1891cbcfe46 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration;
import lombok.Getter;
import lombok.ToString;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJobId;
import java.util.List;
@@ -30,15 +31,10 @@ import java.util.List;
@ToString(callSuper = true)
public final class MigrationJobId extends AbstractPipelineJobId {
- public static final String CURRENT_VERSION = "01";
-
private final List<String> jobShardingDataNodes;
- private final String targetDatabaseName;
-
- public MigrationJobId(final List<String> jobShardingDataNodes, final String targetDatabaseName) {
- super(new MigrationJobType(), CURRENT_VERSION);
+ public MigrationJobId(final PipelineContextKey contextKey, final List<String> jobShardingDataNodes) {
+ super(new MigrationJobType(), contextKey);
this.jobShardingDataNodes = jobShardingDataNodes;
- this.targetDatabaseName = targetDatabaseName;
}
}
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 6d6c116a978..9985e3aaba8 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -51,12 +51,14 @@ import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractInventoryIn
import org.apache.shardingsphere.data.pipeline.core.api.impl.PipelineDataSourcePersistService;
import org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext;
import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import org.apache.shardingsphere.data.pipeline.core.exception.connection.RegisterMigrationSourceStorageUnitException;
import org.apache.shardingsphere.data.pipeline.core.exception.connection.UnregisterMigrationSourceStorageUnitException;
import org.apache.shardingsphere.data.pipeline.core.exception.metadata.NoAnyRuleExistsException;
import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineSchemaUtils;
import org.apache.shardingsphere.data.pipeline.core.sharding.ShardingColumnsExtractor;
import org.apache.shardingsphere.data.pipeline.core.util.JobDataNodeLineConvertUtils;
@@ -127,19 +129,20 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
/**
* Create job migration config and start.
*
+ * @param contextKey context key
* @param param create migration job parameter
* @return job id
*/
- public String createJobAndStart(final MigrateTableStatement param) {
- MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(buildYamlJobConfiguration(param));
+ public String createJobAndStart(final PipelineContextKey contextKey, final MigrateTableStatement param) {
+ MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(buildYamlJobConfiguration(contextKey, param));
start(jobConfig);
return jobConfig.getJobId();
}
- private YamlMigrationJobConfiguration buildYamlJobConfiguration(final MigrateTableStatement param) {
+ private YamlMigrationJobConfiguration buildYamlJobConfiguration(final PipelineContextKey contextKey, final MigrateTableStatement param) {
YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();
result.setTargetDatabaseName(param.getTargetDatabaseName());
- Map<String, DataSourceProperties> metaDataDataSource = dataSourcePersistService.load(new MigrationJobType());
+ Map<String, DataSourceProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, new MigrationJobType());
Map<String, List<DataNode>> sourceDataNodes = new LinkedHashMap<>();
Map<String, YamlPipelineDataSourceConfiguration> configSources = new LinkedHashMap<>();
List<SourceTargetEntry> sourceTargetEntries = new ArrayList<>(new HashSet<>(param.getSourceTargetEntries())).stream().sorted(Comparator.comparing(SourceTargetEntry::getTargetTableName)
@@ -168,7 +171,8 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
}
}
result.setSources(configSources);
- PipelineDataSourceConfiguration targetPipelineDataSourceConfig = buildTargetPipelineDataSourceConfiguration(param.getTargetDatabaseName());
+ ShardingSphereDatabase targetDatabase = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getTargetDatabaseName());
+ PipelineDataSourceConfiguration targetPipelineDataSourceConfig = buildTargetPipelineDataSourceConfiguration(targetDatabase);
result.setTarget(buildYamlPipelineDataSourceConfiguration(targetPipelineDataSourceConfig.getType(), targetPipelineDataSourceConfig.getParameter()));
result.setTargetDatabaseType(targetPipelineDataSourceConfig.getDatabaseType().getType());
List<JobDataNodeEntry> tablesFirstDataNodes = sourceDataNodes.entrySet().stream()
@@ -177,7 +181,7 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
result.setTargetTableSchemaMap(buildTargetTableSchemaMap(sourceDataNodes));
result.setTablesFirstDataNodes(new JobDataNodeLine(tablesFirstDataNodes).marshal());
result.setJobShardingDataNodes(JobDataNodeLineConvertUtils.convertDataNodesToLines(sourceDataNodes).stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
- extendYamlJobConfiguration(result);
+ extendYamlJobConfiguration(contextKey, result);
return result;
}
@@ -188,14 +192,13 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
return result;
}
- private PipelineDataSourceConfiguration buildTargetPipelineDataSourceConfiguration(final String targetDatabaseName) {
+ private PipelineDataSourceConfiguration buildTargetPipelineDataSourceConfiguration(final ShardingSphereDatabase targetDatabase) {
Map<String, Map<String, Object>> targetDataSourceProps = new HashMap<>();
- ShardingSphereDatabase targetDatabase = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(targetDatabaseName);
for (Entry<String, DataSource> entry : targetDatabase.getResourceMetaData().getDataSources().entrySet()) {
Map<String, Object> dataSourceProps = swapper.swapToMap(DataSourcePropertiesCreator.create(entry.getValue()));
targetDataSourceProps.put(entry.getKey(), dataSourceProps);
}
- YamlRootConfiguration targetRootConfig = buildYamlRootConfiguration(targetDatabaseName, targetDataSourceProps, targetDatabase.getRuleMetaData().getConfigurations());
+ YamlRootConfiguration targetRootConfig = buildYamlRootConfiguration(targetDatabase.getName(), targetDataSourceProps, targetDatabase.getRuleMetaData().getConfigurations());
return new ShardingSpherePipelineDataSourceConfiguration(targetRootConfig);
}
@@ -234,15 +237,15 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
}
@Override
- public void extendYamlJobConfiguration(final YamlPipelineJobConfiguration yamlJobConfig) {
+ public void extendYamlJobConfiguration(final PipelineContextKey contextKey, final YamlPipelineJobConfiguration yamlJobConfig) {
YamlMigrationJobConfiguration config = (YamlMigrationJobConfiguration) yamlJobConfig;
if (null == yamlJobConfig.getJobId()) {
- config.setJobId(generateJobId(config));
+ config.setJobId(generateJobId(contextKey, config));
}
}
- private String generateJobId(final YamlMigrationJobConfiguration config) {
- MigrationJobId jobId = new MigrationJobId(config.getJobShardingDataNodes(), config.getTargetDatabaseName());
+ private String generateJobId(final PipelineContextKey contextKey, final YamlMigrationJobConfiguration config) {
+ MigrationJobId jobId = new MigrationJobId(contextKey, config.getJobShardingDataNodes());
return marshalJobId(jobId);
}
@@ -334,8 +337,7 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
@Override
public MigrationProcessContext buildPipelineProcessContext(final PipelineJobConfiguration pipelineJobConfig) {
- // TODO cache process config on local
- PipelineProcessConfiguration processConfig = showProcessConfiguration();
+ PipelineProcessConfiguration processConfig = showProcessConfiguration(PipelineJobIdUtils.parseContextKey(pipelineJobConfig.getJobId()));
return new MigrationProcessContext(pipelineJobConfig.getJobId(), processConfig);
}
@@ -348,7 +350,7 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
@Override
public void startDisabledJob(final String jobId) {
super.startDisabledJob(jobId);
- PipelineAPIFactory.getGovernanceRepositoryAPI().getLatestCheckJobId(jobId).ifPresent(optional -> {
+ PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getLatestCheckJobId(jobId).ifPresent(optional -> {
try {
TypedSPILoader.getService(PipelineJobAPI.class, "CONSISTENCY_CHECK").startDisabledJob(optional);
// CHECKSTYLE:OFF
@@ -361,7 +363,7 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
@Override
public void stop(final String jobId) {
- PipelineAPIFactory.getGovernanceRepositoryAPI().getLatestCheckJobId(jobId).ifPresent(optional -> {
+ PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getLatestCheckJobId(jobId).ifPresent(optional -> {
try {
TypedSPILoader.getService(PipelineJobAPI.class, "CONSISTENCY_CHECK").stop(optional);
// CHECKSTYLE:OFF
@@ -384,7 +386,7 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
}
private void dropCheckJobs(final String jobId) {
- Collection<String> checkJobIds = PipelineAPIFactory.getGovernanceRepositoryAPI().listCheckJobIds(jobId);
+ Collection<String> checkJobIds = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).listCheckJobIds(jobId);
if (checkJobIds.isEmpty()) {
return;
}
@@ -419,7 +421,6 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
@Override
public void commit(final String jobId) {
- checkModeConfig();
log.info("Commit job {}", jobId);
final long startTimeMillis = System.currentTimeMillis();
dropCheckJobs(jobId);
@@ -431,10 +432,11 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
/**
* Add migration source resources.
*
+ * @param contextKey context key
* @param dataSourcePropsMap data source properties map
*/
- public void addMigrationSourceResources(final Map<String, DataSourceProperties> dataSourcePropsMap) {
- Map<String, DataSourceProperties> existDataSources = dataSourcePersistService.load(getJobType());
+ public void addMigrationSourceResources(final PipelineContextKey contextKey, final Map<String, DataSourceProperties> dataSourcePropsMap) {
+ Map<String, DataSourceProperties> existDataSources = dataSourcePersistService.load(contextKey, getJobType());
Collection<String> duplicateDataSourceNames = new HashSet<>(dataSourcePropsMap.size(), 1);
for (Entry<String, DataSourceProperties> entry : dataSourcePropsMap.entrySet()) {
if (existDataSources.containsKey(entry.getKey())) {
@@ -444,31 +446,33 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
ShardingSpherePreconditions.checkState(duplicateDataSourceNames.isEmpty(), () -> new RegisterMigrationSourceStorageUnitException(duplicateDataSourceNames));
Map<String, DataSourceProperties> result = new LinkedHashMap<>(existDataSources);
result.putAll(dataSourcePropsMap);
- dataSourcePersistService.persist(getJobType(), result);
+ dataSourcePersistService.persist(contextKey, getJobType(), result);
}
/**
* Drop migration source resources.
*
+ * @param contextKey context key
* @param resourceNames resource names
*/
- public void dropMigrationSourceResources(final Collection<String> resourceNames) {
- Map<String, DataSourceProperties> metaDataDataSource = dataSourcePersistService.load(getJobType());
+ public void dropMigrationSourceResources(final PipelineContextKey contextKey, final Collection<String> resourceNames) {
+ Map<String, DataSourceProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, getJobType());
List<String> noExistResources = resourceNames.stream().filter(each -> !metaDataDataSource.containsKey(each)).collect(Collectors.toList());
ShardingSpherePreconditions.checkState(noExistResources.isEmpty(), () -> new UnregisterMigrationSourceStorageUnitException(noExistResources));
for (String each : resourceNames) {
metaDataDataSource.remove(each);
}
- dataSourcePersistService.persist(getJobType(), metaDataDataSource);
+ dataSourcePersistService.persist(contextKey, getJobType(), metaDataDataSource);
}
/**
* Query migration source resources list.
*
+ * @param contextKey context key
* @return migration source resources
*/
- public Collection<Collection<Object>> listMigrationSourceResources() {
- Map<String, DataSourceProperties> dataSourcePropertiesMap = dataSourcePersistService.load(getJobType());
+ public Collection<Collection<Object>> listMigrationSourceResources(final PipelineContextKey contextKey) {
+ Map<String, DataSourceProperties> dataSourcePropertiesMap = dataSourcePersistService.load(contextKey, getJobType());
Collection<Collection<Object>> result = new ArrayList<>(dataSourcePropertiesMap.size());
for (Entry<String, DataSourceProperties> entry : dataSourcePropertiesMap.entrySet()) {
String dataSourceName = entry.getKey();
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
index 3d27d01d41a..0cfc9de76b6 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
@@ -125,6 +125,7 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
return sourceMetaDataLoaderLazyInitializer.get();
}
+ // TODO Use SPI, configurable
@Override
public ImporterConnector getImporterConnector() {
return new DataSourceImporterConnector(dataSourceManager);
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index b4b3b0417bb..e1f7ac6fa02 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -28,10 +28,11 @@ import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncreme
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobOffsetInfo;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.prepare.InventoryTaskSplitter;
import org.apache.shardingsphere.data.pipeline.core.prepare.PipelineJobPreparerUtils;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetSchemasParameter;
@@ -47,6 +48,7 @@ import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.lock.GlobalLockNames;
import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.lock.LockDefinition;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.parser.SQLParserEngine;
import org.apache.shardingsphere.mode.lock.GlobalLockDefinition;
@@ -94,28 +96,29 @@ public final class MigrationJobPreparer {
@SuppressWarnings({"unchecked", "rawtypes"})
private void prepareAndCheckTargetWithLock(final MigrationJobItemContext jobItemContext) throws SQLException {
MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
- LockContext lockContext = PipelineContext.getContextManager().getInstanceContext().getLockContext();
- if (!jobAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem()).isPresent()) {
+ String jobId = jobConfig.getJobId();
+ LockContext lockContext = PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobId)).getContextManager().getInstanceContext().getLockContext();
+ if (!jobAPI.getJobItemProgress(jobId, jobItemContext.getShardingItem()).isPresent()) {
jobAPI.persistJobItemProgress(jobItemContext);
}
LockDefinition lockDefinition = new GlobalLockDefinition(String.format(GlobalLockNames.PREPARE.getLockName(), jobConfig.getJobId()));
long startTimeMillis = System.currentTimeMillis();
if (lockContext.tryLock(lockDefinition, 600000)) {
- log.info("try lock success, jobId={}, shardingItem={}, cost {} ms", jobConfig.getJobId(), jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
+ log.info("try lock success, jobId={}, shardingItem={}, cost {} ms", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
try {
- JobOffsetInfo offsetInfo = jobAPI.getJobOffsetInfo(jobConfig.getJobId());
+ JobOffsetInfo offsetInfo = jobAPI.getJobOffsetInfo(jobId);
if (!offsetInfo.isTargetSchemaTableCreated()) {
jobItemContext.setStatus(JobStatus.PREPARING);
- jobAPI.updateJobItemStatus(jobConfig.getJobId(), jobItemContext.getShardingItem(), JobStatus.PREPARING);
+ jobAPI.updateJobItemStatus(jobId, jobItemContext.getShardingItem(), JobStatus.PREPARING);
prepareAndCheckTarget(jobItemContext);
- jobAPI.persistJobOffsetInfo(jobConfig.getJobId(), new JobOffsetInfo(true));
+ jobAPI.persistJobOffsetInfo(jobId, new JobOffsetInfo(true));
}
} finally {
- log.info("unlock, jobId={}, shardingItem={}, cost {} ms", jobConfig.getJobId(), jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
+ log.info("unlock, jobId={}, shardingItem={}, cost {} ms", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
lockContext.unlock(lockDefinition);
}
} else {
- log.warn("try lock failed, jobId={}, shardingItem={}", jobConfig.getJobId(), jobItemContext.getShardingItem());
+ log.warn("try lock failed, jobId={}, shardingItem={}", jobId, jobItemContext.getShardingItem());
}
}
@@ -140,7 +143,8 @@ public final class MigrationJobPreparer {
PrepareTargetSchemasParameter prepareTargetSchemasParam = new PrepareTargetSchemasParameter(
PipelineTypedSPILoader.getDatabaseTypedService(DatabaseType.class, targetDatabaseType), createTableConfig, dataSourceManager);
PipelineJobPreparerUtils.prepareTargetSchema(targetDatabaseType, prepareTargetSchemasParam);
- SQLParserEngine sqlParserEngine = PipelineJobPreparerUtils.getSQLParserEngine(jobConfig.getTargetDatabaseName());
+ ShardingSphereMetaData metaData = PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())).getContextManager().getMetaDataContexts().getMetaData();
+ SQLParserEngine sqlParserEngine = PipelineJobPreparerUtils.getSQLParserEngine(metaData, jobConfig.getTargetDatabaseName());
PipelineJobPreparerUtils.prepareTargetTables(targetDatabaseType, new PrepareTargetTablesParameter(createTableConfig, dataSourceManager, sqlParserEngine));
}
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/job/YamlMigrationJobConfiguration.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/job/YamlMigrationJobConfiguration.java
index 9b478aa8de0..9df547a3344 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/job/YamlMigrationJobConfiguration.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/job/YamlMigrationJobConfiguration.java
@@ -62,6 +62,11 @@ public final class YamlMigrationJobConfiguration implements YamlPipelineJobConfi
private int retryTimes = 3;
+ @Override
+ public String getDatabaseName() {
+ return targetDatabaseName;
+ }
+
/**
* Set sources.
*
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/job/YamlMigrationJobConfigurationSwapper.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/job/YamlMigrationJobConfigurationSwapper.java
index ad1d54568c3..f0b93ba6bb8 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/job/YamlMigrationJobConfigurationSwapper.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/job/YamlMigrationJobConfigurationSwapper.java
@@ -55,7 +55,7 @@ public final class YamlMigrationJobConfigurationSwapper implements YamlConfigura
@Override
public MigrationJobConfiguration swapToObject(final YamlMigrationJobConfiguration yamlConfig) {
- return new MigrationJobConfiguration(yamlConfig.getJobId(), yamlConfig.getTargetDatabaseName(),
+ return new MigrationJobConfiguration(yamlConfig.getJobId(), yamlConfig.getDatabaseName(),
yamlConfig.getSourceDatabaseType(), yamlConfig.getTargetDatabaseType(),
yamlConfig.getSources().entrySet().stream().collect(Collectors.toMap(Entry::getKey,
entry -> dataSourceConfigSwapper.swapToObject(entry.getValue()), (key, value) -> value, LinkedHashMap::new)),
diff --git a/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java b/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
index aa4e0a70585..7d17ecadd8b 100644
--- a/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
+++ b/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
@@ -17,23 +17,35 @@
package org.apache.shardingsphere.data.pipeline.core.job;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
-import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
class PipelineJobIdUtilsTest {
@Test
- void assertParseJobType() {
- MigrationJobId pipelineJobId = new MigrationJobId(Collections.singletonList("t_order:ds_0.t_order_0,ds_0.t_order_1"), "sharding_db");
- String jobId = PipelineJobIdUtils.marshalJobIdCommonPrefix(pipelineJobId) + "abcd";
- JobType actualJobType = PipelineJobIdUtils.parseJobType(jobId);
- assertThat(actualJobType, instanceOf(MigrationJobType.class));
+ void assertParse() {
+ for (InstanceType each : InstanceType.values()) {
+ assertParse0(each);
+ }
+ }
+
+ private void assertParse0(final InstanceType instanceType) {
+ PipelineContextKey contextKey = PipelineContextKey.build(instanceType, "sharding_db");
+ MigrationJobId pipelineJobId = new MigrationJobId(contextKey, Collections.singletonList("t_order:ds_0.t_order_0,ds_0.t_order_1"));
+ String jobId = new MigrationJobAPI().marshalJobId(pipelineJobId);
+ assertThat(PipelineJobIdUtils.parseJobType(jobId), instanceOf(MigrationJobType.class));
+ PipelineContextKey actualContextKey = PipelineJobIdUtils.parseContextKey(jobId);
+ assertThat(actualContextKey.getInstanceType(), is(instanceType));
+ assertThat(actualContextKey.getDatabaseName(), is(instanceType == InstanceType.PROXY ? "" : "sharding_db"));
}
}
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/listener/ContextManagerLifecycleListener.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/listener/ContextManagerLifecycleListener.java
index cbf879fc067..2eb2830f0ad 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/listener/ContextManagerLifecycleListener.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/listener/ContextManagerLifecycleListener.java
@@ -18,14 +18,12 @@
package org.apache.shardingsphere.mode.manager.listener;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
-import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
+import org.apache.shardingsphere.mode.manager.ContextManager;
/**
* Context manager lifecycle listener.
- * <p>
- * It just support <code>proxy</code> mode for now, <code>JDBC</code> mode is not supported.
- * </p>
*/
@SingletonSPI
public interface ContextManagerLifecycleListener {
@@ -33,8 +31,18 @@ public interface ContextManagerLifecycleListener {
/**
* Callback on initialized.
*
+ * @param instanceType instance type
+ * @param databaseName database name
* @param modeConfig mode configuration
* @param contextManager context manager
*/
- void onInitialized(ModeConfiguration modeConfig, ContextManager contextManager);
+ void onInitialized(InstanceType instanceType, String databaseName, ModeConfiguration modeConfig, ContextManager contextManager);
+
+ /**
+ * Callback on destroyed.
+ *
+ * @param instanceType instance type
+ * @param databaseName database name
+ */
+ void onDestroyed(InstanceType instanceType, String databaseName);
}
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
index 66c6cee7163..84747d34ac0 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
@@ -42,11 +42,13 @@ import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.StreamDataR
import org.apache.shardingsphere.data.pipeline.cdc.util.CDCSchemaTableUtils;
import org.apache.shardingsphere.data.pipeline.cdc.util.CDCTableRuleUtils;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
+import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobConfigurationAPI;
import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
import org.apache.shardingsphere.infra.datanode.DataNode;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
@@ -83,7 +85,7 @@ public final class CDCBackendHandler {
* @return CDC response
*/
public CDCResponse streamData(final String requestId, final StreamDataRequestBody requestBody, final CDCConnectionContext connectionContext, final Channel channel) {
- ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(requestBody.getDatabase());
+ ShardingSphereDatabase database = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(requestBody.getDatabase());
ShardingSpherePreconditions.checkNotNull(database, () -> new CDCExceptionWrapper(requestId, new CDCServerException(String.format("%s database is not exists", requestBody.getDatabase()))));
Map<String, Set<String>> schemaTableNameMap;
Collection<String> tableNames;
@@ -136,17 +138,18 @@ public final class CDCBackendHandler {
if (PipelineJobCenter.isJobExisting(jobId)) {
PipelineJobCenter.stop(jobId);
}
- JobConfigurationPOJO jobConfigPOJO = PipelineAPIFactory.getJobConfigurationAPI().getJobConfiguration(jobId);
+ JobConfigurationAPI jobConfigAPI = PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId));
+ JobConfigurationPOJO jobConfigPOJO = jobConfigAPI.getJobConfiguration(jobId);
// TODO, ensure that there is only one consumer at a time, job config disable may not be updated when the program is forced to close
jobConfigPOJO.setDisabled(false);
- PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
- ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(cdcJobConfig.getDatabaseName());
+ jobConfigAPI.updateJobConfiguration(jobConfigPOJO);
+ ShardingSphereDatabase database = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(cdcJobConfig.getDatabaseName());
Comparator<DataRecord> dataRecordComparator = cdcJobConfig.isDecodeWithTX()
? DataRecordComparatorGenerator.generatorIncrementalComparator(database.getProtocolType())
: null;
CDCJob job = new CDCJob(new SocketSinkImporterConnector(channel, database, cdcJobConfig.getJobShardingCount(), cdcJobConfig.getSchemaTableNames(), dataRecordComparator));
PipelineJobCenter.addJob(jobId, job);
- OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfigPOJO.toJobConfiguration());
+ OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)), job, jobConfigPOJO.toJobConfiguration());
job.setJobBootstrap(oneOffJobBootstrap);
oneOffJobBootstrap.execute();
connectionContext.setJobId(jobId);
@@ -163,9 +166,10 @@ public final class CDCBackendHandler {
return;
}
PipelineJobCenter.stop(jobId);
- JobConfigurationPOJO jobConfig = PipelineAPIFactory.getJobConfigurationAPI().getJobConfiguration(jobId);
+ JobConfigurationAPI jobConfigAPI = PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId));
+ JobConfigurationPOJO jobConfig = jobConfigAPI.getJobConfiguration(jobId);
jobConfig.setDisabled(true);
- PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfig);
+ jobConfigAPI.updateJobConfiguration(jobConfig);
}
/**
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
index 2e26da69ef1..2907ed469a9 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
@@ -21,6 +21,7 @@ import com.google.gson.Gson;
import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
import org.apache.shardingsphere.distsql.parser.statement.ral.queryable.ShowMigrationRuleStatement;
import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
@@ -39,7 +40,8 @@ public final class ShowMigrationRuleExecutor implements QueryableRALExecutor<Sho
@Override
public Collection<LocalDataQueryResultRow> getRows(final ShowMigrationRuleStatement sqlStatement) {
- PipelineProcessConfiguration processConfig = ((InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION")).showProcessConfiguration();
+ PipelineProcessConfiguration processConfig = ((InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION"))
+ .showProcessConfiguration(PipelineContextKey.buildForProxy());
Collection<LocalDataQueryResultRow> result = new LinkedList<>();
result.add(new LocalDataQueryResultRow(getString(processConfig.getRead()), getString(processConfig.getWrite()), getString(processConfig.getStreamChannel())));
return result;
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java
index 96b4f53b05b..fe5542b7226 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable;
import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
import org.apache.shardingsphere.distsql.parser.statement.ral.updatable.AlterInventoryIncrementalRuleStatement;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
@@ -34,7 +35,7 @@ public final class AlterInventoryIncrementalRuleUpdater implements RALUpdater<Al
public void executeUpdate(final String databaseName, final AlterInventoryIncrementalRuleStatement sqlStatement) {
InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, sqlStatement.getJobTypeName());
PipelineProcessConfiguration processConfig = InventoryIncrementalProcessConfigurationSegmentConverter.convert(sqlStatement.getProcessConfigSegment());
- jobAPI.alterProcessConfiguration(processConfig);
+ jobAPI.alterProcessConfiguration(PipelineContextKey.buildForProxy(), processConfig);
}
@Override
diff --git a/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java b/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java
index 0457026e9c5..8e46b981d07 100644
--- a/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java
+++ b/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java
@@ -23,6 +23,7 @@ import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaDataBuilder;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.yaml.config.swapper.mode.YamlModeConfigurationSwapper;
@@ -78,7 +79,7 @@ public final class BootstrapInitializer {
private void contextManagerInitializedCallback(final ModeConfiguration modeConfig, final ContextManager contextManager) {
for (ContextManagerLifecycleListener each : ShardingSphereServiceLoader.getServiceInstances(ContextManagerLifecycleListener.class)) {
try {
- each.onInitialized(modeConfig, contextManager);
+ each.onInitialized(InstanceType.PROXY, null, modeConfig, contextManager);
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
diff --git a/test/it/pipeline/pom.xml b/test/it/pipeline/pom.xml
index 0f0bdd75400..f5ec5e2dd15 100644
--- a/test/it/pipeline/pom.xml
+++ b/test/it/pipeline/pom.xml
@@ -75,6 +75,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere</groupId>
+ <artifactId>shardingsphere-test-fixture-infra</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-test-fixture-jdbc</artifactId>
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index 7a64a9f8399..d3e3274bcc3 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.test.it.data.pipeline.api.impl;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
@@ -31,7 +30,6 @@ import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstan
import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.importer.connector.DataSourceImporterConnector;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
@@ -68,7 +66,7 @@ class GovernanceRepositoryAPIImplTest {
@BeforeAll
static void beforeClass() {
PipelineContextUtils.mockModeConfigAndContextManager();
- governanceRepositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
+ governanceRepositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey());
}
@Test
@@ -139,7 +137,6 @@ class GovernanceRepositoryAPIImplTest {
MigrationJobItemContext result = PipelineContextUtils.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration());
MigrationTaskConfiguration taskConfig = result.getTaskConfig();
result.getInventoryTasks().add(mockInventoryTask(taskConfig));
- result.getIncrementalTasks().add(mockIncrementalTask(taskConfig));
return result;
}
@@ -159,12 +156,4 @@ class GovernanceRepositoryAPIImplTest {
private ImporterConnector mockImporterConnector() {
return new DataSourceImporterConnector(new DefaultPipelineDataSourceManager());
}
-
- private IncrementalTask mockIncrementalTask(final MigrationTaskConfiguration taskConfig) {
- DumperConfiguration dumperConfig = taskConfig.getDumperConfig();
- dumperConfig.setPosition(new PlaceholderPosition());
- PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
- return new IncrementalTask(3, dumperConfig, taskConfig.getImporterConfig(), PipelineContextUtils.getPipelineChannelCreator(), mockImporterConnector(),
- metaDataLoader, PipelineContextUtils.getExecuteEngine(), new FixtureInventoryIncrementalJobItemContext());
- }
}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistServiceTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistServiceTest.java
index 38542beaf0f..112f87690d1 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistServiceTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistServiceTest.java
@@ -58,8 +58,8 @@ class PipelineProcessConfigurationPersistServiceTest {
PipelineProcessConfiguration processConfig = new YamlPipelineProcessConfigurationSwapper().swapToObject(yamlProcessConfig);
PipelineProcessConfigurationPersistService persistService = new PipelineProcessConfigurationPersistService();
JobType jobType = new MigrationJobType();
- persistService.persist(jobType, processConfig);
- String actualYamlText = YamlEngine.marshal(new YamlPipelineProcessConfigurationSwapper().swapToYamlConfiguration(persistService.load(jobType)));
+ persistService.persist(PipelineContextUtils.getContextKey(), jobType, processConfig);
+ String actualYamlText = YamlEngine.marshal(new YamlPipelineProcessConfigurationSwapper().swapToYamlConfiguration(persistService.load(PipelineContextUtils.getContextKey(), jobType)));
assertThat(actualYamlText, is(expectedYamlText));
}
}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java
index 7e99096e4b0..350f6e2b4a1 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java
@@ -25,17 +25,11 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPositio
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-
/**
* Fixture incremental dumper creator.
*/
public final class FixtureIncrementalDumperCreator implements IncrementalDumperCreator<FinishedPosition> {
- private static final Collection<String> TYPE_ALIASES = Collections.unmodifiableList(Arrays.asList("Fixture", "H2"));
-
@Override
public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<FinishedPosition> position,
final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
@@ -46,9 +40,4 @@ public final class FixtureIncrementalDumperCreator implements IncrementalDumperC
public String getType() {
return "Fixture";
}
-
- @Override
- public Collection<String> getTypeAliases() {
- return TYPE_ALIASES;
- }
}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2CreateTableSQLGenerator.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2CreateTableSQLGenerator.java
new file mode 100644
index 00000000000..e9128c7deba
--- /dev/null
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2CreateTableSQLGenerator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
+
+import org.apache.shardingsphere.data.pipeline.core.exception.syntax.CreateTableSQLGenerateException;
+import org.apache.shardingsphere.data.pipeline.spi.ddlgenerator.CreateTableSQLGenerator;
+import org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
+
+import javax.sql.DataSource;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+* Create table SQL generator for H2.
+ */
+public final class H2CreateTableSQLGenerator implements CreateTableSQLGenerator {
+
+ @Override
+ public Collection<String> generate(final DataSource dataSource, final String schemaName, final String tableName) throws SQLException {
+ if ("t_order".equalsIgnoreCase(tableName)) {
+ return Collections.singletonList(PipelineContextUtils.getCreateOrderTableSchema());
+ }
+ throw new CreateTableSQLGenerateException(tableName);
+ }
+
+ @Override
+ public String getType() {
+ return "H2";
+ }
+}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2DataSourcePreparer.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2DataSourcePreparer.java
new file mode 100644
index 00000000000..40221615179
--- /dev/null
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2DataSourcePreparer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
+
+import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration.CreateTableEntry;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.AbstractDataSourcePreparer;
+import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+/**
+ * Data source preparer for H2.
+ */
+public final class H2DataSourcePreparer extends AbstractDataSourcePreparer {
+
+ @Override
+ public void prepareTargetTables(final PrepareTargetTablesParameter param) throws SQLException {
+ PipelineDataSourceManager dataSourceManager = param.getDataSourceManager();
+ for (CreateTableEntry each : param.getCreateTableConfig().getCreateTableEntries()) {
+ String createTargetTableSQL = getCreateTargetTableSQL(each, dataSourceManager, param.getSqlParserEngine());
+ try (Connection targetConnection = getCachedDataSource(dataSourceManager, each.getTargetDataSourceConfig()).getConnection()) {
+ executeTargetTableSQL(targetConnection, addIfNotExistsForCreateTableSQL(createTargetTableSQL));
+ }
+ }
+ }
+
+ @Override
+ public String getType() {
+ return "H2";
+ }
+}
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/yaml/YamlConsistencyCheckJobConfiguration.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2PositionInitializer.java
similarity index 54%
copy from kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/yaml/YamlConsistencyCheckJobConfiguration.java
copy to test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2PositionInitializer.java
index 2a64cc5b2e4..4f321ba7a92 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/yaml/YamlConsistencyCheckJobConfiguration.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2PositionInitializer.java
@@ -15,33 +15,27 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml;
+package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
-import java.util.Properties;
+import javax.sql.DataSource;
-/**
- * Consistency check job configuration for YAML.
- */
-@Getter
-@Setter
-@ToString
-public final class YamlConsistencyCheckJobConfiguration implements YamlPipelineJobConfiguration {
-
- private String jobId;
+public final class H2PositionInitializer implements PositionInitializer {
- private String parentJobId;
-
- private String algorithmTypeName;
+ @Override
+ public PlaceholderPosition init(final DataSource dataSource, final String slotNameSuffix) {
+ return new PlaceholderPosition();
+ }
- private Properties algorithmProps;
+ @Override
+ public PlaceholderPosition init(final String data) {
+ return new PlaceholderPosition();
+ }
@Override
- public String getTargetDatabaseName() {
- throw new UnsupportedOperationException("");
+ public String getType() {
+ return "H2";
}
}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java
index ab11dde895a..66dc8fbee93 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java
@@ -30,7 +30,6 @@ import org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContext
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -66,7 +65,8 @@ class IncrementalTaskTest {
incrementalTask.stop();
}
- @Test
+ // TODO H2 doesn't support incremental
+ // @Test
void assertStart() throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture.allOf(incrementalTask.start().toArray(new CompletableFuture[0])).get(10, TimeUnit.SECONDS);
assertThat(incrementalTask.getTaskId(), is("ds_0"));
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
index ca387ae5e82..339c9c8b841 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
@@ -20,19 +20,27 @@ package org.apache.shardingsphere.test.it.data.pipeline.core.util;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
+import org.apache.shardingsphere.infra.util.exception.external.sql.type.wrapper.SQLWrapperException;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.test.util.ConfigurationFileUtils;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -57,6 +65,7 @@ public final class JobConfigurationBuilder {
* Create YAML migration job configuration.
*
* @return created job configuration
+ * @throws SQLWrapperException if there's SQLException when creating table
*/
public static YamlMigrationJobConfiguration createYamlMigrationJobConfiguration() {
YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();
@@ -65,22 +74,34 @@ public final class JobConfigurationBuilder {
result.setTargetDatabaseType("H2");
result.setTargetTableNames(Collections.singletonList("t_order"));
Map<String, String> targetTableSchemaMap = new LinkedHashMap<>();
- targetTableSchemaMap.put("t_order", "");
+ targetTableSchemaMap.put("t_order", null);
result.setTargetTableSchemaMap(targetTableSchemaMap);
result.setTablesFirstDataNodes("t_order:ds_0.t_order");
result.setJobShardingDataNodes(Collections.singletonList("t_order:ds_0.t_order"));
- result.setJobId(generateJobId(result));
+ PipelineContextKey contextKey = PipelineContextKey.build(InstanceType.PROXY, RandomStringUtils.randomAlphabetic(32));
+ result.setJobId(generateMigrationJobId(contextKey, result));
Map<String, YamlPipelineDataSourceConfiguration> sources = new LinkedHashMap<>();
- sources.put("ds_0", createYamlPipelineDataSourceConfiguration(new StandardPipelineDataSourceConfiguration(ConfigurationFileUtils.readFile("migration_standard_jdbc_source.yaml"))));
+ String databaseNameSuffix = RandomStringUtils.randomAlphabetic(9);
+ PipelineDataSourceConfiguration sourceDataSourceConfig = new StandardPipelineDataSourceConfiguration(
+ ConfigurationFileUtils.readFile("migration_standard_jdbc_source.yaml").replace("${databaseNameSuffix}", databaseNameSuffix));
+ try (
+ PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
+ Connection connection = dataSource.getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute(PipelineContextUtils.getCreateOrderTableSchema());
+ } catch (final SQLException ex) {
+ throw new SQLWrapperException(ex);
+ }
+ sources.put("ds_0", createYamlPipelineDataSourceConfiguration(sourceDataSourceConfig));
result.setSources(sources);
result.setTarget(createYamlPipelineDataSourceConfiguration(new ShardingSpherePipelineDataSourceConfiguration(
- ConfigurationFileUtils.readFile("migration_sharding_sphere_jdbc_target.yaml"))));
- TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION").extendYamlJobConfiguration(result);
+ ConfigurationFileUtils.readFile("migration_sharding_sphere_jdbc_target.yaml").replace("${databaseNameSuffix}", databaseNameSuffix))));
+ TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION").extendYamlJobConfiguration(contextKey, result);
return result;
}
- private static String generateJobId(final YamlMigrationJobConfiguration yamlJobConfig) {
- MigrationJobId migrationJobId = new MigrationJobId(yamlJobConfig.getJobShardingDataNodes(), RandomStringUtils.randomAlphabetic(32));
+ private static String generateMigrationJobId(final PipelineContextKey contextKey, final YamlMigrationJobConfiguration yamlJobConfig) {
+ MigrationJobId migrationJobId = new MigrationJobId(contextKey, yamlJobConfig.getJobShardingDataNodes());
return new MigrationJobAPI().marshalJobId(migrationJobId);
}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
index 853608bb903..8635f8435e3 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
@@ -26,6 +26,8 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.Shardi
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProcessConfigurationUtils;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
@@ -62,6 +64,8 @@ import java.util.Map;
*/
public final class PipelineContextUtils {
+ private static final PipelineContextKey CONTEXT_KEY = PipelineContextKey.buildForProxy();
+
private static final ExecuteEngine EXECUTE_ENGINE = ExecuteEngine.newCachedThreadInstance(PipelineContextUtils.class.getSimpleName());
private static final PipelineChannelCreator PIPELINE_CHANNEL_CREATOR = TypedSPILoader.getService(PipelineChannelCreator.class, "MEMORY");
@@ -72,19 +76,20 @@ public final class PipelineContextUtils {
@SneakyThrows
public static void mockModeConfigAndContextManager() {
EmbedTestingServer.start();
- if (null != PipelineContext.getContextManager()) {
+ PipelineContextKey contextKey = getContextKey();
+ if (null != PipelineContextManager.getContext(contextKey)) {
return;
}
ShardingSpherePipelineDataSourceConfiguration pipelineDataSourceConfig = new ShardingSpherePipelineDataSourceConfiguration(
ConfigurationFileUtils.readFileAndIgnoreComments("config_sharding_sphere_jdbc_source.yaml"));
YamlRootConfiguration rootConfig = (YamlRootConfiguration) pipelineDataSourceConfig.getDataSourceConfiguration();
ModeConfiguration modeConfig = new YamlModeConfigurationSwapper().swapToObject(rootConfig.getMode());
- PipelineContext.initModeConfig(modeConfig);
ShardingSphereDataSource dataSource = (ShardingSphereDataSource) PipelineDataSourceFactory.newInstance(pipelineDataSourceConfig).getDataSource();
ContextManager contextManager = getContextManager(dataSource);
MetaDataPersistService persistService = new MetaDataPersistService(getClusterPersistRepository((ClusterPersistRepositoryConfiguration) modeConfig.getRepository()));
MetaDataContexts metaDataContexts = renewMetaDataContexts(contextManager.getMetaDataContexts(), persistService);
- PipelineContext.initContextManager(new ContextManager(metaDataContexts, contextManager.getInstanceContext()));
+ PipelineContext pipelineContext = new PipelineContext(modeConfig, new ContextManager(metaDataContexts, contextManager.getInstanceContext()));
+ PipelineContextManager.putContext(contextKey, pipelineContext);
}
@SneakyThrows(ReflectiveOperationException.class)
@@ -100,12 +105,29 @@ public final class PipelineContextUtils {
private static MetaDataContexts renewMetaDataContexts(final MetaDataContexts old, final MetaDataPersistService persistService) {
Map<String, ShardingSphereTable> tables = new HashMap<>(3, 1);
- tables.put("t_order", new ShardingSphereTable("t_order", Arrays.asList(new ShardingSphereColumn("order_id", Types.INTEGER, true, false, false, true, false),
- new ShardingSphereColumn("user_id", Types.VARCHAR, false, false, false, true, false)), Collections.emptyList(), Collections.emptyList()));
+ tables.put("t_order", new ShardingSphereTable("t_order", Arrays.asList(
+ new ShardingSphereColumn("order_id", Types.INTEGER, true, false, false, true, false),
+ new ShardingSphereColumn("user_id", Types.INTEGER, false, false, false, true, false),
+ new ShardingSphereColumn("status", Types.VARCHAR, false, false, false, true, false)), Collections.emptyList(), Collections.emptyList()));
+ tables.put("t_order_item", new ShardingSphereTable("t_order_item", Arrays.asList(
+ new ShardingSphereColumn("item_id", Types.INTEGER, true, false, false, true, false),
+ new ShardingSphereColumn("order_id", Types.INTEGER, false, false, false, true, false),
+ new ShardingSphereColumn("user_id", Types.INTEGER, false, false, false, true, false),
+ new ShardingSphereColumn("status", Types.VARCHAR, false, false, false, true, false)),
+ Collections.emptyList(), Collections.emptyList()));
old.getMetaData().getDatabase("logic_db").getSchema("logic_db").putAll(tables);
return new MetaDataContexts(persistService, old.getMetaData());
}
+ /**
+ * Get create order table schema.
+ *
+ * @return order table schema
+ */
+ public static String getCreateOrderTableSchema() {
+ return "CREATE TABLE IF NOT EXISTS t_order (order_id INT PRIMARY KEY, user_id INT, status VARCHAR(32))";
+ }
+
/**
* Mock order_id column meta data.
*
@@ -115,6 +137,15 @@ public final class PipelineContextUtils {
return new PipelineColumnMetaData(1, "order_id", Types.INTEGER, "int", false, true, true);
}
+ /**
+ * Get context key.
+ *
+ * @return context key
+ */
+ public static PipelineContextKey getContextKey() {
+ return CONTEXT_KEY;
+ }
+
/**
* Get execute engine.
*
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineDistributedBarrierTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineDistributedBarrierTest.java
index 8ab7ec2d163..b4e15b29791 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineDistributedBarrierTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineDistributedBarrierTest.java
@@ -17,7 +17,8 @@
package org.apache.shardingsphere.test.it.data.pipeline.core.util;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.mode.spi.PersistRepository;
@@ -41,10 +42,11 @@ class PipelineDistributedBarrierTest {
@Test
void assertRegisterAndRemove() throws ReflectiveOperationException {
- String jobId = "j0130317c3054317c7363616c696e675f626d73716c";
- PersistRepository repository = PipelineContext.getContextManager().getMetaDataContexts().getPersistService().getRepository();
+ String jobId = JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId();
+ PipelineContextKey contextKey = PipelineContextUtils.getContextKey();
+ PersistRepository repository = PipelineContextManager.getContext(contextKey).getContextManager().getMetaDataContexts().getPersistService().getRepository();
repository.persist(PipelineMetaDataNode.getJobRootPath(jobId), "");
- PipelineDistributedBarrier instance = PipelineDistributedBarrier.getInstance();
+ PipelineDistributedBarrier instance = PipelineDistributedBarrier.getInstance(contextKey);
String parentPath = "/barrier";
instance.register(parentPath, 1);
Map<?, ?> countDownLatchMap = (Map<?, ?>) Plugins.getMemberAccessor().get(PipelineDistributedBarrier.class.getDeclaredField("countDownLatchHolders"), instance);
@@ -56,10 +58,11 @@ class PipelineDistributedBarrierTest {
@Test
void assertAwait() {
- String jobId = "j0130317c3054317c7363616c696e675f626d73716c";
- PersistRepository repository = PipelineContext.getContextManager().getMetaDataContexts().getPersistService().getRepository();
+ String jobId = JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId();
+ PipelineContextKey contextKey = PipelineContextUtils.getContextKey();
+ PersistRepository repository = PipelineContextManager.getContext(contextKey).getContextManager().getMetaDataContexts().getPersistService().getRepository();
repository.persist(PipelineMetaDataNode.getJobRootPath(jobId), "");
- PipelineDistributedBarrier instance = PipelineDistributedBarrier.getInstance();
+ PipelineDistributedBarrier instance = PipelineDistributedBarrier.getInstance(contextKey);
String barrierEnablePath = PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
instance.register(barrierEnablePath, 1);
instance.persistEphemeralChildrenNode(barrierEnablePath, 1);
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
index 0d28631e2cb..4e8969ae1f5 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
@@ -19,13 +19,17 @@ package org.apache.shardingsphere.test.it.data.pipeline.scenario.consistencychec
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobItemProgress;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -46,9 +50,11 @@ class ConsistencyCheckJobTest {
@Test
void assertBuildPipelineJobItemContext() throws ReflectiveOperationException {
- String checkJobId = "j0201001";
+ ConsistencyCheckJobId pipelineJobId = new ConsistencyCheckJobId(PipelineContextKey.buildForProxy(), JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId());
+ String checkJobId = new ConsistencyCheckJobAPI().marshalJobId(pipelineJobId);
Map<String, Object> expectTableCheckPosition = Collections.singletonMap("t_order", 100);
- PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(checkJobId, 0, YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectTableCheckPosition)));
+ PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey()).persistJobItemProgress(checkJobId, 0,
+ YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectTableCheckPosition)));
ConsistencyCheckJob consistencyCheckJob = new ConsistencyCheckJob();
Plugins.getMemberAccessor().invoke(AbstractPipelineJob.class.getDeclaredMethod("setJobId", String.class), consistencyCheckJob, checkJobId);
ConsistencyCheckJobItemContext actual = consistencyCheckJob.buildPipelineJobItemContext(
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
index ddab19151df..a22ba1b7529 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
@@ -23,14 +23,13 @@ import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.pojo.CreateConsistencyCheckJobParameter;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.util.ConsistencyCheckSequence;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
import org.junit.jupiter.api.BeforeAll;
@@ -48,24 +47,20 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
class ConsistencyCheckJobAPITest {
- private static ConsistencyCheckJobAPI checkJobAPI;
-
- private static MigrationJobAPI migrationJobAPI;
+ private final ConsistencyCheckJobAPI checkJobAPI = new ConsistencyCheckJobAPI();
@BeforeAll
- static void beforeClass() {
+ public static void beforeClass() {
PipelineContextUtils.mockModeConfigAndContextManager();
- checkJobAPI = new ConsistencyCheckJobAPI();
- migrationJobAPI = new MigrationJobAPI();
}
@Test
void assertCreateJobConfig() {
- String migrationJobId = "j0101test";
- String checkJobId = checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(migrationJobId, null, null));
+ String parentJobId = JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId();
+ String checkJobId = checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null));
ConsistencyCheckJobConfiguration jobConfig = checkJobAPI.getJobConfiguration(checkJobId);
int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE;
- String expectCheckJobId = "j0201" + migrationJobId + expectedSequence;
+ String expectCheckJobId = checkJobAPI.marshalJobId(new ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId), parentJobId, expectedSequence));
assertThat(jobConfig.getJobId(), is(expectCheckJobId));
assertNull(jobConfig.getAlgorithmTypeName());
int sequence = ConsistencyCheckJobId.parseSequence(expectCheckJobId);
@@ -74,22 +69,22 @@ class ConsistencyCheckJobAPITest {
@Test
void assertGetLatestDataConsistencyCheckResult() {
- Optional<String> jobId = migrationJobAPI.start(JobConfigurationBuilder.createJobConfiguration());
- assertTrue(jobId.isPresent());
- String checkJobId = checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(jobId.get(), null, null));
- PipelineAPIFactory.getGovernanceRepositoryAPI().persistLatestCheckJobId(jobId.get(), checkJobId);
+ String parentJobId = JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId();
+ String checkJobId = checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null));
+ GovernanceRepositoryAPI governanceRepositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey());
+ governanceRepositoryAPI.persistLatestCheckJobId(parentJobId, checkJobId);
Map<String, DataConsistencyCheckResult> expectedCheckResult = Collections.singletonMap("t_order", new DataConsistencyCheckResult(new DataConsistencyCountCheckResult(1, 1),
new DataConsistencyContentCheckResult(true)));
- PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckJobResult(jobId.get(), checkJobId, expectedCheckResult);
- Map<String, DataConsistencyCheckResult> actualCheckResult = checkJobAPI.getLatestDataConsistencyCheckResult(jobId.get());
+ governanceRepositoryAPI.persistCheckJobResult(parentJobId, checkJobId, expectedCheckResult);
+ Map<String, DataConsistencyCheckResult> actualCheckResult = checkJobAPI.getLatestDataConsistencyCheckResult(parentJobId);
assertThat(actualCheckResult.size(), is(expectedCheckResult.size()));
assertThat(actualCheckResult.get("t_order").getCountCheckResult().isMatched(), is(expectedCheckResult.get("t_order").getContentCheckResult().isMatched()));
}
@Test
void assertDropByParentJobId() {
- String parentJobId = getParentJobId(JobConfigurationBuilder.createJobConfiguration());
- GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
+ String parentJobId = JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId();
+ GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey());
int expectedSequence = 1;
for (int i = 0; i < 3; i++) {
String checkJobId = checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null));
@@ -114,10 +109,4 @@ class ConsistencyCheckJobAPITest {
Optional<String> latestCheckJobId = repositoryAPI.getLatestCheckJobId(parentJobId);
assertFalse(latestCheckJobId.isPresent());
}
-
- private String getParentJobId(final MigrationJobConfiguration jobConfig) {
- Optional<String> result = migrationJobAPI.start(jobConfig);
- assertTrue(result.isPresent());
- return result.get();
- }
}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 0f8cfd4970d..81e37cedc2b 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -33,6 +33,7 @@ import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.impl.PipelineDataSourcePersistService;
import org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext;
import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
@@ -82,6 +83,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -103,12 +105,12 @@ class MigrationJobAPITest {
props.put("jdbcUrl", jdbcUrl);
props.put("username", "root");
props.put("password", "root");
- jobAPI.addMigrationSourceResources(Collections.singletonMap("ds_0", new DataSourceProperties("com.zaxxer.hikari.HikariDataSource", props)));
+ jobAPI.addMigrationSourceResources(PipelineContextUtils.getContextKey(), Collections.singletonMap("ds_0", new DataSourceProperties("com.zaxxer.hikari.HikariDataSource", props)));
}
@AfterAll
static void afterClass() {
- jobAPI.dropMigrationSourceResources(Collections.singletonList("ds_0"));
+ jobAPI.dropMigrationSourceResources(PipelineContextUtils.getContextKey(), Collections.singletonList("ds_0"));
}
@Test
@@ -121,7 +123,7 @@ class MigrationJobAPITest {
}
private JobConfigurationPOJO getJobConfigurationPOJO(final String jobId) {
- return PipelineAPIFactory.getJobConfigurationAPI().getJobConfiguration(jobId);
+ return PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobConfiguration(jobId);
}
@Test
@@ -130,7 +132,7 @@ class MigrationJobAPITest {
assertTrue(jobId.isPresent());
assertFalse(getJobConfigurationPOJO(jobId.get()).isDisabled());
PipelineDistributedBarrier mockBarrier = mock(PipelineDistributedBarrier.class);
- when(PipelineDistributedBarrier.getInstance()).thenReturn(mockBarrier);
+ when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier);
jobAPI.stop(jobId.get());
assertTrue(getJobConfigurationPOJO(jobId.get()).isDisabled());
jobAPI.startDisabledJob(jobId.get());
@@ -144,7 +146,7 @@ class MigrationJobAPITest {
MigrationJobConfiguration jobConfig = jobAPI.getJobConfiguration(jobId.get());
initTableData(jobConfig);
PipelineDistributedBarrier mockBarrier = mock(PipelineDistributedBarrier.class);
- when(PipelineDistributedBarrier.getInstance()).thenReturn(mockBarrier);
+ when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier);
jobAPI.rollback(jobId.get());
assertNull(getJobConfigurationPOJO(jobId.get()));
}
@@ -156,7 +158,7 @@ class MigrationJobAPITest {
MigrationJobConfiguration jobConfig = jobAPI.getJobConfiguration(jobId.get());
initTableData(jobConfig);
PipelineDistributedBarrier mockBarrier = mock(PipelineDistributedBarrier.class);
- when(PipelineDistributedBarrier.getInstance()).thenReturn(mockBarrier);
+ when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier);
jobAPI.commit(jobId.get());
assertNull(getJobConfigurationPOJO(jobId.get()));
}
@@ -271,7 +273,7 @@ class MigrationJobAPITest {
@Test
void assertAddMigrationSourceResources() {
PipelineDataSourcePersistService persistService = new PipelineDataSourcePersistService();
- Map<String, DataSourceProperties> actual = persistService.load(new MigrationJobType());
+ Map<String, DataSourceProperties> actual = persistService.load(PipelineContextUtils.getContextKey(), new MigrationJobType());
assertTrue(actual.containsKey("ds_0"));
}
@@ -279,20 +281,20 @@ class MigrationJobAPITest {
void assertCreateJobConfigFailedOnMoreThanOneSourceTable() {
List<SourceTargetEntry> sourceTargetEntries = Stream.of("t_order_0", "t_order_1")
.map(each -> new SourceTargetEntry("logic_db", new DataNode("ds_0", each), "t_order")).collect(Collectors.toList());
- assertThrows(PipelineInvalidParameterException.class, () -> jobAPI.createJobAndStart(new MigrateTableStatement(sourceTargetEntries, "logic_db")));
+ assertThrows(PipelineInvalidParameterException.class, () -> jobAPI.createJobAndStart(PipelineContextUtils.getContextKey(), new MigrateTableStatement(sourceTargetEntries, "logic_db")));
}
@Test
void assertCreateJobConfigFailedOnDataSourceNotExist() {
List<SourceTargetEntry> sourceTargetEntries = Collections.singletonList(new SourceTargetEntry("logic_db", new DataNode("ds_not_exists", "t_order"), "t_order"));
- assertThrows(PipelineInvalidParameterException.class, () -> jobAPI.createJobAndStart(new MigrateTableStatement(sourceTargetEntries, "logic_db")));
+ assertThrows(PipelineInvalidParameterException.class, () -> jobAPI.createJobAndStart(PipelineContextUtils.getContextKey(), new MigrateTableStatement(sourceTargetEntries, "logic_db")));
}
@Test
void assertCreateJobConfig() throws SQLException {
initIntPrimaryEnvironment();
SourceTargetEntry sourceTargetEntry = new SourceTargetEntry("logic_db", new DataNode("ds_0", "t_order"), "t_order");
- String jobId = jobAPI.createJobAndStart(new MigrateTableStatement(Collections.singletonList(sourceTargetEntry), "logic_db"));
+ String jobId = jobAPI.createJobAndStart(PipelineContextUtils.getContextKey(), new MigrateTableStatement(Collections.singletonList(sourceTargetEntry), "logic_db"));
MigrationJobConfiguration actual = jobAPI.getJobConfiguration(jobId);
assertThat(actual.getTargetDatabaseName(), is("logic_db"));
List<JobDataNodeLine> dataNodeLines = actual.getJobShardingDataNodes();
@@ -307,7 +309,7 @@ class MigrationJobAPITest {
}
private void initIntPrimaryEnvironment() throws SQLException {
- Map<String, DataSourceProperties> metaDataDataSource = new PipelineDataSourcePersistService().load(new MigrationJobType());
+ Map<String, DataSourceProperties> metaDataDataSource = new PipelineDataSourcePersistService().load(PipelineContextUtils.getContextKey(), new MigrationJobType());
DataSourceProperties dataSourceProps = metaDataDataSource.get("ds_0");
try (
PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(DataSourcePoolCreator.create(dataSourceProps), databaseType);
@@ -320,7 +322,7 @@ class MigrationJobAPITest {
@Test
void assertShowMigrationSourceResources() {
- Collection<Collection<Object>> actual = jobAPI.listMigrationSourceResources();
+ Collection<Collection<Object>> actual = jobAPI.listMigrationSourceResources(PipelineContextUtils.getContextKey());
assertThat(actual.size(), is(1));
Collection<Object> objects = actual.iterator().next();
assertThat(objects.toArray()[0], is("ds_0"));
@@ -333,7 +335,7 @@ class MigrationJobAPITest {
String jobId = optional.get();
YamlInventoryIncrementalJobItemProgress yamlJobItemProgress = new YamlInventoryIncrementalJobItemProgress();
yamlJobItemProgress.setStatus(JobStatus.RUNNING.name());
- PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, 0, YamlEngine.marshal(yamlJobItemProgress));
+ PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey()).persistJobItemProgress(jobId, 0, YamlEngine.marshal(yamlJobItemProgress));
List<InventoryIncrementalJobItemInfo> jobItemInfos = jobAPI.getJobItemInfos(jobId);
assertThat(jobItemInfos.size(), is(1));
InventoryIncrementalJobItemInfo jobItemInfo = jobItemInfos.get(0);
@@ -350,7 +352,7 @@ class MigrationJobAPITest {
yamlJobItemProgress.setProcessedRecordsCount(100);
yamlJobItemProgress.setInventoryRecordsCount(50);
String jobId = optional.get();
- PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, 0, YamlEngine.marshal(yamlJobItemProgress));
+ PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey()).persistJobItemProgress(jobId, 0, YamlEngine.marshal(yamlJobItemProgress));
List<InventoryIncrementalJobItemInfo> jobItemInfos = jobAPI.getJobItemInfos(jobId);
InventoryIncrementalJobItemInfo jobItemInfo = jobItemInfos.get(0);
assertThat(jobItemInfo.getJobItemProgress().getStatus(), is(JobStatus.EXECUTE_INCREMENTAL_TASK));
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index 02ddd993024..93d0986119f 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.test.it.data.pipeline.scenario.migration.check
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext;
import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
@@ -56,8 +57,9 @@ class MigrationDataConsistencyCheckerTest {
MigrationJobConfiguration jobConfig = createJobConfiguration();
JobConfigurationPOJO jobConfigurationPOJO = new JobConfigurationPOJO();
jobConfigurationPOJO.setJobParameter(YamlEngine.marshal(new YamlMigrationJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
- PipelineAPIFactory.getGovernanceRepositoryAPI().persist(String.format("/pipeline/jobs/%s/config", jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
- PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobConfig.getJobId(), 0, "");
+ GovernanceRepositoryAPI governanceRepositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey());
+ governanceRepositoryAPI.persist(String.format("/pipeline/jobs/%s/config", jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
+ governanceRepositoryAPI.persistJobItemProgress(jobConfig.getJobId(), 0, "");
Map<String, DataConsistencyCheckResult> actual = new MigrationDataConsistencyChecker(jobConfig, new MigrationProcessContext(jobConfig.getJobId(), null),
createConsistencyCheckJobItemProgressContext()).check(new DataConsistencyCalculateAlgorithmFixture());
String checkKey = "ds_0.t_order";
diff --git a/test/it/pipeline/src/test/resources/migration_standard_jdbc_source.yaml b/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparer
similarity index 71%
copy from test/it/pipeline/src/test/resources/migration_standard_jdbc_source.yaml
copy to test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparer
index 9b7240e57f4..d2cc59ec453 100644
--- a/test/it/pipeline/src/test/resources/migration_standard_jdbc_source.yaml
+++ b/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparer
@@ -15,15 +15,4 @@
# limitations under the License.
#
-url: jdbc:h2:mem:standard_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
-username: root
-password: root
-dataSourceClassName: com.zaxxer.hikari.HikariDataSource
-minimumIdle: 1
-minPoolSize: 1
-maxPoolSize: 50
-maximumPoolSize: 50
-readOnly: false
-idleTimeout: 60000
-connectionTimeout: 30000
-maxLifetime: 1800000
+org.apache.shardingsphere.test.it.data.pipeline.core.fixture.H2DataSourcePreparer
diff --git a/test/it/pipeline/src/test/resources/migration_standard_jdbc_source.yaml b/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ddlgenerator.CreateTableSQLGenerator
similarity index 71%
copy from test/it/pipeline/src/test/resources/migration_standard_jdbc_source.yaml
copy to test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ddlgenerator.CreateTableSQLGenerator
index 9b7240e57f4..ebe233712dc 100644
--- a/test/it/pipeline/src/test/resources/migration_standard_jdbc_source.yaml
+++ b/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ddlgenerator.CreateTableSQLGenerator
@@ -15,15 +15,4 @@
# limitations under the License.
#
-url: jdbc:h2:mem:standard_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
-username: root
-password: root
-dataSourceClassName: com.zaxxer.hikari.HikariDataSource
-minimumIdle: 1
-minPoolSize: 1
-maxPoolSize: 50
-maximumPoolSize: 50
-readOnly: false
-idleTimeout: 60000
-connectionTimeout: 30000
-maxLifetime: 1800000
+org.apache.shardingsphere.test.it.data.pipeline.core.fixture.H2CreateTableSQLGenerator
diff --git a/test/it/pipeline/src/test/resources/migration_standard_jdbc_source.yaml b/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer
similarity index 71%
copy from test/it/pipeline/src/test/resources/migration_standard_jdbc_source.yaml
copy to test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer
index 9b7240e57f4..3814bacb8c5 100644
--- a/test/it/pipeline/src/test/resources/migration_standard_jdbc_source.yaml
+++ b/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer
@@ -15,15 +15,4 @@
# limitations under the License.
#
-url: jdbc:h2:mem:standard_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
-username: root
-password: root
-dataSourceClassName: com.zaxxer.hikari.HikariDataSource
-minimumIdle: 1
-minPoolSize: 1
-maxPoolSize: 50
-maximumPoolSize: 50
-readOnly: false
-idleTimeout: 60000
-connectionTimeout: 30000
-maxLifetime: 1800000
+org.apache.shardingsphere.test.it.data.pipeline.core.fixture.H2PositionInitializer
diff --git a/test/it/pipeline/src/test/resources/migration_sharding_sphere_jdbc_target.yaml b/test/it/pipeline/src/test/resources/migration_sharding_sphere_jdbc_target.yaml
index b9aadb5c27d..7e105fe60cc 100644
--- a/test/it/pipeline/src/test/resources/migration_sharding_sphere_jdbc_target.yaml
+++ b/test/it/pipeline/src/test/resources/migration_sharding_sphere_jdbc_target.yaml
@@ -18,12 +18,12 @@
dataSources:
ds_1:
dataSourceClassName: com.zaxxer.hikari.HikariDataSource
- url: jdbc:h2:mem:test_ds_1;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
+ url: jdbc:h2:mem:test_ds_1_${databaseNameSuffix};DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
username: root
password: root
ds_2:
dataSourceClassName: com.zaxxer.hikari.HikariDataSource
- url: jdbc:h2:mem:test_ds_2;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
+ url: jdbc:h2:mem:test_ds_2_${databaseNameSuffix};DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
username: root
password: root
rules:
diff --git a/test/it/pipeline/src/test/resources/migration_standard_jdbc_source.yaml b/test/it/pipeline/src/test/resources/migration_standard_jdbc_source.yaml
index 9b7240e57f4..dd4e2825f41 100644
--- a/test/it/pipeline/src/test/resources/migration_standard_jdbc_source.yaml
+++ b/test/it/pipeline/src/test/resources/migration_standard_jdbc_source.yaml
@@ -15,7 +15,7 @@
# limitations under the License.
#
-url: jdbc:h2:mem:standard_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
+url: jdbc:h2:mem:test_ds_${databaseNameSuffix};DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
username: root
password: root
dataSourceClassName: com.zaxxer.hikari.HikariDataSource