You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by me...@apache.org on 2021/02/01 09:39:31 UTC

[shardingsphere] branch master updated: Merge scaling listener path and elastic job path (#9253)

This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new acf8a33  Merge scaling listener path and elastic job path (#9253)
acf8a33 is described below

commit acf8a3326bd6e19f96c5f1dc06c27ee0105bc4b6
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Mon Feb 1 17:39:11 2021 +0800

    Merge scaling listener path and elastic job path (#9253)
    
    * Merge scaling listener path and elastic job path
    
    * Optimize RegistryRepositoryAPIImpl
    
    * Optimize RegistryRepositoryAPIImpl
    
    Co-authored-by: qiulu3 <Lucas209910>
---
 .../scaling/core/api/ScalingAPIFactory.java        |  4 +-
 .../core/api/impl/RegistryRepositoryAPIImpl.java   | 11 +++--
 .../scaling/core/constant/ScalingConstant.java     | 47 +---------------------
 .../execute/executor/job/ScalingJobExecutor.java   |  7 ++--
 .../scaling/core/job/FinishedCheckJob.java         |  2 +-
 .../core/job/preparer/ScalingJobPreparer.java      |  4 +-
 .../scaling/core/utils/ElasticJobUtil.java         |  2 +-
 .../scaling/core/utils/ScalingTaskUtil.java        | 15 -------
 8 files changed, 17 insertions(+), 75 deletions(-)

diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPIFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPIFactory.java
index d71725f..a2cfdc1 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPIFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPIFactory.java
@@ -96,7 +96,7 @@ public final class ScalingAPIFactory {
             GovernanceConfiguration governanceConfig = ScalingContext.getInstance().getServerConfig().getGovernanceConfig();
             GovernanceCenterConfiguration registryCenterConfig = governanceConfig.getRegistryCenterConfiguration();
             RegistryRepository registryRepository = TypedSPIRegistry.getRegisteredService(RegistryRepository.class, registryCenterConfig.getType(), registryCenterConfig.getProps());
-            registryRepository.init(governanceConfig.getName(), registryCenterConfig);
+            registryRepository.init(governanceConfig.getName() + ScalingConstant.SCALING_ROOT, registryCenterConfig);
             INSTANCE = new RegistryRepositoryAPIImpl(registryRepository);
         }
     }
@@ -112,7 +112,7 @@ public final class ScalingAPIFactory {
         
         private ElasticJobAPIHolder() {
             GovernanceConfiguration governanceConfig = ScalingContext.getInstance().getServerConfig().getGovernanceConfig();
-            String namespace = governanceConfig.getName() + ScalingConstant.SCALING_ROOT_PATH;
+            String namespace = governanceConfig.getName() + ScalingConstant.SCALING_ROOT;
             jobStatisticsAPI = JobAPIFactory.createJobStatisticsAPI(governanceConfig.getRegistryCenterConfiguration().getServerLists(), namespace, null);
             jobConfigurationAPI = JobAPIFactory.createJobConfigurationAPI(governanceConfig.getRegistryCenterConfiguration().getServerLists(), namespace, null);
         }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java
index cf7a17a..4cd9002 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java
@@ -29,7 +29,6 @@ import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTa
 import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTaskProgress;
 import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTask;
 import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTaskProgress;
-import org.apache.shardingsphere.scaling.core.utils.ScalingTaskUtil;
 
 import java.util.HashMap;
 import java.util.List;
@@ -51,7 +50,7 @@ public final class RegistryRepositoryAPIImpl implements RegistryRepositoryAPI {
         jobPosition.setDatabaseType(jobContext.getJobConfig().getHandleConfig().getDatabaseType());
         jobPosition.setIncrementalTaskProgressMap(getIncrementalTaskProgressMap(jobContext));
         jobPosition.setInventoryTaskProgressMap(getInventoryTaskProgressMap(jobContext));
-        registryRepository.persist(ScalingTaskUtil.getScalingListenerPath(jobContext.getJobId(), jobContext.getShardingItem()), jobPosition.toJson());
+        registryRepository.persist(getPath(jobContext.getJobId(), jobContext.getShardingItem()), jobPosition.toJson());
     }
     
     private Map<String, IncrementalTaskProgress> getIncrementalTaskProgressMap(final JobContext jobContext) {
@@ -70,11 +69,15 @@ public final class RegistryRepositoryAPIImpl implements RegistryRepositoryAPI {
         return result;
     }
     
+    private String getPath(final long jobId, final int shardingItem) {
+        return String.format("/%d/offset/%d", jobId, shardingItem);
+    }
+    
     @Override
     public JobProgress getJobProgress(final long jobId, final int shardingItem) {
         String data = null;
         try {
-            data = registryRepository.get(ScalingTaskUtil.getScalingListenerPath(jobId, shardingItem));
+            data = registryRepository.get(getPath(jobId, shardingItem));
         } catch (final NullPointerException ex) {
             log.info("job {}-{} without break point.", jobId, shardingItem);
         }
@@ -84,7 +87,7 @@ public final class RegistryRepositoryAPIImpl implements RegistryRepositoryAPI {
     @Override
     public void deleteJob(final long jobId) {
         log.info("delete job {}", jobId);
-        registryRepository.delete(ScalingTaskUtil.getScalingListenerPath(jobId));
+        registryRepository.delete(String.valueOf(jobId));
     }
     
     @Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/constant/ScalingConstant.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/constant/ScalingConstant.java
index 3c0ef9d..c85360e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/constant/ScalingConstant.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/constant/ScalingConstant.java
@@ -44,50 +44,5 @@ public final class ScalingConstant {
     /**
      * Scaling root path.
      */
-    public static final String SCALING_ROOT_PATH = "/scaling";
-    
-    /**
-     * Scaling listener path.
-     */
-    public static final String SCALING_LISTENER_PATH = SCALING_ROOT_PATH + "/listener";
-    
-    /**
-     * Scaling elastic job path.
-     */
-    public static final String SCALING_ELASTIC_JOB_PATH = SCALING_ROOT_PATH + "/elastic_job";
-    
-    /**
-     * Config.
-     */
-    public static final String CONFIG = "config";
-    
-    /**
-     * Status.
-     */
-    public static final String STATUS = "status";
-    
-    /**
-     * Position.
-     */
-    public static final String POSITION = "position";
-    
-    /**
-     * Workflow.
-     */
-    public static final String WORKFLOW = "workflow";
-    
-    /**
-     * Incremental.
-     */
-    public static final String INCREMENTAL = "incremental";
-    
-    /**
-     * Inventory.
-     */
-    public static final String INVENTORY = "inventory";
-    
-    /**
-     * Delay.
-     */
-    public static final String DELAY = "delay";
+    public static final String SCALING_ROOT = "/scaling";
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/ScalingJobExecutor.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/ScalingJobExecutor.java
index 94d24d9..73fd0f0 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/ScalingJobExecutor.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/ScalingJobExecutor.java
@@ -40,7 +40,6 @@ import org.apache.shardingsphere.scaling.core.execute.executor.AbstractScalingEx
 import org.apache.shardingsphere.scaling.core.execute.executor.ScalingExecutor;
 import org.apache.shardingsphere.scaling.core.job.ScalingJob;
 import org.apache.shardingsphere.scaling.core.utils.ElasticJobUtil;
-import org.apache.shardingsphere.scaling.core.utils.ScalingTaskUtil;
 
 import java.util.Map;
 import java.util.Optional;
@@ -55,7 +54,7 @@ public final class ScalingJobExecutor extends AbstractScalingExecutor implements
     
     private static final Gson GSON = new GsonBuilder().disableHtmlEscaping().serializeNulls().create();
     
-    private static final Pattern CONFIG_PATTERN = Pattern.compile(ScalingTaskUtil.getScalingListenerPath("(\\d+)", ScalingConstant.CONFIG));
+    private static final Pattern CONFIG_PATTERN = Pattern.compile("(\\d+)/config");
     
     private final RegistryRepositoryAPI registryRepositoryAPI = ScalingAPIFactory.getRegistryRepositoryAPI();
     
@@ -75,7 +74,7 @@ public final class ScalingJobExecutor extends AbstractScalingExecutor implements
     }
     
     private void watchConfigRepository() {
-        registryRepositoryAPI.watch(ScalingConstant.SCALING_LISTENER_PATH, event -> {
+        registryRepositoryAPI.watch(ScalingConstant.SCALING_ROOT, event -> {
             Optional<JobConfiguration> jobConfig = getJobConfig(event);
             if (!jobConfig.isPresent()) {
                 return;
@@ -144,7 +143,7 @@ public final class ScalingJobExecutor extends AbstractScalingExecutor implements
         if (new LeaderService(registryCenter, jobId).isLeader()) {
             log.info("leader worker update config.");
             JobAPIFactory.createJobConfigurationAPI(governanceConfig.getRegistryCenterConfiguration().getServerLists(),
-                    governanceConfig.getName() + ScalingConstant.SCALING_ELASTIC_JOB_PATH, null)
+                    governanceConfig.getName() + ScalingConstant.SCALING_ROOT, null)
                     .updateJobConfiguration(JobConfigurationPOJO.fromJobConfiguration(createJobConfig(jobId, jobConfig)));
         }
         jobBootstrapWrapper.setRunning(jobConfig.getHandleConfig().isRunning());
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
index a39f87c..59302e9 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
@@ -44,7 +44,7 @@ public final class FinishedCheckJob implements SimpleJob {
     
     @Override
     public void execute(final ShardingContext shardingContext) {
-        List<String> jobs = registryRepositoryAPI.getChildrenKeys(ScalingConstant.SCALING_LISTENER_PATH);
+        List<String> jobs = registryRepositoryAPI.getChildrenKeys(ScalingConstant.SCALING_ROOT);
         for (String each : jobs) {
             long jobId = Long.parseLong(each);
             try {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
index 0e06c4c..6b1482c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
@@ -56,9 +56,9 @@ public final class ScalingJobPreparer {
             checkDataSource(jobContext, dataSourceManager);
             initIncrementalTasks(jobContext, dataSourceManager);
             initInventoryTasks(jobContext, dataSourceManager);
-        } catch (final PrepareFailedException | SQLException ex) {
-            log.error("Preparing scaling job {} failed", jobContext.getJobId(), ex);
+        } catch (final SQLException ex) {
             jobContext.setStatus(JobStatus.PREPARING_FAILURE);
+            throw new PrepareFailedException("Scaling job preparing failed", ex);
         }
     }
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ElasticJobUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ElasticJobUtil.java
index 4c1d621e..bc35fc2 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ElasticJobUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ElasticJobUtil.java
@@ -49,7 +49,7 @@ public final class ElasticJobUtil {
     private static ZookeeperConfiguration getZookeeperConfig() {
         GovernanceConfiguration governanceConfig = ScalingContext.getInstance().getServerConfig().getGovernanceConfig();
         ZookeeperConfiguration result = new ZookeeperConfiguration(governanceConfig.getRegistryCenterConfiguration().getServerLists(),
-                governanceConfig.getName() + ScalingConstant.SCALING_ELASTIC_JOB_PATH);
+                governanceConfig.getName() + ScalingConstant.SCALING_ROOT);
         Properties props = governanceConfig.getRegistryCenterConfiguration().getProps();
         result.setMaxSleepTimeMilliseconds(getProperty(props, "max.sleep.time.milliseconds", result.getMaxSleepTimeMilliseconds()));
         result.setBaseSleepTimeMilliseconds(getProperty(props, "base.sleep.time.milliseconds", result.getBaseSleepTimeMilliseconds()));
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
index 826eb77..19760f5 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.scaling.core.utils;
 
 import org.apache.shardingsphere.scaling.core.config.HandleConfiguration;
-import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
 import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
 import org.apache.shardingsphere.scaling.core.job.position.JobProgress;
 import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTask;
@@ -71,18 +70,4 @@ public final class ScalingTaskUtil {
                 .flatMap(each -> each.getIncrementalTaskProgressMap().values().stream())
                 .allMatch(each -> each.getIncrementalTaskDelay().getDelayMilliseconds() <= handleConfig.getWorkflowConfig().getAllowDelayMilliseconds());
     }
-    
-    /**
-     * Get scaling listener path.
-     *
-     * @param paths sub paths.
-     * @return path.
-     */
-    public static String getScalingListenerPath(final Object... paths) {
-        StringBuilder result = new StringBuilder(ScalingConstant.SCALING_LISTENER_PATH);
-        for (Object each : paths) {
-            result.append("/").append(each);
-        }
-        return result.toString();
-    }
 }