You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/12/03 08:09:15 UTC
[shardingsphere] branch master updated: Refactor pipeline JobType as SPI (#22598)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 36a943c2286 Refactor pipeline JobType as SPI (#22598)
36a943c2286 is described below
commit 36a943c2286f2f2908fb2ca39ab0a1b5c53cac8f
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Sat Dec 3 16:09:09 2022 +0800
Refactor pipeline JobType as SPI (#22598)
* Remove lowercaseTypeName in JobType
* Refactor JobType as interface and AbstractJobType
* Refactor JobType as SPI
---
.../data/pipeline/api/PipelineJobPublicAPI.java | 2 +-
.../{JobType.java => type/AbstractJobType.java} | 35 ++-----------
.../data/pipeline/spi/job/JobType.java} | 25 +++++-----
.../data/pipeline/spi/job/JobTypeFactory.java | 57 ++++++++++++++++++++++
.../pipeline/core/api/GovernanceRepositoryAPI.java | 2 +-
.../data/pipeline/core/api/PipelineAPIFactory.java | 2 +-
.../core/api/PipelineMetaDataPersistService.java | 2 +-
.../core/api/impl/GovernanceRepositoryAPIImpl.java | 6 +--
.../api/impl/PipelineDataSourcePersistService.java | 2 +-
...PipelineProcessConfigurationPersistService.java | 6 +--
.../pipeline/core/job/AbstractPipelineJobId.java | 4 +-
.../data/pipeline/core/job/PipelineJobIdUtils.java | 7 ++-
.../type/ConsistencyCheckJobType.java} | 28 +++--------
.../core/metadata/node/PipelineMetaDataNode.java | 4 +-
...ineChangedJobConfigurationProcessorFactory.java | 2 +-
...he.shardingsphere.data.pipeline.spi.job.JobType | 18 +++++++
.../consistencycheck/ConsistencyCheckJobId.java | 4 +-
.../api/impl/ConsistencyCheckJobAPIImpl.java | 6 ++-
...tencyCheckChangedJobConfigurationProcessor.java | 4 +-
.../task/ConsistencyCheckTasksRunner.java | 2 +-
.../scenario/migration/MigrationJobId.java | 3 +-
.../scenario/migration/MigrationJobType.java} | 28 +++--------
.../migration/api/impl/MigrationJobAPIImpl.java | 13 +++--
.../MigrationChangedJobConfigurationProcessor.java | 4 +-
...he.shardingsphere.data.pipeline.spi.job.JobType | 18 +++++++
.../pipeline/core/job/PipelineJobIdUtilsTest.java | 7 +--
.../data/pipeline/api/PipelineAPIFactoryTest.java | 7 +--
.../api/PipelineJobPublicAPIFactoryTest.java | 5 +-
.../core/api/impl/MigrationJobAPIImplTest.java | 6 +--
...lineProcessConfigurationPersistServiceTest.java | 9 ++--
.../metadata/node/PipelineMetaDataNodeTest.java | 6 +--
...hangedJobConfigurationProcessorFactoryTest.java | 7 +--
.../core/util/JobConfigurationBuilder.java | 4 +-
.../job/JobTypeFactoryTest.java} | 21 +++++---
34 files changed, 205 insertions(+), 151 deletions(-)
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
index c4e76b72991..b5e4f2cbd2a 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
@@ -17,8 +17,8 @@
package org.apache.shardingsphere.data.pipeline.api;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
import java.util.List;
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobType.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/type/AbstractJobType.java
similarity index 56%
rename from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobType.java
rename to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/type/AbstractJobType.java
index 71724be7cbe..5b87a771edc 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobType.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/type/AbstractJobType.java
@@ -15,50 +15,25 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.job;
+package org.apache.shardingsphere.data.pipeline.api.job.type;
import com.google.common.base.Preconditions;
import lombok.Getter;
-
-import java.util.Arrays;
-import java.util.Map;
-import java.util.stream.Collectors;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
/**
- * Job type.
+ * Abstract job type.
*/
@Getter
-public enum JobType {
-
- MIGRATION("MIGRATION", "01"),
- CONSISTENCY_CHECK("CONSISTENCY_CHECK", "02");
-
- private static final Map<String, JobType> CODE_JOB_TYPE_MAP;
-
- static {
- CODE_JOB_TYPE_MAP = Arrays.stream(JobType.values()).collect(Collectors.toMap(JobType::getTypeCode, each -> each));
- }
+public abstract class AbstractJobType implements JobType {
private final String typeName;
- private final String lowercaseTypeName;
-
private final String typeCode;
- JobType(final String typeName, final String typeCode) {
+ public AbstractJobType(final String typeName, final String typeCode) {
this.typeName = typeName;
- lowercaseTypeName = typeName.toLowerCase();
Preconditions.checkArgument(2 == typeCode.length(), "code length is not 2");
this.typeCode = typeCode;
}
-
- /**
- * Value of by code.
- *
- * @param typeCode type code
- * @return job type, might be null
- */
- public static JobType valueOfByCode(final String typeCode) {
- return CODE_JOB_TYPE_MAP.get(typeCode);
- }
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/job/JobType.java
similarity index 62%
copy from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java
copy to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/job/JobType.java
index 517fbe08fed..b80ae2aa106 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/job/JobType.java
@@ -15,30 +15,27 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.api;
+package org.apache.shardingsphere.data.pipeline.spi.job;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
/**
- * Pipeline meta data persist service.
- *
- * @param <T> type of configuration
+ * Job type.
*/
-public interface PipelineMetaDataPersistService<T> {
+@SingletonSPI
+public interface JobType {
/**
- * Load meta data.
+ * Get type name.
*
- * @param jobType job type, nullable
- * @return configurations
+ * @return type name
*/
- T load(JobType jobType);
+ String getTypeName();
/**
- * Persist meta data.
+ * Get type code.
*
- * @param jobType job type, nullable
- * @param configs configurations
+ * @return type code
*/
- void persist(JobType jobType, T configs);
+ String getTypeCode();
}
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/job/JobTypeFactory.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/job/JobTypeFactory.java
new file mode 100644
index 00000000000..468e22f5d33
--- /dev/null
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/job/JobTypeFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.spi.job;
+
+import com.google.common.base.Preconditions;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Job type factory.
+ */
+@Slf4j
+public final class JobTypeFactory {
+
+ private static final Map<String, JobType> CODE_JOB_TYPE_MAP = new ConcurrentHashMap<>();
+
+ static {
+ ShardingSphereServiceLoader.register(JobType.class);
+ for (JobType each : ShardingSphereServiceLoader.getServiceInstances(JobType.class)) {
+ String typeCode = each.getTypeCode();
+ JobType replaced = CODE_JOB_TYPE_MAP.put(typeCode, each);
+ if (replaced != null) {
+ log.error("Type code already exists, typeCode={}, replaced={}, current={}", typeCode, replaced, each, new Exception());
+ }
+ }
+ }
+
+ /**
+ * Get job type instance.
+ *
+ * @param jobTypeCode job type code
+ * @return job type
+ */
+ public static JobType getInstance(final String jobTypeCode) {
+ JobType result = CODE_JOB_TYPE_MAP.get(jobTypeCode);
+ Preconditions.checkNotNull(result, "Can not get job type by `%s`", jobTypeCode);
+ return result;
+ }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
index f910062508b..75fb805a811 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.api;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
import java.util.Collection;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java
index 52972546a8d..1fe434122bb 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java
@@ -23,11 +23,11 @@ import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.core.api.impl.GovernanceRepositoryAPIImpl;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.registry.CoordinatorRegistryCenterInitializer;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobAPIFactory;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobConfigurationAPI;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobOperateAPI;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java
index 517fbe08fed..18dd0e68a9d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.api;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
/**
* Pipeline meta data persist service.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index d150ccf2811..37c27a7942e 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -21,11 +21,11 @@ import com.google.common.base.Strings;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.yaml.consistency.YamlDataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.yaml.consistency.YamlDataConsistencyCheckResultSwapper;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
+import org.apache.shardingsphere.data.pipeline.yaml.consistency.YamlDataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.yaml.consistency.YamlDataConsistencyCheckResultSwapper;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineDataSourcePersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineDataSourcePersistService.java
index ae457b2ddd4..d7132d74318 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineDataSourcePersistService.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineDataSourcePersistService.java
@@ -18,9 +18,9 @@
package org.apache.shardingsphere.data.pipeline.core.api.impl;
import com.google.common.base.Strings;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineMetaDataPersistService;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistService.java
index b16c95fa6c7..4d7f932bd3e 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistService.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistService.java
@@ -19,11 +19,11 @@ package org.apache.shardingsphere.data.pipeline.core.api.impl;
import com.google.common.base.Strings;
import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineMetaDataPersistService;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
+import org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessConfigurationSwapper;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
/**
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJobId.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJobId.java
index 762eb3eeb79..96a75df0559 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJobId.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJobId.java
@@ -19,15 +19,13 @@ package org.apache.shardingsphere.data.pipeline.core.job;
import com.google.common.base.Preconditions;
import lombok.Getter;
-import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
/**
* Abstract pipeline job id.
*/
@Getter
-@ToString
public abstract class AbstractPipelineJobId implements PipelineJobId {
private final JobType jobType;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
index e6773a51cbb..5b6ddcc0a80 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
@@ -20,8 +20,9 @@ package org.apache.shardingsphere.data.pipeline.core.job;
import com.google.common.base.Preconditions;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobTypeFactory;
/**
* Pipeline job id utils.
@@ -49,8 +50,6 @@ public final class PipelineJobIdUtils {
Preconditions.checkArgument(jobId.length() > 3, "Invalid jobId length, jobId=%s", jobId);
Preconditions.checkArgument('j' == jobId.charAt(0), "Invalid jobId, first char=%s", jobId.charAt(0));
String typeCode = jobId.substring(1, 3);
- JobType result = JobType.valueOfByCode(typeCode);
- Preconditions.checkNotNull(result, "Can not get job type by `%s`, job ID is `%s`", typeCode, jobId);
- return result;
+ return JobTypeFactory.getInstance(typeCode);
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/ConsistencyCheckJobType.java
similarity index 59%
copy from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java
copy to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/ConsistencyCheckJobType.java
index 517fbe08fed..6f285e34aea 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/ConsistencyCheckJobType.java
@@ -15,30 +15,18 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.api;
+package org.apache.shardingsphere.data.pipeline.core.job.type;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.api.job.type.AbstractJobType;
/**
- * Pipeline meta data persist service.
- *
- * @param <T> type of configuration
+ * Consistency check job type.
*/
-public interface PipelineMetaDataPersistService<T> {
+public final class ConsistencyCheckJobType extends AbstractJobType {
- /**
- * Load meta data.
- *
- * @param jobType job type, nullable
- * @return configurations
- */
- T load(JobType jobType);
+ public static final String TYPE_CODE = "02";
- /**
- * Persist meta data.
- *
- * @param jobType job type, nullable
- * @param configs configurations
- */
- void persist(JobType jobType, T configs);
+ public ConsistencyCheckJobType() {
+ super("CONSISTENCY_CHECK", TYPE_CODE);
+ }
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
index 33e3eb7bcdb..14659824fc6 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
@@ -19,8 +19,8 @@ package org.apache.shardingsphere.data.pipeline.core.metadata.node;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
import java.util.regex.Pattern;
@@ -49,7 +49,7 @@ public final class PipelineMetaDataNode {
private static String getMetaDataRootPath(final JobType jobType) {
return null == jobType
? String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, "metadata")
- : String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, jobType.getLowercaseTypeName(), "metadata");
+ : String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, jobType.getTypeName().toLowerCase(), "metadata");
}
/**
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactory.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactory.java
index 71a5a4febc6..98e7d85aa87 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactory.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactory.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
diff --git a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.job.JobType b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.job.JobType
new file mode 100644
index 00000000000..51dc1de8e14
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.job.JobType
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.core.job.type.ConsistencyCheckJobType
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
index 5f5158f29b0..085917dd0aa 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
@@ -19,8 +19,8 @@ package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
import lombok.Getter;
import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJobId;
+import org.apache.shardingsphere.data.pipeline.core.job.type.ConsistencyCheckJobType;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.util.ConsistencyCheckSequence;
/**
@@ -45,7 +45,7 @@ public final class ConsistencyCheckJobId extends AbstractPipelineJobId {
}
public ConsistencyCheckJobId(final String parentJobId, final int sequence) {
- super(JobType.CONSISTENCY_CHECK, CURRENT_VERSION);
+ super(new ConsistencyCheckJobType(), CURRENT_VERSION);
this.parentJobId = parentJobId;
this.sequence = sequence > ConsistencyCheckSequence.MAX_SEQUENCE ? ConsistencyCheckSequence.MIN_SEQUENCE : sequence;
}
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPIImpl.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPIImpl.java
index 0cbfb235968..02982d33aef 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPIImpl.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPIImpl.java
@@ -28,7 +28,6 @@ import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProces
import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.api.job.progress.ConsistencyCheckJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
@@ -45,11 +44,14 @@ import org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedCon
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper;
+import org.apache.shardingsphere.data.pipeline.core.job.type.ConsistencyCheckJobType;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.ConsistencyCheckJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.util.ConsistencyCheckSequence;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobTypeFactory;
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfiguration;
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -278,6 +280,6 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
@Override
public JobType getJobType() {
- return JobType.CONSISTENCY_CHECK;
+ return JobTypeFactory.getInstance(ConsistencyCheckJobType.TYPE_CODE);
}
}
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java
index 77dd7c346b7..d4c9978ebfd 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java
@@ -18,10 +18,10 @@
package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.metadata.processor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.job.type.ConsistencyCheckJobType;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
@@ -81,6 +81,6 @@ public final class ConsistencyCheckChangedJobConfigurationProcessor implements P
@Override
public String getType() {
- return JobType.CONSISTENCY_CHECK.getTypeName();
+ return new ConsistencyCheckJobType().getTypeName();
}
}
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index b8e3f417e7e..3f6752b9a06 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -27,7 +27,6 @@ import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfigu
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
@@ -38,6 +37,7 @@ import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.Con
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.ConsistencyCheckJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
import java.sql.SQLException;
import java.util.Map;
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
index c93e338e300..15cc3991430 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration;
import lombok.Getter;
import lombok.NonNull;
import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJobId;
/**
@@ -44,7 +43,7 @@ public final class MigrationJobId extends AbstractPipelineJobId {
public MigrationJobId(@NonNull final String sourceResourceName, final String sourceSchemaName, @NonNull final String sourceTableName,
@NonNull final String targetDatabaseName, @NonNull final String targetTableName) {
- super(JobType.MIGRATION, CURRENT_VERSION);
+ super(new MigrationJobType(), CURRENT_VERSION);
this.sourceResourceName = sourceResourceName;
this.sourceSchemaName = sourceSchemaName;
this.sourceTableName = sourceTableName;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
similarity index 59%
copy from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java
copy to kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
index 517fbe08fed..7fd75061041 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
@@ -15,30 +15,18 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.api;
+package org.apache.shardingsphere.data.pipeline.scenario.migration;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.api.job.type.AbstractJobType;
/**
- * Pipeline meta data persist service.
- *
- * @param <T> type of configuration
+ * Migration job type.
*/
-public interface PipelineMetaDataPersistService<T> {
+public final class MigrationJobType extends AbstractJobType {
- /**
- * Load meta data.
- *
- * @param jobType job type, nullable
- * @return configurations
- */
- T load(JobType jobType);
+ public static final String TYPE_CODE = "01";
- /**
- * Persist meta data.
- *
- * @param jobType job type, nullable
- * @param configs configurations
- */
- void persist(JobType jobType, T configs);
+ public MigrationJobType() {
+ super("MIGRATION", TYPE_CODE);
+ }
}
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPIImpl.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPIImpl.java
index e4c6a8d4b0d..a04dd264549 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPIImpl.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPIImpl.java
@@ -40,7 +40,6 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
@@ -59,16 +58,20 @@ import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import org.apache.shardingsphere.data.pipeline.core.exception.connection.RegisterMigrationSourceStorageUnitException;
import org.apache.shardingsphere.data.pipeline.core.exception.connection.UnregisterMigrationSourceStorageUnitException;
+import org.apache.shardingsphere.data.pipeline.core.job.type.ConsistencyCheckJobType;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineSchemaUtil;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtil;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobTypeFactory;
import org.apache.shardingsphere.data.pipeline.spi.sharding.ShardingColumnsExtractorFactory;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
@@ -256,7 +259,7 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
super.startDisabledJob(jobId);
PipelineAPIFactory.getGovernanceRepositoryAPI().getLatestCheckJobId(jobId).ifPresent(optional -> {
try {
- PipelineAPIFactory.getPipelineJobAPI(JobType.CONSISTENCY_CHECK).startDisabledJob(optional);
+ PipelineAPIFactory.getPipelineJobAPI(new ConsistencyCheckJobType()).startDisabledJob(optional);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
@@ -269,7 +272,7 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
public void stop(final String jobId) {
PipelineAPIFactory.getGovernanceRepositoryAPI().getLatestCheckJobId(jobId).ifPresent(optional -> {
try {
- PipelineAPIFactory.getPipelineJobAPI(JobType.CONSISTENCY_CHECK).stop(optional);
+ PipelineAPIFactory.getPipelineJobAPI(new ConsistencyCheckJobType()).stop(optional);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
@@ -399,7 +402,7 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
@Override
public String createJobAndStart(final CreateMigrationJobParameter param) {
YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();
- Map<String, DataSourceProperties> metaDataDataSource = dataSourcePersistService.load(JobType.MIGRATION);
+ Map<String, DataSourceProperties> metaDataDataSource = dataSourcePersistService.load(new MigrationJobType());
Map<String, Object> sourceDataSourceProps = swapper.swapToMap(metaDataDataSource.get(param.getSourceResourceName()));
YamlPipelineDataSourceConfiguration sourcePipelineDataSourceConfig = createYamlPipelineDataSourceConfiguration(
StandardPipelineDataSourceConfiguration.TYPE, YamlEngine.marshal(sourceDataSourceProps));
@@ -458,7 +461,7 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
@Override
public JobType getJobType() {
- return JobType.MIGRATION;
+ return JobTypeFactory.getInstance(MigrationJobType.TYPE_CODE);
}
@Override
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java
index 9f485f4365d..83d6a791561 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java
@@ -18,13 +18,13 @@
package org.apache.shardingsphere.data.pipeline.scenario.migration.metadata.processor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.data.pipeline.scenario.migration.prepare.MigrationJobPreparer;
import org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier;
import org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrierFactory;
@@ -86,6 +86,6 @@ public final class MigrationChangedJobConfigurationProcessor implements Pipeline
@Override
public String getType() {
- return JobType.MIGRATION.getTypeName();
+ return new MigrationJobType().getTypeName();
}
}
diff --git a/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.job.JobType b/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.job.JobType
new file mode 100644
index 00000000000..c1ca2ef098d
--- /dev/null
+++ b/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.job.JobType
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType
diff --git a/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java b/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
index 98ccd8398d2..2a1d6e902a7 100644
--- a/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
+++ b/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
@@ -17,11 +17,12 @@
package org.apache.shardingsphere.data.pipeline.core.job;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
import org.junit.Test;
-import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
public final class PipelineJobIdUtilsTest {
@@ -31,6 +32,6 @@ public final class PipelineJobIdUtilsTest {
MigrationJobId pipelineJobId = new MigrationJobId("ds_0", null, "t_order", "sharding_db", "t_order");
String jobId = PipelineJobIdUtils.marshalJobIdCommonPrefix(pipelineJobId) + "abcd";
JobType actualJobType = PipelineJobIdUtils.parseJobType(jobId);
- assertThat(actualJobType, is(JobType.MIGRATION));
+ assertThat(actualJobType, instanceOf(MigrationJobType.class));
}
}
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineAPIFactoryTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineAPIFactoryTest.java
index 613d82a122b..5274d5e9364 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineAPIFactoryTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineAPIFactoryTest.java
@@ -17,9 +17,10 @@
package org.apache.shardingsphere.data.pipeline.api;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.job.type.ConsistencyCheckJobType;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPIImpl;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPIImpl;
import org.junit.Test;
@@ -30,11 +31,11 @@ public final class PipelineAPIFactoryTest {
@Test
public void assertGetPipelineJobAPI() {
- assertThat(PipelineAPIFactory.getPipelineJobAPI(JobType.MIGRATION), instanceOf(MigrationJobAPIImpl.class));
+ assertThat(PipelineAPIFactory.getPipelineJobAPI(new MigrationJobType()), instanceOf(MigrationJobAPIImpl.class));
}
@Test
public void assertGetConsistencyCheckJobAPI() {
- assertThat(PipelineAPIFactory.getPipelineJobAPI(JobType.CONSISTENCY_CHECK), instanceOf(ConsistencyCheckJobAPIImpl.class));
+ assertThat(PipelineAPIFactory.getPipelineJobAPI(new ConsistencyCheckJobType()), instanceOf(ConsistencyCheckJobAPIImpl.class));
}
}
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java
index f4859f279a0..8810d1f5ee0 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java
@@ -18,9 +18,10 @@
package org.apache.shardingsphere.data.pipeline.api;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPIImpl;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPIImpl;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
import org.junit.Test;
import java.util.Collection;
@@ -34,7 +35,7 @@ public final class PipelineJobPublicAPIFactoryTest {
@Test
public void assertGetInventoryIncrementalJobPublicAPI() {
Collection<Pair<JobType, Class<? extends InventoryIncrementalJobPublicAPI>>> paramResult = new LinkedList<>();
- paramResult.add(Pair.of(JobType.MIGRATION, MigrationJobAPIImpl.class));
+ paramResult.add(Pair.of(new MigrationJobType(), MigrationJobAPIImpl.class));
for (Pair<JobType, Class<? extends InventoryIncrementalJobPublicAPI>> each : paramResult) {
assertThat(PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(each.getKey().getTypeName()), instanceOf(each.getValue()));
}
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index d64ff38dd71..69b5965350c 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -25,7 +25,6 @@ import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfig
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
import org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobItemInfo;
@@ -35,6 +34,7 @@ import org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineD
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
@@ -243,7 +243,7 @@ public final class MigrationJobAPIImplTest {
@Test
public void assertAddMigrationSourceResources() {
PipelineDataSourcePersistService persistService = new PipelineDataSourcePersistService();
- Map<String, DataSourceProperties> actual = persistService.load(JobType.MIGRATION);
+ Map<String, DataSourceProperties> actual = persistService.load(new MigrationJobType());
assertTrue(actual.containsKey("ds_0"));
}
@@ -259,7 +259,7 @@ public final class MigrationJobAPIImplTest {
}
private void initIntPrimaryEnvironment() throws SQLException {
- Map<String, DataSourceProperties> metaDataDataSource = new PipelineDataSourcePersistService().load(JobType.MIGRATION);
+ Map<String, DataSourceProperties> metaDataDataSource = new PipelineDataSourcePersistService().load(new MigrationJobType());
DataSourceProperties dataSourceProps = metaDataDataSource.get("ds_0");
DataSource dataSource = DataSourcePoolCreator.create(dataSourceProps);
try (
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistServiceTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistServiceTest.java
index dddc2c8ce9f..5c7f50a7e66 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistServiceTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistServiceTest.java
@@ -18,13 +18,14 @@
package org.apache.shardingsphere.data.pipeline.core.api.impl;
import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MemoryPipelineChannelCreator;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
import org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineReadConfiguration;
import org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineWriteConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
-import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MemoryPipelineChannelCreator;
-import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
import org.junit.BeforeClass;
@@ -56,7 +57,7 @@ public final class PipelineProcessConfigurationPersistServiceTest {
String expectedYamlText = YamlEngine.marshal(yamlProcessConfig);
PipelineProcessConfiguration processConfig = new YamlPipelineProcessConfigurationSwapper().swapToObject(yamlProcessConfig);
PipelineProcessConfigurationPersistService persistService = new PipelineProcessConfigurationPersistService();
- JobType jobType = JobType.MIGRATION;
+ JobType jobType = new MigrationJobType();
persistService.persist(jobType, processConfig);
String actualYamlText = YamlEngine.marshal(new YamlPipelineProcessConfigurationSwapper().swapToYamlConfiguration(persistService.load(jobType)));
assertThat(actualYamlText, is(expectedYamlText));
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java
similarity index 93%
rename from kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java
rename to test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java
index 03c30a92b99..fc025e94fa1 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.metadata.node;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
@@ -37,12 +37,12 @@ public final class PipelineMetaDataNodeTest {
@Test
public void assertGetMetaDataDataSourcesPath() {
- assertThat(PipelineMetaDataNode.getMetaDataDataSourcesPath(JobType.MIGRATION), is(migrationMetaDataRootPath + "/data_sources"));
+ assertThat(PipelineMetaDataNode.getMetaDataDataSourcesPath(new MigrationJobType()), is(migrationMetaDataRootPath + "/data_sources"));
}
@Test
public void assertGetMetaDataProcessConfigPath() {
- assertThat(PipelineMetaDataNode.getMetaDataProcessConfigPath(JobType.MIGRATION), is(migrationMetaDataRootPath + "/process_config"));
+ assertThat(PipelineMetaDataNode.getMetaDataProcessConfigPath(new MigrationJobType()), is(migrationMetaDataRootPath + "/process_config"));
}
@Test
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactoryTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactoryTest.java
index e9c2fa7af81..e60fb222d2c 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactoryTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactoryTest.java
@@ -17,8 +17,9 @@
package org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.core.job.type.ConsistencyCheckJobType;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.metadata.processor.ConsistencyCheckChangedJobConfigurationProcessor;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.data.pipeline.scenario.migration.metadata.processor.MigrationChangedJobConfigurationProcessor;
import org.junit.Test;
@@ -29,7 +30,7 @@ public final class PipelineChangedJobConfigurationProcessorFactoryTest {
@Test
public void assertGetInstance() {
- assertThat(PipelineChangedJobConfigurationProcessorFactory.getInstance(JobType.MIGRATION), instanceOf(MigrationChangedJobConfigurationProcessor.class));
- assertThat(PipelineChangedJobConfigurationProcessorFactory.getInstance(JobType.CONSISTENCY_CHECK), instanceOf(ConsistencyCheckChangedJobConfigurationProcessor.class));
+ assertThat(PipelineChangedJobConfigurationProcessorFactory.getInstance(new MigrationJobType()), instanceOf(MigrationChangedJobConfigurationProcessor.class));
+ assertThat(PipelineChangedJobConfigurationProcessorFactory.getInstance(new ConsistencyCheckJobType()), instanceOf(ConsistencyCheckChangedJobConfigurationProcessor.class));
}
}
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
index 0317b95a374..85defe6b77b 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
@@ -25,9 +25,9 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
@@ -55,7 +55,7 @@ public final class JobConfigurationBuilder {
result.setTarget(createYamlPipelineDataSourceConfiguration(new ShardingSpherePipelineDataSourceConfiguration(
ConfigurationFileUtil.readFile("migration_sharding_sphere_jdbc_target.yaml"))));
result.setUniqueKeyColumn(createYamlPipelineColumnMetaData());
- PipelineAPIFactory.getPipelineJobAPI(JobType.MIGRATION).extendYamlJobConfiguration(result);
+ PipelineAPIFactory.getPipelineJobAPI(new MigrationJobType()).extendYamlJobConfiguration(result);
return new YamlMigrationJobConfigurationSwapper().swapToObject(result);
}
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactoryTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/spi/job/JobTypeFactoryTest.java
similarity index 58%
copy from test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactoryTest.java
copy to test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/spi/job/JobTypeFactoryTest.java
index e9c2fa7af81..3dc726ff68d 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactoryTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/spi/job/JobTypeFactoryTest.java
@@ -15,21 +15,28 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler;
+package org.apache.shardingsphere.data.pipeline.spi.job;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
-import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.metadata.processor.ConsistencyCheckChangedJobConfigurationProcessor;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.metadata.processor.MigrationChangedJobConfigurationProcessor;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.shardingsphere.data.pipeline.core.job.type.ConsistencyCheckJobType;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.junit.Test;
+import java.util.Arrays;
+import java.util.Collection;
+
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
-public final class PipelineChangedJobConfigurationProcessorFactoryTest {
+public final class JobTypeFactoryTest {
@Test
public void assertGetInstance() {
- assertThat(PipelineChangedJobConfigurationProcessorFactory.getInstance(JobType.MIGRATION), instanceOf(MigrationChangedJobConfigurationProcessor.class));
- assertThat(PipelineChangedJobConfigurationProcessorFactory.getInstance(JobType.CONSISTENCY_CHECK), instanceOf(ConsistencyCheckChangedJobConfigurationProcessor.class));
+ Collection<Pair<String, Class<? extends JobType>>> paramResult = Arrays.asList(
+ Pair.of(MigrationJobType.TYPE_CODE, MigrationJobType.class), Pair.of(ConsistencyCheckJobType.TYPE_CODE, ConsistencyCheckJobType.class));
+ for (Pair<String, Class<? extends JobType>> each : paramResult) {
+ JobType actual = JobTypeFactory.getInstance(each.getKey());
+ assertThat(actual, instanceOf(each.getValue()));
+ }
}
}