You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/01/05 03:47:31 UTC
[shardingsphere] branch master updated: Disable sourceWritingStopper and checkoutLocker config by default; Compatible with empty scaling config (#14523)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ac472e8 Disable sourceWritingStopper and checkoutLocker config by default; Compatible with empty scaling config (#14523)
ac472e8 is described below
commit ac472e86a92890cfef08a7482c6aa5522832884b
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Wed Jan 5 11:46:47 2022 +0800
Disable sourceWritingStopper and checkoutLocker config by default; Compatible with empty scaling config (#14523)
* Cancel sourceWritingStopper and checkoutLocker config
* Compatible with removed scaling config
* Compatible with empty scaling config
---
docs/document/content/dev-manual/scaling.cn.md | 20 --------------------
docs/document/content/dev-manual/scaling.en.md | 20 --------------------
.../user-manual/shardingsphere-scaling/build.cn.md | 8 --------
.../user-manual/shardingsphere-scaling/build.en.md | 8 --------
.../test/resources/yaml/encrypt-dataConverters.yaml | 4 ----
.../schedule/ShardingRuleAlteredDetector.java | 10 +++++++++-
.../src/test/resources/yaml/sharding-rule.yaml | 4 ----
.../src/test/resources/yaml/sharding-scaling.yaml | 4 ----
.../OnRuleAlteredActionConfiguration.java | 4 ----
.../YamlOnRuleAlteredActionConfiguration.java | 4 ----
.../OnRuleAlteredActionConfigurationYamlSwapper.java | 12 +++++++-----
...uleAlteredActionConfigurationYamlSwapperTest.java | 17 +++++++++++++----
.../pipeline/core/api/impl/PipelineJobAPIImpl.java | 2 +-
.../scenario/rulealtered/RuleAlteredContext.java | 14 ++------------
.../scenario/rulealtered/RuleAlteredJobWorker.java | 2 +-
...hardingSpherePipelineDataSourceConfiguration.java | 2 +-
.../service/impl/SchemaRulePersistService.java | 2 +-
.../src/main/resources/conf/config-sharding.yaml | 8 --------
18 files changed, 35 insertions(+), 110 deletions(-)
diff --git a/docs/document/content/dev-manual/scaling.cn.md b/docs/document/content/dev-manual/scaling.cn.md
index 588b550..6d5c1b9 100644
--- a/docs/document/content/dev-manual/scaling.cn.md
+++ b/docs/document/content/dev-manual/scaling.cn.md
@@ -37,16 +37,6 @@ chapter = true
| ------------------------------------------- | ------------------------------------------- |
| IdleRuleAlteredJobCompletionDetectAlgorithm | 基于增量迁移任务空闲时长的检测算法 |
-## RowBasedJobLockAlgorithm
-
-| *SPI 名称* | *详细说明* |
-| ------------------------------------------- | ------------------------------------------- |
-| RowBasedJobLockAlgorithm | 用于保护记录的任务锁算法 |
-
-| *已知实现类* | *详细说明* |
-| ------------------------------------------- | ------------------------------------------- |
-| DefaultSourceWritingStopAlgorithm | 源端停写算法默认实现 |
-
## DataConsistencyCheckAlgorithm
| *SPI 名称* | *详细说明* |
@@ -57,13 +47,3 @@ chapter = true
| ------------------------------------------- | ------------------------------------------- |
| DataMatchDataConsistencyCheckAlgorithm | 基于数据匹配的一致性校验算法 |
| CRC32MatchDataConsistencyCheckAlgorithm | 基于数据CRC32匹配的一致性校验算法 |
-
-## RuleBasedJobLockAlgorithm
-
-| *SPI 名称* | *详细说明* |
-| ------------------------------------------- | ------------------------------------------- |
-| RuleBasedJobLockAlgorithm | 用于保护规则的任务锁算法 |
-
-| *已知实现类* | *详细说明* |
-| ------------------------------------------- | ------------------------------------------- |
-| DefaultMetadataCheckoutLockAlgorithm | 切换元数据锁算法默认实现 |
diff --git a/docs/document/content/dev-manual/scaling.en.md b/docs/document/content/dev-manual/scaling.en.md
index 0d632df..d3cbe5e 100644
--- a/docs/document/content/dev-manual/scaling.en.md
+++ b/docs/document/content/dev-manual/scaling.en.md
@@ -37,16 +37,6 @@ chapter = true
| ------------------------------------------- | ------------------------------------------- |
| IdleRuleAlteredJobCompletionDetectAlgorithm | Incremental task idle time based algorithm |
-## RowBasedJobLockAlgorithm
-
-| *SPI Name* | *Description* |
-| ------------------------------------------- | ------------------------------------------- |
-| RowBasedJobLockAlgorithm | Job lock algorithm for protecting row |
-
-| *Implementation Class* | *Description* |
-| ------------------------------------------- | ------------------------------------------- |
-| DefaultSourceWritingStopAlgorithm | Default lock implementation for stopping source writing |
-
## DataConsistencyCheckAlgorithm
| *SPI Name* | *Description* |
@@ -57,13 +47,3 @@ chapter = true
| ------------------------------------------- | ------------------------------------------- |
| DataMatchDataConsistencyCheckAlgorithm | Records content match implementation |
| CRC32MatchDataConsistencyCheckAlgorithm | Records CRC32 match implementation |
-
-## RuleBasedJobLockAlgorithm
-
-| *SPI Name* | *Description* |
-| ------------------------------------------- | ------------------------------------------- |
-| RuleBasedJobLockAlgorithm | Job lock algorithm for protecting rule |
-
-| *Implementation Class* | *Description* |
-| ------------------------------------------- | ------------------------------------------- |
-| DefaultMetadataCheckoutLockAlgorithm | Default lock implementation for metadata checkout |
diff --git a/docs/document/content/user-manual/shardingsphere-scaling/build.cn.md b/docs/document/content/user-manual/shardingsphere-scaling/build.cn.md
index f56f951..4a99571 100644
--- a/docs/document/content/user-manual/shardingsphere-scaling/build.cn.md
+++ b/docs/document/content/user-manual/shardingsphere-scaling/build.cn.md
@@ -64,14 +64,10 @@ rules:
type: # 算法类型。可选项:IDLE
props: # 算法属性
incremental-task-idle-minute-threshold: # 如果增量同步任务不再活动超过一定时间,那么可以认为增量同步任务接近完成。适用算法类型:IDLE
- sourceWritingStopper: # 源端停写算法。如果不配置,那么系统会跳过这个步骤。
- type: # 算法类型。可选项:DEFAULT
dataConsistencyChecker: # 数据一致性校验算法。如果不配置,那么系统会跳过这个步骤。
type: # 算法类型。可选项:DATA_MATCH, CRC32_MATCH
props: # 算法属性
chunk-size: # 一次查询操作返回的最大记录数
- checkoutLocker: # 元数据切换算法。如果不配置,那么系统会跳过这个步骤。
- type: # 算法类型。可选项:DEFAULT
```
配置示例:
@@ -94,14 +90,10 @@ rules:
type: IDLE
props:
incremental-task-idle-minute-threshold: 30
- sourceWritingStopper:
- type: DEFAULT
dataConsistencyChecker:
type: DATA_MATCH
props:
chunk-size: 1000
- checkoutLocker:
- type: DEFAULT
```
以上的 `rateLimiter`,`completionDetector`,`sourceWritingStopper`,`dataConsistencyChecker` 和 `checkoutLocker` 都可以通过实现SPI自定义。可以参考现有实现,详情请参见[开发者手册#弹性伸缩](/cn/dev-manual/scaling/)。
diff --git a/docs/document/content/user-manual/shardingsphere-scaling/build.en.md b/docs/document/content/user-manual/shardingsphere-scaling/build.en.md
index 57c29ef..c41276d 100644
--- a/docs/document/content/user-manual/shardingsphere-scaling/build.en.md
+++ b/docs/document/content/user-manual/shardingsphere-scaling/build.en.md
@@ -63,14 +63,10 @@ rules:
type: # Algorithm type. Options: IDLE
props: # Algorithm properties
incremental-task-idle-minute-threshold: # If incremental tasks is idle more than so much minutes, then it could be considered as almost completed. Available for types: IDLE
- sourceWritingStopper: # Lock algorithm for stopping source writing. If it's not configured, then system will skip this step.
- type: # Algorithm type. Options: DEFAULT
dataConsistencyChecker: # Data consistency check algorithm. If it's not configured, then system will skip this step.
type: # Algorithm type. Options: DATA_MATCH, CRC32_MATCH
props: # Algorithm properties
chunk-size: # Maximum records count of a query operation for check
- checkoutLocker: # Lock algorithm for metadata checkout. If it's not configured, then system will skip this step.
- type: # Algorithm type. Options: DEFAULT
```
Configuration Example:
@@ -93,14 +89,10 @@ rules:
type: IDLE
props:
incremental-task-idle-minute-threshold: 30
- sourceWritingStopper:
- type: DEFAULT
dataConsistencyChecker:
type: DATA_MATCH
props:
chunk-size: 1000
- checkoutLocker:
- type: DEFAULT
```
You could customize `rateLimiter`, `completionDetector`, `sourceWritingStopper`, `dataConsistencyChecker` and `checkoutLocker` algorithm by implementing SPI. Current implementation could be referenced, please refer to [Dev Manual#Scaling](/en/dev-manual/scaling/) for more details.
diff --git a/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml b/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
index bd3cb68..8767f19 100644
--- a/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
+++ b/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
@@ -29,11 +29,7 @@ dataConverters:
type: IDLE
props:
incremental-task-idle-minute-threshold: 1
- sourceWritingStopper:
- type: DEFAULT
dataConsistencyChecker:
type: DATA_MATCH
props:
chunk-size: 1000
- checkoutLocker:
- type: DEFAULT
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredDetector.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredDetector.java
index 798dbfd..bea04f4 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredDetector.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredDetector.java
@@ -21,6 +21,8 @@ import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredDetect
import org.apache.shardingsphere.infra.config.RuleConfiguration;
import org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRuleConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.rulealtered.YamlOnRuleAlteredActionConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.swapper.rulealtered.OnRuleAlteredActionConfigurationYamlSwapper;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
import org.apache.shardingsphere.sharding.yaml.config.YamlShardingRuleConfiguration;
@@ -34,6 +36,8 @@ import java.util.Optional;
*/
public final class ShardingRuleAlteredDetector implements RuleAlteredDetector {
+ private static final OnRuleAlteredActionConfigurationYamlSwapper CONFIG_YAML_SWAPPER = new OnRuleAlteredActionConfigurationYamlSwapper();
+
@Override
public String getYamlRuleConfigClassName() {
return YamlShardingRuleConfiguration.class.getName();
@@ -79,6 +83,10 @@ public final class ShardingRuleAlteredDetector implements RuleAlteredDetector {
return Optional.empty();
}
OnRuleAlteredActionConfiguration result = shardingRuleConfig.getScaling().get(scalingName);
- return Optional.ofNullable(result);
+ if (null == result) {
+ YamlOnRuleAlteredActionConfiguration yamlConfig = new YamlOnRuleAlteredActionConfiguration();
+ result = CONFIG_YAML_SWAPPER.swapToObject(yamlConfig);
+ }
+ return Optional.of(result);
}
}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
index c768f47..4a55b94 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
@@ -119,14 +119,10 @@ rules:
type: IDLE
props:
incremental-task-idle-minute-threshold: 30
- sourceWritingStopper:
- type: DEFAULT
dataConsistencyChecker:
type: DATA_MATCH
props:
chunk-size: 1000
- checkoutLocker:
- type: DEFAULT
props:
sql-show: true
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
index 40fed15..42d2df1 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
@@ -29,11 +29,7 @@ scaling:
type: IDLE
props:
incremental-task-idle-minute-threshold: 30
- sourceWritingStopper:
- type: DEFAULT
dataConsistencyChecker:
type: DATA_MATCH
props:
chunk-size: 1000
- checkoutLocker:
- type: DEFAULT
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
index 29acf05..71dfd14 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
@@ -38,9 +38,5 @@ public final class OnRuleAlteredActionConfiguration {
private final ShardingSphereAlgorithmConfiguration completionDetector;
- private final ShardingSphereAlgorithmConfiguration sourceWritingStopper;
-
private final ShardingSphereAlgorithmConfiguration dataConsistencyChecker;
-
- private final ShardingSphereAlgorithmConfiguration checkoutLocker;
}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java
index dfc5ab8..b909fc4 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java
@@ -39,9 +39,5 @@ public final class YamlOnRuleAlteredActionConfiguration implements YamlConfigura
private YamlShardingSphereAlgorithmConfiguration completionDetector;
- private YamlShardingSphereAlgorithmConfiguration sourceWritingStopper;
-
private YamlShardingSphereAlgorithmConfiguration dataConsistencyChecker;
-
- private YamlShardingSphereAlgorithmConfiguration checkoutLocker;
}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java
index 4646303..8ee36f6 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java
@@ -31,25 +31,27 @@ public final class OnRuleAlteredActionConfigurationYamlSwapper implements YamlCo
@Override
public YamlOnRuleAlteredActionConfiguration swapToYamlConfiguration(final OnRuleAlteredActionConfiguration data) {
+ if (null == data) {
+ return null;
+ }
YamlOnRuleAlteredActionConfiguration result = new YamlOnRuleAlteredActionConfiguration();
result.setBlockQueueSize(data.getBlockQueueSize());
result.setWorkerThread(data.getWorkerThread());
result.setReadBatchSize(data.getReadBatchSize());
result.setRateLimiter(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getRateLimiter()));
result.setCompletionDetector(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getCompletionDetector()));
- result.setSourceWritingStopper(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getSourceWritingStopper()));
result.setDataConsistencyChecker(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getDataConsistencyChecker()));
- result.setCheckoutLocker(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getCheckoutLocker()));
return result;
}
@Override
public OnRuleAlteredActionConfiguration swapToObject(final YamlOnRuleAlteredActionConfiguration yamlConfig) {
+ if (null == yamlConfig) {
+ return null;
+ }
return new OnRuleAlteredActionConfiguration(yamlConfig.getBlockQueueSize(), yamlConfig.getWorkerThread(), yamlConfig.getReadBatchSize(),
ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getRateLimiter()),
ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getCompletionDetector()),
- ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getSourceWritingStopper()),
- ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getDataConsistencyChecker()),
- ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getCheckoutLocker()));
+ ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getDataConsistencyChecker()));
}
}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java
index eb9f8bf..bc0d82d 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java
@@ -26,10 +26,13 @@ import org.junit.Test;
import java.util.Properties;
import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
public final class OnRuleAlteredActionConfigurationYamlSwapperTest {
+ private static final OnRuleAlteredActionConfigurationYamlSwapper CONFIG_YAML_SWAPPER = new OnRuleAlteredActionConfigurationYamlSwapper();
+
@Test
public void assertSwap() {
YamlOnRuleAlteredActionConfiguration yamlConfig = new YamlOnRuleAlteredActionConfiguration();
@@ -43,16 +46,22 @@ public final class OnRuleAlteredActionConfigurationYamlSwapperTest {
Properties completionDetectorProps = new Properties();
completionDetectorProps.setProperty("incremental-task-idle-minute-threshold", "30");
yamlConfig.setCompletionDetector(new YamlShardingSphereAlgorithmConfiguration("IDLE", completionDetectorProps));
- Properties sourceWritingStopperProps = new Properties();
- yamlConfig.setSourceWritingStopper(new YamlShardingSphereAlgorithmConfiguration("DEFAULT", sourceWritingStopperProps));
Properties dataConsistencyCheckerProps = new Properties();
dataConsistencyCheckerProps.setProperty("chunk-size", "1000");
yamlConfig.setDataConsistencyChecker(new YamlShardingSphereAlgorithmConfiguration("DATA_MATCH", dataConsistencyCheckerProps));
- Properties checkoutLockerProps = new Properties();
- yamlConfig.setCheckoutLocker(new YamlShardingSphereAlgorithmConfiguration("DEFAULT", checkoutLockerProps));
OnRuleAlteredActionConfigurationYamlSwapper yamlSwapper = new OnRuleAlteredActionConfigurationYamlSwapper();
OnRuleAlteredActionConfiguration actualConfig = yamlSwapper.swapToObject(yamlConfig);
YamlOnRuleAlteredActionConfiguration actualYamlConfig = yamlSwapper.swapToYamlConfiguration(actualConfig);
assertThat(YamlEngine.marshal(actualYamlConfig), is(YamlEngine.marshal(yamlConfig)));
}
+
+ @Test
+ public void assertYamlConfigNull() {
+ assertNull(CONFIG_YAML_SWAPPER.swapToYamlConfiguration(null));
+ }
+
+ @Test
+ public void assertConfigNull() {
+ assertNull(CONFIG_YAML_SWAPPER.swapToObject(null));
+ }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineJobAPIImpl.java
index fc7b660..3e05941 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineJobAPIImpl.java
@@ -289,7 +289,7 @@ public final class PipelineJobAPIImpl implements PipelineJobAPI {
}
Optional<Collection<RuleAlteredJobContext>> optionalJobContexts = RuleAlteredJobSchedulerCenter.getJobContexts(jobId);
optionalJobContexts.ifPresent(jobContexts -> jobContexts.forEach(each -> each.setStatus(JobStatus.ALMOST_FINISHED)));
- YamlRootConfiguration yamlRootConfig = YamlEngine.unmarshal(jobConfig.getPipelineConfig().getTarget().getParameter(), YamlRootConfiguration.class);
+ YamlRootConfiguration yamlRootConfig = YamlEngine.unmarshal(jobConfig.getPipelineConfig().getTarget().getParameter(), YamlRootConfiguration.class, true);
WorkflowConfiguration workflowConfig = jobConfig.getWorkflowConfig();
String schemaName = workflowConfig.getSchemaName();
String ruleCacheId = workflowConfig.getRuleCacheId();
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
index 9fe997c..4e4bee3 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
@@ -82,24 +82,14 @@ public final class RuleAlteredContext {
} else {
completionDetectAlgorithm = null;
}
- ShardingSphereAlgorithmConfiguration sourceWritingStopper = onRuleAlteredActionConfig.getSourceWritingStopper();
- if (null != sourceWritingStopper) {
- sourceWritingStopAlgorithm = ShardingSphereAlgorithmFactory.createAlgorithm(sourceWritingStopper, RowBasedJobLockAlgorithm.class);
- } else {
- sourceWritingStopAlgorithm = null;
- }
+ sourceWritingStopAlgorithm = null;
ShardingSphereAlgorithmConfiguration dataConsistencyChecker = onRuleAlteredActionConfig.getDataConsistencyChecker();
if (null != dataConsistencyChecker) {
dataConsistencyCheckAlgorithm = ShardingSphereAlgorithmFactory.createAlgorithm(dataConsistencyChecker, DataConsistencyCheckAlgorithm.class);
} else {
dataConsistencyCheckAlgorithm = null;
}
- ShardingSphereAlgorithmConfiguration checkoutLocker = onRuleAlteredActionConfig.getCheckoutLocker();
- if (null != checkoutLocker) {
- checkoutLockAlgorithm = ShardingSphereAlgorithmFactory.createAlgorithm(checkoutLocker, RuleBasedJobLockAlgorithm.class);
- } else {
- checkoutLockAlgorithm = null;
- }
+ checkoutLockAlgorithm = null;
inventoryDumperExecuteEngine = ExecuteEngine.newFixedThreadInstance(onRuleAlteredActionConfig.getWorkerThread());
incrementalDumperExecuteEngine = ExecuteEngine.newCachedThreadInstance();
importerExecuteEngine = ExecuteEngine.newFixedThreadInstance(onRuleAlteredActionConfig.getWorkerThread());
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index 9a4a656..099ead0 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -247,7 +247,7 @@ public final class RuleAlteredJobWorker {
Map<String, Map<String, Object>> yamlDataSources = YamlEngine.unmarshal(dataSources, Map.class);
disableSSLForMySQL(yamlDataSources);
result.setDataSources(yamlDataSources);
- Collection<YamlRuleConfiguration> yamlRuleConfigs = YamlEngine.unmarshal(rules, Collection.class);
+ Collection<YamlRuleConfiguration> yamlRuleConfigs = YamlEngine.unmarshal(rules, Collection.class, true);
result.setRules(yamlRuleConfigs);
return result;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
index 2589dbd..6f320f5 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
@@ -54,7 +54,7 @@ public final class ShardingSpherePipelineDataSourceConfiguration implements Pipe
public ShardingSpherePipelineDataSourceConfiguration(final String parameter) {
this.parameter = parameter;
- rootConfig = YamlEngine.unmarshal(parameter, YamlRootConfiguration.class);
+ rootConfig = YamlEngine.unmarshal(parameter, YamlRootConfiguration.class, true);
Map<String, Object> props = rootConfig.getDataSources().values().iterator().next();
databaseType = DatabaseTypeRegistry.getDatabaseTypeByURL(getJdbcUrl(props));
}
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/SchemaRulePersistService.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/SchemaRulePersistService.java
index bcc22e7..05a9ab0 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/SchemaRulePersistService.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/SchemaRulePersistService.java
@@ -60,7 +60,7 @@ public final class SchemaRulePersistService implements SchemaBasedPersistService
public Collection<RuleConfiguration> load(final String schemaName) {
return isExisted(schemaName)
// TODO process algorithm provided configuration
- ? new YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(YamlEngine.unmarshal(repository.get(SchemaMetaDataNode.getRulePath(schemaName)), Collection.class))
+ ? new YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(YamlEngine.unmarshal(repository.get(SchemaMetaDataNode.getRulePath(schemaName)), Collection.class, true))
: new LinkedList<>();
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
index 4ef066f..b1d2cd0 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
+++ b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
@@ -108,14 +108,10 @@
# type: IDLE
# props:
# incremental-task-idle-minute-threshold: 30
-# sourceWritingStopper:
-# type: DEFAULT
# dataConsistencyChecker:
# type: DATA_MATCH
# props:
# chunk-size: 1000
-# checkoutLocker:
-# type: DEFAULT
######################################################################################################
#
@@ -209,11 +205,7 @@
# type: IDLE
# props:
# incremental-task-idle-minute-threshold: 30
-# sourceWritingStopper:
-# type: DEFAULT
# dataConsistencyChecker:
# type: DATA_MATCH
# props:
# chunk-size: 1000
-# checkoutLocker:
-# type: DEFAULT