You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2021/01/16 16:04:40 UTC
[shardingsphere] branch master updated: Move
governance-core/scaling into scaling-core (#9055)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 5be9b1a Move governance-core/scaling into scaling-core (#9055)
5be9b1a is described below
commit 5be9b1a8e5ed8aac9aa15c94bde987d16902b93d
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Sun Jan 17 00:04:10 2021 +0800
Move governance-core/scaling into scaling-core (#9055)
* move governance-core/scaling into scaling-core
Co-authored-by: qiulu3 <Lucas209910>
---
.../shardingsphere-governance-core/pom.xml | 5 --
.../model/rule/RuleConfigurationsAlteredEvent.java | 14 ++--
.../scaling/callback/ScalingResultCallback.java | 62 ----------------
.../impl/GovernanceBootstrapInitializer.java | 5 +-
.../scaling/ScalingWorkerBootstrap.java | 9 +--
.../shardingsphere-scaling-core/pom.xml | 5 ++
.../scaling/core/config/JobConfiguration.java | 2 +-
.../WorkflowConfiguration.java} | 30 ++++----
.../scaling/core/constant/ScalingConstant.java | 10 +++
.../core/service/AbstractScalingJobService.java | 69 +++---------------
.../scaling/core/service/ScalingJobService.java | 9 +--
.../scaling/core/spi/ScalingWorker.java | 7 --
.../scaling/core/spi/ScalingWorkerLoader.java | 8 +--
.../scaling/core/utils/ScalingTaskUtil.java | 2 +-
.../core/workflow}/ScalingServiceHolder.java | 57 ++++++---------
.../impl/DistributedScalingJobServiceTest.java | 22 ++----
.../scaling/core/spi/ScalingWorkerLoaderTest.java | 52 --------------
...e.shardingsphere.scaling.core.spi.ScalingWorker | 18 -----
.../elasticjob/ElasticJobScalingWorker.java | 7 +-
.../scaling/elasticjob/FinishedCheckWorker.java} | 26 +++++--
.../scaling/elasticjob/job/FinishedCheckJob.java | 84 ++++++++++++++++++++++
...e.shardingsphere.scaling.core.spi.ScalingWorker | 1 +
22 files changed, 190 insertions(+), 314 deletions(-)
diff --git a/shardingsphere-governance/shardingsphere-governance-core/pom.xml b/shardingsphere-governance/shardingsphere-governance-core/pom.xml
index f59c449..465500e 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/pom.xml
+++ b/shardingsphere-governance/shardingsphere-governance-core/pom.xml
@@ -58,11 +58,6 @@
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.shardingsphere</groupId>
- <artifactId>shardingsphere-scaling-core</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
</dependency>
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/event/model/rule/RuleConfigurationsAlteredEvent.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/event/model/rule/RuleConfigurationsAlteredEvent.java
index abc668b..95ac061 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/event/model/rule/RuleConfigurationsAlteredEvent.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/event/model/rule/RuleConfigurationsAlteredEvent.java
@@ -29,11 +29,17 @@ public final class RuleConfigurationsAlteredEvent {
private final String schemaName;
- private final String yamlDataSourceContent;
+ private final String sourceDataSource;
- private final String yamlRuleConfigurationsContent;
+ private final String sourceRule;
- private final String cachedYamlRuleConfigurationsContent;
+ private final String targetDataSource;
- private final String ruleConfigurationCacheId;
+ private final String targetRule;
+
+ private final String ruleCacheId;
+
+ public RuleConfigurationsAlteredEvent(final String schemaName, final String sourceDataSource, final String sourceRule, final String targetRule, final String ruleCacheId) {
+ this(schemaName, sourceDataSource, sourceRule, sourceDataSource, targetRule, ruleCacheId);
+ }
}
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/scaling/callback/ScalingResultCallback.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/scaling/callback/ScalingResultCallback.java
deleted file mode 100644
index 6669979..0000000
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/scaling/callback/ScalingResultCallback.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.governance.core.scaling.callback;
-
-import org.apache.shardingsphere.governance.core.event.model.rule.SwitchRuleConfigurationEvent;
-import org.apache.shardingsphere.governance.core.scaling.ScalingServiceHolder;
-import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
-import org.apache.shardingsphere.infra.lock.LockContext;
-import org.apache.shardingsphere.scaling.core.service.ScalingCallback;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Scaling result callback.
- */
-public final class ScalingResultCallback implements ScalingCallback {
-
- private final String schemaName;
-
- private final String ruleConfigurationCacheId;
-
- public ScalingResultCallback(final String schemaName, final String ruleConfigurationCacheId) {
- this.schemaName = schemaName;
- this.ruleConfigurationCacheId = ruleConfigurationCacheId;
- }
-
- @Override
- public void onSuccess(final long jobId) {
- if (LockContext.getLockStrategy().tryLock(30L, TimeUnit.SECONDS) && LockContext.getLockStrategy().checkLock()) {
- try {
- Thread.sleep(30000L);
- if (ScalingServiceHolder.getInstance().checkScalingResult(jobId)) {
- ScalingServiceHolder.getInstance().stopScalingJob(jobId);
- ShardingSphereEventBus.getInstance().post(new SwitchRuleConfigurationEvent(schemaName, ruleConfigurationCacheId));
- }
- } catch (final InterruptedException ignored) {
- } finally {
- LockContext.getLockStrategy().releaseLock();
- }
- }
- }
-
- @Override
- public void onFailure(final long jobId) {
- // TODO
- }
-}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/impl/GovernanceBootstrapInitializer.java b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/impl/GovernanceBootstrapInitializer.java
index 6e46151..b7410ab 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/impl/GovernanceBootstrapInitializer.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/impl/GovernanceBootstrapInitializer.java
@@ -21,7 +21,6 @@ import org.apache.shardingsphere.governance.context.metadata.GovernanceMetaDataC
import org.apache.shardingsphere.governance.context.transaction.GovernanceTransactionContexts;
import org.apache.shardingsphere.governance.core.facade.GovernanceFacade;
import org.apache.shardingsphere.governance.core.lock.strategy.GovernanceLockStrategy;
-import org.apache.shardingsphere.governance.core.scaling.ScalingServiceHolder;
import org.apache.shardingsphere.governance.core.yaml.swapper.GovernanceConfigurationYamlSwapper;
import org.apache.shardingsphere.infra.auth.builtin.DefaultAuthentication;
import org.apache.shardingsphere.infra.auth.builtin.yaml.config.YamlAuthenticationConfiguration;
@@ -41,8 +40,8 @@ import org.apache.shardingsphere.proxy.config.yaml.YamlProxyRuleConfiguration;
import org.apache.shardingsphere.proxy.config.yaml.YamlProxyServerConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
-import org.apache.shardingsphere.scaling.core.service.impl.DistributedScalingJobService;
import org.apache.shardingsphere.scaling.core.spi.ScalingWorkerLoader;
+import org.apache.shardingsphere.scaling.core.workflow.ScalingServiceHolder;
import org.apache.shardingsphere.transaction.context.TransactionContexts;
import java.util.Collection;
@@ -145,7 +144,7 @@ public final class GovernanceBootstrapInitializer extends AbstractBootstrapIniti
serverConfiguration.setDistributedScalingService(new GovernanceConfigurationYamlSwapper().swapToObject(yamlConfig.getServerConfiguration().getGovernance()));
ScalingContext.getInstance().init(serverConfiguration);
ScalingWorkerLoader.initScalingWorker();
- ScalingServiceHolder.getInstance().init(new DistributedScalingJobService());
+ ScalingServiceHolder.getInstance().init();
}
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/ScalingWorkerBootstrap.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/ScalingWorkerBootstrap.java
index 17571c4..3f98bc0 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/ScalingWorkerBootstrap.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/ScalingWorkerBootstrap.java
@@ -23,8 +23,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.scaling.core.spi.ScalingWorkerLoader;
import org.apache.shardingsphere.scaling.util.ServerConfigurationInitializer;
-import java.util.Optional;
-
/**
* Bootstrap of ShardingSphere-Scaling worker.
@@ -43,12 +41,7 @@ public final class ScalingWorkerBootstrap {
// CHECKSTYLE:ON
log.info("ShardingSphere-Scaling Worker Startup");
ServerConfigurationInitializer.init();
- Optional<String> type = ScalingWorkerLoader.initScalingWorker();
- if (!type.isPresent()) {
- log.error("None worker found.");
- return;
- }
- log.info("Worker type: {}", type.get());
+ ScalingWorkerLoader.initScalingWorker();
wait0();
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml b/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml
index c18b682..f0a68ad 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml
@@ -41,6 +41,11 @@
</dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
+ <artifactId>shardingsphere-governance-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-governance-repository-api</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
index f8f8158..e685d50 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
@@ -41,5 +41,5 @@ public final class JobConfiguration {
private boolean running = true;
- private long allowDelay = 60 * 1000L;
+ private WorkflowConfiguration workflowConfig;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingCallback.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/WorkflowConfiguration.java
similarity index 69%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingCallback.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/WorkflowConfiguration.java
index ee94de0..47f1e5d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingCallback.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/WorkflowConfiguration.java
@@ -15,24 +15,20 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.service;
+package org.apache.shardingsphere.scaling.core.config;
-/**
- * Scaling callback.
- */
-public interface ScalingCallback {
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+
+@Getter
+@Setter
+@RequiredArgsConstructor
+public final class WorkflowConfiguration {
+
+ private long allowDelay = 60 * 1000L;
- /**
- * Callback when execute success.
- *
- * @param jobId job id
- */
- void onSuccess(long jobId);
+ private final String schemaName;
- /**
- * Callback when execute failure.
- *
- * @param jobId job id
- */
- void onFailure(long jobId);
+ private final String ruleCacheId;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/constant/ScalingConstant.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/constant/ScalingConstant.java
index c677fd4..3c0ef9d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/constant/ScalingConstant.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/constant/ScalingConstant.java
@@ -62,11 +62,21 @@ public final class ScalingConstant {
public static final String CONFIG = "config";
/**
+ * Status.
+ */
+ public static final String STATUS = "status";
+
+ /**
* Position.
*/
public static final String POSITION = "position";
/**
+ * Workflow.
+ */
+ public static final String WORKFLOW = "workflow";
+
+ /**
* Incremental.
*/
public static final String INCREMENTAL = "incremental";
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
index 40f5644..162bf64 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
@@ -17,27 +17,22 @@
package org.apache.shardingsphere.scaling.core.service;
-import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
+import org.apache.shardingsphere.governance.core.event.model.rule.RuleConfigurationsAlteredEvent;
import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
import org.apache.shardingsphere.scaling.core.config.RuleConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
+import org.apache.shardingsphere.scaling.core.config.WorkflowConfiguration;
import org.apache.shardingsphere.scaling.core.config.datasource.ShardingSphereJDBCDataSourceConfiguration;
-import org.apache.shardingsphere.scaling.core.job.JobProgress;
import org.apache.shardingsphere.scaling.core.job.ScalingJob;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckerFactory;
import org.apache.shardingsphere.scaling.core.job.environmental.ScalingEnvironmentalManager;
-import org.apache.shardingsphere.scaling.core.utils.ScalingTaskUtil;
import java.sql.SQLException;
import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
/**
* Abstract scaling job service.
@@ -45,29 +40,15 @@ import java.util.concurrent.TimeUnit;
@Slf4j
public abstract class AbstractScalingJobService implements ScalingJobService {
- private static final ScheduledExecutorService FINISH_CHECK_EXECUTOR = Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("Scaling-finish-check-%d"));
-
@Override
- public Optional<ScalingJob> start(final String sourceDataSource, final String sourceRule, final String targetDataSource, final String targetRule, final ScalingCallback scalingCallback) {
- log.info("start scaling job...");
- log.info("sourceDataSource = {}", sourceDataSource);
- log.info("sourceRule = {}", sourceRule);
- log.info("targetDataSource = {}", targetDataSource);
- log.info("targetRule = {}", targetRule);
- Optional<ScalingJob> result = start(sourceDataSource, sourceRule, targetDataSource, targetRule);
- if (!result.isPresent()) {
- return result;
- }
- FINISH_CHECK_EXECUTOR.scheduleWithFixedDelay(new JobFinishChecker(result.get(), scalingCallback), 1, 1, TimeUnit.MINUTES);
- return result;
- }
-
- private Optional<ScalingJob> start(final String sourceDataSource, final String sourceRule, final String targetDataSource, final String targetRule) {
+ public Optional<ScalingJob> start(final RuleConfigurationsAlteredEvent event) {
ScalingConfiguration scalingConfig = new ScalingConfiguration();
scalingConfig.setRuleConfiguration(new RuleConfiguration(
- new ShardingSphereJDBCDataSourceConfiguration(sourceDataSource, sourceRule),
- new ShardingSphereJDBCDataSourceConfiguration(targetDataSource, targetRule)));
- scalingConfig.setJobConfiguration(new JobConfiguration());
+ new ShardingSphereJDBCDataSourceConfiguration(event.getSourceDataSource(), event.getSourceRule()),
+ new ShardingSphereJDBCDataSourceConfiguration(event.getTargetDataSource(), event.getTargetRule())));
+ JobConfiguration jobConfig = new JobConfiguration();
+ jobConfig.setWorkflowConfig(new WorkflowConfiguration(event.getSchemaName(), event.getRuleCacheId()));
+ scalingConfig.setJobConfiguration(jobConfig);
return start(scalingConfig);
}
@@ -88,38 +69,4 @@ public abstract class AbstractScalingJobService implements ScalingJobService {
public void reset(final long jobId) throws SQLException {
new ScalingEnvironmentalManager().resetTargetTable(getJob(jobId));
}
-
- @RequiredArgsConstructor
- private class JobFinishChecker implements Runnable {
-
- private final ScalingJob scalingJob;
-
- private final ScalingCallback scalingCallback;
-
- private boolean executed;
-
- @Override
- public void run() {
- if (executed) {
- return;
- }
- long jobId = scalingJob.getJobId();
- try {
- JobProgress jobProgress = getProgress(jobId);
- if (jobProgress.getStatus().contains("FAILURE")) {
- log.warn("scaling job {} failure.", jobId);
- executed = true;
- scalingCallback.onFailure(jobId);
- } else if (ScalingTaskUtil.allTasksAlmostFinished(jobProgress, scalingJob.getScalingConfig().getJobConfiguration())) {
- log.info("scaling job {} almost finished.", jobId);
- executed = true;
- scalingCallback.onSuccess(jobId);
- }
- // CHECKSTYLE:OFF
- } catch (final Exception ex) {
- // CHECKSTYLE:ON
- log.error("scaling job {} finish check failed!", jobId, ex);
- }
- }
- }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java
index a488f46..89bb13a 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.scaling.core.service;
+import org.apache.shardingsphere.governance.core.event.model.rule.RuleConfigurationsAlteredEvent;
import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
import org.apache.shardingsphere.scaling.core.job.JobProgress;
import org.apache.shardingsphere.scaling.core.job.ScalingJob;
@@ -50,14 +51,10 @@ public interface ScalingJobService {
/**
* Start scaling job.
*
- * @param sourceDataSource source data source
- * @param sourceRule source rule
- * @param targetDataSource target data source
- * @param targetRule target rule
- * @param scalingCallback scaling callback
+ * @param event rule configurations altered event
* @return scaling job
*/
- Optional<ScalingJob> start(String sourceDataSource, String sourceRule, String targetDataSource, String targetRule, ScalingCallback scalingCallback);
+ Optional<ScalingJob> start(RuleConfigurationsAlteredEvent event);
/**
* Stop job.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingWorker.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingWorker.java
index 0676f73..9c2fe00 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingWorker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingWorker.java
@@ -25,13 +25,6 @@ import org.apache.shardingsphere.governance.repository.api.config.GovernanceConf
public interface ScalingWorker {
/**
- * Get type.
- *
- * @return type of scaling worker.
- */
- String getType();
-
- /**
* Init scaling worker.
*
* @param governanceConfig governance configuration
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingWorkerLoader.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingWorkerLoader.java
index a59f08a..005c09e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingWorkerLoader.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingWorkerLoader.java
@@ -23,7 +23,6 @@ import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import java.util.Collection;
-import java.util.Optional;
/**
* Scaling worker loader.
@@ -33,10 +32,8 @@ public final class ScalingWorkerLoader {
/**
* Init scaling worker.
- *
- * @return worker type
*/
- public static Optional<String> initScalingWorker() {
+ public static void initScalingWorker() {
log.info("Init scaling worker");
ShardingSphereServiceLoader.register(ScalingWorker.class);
GovernanceConfiguration governanceConfig = ScalingContext.getInstance().getServerConfig().getDistributedScalingService();
@@ -44,10 +41,7 @@ public final class ScalingWorkerLoader {
Collection<ScalingWorker> scalingWorkers = ShardingSphereServiceLoader.newServiceInstances(ScalingWorker.class);
for (ScalingWorker each : scalingWorkers) {
each.init(governanceConfig);
- return Optional.of(each.getType());
}
- log.error("None worker found.");
}
- return Optional.empty();
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
index 778fd13..b15dec6 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
@@ -50,7 +50,7 @@ public final class ScalingTaskUtil {
*/
public static boolean allTasksAlmostFinished(final JobProgress jobProgress, final JobConfiguration jobConfig) {
return jobProgress.getInventoryTaskProgress().stream().allMatch(each -> each.getTotal() == each.getFinished())
- && jobProgress.getIncrementalTaskProgress().stream().allMatch(each -> each.getDelayMillisecond() < jobConfig.getAllowDelay());
+ && jobProgress.getIncrementalTaskProgress().stream().allMatch(each -> each.getDelayMillisecond() < jobConfig.getWorkflowConfig().getAllowDelay());
}
/**
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/scaling/ScalingServiceHolder.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/workflow/ScalingServiceHolder.java
similarity index 62%
rename from shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/scaling/ScalingServiceHolder.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/workflow/ScalingServiceHolder.java
index 1a31236..eb32895 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/scaling/ScalingServiceHolder.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/workflow/ScalingServiceHolder.java
@@ -15,17 +15,17 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.governance.core.scaling;
+package org.apache.shardingsphere.scaling.core.workflow;
import com.google.common.eventbus.Subscribe;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.governance.core.event.model.rule.RuleConfigurationsAlteredEvent;
import org.apache.shardingsphere.governance.core.event.model.rule.SwitchRuleConfigurationEvent;
-import org.apache.shardingsphere.governance.core.scaling.callback.ScalingResultCallback;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.scaling.core.job.ScalingJob;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
import org.apache.shardingsphere.scaling.core.service.ScalingJobService;
+import org.apache.shardingsphere.scaling.core.service.ScalingJobServiceFactory;
import java.util.Map;
import java.util.Optional;
@@ -38,15 +38,11 @@ public final class ScalingServiceHolder {
private static final ScalingServiceHolder INSTANCE = new ScalingServiceHolder();
- private volatile ScalingJobService scalingJobService;
-
- private ScalingServiceHolder() {
- ShardingSphereEventBus.getInstance().register(this);
- }
+ private final ScalingJobService scalingJobService = ScalingJobServiceFactory.getInstance();
/**
* Get scaling service holder instance.
- *
+ *
* @return scaling service holder instance
*/
public static ScalingServiceHolder getInstance() {
@@ -54,57 +50,50 @@ public final class ScalingServiceHolder {
}
/**
- * Init scaling service.
- *
- * @param scalingJobService scaling job service
+ * Init.
*/
- public void init(final ScalingJobService scalingJobService) {
- this.scalingJobService = scalingJobService;
+ public void init() {
+ ShardingSphereEventBus.getInstance().register(this);
}
/**
* Start scaling job.
- *
+ *
* @param event rule configurations altered event.
*/
@Subscribe
public void startScalingJob(final RuleConfigurationsAlteredEvent event) {
- Optional<ScalingJob> scalingJob = scalingJobService.start(event.getYamlDataSourceContent(),
- event.getYamlRuleConfigurationsContent(), event.getYamlDataSourceContent(),
- event.getCachedYamlRuleConfigurationsContent(), new ScalingResultCallback(event.getSchemaName(), event.getRuleConfigurationCacheId()));
+ Optional<ScalingJob> scalingJob = scalingJobService.start(event);
if (!scalingJob.isPresent()) {
- ShardingSphereEventBus.getInstance().post(new SwitchRuleConfigurationEvent(event.getSchemaName(), event.getRuleConfigurationCacheId()));
+ ShardingSphereEventBus.getInstance().post(new SwitchRuleConfigurationEvent(event.getSchemaName(), event.getRuleCacheId()));
}
}
/**
* Check scaling result.
- *
+ *
* @param jobId job Id
* @return true if scaling result check successfully, else false
*/
public boolean checkScalingResult(final long jobId) {
- return checkScalingResult(jobId, scalingJobService.check(jobId));
- }
-
- private boolean checkScalingResult(final long jobId, final Map<String, DataConsistencyCheckResult> scalingResult) {
- if (!scalingResult.isEmpty()) {
- for (String key : scalingResult.keySet()) {
- boolean isDataValid = scalingResult.get(key).isDataValid();
- boolean isCountValid = scalingResult.get(key).isCountValid();
- if (!isDataValid || !isCountValid) {
- log.error("Scaling job: {}, table: {} data consistency check failed, dataValid: {}, countValid: {}", jobId, key, isDataValid, isCountValid);
- return false;
- }
+ Map<String, DataConsistencyCheckResult> scalingResult = scalingJobService.check(jobId);
+ if (scalingResult.isEmpty()) {
+ return false;
+ }
+ for (String key : scalingResult.keySet()) {
+ boolean isDataValid = scalingResult.get(key).isDataValid();
+ boolean isCountValid = scalingResult.get(key).isCountValid();
+ if (!isDataValid || !isCountValid) {
+ log.error("Scaling job: {}, table: {} data consistency check failed, dataValid: {}, countValid: {}", jobId, key, isDataValid, isCountValid);
+ return false;
}
- return true;
}
- return false;
+ return true;
}
/**
* Stop scaling job.
- *
+ *
* @param jobId job id
*/
public void stopScalingJob(final long jobId) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
index 7184581..f4606d7 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.scaling.core.service.impl;
import com.google.gson.Gson;
import lombok.SneakyThrows;
+import org.apache.shardingsphere.governance.core.event.model.rule.RuleConfigurationsAlteredEvent;
import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
import org.apache.shardingsphere.governance.repository.api.config.GovernanceCenterConfiguration;
import org.apache.shardingsphere.governance.repository.api.config.GovernanceConfiguration;
@@ -31,7 +32,6 @@ import org.apache.shardingsphere.scaling.core.exception.ScalingJobNotFoundExcept
import org.apache.shardingsphere.scaling.core.job.JobProgress;
import org.apache.shardingsphere.scaling.core.job.ScalingJob;
import org.apache.shardingsphere.scaling.core.service.RegistryRepositoryHolder;
-import org.apache.shardingsphere.scaling.core.service.ScalingCallback;
import org.apache.shardingsphere.scaling.core.service.ScalingJobService;
import org.apache.shardingsphere.scaling.core.util.ScalingConfigurationUtil;
import org.apache.shardingsphere.scaling.core.utils.ReflectionUtil;
@@ -81,7 +81,8 @@ public final class DistributedScalingJobServiceTest {
public void assertStartWithCallbackImmediately() {
ScalingConfiguration scalingConfig = mockScalingConfiguration();
ShardingSphereJDBCDataSourceConfiguration source = (ShardingSphereJDBCDataSourceConfiguration) scalingConfig.getRuleConfiguration().getSource().unwrap();
- Optional<ScalingJob> scalingJob = scalingJobService.start(source.getDataSource(), source.getRule(), source.getDataSource(), source.getRule(), mockScalingCallback());
+ RuleConfigurationsAlteredEvent event = new RuleConfigurationsAlteredEvent("schema", source.getDataSource(), source.getRule(), source.getRule(), "cacheId");
+ Optional<ScalingJob> scalingJob = scalingJobService.start(event);
assertFalse(scalingJob.isPresent());
}
@@ -90,7 +91,9 @@ public final class DistributedScalingJobServiceTest {
ScalingConfiguration scalingConfig = ScalingConfigurationUtil.initConfig("/config_sharding_sphere_jdbc_target.json");
ShardingSphereJDBCDataSourceConfiguration source = (ShardingSphereJDBCDataSourceConfiguration) scalingConfig.getRuleConfiguration().getSource().unwrap();
ShardingSphereJDBCDataSourceConfiguration target = (ShardingSphereJDBCDataSourceConfiguration) scalingConfig.getRuleConfiguration().getTarget().unwrap();
- Optional<ScalingJob> scalingJob = scalingJobService.start(source.getDataSource(), source.getRule(), target.getDataSource(), target.getRule(), mockScalingCallback());
+ RuleConfigurationsAlteredEvent event = new RuleConfigurationsAlteredEvent(
+ "schema", source.getDataSource(), source.getRule(), target.getDataSource(), target.getRule(), "cacheId");
+ Optional<ScalingJob> scalingJob = scalingJobService.start(event);
assertTrue(scalingJob.isPresent());
}
@@ -146,19 +149,6 @@ public final class DistributedScalingJobServiceTest {
return result;
}
- private ScalingCallback mockScalingCallback() {
- return new ScalingCallback() {
-
- @Override
- public void onSuccess(final long jobId) {
- }
-
- @Override
- public void onFailure(final long jobId) {
- }
- };
- }
-
@SneakyThrows(ReflectiveOperationException.class)
private void resetRegistryRepositoryAvailable() {
ReflectionUtil.setStaticFieldValue(RegistryRepositoryHolder.class, "available", null);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/spi/ScalingWorkerLoaderTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/spi/ScalingWorkerLoaderTest.java
deleted file mode 100644
index e5e0048..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/spi/ScalingWorkerLoaderTest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.spi;
-
-import lombok.SneakyThrows;
-import org.apache.shardingsphere.governance.repository.api.config.GovernanceCenterConfiguration;
-import org.apache.shardingsphere.governance.repository.api.config.GovernanceConfiguration;
-import org.apache.shardingsphere.scaling.core.config.ScalingContext;
-import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
-import org.apache.shardingsphere.scaling.core.utils.ReflectionUtil;
-import org.junit.Test;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public final class ScalingWorkerLoaderTest {
-
- @Test
- @SneakyThrows(ReflectiveOperationException.class)
- public void assertWithoutDistributedScalingService() {
- ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "serverConfig", new ServerConfiguration());
- assertFalse(ScalingWorkerLoader.initScalingWorker().isPresent());
- }
-
- @Test
- @SneakyThrows(ReflectiveOperationException.class)
- public void assertScalingWorkerAvailable() {
- ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "serverConfig", mockServerConfiguration());
- assertTrue(ScalingWorkerLoader.initScalingWorker().isPresent());
- }
-
- private ServerConfiguration mockServerConfiguration() {
- ServerConfiguration result = new ServerConfiguration();
- result.setDistributedScalingService(new GovernanceConfiguration("test", new GovernanceCenterConfiguration("Zookeeper", "", null), false));
- return result;
- }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/META-INF/services/org.apache.shardingsphere.scaling.core.spi.ScalingWorker b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/META-INF/services/org.apache.shardingsphere.scaling.core.spi.ScalingWorker
deleted file mode 100644
index 1209df9..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/META-INF/services/org.apache.shardingsphere.scaling.core.spi.ScalingWorker
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.scaling.core.fixture.FixtureScalingWorker
diff --git a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/ElasticJobScalingWorker.java b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/ElasticJobScalingWorker.java
index 3abd77f..0abdbf2 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/ElasticJobScalingWorker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/ElasticJobScalingWorker.java
@@ -65,13 +65,8 @@ public final class ElasticJobScalingWorker implements ScalingWorker {
private CoordinatorRegistryCenter registryCenter;
@Override
- public String getType() {
- return "ElasticJob";
- }
-
- @Override
public void init(final GovernanceConfiguration governanceConfig) {
- log.info("Scaling elastic job init...");
+ log.info("Init elastic job scaling worker.");
this.governanceConfig = governanceConfig;
registryCenter = ElasticJobUtils.createRegistryCenter(governanceConfig);
watchConfigRepository();
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureScalingWorker.java b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/FinishedCheckWorker.java
similarity index 52%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureScalingWorker.java
rename to shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/FinishedCheckWorker.java
index c440d3c..94ad402 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureScalingWorker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/FinishedCheckWorker.java
@@ -15,19 +15,33 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.fixture;
+package org.apache.shardingsphere.scaling.elasticjob;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.governance.repository.api.config.GovernanceConfiguration;
import org.apache.shardingsphere.scaling.core.spi.ScalingWorker;
+import org.apache.shardingsphere.scaling.elasticjob.job.FinishedCheckJob;
+import org.apache.shardingsphere.scaling.elasticjob.util.ElasticJobUtils;
-public final class FixtureScalingWorker implements ScalingWorker {
+/**
+ * Finished check worker.
+ */
+@Slf4j
+public final class FinishedCheckWorker implements ScalingWorker {
- @Override
- public String getType() {
- return "Fixture";
- }
+ private static final String JOB_NAME = "finished_check";
+
+ private static final String CRON_EXPRESSION = "0 * * * * ?";
@Override
public void init(final GovernanceConfiguration governanceConfig) {
+ log.info("Init finished check worker.");
+ new ScheduleJobBootstrap(ElasticJobUtils.createRegistryCenter(governanceConfig), new FinishedCheckJob(), createJobConfig()).schedule();
+ }
+
+ private JobConfiguration createJobConfig() {
+ return JobConfiguration.newBuilder(JOB_NAME, 1).cron(CRON_EXPRESSION).build();
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/FinishedCheckJob.java b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/FinishedCheckJob.java
new file mode 100644
index 0000000..844096a
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/FinishedCheckJob.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.elasticjob.job;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.elasticjob.api.ShardingContext;
+import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
+import org.apache.shardingsphere.governance.core.event.model.rule.SwitchRuleConfigurationEvent;
+import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
+import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
+import org.apache.shardingsphere.infra.lock.LockContext;
+import org.apache.shardingsphere.scaling.core.config.WorkflowConfiguration;
+import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
+import org.apache.shardingsphere.scaling.core.job.ScalingJob;
+import org.apache.shardingsphere.scaling.core.service.RegistryRepositoryHolder;
+import org.apache.shardingsphere.scaling.core.service.impl.DistributedScalingJobService;
+import org.apache.shardingsphere.scaling.core.utils.ScalingTaskUtil;
+import org.apache.shardingsphere.scaling.core.utils.ThreadUtil;
+import org.apache.shardingsphere.scaling.core.workflow.ScalingServiceHolder;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public final class FinishedCheckJob implements SimpleJob {
+
+ private static final RegistryRepository REGISTRY_REPOSITORY = RegistryRepositoryHolder.getInstance();
+
+ private final DistributedScalingJobService scalingJobService = new DistributedScalingJobService();
+
+ @Override
+ public void execute(final ShardingContext shardingContext) {
+ List<String> jobs = REGISTRY_REPOSITORY.getChildrenKeys(ScalingConstant.SCALING_LISTENER_PATH);
+ for (String each : jobs) {
+ long jobId = Long.parseLong(each);
+ try {
+ ScalingJob scalingJob = scalingJobService.getJob(jobId);
+ WorkflowConfiguration workflowConfig = scalingJob.getScalingConfig().getJobConfiguration().getWorkflowConfig();
+ if (workflowConfig == null) {
+ continue;
+ }
+ if (ScalingTaskUtil.allTasksAlmostFinished(scalingJobService.getProgress(jobId), scalingJob.getScalingConfig().getJobConfiguration())) {
+ log.info("scaling job {} almost finished.", jobId);
+ trySwitch(jobId, workflowConfig);
+ }
+ // CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ // CHECKSTYLE:ON
+ log.error("scaling job {} finish check failed!", jobId, ex);
+ }
+ }
+ }
+
+ private void trySwitch(final long jobId, final WorkflowConfiguration workflowConfig) {
+ if (LockContext.getLockStrategy().tryLock(30L, TimeUnit.SECONDS) && LockContext.getLockStrategy().checkLock()) {
+ try {
+ ThreadUtil.sleep(10 * 1000L);
+ if (ScalingServiceHolder.getInstance().checkScalingResult(jobId)) {
+ ScalingServiceHolder.getInstance().stopScalingJob(jobId);
+ ShardingSphereEventBus.getInstance().post(new SwitchRuleConfigurationEvent(workflowConfig.getSchemaName(), workflowConfig.getRuleCacheId()));
+ }
+ } finally {
+ LockContext.getLockStrategy().releaseLock();
+ }
+ } else {
+ log.warn("can not get lock.");
+ }
+ }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.spi.ScalingWorker b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.spi.ScalingWorker
index 25b2f8f..c4cfd5e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.spi.ScalingWorker
+++ b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.spi.ScalingWorker
@@ -16,3 +16,4 @@
#
org.apache.shardingsphere.scaling.elasticjob.ElasticJobScalingWorker
+org.apache.shardingsphere.scaling.elasticjob.FinishedCheckWorker