You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/09/08 06:22:10 UTC

[shardingsphere] branch scaling-dev updated: Add shardingsphere-scaling-elasticjob module (#7310)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch scaling-dev
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/scaling-dev by this push:
     new 2822a72  Add shardingsphere-scaling-elasticjob module (#7310)
2822a72 is described below

commit 2822a72b26da615678335c8171fa5a4b48839d44
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Tue Sep 8 14:21:50 2020 +0800

    Add shardingsphere-scaling-elasticjob module (#7310)
    
    * add shardingsphere-scaling-elasticjob module
    
    * add a blank line in the end of file
    
    Co-authored-by: qiulu3 <Lucas209910>
---
 pom.xml                                            |  21 ++-
 shardingsphere-scaling/pom.xml                     |   1 +
 .../src/main/resources/conf/server.yaml            |   5 +
 .../scaling/core/config/JobConfiguration.java      |   2 +
 .../scaling/core/config/ScalingContext.java        |  13 +-
 .../scaling/core/datasource/DataSourceManager.java |   6 +-
 .../ElasticJobEntry.java}                          |  30 ++--
 .../scaling/core/spi/ElasticJobEntryLoader.java    |  43 ++++++
 .../InventoryDataScalingTaskGroupTest.java         |  11 +-
 .../pom.xml                                        |  41 ++++--
 .../scaling/elasticjob/ScalingElasticJobEntry.java | 161 +++++++++++++++++++++
 .../scaling/elasticjob/job/ScalingElasticJob.java  |  68 +++++++++
 ...hardingsphere.scaling.core.spi.ElasticJobEntry} |   5 +-
 13 files changed, 363 insertions(+), 44 deletions(-)

diff --git a/pom.xml b/pom.xml
index 44b3bc5..e3a062a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -132,7 +132,8 @@
         <os-maven-plugin.version>1.5.0.Final</os-maven-plugin.version>
         <dockerfile-maven.version>1.4.6</dockerfile-maven.version>
         <docker-compose-maven-plugin.version>2.3.0</docker-compose-maven-plugin.version>
-        
+        <elasticjob.version>3.0.0-alpha</elasticjob.version>
+
         <javadocExecutable>${java.home}/../bin/javadoc</javadocExecutable>
         <maven.deploy.skip>false</maven.deploy.skip>
         <argLine>-Xmx1024m -XX:MaxMetaspaceSize=256m</argLine>
@@ -366,7 +367,23 @@
                 <version>${lombok.version}</version>
                 <scope>provided</scope>
             </dependency>
-            
+
+            <dependency>
+                <groupId>org.apache.shardingsphere.elasticjob</groupId>
+                <artifactId>elasticjob-lite-core</artifactId>
+                <version>${elasticjob.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.shardingsphere.elasticjob</groupId>
+                <artifactId>elasticjob-api</artifactId>
+                <version>${elasticjob.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.shardingsphere.elasticjob</groupId>
+                <artifactId>elasticjob-lite-lifecycle</artifactId>
+                <version>${elasticjob.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>org.springframework.boot</groupId>
                 <artifactId>spring-boot-configuration-processor</artifactId>
diff --git a/shardingsphere-scaling/pom.xml b/shardingsphere-scaling/pom.xml
index 48987d2..143b50b 100755
--- a/shardingsphere-scaling/pom.xml
+++ b/shardingsphere-scaling/pom.xml
@@ -33,5 +33,6 @@
         <module>shardingsphere-scaling-bootstrap</module>
         <module>shardingsphere-scaling-mysql</module>
         <module>shardingsphere-scaling-postgresql</module>
+        <module>shardingsphere-scaling-elasticjob</module>
     </modules>
 </project>
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/resources/conf/server.yaml b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/resources/conf/server.yaml
index 41b507c..8ff8592 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/resources/conf/server.yaml
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/resources/conf/server.yaml
@@ -19,3 +19,8 @@ port: 8888
 blockQueueSize: 10000
 pushTimeout: 1000
 workerThread: 30
+#name: elasticjob
+#registryCenter:
+#  type: zookeeper
+#  serverLists: localhost:2181
+#  props:
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
index ca34fc5..74e583c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
@@ -35,6 +35,8 @@ public final class JobConfiguration {
     
     private String jobName;
     
+    private boolean running = true;
+    
     private String[] shardingTables;
     
     private int shardingItem;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ScalingContext.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ScalingContext.java
index f9ba284..0a7c578 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ScalingContext.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ScalingContext.java
@@ -17,11 +17,13 @@
 
 package org.apache.shardingsphere.scaling.core.config;
 
-import org.apache.shardingsphere.scaling.core.execute.engine.ShardingScalingExecuteEngine;
-
+import com.google.common.base.Strings;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.governance.core.yaml.swapper.GovernanceCenterConfigurationYamlSwapper;
+import org.apache.shardingsphere.scaling.core.execute.engine.ShardingScalingExecuteEngine;
+import org.apache.shardingsphere.scaling.core.spi.ElasticJobEntryLoader;
 
 /**
  * ShardingSphere-Scaling context.
@@ -56,5 +58,12 @@ public final class ScalingContext {
         this.serverConfiguration = serverConfiguration;
         taskExecuteEngine = new ShardingScalingExecuteEngine(serverConfiguration.getWorkerThread());
         importerExecuteEngine = new ShardingScalingExecuteEngine(serverConfiguration.getWorkerThread());
+        initElasticJobEntry(serverConfiguration);
+    }
+    
+    private void initElasticJobEntry(final ServerConfiguration serverConfiguration) {
+        if (!Strings.isNullOrEmpty(serverConfiguration.getName()) && null != serverConfiguration.getRegistryCenter()) {
+            ElasticJobEntryLoader.init(serverConfiguration.getName(), new GovernanceCenterConfigurationYamlSwapper().swapToObject(serverConfiguration.getRegistryCenter()));
+        }
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/datasource/DataSourceManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/datasource/DataSourceManager.java
index f24d568..4eaa300 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/datasource/DataSourceManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/datasource/DataSourceManager.java
@@ -56,9 +56,9 @@ public final class DataSourceManager implements AutoCloseable {
     private void createSourceDatasources(final List<SyncConfiguration> syncConfigs) {
         for (SyncConfiguration syncConfiguration : syncConfigs) {
             DataSourceConfiguration dataSourceConfig = syncConfiguration.getDumperConfiguration().getDataSourceConfiguration();
-            DataSourceWrapper hikariDataSource = dataSourceFactory.newInstance(dataSourceConfig);
-            cachedDataSources.put(dataSourceConfig, hikariDataSource);
-            sourceDatasources.put(dataSourceConfig, hikariDataSource);
+            DataSourceWrapper dataSource = dataSourceFactory.newInstance(dataSourceConfig);
+            cachedDataSources.put(dataSourceConfig, dataSource);
+            sourceDatasources.put(dataSourceConfig, dataSource);
         }
     }
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ElasticJobEntry.java
similarity index 65%
copy from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ElasticJobEntry.java
index ca34fc5..425da75 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ElasticJobEntry.java
@@ -15,28 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.core.config;
+package org.apache.shardingsphere.scaling.core.spi;
 
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
+import org.apache.shardingsphere.governance.repository.api.config.GovernanceCenterConfiguration;
 
 /**
- * Job configuration.
+ * Elastic job entry.
  */
-@NoArgsConstructor
-@Setter
-@Getter
-public final class JobConfiguration {
-    
-    private int concurrency = 3;
-    
-    private int retryTimes = 3;
-    
-    private String jobName;
-    
-    private String[] shardingTables;
-    
-    private int shardingItem;
+public interface ElasticJobEntry {
     
+    /**
+     * Init elastic job.
+     *
+     * @param namespace registry center namespace
+     * @param registryCenter registry center
+     */
+    void init(String namespace, GovernanceCenterConfiguration registryCenter);
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ElasticJobEntryLoader.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ElasticJobEntryLoader.java
new file mode 100644
index 0000000..ead7b53
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ElasticJobEntryLoader.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.spi;
+
+import org.apache.shardingsphere.governance.repository.api.config.GovernanceCenterConfiguration;
+import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
+
+import java.util.Collection;
+
+/**
+ * Elastic job entry loader.
+ */
+public final class ElasticJobEntryLoader {
+    
+    /**
+     * Init elastic job entry.
+     *
+     * @param namespace registry center namespace
+     * @param registryCenter registry center
+     */
+    public static void init(final String namespace, final GovernanceCenterConfiguration registryCenter) {
+        ShardingSphereServiceLoader.register(ElasticJobEntry.class);
+        Collection<ElasticJobEntry> elasticJobEntries = ShardingSphereServiceLoader.newServiceInstances(ElasticJobEntry.class);
+        for (ElasticJobEntry each : elasticJobEntries) {
+            each.init(namespace, registryCenter);
+        }
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroupTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroupTest.java
index 87d34fb..5028ec8 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroupTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroupTest.java
@@ -19,22 +19,29 @@ package org.apache.shardingsphere.scaling.core.job.task.inventory;
 
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import org.apache.shardingsphere.scaling.core.job.SyncProgress;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
 import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import java.util.Collections;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 
+@RunWith(MockitoJUnitRunner.class)
 public final class InventoryDataScalingTaskGroupTest {
     
     private DataSourceManager dataSourceManager;
     
+    @Mock
+    private ScalingTask<InventoryPosition> scalingTask;
+    
     @Before
     public void setUp() {
         dataSourceManager = new DataSourceManager();
@@ -47,7 +54,6 @@ public final class InventoryDataScalingTaskGroupTest {
     
     @Test
     public void assertStart() {
-        ScalingTask scalingTask = mock(ScalingTask.class);
         InventoryDataScalingTaskGroup inventoryDataSyncTaskGroup = new InventoryDataScalingTaskGroup(Collections.singletonList(scalingTask));
         inventoryDataSyncTaskGroup.start();
         verify(scalingTask).start();
@@ -55,7 +61,6 @@ public final class InventoryDataScalingTaskGroupTest {
     
     @Test
     public void assertStop() {
-        ScalingTask scalingTask = mock(ScalingTask.class);
         InventoryDataScalingTaskGroup inventoryDataSyncTaskGroup = new InventoryDataScalingTaskGroup(Collections.singletonList(scalingTask));
         inventoryDataSyncTaskGroup.stop();
         verify(scalingTask).stop();
diff --git a/shardingsphere-scaling/pom.xml b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/pom.xml
old mode 100755
new mode 100644
similarity index 50%
copy from shardingsphere-scaling/pom.xml
copy to shardingsphere-scaling/shardingsphere-scaling-elasticjob/pom.xml
index 48987d2..694c37e
--- a/shardingsphere-scaling/pom.xml
+++ b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/pom.xml
@@ -16,22 +16,41 @@
   ~ limitations under the License.
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.shardingsphere</groupId>
-        <artifactId>shardingsphere</artifactId>
+        <artifactId>shardingsphere-scaling</artifactId>
         <version>5.0.0-RC1-SNAPSHOT</version>
     </parent>
-    <artifactId>shardingsphere-scaling</artifactId>
-    <packaging>pom</packaging>
+    <artifactId>shardingsphere-scaling-elasticjob</artifactId>
     <name>${project.artifactId}</name>
-    
-    <modules>
-        <module>shardingsphere-scaling-core</module>
-        <module>shardingsphere-scaling-bootstrap</module>
-        <module>shardingsphere-scaling-mysql</module>
-        <module>shardingsphere-scaling-postgresql</module>
-    </modules>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.shardingsphere</groupId>
+            <artifactId>shardingsphere-scaling-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.shardingsphere.elasticjob</groupId>
+            <artifactId>elasticjob-lite-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.shardingsphere.elasticjob</groupId>
+            <artifactId>elasticjob-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.shardingsphere.elasticjob</groupId>
+            <artifactId>elasticjob-lite-lifecycle</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+    </dependencies>
 </project>
diff --git a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/ScalingElasticJobEntry.java b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/ScalingElasticJobEntry.java
new file mode 100644
index 0000000..debbd3b
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/ScalingElasticJobEntry.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.elasticjob;
+
+import com.google.common.base.Strings;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonSyntaxException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
+import org.apache.shardingsphere.elasticjob.lite.internal.election.LeaderService;
+import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobAPIFactory;
+import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobConfigurationAPI;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
+import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
+import org.apache.shardingsphere.governance.repository.api.config.GovernanceCenterConfiguration;
+import org.apache.shardingsphere.governance.repository.api.listener.DataChangedEvent;
+import org.apache.shardingsphere.governance.repository.zookeeper.CuratorZookeeperRepository;
+import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
+import org.apache.shardingsphere.scaling.core.spi.ElasticJobEntry;
+import org.apache.shardingsphere.scaling.elasticjob.job.ScalingElasticJob;
+
+import java.util.Optional;
+
+/**
+ * Scaling elastic job entry.
+ */
+@Slf4j
+public final class ScalingElasticJobEntry implements ElasticJobEntry {
+    
+    private static final String SCALING_JOB_NAME = "ScalingJob";
+    
+    private static final String SCALING_JOB_CONFIG = "/__scalingjob_config";
+    
+    private static final Gson GSON = new GsonBuilder().disableHtmlEscaping().serializeNulls().create();
+    
+    private final CuratorZookeeperRepository curatorZookeeperRepository = new CuratorZookeeperRepository();
+    
+    private OneOffJobBootstrap scalingJobBootstrap;
+    
+    private boolean running;
+    
+    private String namespace;
+    
+    private GovernanceCenterConfiguration registryCenter;
+    
+    @Override
+    public void init(final String namespace, final GovernanceCenterConfiguration registryCenter) {
+        log.info("Scaling elastic job start...");
+        this.namespace = namespace;
+        this.registryCenter = registryCenter;
+        initConfigurationRepository();
+        watchConfigurationRepository();
+    }
+    
+    private void initConfigurationRepository() {
+        scalingJobBootstrap = new OneOffJobBootstrap(createRegistryCenter(), new ScalingElasticJob(), createJobConfiguration());
+        curatorZookeeperRepository.init(namespace, registryCenter);
+    }
+    
+    private CoordinatorRegistryCenter createRegistryCenter() {
+        ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(registryCenter.getServerLists(), namespace);
+        zkConfig.setMaxSleepTimeMilliseconds(getProperty("max.sleep.time.milliseconds", zkConfig.getMaxSleepTimeMilliseconds()));
+        zkConfig.setBaseSleepTimeMilliseconds(getProperty("base.sleep.time.milliseconds", zkConfig.getBaseSleepTimeMilliseconds()));
+        zkConfig.setConnectionTimeoutMilliseconds(getProperty("connection.timeout.milliseconds", zkConfig.getConnectionTimeoutMilliseconds()));
+        zkConfig.setSessionTimeoutMilliseconds(getProperty("session.timeout.milliseconds", zkConfig.getSessionTimeoutMilliseconds()));
+        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zkConfig);
+        regCenter.init();
+        return regCenter;
+    }
+    
+    private int getProperty(final String key, final int defaultValue) {
+        if (Strings.isNullOrEmpty(registryCenter.getProps().getProperty(key))) {
+            return defaultValue;
+        }
+        return Integer.parseInt(registryCenter.getProps().getProperty(key));
+    }
+    
+    private JobConfiguration createJobConfiguration() {
+        return createJobConfiguration(1, null);
+    }
+    
+    private JobConfiguration createJobConfiguration(final int shardingTotalCount, final String jobParameter) {
+        return JobConfiguration.newBuilder(SCALING_JOB_NAME, shardingTotalCount).jobParameter(jobParameter).build();
+    }
+    
+    private void watchConfigurationRepository() {
+        curatorZookeeperRepository.watch(SCALING_JOB_CONFIG, event -> {
+            Optional<ScalingConfiguration> scalingConfiguration = getScalingConfiguration(event);
+            if (!scalingConfiguration.isPresent()) {
+                return;
+            }
+            switch (event.getChangedType()) {
+                case ADDED:
+                case UPDATED:
+                    executeJob(scalingConfiguration.get());
+                    break;
+                case DELETED:
+                    deleteJob(scalingConfiguration.get());
+                    break;
+                default:
+                    break;
+            }
+        });
+    }
+    
+    private Optional<ScalingConfiguration> getScalingConfiguration(final DataChangedEvent event) {
+        try {
+            log.info("{} scaling config: {}", event.getChangedType(), event.getValue());
+            return Optional.of(GSON.fromJson(event.getValue(), ScalingConfiguration.class));
+        } catch (JsonSyntaxException ex) {
+            log.error("analyze scaling config failed.", ex);
+        }
+        return Optional.empty();
+    }
+    
+    private void executeJob(final ScalingConfiguration scalingConfiguration) {
+        if (running && scalingConfiguration.getJobConfiguration().isRunning()) {
+            log.warn("scaling elastic job has already running, ignore current config.");
+            return;
+        }
+        if (running == scalingConfiguration.getJobConfiguration().isRunning()) {
+            return;
+        }
+        if (new LeaderService(createRegistryCenter(), SCALING_JOB_NAME).isLeader()) {
+            log.info("leader worker update config.");
+            updateJobConfiguration(scalingConfiguration);
+        }
+        scalingJobBootstrap.execute();
+        running = scalingConfiguration.getJobConfiguration().isRunning();
+    }
+    
+    private void deleteJob(final ScalingConfiguration scalingConfiguration) {
+        scalingConfiguration.getJobConfiguration().setRunning(false);
+        executeJob(scalingConfiguration);
+    }
+    
+    private void updateJobConfiguration(final ScalingConfiguration scalingConfiguration) {
+        JobConfigurationAPI jobConfigurationAPI = JobAPIFactory.createJobConfigurationAPI(registryCenter.getServerLists(), namespace, null);
+        jobConfigurationAPI.updateJobConfiguration(
+                JobConfigurationPOJO.fromJobConfiguration(createJobConfiguration(scalingConfiguration.getJobConfiguration().getShardingTables().length, GSON.toJson(scalingConfiguration))));
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/ScalingElasticJob.java b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/ScalingElasticJob.java
new file mode 100644
index 0000000..a90ae7b
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/ScalingElasticJob.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.elasticjob.job;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.elasticjob.api.ShardingContext;
+import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
+import org.apache.shardingsphere.scaling.core.ScalingJobController;
+import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
+import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
+import org.apache.shardingsphere.scaling.core.utils.SyncConfigurationUtil;
+
+/**
+ * Scaling elastic job.
+ */
+@Slf4j
+public final class ScalingElasticJob implements SimpleJob {
+    
+    private static final Gson GSON = new GsonBuilder().disableHtmlEscaping().serializeNulls().create();
+    
+    private static final ScalingJobController SCALING_JOB_CONTROLLER = new ScalingJobController();
+    
+    private ShardingScalingJob shardingScalingJob;
+    
+    @Override
+    public void execute(final ShardingContext shardingContext) {
+        log.info("execute job: {} - {}/{}", shardingContext.getTaskId(), shardingContext.getShardingItem(), shardingContext.getShardingTotalCount());
+        ScalingConfiguration scalingConfiguration = GSON.fromJson(shardingContext.getJobParameter(), ScalingConfiguration.class);
+        if (scalingConfiguration.getJobConfiguration().isRunning()) {
+            startJob(scalingConfiguration, shardingContext);
+            return;
+        }
+        stopJob(shardingContext);
+    }
+    
+    private void startJob(final ScalingConfiguration scalingConfiguration, final ShardingContext shardingContext) {
+        log.info("start job: {} - {}", shardingContext.getJobName(), shardingContext.getShardingItem());
+        scalingConfiguration.getJobConfiguration().setShardingItem(shardingContext.getShardingItem());
+        shardingScalingJob = new ShardingScalingJob(scalingConfiguration.getJobConfiguration().getJobName(), scalingConfiguration.getJobConfiguration().getShardingItem());
+        shardingScalingJob.getSyncConfigurations().addAll(SyncConfigurationUtil.toSyncConfigurations(scalingConfiguration));
+        SCALING_JOB_CONTROLLER.start(shardingScalingJob);
+    }
+    
+    private void stopJob(final ShardingContext shardingContext) {
+        log.info("stop job: {} - {}", shardingContext.getJobName(), shardingContext.getShardingItem());
+        if (null != shardingScalingJob) {
+            SCALING_JOB_CONTROLLER.stop(shardingScalingJob.getJobId());
+            shardingScalingJob = null;
+        }
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/resources/conf/server.yaml b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.spi.ElasticJobEntry
similarity index 92%
copy from shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/resources/conf/server.yaml
copy to shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.spi.ElasticJobEntry
index 41b507c..9088816 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/resources/conf/server.yaml
+++ b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.spi.ElasticJobEntry
@@ -15,7 +15,4 @@
 # limitations under the License.
 #
 
-port: 8888
-blockQueueSize: 10000
-pushTimeout: 1000
-workerThread: 30
+org.apache.shardingsphere.scaling.elasticjob.ScalingElasticJobEntry