You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/05/15 07:16:51 UTC

[shardingsphere] branch master updated: Extract jobShardingItem from jobConfig for final usage (#17667)

This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 2801517a7d4 Extract jobShardingItem from jobConfig for final usage (#17667)
2801517a7d4 is described below

commit 2801517a7d46360051748e5de0f8ce6e84a872af
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Sun May 15 15:16:45 2022 +0800

    Extract jobShardingItem from jobConfig for final usage (#17667)
---
 .../data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java | 4 ++--
 .../data/pipeline/api/config/job/PipelineJobConfiguration.java     | 7 -------
 .../api/config/rulealtered/RuleAlteredJobConfiguration.java        | 6 ------
 .../spi/rulealtered/RuleAlteredJobConfigurationPreparer.java       | 3 ++-
 .../data/pipeline/scenario/rulealtered/RuleAlteredJob.java         | 3 +--
 .../data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java  | 6 +++---
 .../data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java   | 5 +++--
 .../data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java    | 2 +-
 .../data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java     | 5 ++---
 .../core/check/consistency/DataConsistencyCheckerTest.java         | 2 +-
 .../data/pipeline/core/task/IncrementalTaskTest.java               | 2 +-
 .../shardingsphere/data/pipeline/core/task/InventoryTaskTest.java  | 2 +-
 .../pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java    | 2 +-
 .../scenario/rulealtered/prepare/InventoryTaskSplitterTest.java    | 2 +-
 14 files changed, 19 insertions(+), 32 deletions(-)

diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
index 3a96c8cd560..83be566edd2 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
@@ -134,11 +134,11 @@ public final class ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
     
     // TODO use jobConfig as parameter, jobShardingItem
     @Override
-    public TaskConfiguration createTaskConfiguration(final RuleAlteredJobConfiguration jobConfig, final OnRuleAlteredActionConfiguration onRuleAlteredActionConfig) {
+    public TaskConfiguration createTaskConfiguration(final RuleAlteredJobConfiguration jobConfig, final int jobShardingItem, final OnRuleAlteredActionConfiguration onRuleAlteredActionConfig) {
         ShardingSpherePipelineDataSourceConfiguration sourceConfig = getSourceConfiguration(jobConfig);
         ShardingRuleConfiguration sourceRuleConfig = ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(sourceConfig.getRootConfig().getRules());
         Map<String, DataSourceProperties> dataSourcePropsMap = new YamlDataSourceConfigurationSwapper().getDataSourcePropertiesMap(sourceConfig.getRootConfig());
-        JobDataNodeLine dataNodeLine = JobDataNodeLine.unmarshal(jobConfig.getJobShardingDataNodes().get(jobConfig.getJobShardingItem()));
+        JobDataNodeLine dataNodeLine = JobDataNodeLine.unmarshal(jobConfig.getJobShardingDataNodes().get(jobShardingItem));
         String dataSourceName = dataNodeLine.getEntries().get(0).getDataNodes().get(0).getDataSourceName();
         Map<ActualTableName, LogicTableName> tableNameMap = new LinkedHashMap<>();
         for (JobDataNodeEntry each : dataNodeLine.getEntries()) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java
index 932fcf17537..58bf6cb2128 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java
@@ -35,11 +35,4 @@ public interface PipelineJobConfiguration {
      * @return database name
      */
     String getDatabaseName();
-    
-    /**
-     * Get job sharding item.
-     *
-     * @return job sharding item
-     */
-    Integer getJobShardingItem();
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java
index bbe8a4a86ec..242dcc22782 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java
@@ -54,9 +54,6 @@ public final class RuleAlteredJobConfiguration implements PipelineJobConfigurati
     
     private String databaseName;
     
-    // TODO it should not put in jobConfig since it's mutable
-    private Integer jobShardingItem;
-    
     /**
      * Map{altered rule yaml class name, re-shard needed table names}.
      */
@@ -156,9 +153,6 @@ public final class RuleAlteredJobConfiguration implements PipelineJobConfigurati
             PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(target.getType(), target.getParameter());
             setTargetDatabaseType(targetDataSourceConfig.getDatabaseType().getType());
         }
-        if (null == jobShardingItem) {
-            jobShardingItem = 0;
-        }
     }
     
     private String generateJobId() {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
index 6d47590087d..d189d128da5 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
@@ -38,8 +38,9 @@ public interface RuleAlteredJobConfigurationPreparer extends RequiredSPI {
      * Create task configuration, used by underlying scheduler.
      *
      * @param jobConfig job configuration
+     * @param jobShardingItem job sharding item
      * @param onRuleAlteredActionConfig action configuration
      * @return task configuration
      */
-    TaskConfiguration createTaskConfiguration(RuleAlteredJobConfiguration jobConfig, OnRuleAlteredActionConfiguration onRuleAlteredActionConfig);
+    TaskConfiguration createTaskConfiguration(RuleAlteredJobConfiguration jobConfig, int jobShardingItem, OnRuleAlteredActionConfiguration onRuleAlteredActionConfig);
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
index b437dbfe324..bb368119b3f 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
@@ -43,8 +43,7 @@ public final class RuleAlteredJob implements SimpleJob {
     public void execute(final ShardingContext shardingContext) {
         log.info("Execute job {}-{}", shardingContext.getJobName(), shardingContext.getShardingItem());
         RuleAlteredJobConfiguration jobConfig = YamlEngine.unmarshal(shardingContext.getJobParameter(), RuleAlteredJobConfiguration.class, true);
-        jobConfig.setJobShardingItem(shardingContext.getShardingItem());
-        RuleAlteredJobContext jobContext = new RuleAlteredJobContext(jobConfig);
+        RuleAlteredJobContext jobContext = new RuleAlteredJobContext(jobConfig, shardingContext.getShardingItem());
         jobContext.setInitProgress(governanceRepositoryAPI.getJobProgress(jobContext.getJobId(), jobContext.getShardingItem()));
         jobContext.setJobPreparer(jobPreparer);
         try {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
index 965e10577a8..6f1550d7bab 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
@@ -83,13 +83,13 @@ public final class RuleAlteredJobContext {
     
     private RuleAlteredJobPreparer jobPreparer;
     
-    public RuleAlteredJobContext(final RuleAlteredJobConfiguration jobConfig) {
+    public RuleAlteredJobContext(final RuleAlteredJobConfiguration jobConfig, final int jobShardingItem) {
         ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
         this.jobConfig = jobConfig;
         jobConfig.buildHandleConfig();
         jobId = jobConfig.getJobId();
-        shardingItem = jobConfig.getJobShardingItem();
-        taskConfig = RuleAlteredJobWorker.buildTaskConfig(jobConfig, ruleAlteredContext.getOnRuleAlteredActionConfig());
+        this.shardingItem = jobShardingItem;
+        taskConfig = RuleAlteredJobWorker.buildTaskConfig(jobConfig, jobShardingItem, ruleAlteredContext.getOnRuleAlteredActionConfig());
     }
     
     /**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index 9f9adfbac22..a9ea71faba6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -274,11 +274,12 @@ public final class RuleAlteredJobWorker {
      * Build task configuration.
      *
      * @param jobConfig job configuration
+     * @param jobShardingItem job sharding item
      * @param onRuleAlteredActionConfig action configuration
      * @return task configuration
      */
-    public static TaskConfiguration buildTaskConfig(final RuleAlteredJobConfiguration jobConfig, final OnRuleAlteredActionConfiguration onRuleAlteredActionConfig) {
-        return RuleAlteredJobConfigurationPreparerFactory.getInstance().createTaskConfiguration(jobConfig, onRuleAlteredActionConfig);
+    public static TaskConfiguration buildTaskConfig(final RuleAlteredJobConfiguration jobConfig, final int jobShardingItem, final OnRuleAlteredActionConfiguration onRuleAlteredActionConfig) {
+        return RuleAlteredJobConfigurationPreparerFactory.getInstance().createTaskConfiguration(jobConfig, jobShardingItem, onRuleAlteredActionConfig);
     }
     
     private boolean hasUncompletedJobOfSameDatabaseName(final String databaseName) {
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index 9532fb43625..967adc6733b 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -140,7 +140,7 @@ public final class GovernanceRepositoryAPIImplTest {
     }
     
     private RuleAlteredJobContext mockJobContext() {
-        RuleAlteredJobContext result = new RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration());
+        RuleAlteredJobContext result = new RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(), 0);
         TaskConfiguration taskConfig = result.getTaskConfig();
         result.getInventoryTasks().add(mockInventoryTask(taskConfig));
         result.getIncrementalTasks().add(mockIncrementalTask(taskConfig));
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
index 9a84bf22c1a..4a9f60f0adf 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
@@ -200,7 +200,7 @@ public final class RuleAlteredJobAPIImplTest {
         Optional<String> jobId = ruleAlteredJobAPI.start(jobConfig);
         assertTrue(jobId.isPresent());
         final GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
-        RuleAlteredJobContext jobContext = new RuleAlteredJobContext(jobConfig);
+        RuleAlteredJobContext jobContext = new RuleAlteredJobContext(jobConfig, 0);
         jobContext.setInitProgress(new JobProgress());
         repositoryAPI.persistJobProgress(jobContext);
         repositoryAPI.persistJobCheckResult(jobId.get(), true);
@@ -211,11 +211,10 @@ public final class RuleAlteredJobAPIImplTest {
     @Test
     public void assertSwitchClusterConfigurationSucceed() {
         final RuleAlteredJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
-        jobConfig.setJobShardingItem(0);
         Optional<String> jobId = ruleAlteredJobAPI.start(jobConfig);
         assertTrue(jobId.isPresent());
         GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
-        RuleAlteredJobContext jobContext = new RuleAlteredJobContext(jobConfig);
+        RuleAlteredJobContext jobContext = new RuleAlteredJobContext(jobConfig, 0);
         jobContext.setInitProgress(new JobProgress());
         repositoryAPI.persistJobProgress(jobContext);
         repositoryAPI.persistJobCheckResult(jobId.get(), true);
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
index e1d8ed6c09a..09b3635128e 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
@@ -53,7 +53,7 @@ public final class DataConsistencyCheckerTest {
     }
     
     private RuleAlteredJobConfiguration createJobConfiguration() throws SQLException {
-        RuleAlteredJobContext jobContext = new RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration());
+        RuleAlteredJobContext jobContext = new RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(), 0);
         initTableData(jobContext.getTaskConfig().getDumperConfig().getDataSourceConfig());
         initTableData(jobContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
         return jobContext.getJobConfig();
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index fd758bb0f73..0fe5da0352c 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -46,7 +46,7 @@ public final class IncrementalTaskTest {
     
     @Before
     public void setUp() {
-        TaskConfiguration taskConfig = new RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration()).getTaskConfig();
+        TaskConfiguration taskConfig = new RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(), 0).getTaskConfig();
         taskConfig.getDumperConfig().setPosition(new PlaceholderPosition());
         PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
         incrementalTask = new IncrementalTask(3, taskConfig.getDumperConfig(), taskConfig.getImporterConfig(),
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index 6d2379d1c15..1fd66a11261 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -59,7 +59,7 @@ public final class InventoryTaskTest {
     
     @Before
     public void setUp() {
-        taskConfig = new RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration()).getTaskConfig();
+        taskConfig = new RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(), 0).getTaskConfig();
     }
     
     @Test(expected = IngestException.class)
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
index 5cd553369f8..577dee68b64 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
@@ -90,7 +90,7 @@ public final class RuleAlteredJobWorkerTest {
     // @Test
     public void assertHasUncompletedJob() throws InvocationTargetException, NoSuchMethodException, IllegalAccessException, IOException {
         final RuleAlteredJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
-        RuleAlteredJobContext jobContext = new RuleAlteredJobContext(jobConfig);
+        RuleAlteredJobContext jobContext = new RuleAlteredJobContext(jobConfig, 0);
         jobContext.setStatus(JobStatus.PREPARING);
         GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
         repositoryAPI.persistJobProgress(jobContext);
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
index 6bc71ec7b2b..213402e8531 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
@@ -62,7 +62,7 @@ public final class InventoryTaskSplitterTest {
     }
     
     private void initJobContext() {
-        jobContext = new RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration());
+        jobContext = new RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(), 0);
         dataSourceManager = jobContext.getDataSourceManager();
         taskConfig = jobContext.getTaskConfig();
     }