You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by me...@apache.org on 2020/12/25 09:00:30 UTC
[shardingsphere] branch master updated: Add ScalingEnvironmentalManager (#8775)
This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 1a98884 Add ScalingEnvironmentalManager (#8775)
1a98884 is described below
commit 1a988844c772a6f0c64180dd943828d9b99da001
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Fri Dec 25 17:00:02 2020 +0800
Add ScalingEnvironmentalManager (#8775)
* Add ScalingEnvironmentalManager
* Add unit test for StandaloneScalingJobService
* move function.
* add unit test for DistributedScalingJobService
Co-authored-by: qiulu3 <Lucas209910>
---
.../scaling/web/HttpServerHandler.java | 14 ++++++
.../sqlbuilder/AbstractScalingSQLBuilder.java | 5 ++
.../executor/sqlbuilder/ScalingSQLBuilder.java | 8 ++++
.../job/check/DataConsistencyCheckerFactory.java | 2 +-
.../environmental/ScalingEnvironmentalManager.java | 55 ++++++++++++++++++++++
.../resume/FileSystemResumeBreakPointManager.java | 2 +-
.../core/service/AbstractScalingJobService.java | 26 +++++-----
.../scaling/core/service/ScalingJobService.java | 6 ++-
.../service/impl/DistributedScalingJobService.java | 15 +++---
.../service/impl/StandaloneScalingJobService.java | 26 +++-------
.../impl/DistributedScalingJobServiceTest.java | 6 +++
.../impl/StandaloneScalingJobServiceTest.java | 55 ++++++++++++++++++----
12 files changed, 165 insertions(+), 55 deletions(-)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java
index 27b7af3..5e6ba31 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java
@@ -41,6 +41,7 @@ import org.apache.shardingsphere.scaling.core.service.ScalingJobService;
import org.apache.shardingsphere.scaling.core.service.ScalingJobServiceFactory;
import org.apache.shardingsphere.scaling.util.ResponseContentUtil;
+import java.sql.SQLException;
import java.util.List;
import java.util.Optional;
@@ -80,6 +81,10 @@ public final class HttpServerHandler extends SimpleChannelInboundHandler<FullHtt
checkJob(context, requestPath);
return;
}
+ if (requestPath.contains("/scaling/job/reset/")) {
+ resetJob(context, requestPath);
+ return;
+ }
response(ResponseContentUtil.handleBadRequest("Not support request!"), context, HttpResponseStatus.BAD_REQUEST);
}
@@ -115,6 +120,15 @@ public final class HttpServerHandler extends SimpleChannelInboundHandler<FullHtt
}
}
+ private void resetJob(final ChannelHandlerContext context, final String requestPath) {
+ try {
+ SCALING_JOB_SERVICE.reset(getJobId(requestPath));
+ response(ResponseContentUtil.success(), context, HttpResponseStatus.OK);
+ } catch (final ScalingJobNotFoundException | SQLException ex) {
+ response(ResponseContentUtil.handleBadRequest(ex.getMessage()), context, HttpResponseStatus.BAD_REQUEST);
+ }
+ }
+
private long getJobId(final String requestPath) {
return Long.parseLong(requestPath.split("/")[4]);
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/sqlbuilder/AbstractScalingSQLBuilder.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/sqlbuilder/AbstractScalingSQLBuilder.java
index 6f9edb742..8c4db1f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/sqlbuilder/AbstractScalingSQLBuilder.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/sqlbuilder/AbstractScalingSQLBuilder.java
@@ -124,6 +124,11 @@ public abstract class AbstractScalingSQLBuilder implements ScalingSQLBuilder {
return sqlCacheMap.get(sqlCacheKey);
}
+ @Override
+ public String buildTruncateSQL(final String tableName) {
+ return String.format("TRUNCATE TABLE %s", quote(tableName));
+ }
+
private String buildDeleteSQLInternal(final String tableName, final Collection<Column> conditionColumns) {
return String.format("DELETE FROM %s WHERE %s", quote(tableName), buildWhereSQL(conditionColumns));
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/sqlbuilder/ScalingSQLBuilder.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/sqlbuilder/ScalingSQLBuilder.java
index 09c98fa..4efe09b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/sqlbuilder/ScalingSQLBuilder.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/sqlbuilder/ScalingSQLBuilder.java
@@ -54,6 +54,14 @@ public interface ScalingSQLBuilder {
String buildDeleteSQL(DataRecord dataRecord, Collection<Column> conditionColumns);
/**
+ * Build truncate SQL.
+ *
+ * @param tableName table name
+ * @return truncate SQL
+ */
+ String buildTruncateSQL(String tableName);
+
+ /**
* Build count SQL.
*
* @param tableName table name
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/DataConsistencyCheckerFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/DataConsistencyCheckerFactory.java
index 81ec835..6788b53 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/DataConsistencyCheckerFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/DataConsistencyCheckerFactory.java
@@ -28,7 +28,7 @@ import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
public final class DataConsistencyCheckerFactory {
/**
- * create data consistency checker instance.
+ * Create data consistency checker instance.
*
* @param scalingJob scaling job
* @return data consistency checker
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environmental/ScalingEnvironmentalManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environmental/ScalingEnvironmentalManager.java
new file mode 100644
index 0000000..42bf74f
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environmental/ScalingEnvironmentalManager.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.job.environmental;
+
+import org.apache.shardingsphere.scaling.core.datasource.DataSourceFactory;
+import org.apache.shardingsphere.scaling.core.datasource.DataSourceWrapper;
+import org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilderFactory;
+import org.apache.shardingsphere.scaling.core.job.ScalingJob;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Scaling environmental manager.
+ */
+public final class ScalingEnvironmentalManager {
+
+ private final DataSourceFactory dataSourceFactory = new DataSourceFactory();
+
+ /**
+ * Reset target table, Truncate target table.
+ *
+ * @param scalingJob scalingjob
+ * @throws SQLException SQL exception
+ */
+ public void resetTargetTable(final ScalingJob scalingJob) throws SQLException {
+ Set<String> tables = scalingJob.getTaskConfigs().stream().flatMap(each -> each.getDumperConfig().getTableNameMap().values().stream()).collect(Collectors.toSet());
+ try (DataSourceWrapper dataSource = dataSourceFactory.newInstance(scalingJob.getScalingConfig().getRuleConfiguration().getTarget().unwrap());
+ Connection connection = dataSource.getConnection()) {
+ for (String each : tables) {
+ try (PreparedStatement preparedStatement = connection.prepareStatement(ScalingSQLBuilderFactory.newInstance(scalingJob.getDatabaseType()).buildTruncateSQL(each))) {
+ preparedStatement.execute();
+ }
+ }
+ }
+ }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/FileSystemResumeBreakPointManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/FileSystemResumeBreakPointManager.java
index 1daac29..284cbe2 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/FileSystemResumeBreakPointManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/FileSystemResumeBreakPointManager.java
@@ -30,7 +30,7 @@ import java.nio.charset.StandardCharsets;
public final class FileSystemResumeBreakPointManager extends AbstractResumeBreakPointManager implements ResumeBreakPointManager {
public FileSystemResumeBreakPointManager(final String databaseType, final String taskPath) {
- super(databaseType, taskPath.startsWith("/") ? ".scaling/" + taskPath : taskPath);
+ super(databaseType, taskPath.startsWith("/") ? ".scaling" + taskPath : taskPath);
}
@Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
index 81c22c9..d876b59 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
@@ -28,8 +28,10 @@ import org.apache.shardingsphere.scaling.core.job.ScalingJob;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckerFactory;
+import org.apache.shardingsphere.scaling.core.job.environmental.ScalingEnvironmentalManager;
import org.apache.shardingsphere.scaling.core.utils.ScalingTaskUtil;
+import java.sql.SQLException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
@@ -55,25 +57,16 @@ public abstract class AbstractScalingJobService implements ScalingJobService {
private Optional<ScalingJob> start(final String sourceDataSource, final String sourceRule, final String targetDataSource, final String targetRule) {
ScalingConfiguration scalingConfig = new ScalingConfiguration();
- scalingConfig.setRuleConfiguration(
- new RuleConfiguration(new ShardingSphereJDBCDataSourceConfiguration(sourceDataSource, sourceRule), new ShardingSphereJDBCDataSourceConfiguration(targetDataSource, targetRule)));
+ scalingConfig.setRuleConfiguration(new RuleConfiguration(
+ new ShardingSphereJDBCDataSourceConfiguration(sourceDataSource, sourceRule),
+ new ShardingSphereJDBCDataSourceConfiguration(targetDataSource, targetRule)));
scalingConfig.setJobConfiguration(new JobConfiguration());
return start(scalingConfig);
}
@Override
- public void reset(final long jobId) {
- // TODO reset target tables.
- }
-
- /**
- * Do data consistency check.
- *
- * @param scalingJob scaling job
- * @return data consistency check result
- */
- protected Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final ScalingJob scalingJob) {
- DataConsistencyChecker dataConsistencyChecker = DataConsistencyCheckerFactory.newInstance(scalingJob);
+ public Map<String, DataConsistencyCheckResult> check(final long jobId) {
+ DataConsistencyChecker dataConsistencyChecker = DataConsistencyCheckerFactory.newInstance(getJob(jobId));
Map<String, DataConsistencyCheckResult> result = dataConsistencyChecker.countCheck();
if (result.values().stream().allMatch(DataConsistencyCheckResult::isCountValid)) {
Map<String, Boolean> dataCheckResult = dataConsistencyChecker.dataCheck();
@@ -82,6 +75,11 @@ public abstract class AbstractScalingJobService implements ScalingJobService {
return result;
}
+ @Override
+ public void reset(final long jobId) throws SQLException {
+ new ScalingEnvironmentalManager().resetTargetTable(getJob(jobId));
+ }
+
@RequiredArgsConstructor
private class JobFinishChecker implements Runnable {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java
index 0639458..a488f46 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java
@@ -22,6 +22,7 @@ import org.apache.shardingsphere.scaling.core.job.JobProgress;
import org.apache.shardingsphere.scaling.core.job.ScalingJob;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
+import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -59,7 +60,7 @@ public interface ScalingJobService {
Optional<ScalingJob> start(String sourceDataSource, String sourceRule, String targetDataSource, String targetRule, ScalingCallback scalingCallback);
/**
- * Stop a job.
+ * Stop job.
*
* @param jobId job id
*/
@@ -93,8 +94,9 @@ public interface ScalingJobService {
* Reset target tables.
*
* @param jobId job id
+ * @throws SQLException SQL exception
*/
- void reset(long jobId);
+ void reset(long jobId) throws SQLException;
/**
* remove job.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobService.java
index 069a044..07d07d9 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobService.java
@@ -17,16 +17,17 @@
package org.apache.shardingsphere.scaling.core.service.impl;
+import com.google.common.base.Strings;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
+import org.apache.shardingsphere.scaling.core.exception.ScalingJobNotFoundException;
import org.apache.shardingsphere.scaling.core.job.JobProgress;
import org.apache.shardingsphere.scaling.core.job.ScalingJob;
import org.apache.shardingsphere.scaling.core.job.TaskProgress;
-import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
import org.apache.shardingsphere.scaling.core.job.position.InventoryPositionGroup;
import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTaskProgress;
import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTaskProgress;
@@ -37,7 +38,6 @@ import org.apache.shardingsphere.scaling.core.utils.ScalingTaskUtil;
import org.apache.shardingsphere.scaling.core.utils.TaskConfigurationUtil;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -83,8 +83,12 @@ public final class DistributedScalingJobService extends AbstractScalingJobServic
@Override
public ScalingJob getJob(final long jobId) {
+ String data = REGISTRY_REPOSITORY.get(ScalingTaskUtil.getScalingListenerPath(jobId, ScalingConstant.CONFIG));
+ if (Strings.isNullOrEmpty(data)) {
+ throw new ScalingJobNotFoundException(String.format("Can't find scaling job id %s", jobId));
+ }
ScalingJob result = new ScalingJob(jobId);
- result.setScalingConfig(GSON.fromJson(REGISTRY_REPOSITORY.get(ScalingTaskUtil.getScalingListenerPath(jobId, ScalingConstant.CONFIG)), ScalingConfiguration.class));
+ result.setScalingConfig(GSON.fromJson(data, ScalingConfiguration.class));
return result;
}
@@ -117,11 +121,6 @@ public final class DistributedScalingJobService extends AbstractScalingJobServic
}
@Override
- public Map<String, DataConsistencyCheckResult> check(final long jobId) {
- return dataConsistencyCheck(getJob(jobId));
- }
-
- @Override
public void remove(final long jobId) {
REGISTRY_REPOSITORY.delete(ScalingTaskUtil.getScalingListenerPath(jobId));
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobService.java
index d70393d..9363a4a 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobService.java
@@ -21,10 +21,9 @@ import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
import org.apache.shardingsphere.scaling.core.exception.ScalingJobNotFoundException;
import org.apache.shardingsphere.scaling.core.job.JobProgress;
import org.apache.shardingsphere.scaling.core.job.ScalingJob;
-import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
import org.apache.shardingsphere.scaling.core.job.preparer.ScalingJobPreparer;
-import org.apache.shardingsphere.scaling.core.schedule.ScalingTaskScheduler;
import org.apache.shardingsphere.scaling.core.schedule.JobStatus;
+import org.apache.shardingsphere.scaling.core.schedule.ScalingTaskScheduler;
import org.apache.shardingsphere.scaling.core.service.AbstractScalingJobService;
import org.apache.shardingsphere.scaling.core.service.ScalingJobService;
@@ -65,25 +64,22 @@ public final class StandaloneScalingJobService extends AbstractScalingJobService
@Override
public void stop(final long jobId) {
- if (!scalingJobMap.containsKey(jobId)) {
- throw new ScalingJobNotFoundException(String.format("Can't find scaling job id %s", jobId));
- }
+ ScalingJob scalingJob = getJob(jobId);
scalingTaskSchedulerMap.get(jobId).stop();
- scalingJobMap.get(jobId).setStatus(JobStatus.STOPPED.name());
+ scalingJob.setStatus(JobStatus.STOPPED.name());
}
@Override
public ScalingJob getJob(final long jobId) {
+ if (!scalingJobMap.containsKey(jobId)) {
+ throw new ScalingJobNotFoundException(String.format("Can't find scaling job id %s", jobId));
+ }
return scalingJobMap.get(jobId);
}
@Override
public JobProgress getProgress(final long jobId) {
- if (!scalingJobMap.containsKey(jobId)) {
- throw new ScalingJobNotFoundException(String.format("Can't find scaling job id %s", jobId));
- }
- ScalingJob scalingJob = scalingJobMap.get(jobId);
- JobProgress result = new JobProgress(jobId, scalingJob.getStatus());
+ JobProgress result = new JobProgress(jobId, getJob(jobId).getStatus());
if (scalingTaskSchedulerMap.containsKey(jobId)) {
result.getInventoryTaskProgress().put("0", scalingTaskSchedulerMap.get(jobId).getInventoryTaskProgress());
result.getIncrementalTaskProgress().put("0", scalingTaskSchedulerMap.get(jobId).getIncrementalTaskProgress());
@@ -92,14 +88,6 @@ public final class StandaloneScalingJobService extends AbstractScalingJobService
}
@Override
- public Map<String, DataConsistencyCheckResult> check(final long jobId) {
- if (!scalingJobMap.containsKey(jobId)) {
- throw new ScalingJobNotFoundException(String.format("Can't find scaling job id %s", jobId));
- }
- return dataConsistencyCheck(scalingJobMap.get(jobId));
- }
-
- @Override
public void remove(final long jobId) {
stop(jobId);
scalingJobMap.remove(jobId);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
index 62258f6..c886d3b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
@@ -26,6 +26,7 @@ import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
import org.apache.shardingsphere.scaling.core.config.datasource.ShardingSphereJDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
+import org.apache.shardingsphere.scaling.core.exception.ScalingJobNotFoundException;
import org.apache.shardingsphere.scaling.core.job.JobProgress;
import org.apache.shardingsphere.scaling.core.job.ScalingJob;
import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTaskProgress;
@@ -102,6 +103,11 @@ public final class DistributedScalingJobServiceTest {
assertTrue(registryRepository.get(ScalingTaskUtil.getScalingListenerPath(scalingJob.get().getJobId(), ScalingConstant.CONFIG)).contains("\"running\":false"));
}
+ @Test(expected = ScalingJobNotFoundException.class)
+ public void assertGetNotExistJob() {
+ scalingJobService.getJob(0);
+ }
+
@Test
public void assertGetProgress() {
registryRepository.persist(ScalingTaskUtil.getScalingListenerPath("1/config"), "{'ruleConfiguration':{'source':{},'target':{}},'jobConfiguration':{'running':true}}");
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java
index 6b93618..f5e5c7c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java
@@ -21,6 +21,8 @@ import lombok.SneakyThrows;
import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
+import org.apache.shardingsphere.scaling.core.config.datasource.DataSourceConfiguration;
+import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.exception.ScalingJobNotFoundException;
import org.apache.shardingsphere.scaling.core.execute.engine.TaskExecuteEngine;
import org.apache.shardingsphere.scaling.core.fixture.FixtureResumeBreakPointManager;
@@ -41,7 +43,12 @@ import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
+import javax.sql.DataSource;
import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.util.Map;
import java.util.Optional;
@@ -81,10 +88,15 @@ public final class StandaloneScalingJobServiceTest {
assertThat(progress.getInventoryTaskProgress().size(), is(1));
}
+ @Test(expected = ScalingJobNotFoundException.class)
+ public void assertGetNotExistJob() {
+ scalingJobService.getJob(0);
+ }
+
@Test
@SuppressWarnings("unchecked")
@SneakyThrows(ReflectiveOperationException.class)
- public void assertStopExistJob() {
+ public void assertStopJob() {
Map<Long, ScalingJob> scalingJobMap = ReflectionUtil.getFieldValue(scalingJobService, "scalingJobMap", Map.class);
Map<Long, ScalingTaskScheduler> scalingTaskSchedulerMap = ReflectionUtil.getFieldValue(scalingJobService, "scalingTaskSchedulerMap", Map.class);
assertNotNull(scalingJobMap);
@@ -96,11 +108,6 @@ public final class StandaloneScalingJobServiceTest {
verify(scalingJob).setStatus(JobStatus.STOPPED.name());
}
- @Test(expected = ScalingJobNotFoundException.class)
- public void assertStopNotExistJob() {
- scalingJobService.stop(0);
- }
-
@Test
public void assertListJobs() {
assertThat(scalingJobService.listJobs().size(), is(0));
@@ -109,7 +116,7 @@ public final class StandaloneScalingJobServiceTest {
}
@Test
- public void assertCheckExistJob() {
+ public void assertCheckJob() {
Optional<ScalingJob> scalingJobOptional = scalingJobService.start(mockScalingConfiguration());
assertTrue(scalingJobOptional.isPresent());
ScalingJob scalingJob = scalingJobOptional.get();
@@ -119,9 +126,17 @@ public final class StandaloneScalingJobServiceTest {
assertTrue(checkResult.isEmpty());
}
- @Test(expected = ScalingJobNotFoundException.class)
- public void assertCheckNotExistJob() {
- scalingJobService.check(0);
+ @Test
+ @SneakyThrows(SQLException.class)
+ public void assertResetJob() {
+ Optional<ScalingJob> scalingJobOptional = scalingJobService.start(mockScalingConfiguration());
+ assertTrue(scalingJobOptional.isPresent());
+ ScalingJob scalingJob = scalingJobOptional.get();
+ DataSourceConfiguration dataSourceConfig = scalingJob.getTaskConfigs().get(0).getImporterConfig().getDataSourceConfig();
+ initTableData(dataSourceConfig);
+ assertThat(countTableData(dataSourceConfig), is(2L));
+ scalingJobService.reset(scalingJob.getJobId());
+ assertThat(countTableData(dataSourceConfig), is(0L));
}
@SneakyThrows(IOException.class)
@@ -129,6 +144,26 @@ public final class StandaloneScalingJobServiceTest {
return ScalingConfigurationUtil.initConfig("/config.json");
}
+ private void initTableData(final DataSourceConfiguration dataSourceConfig) throws SQLException {
+ DataSource dataSource = new DataSourceManager().getDataSource(dataSourceConfig);
+ try (Connection connection = dataSource.getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("DROP TABLE IF EXISTS `t1`");
+ statement.execute("CREATE TABLE `t1` (id INT PRIMARY KEY, user_id VARCHAR(12))");
+ statement.execute("INSERT INTO `t1` (id, user_id) VALUES (1, 'xxx'), (999, 'yyy')");
+ }
+ }
+
+ private long countTableData(final DataSourceConfiguration dataSourceConfig) throws SQLException {
+ DataSource dataSource = new DataSourceManager().getDataSource(dataSourceConfig);
+ try (Connection connection = dataSource.getConnection();
+ Statement statement = connection.createStatement()) {
+ ResultSet resultSet = statement.executeQuery("SELECT COUNT(*) FROM `t1`");
+ resultSet.next();
+ return resultSet.getLong(1);
+ }
+ }
+
@After
@SneakyThrows(ReflectiveOperationException.class)
public void tearDown() {