You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/09/08 06:22:10 UTC
[shardingsphere] branch scaling-dev updated: Add
shardingsphere-scaling-elasticjob module (#7310)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch scaling-dev
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/scaling-dev by this push:
new 2822a72 Add shardingsphere-scaling-elasticjob module (#7310)
2822a72 is described below
commit 2822a72b26da615678335c8171fa5a4b48839d44
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Tue Sep 8 14:21:50 2020 +0800
Add shardingsphere-scaling-elasticjob module (#7310)
* add shardingsphere-scaling-elasticjob module
* add a blank line in the end of file
Co-authored-by: qiulu3 <Lucas209910>
---
pom.xml | 21 ++-
shardingsphere-scaling/pom.xml | 1 +
.../src/main/resources/conf/server.yaml | 5 +
.../scaling/core/config/JobConfiguration.java | 2 +
.../scaling/core/config/ScalingContext.java | 13 +-
.../scaling/core/datasource/DataSourceManager.java | 6 +-
.../ElasticJobEntry.java} | 30 ++--
.../scaling/core/spi/ElasticJobEntryLoader.java | 43 ++++++
.../InventoryDataScalingTaskGroupTest.java | 11 +-
.../pom.xml | 41 ++++--
.../scaling/elasticjob/ScalingElasticJobEntry.java | 161 +++++++++++++++++++++
.../scaling/elasticjob/job/ScalingElasticJob.java | 68 +++++++++
...hardingsphere.scaling.core.spi.ElasticJobEntry} | 5 +-
13 files changed, 363 insertions(+), 44 deletions(-)
diff --git a/pom.xml b/pom.xml
index 44b3bc5..e3a062a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -132,7 +132,8 @@
<os-maven-plugin.version>1.5.0.Final</os-maven-plugin.version>
<dockerfile-maven.version>1.4.6</dockerfile-maven.version>
<docker-compose-maven-plugin.version>2.3.0</docker-compose-maven-plugin.version>
-
+ <elasticjob.version>3.0.0-alpha</elasticjob.version>
+
<javadocExecutable>${java.home}/../bin/javadoc</javadocExecutable>
<maven.deploy.skip>false</maven.deploy.skip>
<argLine>-Xmx1024m -XX:MaxMetaspaceSize=256m</argLine>
@@ -366,7 +367,23 @@
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
-
+
+ <dependency>
+ <groupId>org.apache.shardingsphere.elasticjob</groupId>
+ <artifactId>elasticjob-lite-core</artifactId>
+ <version>${elasticjob.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere.elasticjob</groupId>
+ <artifactId>elasticjob-api</artifactId>
+ <version>${elasticjob.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere.elasticjob</groupId>
+ <artifactId>elasticjob-lite-lifecycle</artifactId>
+ <version>${elasticjob.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
diff --git a/shardingsphere-scaling/pom.xml b/shardingsphere-scaling/pom.xml
index 48987d2..143b50b 100755
--- a/shardingsphere-scaling/pom.xml
+++ b/shardingsphere-scaling/pom.xml
@@ -33,5 +33,6 @@
<module>shardingsphere-scaling-bootstrap</module>
<module>shardingsphere-scaling-mysql</module>
<module>shardingsphere-scaling-postgresql</module>
+ <module>shardingsphere-scaling-elasticjob</module>
</modules>
</project>
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/resources/conf/server.yaml b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/resources/conf/server.yaml
index 41b507c..8ff8592 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/resources/conf/server.yaml
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/resources/conf/server.yaml
@@ -19,3 +19,8 @@ port: 8888
blockQueueSize: 10000
pushTimeout: 1000
workerThread: 30
+#name: elasticjob
+#registryCenter:
+# type: zookeeper
+# serverLists: localhost:2181
+# props:
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
index ca34fc5..74e583c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
@@ -35,6 +35,8 @@ public final class JobConfiguration {
private String jobName;
+ private boolean running = true;
+
private String[] shardingTables;
private int shardingItem;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ScalingContext.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ScalingContext.java
index f9ba284..0a7c578 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ScalingContext.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ScalingContext.java
@@ -17,11 +17,13 @@
package org.apache.shardingsphere.scaling.core.config;
-import org.apache.shardingsphere.scaling.core.execute.engine.ShardingScalingExecuteEngine;
-
+import com.google.common.base.Strings;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.governance.core.yaml.swapper.GovernanceCenterConfigurationYamlSwapper;
+import org.apache.shardingsphere.scaling.core.execute.engine.ShardingScalingExecuteEngine;
+import org.apache.shardingsphere.scaling.core.spi.ElasticJobEntryLoader;
/**
* ShardingSphere-Scaling context.
@@ -56,5 +58,12 @@ public final class ScalingContext {
this.serverConfiguration = serverConfiguration;
taskExecuteEngine = new ShardingScalingExecuteEngine(serverConfiguration.getWorkerThread());
importerExecuteEngine = new ShardingScalingExecuteEngine(serverConfiguration.getWorkerThread());
+ initElasticJobEntry(serverConfiguration);
+ }
+
+ private void initElasticJobEntry(final ServerConfiguration serverConfiguration) {
+ if (!Strings.isNullOrEmpty(serverConfiguration.getName()) && null != serverConfiguration.getRegistryCenter()) {
+ ElasticJobEntryLoader.init(serverConfiguration.getName(), new GovernanceCenterConfigurationYamlSwapper().swapToObject(serverConfiguration.getRegistryCenter()));
+ }
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/datasource/DataSourceManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/datasource/DataSourceManager.java
index f24d568..4eaa300 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/datasource/DataSourceManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/datasource/DataSourceManager.java
@@ -56,9 +56,9 @@ public final class DataSourceManager implements AutoCloseable {
private void createSourceDatasources(final List<SyncConfiguration> syncConfigs) {
for (SyncConfiguration syncConfiguration : syncConfigs) {
DataSourceConfiguration dataSourceConfig = syncConfiguration.getDumperConfiguration().getDataSourceConfiguration();
- DataSourceWrapper hikariDataSource = dataSourceFactory.newInstance(dataSourceConfig);
- cachedDataSources.put(dataSourceConfig, hikariDataSource);
- sourceDatasources.put(dataSourceConfig, hikariDataSource);
+ DataSourceWrapper dataSource = dataSourceFactory.newInstance(dataSourceConfig);
+ cachedDataSources.put(dataSourceConfig, dataSource);
+ sourceDatasources.put(dataSourceConfig, dataSource);
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ElasticJobEntry.java
similarity index 65%
copy from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ElasticJobEntry.java
index ca34fc5..425da75 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ElasticJobEntry.java
@@ -15,28 +15,20 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.config;
+package org.apache.shardingsphere.scaling.core.spi;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
+import org.apache.shardingsphere.governance.repository.api.config.GovernanceCenterConfiguration;
/**
- * Job configuration.
+ * Elastic job entry.
*/
-@NoArgsConstructor
-@Setter
-@Getter
-public final class JobConfiguration {
-
- private int concurrency = 3;
-
- private int retryTimes = 3;
-
- private String jobName;
-
- private String[] shardingTables;
-
- private int shardingItem;
+public interface ElasticJobEntry {
+ /**
+ * Init elastic job.
+ *
+ * @param namespace registry center namespace
+ * @param registryCenter registry center
+ */
+ void init(String namespace, GovernanceCenterConfiguration registryCenter);
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ElasticJobEntryLoader.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ElasticJobEntryLoader.java
new file mode 100644
index 0000000..ead7b53
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ElasticJobEntryLoader.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.spi;
+
+import org.apache.shardingsphere.governance.repository.api.config.GovernanceCenterConfiguration;
+import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
+
+import java.util.Collection;
+
+/**
+ * Elastic job entry loader.
+ */
+public final class ElasticJobEntryLoader {
+
+ /**
+ * Init elastic job entry.
+ *
+ * @param namespace registry center namespace
+ * @param registryCenter registry center
+ */
+ public static void init(final String namespace, final GovernanceCenterConfiguration registryCenter) {
+ ShardingSphereServiceLoader.register(ElasticJobEntry.class);
+ Collection<ElasticJobEntry> elasticJobEntries = ShardingSphereServiceLoader.newServiceInstances(ElasticJobEntry.class);
+ for (ElasticJobEntry each : elasticJobEntries) {
+ each.init(namespace, registryCenter);
+ }
+ }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroupTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroupTest.java
index 87d34fb..5028ec8 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroupTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroupTest.java
@@ -19,22 +19,29 @@ package org.apache.shardingsphere.scaling.core.job.task.inventory;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.job.SyncProgress;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
import java.util.Collections;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+@RunWith(MockitoJUnitRunner.class)
public final class InventoryDataScalingTaskGroupTest {
private DataSourceManager dataSourceManager;
+ @Mock
+ private ScalingTask<InventoryPosition> scalingTask;
+
@Before
public void setUp() {
dataSourceManager = new DataSourceManager();
@@ -47,7 +54,6 @@ public final class InventoryDataScalingTaskGroupTest {
@Test
public void assertStart() {
- ScalingTask scalingTask = mock(ScalingTask.class);
InventoryDataScalingTaskGroup inventoryDataSyncTaskGroup = new InventoryDataScalingTaskGroup(Collections.singletonList(scalingTask));
inventoryDataSyncTaskGroup.start();
verify(scalingTask).start();
@@ -55,7 +61,6 @@ public final class InventoryDataScalingTaskGroupTest {
@Test
public void assertStop() {
- ScalingTask scalingTask = mock(ScalingTask.class);
InventoryDataScalingTaskGroup inventoryDataSyncTaskGroup = new InventoryDataScalingTaskGroup(Collections.singletonList(scalingTask));
inventoryDataSyncTaskGroup.stop();
verify(scalingTask).stop();
diff --git a/shardingsphere-scaling/pom.xml b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/pom.xml
old mode 100755
new mode 100644
similarity index 50%
copy from shardingsphere-scaling/pom.xml
copy to shardingsphere-scaling/shardingsphere-scaling-elasticjob/pom.xml
index 48987d2..694c37e
--- a/shardingsphere-scaling/pom.xml
+++ b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/pom.xml
@@ -16,22 +16,41 @@
~ limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.shardingsphere</groupId>
- <artifactId>shardingsphere</artifactId>
+ <artifactId>shardingsphere-scaling</artifactId>
<version>5.0.0-RC1-SNAPSHOT</version>
</parent>
- <artifactId>shardingsphere-scaling</artifactId>
- <packaging>pom</packaging>
+ <artifactId>shardingsphere-scaling-elasticjob</artifactId>
<name>${project.artifactId}</name>
-
- <modules>
- <module>shardingsphere-scaling-core</module>
- <module>shardingsphere-scaling-bootstrap</module>
- <module>shardingsphere-scaling-mysql</module>
- <module>shardingsphere-scaling-postgresql</module>
- </modules>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.shardingsphere</groupId>
+ <artifactId>shardingsphere-scaling-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere.elasticjob</groupId>
+ <artifactId>elasticjob-lite-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere.elasticjob</groupId>
+ <artifactId>elasticjob-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere.elasticjob</groupId>
+ <artifactId>elasticjob-lite-lifecycle</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ </dependencies>
</project>
diff --git a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/ScalingElasticJobEntry.java b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/ScalingElasticJobEntry.java
new file mode 100644
index 0000000..debbd3b
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/ScalingElasticJobEntry.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.elasticjob;
+
+import com.google.common.base.Strings;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonSyntaxException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
+import org.apache.shardingsphere.elasticjob.lite.internal.election.LeaderService;
+import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobAPIFactory;
+import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobConfigurationAPI;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
+import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
+import org.apache.shardingsphere.governance.repository.api.config.GovernanceCenterConfiguration;
+import org.apache.shardingsphere.governance.repository.api.listener.DataChangedEvent;
+import org.apache.shardingsphere.governance.repository.zookeeper.CuratorZookeeperRepository;
+import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
+import org.apache.shardingsphere.scaling.core.spi.ElasticJobEntry;
+import org.apache.shardingsphere.scaling.elasticjob.job.ScalingElasticJob;
+
+import java.util.Optional;
+
+/**
+ * Scaling elastic job entry.
+ */
+@Slf4j
+public final class ScalingElasticJobEntry implements ElasticJobEntry {
+
+ private static final String SCALING_JOB_NAME = "ScalingJob";
+
+ private static final String SCALING_JOB_CONFIG = "/__scalingjob_config";
+
+ private static final Gson GSON = new GsonBuilder().disableHtmlEscaping().serializeNulls().create();
+
+ private final CuratorZookeeperRepository curatorZookeeperRepository = new CuratorZookeeperRepository();
+
+ private OneOffJobBootstrap scalingJobBootstrap;
+
+ private boolean running;
+
+ private String namespace;
+
+ private GovernanceCenterConfiguration registryCenter;
+
+ @Override
+ public void init(final String namespace, final GovernanceCenterConfiguration registryCenter) {
+ log.info("Scaling elastic job start...");
+ this.namespace = namespace;
+ this.registryCenter = registryCenter;
+ initConfigurationRepository();
+ watchConfigurationRepository();
+ }
+
+ private void initConfigurationRepository() {
+ scalingJobBootstrap = new OneOffJobBootstrap(createRegistryCenter(), new ScalingElasticJob(), createJobConfiguration());
+ curatorZookeeperRepository.init(namespace, registryCenter);
+ }
+
+ private CoordinatorRegistryCenter createRegistryCenter() {
+ ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(registryCenter.getServerLists(), namespace);
+ zkConfig.setMaxSleepTimeMilliseconds(getProperty("max.sleep.time.milliseconds", zkConfig.getMaxSleepTimeMilliseconds()));
+ zkConfig.setBaseSleepTimeMilliseconds(getProperty("base.sleep.time.milliseconds", zkConfig.getBaseSleepTimeMilliseconds()));
+ zkConfig.setConnectionTimeoutMilliseconds(getProperty("connection.timeout.milliseconds", zkConfig.getConnectionTimeoutMilliseconds()));
+ zkConfig.setSessionTimeoutMilliseconds(getProperty("session.timeout.milliseconds", zkConfig.getSessionTimeoutMilliseconds()));
+ CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zkConfig);
+ regCenter.init();
+ return regCenter;
+ }
+
+ private int getProperty(final String key, final int defaultValue) {
+ if (Strings.isNullOrEmpty(registryCenter.getProps().getProperty(key))) {
+ return defaultValue;
+ }
+ return Integer.parseInt(registryCenter.getProps().getProperty(key));
+ }
+
+ private JobConfiguration createJobConfiguration() {
+ return createJobConfiguration(1, null);
+ }
+
+ private JobConfiguration createJobConfiguration(final int shardingTotalCount, final String jobParameter) {
+ return JobConfiguration.newBuilder(SCALING_JOB_NAME, shardingTotalCount).jobParameter(jobParameter).build();
+ }
+
+ private void watchConfigurationRepository() {
+ curatorZookeeperRepository.watch(SCALING_JOB_CONFIG, event -> {
+ Optional<ScalingConfiguration> scalingConfiguration = getScalingConfiguration(event);
+ if (!scalingConfiguration.isPresent()) {
+ return;
+ }
+ switch (event.getChangedType()) {
+ case ADDED:
+ case UPDATED:
+ executeJob(scalingConfiguration.get());
+ break;
+ case DELETED:
+ deleteJob(scalingConfiguration.get());
+ break;
+ default:
+ break;
+ }
+ });
+ }
+
+ private Optional<ScalingConfiguration> getScalingConfiguration(final DataChangedEvent event) {
+ try {
+ log.info("{} scaling config: {}", event.getChangedType(), event.getValue());
+ return Optional.of(GSON.fromJson(event.getValue(), ScalingConfiguration.class));
+ } catch (JsonSyntaxException ex) {
+ log.error("analyze scaling config failed.", ex);
+ }
+ return Optional.empty();
+ }
+
+ private void executeJob(final ScalingConfiguration scalingConfiguration) {
+ if (running && scalingConfiguration.getJobConfiguration().isRunning()) {
+ log.warn("scaling elastic job has already running, ignore current config.");
+ return;
+ }
+ if (running == scalingConfiguration.getJobConfiguration().isRunning()) {
+ return;
+ }
+ if (new LeaderService(createRegistryCenter(), SCALING_JOB_NAME).isLeader()) {
+ log.info("leader worker update config.");
+ updateJobConfiguration(scalingConfiguration);
+ }
+ scalingJobBootstrap.execute();
+ running = scalingConfiguration.getJobConfiguration().isRunning();
+ }
+
+ private void deleteJob(final ScalingConfiguration scalingConfiguration) {
+ scalingConfiguration.getJobConfiguration().setRunning(false);
+ executeJob(scalingConfiguration);
+ }
+
+ private void updateJobConfiguration(final ScalingConfiguration scalingConfiguration) {
+ JobConfigurationAPI jobConfigurationAPI = JobAPIFactory.createJobConfigurationAPI(registryCenter.getServerLists(), namespace, null);
+ jobConfigurationAPI.updateJobConfiguration(
+ JobConfigurationPOJO.fromJobConfiguration(createJobConfiguration(scalingConfiguration.getJobConfiguration().getShardingTables().length, GSON.toJson(scalingConfiguration))));
+ }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/ScalingElasticJob.java b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/ScalingElasticJob.java
new file mode 100644
index 0000000..a90ae7b
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/ScalingElasticJob.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.elasticjob.job;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.elasticjob.api.ShardingContext;
+import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
+import org.apache.shardingsphere.scaling.core.ScalingJobController;
+import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
+import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
+import org.apache.shardingsphere.scaling.core.utils.SyncConfigurationUtil;
+
+/**
+ * Scaling elastic job.
+ */
+@Slf4j
+public final class ScalingElasticJob implements SimpleJob {
+
+ private static final Gson GSON = new GsonBuilder().disableHtmlEscaping().serializeNulls().create();
+
+ private static final ScalingJobController SCALING_JOB_CONTROLLER = new ScalingJobController();
+
+ private ShardingScalingJob shardingScalingJob;
+
+ @Override
+ public void execute(final ShardingContext shardingContext) {
+ log.info("execute job: {} - {}/{}", shardingContext.getTaskId(), shardingContext.getShardingItem(), shardingContext.getShardingTotalCount());
+ ScalingConfiguration scalingConfiguration = GSON.fromJson(shardingContext.getJobParameter(), ScalingConfiguration.class);
+ if (scalingConfiguration.getJobConfiguration().isRunning()) {
+ startJob(scalingConfiguration, shardingContext);
+ return;
+ }
+ stopJob(shardingContext);
+ }
+
+ private void startJob(final ScalingConfiguration scalingConfiguration, final ShardingContext shardingContext) {
+ log.info("start job: {} - {}", shardingContext.getJobName(), shardingContext.getShardingItem());
+ scalingConfiguration.getJobConfiguration().setShardingItem(shardingContext.getShardingItem());
+ shardingScalingJob = new ShardingScalingJob(scalingConfiguration.getJobConfiguration().getJobName(), scalingConfiguration.getJobConfiguration().getShardingItem());
+ shardingScalingJob.getSyncConfigurations().addAll(SyncConfigurationUtil.toSyncConfigurations(scalingConfiguration));
+ SCALING_JOB_CONTROLLER.start(shardingScalingJob);
+ }
+
+ private void stopJob(final ShardingContext shardingContext) {
+ log.info("stop job: {} - {}", shardingContext.getJobName(), shardingContext.getShardingItem());
+ if (null != shardingScalingJob) {
+ SCALING_JOB_CONTROLLER.stop(shardingScalingJob.getJobId());
+ shardingScalingJob = null;
+ }
+ }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/resources/conf/server.yaml b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.spi.ElasticJobEntry
similarity index 92%
copy from shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/resources/conf/server.yaml
copy to shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.spi.ElasticJobEntry
index 41b507c..9088816 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/resources/conf/server.yaml
+++ b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.spi.ElasticJobEntry
@@ -15,7 +15,4 @@
# limitations under the License.
#
-port: 8888
-blockQueueSize: 10000
-pushTimeout: 1000
-workerThread: 30
+org.apache.shardingsphere.scaling.elasticjob.ScalingElasticJobEntry