You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by me...@apache.org on 2021/01/14 08:30:00 UTC
[shardingsphere] branch master updated: add log for scaling (#9026)
This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new bd5f210 add log for scaling (#9026)
bd5f210 is described below
commit bd5f21080a80b6b620cd7c7c872107a9aabead4a
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Thu Jan 14 16:29:37 2021 +0800
add log for scaling (#9026)
* add log for ScalingJobService
* add scaling dependency into shardingsphere-proxy-bootstrap
Co-authored-by: qiulu3 <Lucas209910>
---
.../shardingsphere-proxy-bootstrap/pom.xml | 15 +++++++++++++++
.../shardingsphere/scaling/core/job/ScalingJob.java | 3 +++
.../core/job/check/DataConsistencyCheckResult.java | 2 ++
.../scaling/core/job/preparer/ScalingJobPreparer.java | 1 +
.../scaling/core/schedule/ScalingTaskScheduler.java | 4 ++++
.../scaling/core/service/AbstractScalingJobService.java | 7 +++++++
.../scaling/elasticjob/ElasticJobScalingWorker.java | 5 +++--
.../scaling/elasticjob/job/ScalingElasticJob.java | 2 +-
8 files changed, 36 insertions(+), 3 deletions(-)
diff --git a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/pom.xml b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/pom.xml
index 5d317e6..498ab35 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/pom.xml
+++ b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/pom.xml
@@ -130,6 +130,21 @@
<version>${project.parent.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.shardingsphere</groupId>
+ <artifactId>shardingsphere-scaling-mysql</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere</groupId>
+ <artifactId>shardingsphere-scaling-postgresql</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere</groupId>
+ <artifactId>shardingsphere-scaling-elasticjob</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<scope>compile</scope>
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
index 880bc41..ce5656e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
@@ -21,6 +21,7 @@ import lombok.Getter;
import lombok.Setter;
import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
+import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManager;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
import org.apache.shardingsphere.scaling.core.schedule.JobStatus;
import org.apache.shardingsphere.scaling.core.utils.TaskConfigurationUtil;
@@ -55,6 +56,8 @@ public final class ScalingJob {
private String status = JobStatus.RUNNING.name();
+ private transient ResumeBreakPointManager resumeBreakPointManager;
+
public ScalingJob() {
this(generateKey());
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/DataConsistencyCheckResult.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/DataConsistencyCheckResult.java
index 2ef81a8..40d293e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/DataConsistencyCheckResult.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/DataConsistencyCheckResult.java
@@ -19,12 +19,14 @@ package org.apache.shardingsphere.scaling.core.job.check;
import lombok.Getter;
import lombok.Setter;
+import lombok.ToString;
/**
* Data consistency check result.
*/
@Getter
@Setter
+@ToString
public final class DataConsistencyCheckResult {
private final long sourceCount;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
index 943d799..f8a369f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
@@ -61,6 +61,7 @@ public final class ScalingJobPreparer {
try (DataSourceManager dataSourceManager = new DataSourceManager(scalingJob.getTaskConfigs())) {
checkSourceDataSources(scalingJob, dataSourceManager);
ResumeBreakPointManager resumeBreakPointManager = getResumeBreakPointManager(scalingJob);
+ scalingJob.setResumeBreakPointManager(resumeBreakPointManager);
if (resumeBreakPointManager.isResumable()) {
scalingPositionResumer.resumePosition(scalingJob, dataSourceManager, resumeBreakPointManager);
} else {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
index 5932d96..125717d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
@@ -50,15 +50,19 @@ public final class ScalingTaskScheduler implements Runnable {
* Stop all scaling task.
*/
public void stop() {
+ log.info("stop scaling job {}", scalingJob.getJobId());
if (JobStatus.valueOf(scalingJob.getStatus()).isRunning()) {
scalingJob.setStatus(JobStatus.STOPPING.name());
}
for (ScalingTask each : scalingJob.getInventoryTasks()) {
+ log.info("stop inventory task {} - {}", scalingJob.getJobId(), each.getTaskId());
each.stop();
}
for (ScalingTask each : scalingJob.getIncrementalTasks()) {
+ log.info("stop incremental task {} - {}", scalingJob.getJobId(), each.getTaskId());
each.stop();
}
+ scalingJob.getResumeBreakPointManager().close();
}
@Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
index a0e34d8..40f5644 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
@@ -49,6 +49,11 @@ public abstract class AbstractScalingJobService implements ScalingJobService {
@Override
public Optional<ScalingJob> start(final String sourceDataSource, final String sourceRule, final String targetDataSource, final String targetRule, final ScalingCallback scalingCallback) {
+ log.info("start scaling job...");
+ log.info("sourceDataSource = {}", sourceDataSource);
+ log.info("sourceRule = {}", sourceRule);
+ log.info("targetDataSource = {}", targetDataSource);
+ log.info("targetRule = {}", targetRule);
Optional<ScalingJob> result = start(sourceDataSource, sourceRule, targetDataSource, targetRule);
if (!result.isPresent()) {
return result;
@@ -68,12 +73,14 @@ public abstract class AbstractScalingJobService implements ScalingJobService {
@Override
public Map<String, DataConsistencyCheckResult> check(final long jobId) {
+ log.info("scaling job {} start data consistency check.", jobId);
DataConsistencyChecker dataConsistencyChecker = DataConsistencyCheckerFactory.newInstance(getJob(jobId));
Map<String, DataConsistencyCheckResult> result = dataConsistencyChecker.countCheck();
if (result.values().stream().allMatch(DataConsistencyCheckResult::isCountValid)) {
Map<String, Boolean> dataCheckResult = dataConsistencyChecker.dataCheck();
result.forEach((key, value) -> value.setDataValid(dataCheckResult.getOrDefault(key, false)));
}
+ log.info("scaling job {} data consistency checker result {}", jobId, result);
return result;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/ElasticJobScalingWorker.java b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/ElasticJobScalingWorker.java
index fa4b52f..3abd77f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/ElasticJobScalingWorker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/ElasticJobScalingWorker.java
@@ -146,7 +146,8 @@ public final class ElasticJobScalingWorker implements ScalingWorker {
}
if (new LeaderService(registryCenter, jobId).isLeader()) {
log.info("leader worker update config.");
- JobAPIFactory.createJobConfigurationAPI(governanceConfig.getRegistryCenterConfiguration().getServerLists(), governanceConfig.getName(), null)
+ JobAPIFactory.createJobConfigurationAPI(governanceConfig.getRegistryCenterConfiguration().getServerLists(),
+ governanceConfig.getName() + ScalingConstant.SCALING_ELASTIC_JOB_PATH, null)
.updateJobConfiguration(JobConfigurationPOJO.fromJobConfiguration(createJobConfig(jobId, scalingConfig)));
}
jobBootstrapWrapper.setRunning(scalingConfig.getJobConfiguration().isRunning());
@@ -154,7 +155,7 @@ public final class ElasticJobScalingWorker implements ScalingWorker {
}
private JobConfiguration createJobConfig(final String jobId, final ScalingConfiguration scalingConfig) {
- return JobConfiguration.newBuilder(jobId, scalingConfig.getJobConfiguration().getShardingTables().length).jobParameter(GSON.toJson(scalingConfig)).build();
+ return JobConfiguration.newBuilder(jobId, scalingConfig.getJobConfiguration().getShardingTables().length).jobParameter(GSON.toJson(scalingConfig)).overwrite(true).build();
}
private void deleteJob(final String jobId, final ScalingConfiguration scalingConfig) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/ScalingElasticJob.java b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/ScalingElasticJob.java
index 8b43029..79b9421 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/ScalingElasticJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/ScalingElasticJob.java
@@ -58,8 +58,8 @@ public final class ScalingElasticJob implements SimpleJob {
}
private void stopJob(final ShardingContext shardingContext) {
- log.info("stop job: {} - {}", shardingContext.getJobName(), shardingContext.getShardingItem());
if (null != scalingJob) {
+ log.info("stop job: {} - {}", shardingContext.getJobName(), shardingContext.getShardingItem());
SCALING_JOB_SERVICE.stop(scalingJob.getJobId());
scalingJob = null;
}