You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2021/09/07 12:42:11 UTC

[shardingsphere] branch master updated: Scaling FinishedCheckJob ignore disabled job (#12242)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new cc08e5c  Scaling FinishedCheckJob ignore disabled job (#12242)
cc08e5c is described below

commit cc08e5c2774912ab4dee871204de97e4a673dfca
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Tue Sep 7 20:41:44 2021 +0800

    Scaling FinishedCheckJob ignore disabled job (#12242)
    
    * FinishedCheckJob ignore disabled job
    
    * Unit test
---
 .../shardingsphere/scaling/core/api/JobInfo.java   |  2 ++
 .../scaling/core/api/impl/ScalingAPIImpl.java      |  1 +
 .../scaling/core/job/FinishedCheckJob.java         | 41 +++++++++++-----------
 .../scaling/core/job/FinishedCheckJobTest.java     | 35 +++++++++---------
 4 files changed, 41 insertions(+), 38 deletions(-)

diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/JobInfo.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/JobInfo.java
index d455951..ea5b98d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/JobInfo.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/JobInfo.java
@@ -40,4 +40,6 @@ public final class JobInfo {
     private String createTime;
     
     private String stopTime;
+    
+    private transient String jobParameter;
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
index 7f85ed2..c7b297d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
@@ -67,6 +67,7 @@ public final class ScalingAPIImpl implements ScalingAPI {
         result.setTables(jobConfig.getHandleConfig().getLogicTables());
         result.setCreateTime(jobConfigPOJO.getProps().getProperty("create_time"));
         result.setStopTime(jobConfigPOJO.getProps().getProperty("stop_time"));
+        result.setJobParameter(jobConfigPOJO.getJobParameter());
         return result;
     }
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
index 8edfe4c..b4db0ab 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
@@ -21,18 +21,19 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
 import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
+import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
 import org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.rule.ScalingTaskFinishedEvent;
-import org.apache.shardingsphere.scaling.core.api.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.scaling.core.api.JobInfo;
 import org.apache.shardingsphere.scaling.core.api.ScalingAPI;
 import org.apache.shardingsphere.scaling.core.api.ScalingAPIFactory;
 import org.apache.shardingsphere.scaling.core.api.ScalingDataConsistencyCheckAlgorithm;
-import org.apache.shardingsphere.scaling.core.common.constant.ScalingConstant;
 import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
 import org.apache.shardingsphere.scaling.core.config.ScalingContext;
 import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfigurationWrap;
 import org.apache.shardingsphere.scaling.core.job.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.scaling.core.util.ScalingTaskUtil;
 
+import java.util.List;
 import java.util.Map;
 
 @Slf4j
@@ -40,26 +41,26 @@ public final class FinishedCheckJob implements SimpleJob {
     
     private final ScalingAPI scalingAPI = ScalingAPIFactory.getScalingAPI();
     
-    private final GovernanceRepositoryAPI governanceRepositoryAPI = ScalingAPIFactory.getGovernanceRepositoryAPI();
-    
     @Override
     public void execute(final ShardingContext shardingContext) {
-        governanceRepositoryAPI.getChildrenKeys(ScalingConstant.SCALING_ROOT).stream()
-                .filter(each -> !each.startsWith("_"))
-                .forEach(each -> {
-                    long jobId = Long.parseLong(each);
-                    try {
-                        JobConfiguration jobConfig = scalingAPI.getJobConfig(jobId);
-                        if (ScalingTaskUtil.almostFinished(scalingAPI.getProgress(jobId), jobConfig.getHandleConfig())) {
-                            log.info("scaling job {} almost finished.", jobId);
-                            trySwitch(jobId, jobConfig);
-                        }
-                        // CHECKSTYLE:OFF
-                    } catch (final Exception ex) {
-                        // CHECKSTYLE:ON
-                        log.error("scaling job {} finish check failed!", jobId, ex);
-                    }
-                });
+        List<JobInfo> jobInfos = scalingAPI.list();
+        for (JobInfo jobInfo : jobInfos) {
+            if (!jobInfo.isActive()) {
+                continue;
+            }
+            long jobId = jobInfo.getJobId();
+            try {
+                JobConfiguration jobConfig = YamlEngine.unmarshal(jobInfo.getJobParameter(), JobConfiguration.class);
+                if (ScalingTaskUtil.almostFinished(scalingAPI.getProgress(jobId), jobConfig.getHandleConfig())) {
+                    log.info("scaling job {} almost finished.", jobId);
+                    trySwitch(jobId, jobConfig);
+                }
+                // CHECKSTYLE:OFF
+            } catch (final Exception ex) {
+                // CHECKSTYLE:ON
+                log.error("scaling job {} finish check failed!", jobId, ex);
+            }
+        }
     }
     
     private void trySwitch(final long jobId, final JobConfiguration jobConfig) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJobTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJobTest.java
index aed9058..86f18c7 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJobTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJobTest.java
@@ -21,16 +21,12 @@ import lombok.SneakyThrows;
 import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
-import org.apache.shardingsphere.scaling.core.api.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.scaling.core.api.JobInfo;
 import org.apache.shardingsphere.scaling.core.api.ScalingAPI;
-import org.apache.shardingsphere.scaling.core.common.constant.ScalingConstant;
-import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
 import org.apache.shardingsphere.scaling.core.config.ScalingContext;
 import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
-import org.apache.shardingsphere.scaling.core.config.WorkflowConfiguration;
 import org.apache.shardingsphere.scaling.core.fixture.EmbedTestingServer;
 import org.apache.shardingsphere.scaling.core.util.ReflectionUtil;
-import org.apache.shardingsphere.scaling.core.util.ResourceUtil;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -40,6 +36,7 @@ import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import java.util.Collections;
+import java.util.List;
 import java.util.Properties;
 
 import static org.mockito.Mockito.when;
@@ -52,9 +49,6 @@ public final class FinishedCheckJobTest {
     @Mock
     private ScalingAPI scalingAPI;
     
-    @Mock
-    private GovernanceRepositoryAPI governanceRepositoryAPI;
-    
     @BeforeClass
     public static void beforeClass() throws Exception {
         EmbedTestingServer.start();
@@ -67,13 +61,24 @@ public final class FinishedCheckJobTest {
     @SneakyThrows(ReflectiveOperationException.class)
     public void setUp() {
         ReflectionUtil.setFieldValue(finishedCheckJob, "scalingAPI", scalingAPI);
-        ReflectionUtil.setFieldValue(finishedCheckJob, "governanceRepositoryAPI", governanceRepositoryAPI);
     }
     
     @Test
-    public void assertExecuteWithWorkflow() {
-        when(governanceRepositoryAPI.getChildrenKeys(ScalingConstant.SCALING_ROOT)).thenReturn(Collections.singletonList("1"));
-        when(scalingAPI.getJobConfig(1L)).thenReturn(mockJobConfigWithWorkflow());
+    public void assertExecuteAllDisabledJob() {
+        JobInfo jobInfo = new JobInfo(1L);
+        jobInfo.setActive(false);
+        List<JobInfo> jobInfos = Collections.singletonList(jobInfo);
+        when(scalingAPI.list()).thenReturn(jobInfos);
+        finishedCheckJob.execute(null);
+    }
+    
+    @Test
+    public void assertExecuteActiveJob() {
+        JobInfo jobInfo = new JobInfo(1L);
+        jobInfo.setActive(true);
+        jobInfo.setJobParameter("handleConfig:\nruleConfig:\n");
+        List<JobInfo> jobInfos = Collections.singletonList(jobInfo);
+        when(scalingAPI.list()).thenReturn(jobInfos);
         when(scalingAPI.getProgress(1L)).thenReturn(Collections.emptyMap());
         finishedCheckJob.execute(null);
     }
@@ -89,10 +94,4 @@ public final class FinishedCheckJobTest {
         result.setModeConfiguration(new ModeConfiguration("Cluster", new ClusterPersistRepositoryConfiguration("Zookeeper", "test", EmbedTestingServer.getConnectionString(), null), true));
         return result;
     }
-    
-    private JobConfiguration mockJobConfigWithWorkflow() {
-        JobConfiguration result = ResourceUtil.mockJobConfig();
-        result.getHandleConfig().setWorkflowConfig(new WorkflowConfiguration("ds_0", "1"));
-        return result;
-    }
 }