You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/05/16 03:11:23 UTC
[shardingsphere] branch master updated: Refactor and make RuleAlteredJobConfiguration immutable (#17682)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new fa4a9d61b73 Refactor and make RuleAlteredJobConfiguration immutable (#17682)
fa4a9d61b73 is described below
commit fa4a9d61b730c9b242a2f3c6d4d8ebcb40f443bc
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Mon May 16 11:11:18 2022 +0800
Refactor and make RuleAlteredJobConfiguration immutable (#17682)
---
...hardingRuleAlteredJobConfigurationPreparer.java | 14 +--
.../rulealtered/RuleAlteredJobConfiguration.java | 115 ++++-----------------
.../yaml/RuleAlteredJobConfigurationSwapper.java | 76 ++++++++++++++
.../YamlRuleAlteredJobConfiguration.java} | 61 ++++-------
.../PipelineDataSourceConfigurationSwapper.java | 47 +++++++++
.../RuleAlteredJobConfigurationPreparer.java | 5 +-
.../RuleAlteredJobConfigurationTest.java | 18 +++-
.../core/api/impl/RuleAlteredJobAPIImpl.java | 14 +--
.../check/consistency/DataConsistencyChecker.java | 15 ++-
.../pipeline/core/execute/PipelineJobExecutor.java | 9 +-
.../data/pipeline/core/job/FinishedCheckJob.java | 6 +-
.../scenario/rulealtered/RuleAlteredJob.java | 4 +-
.../rulealtered/RuleAlteredJobContext.java | 1 -
.../scenario/rulealtered/RuleAlteredJobWorker.java | 15 ++-
.../job/environment/ScalingEnvironmentManager.java | 4 +-
.../datasource/MySQLDataSourcePreparerTest.java | 10 +-
.../data/pipeline/env/ITEnvironmentContext.java | 4 +-
.../core/util/JobConfigurationBuilder.java | 8 +-
.../scenario/rulealtered/RuleAlteredJobTest.java | 74 -------------
.../rulealtered/RuleAlteredJobWorkerTest.java | 8 +-
20 files changed, 238 insertions(+), 270 deletions(-)
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
index 9121aec9cf1..aa13d53a231 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
@@ -24,6 +24,8 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
@@ -73,14 +75,14 @@ import java.util.stream.Collectors;
public final class ShardingRuleAlteredJobConfigurationPreparer implements RuleAlteredJobConfigurationPreparer {
@Override
- public void extendJobConfiguration(final RuleAlteredJobConfiguration jobConfig) {
- Map<String, List<DataNode>> shouldScalingActualDataNodes = getShouldScalingActualDataNodes(jobConfig);
- jobConfig.setJobShardingDataNodes(getJobShardingDataNodes(shouldScalingActualDataNodes));
- jobConfig.setLogicTables(getLogicTables(shouldScalingActualDataNodes.keySet()));
- jobConfig.setTablesFirstDataNodes(getTablesFirstDataNodes(shouldScalingActualDataNodes));
+ public void extendJobConfiguration(final YamlRuleAlteredJobConfiguration yamlJobConfig) {
+ Map<String, List<DataNode>> actualDataNodes = getActualDataNodes(new RuleAlteredJobConfigurationSwapper().swapToObject(yamlJobConfig));
+ yamlJobConfig.setJobShardingDataNodes(getJobShardingDataNodes(actualDataNodes));
+ yamlJobConfig.setLogicTables(getLogicTables(actualDataNodes.keySet()));
+ yamlJobConfig.setTablesFirstDataNodes(getTablesFirstDataNodes(actualDataNodes));
}
- private static Map<String, List<DataNode>> getShouldScalingActualDataNodes(final RuleAlteredJobConfiguration jobConfig) {
+ private static Map<String, List<DataNode>> getActualDataNodes(final RuleAlteredJobConfiguration jobConfig) {
PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter());
ShardingSpherePipelineDataSourceConfiguration source = (ShardingSpherePipelineDataSourceConfiguration) sourceDataSourceConfig;
ShardingRuleConfiguration sourceRuleConfig = ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(source.getRootConfig().getRules());
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java
index 9565b9539c8..b32411d9d19 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java
@@ -17,59 +17,46 @@
package org.apache.shardingsphere.data.pipeline.api.config.rulealtered;
-import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
-import lombok.AllArgsConstructor;
import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.job.JobSubType;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
-import org.apache.shardingsphere.data.pipeline.api.job.RuleAlteredJobId;
-import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
- * Scaling job configuration.
+ * Rule altered job configuration.
*/
-@NoArgsConstructor
-@AllArgsConstructor
+@RequiredArgsConstructor
@Getter
-@Setter
@Slf4j
-// TODO share for totally new scenario
-// TODO rename to Yaml, add config class
public final class RuleAlteredJobConfiguration implements PipelineJobConfiguration {
- private String jobId;
+ private final String jobId;
- private String databaseName;
+ private final String databaseName;
- /**
- * Map{altered rule yaml class name, re-shard needed table names}.
- */
- private Map<String, List<String>> alteredRuleYamlClassNameTablesMap;
+ private final Integer activeVersion;
+
+ private final Integer newVersion;
- private Integer activeVersion;
+ private final String sourceDatabaseType;
- private Integer newVersion;
+ private final String targetDatabaseType;
- private YamlPipelineDataSourceConfiguration source;
+ private final PipelineDataSourceConfiguration source;
- private YamlPipelineDataSourceConfiguration target;
+ private final PipelineDataSourceConfiguration target;
- private int concurrency = 3;
+ /**
+ * Map{altered rule yaml class name, re-shard needed table names}.
+ */
+ private final Map<String, List<String>> alteredRuleYamlClassNameTablesMap;
- private int retryTimes = 3;
+ private final String logicTables;
/**
* Collection of each logic table's first data node.
@@ -78,41 +65,13 @@ public final class RuleAlteredJobConfiguration implements PipelineJobConfigurati
* then value may be: {@code t_order:ds_0.t_order_0|t_order_item:ds_0.t_order_item_0}.
* </p>
*/
- private String tablesFirstDataNodes;
+ private final String tablesFirstDataNodes;
- private List<String> jobShardingDataNodes;
-
- private String logicTables;
-
- private String sourceDatabaseType;
-
- private String targetDatabaseType;
-
- /**
- * Set source.
- *
- * @param source source configuration
- */
- public void setSource(final YamlPipelineDataSourceConfiguration source) {
- checkParameters(source);
- this.source = source;
- }
+ private final List<String> jobShardingDataNodes;
- /**
- * Set target.
- *
- * @param target target configuration
- */
- public void setTarget(final YamlPipelineDataSourceConfiguration target) {
- checkParameters(target);
- this.target = target;
- }
+ private final int concurrency;
- private void checkParameters(final YamlPipelineDataSourceConfiguration yamlConfig) {
- Preconditions.checkNotNull(yamlConfig);
- Preconditions.checkNotNull(yamlConfig.getType());
- Preconditions.checkNotNull(yamlConfig.getParameter());
- }
+ private final int retryTimes;
/**
* Get job sharding count.
@@ -132,38 +91,6 @@ public final class RuleAlteredJobConfiguration implements PipelineJobConfigurati
return Splitter.on(',').splitToList(logicTables);
}
- /**
- * Build handle configuration.
- */
- public void buildHandleConfig() {
- if (null == getJobShardingDataNodes()) {
- RuleAlteredJobConfigurationPreparerFactory.getInstance().extendJobConfiguration(this);
- }
- if (null == jobId) {
- jobId = generateJobId();
- }
- if (Strings.isNullOrEmpty(getSourceDatabaseType())) {
- PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(source.getType(), source.getParameter());
- setSourceDatabaseType(sourceDataSourceConfig.getDatabaseType().getType());
- }
- if (Strings.isNullOrEmpty(getTargetDatabaseType())) {
- PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(target.getType(), target.getParameter());
- setTargetDatabaseType(targetDataSourceConfig.getDatabaseType().getType());
- }
- }
-
- private String generateJobId() {
- RuleAlteredJobId jobId = new RuleAlteredJobId();
- // TODO type, subTypes
- jobId.setType(JobType.RULE_ALTERED.getValue());
- jobId.setFormatVersion(RuleAlteredJobId.CURRENT_VERSION);
- jobId.setSubTypes(Collections.singletonList(JobSubType.SCALING.getValue()));
- jobId.setCurrentMetadataVersion(activeVersion);
- jobId.setNewMetadataVersion(newVersion);
- jobId.setDatabaseName(databaseName);
- return jobId.marshal();
- }
-
@Override
public String toString() {
return "RuleAlteredJobConfiguration{"
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/RuleAlteredJobConfigurationSwapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/RuleAlteredJobConfigurationSwapper.java
new file mode 100644
index 00000000000..df11b1661e9
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/RuleAlteredJobConfigurationSwapper.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml;
+
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.PipelineDataSourceConfigurationSwapper;
+import org.apache.shardingsphere.infra.yaml.config.swapper.YamlConfigurationSwapper;
+import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
+
+/**
+ * Rule altered job configuration swapper.
+ */
+// TODO add RuleAlteredJobConfigurationSwapper test
+public final class RuleAlteredJobConfigurationSwapper implements YamlConfigurationSwapper<YamlRuleAlteredJobConfiguration, RuleAlteredJobConfiguration> {
+
+ private static final RuleAlteredJobConfigurationSwapper JOB_CONFIG_SWAPPER = new RuleAlteredJobConfigurationSwapper();
+
+ private final PipelineDataSourceConfigurationSwapper dataSourceConfigSwapper = new PipelineDataSourceConfigurationSwapper();
+
+ @Override
+ public YamlRuleAlteredJobConfiguration swapToYamlConfiguration(final RuleAlteredJobConfiguration data) {
+ YamlRuleAlteredJobConfiguration result = new YamlRuleAlteredJobConfiguration();
+ result.setJobId(data.getJobId());
+ result.setDatabaseName(data.getDatabaseName());
+ result.setActiveVersion(data.getActiveVersion());
+ result.setNewVersion(data.getNewVersion());
+ result.setSourceDatabaseType(data.getSourceDatabaseType());
+ result.setTargetDatabaseType(data.getTargetDatabaseType());
+ result.setSource(dataSourceConfigSwapper.swapToYamlConfiguration(data.getSource()));
+ result.setTarget(dataSourceConfigSwapper.swapToYamlConfiguration(data.getTarget()));
+ result.setAlteredRuleYamlClassNameTablesMap(data.getAlteredRuleYamlClassNameTablesMap());
+ result.setLogicTables(data.getLogicTables());
+ result.setTablesFirstDataNodes(data.getTablesFirstDataNodes());
+ result.setJobShardingDataNodes(data.getJobShardingDataNodes());
+ result.setConcurrency(data.getConcurrency());
+ result.setRetryTimes(data.getRetryTimes());
+ return result;
+ }
+
+ @Override
+ public RuleAlteredJobConfiguration swapToObject(final YamlRuleAlteredJobConfiguration yamlConfig) {
+ return new RuleAlteredJobConfiguration(yamlConfig.getJobId(), yamlConfig.getDatabaseName(),
+ yamlConfig.getActiveVersion(), yamlConfig.getNewVersion(),
+ yamlConfig.getSourceDatabaseType(), yamlConfig.getTargetDatabaseType(),
+ dataSourceConfigSwapper.swapToObject(yamlConfig.getSource()), dataSourceConfigSwapper.swapToObject(yamlConfig.getTarget()),
+ yamlConfig.getAlteredRuleYamlClassNameTablesMap(), yamlConfig.getLogicTables(),
+ yamlConfig.getTablesFirstDataNodes(), yamlConfig.getJobShardingDataNodes(),
+ yamlConfig.getConcurrency(), yamlConfig.getRetryTimes());
+ }
+
+ /**
+ * Swap to job configuration from text.
+ *
+ * @param jobParameter job parameter
+ * @return job configuration
+ */
+ public static RuleAlteredJobConfiguration swapToObject(final String jobParameter) {
+ YamlRuleAlteredJobConfiguration yamlJobConfig = YamlEngine.unmarshal(jobParameter, YamlRuleAlteredJobConfiguration.class, true);
+ return JOB_CONFIG_SWAPPER.swapToObject(yamlJobConfig);
+ }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java
similarity index 84%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java
index 9565b9539c8..a7b0cd9c7aa 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java
@@ -15,17 +15,13 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.config.rulealtered;
+package org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml;
import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
import com.google.common.base.Strings;
-import lombok.AllArgsConstructor;
import lombok.Getter;
-import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
@@ -33,43 +29,42 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobSubType;
import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.api.job.RuleAlteredJobId;
import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlConfiguration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
- * Scaling job configuration.
+ * Rule altered job configuration for YAML.
*/
-@NoArgsConstructor
-@AllArgsConstructor
@Getter
@Setter
@Slf4j
-// TODO share for totally new scenario
-// TODO rename to Yaml, add config class
-public final class RuleAlteredJobConfiguration implements PipelineJobConfiguration {
+public final class YamlRuleAlteredJobConfiguration implements YamlConfiguration {
private String jobId;
private String databaseName;
- /**
- * Map{altered rule yaml class name, re-shard needed table names}.
- */
- private Map<String, List<String>> alteredRuleYamlClassNameTablesMap;
-
private Integer activeVersion;
private Integer newVersion;
+ private String sourceDatabaseType;
+
+ private String targetDatabaseType;
+
private YamlPipelineDataSourceConfiguration source;
private YamlPipelineDataSourceConfiguration target;
- private int concurrency = 3;
+ /**
+ * Map{altered rule yaml class name, re-shard needed table names}.
+ */
+ private Map<String, List<String>> alteredRuleYamlClassNameTablesMap;
- private int retryTimes = 3;
+ private String logicTables;
/**
* Collection of each logic table's first data node.
@@ -82,11 +77,9 @@ public final class RuleAlteredJobConfiguration implements PipelineJobConfigurati
private List<String> jobShardingDataNodes;
- private String logicTables;
-
- private String sourceDatabaseType;
+ private int concurrency = 3;
- private String targetDatabaseType;
+ private int retryTimes = 3;
/**
* Set source.
@@ -115,27 +108,9 @@ public final class RuleAlteredJobConfiguration implements PipelineJobConfigurati
}
/**
- * Get job sharding count.
- *
- * @return job sharding count
- */
- public int getJobShardingCount() {
- return null == jobShardingDataNodes ? 0 : jobShardingDataNodes.size();
- }
-
- /**
- * Split {@linkplain #logicTables} to logic table names.
- *
- * @return logic table names
- */
- public List<String> splitLogicTableNames() {
- return Splitter.on(',').splitToList(logicTables);
- }
-
- /**
- * Build handle configuration.
+ * Extend configuration.
*/
- public void buildHandleConfig() {
+ public void extendConfiguration() {
if (null == getJobShardingDataNodes()) {
RuleAlteredJobConfigurationPreparerFactory.getInstance().extendJobConfiguration(this);
}
@@ -166,7 +141,7 @@ public final class RuleAlteredJobConfiguration implements PipelineJobConfigurati
@Override
public String toString() {
- return "RuleAlteredJobConfiguration{"
+ return "YamlRuleAlteredJobConfiguration{"
+ "jobId='" + jobId + '\'' + ", databaseName='" + databaseName + '\''
+ ", activeVersion=" + activeVersion + ", newVersion=" + newVersion
+ ", sourceDatabaseType='" + sourceDatabaseType + '\'' + ", targetDatabaseType='" + targetDatabaseType + '\''
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/yaml/PipelineDataSourceConfigurationSwapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/yaml/PipelineDataSourceConfigurationSwapper.java
new file mode 100644
index 00000000000..b80203f0172
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/yaml/PipelineDataSourceConfigurationSwapper.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml;
+
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
+import org.apache.shardingsphere.infra.yaml.config.swapper.YamlConfigurationSwapper;
+
+/**
+ * Pipeline data source configuration YAML swapper.
+ */
+public final class PipelineDataSourceConfigurationSwapper implements YamlConfigurationSwapper<YamlPipelineDataSourceConfiguration, PipelineDataSourceConfiguration> {
+
+ @Override
+ public YamlPipelineDataSourceConfiguration swapToYamlConfiguration(final PipelineDataSourceConfiguration data) {
+ if (null == data) {
+ return null;
+ }
+ YamlPipelineDataSourceConfiguration result = new YamlPipelineDataSourceConfiguration();
+ result.setType(data.getType());
+ result.setParameter(data.getParameter());
+ return result;
+ }
+
+ @Override
+ public PipelineDataSourceConfiguration swapToObject(final YamlPipelineDataSourceConfiguration yamlConfig) {
+ if (null == yamlConfig) {
+ return null;
+ }
+ return PipelineDataSourceConfigurationFactory.newInstance(yamlConfig.getType(), yamlConfig.getParameter());
+ }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
index d189d128da5..c947e3874bd 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.spi.rulealtered;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
import org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration;
import org.apache.shardingsphere.spi.type.required.RequiredSPI;
@@ -30,9 +31,9 @@ public interface RuleAlteredJobConfigurationPreparer extends RequiredSPI {
/**
* Extend job configuration.
*
- * @param jobConfig job configuration
+ * @param yamlJobConfig YAML job configuration
*/
- void extendJobConfiguration(RuleAlteredJobConfiguration jobConfig);
+ void extendJobConfiguration(YamlRuleAlteredJobConfiguration yamlJobConfig);
/**
* Create task configuration, used by underlying scheduler.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfigurationTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfigurationTest.java
index 22a227032a2..2d783b94831 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfigurationTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfigurationTest.java
@@ -17,6 +17,8 @@
package org.apache.shardingsphere.data.pipeline.api.config.rulealtered;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
import org.junit.Test;
import java.util.Arrays;
@@ -26,22 +28,28 @@ import static org.junit.Assert.assertThat;
public final class RuleAlteredJobConfigurationTest {
+ private static final RuleAlteredJobConfigurationSwapper JOB_CONFIG_SWAPPER = new RuleAlteredJobConfigurationSwapper();
+
@Test
public void assertGetJobShardingCountByNull() {
- assertThat(new RuleAlteredJobConfiguration().getJobShardingCount(), is(0));
+ YamlRuleAlteredJobConfiguration yamlJobConfig = new YamlRuleAlteredJobConfiguration();
+ RuleAlteredJobConfiguration jobConfig = JOB_CONFIG_SWAPPER.swapToObject(yamlJobConfig);
+ assertThat(jobConfig.getJobShardingCount(), is(0));
}
@Test
public void assertGetJobShardingCount() {
- RuleAlteredJobConfiguration jobConfig = new RuleAlteredJobConfiguration();
- jobConfig.setJobShardingDataNodes(Arrays.asList("node1", "node2"));
+ YamlRuleAlteredJobConfiguration yamlJobConfig = new YamlRuleAlteredJobConfiguration();
+ yamlJobConfig.setJobShardingDataNodes(Arrays.asList("node1", "node2"));
+ RuleAlteredJobConfiguration jobConfig = JOB_CONFIG_SWAPPER.swapToObject(yamlJobConfig);
assertThat(jobConfig.getJobShardingCount(), is(2));
}
@Test
public void assertSplitLogicTableNames() {
- RuleAlteredJobConfiguration jobConfig = new RuleAlteredJobConfiguration();
- jobConfig.setLogicTables("foo_tbl,bar_tbl");
+ YamlRuleAlteredJobConfiguration yamlJobConfig = new YamlRuleAlteredJobConfiguration();
+ yamlJobConfig.setLogicTables("foo_tbl,bar_tbl");
+ RuleAlteredJobConfiguration jobConfig = JOB_CONFIG_SWAPPER.swapToObject(yamlJobConfig);
assertThat(jobConfig.splitLogicTableNames(), is(Arrays.asList("foo_tbl", "bar_tbl")));
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
index 8a5fd5cb12b..56426ee3ab1 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
@@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
@@ -102,8 +103,7 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
@Override
public Optional<String> start(final RuleAlteredJobConfiguration jobConfig) {
- jobConfig.buildHandleConfig();
- if (jobConfig.getJobShardingCount() == 0) {
+ if (0 == jobConfig.getJobShardingCount()) {
log.warn("Invalid scaling job config!");
throw new PipelineJobCreationException("handleConfig shardingTotalCount is 0");
}
@@ -116,15 +116,15 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
return Optional.of(jobId);
}
repositoryAPI.persist(String.format("%s/%s", DataPipelineConstants.DATA_PIPELINE_ROOT, jobId), RuleAlteredJob.class.getName());
- repositoryAPI.persist(jobConfigKey, createJobConfig(jobConfig));
+ repositoryAPI.persist(jobConfigKey, createJobConfigText(jobConfig));
return Optional.of(jobId);
}
- private String createJobConfig(final RuleAlteredJobConfiguration jobConfig) {
+ private String createJobConfigText(final RuleAlteredJobConfiguration jobConfig) {
JobConfigurationPOJO jobConfigPOJO = new JobConfigurationPOJO();
jobConfigPOJO.setJobName(jobConfig.getJobId());
jobConfigPOJO.setShardingTotalCount(jobConfig.getJobShardingCount());
- jobConfigPOJO.setJobParameter(YamlEngine.marshal(jobConfig));
+ jobConfigPOJO.setJobParameter(YamlEngine.marshal(new RuleAlteredJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
jobConfigPOJO.getProps().setProperty("create_time", LocalDateTime.now().format(DATE_TIME_FORMATTER));
return YamlEngine.marshal(jobConfigPOJO);
}
@@ -365,7 +365,7 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
return getJobConfig(getElasticJobConfigPOJO(jobId));
}
- private RuleAlteredJobConfiguration getJobConfig(final JobConfigurationPOJO elasticJobConfigPOJO) {
- return YamlEngine.unmarshal(elasticJobConfigPOJO.getJobParameter(), RuleAlteredJobConfiguration.class, true);
+ private RuleAlteredJobConfiguration getJobConfig(final JobConfigurationPOJO jobConfigPOJO) {
+ return RuleAlteredJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
index 67c712cc391..eda9c1f476c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
@@ -27,7 +27,6 @@ import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAltere
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
@@ -155,8 +154,10 @@ public final class DataConsistencyChecker {
}
private Map<String, DataConsistencyContentCheckResult> checkData(final DataConsistencyCalculateAlgorithm calculator) {
- PipelineDataSourceConfiguration sourceDataSourceConfig = getPipelineDataSourceConfiguration(calculator, jobConfig.getSource());
- PipelineDataSourceConfiguration targetDataSourceConfig = getPipelineDataSourceConfiguration(calculator, jobConfig.getTarget());
+ decoratePipelineDataSourceConfiguration(calculator, jobConfig.getSource());
+ PipelineDataSourceConfiguration sourceDataSourceConfig = jobConfig.getSource();
+ decoratePipelineDataSourceConfiguration(calculator, jobConfig.getTarget());
+ PipelineDataSourceConfiguration targetDataSourceConfig = jobConfig.getTarget();
ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(jobConfig.getJobId()) + "-data-check-%d");
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
JobRateLimitAlgorithm inputRateLimitAlgorithm = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig).getInputRateLimitAlgorithm();
@@ -202,11 +203,9 @@ public final class DataConsistencyChecker {
return result;
}
- private PipelineDataSourceConfiguration getPipelineDataSourceConfiguration(final DataConsistencyCalculateAlgorithm calculator, final YamlPipelineDataSourceConfiguration dataSourceConfig) {
- PipelineDataSourceConfiguration result = PipelineDataSourceConfigurationFactory.newInstance(dataSourceConfig.getType(), dataSourceConfig.getParameter());
- checkDatabaseTypeSupported(calculator.getSupportedDatabaseTypes(), result.getDatabaseType().getType());
- addMySQLDataSourceConfig(result);
- return result;
+ private void decoratePipelineDataSourceConfiguration(final DataConsistencyCalculateAlgorithm calculator, final PipelineDataSourceConfiguration dataSourceConfig) {
+ checkDatabaseTypeSupported(calculator.getSupportedDatabaseTypes(), dataSourceConfig.getDatabaseType().getType());
+ addMySQLDataSourceConfig(dataSourceConfig);
}
private void checkDatabaseTypeSupported(final Collection<String> supportedDatabaseTypes, final String databaseType) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
index a3d988f01a8..51321db57f5 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
@@ -18,9 +18,10 @@
package org.apache.shardingsphere.data.pipeline.core.execute;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
+import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
@@ -47,6 +48,8 @@ public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
private static final Pattern CONFIG_PATTERN = Pattern.compile(DataPipelineConstants.DATA_PIPELINE_ROOT + "/(\\d{2}[0-9a-f]+)/config");
+ private static final RuleAlteredJobConfigurationSwapper JOB_CONFIG_SWAPPER = new RuleAlteredJobConfigurationSwapper();
+
@Override
protected void doStart() {
watchGovernanceRepositoryConfiguration();
@@ -66,7 +69,7 @@ public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
log.info("jobId={}, deleted={}, disabled={}", jobConfigPOJO.getJobName(), deleted, disabled);
RuleAlteredJobSchedulerCenter.stop(jobConfigPOJO.getJobName());
// TODO refactor: dispatch to different job types
- RuleAlteredJobConfiguration jobConfig = YamlEngine.unmarshal(jobConfigPOJO.getJobParameter(), RuleAlteredJobConfiguration.class, true);
+ RuleAlteredJobConfiguration jobConfig = RuleAlteredJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
if (deleted) {
new RuleAlteredJobPreparer().cleanup(jobConfig);
} else if (RuleAlteredJobProgressDetector.isJobSuccessful(jobConfig.getJobShardingCount(), ruleAlteredJobAPI.getProgress(jobConfig).values())) {
@@ -80,7 +83,7 @@ public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
switch (event.getType()) {
case ADDED:
case UPDATED:
- RuleAlteredJobConfiguration jobConfig = YamlEngine.unmarshal(jobConfigPOJO.getJobParameter(), RuleAlteredJobConfiguration.class, true);
+ RuleAlteredJobConfiguration jobConfig = RuleAlteredJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
String databaseName = jobConfig.getDatabaseName();
if (PipelineSimpleLock.getInstance().tryLock(databaseName, 1000)) {
execute(jobConfigPOJO);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
index 870a0339573..724c70fca0a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
@@ -19,9 +19,10 @@ package org.apache.shardingsphere.data.pipeline.core.job;
import io.vertx.core.impl.ConcurrentHashSet;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
+import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.api.detect.RuleAlteredJobAlmostCompletedParameter;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
@@ -32,7 +33,6 @@ import org.apache.shardingsphere.data.pipeline.spi.lock.RowBasedJobLock;
import org.apache.shardingsphere.data.pipeline.spi.lock.RuleBasedJobLock;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import java.util.List;
import java.util.Map;
@@ -64,7 +64,7 @@ public final class FinishedCheckJob implements SimpleJob {
onCheckJobIds.add(jobId);
try {
// TODO refactor: dispatch to different job types
- RuleAlteredJobConfiguration jobConfig = YamlEngine.unmarshal(jobInfo.getJobParameter(), RuleAlteredJobConfiguration.class, true);
+ RuleAlteredJobConfiguration jobConfig = RuleAlteredJobConfigurationSwapper.swapToObject(jobInfo.getJobParameter());
RuleAlteredContext ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
if (null == ruleAlteredContext.getCompletionDetectAlgorithm()) {
log.info("completionDetector not configured, auto switch will not be enabled. You could query job progress and switch config manually with DistSQL.");
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
index bb368119b3f..cbfa0940ef2 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
@@ -19,13 +19,13 @@ package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
-import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingReleaseDatabaseLevelLockEvent;
/**
@@ -42,7 +42,7 @@ public final class RuleAlteredJob implements SimpleJob {
@Override
public void execute(final ShardingContext shardingContext) {
log.info("Execute job {}-{}", shardingContext.getJobName(), shardingContext.getShardingItem());
- RuleAlteredJobConfiguration jobConfig = YamlEngine.unmarshal(shardingContext.getJobParameter(), RuleAlteredJobConfiguration.class, true);
+ RuleAlteredJobConfiguration jobConfig = RuleAlteredJobConfigurationSwapper.swapToObject(shardingContext.getJobParameter());
RuleAlteredJobContext jobContext = new RuleAlteredJobContext(jobConfig, shardingContext.getShardingItem());
jobContext.setInitProgress(governanceRepositoryAPI.getJobProgress(jobContext.getJobId(), jobContext.getShardingItem()));
jobContext.setJobPreparer(jobPreparer);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
index 6f1550d7bab..35059e626c7 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
@@ -86,7 +86,6 @@ public final class RuleAlteredJobContext {
public RuleAlteredJobContext(final RuleAlteredJobConfiguration jobConfig, final int jobShardingItem) {
ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
this.jobConfig = jobConfig;
- jobConfig.buildHandleConfig();
jobId = jobConfig.getJobId();
this.shardingItem = jobShardingItem;
taskConfig = RuleAlteredJobWorker.buildTaskConfig(jobConfig, jobShardingItem, ruleAlteredContext.getOnRuleAlteredActionConfig());
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index a9ea71faba6..694dc084a3c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -24,6 +24,8 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
@@ -204,14 +206,15 @@ public final class RuleAlteredJobWorker {
log.error("more than 1 rule altered");
throw new PipelineJobCreationException("more than 1 rule altered");
}
- RuleAlteredJobConfiguration result = new RuleAlteredJobConfiguration();
+ YamlRuleAlteredJobConfiguration result = new YamlRuleAlteredJobConfiguration();
result.setDatabaseName(event.getDatabaseName());
result.setAlteredRuleYamlClassNameTablesMap(alteredRuleYamlClassNameTablesMap);
result.setActiveVersion(event.getActiveVersion());
result.setNewVersion(event.getNewVersion());
result.setSource(createYamlPipelineDataSourceConfiguration(sourceRootConfig));
result.setTarget(createYamlPipelineDataSourceConfiguration(targetRootConfig));
- return Optional.of(result);
+ result.extendConfiguration();
+ return Optional.of(new RuleAlteredJobConfigurationSwapper().swapToObject(result));
}
private Collection<Pair<YamlRuleConfiguration, YamlRuleConfiguration>> groupSourceTargetRuleConfigsByType(final Collection<YamlRuleConfiguration> sourceRules,
@@ -289,8 +292,8 @@ public final class RuleAlteredJobWorker {
.allMatch(progress -> null != progress && progress.getStatus().equals(JobStatus.FINISHED))) {
continue;
}
- RuleAlteredJobConfiguration jobConfig = YamlEngine.unmarshal(each.getJobParameter(), RuleAlteredJobConfiguration.class, true);
- if (hasUncompletedJobOfSameDatabaseName(jobConfig, databaseName)) {
+ RuleAlteredJobConfiguration jobConfig = RuleAlteredJobConfigurationSwapper.swapToObject(each.getJobParameter());
+ if (databaseName.equals(jobConfig.getDatabaseName())) {
result = true;
break;
}
@@ -298,10 +301,6 @@ public final class RuleAlteredJobWorker {
return result;
}
- private boolean hasUncompletedJobOfSameDatabaseName(final RuleAlteredJobConfiguration jobConfig, final String currentDatabaseName) {
- return currentDatabaseName.equals(jobConfig.getDatabaseName());
- }
-
/**
* scaling release database level lock.
*
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
index 61c31562e35..1e963116474 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
@@ -21,8 +21,8 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
@@ -50,7 +50,7 @@ public final class ScalingEnvironmentManager {
public void cleanupTargetTables(final RuleAlteredJobConfiguration jobConfig) throws SQLException {
Collection<String> tables = jobConfig.splitLogicTableNames();
log.info("cleanupTargetTables, tables={}", tables);
- YamlPipelineDataSourceConfiguration target = jobConfig.getTarget();
+ PipelineDataSourceConfiguration target = jobConfig.getTarget();
PipelineSQLBuilder pipelineSQLBuilder = PipelineSQLBuilderFactory.getInstance(jobConfig.getTargetDatabaseType());
ShardingSphereMetaData metaData = PipelineContext.getContextManager().getMetaDataContexts().getMetaData(jobConfig.getDatabaseName());
TableNameSchemaNameMapping tableNameSchemaNameMapping = new TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(metaData.getSchemas()));
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
index 96db014402d..140528e89bd 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
@@ -20,9 +20,9 @@ package org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
@@ -56,10 +56,10 @@ public final class MySQLDataSourcePreparerTest {
private RuleAlteredJobConfiguration jobConfig;
@Mock
- private YamlPipelineDataSourceConfiguration sourceYamlPipelineDataSourceConfig;
+ private PipelineDataSourceConfiguration sourcePipelineDataSourceConfig;
@Mock
- private YamlPipelineDataSourceConfiguration targetYamlPipelineDataSourceConfig;
+ private PipelineDataSourceConfiguration targetPipelineDataSourceConfig;
@Mock
private ShardingSpherePipelineDataSourceConfiguration sourceScalingDataSourceConfig;
@@ -79,10 +79,10 @@ public final class MySQLDataSourcePreparerTest {
when(mockPipelineDataSourceManager.getDataSource(same(sourceScalingDataSourceConfig))).thenReturn(sourceDataSourceWrapper);
when(mockPipelineDataSourceManager.getDataSource(same(targetScalingDataSourceConfig))).thenReturn(targetDataSourceWrapper);
when(prepareTargetTablesParameter.getDataSourceManager()).thenReturn(mockPipelineDataSourceManager);
- when(jobConfig.getSource()).thenReturn(sourceYamlPipelineDataSourceConfig);
+ when(jobConfig.getSource()).thenReturn(sourcePipelineDataSourceConfig);
when(jobConfig.getSource().getType()).thenReturn("ShardingSphereJDBC");
when(jobConfig.getSource().getParameter()).thenReturn("source");
- when(jobConfig.getTarget()).thenReturn(targetYamlPipelineDataSourceConfig);
+ when(jobConfig.getTarget()).thenReturn(targetPipelineDataSourceConfig);
when(jobConfig.getTarget().getType()).thenReturn("ShardingSphereJDBC");
when(jobConfig.getTarget().getParameter()).thenReturn("target");
when(prepareTargetTablesParameter.getJobConfig()).thenReturn(jobConfig);
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/ITEnvironmentContext.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/ITEnvironmentContext.java
index 30a8a7ff919..4de500bd417 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/ITEnvironmentContext.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/ITEnvironmentContext.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.integration.data.pipeline.env;
import com.google.gson.Gson;
import lombok.Getter;
import lombok.SneakyThrows;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
@@ -81,7 +81,7 @@ public final class ITEnvironmentContext {
}
private static String createScalingConfiguration(final Map<String, YamlTableRuleConfiguration> tableRules) {
- RuleAlteredJobConfiguration jobConfig = new RuleAlteredJobConfiguration();
+ YamlRuleAlteredJobConfiguration jobConfig = new YamlRuleAlteredJobConfiguration();
jobConfig.setSource(createYamlPipelineDataSourceConfiguration(SourceConfiguration.getDockerConfiguration(tableRules)));
jobConfig.setTarget(createYamlPipelineDataSourceConfiguration(TargetConfiguration.getDockerConfiguration()));
return new Gson().toJson(jobConfig);
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
index 4b3fe083426..1900339730a 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
@@ -20,6 +20,8 @@ package org.apache.shardingsphere.data.pipeline.core.util;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
@@ -44,7 +46,7 @@ public final class JobConfigurationBuilder {
* @return created job configuration
*/
public static RuleAlteredJobConfiguration createJobConfiguration() {
- RuleAlteredJobConfiguration result = new RuleAlteredJobConfiguration();
+ YamlRuleAlteredJobConfiguration result = new YamlRuleAlteredJobConfiguration();
result.setDatabaseName("logic_db");
result.setAlteredRuleYamlClassNameTablesMap(Collections.singletonMap(YamlShardingRuleConfiguration.class.getName(), Collections.singletonList("t_order")));
result.setActiveVersion(0);
@@ -53,10 +55,10 @@ public final class JobConfigurationBuilder {
result.setSource(createYamlPipelineDataSourceConfiguration(
new ShardingSpherePipelineDataSourceConfiguration(ConfigurationFileUtil.readFile("config_sharding_sphere_jdbc_source.yaml"))));
result.setTarget(createYamlPipelineDataSourceConfiguration(new StandardPipelineDataSourceConfiguration(ConfigurationFileUtil.readFile("config_standard_jdbc_target.yaml"))));
- result.buildHandleConfig();
+ result.extendConfiguration();
int activeVersion = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE - 10) + 1;
result.setJobId(generateJobId(activeVersion, "logic_db"));
- return result;
+ return new RuleAlteredJobConfigurationSwapper().swapToObject(result);
}
private static YamlPipelineDataSourceConfiguration createYamlPipelineDataSourceConfiguration(final PipelineDataSourceConfiguration config) {
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java
deleted file mode 100644
index 1600b0127f1..00000000000
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
-
-import lombok.SneakyThrows;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
-import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
-import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
-import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
-import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Map;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-
-public final class RuleAlteredJobTest {
-
- @BeforeClass
- public static void beforeClass() {
- PipelineContextUtil.mockModeConfigAndContextManager();
- }
-
- @Test
- @SuppressWarnings("unchecked")
- @SneakyThrows(ReflectiveOperationException.class)
- public void assertExecute() {
- RuleAlteredJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
- initTableData(jobConfig);
- ShardingContext shardingContext = new ShardingContext("1", null, 2, YamlEngine.marshal(jobConfig), 0, null);
- new RuleAlteredJob().execute(shardingContext);
- Map<String, RuleAlteredJobScheduler> jobSchedulerMap = ReflectionUtil.getStaticFieldValue(RuleAlteredJobSchedulerCenter.class, "JOB_SCHEDULER_MAP", Map.class);
- assertNotNull(jobSchedulerMap);
- assertFalse(jobSchedulerMap.isEmpty());
- }
-
- @SneakyThrows(SQLException.class)
- private void initTableData(final RuleAlteredJobConfiguration jobConfig) {
- YamlPipelineDataSourceConfiguration source = jobConfig.getSource();
- try (
- PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(PipelineDataSourceConfigurationFactory.newInstance(source.getType(), source.getParameter()));
- Connection connection = dataSource.getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("DROP TABLE IF EXISTS t_order");
- statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, user_id VARCHAR(12))");
- statement.execute("INSERT INTO t_order (order_id, user_id) VALUES (1, 'xxx'), (999, 'yyy')");
- }
- }
-}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
index 577dee68b64..2ed00596652 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
@@ -19,6 +19,8 @@ package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
import org.apache.commons.io.FileUtils;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
@@ -56,8 +58,10 @@ public final class RuleAlteredJobWorkerTest {
@Test(expected = PipelineJobCreationException.class)
public void assertCreateRuleAlteredContextNoAlteredRule() {
RuleAlteredJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
- jobConfig.setAlteredRuleYamlClassNameTablesMap(Collections.emptyMap());
- RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
+ RuleAlteredJobConfigurationSwapper swapper = new RuleAlteredJobConfigurationSwapper();
+ YamlRuleAlteredJobConfiguration yamlJobConfig = swapper.swapToYamlConfiguration(jobConfig);
+ yamlJobConfig.setAlteredRuleYamlClassNameTablesMap(Collections.emptyMap());
+ RuleAlteredJobWorker.createRuleAlteredContext(swapper.swapToObject(yamlJobConfig));
}
@Test