You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/12/14 11:27:09 UTC
[shardingsphere] branch master updated: lazy initialization
DataConsistencyChecker (#8612)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new b4fc6ae lazy initialization DataConsistencyChecker (#8612)
b4fc6ae is described below
commit b4fc6ae3b6663d918401708b3f86e7c5e4d86146
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Mon Dec 14 19:26:41 2020 +0800
lazy initialization DataConsistencyChecker (#8612)
* delay init DataConsistencyChecker.
* Optimize name and javadoc.
Co-authored-by: qiulu3 <Lucas209910>
---
.../scaling/web/HttpServerHandler.java | 6 +--
.../core/execute/engine/TaskExecuteEngine.java | 14 +++----
.../scaling/core/job/ScalingJob.java | 6 +--
.../job/check/DataConsistencyCheckerFactory.java | 5 +--
.../core/job/preparer/ScalingJobPreparer.java | 28 +++++--------
.../core/service/AbstractScalingJobService.java | 4 +-
.../scaling/core/service/ScalingJobService.java | 6 +--
.../core/execute/engine/TaskExecuteEngineTest.java | 6 +--
.../check/AbstractDataConsistencyCheckerTest.java | 6 +--
.../resumer/ScalingPositionResumerTest.java | 4 +-
.../impl/DistributedScalingJobServiceTest.java | 33 ++++-----------
.../impl/StandaloneScalingJobServiceTest.java | 47 ++++++++--------------
.../core/util/ScalingConfigurationUtil.java | 2 +-
13 files changed, 63 insertions(+), 104 deletions(-)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java
index 4e58df8..27b7af3 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java
@@ -84,9 +84,9 @@ public final class HttpServerHandler extends SimpleChannelInboundHandler<FullHtt
}
private void startJob(final ChannelHandlerContext context, final String requestBody) {
- Optional<ScalingJob> shardingScalingJob = SCALING_JOB_SERVICE.start(GSON.fromJson(requestBody, ScalingConfiguration.class));
- Preconditions.checkState(shardingScalingJob.isPresent());
- response(ResponseContentUtil.build(shardingScalingJob.get()), context, HttpResponseStatus.OK);
+ Optional<ScalingJob> scalingJob = SCALING_JOB_SERVICE.start(GSON.fromJson(requestBody, ScalingConfiguration.class));
+ Preconditions.checkState(scalingJob.isPresent());
+ response(ResponseContentUtil.build(scalingJob.get()), context, HttpResponseStatus.OK);
}
private void listJobs(final ChannelHandlerContext context) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/engine/TaskExecuteEngine.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/engine/TaskExecuteEngine.java
index 922224e..1ccc068 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/engine/TaskExecuteEngine.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/engine/TaskExecuteEngine.java
@@ -64,7 +64,7 @@ public final class TaskExecuteEngine {
}
/**
- * Submit a {@code ShardingScalingExecutor} without callback to execute.
+ * Submit a {@code ScalingExecutor} without callback to execute.
*
* @param scalingExecutor scaling executor
* @return execute future
@@ -74,7 +74,7 @@ public final class TaskExecuteEngine {
}
/**
- * Submit a {@code ShardingScalingExecutor} with callback {@code ExecuteCallback} to execute.
+ * Submit a {@code ScalingExecutor} with callback {@code ExecuteCallback} to execute.
*
* @param scalingExecutor scaling executor
* @param executeCallback execute callback
@@ -87,15 +87,15 @@ public final class TaskExecuteEngine {
}
/**
- * Submit a collection of {@code ShardingScalingExecutor} with callback {@code ExecuteCallback} to execute.
+ * Submit a collection of {@code ScalingExecutor} with callback {@code ExecuteCallback} to execute.
*
- * @param shardingScalingExecutors scaling executor
+ * @param scalingExecutors scaling executor
* @param executeCallback execute callback
* @return execute future of all
*/
- public Future<?> submitAll(final Collection<? extends ScalingExecutor> shardingScalingExecutors, final ExecuteCallback executeCallback) {
- Collection<ListenableFuture<?>> listenableFutures = new ArrayList<>(shardingScalingExecutors.size());
- for (ScalingExecutor each : shardingScalingExecutors) {
+ public Future<?> submitAll(final Collection<? extends ScalingExecutor> scalingExecutors, final ExecuteCallback executeCallback) {
+ Collection<ListenableFuture<?>> listenableFutures = new ArrayList<>(scalingExecutors.size());
+ for (ScalingExecutor each : scalingExecutors) {
ListenableFuture<?> listenableFuture = executorService.submit(each);
listenableFutures.add(listenableFuture);
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
index f6d2eed..0ec7f83 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
@@ -21,7 +21,6 @@ import lombok.Getter;
import lombok.Setter;
import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
-import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
import org.apache.shardingsphere.scaling.core.schedule.JobStatus;
import org.apache.shardingsphere.scaling.core.utils.TaskConfigurationUtil;
@@ -44,6 +43,8 @@ public final class ScalingJob {
private int shardingItem;
+ private String databaseType;
+
private final transient List<TaskConfiguration> taskConfigs = new LinkedList<>();
private final transient List<ScalingTask> inventoryTasks = new LinkedList<>();
@@ -52,8 +53,6 @@ public final class ScalingJob {
private transient ScalingConfiguration scalingConfig;
- private transient DataConsistencyChecker dataConsistencyChecker;
-
private String status = JobStatus.RUNNING.name();
public ScalingJob() {
@@ -69,6 +68,7 @@ public final class ScalingJob {
this.scalingConfig = scalingConfig;
shardingItem = scalingConfig.getJobConfiguration().getShardingItem();
taskConfigs.addAll(TaskConfigurationUtil.toTaskConfigs(scalingConfig));
+ databaseType = taskConfigs.get(0).getDumperConfig().getDataSourceConfig().getDatabaseType().getName();
}
private static SnowflakeKeyGenerateAlgorithm initIdAutoIncreaseGenerator() {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/DataConsistencyCheckerFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/DataConsistencyCheckerFactory.java
index 78e8304..81ec835 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/DataConsistencyCheckerFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/DataConsistencyCheckerFactory.java
@@ -30,13 +30,12 @@ public final class DataConsistencyCheckerFactory {
/**
* create data consistency checker instance.
*
- * @param databaseType database type
* @param scalingJob scaling job
* @return data consistency checker
*/
@SneakyThrows(ReflectiveOperationException.class)
- public static DataConsistencyChecker newInstance(final String databaseType, final ScalingJob scalingJob) {
- ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType(databaseType);
+ public static DataConsistencyChecker newInstance(final ScalingJob scalingJob) {
+ ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType(scalingJob.getDatabaseType());
return scalingEntry.getDataConsistencyCheckerClass().getConstructor(ScalingJob.class).newInstance(scalingJob);
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
index 80d9eba..3a53d8f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
@@ -24,8 +24,6 @@ import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.exception.PrepareFailedException;
import org.apache.shardingsphere.scaling.core.job.ScalingJob;
-import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
-import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckerFactory;
import org.apache.shardingsphere.scaling.core.job.position.PositionManagerFactory;
import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManager;
import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManagerFactory;
@@ -60,26 +58,24 @@ public final class ScalingJobPreparer {
* @param scalingJob scaling job
*/
public void prepare(final ScalingJob scalingJob) {
- String databaseType = scalingJob.getTaskConfigs().get(0).getDumperConfig().getDataSourceConfig().getDatabaseType().getName();
try (DataSourceManager dataSourceManager = new DataSourceManager(scalingJob.getTaskConfigs())) {
- checkDataSources(databaseType, dataSourceManager);
- ResumeBreakPointManager resumeBreakPointManager = getResumeBreakPointManager(databaseType, scalingJob);
+ checkDataSources(scalingJob.getDatabaseType(), dataSourceManager);
+ ResumeBreakPointManager resumeBreakPointManager = getResumeBreakPointManager(scalingJob);
if (resumeBreakPointManager.isResumable()) {
scalingPositionResumer.resumePosition(scalingJob, dataSourceManager, resumeBreakPointManager);
} else {
- initIncrementalTasks(databaseType, scalingJob, dataSourceManager);
- initInventoryTasks(databaseType, scalingJob, dataSourceManager);
+ initIncrementalTasks(scalingJob, dataSourceManager);
+ initInventoryTasks(scalingJob, dataSourceManager);
scalingPositionResumer.persistPosition(scalingJob, resumeBreakPointManager);
}
- scalingJob.setDataConsistencyChecker(initDataConsistencyChecker(databaseType, scalingJob));
} catch (final PrepareFailedException ex) {
log.error("Preparing scaling job {} failed", scalingJob.getJobId(), ex);
scalingJob.setStatus(JobStatus.PREPARING_FAILURE.name());
}
}
- private ResumeBreakPointManager getResumeBreakPointManager(final String databaseType, final ScalingJob scalingJob) {
- return ResumeBreakPointManagerFactory.newInstance(databaseType,
+ private ResumeBreakPointManager getResumeBreakPointManager(final ScalingJob scalingJob) {
+ return ResumeBreakPointManagerFactory.newInstance(scalingJob.getDatabaseType(),
ScalingTaskUtil.getScalingListenerPath(scalingJob.getJobId(), ScalingConstant.POSITION, scalingJob.getShardingItem()));
}
@@ -90,23 +86,19 @@ public final class ScalingJobPreparer {
dataSourceChecker.checkVariable(dataSourceManager.getSourceDataSources().values());
}
- private void initInventoryTasks(final String databaseType, final ScalingJob scalingJob, final DataSourceManager dataSourceManager) {
+ private void initInventoryTasks(final ScalingJob scalingJob, final DataSourceManager dataSourceManager) {
List<ScalingTask> allInventoryTasks = new LinkedList<>();
for (TaskConfiguration each : scalingJob.getTaskConfigs()) {
- allInventoryTasks.addAll(inventoryTaskSplitter.splitInventoryData(databaseType, each, dataSourceManager));
+ allInventoryTasks.addAll(inventoryTaskSplitter.splitInventoryData(scalingJob.getDatabaseType(), each, dataSourceManager));
}
scalingJob.getInventoryTasks().addAll(allInventoryTasks);
}
- private void initIncrementalTasks(final String databaseType, final ScalingJob scalingJob, final DataSourceManager dataSourceManager) {
+ private void initIncrementalTasks(final ScalingJob scalingJob, final DataSourceManager dataSourceManager) {
for (TaskConfiguration each : scalingJob.getTaskConfigs()) {
DataSourceConfiguration dataSourceConfig = each.getDumperConfig().getDataSourceConfig();
- each.getDumperConfig().setPositionManager(PositionManagerFactory.newInstance(databaseType, dataSourceManager.getDataSource(dataSourceConfig)));
+ each.getDumperConfig().setPositionManager(PositionManagerFactory.newInstance(scalingJob.getDatabaseType(), dataSourceManager.getDataSource(dataSourceConfig)));
scalingJob.getIncrementalTasks().add(scalingTaskFactory.createIncrementalTask(each.getJobConfig().getConcurrency(), each.getDumperConfig(), each.getImporterConfig()));
}
}
-
- private DataConsistencyChecker initDataConsistencyChecker(final String databaseType, final ScalingJob scalingJob) {
- return DataConsistencyCheckerFactory.newInstance(databaseType, scalingJob);
- }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
index 93f65f7..81c22c9 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
@@ -27,6 +27,7 @@ import org.apache.shardingsphere.scaling.core.job.JobProgress;
import org.apache.shardingsphere.scaling.core.job.ScalingJob;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
+import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckerFactory;
import org.apache.shardingsphere.scaling.core.utils.ScalingTaskUtil;
import java.util.Map;
@@ -46,7 +47,6 @@ public abstract class AbstractScalingJobService implements ScalingJobService {
public Optional<ScalingJob> start(final String sourceDataSource, final String sourceRule, final String targetDataSource, final String targetRule, final ScalingCallback scalingCallback) {
Optional<ScalingJob> result = start(sourceDataSource, sourceRule, targetDataSource, targetRule);
if (!result.isPresent()) {
- scalingCallback.onSuccess();
return result;
}
FINISH_CHECK_EXECUTOR.scheduleWithFixedDelay(new JobFinishChecker(result.get(), scalingCallback), 3, 1, TimeUnit.MINUTES);
@@ -73,7 +73,7 @@ public abstract class AbstractScalingJobService implements ScalingJobService {
* @return data consistency check result
*/
protected Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final ScalingJob scalingJob) {
- DataConsistencyChecker dataConsistencyChecker = scalingJob.getDataConsistencyChecker();
+ DataConsistencyChecker dataConsistencyChecker = DataConsistencyCheckerFactory.newInstance(scalingJob);
Map<String, DataConsistencyCheckResult> result = dataConsistencyChecker.countCheck();
if (result.values().stream().allMatch(DataConsistencyCheckResult::isCountValid)) {
Map<String, Boolean> dataCheckResult = dataConsistencyChecker.dataCheck();
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java
index 87aaa3a..0639458 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java
@@ -32,7 +32,7 @@ import java.util.Optional;
public interface ScalingJobService {
/**
- * Get {@code ShardingScalingJob} list.
+ * Get {@code ScalingJob} list.
*
* @return scaling job service list
*/
@@ -66,10 +66,10 @@ public interface ScalingJobService {
void stop(long jobId);
/**
- * Get {@code ShardingScalingJob} by id.
+ * Get {@code ScalingJob} by id.
*
* @param jobId job id
- * @return {@code ShardingScalingJob} instance
+ * @return {@code ScalingJob} instance
*/
ScalingJob getJob(long jobId);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/engine/TaskExecuteEngineTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/engine/TaskExecuteEngineTest.java
index da891f7..056ec58 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/engine/TaskExecuteEngineTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/engine/TaskExecuteEngineTest.java
@@ -34,7 +34,7 @@ public final class TaskExecuteEngineTest {
TaskExecuteEngine executeEngine = TaskExecuteEngine.newFixedThreadInstance(2);
try {
for (int i = 0; i < 5; i++) {
- Future<?> submit = executeEngine.submit(mockShardingScalingExecutor());
+ Future<?> submit = executeEngine.submit(mockScalingExecutor());
assertFalse(submit.isCancelled());
}
} catch (final RejectedExecutionException ex) {
@@ -47,7 +47,7 @@ public final class TaskExecuteEngineTest {
TaskExecuteEngine executeEngine = TaskExecuteEngine.newFixedThreadInstance(2);
try {
for (int i = 0; i < 5; i++) {
- Future<?> submit = executeEngine.submitAll(Collections.singletonList(mockShardingScalingExecutor()), mockExecuteCallback());
+ Future<?> submit = executeEngine.submitAll(Collections.singletonList(mockScalingExecutor()), mockExecuteCallback());
assertFalse(submit.isCancelled());
}
} catch (final RejectedExecutionException ex) {
@@ -69,7 +69,7 @@ public final class TaskExecuteEngineTest {
};
}
- private ScalingExecutor mockShardingScalingExecutor() {
+ private ScalingExecutor mockScalingExecutor() {
return new ScalingExecutor() {
@Override
public void run() {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyCheckerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyCheckerTest.java
index 3fe4084..8593a1a 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyCheckerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyCheckerTest.java
@@ -39,8 +39,8 @@ public final class AbstractDataConsistencyCheckerTest {
@Test
public void assertCountCheck() {
- ScalingJob scalingJob = mockShardingScalingJob();
- DataConsistencyChecker dataConsistencyChecker = DataConsistencyCheckerFactory.newInstance("H2", scalingJob);
+ ScalingJob scalingJob = mockScalingJob();
+ DataConsistencyChecker dataConsistencyChecker = DataConsistencyCheckerFactory.newInstance(scalingJob);
initTableData(scalingJob.getTaskConfigs().get(0).getDumperConfig().getDataSourceConfig());
initTableData(scalingJob.getTaskConfigs().get(0).getImporterConfig().getDataSourceConfig());
Map<String, DataConsistencyCheckResult> resultMap = dataConsistencyChecker.countCheck();
@@ -60,7 +60,7 @@ public final class AbstractDataConsistencyCheckerTest {
}
@SneakyThrows(IOException.class)
- private ScalingJob mockShardingScalingJob() {
+ private ScalingJob mockScalingJob() {
return ScalingConfigurationUtil.initJob("/config.json");
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/ScalingPositionResumerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/ScalingPositionResumerTest.java
index 1b0b9d3..60ee900 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/ScalingPositionResumerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/ScalingPositionResumerTest.java
@@ -48,7 +48,7 @@ public final class ScalingPositionResumerTest {
@Before
public void setUp() {
ScalingContext.getInstance().init(new ServerConfiguration());
- scalingJob = mockShardingScalingJob();
+ scalingJob = mockScalingJob();
scalingPositionResumer = new ScalingPositionResumer();
}
@@ -71,7 +71,7 @@ public final class ScalingPositionResumerTest {
}
@SneakyThrows(IOException.class)
- private ScalingJob mockShardingScalingJob() {
+ private ScalingJob mockScalingJob() {
return ScalingConfigurationUtil.initJob("/config.json");
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
index 345fe89..62258f6 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
@@ -42,7 +42,6 @@ import org.junit.Test;
import java.io.IOException;
import java.util.Optional;
-import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertFalse;
@@ -73,19 +72,17 @@ public final class DistributedScalingJobServiceTest {
@Test
public void assertStartWithScalingConfig() {
- Optional<ScalingJob> shardingScalingJob = scalingJobService.start(mockScalingConfiguration());
- assertTrue(shardingScalingJob.isPresent());
- assertTrue(registryRepository.get(ScalingTaskUtil.getScalingListenerPath(shardingScalingJob.get().getJobId(), ScalingConstant.CONFIG)).contains("\"running\":true"));
+ Optional<ScalingJob> scalingJob = scalingJobService.start(mockScalingConfiguration());
+ assertTrue(scalingJob.isPresent());
+ assertTrue(registryRepository.get(ScalingTaskUtil.getScalingListenerPath(scalingJob.get().getJobId(), ScalingConstant.CONFIG)).contains("\"running\":true"));
}
@Test
public void assertStartWithCallbackImmediately() {
ScalingConfiguration scalingConfig = mockScalingConfiguration();
ShardingSphereJDBCDataSourceConfiguration source = (ShardingSphereJDBCDataSourceConfiguration) scalingConfig.getRuleConfiguration().getSource().unwrap();
- AtomicBoolean successCallback = new AtomicBoolean();
- Optional<ScalingJob> scalingJob = scalingJobService.start(source.getDataSource(), source.getRule(), source.getDataSource(), source.getRule(), mockScalingCallback(successCallback));
+ Optional<ScalingJob> scalingJob = scalingJobService.start(source.getDataSource(), source.getRule(), source.getDataSource(), source.getRule(), mockScalingCallback());
assertFalse(scalingJob.isPresent());
- assertTrue(successCallback.get());
}
@Test
@@ -99,10 +96,10 @@ public final class DistributedScalingJobServiceTest {
@Test
public void assertStop() {
- Optional<ScalingJob> shardingScalingJob = scalingJobService.start(mockScalingConfiguration());
- assertTrue(shardingScalingJob.isPresent());
- scalingJobService.stop(shardingScalingJob.get().getJobId());
- assertTrue(registryRepository.get(ScalingTaskUtil.getScalingListenerPath(shardingScalingJob.get().getJobId(), ScalingConstant.CONFIG)).contains("\"running\":false"));
+ Optional<ScalingJob> scalingJob = scalingJobService.start(mockScalingConfiguration());
+ assertTrue(scalingJob.isPresent());
+ scalingJobService.stop(scalingJob.get().getJobId());
+ assertTrue(registryRepository.get(ScalingTaskUtil.getScalingListenerPath(scalingJob.get().getJobId(), ScalingConstant.CONFIG)).contains("\"running\":false"));
}
@Test
@@ -165,20 +162,6 @@ public final class DistributedScalingJobServiceTest {
};
}
- private ScalingCallback mockScalingCallback(final AtomicBoolean successCallback) {
- return new ScalingCallback() {
-
- @Override
- public void onSuccess() {
- successCallback.set(true);
- }
-
- @Override
- public void onFailure() {
- }
- };
- }
-
@SneakyThrows(ReflectiveOperationException.class)
private void resetRegistryRepositoryAvailable() {
ReflectionUtil.setStaticFieldValue(RegistryRepositoryHolder.class, "available", null);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java
index df60556..cbdbd41 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.scaling.core.service.impl;
-import com.google.common.collect.Maps;
import lombok.SneakyThrows;
import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
@@ -27,7 +26,6 @@ import org.apache.shardingsphere.scaling.core.execute.engine.TaskExecuteEngine;
import org.apache.shardingsphere.scaling.core.job.JobProgress;
import org.apache.shardingsphere.scaling.core.job.ScalingJob;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
-import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
import org.apache.shardingsphere.scaling.core.job.position.resume.FakeResumeBreakPointManager;
import org.apache.shardingsphere.scaling.core.job.position.resume.IncrementalPositionResumeBreakPointManager;
import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManagerFactory;
@@ -60,9 +58,9 @@ public final class StandaloneScalingJobServiceTest {
@Test
public void assertStartJob() {
- Optional<ScalingJob> shardingScalingJob = scalingJobService.start(mockScalingConfiguration());
- assertTrue(shardingScalingJob.isPresent());
- long jobId = shardingScalingJob.get().getJobId();
+ Optional<ScalingJob> scalingJob = scalingJobService.start(mockScalingConfiguration());
+ assertTrue(scalingJob.isPresent());
+ long jobId = scalingJob.get().getJobId();
JobProgress progress = scalingJobService.getProgress(jobId);
assertThat(progress.getIncrementalTaskProgress().size(), is(1));
assertThat(progress.getInventoryTaskProgress().size(), is(1));
@@ -70,9 +68,9 @@ public final class StandaloneScalingJobServiceTest {
@Test
public void assertStopExistJob() {
- Optional<ScalingJob> shardingScalingJob = scalingJobService.start(mockScalingConfiguration());
- assertTrue(shardingScalingJob.isPresent());
- long jobId = shardingScalingJob.get().getJobId();
+ Optional<ScalingJob> scalingJob = scalingJobService.start(mockScalingConfiguration());
+ assertTrue(scalingJob.isPresent());
+ long jobId = scalingJob.get().getJobId();
scalingJobService.stop(jobId);
JobProgress progress = scalingJobService.getProgress(jobId);
assertThat(progress.getStatus(), is(JobStatus.STOPPED.name()));
@@ -94,9 +92,9 @@ public final class StandaloneScalingJobServiceTest {
@Test
public void assertIncrementalTasksOnly() throws NoSuchFieldException, IllegalAccessException {
ReflectionUtil.setStaticFieldValue(ResumeBreakPointManagerFactory.class, "clazz", IncrementalPositionResumeBreakPointManager.class);
- Optional<ScalingJob> shardingScalingJob = scalingJobService.start(mockScalingConfiguration());
- assertTrue(shardingScalingJob.isPresent());
- long jobId = shardingScalingJob.get().getJobId();
+ Optional<ScalingJob> scalingJob = scalingJobService.start(mockScalingConfiguration());
+ assertTrue(scalingJob.isPresent());
+ long jobId = scalingJob.get().getJobId();
JobProgress progress = scalingJobService.getProgress(jobId);
assertThat(progress.getIncrementalTaskProgress().size(), is(1));
assertThat(progress.getInventoryTaskProgress().size(), is(1));
@@ -105,26 +103,13 @@ public final class StandaloneScalingJobServiceTest {
@Test
public void assertCheckExistJob() {
- Optional<ScalingJob> shardingScalingJob = scalingJobService.start(mockScalingConfiguration());
- assertTrue(shardingScalingJob.isPresent());
- shardingScalingJob.get().setDataConsistencyChecker(new DataConsistencyChecker() {
- @Override
- public Map<String, DataConsistencyCheckResult> countCheck() {
- Map<String, DataConsistencyCheckResult> result = Maps.newHashMapWithExpectedSize(1);
- result.put("t1", new DataConsistencyCheckResult(1, 1));
- return result;
- }
-
- @Override
- public Map<String, Boolean> dataCheck() {
- Map<String, Boolean> result = Maps.newHashMapWithExpectedSize(1);
- result.put("t1", true);
- return result;
- }
- });
- Map<String, DataConsistencyCheckResult> checkResult = scalingJobService.check(shardingScalingJob.get().getJobId());
- assertTrue(checkResult.get("t1").isCountValid());
- assertTrue(checkResult.get("t1").isDataValid());
+ Optional<ScalingJob> scalingJobOptional = scalingJobService.start(mockScalingConfiguration());
+ assertTrue(scalingJobOptional.isPresent());
+ ScalingJob scalingJob = scalingJobOptional.get();
+ scalingJob.setDatabaseType("H2");
+ scalingJob.getTaskConfigs().clear();
+ Map<String, DataConsistencyCheckResult> checkResult = scalingJobService.check(scalingJob.getJobId());
+ assertTrue(checkResult.isEmpty());
}
@Test(expected = ScalingJobNotFoundException.class)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/ScalingConfigurationUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/ScalingConfigurationUtil.java
index 6b0d13c..6d36e1c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/ScalingConfigurationUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/ScalingConfigurationUtil.java
@@ -53,7 +53,7 @@ public final class ScalingConfigurationUtil {
* Init job from config file.
*
* @param configFile config file
- * @return ShardingScalingJob
+ * @return scaling job
* @throws IOException IO exception
*/
public static ScalingJob initJob(final String configFile) throws IOException {