You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2021/09/07 12:42:11 UTC
[shardingsphere] branch master updated: Scaling FinishedCheckJob
ignore disabled job (#12242)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new cc08e5c Scaling FinishedCheckJob ignore disabled job (#12242)
cc08e5c is described below
commit cc08e5c2774912ab4dee871204de97e4a673dfca
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Tue Sep 7 20:41:44 2021 +0800
Scaling FinishedCheckJob ignore disabled job (#12242)
* FinishedCheckJob ignore disabled job
* Unit test
---
.../shardingsphere/scaling/core/api/JobInfo.java | 2 ++
.../scaling/core/api/impl/ScalingAPIImpl.java | 1 +
.../scaling/core/job/FinishedCheckJob.java | 41 +++++++++++-----------
.../scaling/core/job/FinishedCheckJobTest.java | 35 +++++++++---------
4 files changed, 41 insertions(+), 38 deletions(-)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/JobInfo.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/JobInfo.java
index d455951..ea5b98d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/JobInfo.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/JobInfo.java
@@ -40,4 +40,6 @@ public final class JobInfo {
private String createTime;
private String stopTime;
+
+ private transient String jobParameter;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
index 7f85ed2..c7b297d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
@@ -67,6 +67,7 @@ public final class ScalingAPIImpl implements ScalingAPI {
result.setTables(jobConfig.getHandleConfig().getLogicTables());
result.setCreateTime(jobConfigPOJO.getProps().getProperty("create_time"));
result.setStopTime(jobConfigPOJO.getProps().getProperty("stop_time"));
+ result.setJobParameter(jobConfigPOJO.getJobParameter());
return result;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
index 8edfe4c..b4db0ab 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
@@ -21,18 +21,19 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
+import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.rule.ScalingTaskFinishedEvent;
-import org.apache.shardingsphere.scaling.core.api.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.scaling.core.api.JobInfo;
import org.apache.shardingsphere.scaling.core.api.ScalingAPI;
import org.apache.shardingsphere.scaling.core.api.ScalingAPIFactory;
import org.apache.shardingsphere.scaling.core.api.ScalingDataConsistencyCheckAlgorithm;
-import org.apache.shardingsphere.scaling.core.common.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfigurationWrap;
import org.apache.shardingsphere.scaling.core.job.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.scaling.core.util.ScalingTaskUtil;
+import java.util.List;
import java.util.Map;
@Slf4j
@@ -40,26 +41,26 @@ public final class FinishedCheckJob implements SimpleJob {
private final ScalingAPI scalingAPI = ScalingAPIFactory.getScalingAPI();
- private final GovernanceRepositoryAPI governanceRepositoryAPI = ScalingAPIFactory.getGovernanceRepositoryAPI();
-
@Override
public void execute(final ShardingContext shardingContext) {
- governanceRepositoryAPI.getChildrenKeys(ScalingConstant.SCALING_ROOT).stream()
- .filter(each -> !each.startsWith("_"))
- .forEach(each -> {
- long jobId = Long.parseLong(each);
- try {
- JobConfiguration jobConfig = scalingAPI.getJobConfig(jobId);
- if (ScalingTaskUtil.almostFinished(scalingAPI.getProgress(jobId), jobConfig.getHandleConfig())) {
- log.info("scaling job {} almost finished.", jobId);
- trySwitch(jobId, jobConfig);
- }
- // CHECKSTYLE:OFF
- } catch (final Exception ex) {
- // CHECKSTYLE:ON
- log.error("scaling job {} finish check failed!", jobId, ex);
- }
- });
+ List<JobInfo> jobInfos = scalingAPI.list();
+ for (JobInfo jobInfo : jobInfos) {
+ if (!jobInfo.isActive()) {
+ continue;
+ }
+ long jobId = jobInfo.getJobId();
+ try {
+ JobConfiguration jobConfig = YamlEngine.unmarshal(jobInfo.getJobParameter(), JobConfiguration.class);
+ if (ScalingTaskUtil.almostFinished(scalingAPI.getProgress(jobId), jobConfig.getHandleConfig())) {
+ log.info("scaling job {} almost finished.", jobId);
+ trySwitch(jobId, jobConfig);
+ }
+ // CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ // CHECKSTYLE:ON
+ log.error("scaling job {} finish check failed!", jobId, ex);
+ }
+ }
}
private void trySwitch(final long jobId, final JobConfiguration jobConfig) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJobTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJobTest.java
index aed9058..86f18c7 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJobTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJobTest.java
@@ -21,16 +21,12 @@ import lombok.SneakyThrows;
import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
-import org.apache.shardingsphere.scaling.core.api.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.scaling.core.api.JobInfo;
import org.apache.shardingsphere.scaling.core.api.ScalingAPI;
-import org.apache.shardingsphere.scaling.core.common.constant.ScalingConstant;
-import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
-import org.apache.shardingsphere.scaling.core.config.WorkflowConfiguration;
import org.apache.shardingsphere.scaling.core.fixture.EmbedTestingServer;
import org.apache.shardingsphere.scaling.core.util.ReflectionUtil;
-import org.apache.shardingsphere.scaling.core.util.ResourceUtil;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -40,6 +36,7 @@ import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.Collections;
+import java.util.List;
import java.util.Properties;
import static org.mockito.Mockito.when;
@@ -52,9 +49,6 @@ public final class FinishedCheckJobTest {
@Mock
private ScalingAPI scalingAPI;
- @Mock
- private GovernanceRepositoryAPI governanceRepositoryAPI;
-
@BeforeClass
public static void beforeClass() throws Exception {
EmbedTestingServer.start();
@@ -67,13 +61,24 @@ public final class FinishedCheckJobTest {
@SneakyThrows(ReflectiveOperationException.class)
public void setUp() {
ReflectionUtil.setFieldValue(finishedCheckJob, "scalingAPI", scalingAPI);
- ReflectionUtil.setFieldValue(finishedCheckJob, "governanceRepositoryAPI", governanceRepositoryAPI);
}
@Test
- public void assertExecuteWithWorkflow() {
- when(governanceRepositoryAPI.getChildrenKeys(ScalingConstant.SCALING_ROOT)).thenReturn(Collections.singletonList("1"));
- when(scalingAPI.getJobConfig(1L)).thenReturn(mockJobConfigWithWorkflow());
+ public void assertExecuteAllDisabledJob() {
+ JobInfo jobInfo = new JobInfo(1L);
+ jobInfo.setActive(false);
+ List<JobInfo> jobInfos = Collections.singletonList(jobInfo);
+ when(scalingAPI.list()).thenReturn(jobInfos);
+ finishedCheckJob.execute(null);
+ }
+
+ @Test
+ public void assertExecuteActiveJob() {
+ JobInfo jobInfo = new JobInfo(1L);
+ jobInfo.setActive(true);
+ jobInfo.setJobParameter("handleConfig:\nruleConfig:\n");
+ List<JobInfo> jobInfos = Collections.singletonList(jobInfo);
+ when(scalingAPI.list()).thenReturn(jobInfos);
when(scalingAPI.getProgress(1L)).thenReturn(Collections.emptyMap());
finishedCheckJob.execute(null);
}
@@ -89,10 +94,4 @@ public final class FinishedCheckJobTest {
result.setModeConfiguration(new ModeConfiguration("Cluster", new ClusterPersistRepositoryConfiguration("Zookeeper", "test", EmbedTestingServer.getConnectionString(), null), true));
return result;
}
-
- private JobConfiguration mockJobConfigWithWorkflow() {
- JobConfiguration result = ResourceUtil.mockJobConfig();
- result.getHandleConfig().setWorkflowConfig(new WorkflowConfiguration("ds_0", "1"));
- return result;
- }
}