You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2021/09/15 12:54:32 UTC
[shardingsphere] branch master updated: Improve scaling job status showing after job finished (#12431)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 8a9dbfe Improve scaling job status showing after job finished (#12431)
8a9dbfe is described below
commit 8a9dbfead73a215f27f825c2dd863a4ed0b0939a
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Wed Sep 15 20:53:49 2021 +0800
Improve scaling job status showing after job finished (#12431)
* Persist FINISHED status when scaling task finished
* Clean stopping status
---
.../scaling/core/job/FinishedCheckJob.java | 9 ++++-
.../shardingsphere/scaling/core/job/JobStatus.java | 9 +----
.../scaling/core/job/schedule/JobScheduler.java | 4 --
.../core/job/schedule/JobSchedulerCenter.java | 45 +++++++++++++++-------
4 files changed, 42 insertions(+), 25 deletions(-)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
index fd5cb73..962808f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
@@ -34,8 +34,10 @@ import org.apache.shardingsphere.scaling.core.job.check.consistency.DataConsiste
import org.apache.shardingsphere.scaling.core.job.schedule.JobSchedulerCenter;
import org.apache.shardingsphere.scaling.core.util.ScalingTaskUtil;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
@Slf4j
public final class FinishedCheckJob implements SimpleJob {
@@ -75,10 +77,15 @@ public final class FinishedCheckJob implements SimpleJob {
} else {
log.info("dataConsistencyCheckAlgorithm is not configured, data consistency check will be ignored.");
}
- JobSchedulerCenter.getJobContext(jobId).ifPresent(jobContext -> jobContext.setStatus(JobStatus.ALMOST_FINISHED));
+ Optional<Collection<JobContext>> optionalJobContexts = JobSchedulerCenter.getJobContexts(jobId);
+ optionalJobContexts.ifPresent(jobContexts -> jobContexts.forEach(each -> each.setStatus(JobStatus.ALMOST_FINISHED)));
ScalingDataSourceConfigurationWrap targetConfig = jobConfig.getRuleConfig().getTarget();
ScalingTaskFinishedEvent taskFinishedEvent = new ScalingTaskFinishedEvent(targetConfig.getSchemaName(), targetConfig.getParameter());
ShardingSphereEventBus.getInstance().post(taskFinishedEvent);
+ optionalJobContexts.ifPresent(jobContexts -> jobContexts.forEach(each -> {
+ each.setStatus(JobStatus.FINISHED);
+ JobSchedulerCenter.persistJobProgress(each);
+ }));
scalingAPI.stop(jobId);
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobStatus.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobStatus.java
index 68d7bab..2c17ef9 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobStatus.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobStatus.java
@@ -53,14 +53,9 @@ public enum JobStatus {
ALMOST_FINISHED(true),
/**
- * Job is stopping.
+ * Job is finished.
*/
- STOPPING(true),
-
- /**
- * Task has stopped.
- */
- STOPPED(false),
+ FINISHED(false),
/**
* Task has stopped by failing to prepare work.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/schedule/JobScheduler.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/schedule/JobScheduler.java
index 376214338..000e137 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/schedule/JobScheduler.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/schedule/JobScheduler.java
@@ -51,9 +51,6 @@ public final class JobScheduler implements Runnable {
public void stop() {
log.info("stop scaling job {}", jobContext.getJobId());
final boolean almostFinished = jobContext.getStatus() == JobStatus.ALMOST_FINISHED;
- if (jobContext.getStatus().isRunning()) {
- jobContext.setStatus(JobStatus.STOPPING);
- }
for (ScalingTask each : jobContext.getInventoryTasks()) {
log.info("stop inventory task {} - {}", jobContext.getJobId(), each.getTaskId());
each.stop();
@@ -129,7 +126,6 @@ public final class JobScheduler implements Runnable {
@Override
public void onSuccess() {
- jobContext.setStatus(JobStatus.STOPPED);
}
@Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/schedule/JobSchedulerCenter.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/schedule/JobSchedulerCenter.java
index 4f2dd25..893e4fe 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/schedule/JobSchedulerCenter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/schedule/JobSchedulerCenter.java
@@ -26,12 +26,14 @@ import org.apache.shardingsphere.scaling.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.scaling.core.api.ScalingAPIFactory;
import org.apache.shardingsphere.scaling.core.job.JobContext;
+import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
/**
* Job scheduler center.
@@ -40,7 +42,7 @@ import java.util.concurrent.TimeUnit;
@Slf4j
public final class JobSchedulerCenter {
- private static final Map<Long, JobScheduler> JOB_SCHEDULER_MAP = Maps.newConcurrentMap();
+ private static final Map<Long, Map<Integer, JobScheduler>> JOB_SCHEDULER_MAP = Maps.newConcurrentMap();
private static final ScheduledExecutorService JOB_PERSIST_EXECUTOR = Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("scaling-job-persist-%d"));
@@ -56,13 +58,15 @@ public final class JobSchedulerCenter {
* @param jobContext job context
*/
public static void start(final JobContext jobContext) {
- Long key = jobContext.getJobId();
- if (JOB_SCHEDULER_MAP.containsKey(key)) {
+ long jobId = jobContext.getJobId();
+ Map<Integer, JobScheduler> schedulerMap = JOB_SCHEDULER_MAP.computeIfAbsent(jobId, key -> Maps.newConcurrentMap());
+ int shardingItem = jobContext.getShardingItem();
+ if (schedulerMap.containsKey(shardingItem)) {
return;
}
JobScheduler jobScheduler = new JobScheduler(jobContext);
jobScheduler.start();
- JOB_SCHEDULER_MAP.put(key, jobScheduler);
+ schedulerMap.put(shardingItem, jobScheduler);
}
/**
@@ -71,30 +75,45 @@ public final class JobSchedulerCenter {
* @param jobId job id
*/
public static void stop(final long jobId) {
- JobScheduler jobScheduler = JOB_SCHEDULER_MAP.remove(jobId);
- if (null != jobScheduler) {
- jobScheduler.stop();
+ Map<Integer, JobScheduler> schedulerMap = JOB_SCHEDULER_MAP.remove(jobId);
+ if (null == schedulerMap) {
+ return;
+ }
+ for (Entry<Integer, JobScheduler> entry : schedulerMap.entrySet()) {
+ entry.getValue().stop();
}
}
/**
- * Get job context.
+ * Get job contexts.
*
* @param jobId job id
* @return job context
*/
- public static Optional<JobContext> getJobContext(final long jobId) {
- JobScheduler jobScheduler = JOB_SCHEDULER_MAP.get(jobId);
- return Optional.ofNullable(null != jobScheduler ? jobScheduler.getJobContext() : null);
+ public static Optional<Collection<JobContext>> getJobContexts(final long jobId) {
+ Map<Integer, JobScheduler> schedulerMap = JOB_SCHEDULER_MAP.get(jobId);
+ if (null == schedulerMap) {
+ return Optional.empty();
+ }
+ return Optional.of(schedulerMap.values().stream().map(JobScheduler::getJobContext).collect(Collectors.toList()));
+ }
+
+ /**
+ * Persist job progress.
+ *
+ * @param jobContext job context
+ */
+ public static void persistJobProgress(final JobContext jobContext) {
+ REGISTRY_REPOSITORY_API.persistJobProgress(jobContext);
}
private static final class PersistJobContextRunnable implements Runnable {
@Override
public void run() {
- for (Entry<Long, JobScheduler> entry : JOB_SCHEDULER_MAP.entrySet()) {
+ for (Entry<Long, Map<Integer, JobScheduler>> entry : JOB_SCHEDULER_MAP.entrySet()) {
try {
- REGISTRY_REPOSITORY_API.persistJobProgress(entry.getValue().getJobContext());
+ entry.getValue().forEach((shardingItem, jobScheduler) -> REGISTRY_REPOSITORY_API.persistJobProgress(jobScheduler.getJobContext()));
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON