You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2021/02/02 02:50:01 UTC
[shardingsphere] branch master updated: Optimize ScalingJobExecutor (#9260)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 77f5ef8 Optimize ScalingJobExecutor (#9260)
77f5ef8 is described below
commit 77f5ef8c1411e7d69ef853d8b5a20097ad00f934
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Tue Feb 2 10:48:30 2021 +0800
Optimize ScalingJobExecutor (#9260)
* Optimize FinishedCheckJobExecutor
* Optimize ScalingJobExecutor
* Optimize ScalingJobExecutor
Co-authored-by: qiulu3 <Lucas209910>
---
.../scaling/core/api/RegistryRepositoryAPI.java | 8 ++
.../scaling/core/api/ScalingAPI.java | 14 +--
.../scaling/core/api/ScalingAPIFactory.java | 2 +-
.../core/api/impl/RegistryRepositoryAPIImpl.java | 20 +--
.../scaling/core/api/impl/ScalingAPIImpl.java | 43 +++----
.../executor/job/FinishedCheckJobExecutor.java | 2 +-
.../execute/executor/job/ScalingJobExecutor.java | 139 +++++----------------
.../scaling/core/job/FinishedCheckJob.java | 40 +++---
8 files changed, 102 insertions(+), 166 deletions(-)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/RegistryRepositoryAPI.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/RegistryRepositoryAPI.java
index 0b4b7fb..2845ed6 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/RegistryRepositoryAPI.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/RegistryRepositoryAPI.java
@@ -66,4 +66,12 @@ public interface RegistryRepositoryAPI {
* @param listener data changed event listener
*/
void watch(String key, DataChangedEventListener listener);
+
+ /**
+ * Persist data.
+ *
+ * @param key key of data
+ * @param value value of data
+ */
+ void persist(String key, String value);
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPI.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPI.java
index 9cd124b..4e7bc0a 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPI.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPI.java
@@ -39,19 +39,19 @@ public interface ScalingAPI {
List<JobInfo> list();
/**
- * Start a scaling job by config.
+ * Start a scaling job by id.
*
- * @param jobConfig job config
- * @return job id
+ * @param jobId job id
*/
- Optional<Long> start(JobConfiguration jobConfig);
+ void start(long jobId);
/**
- * Start a scaling job by id.
+ * Start a scaling job by config.
*
- * @param jobId job id
+ * @param jobConfig job config
+ * @return job id
*/
- void start(long jobId);
+ Optional<Long> start(JobConfiguration jobConfig);
/**
* Stop a job.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPIFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPIFactory.java
index a2cfdc1..61f56c3 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPIFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPIFactory.java
@@ -96,7 +96,7 @@ public final class ScalingAPIFactory {
GovernanceConfiguration governanceConfig = ScalingContext.getInstance().getServerConfig().getGovernanceConfig();
GovernanceCenterConfiguration registryCenterConfig = governanceConfig.getRegistryCenterConfiguration();
RegistryRepository registryRepository = TypedSPIRegistry.getRegisteredService(RegistryRepository.class, registryCenterConfig.getType(), registryCenterConfig.getProps());
- registryRepository.init(governanceConfig.getName() + ScalingConstant.SCALING_ROOT, registryCenterConfig);
+ registryRepository.init(governanceConfig.getName(), registryCenterConfig);
INSTANCE = new RegistryRepositoryAPIImpl(registryRepository);
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java
index 4cd9002..4f413bf 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java
@@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
import org.apache.shardingsphere.governance.repository.api.listener.DataChangedEventListener;
import org.apache.shardingsphere.scaling.core.api.RegistryRepositoryAPI;
+import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.job.JobContext;
import org.apache.shardingsphere.scaling.core.job.position.JobProgress;
import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTask;
@@ -50,7 +51,7 @@ public final class RegistryRepositoryAPIImpl implements RegistryRepositoryAPI {
jobPosition.setDatabaseType(jobContext.getJobConfig().getHandleConfig().getDatabaseType());
jobPosition.setIncrementalTaskProgressMap(getIncrementalTaskProgressMap(jobContext));
jobPosition.setInventoryTaskProgressMap(getInventoryTaskProgressMap(jobContext));
- registryRepository.persist(getPath(jobContext.getJobId(), jobContext.getShardingItem()), jobPosition.toJson());
+ registryRepository.persist(getOffsetPath(jobContext.getJobId(), jobContext.getShardingItem()), jobPosition.toJson());
}
private Map<String, IncrementalTaskProgress> getIncrementalTaskProgressMap(final JobContext jobContext) {
@@ -69,15 +70,11 @@ public final class RegistryRepositoryAPIImpl implements RegistryRepositoryAPI {
return result;
}
- private String getPath(final long jobId, final int shardingItem) {
- return String.format("/%d/offset/%d", jobId, shardingItem);
- }
-
@Override
public JobProgress getJobProgress(final long jobId, final int shardingItem) {
String data = null;
try {
- data = registryRepository.get(getPath(jobId, shardingItem));
+ data = registryRepository.get(getOffsetPath(jobId, shardingItem));
} catch (final NullPointerException ex) {
log.info("job {}-{} without break point.", jobId, shardingItem);
}
@@ -87,7 +84,7 @@ public final class RegistryRepositoryAPIImpl implements RegistryRepositoryAPI {
@Override
public void deleteJob(final long jobId) {
log.info("delete job {}", jobId);
- registryRepository.delete(String.valueOf(jobId));
+ registryRepository.delete(String.format("%s/%d", ScalingConstant.SCALING_ROOT, jobId));
}
@Override
@@ -99,4 +96,13 @@ public final class RegistryRepositoryAPIImpl implements RegistryRepositoryAPI {
public void watch(final String key, final DataChangedEventListener listener) {
registryRepository.watch(key, listener);
}
+
+ @Override
+ public void persist(final String key, final String value) {
+ registryRepository.persist(key, value);
+ }
+
+ private String getOffsetPath(final long jobId, final int shardingItem) {
+ return String.format("%s/%d/offset/%d", ScalingConstant.SCALING_ROOT, jobId, shardingItem);
+ }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
index e4efc97..203fcc0 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
@@ -18,13 +18,15 @@
package org.apache.shardingsphere.scaling.core.api.impl;
import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
+import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.scaling.core.api.JobInfo;
import org.apache.shardingsphere.scaling.core.api.ScalingAPI;
import org.apache.shardingsphere.scaling.core.api.ScalingAPIFactory;
import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
+import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.exception.ScalingJobNotFoundException;
import org.apache.shardingsphere.scaling.core.job.JobContext;
import org.apache.shardingsphere.scaling.core.job.ScalingJob;
@@ -34,7 +36,6 @@ import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckerFa
import org.apache.shardingsphere.scaling.core.job.environmental.ScalingEnvironmentalManager;
import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
import org.apache.shardingsphere.scaling.core.job.position.JobProgress;
-import org.apache.shardingsphere.scaling.core.utils.ElasticJobUtil;
import org.apache.shardingsphere.scaling.core.utils.JobConfigurationUtil;
import java.sql.SQLException;
@@ -50,6 +51,8 @@ import java.util.stream.Stream;
@Slf4j
public final class ScalingAPIImpl implements ScalingAPI {
+ private static final Gson GSON = new GsonBuilder().disableHtmlEscaping().serializeNulls().create();
+
@Override
public List<JobInfo> list() {
return ScalingAPIFactory.getJobStatisticsAPI().getAllJobsBriefInfo().stream()
@@ -104,17 +107,6 @@ public final class ScalingAPIImpl implements ScalingAPI {
}
@Override
- public Optional<Long> start(final JobConfiguration jobConfig) {
- log.info("Start scaling job by {}", jobConfig);
- JobConfigurationUtil.fillInProperties(jobConfig);
- if (jobConfig.getHandleConfig().getShardingTotalCount() == 0) {
- return Optional.empty();
- }
- executeScalingJob(jobConfig);
- return Optional.of(jobConfig.getHandleConfig().getJobId());
- }
-
- @Override
public void start(final long jobId) {
log.info("Start scaling job {}", jobId);
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
@@ -122,15 +114,24 @@ public final class ScalingAPIImpl implements ScalingAPI {
ScalingAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
}
- private void executeScalingJob(final JobConfiguration jobConfig) {
- log.info("execute scaling job {}", jobConfig.getHandleConfig().getJobId());
- new OneOffJobBootstrap(ElasticJobUtil.createRegistryCenter(), new ScalingJob(), createElasticJobConfig(jobConfig)).execute();
+ @Override
+ public Optional<Long> start(final JobConfiguration jobConfig) {
+ log.info("Start scaling job by {}", jobConfig);
+ JobConfigurationUtil.fillInProperties(jobConfig);
+ if (jobConfig.getHandleConfig().getShardingTotalCount() == 0) {
+ return Optional.empty();
+ }
+ ScalingAPIFactory.getRegistryRepositoryAPI().persist(String.format("%s/%d", ScalingConstant.SCALING_ROOT, jobConfig.getHandleConfig().getJobId()), ScalingJob.class.getCanonicalName());
+ ScalingAPIFactory.getRegistryRepositoryAPI().persist(String.format("%s/%d/config", ScalingConstant.SCALING_ROOT, jobConfig.getHandleConfig().getJobId()), createElasticJobConfig(jobConfig));
+ return Optional.of(jobConfig.getHandleConfig().getJobId());
}
- private org.apache.shardingsphere.elasticjob.api.JobConfiguration createElasticJobConfig(final JobConfiguration jobConfig) {
- return org.apache.shardingsphere.elasticjob.api.JobConfiguration.newBuilder(String.valueOf(jobConfig.getHandleConfig().getJobId()), jobConfig.getHandleConfig().getShardingTotalCount())
- .jobParameter(new Gson().toJson(jobConfig))
- .build();
+ private String createElasticJobConfig(final JobConfiguration jobConfig) {
+ JobConfigurationPOJO jobConfigPOJO = new JobConfigurationPOJO();
+ jobConfigPOJO.setJobName(String.valueOf(jobConfig.getHandleConfig().getJobId()));
+ jobConfigPOJO.setShardingTotalCount(jobConfig.getHandleConfig().getShardingTotalCount());
+ jobConfigPOJO.setJobParameter(GSON.toJson(jobConfig));
+ return YamlEngine.marshal(jobConfigPOJO);
}
@Override
@@ -177,7 +178,7 @@ public final class ScalingAPIImpl implements ScalingAPI {
}
private JobConfiguration getJobConfig(final JobConfigurationPOJO elasticJobConfigPOJO) {
- return new Gson().fromJson(elasticJobConfigPOJO.getJobParameter(), JobConfiguration.class);
+ return GSON.fromJson(elasticJobConfigPOJO.getJobParameter(), JobConfiguration.class);
}
private JobConfigurationPOJO getElasticJobConfigPOJO(final long jobId) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/FinishedCheckJobExecutor.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/FinishedCheckJobExecutor.java
index 1039852..0da5290 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/FinishedCheckJobExecutor.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/FinishedCheckJobExecutor.java
@@ -31,7 +31,7 @@ import org.apache.shardingsphere.scaling.core.utils.ElasticJobUtil;
@Slf4j
public final class FinishedCheckJobExecutor extends AbstractScalingExecutor implements ScalingExecutor {
- private static final String JOB_NAME = "finished_check";
+ private static final String JOB_NAME = "_finished_check";
private static final String CRON_EXPRESSION = "0 * * * * ?";
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/ScalingJobExecutor.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/ScalingJobExecutor.java
index 73fd0f0..65bf861 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/ScalingJobExecutor.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/ScalingJobExecutor.java
@@ -17,33 +17,22 @@
package org.apache.shardingsphere.scaling.core.execute.executor.job;
-import com.google.common.collect.Maps;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonSyntaxException;
-import lombok.Getter;
-import lombok.Setter;
+import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
-import org.apache.shardingsphere.elasticjob.lite.internal.election.LeaderService;
-import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobAPIFactory;
-import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
-import org.apache.shardingsphere.governance.repository.api.config.GovernanceConfiguration;
import org.apache.shardingsphere.governance.repository.api.listener.DataChangedEvent;
-import org.apache.shardingsphere.scaling.core.api.RegistryRepositoryAPI;
+import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
+import org.apache.shardingsphere.scaling.core.api.JobSchedulerCenter;
import org.apache.shardingsphere.scaling.core.api.ScalingAPIFactory;
-import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
-import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.execute.executor.AbstractScalingExecutor;
import org.apache.shardingsphere.scaling.core.execute.executor.ScalingExecutor;
import org.apache.shardingsphere.scaling.core.job.ScalingJob;
import org.apache.shardingsphere.scaling.core.utils.ElasticJobUtil;
-import java.util.Map;
import java.util.Optional;
-import java.util.regex.Matcher;
+import java.util.Set;
import java.util.regex.Pattern;
/**
@@ -52,40 +41,33 @@ import java.util.regex.Pattern;
@Slf4j
public final class ScalingJobExecutor extends AbstractScalingExecutor implements ScalingExecutor {
- private static final Gson GSON = new GsonBuilder().disableHtmlEscaping().serializeNulls().create();
+ private static final Pattern CONFIG_PATTERN = Pattern.compile(ScalingConstant.SCALING_ROOT + "/(\\d+)/config");
- private static final Pattern CONFIG_PATTERN = Pattern.compile("(\\d+)/config");
-
- private final RegistryRepositoryAPI registryRepositoryAPI = ScalingAPIFactory.getRegistryRepositoryAPI();
-
- private final Map<String, JobBootstrapWrapper> scalingJobBootstrapMap = Maps.newHashMap();
-
- private GovernanceConfiguration governanceConfig;
-
- private CoordinatorRegistryCenter registryCenter;
+ private static final Set<String> EXECUTING_JOBS = Sets.newConcurrentHashSet();
@Override
public void start() {
super.start();
log.info("Start scaling job executor.");
- this.governanceConfig = ScalingContext.getInstance().getServerConfig().getGovernanceConfig();
- registryCenter = ElasticJobUtil.createRegistryCenter();
- watchConfigRepository();
+ watchRegistryRepositoryConfig();
}
- private void watchConfigRepository() {
- registryRepositoryAPI.watch(ScalingConstant.SCALING_ROOT, event -> {
- Optional<JobConfiguration> jobConfig = getJobConfig(event);
- if (!jobConfig.isPresent()) {
+ private void watchRegistryRepositoryConfig() {
+ ScalingAPIFactory.getRegistryRepositoryAPI().watch(ScalingConstant.SCALING_ROOT, event -> {
+ Optional<JobConfigurationPOJO> jobConfigPOJOOptional = getJobConfigPOJO(event);
+ if (!jobConfigPOJOOptional.isPresent()) {
+ return;
+ }
+ JobConfigurationPOJO jobConfigPOJO = jobConfigPOJOOptional.get();
+ if (DataChangedEvent.Type.DELETED.equals(event.getType()) || jobConfigPOJO.isDisabled()) {
+ EXECUTING_JOBS.remove(jobConfigPOJO.getJobName());
+ JobSchedulerCenter.stop(Long.parseLong(jobConfigPOJO.getJobName()));
return;
}
switch (event.getType()) {
case ADDED:
case UPDATED:
- executeJob(getJobId(event.getKey()), jobConfig.get());
- break;
- case DELETED:
- deleteJob(getJobId(event.getKey()), jobConfig.get());
+ execute(jobConfigPOJO);
break;
default:
break;
@@ -93,84 +75,23 @@ public final class ScalingJobExecutor extends AbstractScalingExecutor implements
});
}
- private String getJobId(final String key) {
- Matcher matcher = CONFIG_PATTERN.matcher(key);
- if (matcher.matches()) {
- return matcher.group(1);
- }
- return key.split("/")[2];
- }
-
- private Optional<JobConfiguration> getJobConfig(final DataChangedEvent event) {
- if (!CONFIG_PATTERN.matcher(event.getKey()).matches()) {
- return Optional.empty();
- }
+ private Optional<JobConfigurationPOJO> getJobConfigPOJO(final DataChangedEvent event) {
try {
- log.info("{} job config: {} = {}", event.getType(), event.getKey(), event.getValue());
- return Optional.of(GSON.fromJson(event.getValue(), JobConfiguration.class));
- } catch (JsonSyntaxException ex) {
- log.error("analyze job config failed.", ex);
+ if (CONFIG_PATTERN.matcher(event.getKey()).matches()) {
+ log.info("{} job config: {} = {}", event.getType(), event.getKey(), event.getValue());
+ return Optional.of(YamlEngine.unmarshal(event.getValue(), JobConfigurationPOJO.class));
+ }
+ // CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ // CHECKSTYLE:ON
+ log.error("analyze job config pojo failed.", ex);
}
return Optional.empty();
}
- private void executeJob(final String jobId, final JobConfiguration jobConfig) {
- JobBootstrapWrapper jobBootstrapWrapper = scalingJobBootstrapMap.get(jobId);
- if (null == jobBootstrapWrapper) {
- createJob(jobId, jobConfig);
- return;
- }
- updateJob(jobId, jobConfig);
- }
-
- private void createJob(final String jobId, final JobConfiguration jobConfig) {
- if (jobConfig.getHandleConfig().isRunning()) {
- JobBootstrapWrapper jobBootstrapWrapper = new JobBootstrapWrapper(jobId, jobConfig);
- jobBootstrapWrapper.getJobBootstrap().execute();
- scalingJobBootstrapMap.put(jobId, jobBootstrapWrapper);
- }
- }
-
- private void updateJob(final String jobId, final JobConfiguration jobConfig) {
- JobBootstrapWrapper jobBootstrapWrapper = scalingJobBootstrapMap.get(jobId);
- if (jobBootstrapWrapper.isRunning() && jobConfig.getHandleConfig().isRunning()) {
- log.warn("scaling elastic job has already running, ignore current config.");
- return;
- }
- if (jobBootstrapWrapper.isRunning() == jobConfig.getHandleConfig().isRunning()) {
- return;
- }
- if (new LeaderService(registryCenter, jobId).isLeader()) {
- log.info("leader worker update config.");
- JobAPIFactory.createJobConfigurationAPI(governanceConfig.getRegistryCenterConfiguration().getServerLists(),
- governanceConfig.getName() + ScalingConstant.SCALING_ROOT, null)
- .updateJobConfiguration(JobConfigurationPOJO.fromJobConfiguration(createJobConfig(jobId, jobConfig)));
- }
- jobBootstrapWrapper.setRunning(jobConfig.getHandleConfig().isRunning());
- jobBootstrapWrapper.getJobBootstrap().execute();
- }
-
- private org.apache.shardingsphere.elasticjob.api.JobConfiguration createJobConfig(final String jobId, final JobConfiguration jobConfig) {
- return org.apache.shardingsphere.elasticjob.api.JobConfiguration.newBuilder(jobId, jobConfig.getHandleConfig().getShardingTables().length)
- .jobParameter(GSON.toJson(jobConfig)).overwrite(true).build();
- }
-
- private void deleteJob(final String jobId, final JobConfiguration jobConfig) {
- jobConfig.getHandleConfig().setRunning(false);
- executeJob(jobId, jobConfig);
- }
-
- @Getter
- @Setter
- private final class JobBootstrapWrapper {
-
- private final OneOffJobBootstrap jobBootstrap;
-
- private boolean running;
-
- private JobBootstrapWrapper(final String jobId, final JobConfiguration jobConfig) {
- jobBootstrap = new OneOffJobBootstrap(registryCenter, new ScalingJob(), createJobConfig(jobId, jobConfig));
- running = jobConfig.getHandleConfig().isRunning();
+ private void execute(final JobConfigurationPOJO jobConfigPOJO) {
+ if (EXECUTING_JOBS.add(jobConfigPOJO.getJobName())) {
+ new OneOffJobBootstrap(ElasticJobUtil.createRegistryCenter(), new ScalingJob(), jobConfigPOJO.toJobConfiguration()).execute();
}
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
index 59302e9..6aa517a 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
@@ -32,7 +32,6 @@ import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResu
import org.apache.shardingsphere.scaling.core.utils.ScalingTaskUtil;
import org.apache.shardingsphere.scaling.core.utils.ThreadUtil;
-import java.util.List;
import java.util.Map;
@Slf4j
@@ -44,25 +43,26 @@ public final class FinishedCheckJob implements SimpleJob {
@Override
public void execute(final ShardingContext shardingContext) {
- List<String> jobs = registryRepositoryAPI.getChildrenKeys(ScalingConstant.SCALING_ROOT);
- for (String each : jobs) {
- long jobId = Long.parseLong(each);
- try {
- JobConfiguration jobConfig = scalingAPI.getJobConfig(jobId);
- WorkflowConfiguration workflowConfig = jobConfig.getHandleConfig().getWorkflowConfig();
- if (workflowConfig == null) {
- continue;
- }
- if (ScalingTaskUtil.almostFinished(scalingAPI.getProgress(jobId), jobConfig.getHandleConfig())) {
- log.info("scaling job {} almost finished.", jobId);
- trySwitch(jobId, workflowConfig);
- }
- // CHECKSTYLE:OFF
- } catch (final Exception ex) {
- // CHECKSTYLE:ON
- log.error("scaling job {} finish check failed!", jobId, ex);
- }
- }
+ registryRepositoryAPI.getChildrenKeys(ScalingConstant.SCALING_ROOT).stream()
+ .filter(each -> !each.startsWith("_"))
+ .forEach(each -> {
+ long jobId = Long.parseLong(each);
+ try {
+ JobConfiguration jobConfig = scalingAPI.getJobConfig(jobId);
+ WorkflowConfiguration workflowConfig = jobConfig.getHandleConfig().getWorkflowConfig();
+ if (workflowConfig == null) {
+ return;
+ }
+ if (ScalingTaskUtil.almostFinished(scalingAPI.getProgress(jobId), jobConfig.getHandleConfig())) {
+ log.info("scaling job {} almost finished.", jobId);
+ trySwitch(jobId, workflowConfig);
+ }
+ // CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ // CHECKSTYLE:ON
+ log.error("scaling job {} finish check failed!", jobId, ex);
+ }
+ });
}
private void trySwitch(final long jobId, final WorkflowConfiguration workflowConfig) {