You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/12/01 06:54:09 UTC
[shardingsphere] branch master updated: Rename SyncConfiguration to TaskConfiguration (#8438)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ae5e4bf Rename SyncConfiguration to TaskConfiguration (#8438)
ae5e4bf is described below
commit ae5e4bf834fb5e2dc2337868b8e51c2d662ecf64
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Tue Dec 1 14:53:45 2020 +0800
Rename SyncConfiguration to TaskConfiguration (#8438)
* Add log for ScalingTaskScheduler
* Rename SyncConfiguration to TaskConfiguration
Co-authored-by: qiulu3 <Lucas209910>
---
...ncConfiguration.java => TaskConfiguration.java} | 4 ++--
.../scaling/core/datasource/DataSourceManager.java | 18 ++++++++--------
.../scaling/core/job/ShardingScalingJob.java | 8 ++++----
.../job/check/AbstractDataConsistencyChecker.java | 2 +-
.../job/preparer/ShardingScalingJobPreparer.java | 10 ++++-----
.../job/preparer/resumer/SyncPositionResumer.java | 6 +++---
.../splitter/InventoryDataTaskSplitter.java | 10 ++++-----
.../core/schedule/ScalingTaskScheduler.java | 6 ++++--
.../core/service/AbstractScalingJobService.java | 6 +++---
.../service/impl/DistributedScalingJobService.java | 4 ++--
...urationUtil.java => TaskConfigurationUtil.java} | 16 +++++++--------
.../core/datasource/DataSourceManagerTest.java | 10 ++++-----
.../check/AbstractDataConsistencyCheckerTest.java | 4 ++--
.../splitter/InventoryDataTaskSplitterTest.java | 24 +++++++++++-----------
.../inventory/InventoryDataScalingTaskTest.java | 18 ++++++++--------
.../component/MySQLDataConsistencyChecker.java | 2 +-
16 files changed, 75 insertions(+), 73 deletions(-)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/SyncConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/TaskConfiguration.java
similarity index 94%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/SyncConfiguration.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/TaskConfiguration.java
index 566c29b..113c30b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/SyncConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/TaskConfiguration.java
@@ -21,11 +21,11 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
/**
- * Sync configuration.
+ * Task configuration.
*/
@Getter
@RequiredArgsConstructor
-public final class SyncConfiguration {
+public final class TaskConfiguration {
private final JobConfiguration jobConfig;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/datasource/DataSourceManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/datasource/DataSourceManager.java
index 78f8d47..e381c5b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/datasource/DataSourceManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/datasource/DataSourceManager.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.scaling.core.datasource;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
+import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
import org.apache.shardingsphere.scaling.core.config.datasource.DataSourceConfiguration;
import javax.sql.DataSource;
@@ -44,18 +44,18 @@ public final class DataSourceManager implements AutoCloseable {
@Getter
private final Map<DataSourceConfiguration, DataSourceWrapper> sourceDataSources = new ConcurrentHashMap<>();
- public DataSourceManager(final List<SyncConfiguration> syncConfigs) {
- createDataSources(syncConfigs);
+ public DataSourceManager(final List<TaskConfiguration> taskConfigs) {
+ createDataSources(taskConfigs);
}
- private void createDataSources(final List<SyncConfiguration> syncConfigs) {
- createSourceDataSources(syncConfigs);
- createTargetDataSources(syncConfigs.iterator().next().getImporterConfig().getDataSourceConfig());
+ private void createDataSources(final List<TaskConfiguration> taskConfigs) {
+ createSourceDataSources(taskConfigs);
+ createTargetDataSources(taskConfigs.iterator().next().getImporterConfig().getDataSourceConfig());
}
- private void createSourceDataSources(final List<SyncConfiguration> syncConfigs) {
- for (SyncConfiguration syncConfig : syncConfigs) {
- DataSourceConfiguration dataSourceConfig = syncConfig.getDumperConfig().getDataSourceConfig();
+ private void createSourceDataSources(final List<TaskConfiguration> taskConfigs) {
+ for (TaskConfiguration taskConfig : taskConfigs) {
+ DataSourceConfiguration dataSourceConfig = taskConfig.getDumperConfig().getDataSourceConfig();
DataSourceWrapper dataSource = dataSourceFactory.newInstance(dataSourceConfig);
cachedDataSources.put(dataSourceConfig, dataSource);
sourceDataSources.put(dataSourceConfig, dataSource);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ShardingScalingJob.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ShardingScalingJob.java
index 4e022ac..af0e9e1 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ShardingScalingJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ShardingScalingJob.java
@@ -20,11 +20,11 @@ package org.apache.shardingsphere.scaling.core.job;
import lombok.Getter;
import lombok.Setter;
import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
-import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
+import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
import org.apache.shardingsphere.scaling.core.schedule.SyncTaskControlStatus;
-import org.apache.shardingsphere.scaling.core.utils.SyncConfigurationUtil;
+import org.apache.shardingsphere.scaling.core.utils.TaskConfigurationUtil;
import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
import java.util.LinkedList;
@@ -44,7 +44,7 @@ public final class ShardingScalingJob {
private int shardingItem;
- private final transient List<SyncConfiguration> syncConfigs = new LinkedList<>();
+ private final transient List<TaskConfiguration> taskConfigs = new LinkedList<>();
private final transient List<ScalingTask> inventoryDataTasks = new LinkedList<>();
@@ -68,7 +68,7 @@ public final class ShardingScalingJob {
this(Optional.ofNullable(scalingConfig.getJobConfiguration().getJobId()).orElse(generateKey()));
this.scalingConfig = scalingConfig;
shardingItem = scalingConfig.getJobConfiguration().getShardingItem();
- syncConfigs.addAll(SyncConfigurationUtil.toSyncConfigs(scalingConfig));
+ taskConfigs.addAll(TaskConfigurationUtil.toTaskConfigs(scalingConfig));
}
private static SnowflakeKeyGenerateAlgorithm initIdAutoIncreaseGenerator() {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyChecker.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyChecker.java
index 49f8cb9..0768cbf 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyChecker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyChecker.java
@@ -50,7 +50,7 @@ public abstract class AbstractDataConsistencyChecker implements DataConsistencyC
@Override
public Map<String, DataConsistencyCheckResult> countCheck() {
- return shardingScalingJob.getSyncConfigs()
+ return shardingScalingJob.getTaskConfigs()
.stream().flatMap(each -> each.getDumperConfig().getTableNameMap().values().stream()).collect(Collectors.toSet())
.stream().collect(Collectors.toMap(Function.identity(), this::countCheck, (oldValue, currentValue) -> oldValue, LinkedHashMap::new));
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
index 7f3fb4c..f565a76 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.scaling.core.job.preparer;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
+import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
import org.apache.shardingsphere.scaling.core.config.datasource.DataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
@@ -60,8 +60,8 @@ public final class ShardingScalingJobPreparer {
* @param shardingScalingJob sharding scaling job
*/
public void prepare(final ShardingScalingJob shardingScalingJob) {
- String databaseType = shardingScalingJob.getSyncConfigs().get(0).getDumperConfig().getDataSourceConfig().getDatabaseType().getName();
- try (DataSourceManager dataSourceManager = new DataSourceManager(shardingScalingJob.getSyncConfigs())) {
+ String databaseType = shardingScalingJob.getTaskConfigs().get(0).getDumperConfig().getDataSourceConfig().getDatabaseType().getName();
+ try (DataSourceManager dataSourceManager = new DataSourceManager(shardingScalingJob.getTaskConfigs())) {
checkDataSources(databaseType, dataSourceManager);
ResumeBreakPointManager resumeBreakPointManager = getResumeBreakPointManager(databaseType, shardingScalingJob);
if (resumeBreakPointManager.isResumable()) {
@@ -92,14 +92,14 @@ public final class ShardingScalingJobPreparer {
private void initInventoryDataTasks(final String databaseType, final ShardingScalingJob shardingScalingJob, final DataSourceManager dataSourceManager) {
List<ScalingTask> allInventoryDataTasks = new LinkedList<>();
- for (SyncConfiguration each : shardingScalingJob.getSyncConfigs()) {
+ for (TaskConfiguration each : shardingScalingJob.getTaskConfigs()) {
allInventoryDataTasks.addAll(inventoryDataTaskSplitter.splitInventoryData(databaseType, each, dataSourceManager));
}
shardingScalingJob.getInventoryDataTasks().addAll(allInventoryDataTasks);
}
private void initIncrementalDataTasks(final String databaseType, final ShardingScalingJob shardingScalingJob, final DataSourceManager dataSourceManager) {
- for (SyncConfiguration each : shardingScalingJob.getSyncConfigs()) {
+ for (TaskConfiguration each : shardingScalingJob.getTaskConfigs()) {
DataSourceConfiguration dataSourceConfig = each.getDumperConfig().getDataSourceConfig();
each.getDumperConfig().setPositionManager(PositionManagerFactory.newInstance(databaseType, dataSourceManager.getDataSource(dataSourceConfig)));
shardingScalingJob.getIncrementalDataTasks().add(syncTaskFactory.createIncrementalDataSyncTask(each.getJobConfig().getConcurrency(), each.getDumperConfig(), each.getImporterConfig()));
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumer.java
index f64ce7a..6cab397 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumer.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.scaling.core.job.preparer.resumer;
import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
-import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
+import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
@@ -63,7 +63,7 @@ public final class SyncPositionResumer {
private List<ScalingTask> getAllInventoryDataTasks(final ShardingScalingJob shardingScalingJob,
final DataSourceManager dataSourceManager, final ResumeBreakPointManager resumeBreakPointManager) {
List<ScalingTask> result = new LinkedList<>();
- for (SyncConfiguration each : shardingScalingJob.getSyncConfigs()) {
+ for (TaskConfiguration each : shardingScalingJob.getTaskConfigs()) {
MetaDataManager metaDataManager = new MetaDataManager(dataSourceManager.getDataSource(each.getDumperConfig().getDataSourceConfig()));
for (Entry<String, PositionManager> entry : getInventoryPositionMap(each.getDumperConfig(), resumeBreakPointManager).entrySet()) {
result.add(syncTaskFactory.createInventoryDataSyncTask(newInventoryDumperConfig(each.getDumperConfig(), metaDataManager, entry), each.getImporterConfig()));
@@ -92,7 +92,7 @@ public final class SyncPositionResumer {
}
private void resumeIncrementalPosition(final ShardingScalingJob shardingScalingJob, final ResumeBreakPointManager resumeBreakPointManager) {
- for (SyncConfiguration each : shardingScalingJob.getSyncConfigs()) {
+ for (TaskConfiguration each : shardingScalingJob.getTaskConfigs()) {
each.getDumperConfig().setPositionManager(resumeBreakPointManager.getIncrementalPositionManagerMap().get(each.getDumperConfig().getDataSourceName()));
shardingScalingJob.getIncrementalDataTasks().add(syncTaskFactory.createIncrementalDataSyncTask(each.getJobConfig().getConcurrency(), each.getDumperConfig(), each.getImporterConfig()));
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
index 3b66c38..441ca41 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
@@ -21,7 +21,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData;
import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
-import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
+import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.exception.PrepareFailedException;
import org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
@@ -56,14 +56,14 @@ public final class InventoryDataTaskSplitter {
* Split inventory data to multi-tasks.
*
* @param databaseType database type
- * @param syncConfig synchronize configuration
+ * @param taskConfig task configuration
* @param dataSourceManager data source manager
* @return split inventory data task
*/
- public Collection<ScalingTask> splitInventoryData(final String databaseType, final SyncConfiguration syncConfig, final DataSourceManager dataSourceManager) {
+ public Collection<ScalingTask> splitInventoryData(final String databaseType, final TaskConfiguration taskConfig, final DataSourceManager dataSourceManager) {
Collection<ScalingTask> result = new LinkedList<>();
- for (InventoryDumperConfiguration each : splitDumperConfig(databaseType, syncConfig.getJobConfig().getShardingSize(), syncConfig.getDumperConfig(), dataSourceManager)) {
- result.add(syncTaskFactory.createInventoryDataSyncTask(each, syncConfig.getImporterConfig()));
+ for (InventoryDumperConfiguration each : splitDumperConfig(databaseType, taskConfig.getJobConfig().getShardingSize(), taskConfig.getDumperConfig(), dataSourceManager)) {
+ result.add(syncTaskFactory.createInventoryDataSyncTask(each, taskConfig.getImporterConfig()));
}
return result;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
index 1e88663..3f9cb52 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
@@ -68,10 +68,11 @@ public final class ScalingTaskScheduler implements Runnable {
}
private synchronized boolean executeInventoryDataSyncTask() {
- log.info("-------------- Start inventory data sync task --------------");
if (ScalingTaskUtil.allInventoryTasksFinished(shardingScalingJob.getInventoryDataTasks())) {
+ log.info("All inventory tasks finished.");
return true;
}
+ log.info("-------------- Start inventory data sync task --------------");
shardingScalingJob.setStatus(SyncTaskControlStatus.MIGRATE_INVENTORY_DATA.name());
ExecuteCallback inventoryDataTaskCallback = createInventoryDataTaskCallback();
for (ScalingTask each : shardingScalingJob.getInventoryDataTasks()) {
@@ -86,6 +87,7 @@ public final class ScalingTaskScheduler implements Runnable {
@Override
public void onSuccess() {
if (ScalingTaskUtil.allInventoryTasksFinished(shardingScalingJob.getInventoryDataTasks())) {
+ log.info("All inventory tasks finished.");
executeIncrementalDataSyncTask();
}
}
@@ -100,10 +102,10 @@ public final class ScalingTaskScheduler implements Runnable {
}
private synchronized void executeIncrementalDataSyncTask() {
- log.info("-------------- Start incremental data sync task --------------");
if (SyncTaskControlStatus.SYNCHRONIZE_INCREMENTAL_DATA.name().equals(shardingScalingJob.getStatus())) {
return;
}
+ log.info("-------------- Start incremental data sync task --------------");
shardingScalingJob.setStatus(SyncTaskControlStatus.SYNCHRONIZE_INCREMENTAL_DATA.name());
ExecuteCallback incrementalDataTaskCallback = createIncrementalDataTaskCallback();
for (ScalingTask each : shardingScalingJob.getIncrementalDataTasks()) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
index 053cf67..e332429 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
@@ -22,7 +22,7 @@ import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
import org.apache.shardingsphere.scaling.core.utils.ProxyConfigurationUtil;
-import org.apache.shardingsphere.scaling.core.utils.SyncConfigurationUtil;
+import org.apache.shardingsphere.scaling.core.utils.TaskConfigurationUtil;
import java.util.Map;
import java.util.Optional;
@@ -35,7 +35,7 @@ public abstract class AbstractScalingJobService implements ScalingJobService {
@Override
public boolean shouldScaling(final String oldYamlProxyConfig, final String newYamlProxyConfig) {
ScalingConfiguration scalingConfig = ProxyConfigurationUtil.toScalingConfig(oldYamlProxyConfig, newYamlProxyConfig);
- SyncConfigurationUtil.fillInShardingTables(scalingConfig);
+ TaskConfigurationUtil.fillInShardingTables(scalingConfig);
return shouldScaling(scalingConfig);
}
@@ -46,7 +46,7 @@ public abstract class AbstractScalingJobService implements ScalingJobService {
@Override
public Optional<ShardingScalingJob> start(final String oldYamlProxyConfig, final String newYamlProxyConfig) {
ScalingConfiguration scalingConfig = ProxyConfigurationUtil.toScalingConfig(oldYamlProxyConfig, newYamlProxyConfig);
- SyncConfigurationUtil.fillInShardingTables(scalingConfig);
+ TaskConfigurationUtil.fillInShardingTables(scalingConfig);
if (!shouldScaling(scalingConfig)) {
return Optional.empty();
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobService.java
index 928d983..8ddf8c3 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobService.java
@@ -34,7 +34,7 @@ import org.apache.shardingsphere.scaling.core.service.AbstractScalingJobService;
import org.apache.shardingsphere.scaling.core.service.RegistryRepositoryHolder;
import org.apache.shardingsphere.scaling.core.service.ScalingJobService;
import org.apache.shardingsphere.scaling.core.utils.ScalingTaskUtil;
-import org.apache.shardingsphere.scaling.core.utils.SyncConfigurationUtil;
+import org.apache.shardingsphere.scaling.core.utils.TaskConfigurationUtil;
import java.util.List;
import java.util.Map;
@@ -58,7 +58,7 @@ public final class DistributedScalingJobService extends AbstractScalingJobServic
@Override
public Optional<ShardingScalingJob> start(final ScalingConfiguration scalingConfig) {
ShardingScalingJob shardingScalingJob = new ShardingScalingJob();
- SyncConfigurationUtil.fillInShardingTables(scalingConfig);
+ TaskConfigurationUtil.fillInShardingTables(scalingConfig);
updateScalingConfig(shardingScalingJob.getJobId(), scalingConfig);
return Optional.of(shardingScalingJob);
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/SyncConfigurationUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/TaskConfigurationUtil.java
similarity index 97%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/SyncConfigurationUtil.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/TaskConfigurationUtil.java
index 76f74ee..8009863 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/SyncConfigurationUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/TaskConfigurationUtil.java
@@ -29,7 +29,7 @@ import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
-import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
+import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
import org.apache.shardingsphere.scaling.core.config.datasource.DataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.datasource.ShardingSphereJDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration;
@@ -59,19 +59,19 @@ import java.util.function.Function;
import java.util.stream.Collectors;
/**
- * Sync configuration Util.
+ * Task configuration Util.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class SyncConfigurationUtil {
+public final class TaskConfigurationUtil {
/**
- * Split Scaling configuration to Sync configurations.
+ * Split Scaling configuration to task configurations.
*
* @param scalingConfig scaling configuration
- * @return list of sync configurations
+ * @return list of task configurations
*/
- public static Collection<SyncConfiguration> toSyncConfigs(final ScalingConfiguration scalingConfig) {
- Collection<SyncConfiguration> result = new LinkedList<>();
+ public static Collection<TaskConfiguration> toTaskConfigs(final ScalingConfiguration scalingConfig) {
+ Collection<TaskConfiguration> result = new LinkedList<>();
ShardingSphereJDBCDataSourceConfiguration sourceConfig = getSourceConfig(scalingConfig);
ShardingRuleConfiguration sourceRuleConfig = ConfigurationYamlConverter.loadShardingRuleConfig(sourceConfig.getRule());
Map<String, org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration> sourceDataSource = ConfigurationYamlConverter.loadDataSourceConfigs(sourceConfig.getDataSource());
@@ -83,7 +83,7 @@ public final class SyncConfigurationUtil {
for (Entry<String, Map<String, String>> entry : dataSourceTableNameMap.entrySet()) {
DumperConfiguration dumperConfig = createDumperConfig(entry.getKey(), sourceDataSource.get(entry.getKey()).getProps(), entry.getValue());
ImporterConfiguration importerConfig = createImporterConfig(scalingConfig, shardingColumnsMap);
- result.add(new SyncConfiguration(scalingConfig.getJobConfiguration(), dumperConfig, importerConfig));
+ result.add(new TaskConfiguration(scalingConfig.getJobConfiguration(), dumperConfig, importerConfig));
}
return result;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/datasource/DataSourceManagerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/datasource/DataSourceManagerTest.java
index b5cd1a8..af8e097 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/datasource/DataSourceManagerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/datasource/DataSourceManagerTest.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.scaling.core.datasource;
import lombok.SneakyThrows;
-import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
+import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
import org.apache.shardingsphere.scaling.core.util.ScalingConfigurationUtil;
import org.apache.shardingsphere.scaling.core.utils.ReflectionUtil;
import org.junit.Before;
@@ -36,24 +36,24 @@ import static org.junit.Assert.assertThat;
public final class DataSourceManagerTest {
- private List<SyncConfiguration> syncConfigurations;
+ private List<TaskConfiguration> taskConfigurations;
@Before
@SneakyThrows(IOException.class)
public void setUp() {
- syncConfigurations = ScalingConfigurationUtil.initJob("/config.json").getSyncConfigs();
+ taskConfigurations = ScalingConfigurationUtil.initJob("/config.json").getTaskConfigs();
}
@Test
public void assertGetDataSource() {
DataSourceManager dataSourceManager = new DataSourceManager();
- DataSource actual = dataSourceManager.getDataSource(syncConfigurations.get(0).getDumperConfig().getDataSourceConfig());
+ DataSource actual = dataSourceManager.getDataSource(taskConfigurations.get(0).getDumperConfig().getDataSourceConfig());
assertThat(actual, instanceOf(DataSourceWrapper.class));
}
@Test
public void assertClose() throws NoSuchFieldException, IllegalAccessException {
- DataSourceManager dataSourceManager = new DataSourceManager(syncConfigurations);
+ DataSourceManager dataSourceManager = new DataSourceManager(taskConfigurations);
Map<?, ?> cachedDataSources = ReflectionUtil.getFieldValue(dataSourceManager, "cachedDataSources", Map.class);
assertNotNull(cachedDataSources);
assertThat(cachedDataSources.size(), is(2));
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyCheckerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyCheckerTest.java
index 5914251..25136ac 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyCheckerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyCheckerTest.java
@@ -41,8 +41,8 @@ public final class AbstractDataConsistencyCheckerTest {
public void assertCountCheck() {
ShardingScalingJob shardingScalingJob = mockShardingScalingJob();
DataConsistencyChecker dataConsistencyChecker = DataConsistencyCheckerFactory.newInstance("H2", shardingScalingJob);
- initTableData(shardingScalingJob.getSyncConfigs().get(0).getDumperConfig().getDataSourceConfig());
- initTableData(shardingScalingJob.getSyncConfigs().get(0).getImporterConfig().getDataSourceConfig());
+ initTableData(shardingScalingJob.getTaskConfigs().get(0).getDumperConfig().getDataSourceConfig());
+ initTableData(shardingScalingJob.getTaskConfigs().get(0).getImporterConfig().getDataSourceConfig());
Map<String, DataConsistencyCheckResult> resultMap = dataConsistencyChecker.countCheck();
assertTrue(resultMap.get("t1").isCountValid());
assertThat(resultMap.get("t1").getSourceCount(), is(resultMap.get("t1").getTargetCount()));
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitterTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitterTest.java
index f3d75a6..3b04c13 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitterTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitterTest.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.scaling.core.job.preparer.splitter;
import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
-import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
+import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
import org.apache.shardingsphere.scaling.core.config.datasource.DataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
@@ -53,7 +53,7 @@ public final class InventoryDataTaskSplitterTest {
private static final String DATABASE_TYPE = "H2";
- private SyncConfiguration syncConfig;
+ private TaskConfiguration taskConfig;
private DataSourceManager dataSourceManager;
@@ -63,7 +63,7 @@ public final class InventoryDataTaskSplitterTest {
public void setUp() {
DumperConfiguration dumperConfig = mockDumperConfig();
ImporterConfiguration importerConfig = new ImporterConfiguration();
- syncConfig = new SyncConfiguration(new JobConfiguration(), dumperConfig, importerConfig);
+ taskConfig = new TaskConfiguration(new JobConfiguration(), dumperConfig, importerConfig);
dataSourceManager = new DataSourceManager();
inventoryDataTaskSplitter = new InventoryDataTaskSplitter();
}
@@ -75,9 +75,9 @@ public final class InventoryDataTaskSplitterTest {
@Test
public void assertSplitInventoryDataWithIntPrimary() throws SQLException {
- syncConfig.getJobConfig().setShardingSize(10);
- initIntPrimaryEnvironment(syncConfig.getDumperConfig());
- List<ScalingTask> actual = (List<ScalingTask>) inventoryDataTaskSplitter.splitInventoryData(DATABASE_TYPE, syncConfig, dataSourceManager);
+ taskConfig.getJobConfig().setShardingSize(10);
+ initIntPrimaryEnvironment(taskConfig.getDumperConfig());
+ List<ScalingTask> actual = (List<ScalingTask>) inventoryDataTaskSplitter.splitInventoryData(DATABASE_TYPE, taskConfig, dataSourceManager);
assertNotNull(actual);
assertThat(actual.size(), is(10));
assertThat(((PrimaryKeyPosition) actual.get(9).getPositionManager().getPosition()).getBeginValue(), is(91L));
@@ -86,24 +86,24 @@ public final class InventoryDataTaskSplitterTest {
@Test
public void assertSplitInventoryDataWithCharPrimary() throws SQLException {
- initCharPrimaryEnvironment(syncConfig.getDumperConfig());
- Collection<ScalingTask> actual = inventoryDataTaskSplitter.splitInventoryData(DATABASE_TYPE, syncConfig, dataSourceManager);
+ initCharPrimaryEnvironment(taskConfig.getDumperConfig());
+ Collection<ScalingTask> actual = inventoryDataTaskSplitter.splitInventoryData(DATABASE_TYPE, taskConfig, dataSourceManager);
assertNotNull(actual);
assertThat(actual.size(), is(1));
}
@Test
public void assertSplitInventoryDataWithUnionPrimary() throws SQLException {
- initUnionPrimaryEnvironment(syncConfig.getDumperConfig());
- Collection<ScalingTask> actual = inventoryDataTaskSplitter.splitInventoryData(DATABASE_TYPE, syncConfig, dataSourceManager);
+ initUnionPrimaryEnvironment(taskConfig.getDumperConfig());
+ Collection<ScalingTask> actual = inventoryDataTaskSplitter.splitInventoryData(DATABASE_TYPE, taskConfig, dataSourceManager);
assertNotNull(actual);
assertThat(actual.size(), is(1));
}
@Test
public void assertSplitInventoryDataWithoutPrimary() throws SQLException {
- initNoPrimaryEnvironment(syncConfig.getDumperConfig());
- Collection<ScalingTask> actual = inventoryDataTaskSplitter.splitInventoryData(DATABASE_TYPE, syncConfig, dataSourceManager);
+ initNoPrimaryEnvironment(taskConfig.getDumperConfig());
+ Collection<ScalingTask> actual = inventoryDataTaskSplitter.splitInventoryData(DATABASE_TYPE, taskConfig, dataSourceManager);
assertNotNull(actual);
assertThat(actual.size(), is(1));
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskTest.java
index 60a4c7d..8e596c6 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskTest.java
@@ -25,7 +25,7 @@ import org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCData
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.config.datasource.DataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
-import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
+import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
@@ -50,7 +50,7 @@ public final class InventoryDataScalingTaskTest {
private static final String PASSWORD = "password";
- private SyncConfiguration syncConfig;
+ private TaskConfiguration taskConfig;
private DataSourceManager dataSourceManager;
@@ -59,7 +59,7 @@ public final class InventoryDataScalingTaskTest {
DumperConfiguration dumperConfig = mockDumperConfig();
ImporterConfiguration importerConfig = mockImporterConfig();
ScalingContext.getInstance().init(new ServerConfiguration());
- syncConfig = new SyncConfiguration(new JobConfiguration(), dumperConfig, importerConfig);
+ taskConfig = new TaskConfiguration(new JobConfiguration(), dumperConfig, importerConfig);
dataSourceManager = new DataSourceManager();
}
@@ -70,19 +70,19 @@ public final class InventoryDataScalingTaskTest {
@Test(expected = SyncTaskExecuteException.class)
public void assertStartWithGetEstimatedRowsFailure() {
- InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(syncConfig.getDumperConfig());
+ InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(taskConfig.getDumperConfig());
inventoryDumperConfig.setTableName("t_non_exist");
- InventoryDataScalingTask inventoryDataSyncTask = new InventoryDataScalingTask(inventoryDumperConfig, syncConfig.getImporterConfig(), dataSourceManager);
+ InventoryDataScalingTask inventoryDataSyncTask = new InventoryDataScalingTask(inventoryDumperConfig, taskConfig.getImporterConfig(), dataSourceManager);
inventoryDataSyncTask.start();
}
@Test
public void assertGetProgress() throws SQLException {
- initTableData(syncConfig.getDumperConfig());
- InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(syncConfig.getDumperConfig());
+ initTableData(taskConfig.getDumperConfig());
+ InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(taskConfig.getDumperConfig());
inventoryDumperConfig.setTableName("t_order");
- inventoryDumperConfig.setPositionManager(syncConfig.getDumperConfig().getPositionManager());
- InventoryDataScalingTask inventoryDataSyncTask = new InventoryDataScalingTask(inventoryDumperConfig, syncConfig.getImporterConfig(), dataSourceManager);
+ inventoryDumperConfig.setPositionManager(taskConfig.getDumperConfig().getPositionManager());
+ InventoryDataScalingTask inventoryDataSyncTask = new InventoryDataScalingTask(inventoryDumperConfig, taskConfig.getImporterConfig(), dataSourceManager);
inventoryDataSyncTask.start();
assertFalse(((InventoryDataSyncTaskProgress) inventoryDataSyncTask.getProgress()).isFinished());
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLDataConsistencyChecker.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLDataConsistencyChecker.java
index bdaaf1a..fd67b35 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLDataConsistencyChecker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLDataConsistencyChecker.java
@@ -49,7 +49,7 @@ public final class MySQLDataConsistencyChecker extends AbstractDataConsistencyCh
@Override
public Map<String, Boolean> dataCheck() {
- return distinctByValue(getShardingScalingJob().getSyncConfigs()
+ return distinctByValue(getShardingScalingJob().getTaskConfigs()
.stream().flatMap(each -> each.getDumperConfig().getTableNameMap().entrySet().stream())
.collect(Collectors.toMap(Entry::getKey, Entry::getValue, (oldValue, currentValue) -> oldValue, LinkedHashMap::new)))
.entrySet().stream().collect(Collectors.toMap(Entry::getValue, entry -> dataValid(entry.getKey(), entry.getValue()), (oldValue, currentValue) -> oldValue, LinkedHashMap::new));