You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2021/02/02 02:50:01 UTC

[shardingsphere] branch master updated: Optimize ScalingJobExecutor (#9260)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 77f5ef8  Optimize ScalingJobExecutor (#9260)
77f5ef8 is described below

commit 77f5ef8c1411e7d69ef853d8b5a20097ad00f934
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Tue Feb 2 10:48:30 2021 +0800

    Optimize ScalingJobExecutor (#9260)
    
    * Optimize FinishedCheckJobExecutor
    
    * Optimize ScalingJobExecutor
    
    * Optimize ScalingJobExecutor
    
    Co-authored-by: qiulu3 <Lucas209910>
---
 .../scaling/core/api/RegistryRepositoryAPI.java    |   8 ++
 .../scaling/core/api/ScalingAPI.java               |  14 +--
 .../scaling/core/api/ScalingAPIFactory.java        |   2 +-
 .../core/api/impl/RegistryRepositoryAPIImpl.java   |  20 +--
 .../scaling/core/api/impl/ScalingAPIImpl.java      |  43 +++----
 .../executor/job/FinishedCheckJobExecutor.java     |   2 +-
 .../execute/executor/job/ScalingJobExecutor.java   | 139 +++++----------------
 .../scaling/core/job/FinishedCheckJob.java         |  40 +++---
 8 files changed, 102 insertions(+), 166 deletions(-)

diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/RegistryRepositoryAPI.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/RegistryRepositoryAPI.java
index 0b4b7fb..2845ed6 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/RegistryRepositoryAPI.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/RegistryRepositoryAPI.java
@@ -66,4 +66,12 @@ public interface RegistryRepositoryAPI {
      * @param listener data changed event listener
      */
     void watch(String key, DataChangedEventListener listener);
+    
+    /**
+     * Persist data.
+     *
+     * @param key key of data
+     * @param value value of data
+     */
+    void persist(String key, String value);
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPI.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPI.java
index 9cd124b..4e7bc0a 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPI.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPI.java
@@ -39,19 +39,19 @@ public interface ScalingAPI {
     List<JobInfo> list();
     
     /**
-     * Start a scaling job by config.
+     * Start a scaling job by id.
      *
-     * @param jobConfig job config
-     * @return job id
+     * @param jobId job id
      */
-    Optional<Long> start(JobConfiguration jobConfig);
+    void start(long jobId);
     
     /**
-     * Start a scaling job by id.
+     * Start a scaling job by config.
      *
-     * @param jobId job id
+     * @param jobConfig job config
+     * @return job id
      */
-    void start(long jobId);
+    Optional<Long> start(JobConfiguration jobConfig);
     
     /**
      * Stop a job.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPIFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPIFactory.java
index a2cfdc1..61f56c3 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPIFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPIFactory.java
@@ -96,7 +96,7 @@ public final class ScalingAPIFactory {
             GovernanceConfiguration governanceConfig = ScalingContext.getInstance().getServerConfig().getGovernanceConfig();
             GovernanceCenterConfiguration registryCenterConfig = governanceConfig.getRegistryCenterConfiguration();
             RegistryRepository registryRepository = TypedSPIRegistry.getRegisteredService(RegistryRepository.class, registryCenterConfig.getType(), registryCenterConfig.getProps());
-            registryRepository.init(governanceConfig.getName() + ScalingConstant.SCALING_ROOT, registryCenterConfig);
+            registryRepository.init(governanceConfig.getName(), registryCenterConfig);
             INSTANCE = new RegistryRepositoryAPIImpl(registryRepository);
         }
     }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java
index 4cd9002..4f413bf 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java
@@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
 import org.apache.shardingsphere.governance.repository.api.listener.DataChangedEventListener;
 import org.apache.shardingsphere.scaling.core.api.RegistryRepositoryAPI;
+import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
 import org.apache.shardingsphere.scaling.core.job.JobContext;
 import org.apache.shardingsphere.scaling.core.job.position.JobProgress;
 import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTask;
@@ -50,7 +51,7 @@ public final class RegistryRepositoryAPIImpl implements RegistryRepositoryAPI {
         jobPosition.setDatabaseType(jobContext.getJobConfig().getHandleConfig().getDatabaseType());
         jobPosition.setIncrementalTaskProgressMap(getIncrementalTaskProgressMap(jobContext));
         jobPosition.setInventoryTaskProgressMap(getInventoryTaskProgressMap(jobContext));
-        registryRepository.persist(getPath(jobContext.getJobId(), jobContext.getShardingItem()), jobPosition.toJson());
+        registryRepository.persist(getOffsetPath(jobContext.getJobId(), jobContext.getShardingItem()), jobPosition.toJson());
     }
     
     private Map<String, IncrementalTaskProgress> getIncrementalTaskProgressMap(final JobContext jobContext) {
@@ -69,15 +70,11 @@ public final class RegistryRepositoryAPIImpl implements RegistryRepositoryAPI {
         return result;
     }
     
-    private String getPath(final long jobId, final int shardingItem) {
-        return String.format("/%d/offset/%d", jobId, shardingItem);
-    }
-    
     @Override
     public JobProgress getJobProgress(final long jobId, final int shardingItem) {
         String data = null;
         try {
-            data = registryRepository.get(getPath(jobId, shardingItem));
+            data = registryRepository.get(getOffsetPath(jobId, shardingItem));
         } catch (final NullPointerException ex) {
             log.info("job {}-{} without break point.", jobId, shardingItem);
         }
@@ -87,7 +84,7 @@ public final class RegistryRepositoryAPIImpl implements RegistryRepositoryAPI {
     @Override
     public void deleteJob(final long jobId) {
         log.info("delete job {}", jobId);
-        registryRepository.delete(String.valueOf(jobId));
+        registryRepository.delete(String.format("%s/%d", ScalingConstant.SCALING_ROOT, jobId));
     }
     
     @Override
@@ -99,4 +96,13 @@ public final class RegistryRepositoryAPIImpl implements RegistryRepositoryAPI {
     public void watch(final String key, final DataChangedEventListener listener) {
         registryRepository.watch(key, listener);
     }
+    
+    @Override
+    public void persist(final String key, final String value) {
+        registryRepository.persist(key, value);
+    }
+    
+    private String getOffsetPath(final long jobId, final int shardingItem) {
+        return String.format("%s/%d/offset/%d", ScalingConstant.SCALING_ROOT, jobId, shardingItem);
+    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
index e4efc97..203fcc0 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
@@ -18,13 +18,15 @@
 package org.apache.shardingsphere.scaling.core.api.impl;
 
 import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
+import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
 import org.apache.shardingsphere.scaling.core.api.JobInfo;
 import org.apache.shardingsphere.scaling.core.api.ScalingAPI;
 import org.apache.shardingsphere.scaling.core.api.ScalingAPIFactory;
 import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
+import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
 import org.apache.shardingsphere.scaling.core.exception.ScalingJobNotFoundException;
 import org.apache.shardingsphere.scaling.core.job.JobContext;
 import org.apache.shardingsphere.scaling.core.job.ScalingJob;
@@ -34,7 +36,6 @@ import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckerFa
 import org.apache.shardingsphere.scaling.core.job.environmental.ScalingEnvironmentalManager;
 import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
 import org.apache.shardingsphere.scaling.core.job.position.JobProgress;
-import org.apache.shardingsphere.scaling.core.utils.ElasticJobUtil;
 import org.apache.shardingsphere.scaling.core.utils.JobConfigurationUtil;
 
 import java.sql.SQLException;
@@ -50,6 +51,8 @@ import java.util.stream.Stream;
 @Slf4j
 public final class ScalingAPIImpl implements ScalingAPI {
     
+    private static final Gson GSON = new GsonBuilder().disableHtmlEscaping().serializeNulls().create();
+    
     @Override
     public List<JobInfo> list() {
         return ScalingAPIFactory.getJobStatisticsAPI().getAllJobsBriefInfo().stream()
@@ -104,17 +107,6 @@ public final class ScalingAPIImpl implements ScalingAPI {
     }
     
     @Override
-    public Optional<Long> start(final JobConfiguration jobConfig) {
-        log.info("Start scaling job by {}", jobConfig);
-        JobConfigurationUtil.fillInProperties(jobConfig);
-        if (jobConfig.getHandleConfig().getShardingTotalCount() == 0) {
-            return Optional.empty();
-        }
-        executeScalingJob(jobConfig);
-        return Optional.of(jobConfig.getHandleConfig().getJobId());
-    }
-    
-    @Override
     public void start(final long jobId) {
         log.info("Start scaling job {}", jobId);
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
@@ -122,15 +114,24 @@ public final class ScalingAPIImpl implements ScalingAPI {
         ScalingAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
     }
     
-    private void executeScalingJob(final JobConfiguration jobConfig) {
-        log.info("execute scaling job {}", jobConfig.getHandleConfig().getJobId());
-        new OneOffJobBootstrap(ElasticJobUtil.createRegistryCenter(), new ScalingJob(), createElasticJobConfig(jobConfig)).execute();
+    @Override
+    public Optional<Long> start(final JobConfiguration jobConfig) {
+        log.info("Start scaling job by {}", jobConfig);
+        JobConfigurationUtil.fillInProperties(jobConfig);
+        if (jobConfig.getHandleConfig().getShardingTotalCount() == 0) {
+            return Optional.empty();
+        }
+        ScalingAPIFactory.getRegistryRepositoryAPI().persist(String.format("%s/%d", ScalingConstant.SCALING_ROOT, jobConfig.getHandleConfig().getJobId()), ScalingJob.class.getCanonicalName());
+        ScalingAPIFactory.getRegistryRepositoryAPI().persist(String.format("%s/%d/config", ScalingConstant.SCALING_ROOT, jobConfig.getHandleConfig().getJobId()), createElasticJobConfig(jobConfig));
+        return Optional.of(jobConfig.getHandleConfig().getJobId());
     }
     
-    private org.apache.shardingsphere.elasticjob.api.JobConfiguration createElasticJobConfig(final JobConfiguration jobConfig) {
-        return org.apache.shardingsphere.elasticjob.api.JobConfiguration.newBuilder(String.valueOf(jobConfig.getHandleConfig().getJobId()), jobConfig.getHandleConfig().getShardingTotalCount())
-                .jobParameter(new Gson().toJson(jobConfig))
-                .build();
+    private String createElasticJobConfig(final JobConfiguration jobConfig) {
+        JobConfigurationPOJO jobConfigPOJO = new JobConfigurationPOJO();
+        jobConfigPOJO.setJobName(String.valueOf(jobConfig.getHandleConfig().getJobId()));
+        jobConfigPOJO.setShardingTotalCount(jobConfig.getHandleConfig().getShardingTotalCount());
+        jobConfigPOJO.setJobParameter(GSON.toJson(jobConfig));
+        return YamlEngine.marshal(jobConfigPOJO);
     }
     
     @Override
@@ -177,7 +178,7 @@ public final class ScalingAPIImpl implements ScalingAPI {
     }
     
     private JobConfiguration getJobConfig(final JobConfigurationPOJO elasticJobConfigPOJO) {
-        return new Gson().fromJson(elasticJobConfigPOJO.getJobParameter(), JobConfiguration.class);
+        return GSON.fromJson(elasticJobConfigPOJO.getJobParameter(), JobConfiguration.class);
     }
     
     private JobConfigurationPOJO getElasticJobConfigPOJO(final long jobId) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/FinishedCheckJobExecutor.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/FinishedCheckJobExecutor.java
index 1039852..0da5290 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/FinishedCheckJobExecutor.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/FinishedCheckJobExecutor.java
@@ -31,7 +31,7 @@ import org.apache.shardingsphere.scaling.core.utils.ElasticJobUtil;
 @Slf4j
 public final class FinishedCheckJobExecutor extends AbstractScalingExecutor implements ScalingExecutor {
     
-    private static final String JOB_NAME = "finished_check";
+    private static final String JOB_NAME = "_finished_check";
     
     private static final String CRON_EXPRESSION = "0 * * * * ?";
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/ScalingJobExecutor.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/ScalingJobExecutor.java
index 73fd0f0..65bf861 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/ScalingJobExecutor.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/ScalingJobExecutor.java
@@ -17,33 +17,22 @@
 
 package org.apache.shardingsphere.scaling.core.execute.executor.job;
 
-import com.google.common.collect.Maps;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonSyntaxException;
-import lombok.Getter;
-import lombok.Setter;
+import com.google.common.collect.Sets;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
-import org.apache.shardingsphere.elasticjob.lite.internal.election.LeaderService;
-import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobAPIFactory;
-import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
-import org.apache.shardingsphere.governance.repository.api.config.GovernanceConfiguration;
 import org.apache.shardingsphere.governance.repository.api.listener.DataChangedEvent;
-import org.apache.shardingsphere.scaling.core.api.RegistryRepositoryAPI;
+import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
+import org.apache.shardingsphere.scaling.core.api.JobSchedulerCenter;
 import org.apache.shardingsphere.scaling.core.api.ScalingAPIFactory;
-import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
-import org.apache.shardingsphere.scaling.core.config.ScalingContext;
 import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
 import org.apache.shardingsphere.scaling.core.execute.executor.AbstractScalingExecutor;
 import org.apache.shardingsphere.scaling.core.execute.executor.ScalingExecutor;
 import org.apache.shardingsphere.scaling.core.job.ScalingJob;
 import org.apache.shardingsphere.scaling.core.utils.ElasticJobUtil;
 
-import java.util.Map;
 import java.util.Optional;
-import java.util.regex.Matcher;
+import java.util.Set;
 import java.util.regex.Pattern;
 
 /**
@@ -52,40 +41,33 @@ import java.util.regex.Pattern;
 @Slf4j
 public final class ScalingJobExecutor extends AbstractScalingExecutor implements ScalingExecutor {
     
-    private static final Gson GSON = new GsonBuilder().disableHtmlEscaping().serializeNulls().create();
+    private static final Pattern CONFIG_PATTERN = Pattern.compile(ScalingConstant.SCALING_ROOT + "/(\\d+)/config");
     
-    private static final Pattern CONFIG_PATTERN = Pattern.compile("(\\d+)/config");
-    
-    private final RegistryRepositoryAPI registryRepositoryAPI = ScalingAPIFactory.getRegistryRepositoryAPI();
-    
-    private final Map<String, JobBootstrapWrapper> scalingJobBootstrapMap = Maps.newHashMap();
-    
-    private GovernanceConfiguration governanceConfig;
-    
-    private CoordinatorRegistryCenter registryCenter;
+    private static final Set<String> EXECUTING_JOBS = Sets.newConcurrentHashSet();
     
     @Override
     public void start() {
         super.start();
         log.info("Start scaling job executor.");
-        this.governanceConfig = ScalingContext.getInstance().getServerConfig().getGovernanceConfig();
-        registryCenter = ElasticJobUtil.createRegistryCenter();
-        watchConfigRepository();
+        watchRegistryRepositoryConfig();
     }
     
-    private void watchConfigRepository() {
-        registryRepositoryAPI.watch(ScalingConstant.SCALING_ROOT, event -> {
-            Optional<JobConfiguration> jobConfig = getJobConfig(event);
-            if (!jobConfig.isPresent()) {
+    private void watchRegistryRepositoryConfig() {
+        ScalingAPIFactory.getRegistryRepositoryAPI().watch(ScalingConstant.SCALING_ROOT, event -> {
+            Optional<JobConfigurationPOJO> jobConfigPOJOOptional = getJobConfigPOJO(event);
+            if (!jobConfigPOJOOptional.isPresent()) {
+                return;
+            }
+            JobConfigurationPOJO jobConfigPOJO = jobConfigPOJOOptional.get();
+            if (DataChangedEvent.Type.DELETED.equals(event.getType()) || jobConfigPOJO.isDisabled()) {
+                EXECUTING_JOBS.remove(jobConfigPOJO.getJobName());
+                JobSchedulerCenter.stop(Long.parseLong(jobConfigPOJO.getJobName()));
                 return;
             }
             switch (event.getType()) {
                 case ADDED:
                 case UPDATED:
-                    executeJob(getJobId(event.getKey()), jobConfig.get());
-                    break;
-                case DELETED:
-                    deleteJob(getJobId(event.getKey()), jobConfig.get());
+                    execute(jobConfigPOJO);
                     break;
                 default:
                     break;
@@ -93,84 +75,23 @@ public final class ScalingJobExecutor extends AbstractScalingExecutor implements
         });
     }
     
-    private String getJobId(final String key) {
-        Matcher matcher = CONFIG_PATTERN.matcher(key);
-        if (matcher.matches()) {
-            return matcher.group(1);
-        }
-        return key.split("/")[2];
-    }
-    
-    private Optional<JobConfiguration> getJobConfig(final DataChangedEvent event) {
-        if (!CONFIG_PATTERN.matcher(event.getKey()).matches()) {
-            return Optional.empty();
-        }
+    private Optional<JobConfigurationPOJO> getJobConfigPOJO(final DataChangedEvent event) {
         try {
-            log.info("{} job config: {} = {}", event.getType(), event.getKey(), event.getValue());
-            return Optional.of(GSON.fromJson(event.getValue(), JobConfiguration.class));
-        } catch (JsonSyntaxException ex) {
-            log.error("analyze job config failed.", ex);
+            if (CONFIG_PATTERN.matcher(event.getKey()).matches()) {
+                log.info("{} job config: {} = {}", event.getType(), event.getKey(), event.getValue());
+                return Optional.of(YamlEngine.unmarshal(event.getValue(), JobConfigurationPOJO.class));
+            }
+            // CHECKSTYLE:OFF
+        } catch (final Exception ex) {
+            // CHECKSTYLE:ON
+            log.error("analyze job config pojo failed.", ex);
         }
         return Optional.empty();
     }
     
-    private void executeJob(final String jobId, final JobConfiguration jobConfig) {
-        JobBootstrapWrapper jobBootstrapWrapper = scalingJobBootstrapMap.get(jobId);
-        if (null == jobBootstrapWrapper) {
-            createJob(jobId, jobConfig);
-            return;
-        }
-        updateJob(jobId, jobConfig);
-    }
-    
-    private void createJob(final String jobId, final JobConfiguration jobConfig) {
-        if (jobConfig.getHandleConfig().isRunning()) {
-            JobBootstrapWrapper jobBootstrapWrapper = new JobBootstrapWrapper(jobId, jobConfig);
-            jobBootstrapWrapper.getJobBootstrap().execute();
-            scalingJobBootstrapMap.put(jobId, jobBootstrapWrapper);
-        }
-    }
-    
-    private void updateJob(final String jobId, final JobConfiguration jobConfig) {
-        JobBootstrapWrapper jobBootstrapWrapper = scalingJobBootstrapMap.get(jobId);
-        if (jobBootstrapWrapper.isRunning() && jobConfig.getHandleConfig().isRunning()) {
-            log.warn("scaling elastic job has already running, ignore current config.");
-            return;
-        }
-        if (jobBootstrapWrapper.isRunning() == jobConfig.getHandleConfig().isRunning()) {
-            return;
-        }
-        if (new LeaderService(registryCenter, jobId).isLeader()) {
-            log.info("leader worker update config.");
-            JobAPIFactory.createJobConfigurationAPI(governanceConfig.getRegistryCenterConfiguration().getServerLists(),
-                    governanceConfig.getName() + ScalingConstant.SCALING_ROOT, null)
-                    .updateJobConfiguration(JobConfigurationPOJO.fromJobConfiguration(createJobConfig(jobId, jobConfig)));
-        }
-        jobBootstrapWrapper.setRunning(jobConfig.getHandleConfig().isRunning());
-        jobBootstrapWrapper.getJobBootstrap().execute();
-    }
-    
-    private org.apache.shardingsphere.elasticjob.api.JobConfiguration createJobConfig(final String jobId, final JobConfiguration jobConfig) {
-        return org.apache.shardingsphere.elasticjob.api.JobConfiguration.newBuilder(jobId, jobConfig.getHandleConfig().getShardingTables().length)
-                .jobParameter(GSON.toJson(jobConfig)).overwrite(true).build();
-    }
-    
-    private void deleteJob(final String jobId, final JobConfiguration jobConfig) {
-        jobConfig.getHandleConfig().setRunning(false);
-        executeJob(jobId, jobConfig);
-    }
-    
-    @Getter
-    @Setter
-    private final class JobBootstrapWrapper {
-        
-        private final OneOffJobBootstrap jobBootstrap;
-        
-        private boolean running;
-        
-        private JobBootstrapWrapper(final String jobId, final JobConfiguration jobConfig) {
-            jobBootstrap = new OneOffJobBootstrap(registryCenter, new ScalingJob(), createJobConfig(jobId, jobConfig));
-            running = jobConfig.getHandleConfig().isRunning();
+    private void execute(final JobConfigurationPOJO jobConfigPOJO) {
+        if (EXECUTING_JOBS.add(jobConfigPOJO.getJobName())) {
+            new OneOffJobBootstrap(ElasticJobUtil.createRegistryCenter(), new ScalingJob(), jobConfigPOJO.toJobConfiguration()).execute();
         }
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
index 59302e9..6aa517a 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
@@ -32,7 +32,6 @@ import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResu
 import org.apache.shardingsphere.scaling.core.utils.ScalingTaskUtil;
 import org.apache.shardingsphere.scaling.core.utils.ThreadUtil;
 
-import java.util.List;
 import java.util.Map;
 
 @Slf4j
@@ -44,25 +43,26 @@ public final class FinishedCheckJob implements SimpleJob {
     
     @Override
     public void execute(final ShardingContext shardingContext) {
-        List<String> jobs = registryRepositoryAPI.getChildrenKeys(ScalingConstant.SCALING_ROOT);
-        for (String each : jobs) {
-            long jobId = Long.parseLong(each);
-            try {
-                JobConfiguration jobConfig = scalingAPI.getJobConfig(jobId);
-                WorkflowConfiguration workflowConfig = jobConfig.getHandleConfig().getWorkflowConfig();
-                if (workflowConfig == null) {
-                    continue;
-                }
-                if (ScalingTaskUtil.almostFinished(scalingAPI.getProgress(jobId), jobConfig.getHandleConfig())) {
-                    log.info("scaling job {} almost finished.", jobId);
-                    trySwitch(jobId, workflowConfig);
-                }
-                // CHECKSTYLE:OFF
-            } catch (final Exception ex) {
-                // CHECKSTYLE:ON
-                log.error("scaling job {} finish check failed!", jobId, ex);
-            }
-        }
+        registryRepositoryAPI.getChildrenKeys(ScalingConstant.SCALING_ROOT).stream()
+                .filter(each -> !each.startsWith("_"))
+                .forEach(each -> {
+                    long jobId = Long.parseLong(each);
+                    try {
+                        JobConfiguration jobConfig = scalingAPI.getJobConfig(jobId);
+                        WorkflowConfiguration workflowConfig = jobConfig.getHandleConfig().getWorkflowConfig();
+                        if (workflowConfig == null) {
+                            return;
+                        }
+                        if (ScalingTaskUtil.almostFinished(scalingAPI.getProgress(jobId), jobConfig.getHandleConfig())) {
+                            log.info("scaling job {} almost finished.", jobId);
+                            trySwitch(jobId, workflowConfig);
+                        }
+                        // CHECKSTYLE:OFF
+                    } catch (final Exception ex) {
+                        // CHECKSTYLE:ON
+                        log.error("scaling job {} finish check failed!", jobId, ex);
+                    }
+                });
     }
     
     private void trySwitch(final long jobId, final WorkflowConfiguration workflowConfig) {