You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by me...@apache.org on 2021/01/28 10:43:21 UTC
[shardingsphere] branch master updated: Merge scaling-elasticjob into scaling-core (#9208)
This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 840a6d9 Merge scaling-elasticjob into scaling-core (#9208)
840a6d9 is described below
commit 840a6d9ac95f06e4c87b764cf477e5eb92bace1b
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Thu Jan 28 18:42:55 2021 +0800
Merge scaling-elasticjob into scaling-core (#9208)
* Merge scaling-elasticjob into scaling-core
* Refactor ElasticJobScalingWorker and FinishedCheckWorker
* Refactor ScalingServiceHolder
Co-authored-by: qiulu3 <Lucas209910>
---
.../shardingsphere-proxy-bootstrap/pom.xml | 5 --
.../impl/GovernanceBootstrapInitializer.java | 6 +-
shardingsphere-scaling/pom.xml | 1 -
.../shardingsphere-scaling-bootstrap/pom.xml | 5 --
.../scaling/ScalingWorkerBootstrap.java | 4 +-
.../shardingsphere-scaling-core/pom.xml | 21 +++++-
.../scaling/core/api/ScalingWorker.java | 82 ++++++++++++++++++++++
.../scaling/core/config/HandleConfiguration.java | 6 ++
.../executor/job/FinishedCheckJobExecutor.java} | 20 +++---
.../execute/executor/job/ScalingJobExecutor.java} | 25 ++++---
.../scaling/core}/job/FinishedCheckJob.java | 3 +-
.../scaling/core/job/ScalingJob.java} | 7 +-
.../core/service/AbstractScalingJobService.java | 20 ------
.../scaling/core/service/ScalingJobService.java | 11 +--
.../scaling/core/spi/ScalingWorker.java | 33 ---------
.../scaling/core/spi/ScalingWorkerLoader.java | 47 -------------
.../scaling/core/utils/ElasticJobUtil.java} | 6 +-
.../core/workflow/ScalingServiceHolder.java | 26 -------
.../impl/DistributedScalingJobServiceTest.java | 25 +------
.../shardingsphere-scaling-elasticjob/pom.xml | 60 ----------------
...e.shardingsphere.scaling.core.spi.ScalingWorker | 19 -----
21 files changed, 144 insertions(+), 288 deletions(-)
diff --git a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/pom.xml b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/pom.xml
index 498ab35..ab091a5 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/pom.xml
+++ b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/pom.xml
@@ -140,11 +140,6 @@
<version>${project.parent.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.shardingsphere</groupId>
- <artifactId>shardingsphere-scaling-elasticjob</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
- <dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<scope>compile</scope>
diff --git a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/impl/GovernanceBootstrapInitializer.java b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/impl/GovernanceBootstrapInitializer.java
index 80f87fc..66f5a9f 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/impl/GovernanceBootstrapInitializer.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/impl/GovernanceBootstrapInitializer.java
@@ -38,10 +38,9 @@ import org.apache.shardingsphere.proxy.config.YamlProxyConfiguration;
import org.apache.shardingsphere.proxy.config.util.DataSourceParameterConverter;
import org.apache.shardingsphere.proxy.config.yaml.YamlProxyRuleConfiguration;
import org.apache.shardingsphere.proxy.config.yaml.YamlProxyServerConfiguration;
+import org.apache.shardingsphere.scaling.core.api.ScalingWorker;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
-import org.apache.shardingsphere.scaling.core.spi.ScalingWorkerLoader;
-import org.apache.shardingsphere.scaling.core.workflow.ScalingServiceHolder;
import org.apache.shardingsphere.transaction.context.TransactionContexts;
import java.util.Collection;
@@ -143,8 +142,7 @@ public final class GovernanceBootstrapInitializer extends AbstractBootstrapIniti
ServerConfiguration serverConfiguration = scalingConfigurationOptional.get();
serverConfiguration.setGovernanceConfig(new GovernanceConfigurationYamlSwapper().swapToObject(yamlConfig.getServerConfiguration().getGovernance()));
ScalingContext.getInstance().init(serverConfiguration);
- ScalingWorkerLoader.initScalingWorker();
- ScalingServiceHolder.getInstance().init();
+ ScalingWorker.init();
}
}
}
diff --git a/shardingsphere-scaling/pom.xml b/shardingsphere-scaling/pom.xml
index 6019ada..0cc7e32 100755
--- a/shardingsphere-scaling/pom.xml
+++ b/shardingsphere-scaling/pom.xml
@@ -31,7 +31,6 @@
<modules>
<module>shardingsphere-scaling-bootstrap</module>
<module>shardingsphere-scaling-core</module>
- <module>shardingsphere-scaling-elasticjob</module>
<module>shardingsphere-scaling-dialect</module>
</modules>
</project>
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/pom.xml b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/pom.xml
index 8835bf8..2d5e289 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/pom.xml
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/pom.xml
@@ -36,11 +36,6 @@
</dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
- <artifactId>shardingsphere-scaling-elasticjob</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-scaling-mysql</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/ScalingWorkerBootstrap.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/ScalingWorkerBootstrap.java
index 3f98bc0..c8921ac 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/ScalingWorkerBootstrap.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/ScalingWorkerBootstrap.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.scaling;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.scaling.core.spi.ScalingWorkerLoader;
+import org.apache.shardingsphere.scaling.core.api.ScalingWorker;
import org.apache.shardingsphere.scaling.util.ServerConfigurationInitializer;
@@ -41,7 +41,7 @@ public final class ScalingWorkerBootstrap {
// CHECKSTYLE:ON
log.info("ShardingSphere-Scaling Worker Startup");
ServerConfigurationInitializer.init();
- ScalingWorkerLoader.initScalingWorker();
+ ScalingWorker.init();
wait0();
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml b/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml
index f0a68ad..bc5bba1 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml
@@ -36,6 +36,11 @@
</dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
+ <artifactId>shardingsphere-infra-executor</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-jdbc-core</artifactId>
<version>${project.version}</version>
</dependency>
@@ -55,9 +60,19 @@
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.shardingsphere</groupId>
- <artifactId>shardingsphere-infra-executor</artifactId>
- <version>${project.version}</version>
+ <groupId>org.apache.shardingsphere.elasticjob</groupId>
+ <artifactId>elasticjob-lite-core</artifactId>
+ <version>${elasticjob.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere.elasticjob</groupId>
+ <artifactId>elasticjob-api</artifactId>
+ <version>${elasticjob.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere.elasticjob</groupId>
+ <artifactId>elasticjob-lite-lifecycle</artifactId>
+ <version>${elasticjob.version}</version>
</dependency>
<dependency>
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingWorker.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingWorker.java
new file mode 100644
index 0000000..eac38d8
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingWorker.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.api;
+
+import com.google.common.base.Preconditions;
+import com.google.common.eventbus.Subscribe;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.governance.core.event.model.rule.RuleConfigurationsAlteredEvent;
+import org.apache.shardingsphere.governance.core.event.model.rule.SwitchRuleConfigurationEvent;
+import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
+import org.apache.shardingsphere.scaling.core.config.HandleConfiguration;
+import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
+import org.apache.shardingsphere.scaling.core.config.RuleConfiguration;
+import org.apache.shardingsphere.scaling.core.config.ScalingContext;
+import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
+import org.apache.shardingsphere.scaling.core.config.WorkflowConfiguration;
+import org.apache.shardingsphere.scaling.core.config.datasource.ShardingSphereJDBCDataSourceConfiguration;
+import org.apache.shardingsphere.scaling.core.execute.executor.job.FinishedCheckJobExecutor;
+import org.apache.shardingsphere.scaling.core.execute.executor.job.ScalingJobExecutor;
+import org.apache.shardingsphere.scaling.core.job.JobContext;
+import org.apache.shardingsphere.scaling.core.service.ScalingJobServiceFactory;
+
+import java.util.Optional;
+
+/**
+ * Scaling worker.
+ */
+@Slf4j
+public final class ScalingWorker {
+
+ private static final ScalingWorker INSTANCE = new ScalingWorker();
+
+ /**
+ * Init scaling worker.
+ */
+ public static void init() {
+ ServerConfiguration serverConfig = ScalingContext.getInstance().getServerConfig();
+ Preconditions.checkArgument(null != serverConfig && null != serverConfig.getGovernanceConfig(), "Scaling server config and governance config is required.");
+ ShardingSphereEventBus.getInstance().register(INSTANCE);
+ new FinishedCheckJobExecutor().start();
+ new ScalingJobExecutor().start();
+ }
+
+ /**
+ * Start scaling job.
+ *
+ * @param event rule configurations altered event.
+ */
+ @Subscribe
+ public void start(final RuleConfigurationsAlteredEvent event) {
+ log.info("Start scaling job by {}", event);
+ Optional<JobContext> jobContext = ScalingJobServiceFactory.getInstance().start(createJobConfig(event));
+ if (!jobContext.isPresent()) {
+ log.info("Switch rule configuration ruleCacheId = {} immediately.", event.getRuleCacheId());
+ ShardingSphereEventBus.getInstance().post(new SwitchRuleConfigurationEvent(event.getSchemaName(), event.getRuleCacheId()));
+ }
+ }
+
+ private JobConfiguration createJobConfig(final RuleConfigurationsAlteredEvent event) {
+ JobConfiguration result = new JobConfiguration();
+ result.setRuleConfig(new RuleConfiguration(
+ new ShardingSphereJDBCDataSourceConfiguration(event.getSourceDataSource(), event.getSourceRule()),
+ new ShardingSphereJDBCDataSourceConfiguration(event.getTargetDataSource(), event.getTargetRule())));
+ result.setHandleConfig(new HandleConfiguration(new WorkflowConfiguration(event.getSchemaName(), event.getRuleCacheId())));
+ return result;
+ }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/HandleConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/HandleConfiguration.java
index 1f8e325..353fc3d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/HandleConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/HandleConfiguration.java
@@ -18,11 +18,13 @@
package org.apache.shardingsphere.scaling.core.config;
import lombok.Getter;
+import lombok.NoArgsConstructor;
import lombok.Setter;
/**
* Handle configuration.
*/
+@NoArgsConstructor
@Setter
@Getter
public final class HandleConfiguration {
@@ -44,4 +46,8 @@ public final class HandleConfiguration {
private String databaseType;
private WorkflowConfiguration workflowConfig;
+
+ public HandleConfiguration(final WorkflowConfiguration workflowConfig) {
+ this.workflowConfig = workflowConfig;
+ }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/FinishedCheckWorker.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/FinishedCheckJobExecutor.java
similarity index 61%
rename from shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/FinishedCheckWorker.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/FinishedCheckJobExecutor.java
index 94ad402..85138f7 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/FinishedCheckWorker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/FinishedCheckJobExecutor.java
@@ -15,30 +15,32 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.elasticjob;
+package org.apache.shardingsphere.scaling.core.execute.executor.job;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
-import org.apache.shardingsphere.governance.repository.api.config.GovernanceConfiguration;
-import org.apache.shardingsphere.scaling.core.spi.ScalingWorker;
-import org.apache.shardingsphere.scaling.elasticjob.job.FinishedCheckJob;
-import org.apache.shardingsphere.scaling.elasticjob.util.ElasticJobUtils;
+import org.apache.shardingsphere.scaling.core.config.ScalingContext;
+import org.apache.shardingsphere.scaling.core.execute.executor.AbstractScalingExecutor;
+import org.apache.shardingsphere.scaling.core.execute.executor.ScalingExecutor;
+import org.apache.shardingsphere.scaling.core.job.FinishedCheckJob;
+import org.apache.shardingsphere.scaling.core.utils.ElasticJobUtil;
/**
* Finished check worker.
*/
@Slf4j
-public final class FinishedCheckWorker implements ScalingWorker {
+public final class FinishedCheckJobExecutor extends AbstractScalingExecutor implements ScalingExecutor {
private static final String JOB_NAME = "finished_check";
private static final String CRON_EXPRESSION = "0 * * * * ?";
@Override
- public void init(final GovernanceConfiguration governanceConfig) {
- log.info("Init finished check worker.");
- new ScheduleJobBootstrap(ElasticJobUtils.createRegistryCenter(governanceConfig), new FinishedCheckJob(), createJobConfig()).schedule();
+ public void start() {
+ super.start();
+ log.info("start finished check worker.");
+ new ScheduleJobBootstrap(ElasticJobUtil.createRegistryCenter(ScalingContext.getInstance().getServerConfig().getGovernanceConfig()), new FinishedCheckJob(), createJobConfig()).schedule();
}
private JobConfiguration createJobConfig() {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/ElasticJobScalingWorker.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/ScalingJobExecutor.java
similarity index 88%
rename from shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/ElasticJobScalingWorker.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/ScalingJobExecutor.java
index 17efb6c..a1a6848 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/ElasticJobScalingWorker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/ScalingJobExecutor.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.elasticjob;
+package org.apache.shardingsphere.scaling.core.execute.executor.job;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
@@ -33,12 +33,14 @@ import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
import org.apache.shardingsphere.governance.repository.api.config.GovernanceConfiguration;
import org.apache.shardingsphere.governance.repository.api.listener.DataChangedEvent;
import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
+import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
+import org.apache.shardingsphere.scaling.core.execute.executor.AbstractScalingExecutor;
+import org.apache.shardingsphere.scaling.core.execute.executor.ScalingExecutor;
+import org.apache.shardingsphere.scaling.core.job.ScalingJob;
import org.apache.shardingsphere.scaling.core.service.RegistryRepositoryHolder;
-import org.apache.shardingsphere.scaling.core.spi.ScalingWorker;
+import org.apache.shardingsphere.scaling.core.utils.ElasticJobUtil;
import org.apache.shardingsphere.scaling.core.utils.ScalingTaskUtil;
-import org.apache.shardingsphere.scaling.elasticjob.job.ScalingElasticJob;
-import org.apache.shardingsphere.scaling.elasticjob.util.ElasticJobUtils;
import java.util.Map;
import java.util.Optional;
@@ -46,10 +48,10 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
- * Elastic job scaling worker.
+ * Scaling job executor.
*/
@Slf4j
-public final class ElasticJobScalingWorker implements ScalingWorker {
+public final class ScalingJobExecutor extends AbstractScalingExecutor implements ScalingExecutor {
private static final Gson GSON = new GsonBuilder().disableHtmlEscaping().serializeNulls().create();
@@ -64,10 +66,11 @@ public final class ElasticJobScalingWorker implements ScalingWorker {
private CoordinatorRegistryCenter registryCenter;
@Override
- public void init(final GovernanceConfiguration governanceConfig) {
- log.info("Init elastic job scaling worker.");
- this.governanceConfig = governanceConfig;
- registryCenter = ElasticJobUtils.createRegistryCenter(governanceConfig);
+ public void start() {
+ super.start();
+ log.info("Start scaling job executor.");
+ this.governanceConfig = ScalingContext.getInstance().getServerConfig().getGovernanceConfig();
+ registryCenter = ElasticJobUtil.createRegistryCenter(governanceConfig);
watchConfigRepository();
}
@@ -167,7 +170,7 @@ public final class ElasticJobScalingWorker implements ScalingWorker {
private boolean running;
private JobBootstrapWrapper(final String jobId, final JobConfiguration jobConfig) {
- jobBootstrap = new OneOffJobBootstrap(registryCenter, new ScalingElasticJob(), createJobConfig(jobId, jobConfig));
+ jobBootstrap = new OneOffJobBootstrap(registryCenter, new ScalingJob(), createJobConfig(jobId, jobConfig));
running = jobConfig.getHandleConfig().isRunning();
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/FinishedCheckJob.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
similarity index 97%
rename from shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/FinishedCheckJob.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
index 1da0364..5184f96 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/FinishedCheckJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.elasticjob.job;
+package org.apache.shardingsphere.scaling.core.job;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
@@ -26,7 +26,6 @@ import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.scaling.core.config.WorkflowConfiguration;
import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
-import org.apache.shardingsphere.scaling.core.job.JobContext;
import org.apache.shardingsphere.scaling.core.service.RegistryRepositoryHolder;
import org.apache.shardingsphere.scaling.core.service.impl.DistributedScalingJobService;
import org.apache.shardingsphere.scaling.core.utils.ScalingTaskUtil;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/ScalingElasticJob.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
similarity index 93%
rename from shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/ScalingElasticJob.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
index 343f55f..d999192 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/ScalingElasticJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.elasticjob.job;
+package org.apache.shardingsphere.scaling.core.job;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
@@ -24,15 +24,14 @@ import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.apache.shardingsphere.scaling.core.api.JobSchedulerCenter;
import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
-import org.apache.shardingsphere.scaling.core.job.JobContext;
import org.apache.shardingsphere.scaling.core.service.ScalingJobService;
import org.apache.shardingsphere.scaling.core.service.impl.StandaloneScalingJobService;
/**
- * Scaling elastic job.
+ * Scaling job.
*/
@Slf4j
-public final class ScalingElasticJob implements SimpleJob {
+public final class ScalingJob implements SimpleJob {
private static final Gson GSON = new GsonBuilder().disableHtmlEscaping().serializeNulls().create();
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
index ebf0b2d..f01057a 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
@@ -18,13 +18,6 @@
package org.apache.shardingsphere.scaling.core.service;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.governance.core.event.model.rule.RuleConfigurationsAlteredEvent;
-import org.apache.shardingsphere.scaling.core.config.HandleConfiguration;
-import org.apache.shardingsphere.scaling.core.config.RuleConfiguration;
-import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
-import org.apache.shardingsphere.scaling.core.config.WorkflowConfiguration;
-import org.apache.shardingsphere.scaling.core.config.datasource.ShardingSphereJDBCDataSourceConfiguration;
-import org.apache.shardingsphere.scaling.core.job.JobContext;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckerFactory;
@@ -32,7 +25,6 @@ import org.apache.shardingsphere.scaling.core.job.environmental.ScalingEnvironme
import java.sql.SQLException;
import java.util.Map;
-import java.util.Optional;
/**
* Abstract scaling job service.
@@ -41,18 +33,6 @@ import java.util.Optional;
public abstract class AbstractScalingJobService implements ScalingJobService {
@Override
- public Optional<JobContext> start(final RuleConfigurationsAlteredEvent event) {
- JobConfiguration jobConfig = new JobConfiguration();
- jobConfig.setRuleConfig(new RuleConfiguration(
- new ShardingSphereJDBCDataSourceConfiguration(event.getSourceDataSource(), event.getSourceRule()),
- new ShardingSphereJDBCDataSourceConfiguration(event.getTargetDataSource(), event.getTargetRule())));
- HandleConfiguration handleConfig = new HandleConfiguration();
- handleConfig.setWorkflowConfig(new WorkflowConfiguration(event.getSchemaName(), event.getRuleCacheId()));
- jobConfig.setHandleConfig(handleConfig);
- return start(jobConfig);
- }
-
- @Override
public Map<String, DataConsistencyCheckResult> check(final long jobId) {
log.info("scaling job {} start data consistency check.", jobId);
DataConsistencyChecker dataConsistencyChecker = DataConsistencyCheckerFactory.newInstance(getJob(jobId));
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java
index e546999..7e09973 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java
@@ -17,10 +17,9 @@
package org.apache.shardingsphere.scaling.core.service;
-import org.apache.shardingsphere.governance.core.event.model.rule.RuleConfigurationsAlteredEvent;
import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
-import org.apache.shardingsphere.scaling.core.job.JobProgress;
import org.apache.shardingsphere.scaling.core.job.JobContext;
+import org.apache.shardingsphere.scaling.core.job.JobProgress;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
import java.sql.SQLException;
@@ -49,14 +48,6 @@ public interface ScalingJobService {
Optional<JobContext> start(JobConfiguration jobConfig);
/**
- * Start scaling job.
- *
- * @param event rule configurations altered event
- * @return job context
- */
- Optional<JobContext> start(RuleConfigurationsAlteredEvent event);
-
- /**
* Stop job.
*
* @param jobId job id
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingWorker.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingWorker.java
deleted file mode 100644
index 9c2fe00..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingWorker.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.spi;
-
-import org.apache.shardingsphere.governance.repository.api.config.GovernanceConfiguration;
-
-/**
- * Scaling worker.
- */
-public interface ScalingWorker {
-
- /**
- * Init scaling worker.
- *
- * @param governanceConfig governance configuration
- */
- void init(GovernanceConfiguration governanceConfig);
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingWorkerLoader.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingWorkerLoader.java
deleted file mode 100644
index 1467f4a..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingWorkerLoader.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.spi;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.governance.repository.api.config.GovernanceConfiguration;
-import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
-import org.apache.shardingsphere.scaling.core.config.ScalingContext;
-
-import java.util.Collection;
-
-/**
- * Scaling worker loader.
- */
-@Slf4j
-public final class ScalingWorkerLoader {
-
- /**
- * Init scaling worker.
- */
- public static void initScalingWorker() {
- log.info("Init scaling worker");
- ShardingSphereServiceLoader.register(ScalingWorker.class);
- GovernanceConfiguration governanceConfig = ScalingContext.getInstance().getServerConfig().getGovernanceConfig();
- if (null != governanceConfig) {
- Collection<ScalingWorker> scalingWorkers = ShardingSphereServiceLoader.newServiceInstances(ScalingWorker.class);
- for (ScalingWorker each : scalingWorkers) {
- each.init(governanceConfig);
- }
- }
- }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/util/ElasticJobUtils.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ElasticJobUtil.java
similarity index 96%
rename from shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/util/ElasticJobUtils.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ElasticJobUtil.java
index 356d0af..4f8b9d3 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/util/ElasticJobUtils.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ElasticJobUtil.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.elasticjob.util;
+package org.apache.shardingsphere.scaling.core.utils;
import com.google.common.base.Strings;
import lombok.AccessLevel;
@@ -29,10 +29,10 @@ import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
import java.util.Properties;
/**
- * Elastic job utils.
+ * Elastic job util.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class ElasticJobUtils {
+public final class ElasticJobUtil {
/**
* Create registry center.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/workflow/ScalingServiceHolder.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/workflow/ScalingServiceHolder.java
index 169de16..95be557 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/workflow/ScalingServiceHolder.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/workflow/ScalingServiceHolder.java
@@ -17,18 +17,12 @@
package org.apache.shardingsphere.scaling.core.workflow;
-import com.google.common.eventbus.Subscribe;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.governance.core.event.model.rule.RuleConfigurationsAlteredEvent;
-import org.apache.shardingsphere.governance.core.event.model.rule.SwitchRuleConfigurationEvent;
-import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
-import org.apache.shardingsphere.scaling.core.job.JobContext;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
import org.apache.shardingsphere.scaling.core.service.ScalingJobService;
import org.apache.shardingsphere.scaling.core.service.ScalingJobServiceFactory;
import java.util.Map;
-import java.util.Optional;
/**
* Scaling service holder.
@@ -50,26 +44,6 @@ public final class ScalingServiceHolder {
}
/**
- * Init.
- */
- public void init() {
- ShardingSphereEventBus.getInstance().register(this);
- }
-
- /**
- * Start scaling job.
- *
- * @param event rule configurations altered event.
- */
- @Subscribe
- public void startScalingJob(final RuleConfigurationsAlteredEvent event) {
- Optional<JobContext> jobContext = scalingJobService.start(event);
- if (!jobContext.isPresent()) {
- ShardingSphereEventBus.getInstance().post(new SwitchRuleConfigurationEvent(event.getSchemaName(), event.getRuleCacheId()));
- }
- }
-
- /**
* Check scaling result.
*
* @param jobId job Id
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
index a8608a8..3582600 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
@@ -19,18 +19,16 @@ package org.apache.shardingsphere.scaling.core.service.impl;
import com.google.gson.Gson;
import lombok.SneakyThrows;
-import org.apache.shardingsphere.governance.core.event.model.rule.RuleConfigurationsAlteredEvent;
import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
import org.apache.shardingsphere.governance.repository.api.config.GovernanceCenterConfiguration;
import org.apache.shardingsphere.governance.repository.api.config.GovernanceConfiguration;
import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
-import org.apache.shardingsphere.scaling.core.config.datasource.ShardingSphereJDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.exception.ScalingJobNotFoundException;
-import org.apache.shardingsphere.scaling.core.job.JobProgress;
import org.apache.shardingsphere.scaling.core.job.JobContext;
+import org.apache.shardingsphere.scaling.core.job.JobProgress;
import org.apache.shardingsphere.scaling.core.service.RegistryRepositoryHolder;
import org.apache.shardingsphere.scaling.core.service.ScalingJobService;
import org.apache.shardingsphere.scaling.core.util.JobConfigurationUtil;
@@ -44,7 +42,6 @@ import java.io.IOException;
import java.util.Optional;
import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -78,26 +75,6 @@ public final class DistributedScalingJobServiceTest {
}
@Test
- public void assertStartWithCallbackImmediately() {
- JobConfiguration jobConfig = mockJobConfiguration();
- ShardingSphereJDBCDataSourceConfiguration source = (ShardingSphereJDBCDataSourceConfiguration) jobConfig.getRuleConfig().getSource().unwrap();
- RuleConfigurationsAlteredEvent event = new RuleConfigurationsAlteredEvent("schema", source.getDataSource(), source.getRule(), source.getRule(), "cacheId");
- Optional<JobContext> jobContext = scalingJobService.start(event);
- assertFalse(jobContext.isPresent());
- }
-
- @Test
- public void assertStartWithCallbackSuccess() throws IOException {
- JobConfiguration jobConfig = JobConfigurationUtil.initJobConfig("/config_sharding_sphere_jdbc_target.json");
- ShardingSphereJDBCDataSourceConfiguration source = (ShardingSphereJDBCDataSourceConfiguration) jobConfig.getRuleConfig().getSource().unwrap();
- ShardingSphereJDBCDataSourceConfiguration target = (ShardingSphereJDBCDataSourceConfiguration) jobConfig.getRuleConfig().getTarget().unwrap();
- RuleConfigurationsAlteredEvent event = new RuleConfigurationsAlteredEvent(
- "schema", source.getDataSource(), source.getRule(), target.getDataSource(), target.getRule(), "cacheId");
- Optional<JobContext> jobContext = scalingJobService.start(event);
- assertTrue(jobContext.isPresent());
- }
-
- @Test
public void assertStop() {
Optional<JobContext> jobContext = scalingJobService.start(mockJobConfiguration());
assertTrue(jobContext.isPresent());
diff --git a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/pom.xml b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/pom.xml
deleted file mode 100644
index 1274885..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/pom.xml
+++ /dev/null
@@ -1,60 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.shardingsphere</groupId>
- <artifactId>shardingsphere-scaling</artifactId>
- <version>5.0.0-RC1-SNAPSHOT</version>
- </parent>
- <artifactId>shardingsphere-scaling-elasticjob</artifactId>
- <name>${project.artifactId}</name>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.shardingsphere</groupId>
- <artifactId>shardingsphere-scaling-core</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.shardingsphere.elasticjob</groupId>
- <artifactId>elasticjob-lite-core</artifactId>
- <version>${elasticjob.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.shardingsphere.elasticjob</groupId>
- <artifactId>elasticjob-api</artifactId>
- <version>${elasticjob.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.shardingsphere.elasticjob</groupId>
- <artifactId>elasticjob-lite-lifecycle</artifactId>
- <version>${elasticjob.version}</version>
- </dependency>
-
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- <version>${logback.version}</version>
- </dependency>
- </dependencies>
-</project>
diff --git a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.spi.ScalingWorker b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.spi.ScalingWorker
deleted file mode 100644
index c4cfd5e..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.spi.ScalingWorker
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.scaling.elasticjob.ElasticJobScalingWorker
-org.apache.shardingsphere.scaling.elasticjob.FinishedCheckWorker