You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2021/09/15 12:54:32 UTC

[shardingsphere] branch master updated: Improve scaling job status showing after job finished (#12431)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a9dbfe  Improve scaling job status showing after job finished (#12431)
8a9dbfe is described below

commit 8a9dbfead73a215f27f825c2dd863a4ed0b0939a
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Wed Sep 15 20:53:49 2021 +0800

    Improve scaling job status showing after job finished (#12431)
    
    * Persist FINISHED status when scaling task finished
    
    * Clean stopping status
---
 .../scaling/core/job/FinishedCheckJob.java         |  9 ++++-
 .../shardingsphere/scaling/core/job/JobStatus.java |  9 +----
 .../scaling/core/job/schedule/JobScheduler.java    |  4 --
 .../core/job/schedule/JobSchedulerCenter.java      | 45 +++++++++++++++-------
 4 files changed, 42 insertions(+), 25 deletions(-)

diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
index fd5cb73..962808f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
@@ -34,8 +34,10 @@ import org.apache.shardingsphere.scaling.core.job.check.consistency.DataConsiste
 import org.apache.shardingsphere.scaling.core.job.schedule.JobSchedulerCenter;
 import org.apache.shardingsphere.scaling.core.util.ScalingTaskUtil;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 @Slf4j
 public final class FinishedCheckJob implements SimpleJob {
@@ -75,10 +77,15 @@ public final class FinishedCheckJob implements SimpleJob {
         } else {
             log.info("dataConsistencyCheckAlgorithm is not configured, data consistency check will be ignored.");
         }
-        JobSchedulerCenter.getJobContext(jobId).ifPresent(jobContext -> jobContext.setStatus(JobStatus.ALMOST_FINISHED));
+        Optional<Collection<JobContext>> optionalJobContexts = JobSchedulerCenter.getJobContexts(jobId);
+        optionalJobContexts.ifPresent(jobContexts -> jobContexts.forEach(each -> each.setStatus(JobStatus.ALMOST_FINISHED)));
         ScalingDataSourceConfigurationWrap targetConfig = jobConfig.getRuleConfig().getTarget();
         ScalingTaskFinishedEvent taskFinishedEvent = new ScalingTaskFinishedEvent(targetConfig.getSchemaName(), targetConfig.getParameter());
         ShardingSphereEventBus.getInstance().post(taskFinishedEvent);
+        optionalJobContexts.ifPresent(jobContexts -> jobContexts.forEach(each -> {
+            each.setStatus(JobStatus.FINISHED);
+            JobSchedulerCenter.persistJobProgress(each);
+        }));
         scalingAPI.stop(jobId);
     }
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobStatus.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobStatus.java
index 68d7bab..2c17ef9 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobStatus.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobStatus.java
@@ -53,14 +53,9 @@ public enum JobStatus {
     ALMOST_FINISHED(true),
     
     /**
-     * Job is stopping.
+     * Job is finished.
      */
-    STOPPING(true),
-    
-    /**
-     * Task has stopped.
-     */
-    STOPPED(false),
+    FINISHED(false),
     
     /**
      * Task has stopped by failing to prepare work.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/schedule/JobScheduler.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/schedule/JobScheduler.java
index 376214338..000e137 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/schedule/JobScheduler.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/schedule/JobScheduler.java
@@ -51,9 +51,6 @@ public final class JobScheduler implements Runnable {
     public void stop() {
         log.info("stop scaling job {}", jobContext.getJobId());
         final boolean almostFinished = jobContext.getStatus() == JobStatus.ALMOST_FINISHED;
-        if (jobContext.getStatus().isRunning()) {
-            jobContext.setStatus(JobStatus.STOPPING);
-        }
         for (ScalingTask each : jobContext.getInventoryTasks()) {
             log.info("stop inventory task {} - {}", jobContext.getJobId(), each.getTaskId());
             each.stop();
@@ -129,7 +126,6 @@ public final class JobScheduler implements Runnable {
             
             @Override
             public void onSuccess() {
-                jobContext.setStatus(JobStatus.STOPPED);
             }
             
             @Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/schedule/JobSchedulerCenter.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/schedule/JobSchedulerCenter.java
index 4f2dd25..893e4fe 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/schedule/JobSchedulerCenter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/schedule/JobSchedulerCenter.java
@@ -26,12 +26,14 @@ import org.apache.shardingsphere.scaling.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.scaling.core.api.ScalingAPIFactory;
 import org.apache.shardingsphere.scaling.core.job.JobContext;
 
+import java.util.Collection;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 /**
  * Job scheduler center.
@@ -40,7 +42,7 @@ import java.util.concurrent.TimeUnit;
 @Slf4j
 public final class JobSchedulerCenter {
     
-    private static final Map<Long, JobScheduler> JOB_SCHEDULER_MAP = Maps.newConcurrentMap();
+    private static final Map<Long, Map<Integer, JobScheduler>> JOB_SCHEDULER_MAP = Maps.newConcurrentMap();
     
     private static final ScheduledExecutorService JOB_PERSIST_EXECUTOR = Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("scaling-job-persist-%d"));
     
@@ -56,13 +58,15 @@ public final class JobSchedulerCenter {
      * @param jobContext job context
      */
     public static void start(final JobContext jobContext) {
-        Long key = jobContext.getJobId();
-        if (JOB_SCHEDULER_MAP.containsKey(key)) {
+        long jobId = jobContext.getJobId();
+        Map<Integer, JobScheduler> schedulerMap = JOB_SCHEDULER_MAP.computeIfAbsent(jobId, key -> Maps.newConcurrentMap());
+        int shardingItem = jobContext.getShardingItem();
+        if (schedulerMap.containsKey(shardingItem)) {
             return;
         }
         JobScheduler jobScheduler = new JobScheduler(jobContext);
         jobScheduler.start();
-        JOB_SCHEDULER_MAP.put(key, jobScheduler);
+        schedulerMap.put(shardingItem, jobScheduler);
     }
     
     /**
@@ -71,30 +75,45 @@ public final class JobSchedulerCenter {
      * @param jobId job id
      */
     public static void stop(final long jobId) {
-        JobScheduler jobScheduler = JOB_SCHEDULER_MAP.remove(jobId);
-        if (null != jobScheduler) {
-            jobScheduler.stop();
+        Map<Integer, JobScheduler> schedulerMap = JOB_SCHEDULER_MAP.remove(jobId);
+        if (null == schedulerMap) {
+            return;
+        }
+        for (Entry<Integer, JobScheduler> entry : schedulerMap.entrySet()) {
+            entry.getValue().stop();
         }
     }
     
     /**
-     * Get job context.
+     * Get job contexts.
      *
      * @param jobId job id
      * @return job contexts
      */
-    public static Optional<JobContext> getJobContext(final long jobId) {
-        JobScheduler jobScheduler = JOB_SCHEDULER_MAP.get(jobId);
-        return Optional.ofNullable(null != jobScheduler ? jobScheduler.getJobContext() : null);
+    public static Optional<Collection<JobContext>> getJobContexts(final long jobId) {
+        Map<Integer, JobScheduler> schedulerMap = JOB_SCHEDULER_MAP.get(jobId);
+        if (null == schedulerMap) {
+            return Optional.empty();
+        }
+        return Optional.of(schedulerMap.values().stream().map(JobScheduler::getJobContext).collect(Collectors.toList()));
+    }
+    
+    /**
+     * Persist job progress.
+     *
+     * @param jobContext job context
+     */
+    public static void persistJobProgress(final JobContext jobContext) {
+        REGISTRY_REPOSITORY_API.persistJobProgress(jobContext);
     }
     
     private static final class PersistJobContextRunnable implements Runnable {
         
         @Override
         public void run() {
-            for (Entry<Long, JobScheduler> entry : JOB_SCHEDULER_MAP.entrySet()) {
+            for (Entry<Long, Map<Integer, JobScheduler>> entry : JOB_SCHEDULER_MAP.entrySet()) {
                 try {
-                    REGISTRY_REPOSITORY_API.persistJobProgress(entry.getValue().getJobContext());
+                    entry.getValue().forEach((shardingItem, jobScheduler) -> REGISTRY_REPOSITORY_API.persistJobProgress(jobScheduler.getJobContext()));
                     // CHECKSTYLE:OFF
                 } catch (final Exception ex) {
                     // CHECKSTYLE:ON