You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/05/16 03:11:23 UTC

[shardingsphere] branch master updated: Refactor and make RuleAlteredJobConfiguration immutable (#17682)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new fa4a9d61b73 Refactor and make RuleAlteredJobConfiguration immutable (#17682)
fa4a9d61b73 is described below

commit fa4a9d61b730c9b242a2f3c6d4d8ebcb40f443bc
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Mon May 16 11:11:18 2022 +0800

    Refactor and make RuleAlteredJobConfiguration immutable (#17682)
---
 ...hardingRuleAlteredJobConfigurationPreparer.java |  14 +--
 .../rulealtered/RuleAlteredJobConfiguration.java   | 115 ++++-----------------
 .../yaml/RuleAlteredJobConfigurationSwapper.java   |  76 ++++++++++++++
 .../YamlRuleAlteredJobConfiguration.java}          |  61 ++++-------
 .../PipelineDataSourceConfigurationSwapper.java    |  47 +++++++++
 .../RuleAlteredJobConfigurationPreparer.java       |   5 +-
 .../RuleAlteredJobConfigurationTest.java           |  18 +++-
 .../core/api/impl/RuleAlteredJobAPIImpl.java       |  14 +--
 .../check/consistency/DataConsistencyChecker.java  |  15 ++-
 .../pipeline/core/execute/PipelineJobExecutor.java |   9 +-
 .../data/pipeline/core/job/FinishedCheckJob.java   |   6 +-
 .../scenario/rulealtered/RuleAlteredJob.java       |   4 +-
 .../rulealtered/RuleAlteredJobContext.java         |   1 -
 .../scenario/rulealtered/RuleAlteredJobWorker.java |  15 ++-
 .../job/environment/ScalingEnvironmentManager.java |   4 +-
 .../datasource/MySQLDataSourcePreparerTest.java    |  10 +-
 .../data/pipeline/env/ITEnvironmentContext.java    |   4 +-
 .../core/util/JobConfigurationBuilder.java         |   8 +-
 .../scenario/rulealtered/RuleAlteredJobTest.java   |  74 -------------
 .../rulealtered/RuleAlteredJobWorkerTest.java      |   8 +-
 20 files changed, 238 insertions(+), 270 deletions(-)

diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
index 9121aec9cf1..aa13d53a231 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
@@ -24,6 +24,8 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
@@ -73,14 +75,14 @@ import java.util.stream.Collectors;
 public final class ShardingRuleAlteredJobConfigurationPreparer implements RuleAlteredJobConfigurationPreparer {
     
     @Override
-    public void extendJobConfiguration(final RuleAlteredJobConfiguration jobConfig) {
-        Map<String, List<DataNode>> shouldScalingActualDataNodes = getShouldScalingActualDataNodes(jobConfig);
-        jobConfig.setJobShardingDataNodes(getJobShardingDataNodes(shouldScalingActualDataNodes));
-        jobConfig.setLogicTables(getLogicTables(shouldScalingActualDataNodes.keySet()));
-        jobConfig.setTablesFirstDataNodes(getTablesFirstDataNodes(shouldScalingActualDataNodes));
+    public void extendJobConfiguration(final YamlRuleAlteredJobConfiguration yamlJobConfig) {
+        Map<String, List<DataNode>> actualDataNodes = getActualDataNodes(new RuleAlteredJobConfigurationSwapper().swapToObject(yamlJobConfig));
+        yamlJobConfig.setJobShardingDataNodes(getJobShardingDataNodes(actualDataNodes));
+        yamlJobConfig.setLogicTables(getLogicTables(actualDataNodes.keySet()));
+        yamlJobConfig.setTablesFirstDataNodes(getTablesFirstDataNodes(actualDataNodes));
     }
     
-    private static Map<String, List<DataNode>> getShouldScalingActualDataNodes(final RuleAlteredJobConfiguration jobConfig) {
+    private static Map<String, List<DataNode>> getActualDataNodes(final RuleAlteredJobConfiguration jobConfig) {
         PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter());
         ShardingSpherePipelineDataSourceConfiguration source = (ShardingSpherePipelineDataSourceConfiguration) sourceDataSourceConfig;
         ShardingRuleConfiguration sourceRuleConfig = ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(source.getRootConfig().getRules());
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java
index 9565b9539c8..b32411d9d19 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java
@@ -17,59 +17,46 @@
 
 package org.apache.shardingsphere.data.pipeline.api.config.rulealtered;
 
-import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
-import lombok.AllArgsConstructor;
 import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.job.JobSubType;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
-import org.apache.shardingsphere.data.pipeline.api.job.RuleAlteredJobId;
-import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
 /**
- * Scaling job configuration.
+ * Rule altered job configuration.
  */
-@NoArgsConstructor
-@AllArgsConstructor
+@RequiredArgsConstructor
 @Getter
-@Setter
 @Slf4j
-// TODO share for totally new scenario
-// TODO rename to Yaml, add config class
 public final class RuleAlteredJobConfiguration implements PipelineJobConfiguration {
     
-    private String jobId;
+    private final String jobId;
     
-    private String databaseName;
+    private final String databaseName;
     
-    /**
-     * Map{altered rule yaml class name, re-shard needed table names}.
-     */
-    private Map<String, List<String>> alteredRuleYamlClassNameTablesMap;
+    private final Integer activeVersion;
+    
+    private final Integer newVersion;
     
-    private Integer activeVersion;
+    private final String sourceDatabaseType;
     
-    private Integer newVersion;
+    private final String targetDatabaseType;
     
-    private YamlPipelineDataSourceConfiguration source;
+    private final PipelineDataSourceConfiguration source;
     
-    private YamlPipelineDataSourceConfiguration target;
+    private final PipelineDataSourceConfiguration target;
     
-    private int concurrency = 3;
+    /**
+     * Map{altered rule yaml class name, re-shard needed table names}.
+     */
+    private final Map<String, List<String>> alteredRuleYamlClassNameTablesMap;
     
-    private int retryTimes = 3;
+    private final String logicTables;
     
     /**
      * Collection of each logic table's first data node.
@@ -78,41 +65,13 @@ public final class RuleAlteredJobConfiguration implements PipelineJobConfigurati
      * then value may be: {@code t_order:ds_0.t_order_0|t_order_item:ds_0.t_order_item_0}.
      * </p>
      */
-    private String tablesFirstDataNodes;
+    private final String tablesFirstDataNodes;
     
-    private List<String> jobShardingDataNodes;
-    
-    private String logicTables;
-    
-    private String sourceDatabaseType;
-    
-    private String targetDatabaseType;
-    
-    /**
-     * Set source.
-     *
-     * @param source source configuration
-     */
-    public void setSource(final YamlPipelineDataSourceConfiguration source) {
-        checkParameters(source);
-        this.source = source;
-    }
+    private final List<String> jobShardingDataNodes;
     
-    /**
-     * Set target.
-     *
-     * @param target target configuration
-     */
-    public void setTarget(final YamlPipelineDataSourceConfiguration target) {
-        checkParameters(target);
-        this.target = target;
-    }
+    private final int concurrency;
     
-    private void checkParameters(final YamlPipelineDataSourceConfiguration yamlConfig) {
-        Preconditions.checkNotNull(yamlConfig);
-        Preconditions.checkNotNull(yamlConfig.getType());
-        Preconditions.checkNotNull(yamlConfig.getParameter());
-    }
+    private final int retryTimes;
     
     /**
      * Get job sharding count.
@@ -132,38 +91,6 @@ public final class RuleAlteredJobConfiguration implements PipelineJobConfigurati
         return Splitter.on(',').splitToList(logicTables);
     }
     
-    /**
-     * Build handle configuration.
-     */
-    public void buildHandleConfig() {
-        if (null == getJobShardingDataNodes()) {
-            RuleAlteredJobConfigurationPreparerFactory.getInstance().extendJobConfiguration(this);
-        }
-        if (null == jobId) {
-            jobId = generateJobId();
-        }
-        if (Strings.isNullOrEmpty(getSourceDatabaseType())) {
-            PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(source.getType(), source.getParameter());
-            setSourceDatabaseType(sourceDataSourceConfig.getDatabaseType().getType());
-        }
-        if (Strings.isNullOrEmpty(getTargetDatabaseType())) {
-            PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(target.getType(), target.getParameter());
-            setTargetDatabaseType(targetDataSourceConfig.getDatabaseType().getType());
-        }
-    }
-    
-    private String generateJobId() {
-        RuleAlteredJobId jobId = new RuleAlteredJobId();
-        // TODO type, subTypes
-        jobId.setType(JobType.RULE_ALTERED.getValue());
-        jobId.setFormatVersion(RuleAlteredJobId.CURRENT_VERSION);
-        jobId.setSubTypes(Collections.singletonList(JobSubType.SCALING.getValue()));
-        jobId.setCurrentMetadataVersion(activeVersion);
-        jobId.setNewMetadataVersion(newVersion);
-        jobId.setDatabaseName(databaseName);
-        return jobId.marshal();
-    }
-    
     @Override
     public String toString() {
         return "RuleAlteredJobConfiguration{"
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/RuleAlteredJobConfigurationSwapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/RuleAlteredJobConfigurationSwapper.java
new file mode 100644
index 00000000000..df11b1661e9
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/RuleAlteredJobConfigurationSwapper.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml;
+
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.PipelineDataSourceConfigurationSwapper;
+import org.apache.shardingsphere.infra.yaml.config.swapper.YamlConfigurationSwapper;
+import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
+
+/**
+ * Rule altered job configuration swapper.
+ */
+// TODO add RuleAlteredJobConfigurationSwapper test
+public final class RuleAlteredJobConfigurationSwapper implements YamlConfigurationSwapper<YamlRuleAlteredJobConfiguration, RuleAlteredJobConfiguration> {
+    
+    private static final RuleAlteredJobConfigurationSwapper JOB_CONFIG_SWAPPER = new RuleAlteredJobConfigurationSwapper();
+    
+    private final PipelineDataSourceConfigurationSwapper dataSourceConfigSwapper = new PipelineDataSourceConfigurationSwapper();
+    
+    @Override
+    public YamlRuleAlteredJobConfiguration swapToYamlConfiguration(final RuleAlteredJobConfiguration data) {
+        YamlRuleAlteredJobConfiguration result = new YamlRuleAlteredJobConfiguration();
+        result.setJobId(data.getJobId());
+        result.setDatabaseName(data.getDatabaseName());
+        result.setActiveVersion(data.getActiveVersion());
+        result.setNewVersion(data.getNewVersion());
+        result.setSourceDatabaseType(data.getSourceDatabaseType());
+        result.setTargetDatabaseType(data.getTargetDatabaseType());
+        result.setSource(dataSourceConfigSwapper.swapToYamlConfiguration(data.getSource()));
+        result.setTarget(dataSourceConfigSwapper.swapToYamlConfiguration(data.getTarget()));
+        result.setAlteredRuleYamlClassNameTablesMap(data.getAlteredRuleYamlClassNameTablesMap());
+        result.setLogicTables(data.getLogicTables());
+        result.setTablesFirstDataNodes(data.getTablesFirstDataNodes());
+        result.setJobShardingDataNodes(data.getJobShardingDataNodes());
+        result.setConcurrency(data.getConcurrency());
+        result.setRetryTimes(data.getRetryTimes());
+        return result;
+    }
+    
+    @Override
+    public RuleAlteredJobConfiguration swapToObject(final YamlRuleAlteredJobConfiguration yamlConfig) {
+        return new RuleAlteredJobConfiguration(yamlConfig.getJobId(), yamlConfig.getDatabaseName(),
+                yamlConfig.getActiveVersion(), yamlConfig.getNewVersion(),
+                yamlConfig.getSourceDatabaseType(), yamlConfig.getTargetDatabaseType(),
+                dataSourceConfigSwapper.swapToObject(yamlConfig.getSource()), dataSourceConfigSwapper.swapToObject(yamlConfig.getTarget()),
+                yamlConfig.getAlteredRuleYamlClassNameTablesMap(), yamlConfig.getLogicTables(),
+                yamlConfig.getTablesFirstDataNodes(), yamlConfig.getJobShardingDataNodes(),
+                yamlConfig.getConcurrency(), yamlConfig.getRetryTimes());
+    }
+    
+    /**
+     * Swap to job configuration from text.
+     *
+     * @param jobParameter job parameter
+     * @return job configuration
+     */
+    public static RuleAlteredJobConfiguration swapToObject(final String jobParameter) {
+        YamlRuleAlteredJobConfiguration yamlJobConfig = YamlEngine.unmarshal(jobParameter, YamlRuleAlteredJobConfiguration.class, true);
+        return JOB_CONFIG_SWAPPER.swapToObject(yamlJobConfig);
+    }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java
similarity index 84%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java
index 9565b9539c8..a7b0cd9c7aa 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java
@@ -15,17 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.config.rulealtered;
+package org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
 import com.google.common.base.Strings;
-import lombok.AllArgsConstructor;
 import lombok.Getter;
-import lombok.NoArgsConstructor;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
@@ -33,43 +29,42 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobSubType;
 import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.api.job.RuleAlteredJobId;
 import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlConfiguration;
 
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
 /**
- * Scaling job configuration.
+ * Rule altered job configuration for YAML.
  */
-@NoArgsConstructor
-@AllArgsConstructor
 @Getter
 @Setter
 @Slf4j
-// TODO share for totally new scenario
-// TODO rename to Yaml, add config class
-public final class RuleAlteredJobConfiguration implements PipelineJobConfiguration {
+public final class YamlRuleAlteredJobConfiguration implements YamlConfiguration {
     
     private String jobId;
     
     private String databaseName;
     
-    /**
-     * Map{altered rule yaml class name, re-shard needed table names}.
-     */
-    private Map<String, List<String>> alteredRuleYamlClassNameTablesMap;
-    
     private Integer activeVersion;
     
     private Integer newVersion;
     
+    private String sourceDatabaseType;
+    
+    private String targetDatabaseType;
+    
     private YamlPipelineDataSourceConfiguration source;
     
     private YamlPipelineDataSourceConfiguration target;
     
-    private int concurrency = 3;
+    /**
+     * Map{altered rule yaml class name, re-shard needed table names}.
+     */
+    private Map<String, List<String>> alteredRuleYamlClassNameTablesMap;
     
-    private int retryTimes = 3;
+    private String logicTables;
     
     /**
      * Collection of each logic table's first data node.
@@ -82,11 +77,9 @@ public final class RuleAlteredJobConfiguration implements PipelineJobConfigurati
     
     private List<String> jobShardingDataNodes;
     
-    private String logicTables;
-    
-    private String sourceDatabaseType;
+    private int concurrency = 3;
     
-    private String targetDatabaseType;
+    private int retryTimes = 3;
     
     /**
      * Set source.
@@ -115,27 +108,9 @@ public final class RuleAlteredJobConfiguration implements PipelineJobConfigurati
     }
     
     /**
-     * Get job sharding count.
-     *
-     * @return job sharding count
-     */
-    public int getJobShardingCount() {
-        return null == jobShardingDataNodes ? 0 : jobShardingDataNodes.size();
-    }
-    
-    /**
-     * Split {@linkplain #logicTables} to logic table names.
-     *
-     * @return logic table names
-     */
-    public List<String> splitLogicTableNames() {
-        return Splitter.on(',').splitToList(logicTables);
-    }
-    
-    /**
-     * Build handle configuration.
+     * Extend configuration.
      */
-    public void buildHandleConfig() {
+    public void extendConfiguration() {
         if (null == getJobShardingDataNodes()) {
             RuleAlteredJobConfigurationPreparerFactory.getInstance().extendJobConfiguration(this);
         }
@@ -166,7 +141,7 @@ public final class RuleAlteredJobConfiguration implements PipelineJobConfigurati
     
     @Override
     public String toString() {
-        return "RuleAlteredJobConfiguration{"
+        return "YamlRuleAlteredJobConfiguration{"
                 + "jobId='" + jobId + '\'' + ", databaseName='" + databaseName + '\''
                 + ", activeVersion=" + activeVersion + ", newVersion=" + newVersion
                 + ", sourceDatabaseType='" + sourceDatabaseType + '\'' + ", targetDatabaseType='" + targetDatabaseType + '\''
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/yaml/PipelineDataSourceConfigurationSwapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/yaml/PipelineDataSourceConfigurationSwapper.java
new file mode 100644
index 00000000000..b80203f0172
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/yaml/PipelineDataSourceConfigurationSwapper.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml;
+
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
+import org.apache.shardingsphere.infra.yaml.config.swapper.YamlConfigurationSwapper;
+
+/**
+ * Pipeline data source configuration YAML swapper.
+ */
+public final class PipelineDataSourceConfigurationSwapper implements YamlConfigurationSwapper<YamlPipelineDataSourceConfiguration, PipelineDataSourceConfiguration> {
+    
+    @Override
+    public YamlPipelineDataSourceConfiguration swapToYamlConfiguration(final PipelineDataSourceConfiguration data) {
+        if (null == data) {
+            return null;
+        }
+        YamlPipelineDataSourceConfiguration result = new YamlPipelineDataSourceConfiguration();
+        result.setType(data.getType());
+        result.setParameter(data.getParameter());
+        return result;
+    }
+    
+    @Override
+    public PipelineDataSourceConfiguration swapToObject(final YamlPipelineDataSourceConfiguration yamlConfig) {
+        if (null == yamlConfig) {
+            return null;
+        }
+        return PipelineDataSourceConfigurationFactory.newInstance(yamlConfig.getType(), yamlConfig.getParameter());
+    }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
index d189d128da5..c947e3874bd 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.spi.rulealtered;
 
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
 import org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration;
 import org.apache.shardingsphere.spi.type.required.RequiredSPI;
 
@@ -30,9 +31,9 @@ public interface RuleAlteredJobConfigurationPreparer extends RequiredSPI {
     /**
      * Extend job configuration.
      *
-     * @param jobConfig job configuration
+     * @param yamlJobConfig YAML job configuration
      */
-    void extendJobConfiguration(RuleAlteredJobConfiguration jobConfig);
+    void extendJobConfiguration(YamlRuleAlteredJobConfiguration yamlJobConfig);
     
     /**
      * Create task configuration, used by underlying scheduler.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfigurationTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfigurationTest.java
index 22a227032a2..2d783b94831 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfigurationTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfigurationTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.shardingsphere.data.pipeline.api.config.rulealtered;
 
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -26,22 +28,28 @@ import static org.junit.Assert.assertThat;
 
 public final class RuleAlteredJobConfigurationTest {
     
+    private static final RuleAlteredJobConfigurationSwapper JOB_CONFIG_SWAPPER = new RuleAlteredJobConfigurationSwapper();
+    
     @Test
     public void assertGetJobShardingCountByNull() {
-        assertThat(new RuleAlteredJobConfiguration().getJobShardingCount(), is(0));
+        YamlRuleAlteredJobConfiguration yamlJobConfig = new YamlRuleAlteredJobConfiguration();
+        RuleAlteredJobConfiguration jobConfig = JOB_CONFIG_SWAPPER.swapToObject(yamlJobConfig);
+        assertThat(jobConfig.getJobShardingCount(), is(0));
     }
     
     @Test
     public void assertGetJobShardingCount() {
-        RuleAlteredJobConfiguration jobConfig = new RuleAlteredJobConfiguration();
-        jobConfig.setJobShardingDataNodes(Arrays.asList("node1", "node2"));
+        YamlRuleAlteredJobConfiguration yamlJobConfig = new YamlRuleAlteredJobConfiguration();
+        yamlJobConfig.setJobShardingDataNodes(Arrays.asList("node1", "node2"));
+        RuleAlteredJobConfiguration jobConfig = JOB_CONFIG_SWAPPER.swapToObject(yamlJobConfig);
         assertThat(jobConfig.getJobShardingCount(), is(2));
     }
     
     @Test
     public void assertSplitLogicTableNames() {
-        RuleAlteredJobConfiguration jobConfig = new RuleAlteredJobConfiguration();
-        jobConfig.setLogicTables("foo_tbl,bar_tbl");
+        YamlRuleAlteredJobConfiguration yamlJobConfig = new YamlRuleAlteredJobConfiguration();
+        yamlJobConfig.setLogicTables("foo_tbl,bar_tbl");
+        RuleAlteredJobConfiguration jobConfig = JOB_CONFIG_SWAPPER.swapToObject(yamlJobConfig);
         assertThat(jobConfig.splitLogicTableNames(), is(Arrays.asList("foo_tbl", "bar_tbl")));
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
index 8a5fd5cb12b..56426ee3ab1 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
@@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
 import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
@@ -102,8 +103,7 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
     
     @Override
     public Optional<String> start(final RuleAlteredJobConfiguration jobConfig) {
-        jobConfig.buildHandleConfig();
-        if (jobConfig.getJobShardingCount() == 0) {
+        if (0 == jobConfig.getJobShardingCount()) {
             log.warn("Invalid scaling job config!");
             throw new PipelineJobCreationException("handleConfig shardingTotalCount is 0");
         }
@@ -116,15 +116,15 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
             return Optional.of(jobId);
         }
         repositoryAPI.persist(String.format("%s/%s", DataPipelineConstants.DATA_PIPELINE_ROOT, jobId), RuleAlteredJob.class.getName());
-        repositoryAPI.persist(jobConfigKey, createJobConfig(jobConfig));
+        repositoryAPI.persist(jobConfigKey, createJobConfigText(jobConfig));
         return Optional.of(jobId);
     }
     
-    private String createJobConfig(final RuleAlteredJobConfiguration jobConfig) {
+    private String createJobConfigText(final RuleAlteredJobConfiguration jobConfig) {
         JobConfigurationPOJO jobConfigPOJO = new JobConfigurationPOJO();
         jobConfigPOJO.setJobName(jobConfig.getJobId());
         jobConfigPOJO.setShardingTotalCount(jobConfig.getJobShardingCount());
-        jobConfigPOJO.setJobParameter(YamlEngine.marshal(jobConfig));
+        jobConfigPOJO.setJobParameter(YamlEngine.marshal(new RuleAlteredJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
         jobConfigPOJO.getProps().setProperty("create_time", LocalDateTime.now().format(DATE_TIME_FORMATTER));
         return YamlEngine.marshal(jobConfigPOJO);
     }
@@ -365,7 +365,7 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
         return getJobConfig(getElasticJobConfigPOJO(jobId));
     }
     
-    private RuleAlteredJobConfiguration getJobConfig(final JobConfigurationPOJO elasticJobConfigPOJO) {
-        return YamlEngine.unmarshal(elasticJobConfigPOJO.getJobParameter(), RuleAlteredJobConfiguration.class, true);
+    private RuleAlteredJobConfiguration getJobConfig(final JobConfigurationPOJO jobConfigPOJO) {
+        return RuleAlteredJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
index 67c712cc391..eda9c1f476c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
@@ -27,7 +27,6 @@ import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAltere
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
@@ -155,8 +154,10 @@ public final class DataConsistencyChecker {
     }
     
     private Map<String, DataConsistencyContentCheckResult> checkData(final DataConsistencyCalculateAlgorithm calculator) {
-        PipelineDataSourceConfiguration sourceDataSourceConfig = getPipelineDataSourceConfiguration(calculator, jobConfig.getSource());
-        PipelineDataSourceConfiguration targetDataSourceConfig = getPipelineDataSourceConfiguration(calculator, jobConfig.getTarget());
+        decoratePipelineDataSourceConfiguration(calculator, jobConfig.getSource());
+        PipelineDataSourceConfiguration sourceDataSourceConfig = jobConfig.getSource();
+        decoratePipelineDataSourceConfiguration(calculator, jobConfig.getTarget());
+        PipelineDataSourceConfiguration targetDataSourceConfig = jobConfig.getTarget();
         ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(jobConfig.getJobId()) + "-data-check-%d");
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
         JobRateLimitAlgorithm inputRateLimitAlgorithm = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig).getInputRateLimitAlgorithm();
@@ -202,11 +203,9 @@ public final class DataConsistencyChecker {
         return result;
     }
     
-    private PipelineDataSourceConfiguration getPipelineDataSourceConfiguration(final DataConsistencyCalculateAlgorithm calculator, final YamlPipelineDataSourceConfiguration dataSourceConfig) {
-        PipelineDataSourceConfiguration result = PipelineDataSourceConfigurationFactory.newInstance(dataSourceConfig.getType(), dataSourceConfig.getParameter());
-        checkDatabaseTypeSupported(calculator.getSupportedDatabaseTypes(), result.getDatabaseType().getType());
-        addMySQLDataSourceConfig(result);
-        return result;
+    private void decoratePipelineDataSourceConfiguration(final DataConsistencyCalculateAlgorithm calculator, final PipelineDataSourceConfiguration dataSourceConfig) {
+        checkDatabaseTypeSupported(calculator.getSupportedDatabaseTypes(), dataSourceConfig.getDatabaseType().getType());
+        addMySQLDataSourceConfig(dataSourceConfig);
     }
     
     private void checkDatabaseTypeSupported(final Collection<String> supportedDatabaseTypes, final String databaseType) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
index a3d988f01a8..51321db57f5 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
@@ -18,9 +18,10 @@
 package org.apache.shardingsphere.data.pipeline.core.execute;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
+import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
@@ -47,6 +48,8 @@ public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
     
     private static final Pattern CONFIG_PATTERN = Pattern.compile(DataPipelineConstants.DATA_PIPELINE_ROOT + "/(\\d{2}[0-9a-f]+)/config");
     
+    private static final RuleAlteredJobConfigurationSwapper JOB_CONFIG_SWAPPER = new RuleAlteredJobConfigurationSwapper();
+    
     @Override
     protected void doStart() {
         watchGovernanceRepositoryConfiguration();
@@ -66,7 +69,7 @@ public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
                 log.info("jobId={}, deleted={}, disabled={}", jobConfigPOJO.getJobName(), deleted, disabled);
                 RuleAlteredJobSchedulerCenter.stop(jobConfigPOJO.getJobName());
                 // TODO refactor: dispatch to different job types
-                RuleAlteredJobConfiguration jobConfig = YamlEngine.unmarshal(jobConfigPOJO.getJobParameter(), RuleAlteredJobConfiguration.class, true);
+                RuleAlteredJobConfiguration jobConfig = RuleAlteredJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
                 if (deleted) {
                     new RuleAlteredJobPreparer().cleanup(jobConfig);
                 } else if (RuleAlteredJobProgressDetector.isJobSuccessful(jobConfig.getJobShardingCount(), ruleAlteredJobAPI.getProgress(jobConfig).values())) {
@@ -80,7 +83,7 @@ public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
             switch (event.getType()) {
                 case ADDED:
                 case UPDATED:
-                    RuleAlteredJobConfiguration jobConfig = YamlEngine.unmarshal(jobConfigPOJO.getJobParameter(), RuleAlteredJobConfiguration.class, true);
+                    RuleAlteredJobConfiguration jobConfig = RuleAlteredJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
                     String databaseName = jobConfig.getDatabaseName();
                     if (PipelineSimpleLock.getInstance().tryLock(databaseName, 1000)) {
                         execute(jobConfigPOJO);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
index 870a0339573..724c70fca0a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
@@ -19,9 +19,10 @@ package org.apache.shardingsphere.data.pipeline.core.job;
 
 import io.vertx.core.impl.ConcurrentHashSet;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
+import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.api.detect.RuleAlteredJobAlmostCompletedParameter;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
@@ -32,7 +33,6 @@ import org.apache.shardingsphere.data.pipeline.spi.lock.RowBasedJobLock;
 import org.apache.shardingsphere.data.pipeline.spi.lock.RuleBasedJobLock;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
 
 import java.util.List;
 import java.util.Map;
@@ -64,7 +64,7 @@ public final class FinishedCheckJob implements SimpleJob {
             onCheckJobIds.add(jobId);
             try {
                 // TODO refactor: dispatch to different job types
-                RuleAlteredJobConfiguration jobConfig = YamlEngine.unmarshal(jobInfo.getJobParameter(), RuleAlteredJobConfiguration.class, true);
+                RuleAlteredJobConfiguration jobConfig = RuleAlteredJobConfigurationSwapper.swapToObject(jobInfo.getJobParameter());
                 RuleAlteredContext ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
                 if (null == ruleAlteredContext.getCompletionDetectAlgorithm()) {
                     log.info("completionDetector not configured, auto switch will not be enabled. You could query job progress and switch config manually with DistSQL.");
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
index bb368119b3f..cbfa0940ef2 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
@@ -19,13 +19,13 @@ package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
 import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
-import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingReleaseDatabaseLevelLockEvent;
 
 /**
@@ -42,7 +42,7 @@ public final class RuleAlteredJob implements SimpleJob {
     @Override
     public void execute(final ShardingContext shardingContext) {
         log.info("Execute job {}-{}", shardingContext.getJobName(), shardingContext.getShardingItem());
-        RuleAlteredJobConfiguration jobConfig = YamlEngine.unmarshal(shardingContext.getJobParameter(), RuleAlteredJobConfiguration.class, true);
+        RuleAlteredJobConfiguration jobConfig = RuleAlteredJobConfigurationSwapper.swapToObject(shardingContext.getJobParameter());
         RuleAlteredJobContext jobContext = new RuleAlteredJobContext(jobConfig, shardingContext.getShardingItem());
         jobContext.setInitProgress(governanceRepositoryAPI.getJobProgress(jobContext.getJobId(), jobContext.getShardingItem()));
         jobContext.setJobPreparer(jobPreparer);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
index 6f1550d7bab..35059e626c7 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
@@ -86,7 +86,6 @@ public final class RuleAlteredJobContext {
     public RuleAlteredJobContext(final RuleAlteredJobConfiguration jobConfig, final int jobShardingItem) {
         ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
         this.jobConfig = jobConfig;
-        jobConfig.buildHandleConfig();
         jobId = jobConfig.getJobId();
         this.shardingItem = jobShardingItem;
         taskConfig = RuleAlteredJobWorker.buildTaskConfig(jobConfig, jobShardingItem, ruleAlteredContext.getOnRuleAlteredActionConfig());
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index a9ea71faba6..694dc084a3c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -24,6 +24,8 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
@@ -204,14 +206,15 @@ public final class RuleAlteredJobWorker {
             log.error("more than 1 rule altered");
             throw new PipelineJobCreationException("more than 1 rule altered");
         }
-        RuleAlteredJobConfiguration result = new RuleAlteredJobConfiguration();
+        YamlRuleAlteredJobConfiguration result = new YamlRuleAlteredJobConfiguration();
         result.setDatabaseName(event.getDatabaseName());
         result.setAlteredRuleYamlClassNameTablesMap(alteredRuleYamlClassNameTablesMap);
         result.setActiveVersion(event.getActiveVersion());
         result.setNewVersion(event.getNewVersion());
         result.setSource(createYamlPipelineDataSourceConfiguration(sourceRootConfig));
         result.setTarget(createYamlPipelineDataSourceConfiguration(targetRootConfig));
-        return Optional.of(result);
+        result.extendConfiguration();
+        return Optional.of(new RuleAlteredJobConfigurationSwapper().swapToObject(result));
     }
     
     private Collection<Pair<YamlRuleConfiguration, YamlRuleConfiguration>> groupSourceTargetRuleConfigsByType(final Collection<YamlRuleConfiguration> sourceRules,
@@ -289,8 +292,8 @@ public final class RuleAlteredJobWorker {
                     .allMatch(progress -> null != progress && progress.getStatus().equals(JobStatus.FINISHED))) {
                 continue;
             }
-            RuleAlteredJobConfiguration jobConfig = YamlEngine.unmarshal(each.getJobParameter(), RuleAlteredJobConfiguration.class, true);
-            if (hasUncompletedJobOfSameDatabaseName(jobConfig, databaseName)) {
+            RuleAlteredJobConfiguration jobConfig = RuleAlteredJobConfigurationSwapper.swapToObject(each.getJobParameter());
+            if (databaseName.equals(jobConfig.getDatabaseName())) {
                 result = true;
                 break;
             }
@@ -298,10 +301,6 @@ public final class RuleAlteredJobWorker {
         return result;
     }
     
-    private boolean hasUncompletedJobOfSameDatabaseName(final RuleAlteredJobConfiguration jobConfig, final String currentDatabaseName) {
-        return currentDatabaseName.equals(jobConfig.getDatabaseName());
-    }
-    
     /**
      * scaling release database level lock.
      *
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
index 61c31562e35..1e963116474 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
@@ -21,8 +21,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
@@ -50,7 +50,7 @@ public final class ScalingEnvironmentManager {
     public void cleanupTargetTables(final RuleAlteredJobConfiguration jobConfig) throws SQLException {
         Collection<String> tables = jobConfig.splitLogicTableNames();
         log.info("cleanupTargetTables, tables={}", tables);
-        YamlPipelineDataSourceConfiguration target = jobConfig.getTarget();
+        PipelineDataSourceConfiguration target = jobConfig.getTarget();
         PipelineSQLBuilder pipelineSQLBuilder = PipelineSQLBuilderFactory.getInstance(jobConfig.getTargetDatabaseType());
         ShardingSphereMetaData metaData = PipelineContext.getContextManager().getMetaDataContexts().getMetaData(jobConfig.getDatabaseName());
         TableNameSchemaNameMapping tableNameSchemaNameMapping = new TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(metaData.getSchemas()));
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
index 96db014402d..140528e89bd 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
@@ -20,9 +20,9 @@ package org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
 import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
@@ -56,10 +56,10 @@ public final class MySQLDataSourcePreparerTest {
     private RuleAlteredJobConfiguration jobConfig;
     
     @Mock
-    private YamlPipelineDataSourceConfiguration sourceYamlPipelineDataSourceConfig;
+    private PipelineDataSourceConfiguration sourcePipelineDataSourceConfig;
     
     @Mock
-    private YamlPipelineDataSourceConfiguration targetYamlPipelineDataSourceConfig;
+    private PipelineDataSourceConfiguration targetPipelineDataSourceConfig;
     
     @Mock
     private ShardingSpherePipelineDataSourceConfiguration sourceScalingDataSourceConfig;
@@ -79,10 +79,10 @@ public final class MySQLDataSourcePreparerTest {
         when(mockPipelineDataSourceManager.getDataSource(same(sourceScalingDataSourceConfig))).thenReturn(sourceDataSourceWrapper);
         when(mockPipelineDataSourceManager.getDataSource(same(targetScalingDataSourceConfig))).thenReturn(targetDataSourceWrapper);
         when(prepareTargetTablesParameter.getDataSourceManager()).thenReturn(mockPipelineDataSourceManager);
-        when(jobConfig.getSource()).thenReturn(sourceYamlPipelineDataSourceConfig);
+        when(jobConfig.getSource()).thenReturn(sourcePipelineDataSourceConfig);
         when(jobConfig.getSource().getType()).thenReturn("ShardingSphereJDBC");
         when(jobConfig.getSource().getParameter()).thenReturn("source");
-        when(jobConfig.getTarget()).thenReturn(targetYamlPipelineDataSourceConfig);
+        when(jobConfig.getTarget()).thenReturn(targetPipelineDataSourceConfig);
         when(jobConfig.getTarget().getType()).thenReturn("ShardingSphereJDBC");
         when(jobConfig.getTarget().getParameter()).thenReturn("target");
         when(prepareTargetTablesParameter.getJobConfig()).thenReturn(jobConfig);
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/ITEnvironmentContext.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/ITEnvironmentContext.java
index 30a8a7ff919..4de500bd417 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/ITEnvironmentContext.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/ITEnvironmentContext.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.integration.data.pipeline.env;
 import com.google.gson.Gson;
 import lombok.Getter;
 import lombok.SneakyThrows;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
@@ -81,7 +81,7 @@ public final class ITEnvironmentContext {
     }
     
     private static String createScalingConfiguration(final Map<String, YamlTableRuleConfiguration> tableRules) {
-        RuleAlteredJobConfiguration jobConfig = new RuleAlteredJobConfiguration();
+        YamlRuleAlteredJobConfiguration jobConfig = new YamlRuleAlteredJobConfiguration();
         jobConfig.setSource(createYamlPipelineDataSourceConfiguration(SourceConfiguration.getDockerConfiguration(tableRules)));
         jobConfig.setTarget(createYamlPipelineDataSourceConfiguration(TargetConfiguration.getDockerConfiguration()));
         return new Gson().toJson(jobConfig);
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
index 4b3fe083426..1900339730a 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
@@ -20,6 +20,8 @@ package org.apache.shardingsphere.data.pipeline.core.util;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
@@ -44,7 +46,7 @@ public final class JobConfigurationBuilder {
      * @return created job configuration
      */
     public static RuleAlteredJobConfiguration createJobConfiguration() {
-        RuleAlteredJobConfiguration result = new RuleAlteredJobConfiguration();
+        YamlRuleAlteredJobConfiguration result = new YamlRuleAlteredJobConfiguration();
         result.setDatabaseName("logic_db");
         result.setAlteredRuleYamlClassNameTablesMap(Collections.singletonMap(YamlShardingRuleConfiguration.class.getName(), Collections.singletonList("t_order")));
         result.setActiveVersion(0);
@@ -53,10 +55,10 @@ public final class JobConfigurationBuilder {
         result.setSource(createYamlPipelineDataSourceConfiguration(
                 new ShardingSpherePipelineDataSourceConfiguration(ConfigurationFileUtil.readFile("config_sharding_sphere_jdbc_source.yaml"))));
         result.setTarget(createYamlPipelineDataSourceConfiguration(new StandardPipelineDataSourceConfiguration(ConfigurationFileUtil.readFile("config_standard_jdbc_target.yaml"))));
-        result.buildHandleConfig();
+        result.extendConfiguration();
         int activeVersion = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE - 10) + 1;
         result.setJobId(generateJobId(activeVersion, "logic_db"));
-        return result;
+        return new RuleAlteredJobConfigurationSwapper().swapToObject(result);
     }
     
     private static YamlPipelineDataSourceConfiguration createYamlPipelineDataSourceConfiguration(final PipelineDataSourceConfiguration config) {
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java
deleted file mode 100644
index 1600b0127f1..00000000000
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
-
-import lombok.SneakyThrows;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
-import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
-import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
-import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
-import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Map;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-
-public final class RuleAlteredJobTest {
-    
-    @BeforeClass
-    public static void beforeClass() {
-        PipelineContextUtil.mockModeConfigAndContextManager();
-    }
-    
-    @Test
-    @SuppressWarnings("unchecked")
-    @SneakyThrows(ReflectiveOperationException.class)
-    public void assertExecute() {
-        RuleAlteredJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
-        initTableData(jobConfig);
-        ShardingContext shardingContext = new ShardingContext("1", null, 2, YamlEngine.marshal(jobConfig), 0, null);
-        new RuleAlteredJob().execute(shardingContext);
-        Map<String, RuleAlteredJobScheduler> jobSchedulerMap = ReflectionUtil.getStaticFieldValue(RuleAlteredJobSchedulerCenter.class, "JOB_SCHEDULER_MAP", Map.class);
-        assertNotNull(jobSchedulerMap);
-        assertFalse(jobSchedulerMap.isEmpty());
-    }
-    
-    @SneakyThrows(SQLException.class)
-    private void initTableData(final RuleAlteredJobConfiguration jobConfig) {
-        YamlPipelineDataSourceConfiguration source = jobConfig.getSource();
-        try (
-                PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(PipelineDataSourceConfigurationFactory.newInstance(source.getType(), source.getParameter()));
-                Connection connection = dataSource.getConnection();
-                Statement statement = connection.createStatement()) {
-            statement.execute("DROP TABLE IF EXISTS t_order");
-            statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, user_id VARCHAR(12))");
-            statement.execute("INSERT INTO t_order (order_id, user_id) VALUES (1, 'xxx'), (999, 'yyy')");
-        }
-    }
-}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
index 577dee68b64..2ed00596652 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
@@ -19,6 +19,8 @@ package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
@@ -56,8 +58,10 @@ public final class RuleAlteredJobWorkerTest {
     @Test(expected = PipelineJobCreationException.class)
     public void assertCreateRuleAlteredContextNoAlteredRule() {
         RuleAlteredJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
-        jobConfig.setAlteredRuleYamlClassNameTablesMap(Collections.emptyMap());
-        RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
+        RuleAlteredJobConfigurationSwapper swapper = new RuleAlteredJobConfigurationSwapper();
+        YamlRuleAlteredJobConfiguration yamlJobConfig = swapper.swapToYamlConfiguration(jobConfig);
+        yamlJobConfig.setAlteredRuleYamlClassNameTablesMap(Collections.emptyMap());
+        RuleAlteredJobWorker.createRuleAlteredContext(swapper.swapToObject(yamlJobConfig));
     }
     
     @Test