You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by me...@apache.org on 2021/01/14 08:30:00 UTC

[shardingsphere] branch master updated: add log for scaling (#9026)

This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new bd5f210  add log for scaling (#9026)
bd5f210 is described below

commit bd5f21080a80b6b620cd7c7c872107a9aabead4a
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Thu Jan 14 16:29:37 2021 +0800

    add log for scaling (#9026)
    
    * add log for ScalingJobService
    
    * add scaling dependency into shardingsphere-proxy-bootstrap
    
    Co-authored-by: qiulu3 <Lucas209910>
---
 .../shardingsphere-proxy-bootstrap/pom.xml                | 15 +++++++++++++++
 .../shardingsphere/scaling/core/job/ScalingJob.java       |  3 +++
 .../core/job/check/DataConsistencyCheckResult.java        |  2 ++
 .../scaling/core/job/preparer/ScalingJobPreparer.java     |  1 +
 .../scaling/core/schedule/ScalingTaskScheduler.java       |  4 ++++
 .../scaling/core/service/AbstractScalingJobService.java   |  7 +++++++
 .../scaling/elasticjob/ElasticJobScalingWorker.java       |  5 +++--
 .../scaling/elasticjob/job/ScalingElasticJob.java         |  2 +-
 8 files changed, 36 insertions(+), 3 deletions(-)

diff --git a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/pom.xml b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/pom.xml
index 5d317e6..498ab35 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/pom.xml
+++ b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/pom.xml
@@ -130,6 +130,21 @@
             <version>${project.parent.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.shardingsphere</groupId>
+            <artifactId>shardingsphere-scaling-mysql</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.shardingsphere</groupId>
+            <artifactId>shardingsphere-scaling-postgresql</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.shardingsphere</groupId>
+            <artifactId>shardingsphere-scaling-elasticjob</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
             <groupId>com.zaxxer</groupId>
             <artifactId>HikariCP</artifactId>
             <scope>compile</scope>
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
index 880bc41..ce5656e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
@@ -21,6 +21,7 @@ import lombok.Getter;
 import lombok.Setter;
 import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
 import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
+import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManager;
 import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
 import org.apache.shardingsphere.scaling.core.schedule.JobStatus;
 import org.apache.shardingsphere.scaling.core.utils.TaskConfigurationUtil;
@@ -55,6 +56,8 @@ public final class ScalingJob {
     
     private String status = JobStatus.RUNNING.name();
     
+    private transient ResumeBreakPointManager resumeBreakPointManager;
+    
     public ScalingJob() {
         this(generateKey());
     }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/DataConsistencyCheckResult.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/DataConsistencyCheckResult.java
index 2ef81a8..40d293e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/DataConsistencyCheckResult.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/DataConsistencyCheckResult.java
@@ -19,12 +19,14 @@ package org.apache.shardingsphere.scaling.core.job.check;
 
 import lombok.Getter;
 import lombok.Setter;
+import lombok.ToString;
 
 /**
  * Data consistency check result.
  */
 @Getter
 @Setter
+@ToString
 public final class DataConsistencyCheckResult {
     
     private final long sourceCount;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
index 943d799..f8a369f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
@@ -61,6 +61,7 @@ public final class ScalingJobPreparer {
         try (DataSourceManager dataSourceManager = new DataSourceManager(scalingJob.getTaskConfigs())) {
             checkSourceDataSources(scalingJob, dataSourceManager);
             ResumeBreakPointManager resumeBreakPointManager = getResumeBreakPointManager(scalingJob);
+            scalingJob.setResumeBreakPointManager(resumeBreakPointManager);
             if (resumeBreakPointManager.isResumable()) {
                 scalingPositionResumer.resumePosition(scalingJob, dataSourceManager, resumeBreakPointManager);
             } else {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
index 5932d96..125717d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
@@ -50,15 +50,19 @@ public final class ScalingTaskScheduler implements Runnable {
      * Stop all scaling task.
      */
     public void stop() {
+        log.info("stop scaling job {}", scalingJob.getJobId());
         if (JobStatus.valueOf(scalingJob.getStatus()).isRunning()) {
             scalingJob.setStatus(JobStatus.STOPPING.name());
         }
         for (ScalingTask each : scalingJob.getInventoryTasks()) {
+            log.info("stop inventory task {} - {}", scalingJob.getJobId(), each.getTaskId());
             each.stop();
         }
         for (ScalingTask each : scalingJob.getIncrementalTasks()) {
+            log.info("stop incremental task {} - {}", scalingJob.getJobId(), each.getTaskId());
             each.stop();
         }
+        scalingJob.getResumeBreakPointManager().close();
     }
     
     @Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
index a0e34d8..40f5644 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
@@ -49,6 +49,11 @@ public abstract class AbstractScalingJobService implements ScalingJobService {
     
     @Override
     public Optional<ScalingJob> start(final String sourceDataSource, final String sourceRule, final String targetDataSource, final String targetRule, final ScalingCallback scalingCallback) {
+        log.info("start scaling job...");
+        log.info("sourceDataSource = {}", sourceDataSource);
+        log.info("sourceRule = {}", sourceRule);
+        log.info("targetDataSource = {}", targetDataSource);
+        log.info("targetRule = {}", targetRule);
         Optional<ScalingJob> result = start(sourceDataSource, sourceRule, targetDataSource, targetRule);
         if (!result.isPresent()) {
             return result;
@@ -68,12 +73,14 @@ public abstract class AbstractScalingJobService implements ScalingJobService {
     
     @Override
     public Map<String, DataConsistencyCheckResult> check(final long jobId) {
+        log.info("scaling job {} start data consistency check.", jobId);
         DataConsistencyChecker dataConsistencyChecker = DataConsistencyCheckerFactory.newInstance(getJob(jobId));
         Map<String, DataConsistencyCheckResult> result = dataConsistencyChecker.countCheck();
         if (result.values().stream().allMatch(DataConsistencyCheckResult::isCountValid)) {
             Map<String, Boolean> dataCheckResult = dataConsistencyChecker.dataCheck();
             result.forEach((key, value) -> value.setDataValid(dataCheckResult.getOrDefault(key, false)));
         }
+        log.info("scaling job {} data consistency checker result {}", jobId, result);
         return result;
     }
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/ElasticJobScalingWorker.java b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/ElasticJobScalingWorker.java
index fa4b52f..3abd77f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/ElasticJobScalingWorker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/ElasticJobScalingWorker.java
@@ -146,7 +146,8 @@ public final class ElasticJobScalingWorker implements ScalingWorker {
         }
         if (new LeaderService(registryCenter, jobId).isLeader()) {
             log.info("leader worker update config.");
-            JobAPIFactory.createJobConfigurationAPI(governanceConfig.getRegistryCenterConfiguration().getServerLists(), governanceConfig.getName(), null)
+            JobAPIFactory.createJobConfigurationAPI(governanceConfig.getRegistryCenterConfiguration().getServerLists(),
+                    governanceConfig.getName() + ScalingConstant.SCALING_ELASTIC_JOB_PATH, null)
                     .updateJobConfiguration(JobConfigurationPOJO.fromJobConfiguration(createJobConfig(jobId, scalingConfig)));
         }
         jobBootstrapWrapper.setRunning(scalingConfig.getJobConfiguration().isRunning());
@@ -154,7 +155,7 @@ public final class ElasticJobScalingWorker implements ScalingWorker {
     }
     
     private JobConfiguration createJobConfig(final String jobId, final ScalingConfiguration scalingConfig) {
-        return JobConfiguration.newBuilder(jobId, scalingConfig.getJobConfiguration().getShardingTables().length).jobParameter(GSON.toJson(scalingConfig)).build();
+        return JobConfiguration.newBuilder(jobId, scalingConfig.getJobConfiguration().getShardingTables().length).jobParameter(GSON.toJson(scalingConfig)).overwrite(true).build();
     }
     
     private void deleteJob(final String jobId, final ScalingConfiguration scalingConfig) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/ScalingElasticJob.java b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/ScalingElasticJob.java
index 8b43029..79b9421 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/ScalingElasticJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/ScalingElasticJob.java
@@ -58,8 +58,8 @@ public final class ScalingElasticJob implements SimpleJob {
     }
     
     private void stopJob(final ShardingContext shardingContext) {
-        log.info("stop job: {} - {}", shardingContext.getJobName(), shardingContext.getShardingItem());
         if (null != scalingJob) {
+            log.info("stop job: {} - {}", shardingContext.getJobName(), shardingContext.getShardingItem());
             SCALING_JOB_SERVICE.stop(scalingJob.getJobId());
             scalingJob = null;
         }