You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by me...@apache.org on 2021/02/01 09:39:31 UTC
[shardingsphere] branch master updated: Merge scaling listener path and elastic job path (#9253)
This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new acf8a33 Merge scaling listener path and elastic job path (#9253)
acf8a33 is described below
commit acf8a3326bd6e19f96c5f1dc06c27ee0105bc4b6
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Mon Feb 1 17:39:11 2021 +0800
Merge scaling listener path and elastic job path (#9253)
* Merge scaling listener path and elastic job path
* Optimize RegistryRepositoryAPIImpl
* Optimize RegistryRepositoryAPIImpl
Co-authored-by: qiulu3 <Lucas209910>
---
.../scaling/core/api/ScalingAPIFactory.java | 4 +-
.../core/api/impl/RegistryRepositoryAPIImpl.java | 11 +++--
.../scaling/core/constant/ScalingConstant.java | 47 +---------------------
.../execute/executor/job/ScalingJobExecutor.java | 7 ++--
.../scaling/core/job/FinishedCheckJob.java | 2 +-
.../core/job/preparer/ScalingJobPreparer.java | 4 +-
.../scaling/core/utils/ElasticJobUtil.java | 2 +-
.../scaling/core/utils/ScalingTaskUtil.java | 15 -------
8 files changed, 17 insertions(+), 75 deletions(-)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPIFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPIFactory.java
index d71725f..a2cfdc1 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPIFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPIFactory.java
@@ -96,7 +96,7 @@ public final class ScalingAPIFactory {
GovernanceConfiguration governanceConfig = ScalingContext.getInstance().getServerConfig().getGovernanceConfig();
GovernanceCenterConfiguration registryCenterConfig = governanceConfig.getRegistryCenterConfiguration();
RegistryRepository registryRepository = TypedSPIRegistry.getRegisteredService(RegistryRepository.class, registryCenterConfig.getType(), registryCenterConfig.getProps());
- registryRepository.init(governanceConfig.getName(), registryCenterConfig);
+ registryRepository.init(governanceConfig.getName() + ScalingConstant.SCALING_ROOT, registryCenterConfig);
INSTANCE = new RegistryRepositoryAPIImpl(registryRepository);
}
}
@@ -112,7 +112,7 @@ public final class ScalingAPIFactory {
private ElasticJobAPIHolder() {
GovernanceConfiguration governanceConfig = ScalingContext.getInstance().getServerConfig().getGovernanceConfig();
- String namespace = governanceConfig.getName() + ScalingConstant.SCALING_ROOT_PATH;
+ String namespace = governanceConfig.getName() + ScalingConstant.SCALING_ROOT;
jobStatisticsAPI = JobAPIFactory.createJobStatisticsAPI(governanceConfig.getRegistryCenterConfiguration().getServerLists(), namespace, null);
jobConfigurationAPI = JobAPIFactory.createJobConfigurationAPI(governanceConfig.getRegistryCenterConfiguration().getServerLists(), namespace, null);
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java
index cf7a17a..4cd9002 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java
@@ -29,7 +29,6 @@ import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTa
import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTaskProgress;
import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTask;
import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTaskProgress;
-import org.apache.shardingsphere.scaling.core.utils.ScalingTaskUtil;
import java.util.HashMap;
import java.util.List;
@@ -51,7 +50,7 @@ public final class RegistryRepositoryAPIImpl implements RegistryRepositoryAPI {
jobPosition.setDatabaseType(jobContext.getJobConfig().getHandleConfig().getDatabaseType());
jobPosition.setIncrementalTaskProgressMap(getIncrementalTaskProgressMap(jobContext));
jobPosition.setInventoryTaskProgressMap(getInventoryTaskProgressMap(jobContext));
- registryRepository.persist(ScalingTaskUtil.getScalingListenerPath(jobContext.getJobId(), jobContext.getShardingItem()), jobPosition.toJson());
+ registryRepository.persist(getPath(jobContext.getJobId(), jobContext.getShardingItem()), jobPosition.toJson());
}
private Map<String, IncrementalTaskProgress> getIncrementalTaskProgressMap(final JobContext jobContext) {
@@ -70,11 +69,15 @@ public final class RegistryRepositoryAPIImpl implements RegistryRepositoryAPI {
return result;
}
+ private String getPath(final long jobId, final int shardingItem) {
+ return String.format("/%d/offset/%d", jobId, shardingItem);
+ }
+
@Override
public JobProgress getJobProgress(final long jobId, final int shardingItem) {
String data = null;
try {
- data = registryRepository.get(ScalingTaskUtil.getScalingListenerPath(jobId, shardingItem));
+ data = registryRepository.get(getPath(jobId, shardingItem));
} catch (final NullPointerException ex) {
log.info("job {}-{} without break point.", jobId, shardingItem);
}
@@ -84,7 +87,7 @@ public final class RegistryRepositoryAPIImpl implements RegistryRepositoryAPI {
@Override
public void deleteJob(final long jobId) {
log.info("delete job {}", jobId);
- registryRepository.delete(ScalingTaskUtil.getScalingListenerPath(jobId));
+ registryRepository.delete(String.valueOf(jobId));
}
@Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/constant/ScalingConstant.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/constant/ScalingConstant.java
index 3c0ef9d..c85360e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/constant/ScalingConstant.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/constant/ScalingConstant.java
@@ -44,50 +44,5 @@ public final class ScalingConstant {
/**
* Scaling root path.
*/
- public static final String SCALING_ROOT_PATH = "/scaling";
-
- /**
- * Scaling listener path.
- */
- public static final String SCALING_LISTENER_PATH = SCALING_ROOT_PATH + "/listener";
-
- /**
- * Scaling elastic job path.
- */
- public static final String SCALING_ELASTIC_JOB_PATH = SCALING_ROOT_PATH + "/elastic_job";
-
- /**
- * Config.
- */
- public static final String CONFIG = "config";
-
- /**
- * Status.
- */
- public static final String STATUS = "status";
-
- /**
- * Position.
- */
- public static final String POSITION = "position";
-
- /**
- * Workflow.
- */
- public static final String WORKFLOW = "workflow";
-
- /**
- * Incremental.
- */
- public static final String INCREMENTAL = "incremental";
-
- /**
- * Inventory.
- */
- public static final String INVENTORY = "inventory";
-
- /**
- * Delay.
- */
- public static final String DELAY = "delay";
+ public static final String SCALING_ROOT = "/scaling";
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/ScalingJobExecutor.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/ScalingJobExecutor.java
index 94d24d9..73fd0f0 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/ScalingJobExecutor.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/ScalingJobExecutor.java
@@ -40,7 +40,6 @@ import org.apache.shardingsphere.scaling.core.execute.executor.AbstractScalingEx
import org.apache.shardingsphere.scaling.core.execute.executor.ScalingExecutor;
import org.apache.shardingsphere.scaling.core.job.ScalingJob;
import org.apache.shardingsphere.scaling.core.utils.ElasticJobUtil;
-import org.apache.shardingsphere.scaling.core.utils.ScalingTaskUtil;
import java.util.Map;
import java.util.Optional;
@@ -55,7 +54,7 @@ public final class ScalingJobExecutor extends AbstractScalingExecutor implements
private static final Gson GSON = new GsonBuilder().disableHtmlEscaping().serializeNulls().create();
- private static final Pattern CONFIG_PATTERN = Pattern.compile(ScalingTaskUtil.getScalingListenerPath("(\\d+)", ScalingConstant.CONFIG));
+ private static final Pattern CONFIG_PATTERN = Pattern.compile("(\\d+)/config");
private final RegistryRepositoryAPI registryRepositoryAPI = ScalingAPIFactory.getRegistryRepositoryAPI();
@@ -75,7 +74,7 @@ public final class ScalingJobExecutor extends AbstractScalingExecutor implements
}
private void watchConfigRepository() {
- registryRepositoryAPI.watch(ScalingConstant.SCALING_LISTENER_PATH, event -> {
+ registryRepositoryAPI.watch(ScalingConstant.SCALING_ROOT, event -> {
Optional<JobConfiguration> jobConfig = getJobConfig(event);
if (!jobConfig.isPresent()) {
return;
@@ -144,7 +143,7 @@ public final class ScalingJobExecutor extends AbstractScalingExecutor implements
if (new LeaderService(registryCenter, jobId).isLeader()) {
log.info("leader worker update config.");
JobAPIFactory.createJobConfigurationAPI(governanceConfig.getRegistryCenterConfiguration().getServerLists(),
- governanceConfig.getName() + ScalingConstant.SCALING_ELASTIC_JOB_PATH, null)
+ governanceConfig.getName() + ScalingConstant.SCALING_ROOT, null)
.updateJobConfiguration(JobConfigurationPOJO.fromJobConfiguration(createJobConfig(jobId, jobConfig)));
}
jobBootstrapWrapper.setRunning(jobConfig.getHandleConfig().isRunning());
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
index a39f87c..59302e9 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
@@ -44,7 +44,7 @@ public final class FinishedCheckJob implements SimpleJob {
@Override
public void execute(final ShardingContext shardingContext) {
- List<String> jobs = registryRepositoryAPI.getChildrenKeys(ScalingConstant.SCALING_LISTENER_PATH);
+ List<String> jobs = registryRepositoryAPI.getChildrenKeys(ScalingConstant.SCALING_ROOT);
for (String each : jobs) {
long jobId = Long.parseLong(each);
try {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
index 0e06c4c..6b1482c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
@@ -56,9 +56,9 @@ public final class ScalingJobPreparer {
checkDataSource(jobContext, dataSourceManager);
initIncrementalTasks(jobContext, dataSourceManager);
initInventoryTasks(jobContext, dataSourceManager);
- } catch (final PrepareFailedException | SQLException ex) {
- log.error("Preparing scaling job {} failed", jobContext.getJobId(), ex);
+ } catch (final SQLException ex) {
jobContext.setStatus(JobStatus.PREPARING_FAILURE);
+ throw new PrepareFailedException("Scaling job preparing failed", ex);
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ElasticJobUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ElasticJobUtil.java
index 4c1d621e..bc35fc2 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ElasticJobUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ElasticJobUtil.java
@@ -49,7 +49,7 @@ public final class ElasticJobUtil {
private static ZookeeperConfiguration getZookeeperConfig() {
GovernanceConfiguration governanceConfig = ScalingContext.getInstance().getServerConfig().getGovernanceConfig();
ZookeeperConfiguration result = new ZookeeperConfiguration(governanceConfig.getRegistryCenterConfiguration().getServerLists(),
- governanceConfig.getName() + ScalingConstant.SCALING_ELASTIC_JOB_PATH);
+ governanceConfig.getName() + ScalingConstant.SCALING_ROOT);
Properties props = governanceConfig.getRegistryCenterConfiguration().getProps();
result.setMaxSleepTimeMilliseconds(getProperty(props, "max.sleep.time.milliseconds", result.getMaxSleepTimeMilliseconds()));
result.setBaseSleepTimeMilliseconds(getProperty(props, "base.sleep.time.milliseconds", result.getBaseSleepTimeMilliseconds()));
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
index 826eb77..19760f5 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.scaling.core.utils;
import org.apache.shardingsphere.scaling.core.config.HandleConfiguration;
-import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
import org.apache.shardingsphere.scaling.core.job.position.JobProgress;
import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTask;
@@ -71,18 +70,4 @@ public final class ScalingTaskUtil {
.flatMap(each -> each.getIncrementalTaskProgressMap().values().stream())
.allMatch(each -> each.getIncrementalTaskDelay().getDelayMilliseconds() <= handleConfig.getWorkflowConfig().getAllowDelayMilliseconds());
}
-
- /**
- * Get scaling listener path.
- *
- * @param paths sub paths.
- * @return path.
- */
- public static String getScalingListenerPath(final Object... paths) {
- StringBuilder result = new StringBuilder(ScalingConstant.SCALING_LISTENER_PATH);
- for (Object each : paths) {
- result.append("/").append(each);
- }
- return result.toString();
- }
}