You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2021/09/03 10:54:23 UTC
[shardingsphere] branch master updated: Add
dataConsistencyCheckAlgorithm interface for scaling (#12179)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 4558290 Add dataConsistencyCheckAlgorithm interface for scaling (#12179)
4558290 is described below
commit 4558290ad064ca89cf2619867492f234c3759d87
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Fri Sep 3 18:53:30 2021 +0800
Add dataConsistencyCheckAlgorithm interface for scaling (#12179)
* Add dataConsistencyCheckAlgorithm interface and config
* Integrate dataConsistencyCheckAlgorithm
* Unit test
---
.../governance/yaml/YamlScalingConfiguration.java | 2 ++
.../ScalingDataConsistencyCheckAlgorithm.java} | 23 +++++-----------------
.../scaling/core/config/ScalingContext.java | 7 +++++++
.../scaling/core/config/ServerConfiguration.java | 2 ++
.../scaling/core/job/FinishedCheckJob.java | 17 +++++++++++-----
.../scaling/core/job/FinishedCheckJobTest.java | 9 ---------
6 files changed, 28 insertions(+), 32 deletions(-)
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/governance/yaml/YamlScalingConfiguration.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/governance/yaml/YamlScalingConfiguration.java
index f1b60c2..7f4a546 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/governance/yaml/YamlScalingConfiguration.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/governance/yaml/YamlScalingConfiguration.java
@@ -34,4 +34,6 @@ public final class YamlScalingConfiguration implements YamlConfiguration {
private int workerThread;
private YamlShardingSphereAlgorithmConfiguration clusterAutoSwitchAlgorithm;
+
+ private YamlShardingSphereAlgorithmConfiguration dataConsistencyCheckAlgorithm;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ServerConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingDataConsistencyCheckAlgorithm.java
similarity index 62%
copy from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ServerConfiguration.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingDataConsistencyCheckAlgorithm.java
index bccb4e7..5cdc834 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ServerConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingDataConsistencyCheckAlgorithm.java
@@ -15,27 +15,14 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.config;
+package org.apache.shardingsphere.scaling.core.api;
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
-import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithm;
+import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmPostProcessor;
/**
- * Global server configuration.
+ * Scaling data consistency check algorithm for SPI.
*/
-@Getter
-@Setter
-public final class ServerConfiguration {
+public interface ScalingDataConsistencyCheckAlgorithm extends ShardingSphereAlgorithm, ShardingSphereAlgorithmPostProcessor {
- private int port = 8080;
-
- private int blockQueueSize = 10000;
-
- private int workerThread = 30;
-
- private ShardingSphereAlgorithmConfiguration clusterAutoSwitchAlgorithm;
-
- private ModeConfiguration modeConfiguration;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ScalingContext.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ScalingContext.java
index c5dfe21..96805c4 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ScalingContext.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ScalingContext.java
@@ -22,6 +22,7 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmFactory;
import org.apache.shardingsphere.scaling.core.api.ScalingClusterAutoSwitchAlgorithm;
+import org.apache.shardingsphere.scaling.core.api.ScalingDataConsistencyCheckAlgorithm;
import org.apache.shardingsphere.scaling.core.executor.engine.ExecuteEngine;
import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
@@ -34,6 +35,7 @@ public final class ScalingContext {
static {
ShardingSphereServiceLoader.register(ScalingClusterAutoSwitchAlgorithm.class);
+ ShardingSphereServiceLoader.register(ScalingDataConsistencyCheckAlgorithm.class);
}
private static final ScalingContext INSTANCE = new ScalingContext();
@@ -42,6 +44,8 @@ public final class ScalingContext {
private volatile ScalingClusterAutoSwitchAlgorithm clusterAutoSwitchAlgorithm;
+ private volatile ScalingDataConsistencyCheckAlgorithm dataConsistencyCheckAlgorithm;
+
private ExecuteEngine inventoryDumperExecuteEngine;
private ExecuteEngine incrementalDumperExecuteEngine;
@@ -70,6 +74,9 @@ public final class ScalingContext {
if (null != serverConfig.getClusterAutoSwitchAlgorithm()) {
clusterAutoSwitchAlgorithm = ShardingSphereAlgorithmFactory.createAlgorithm(serverConfig.getClusterAutoSwitchAlgorithm(), ScalingClusterAutoSwitchAlgorithm.class);
}
+ if (null != serverConfig.getDataConsistencyCheckAlgorithm()) {
+ dataConsistencyCheckAlgorithm = ShardingSphereAlgorithmFactory.createAlgorithm(serverConfig.getDataConsistencyCheckAlgorithm(), ScalingDataConsistencyCheckAlgorithm.class);
+ }
inventoryDumperExecuteEngine = ExecuteEngine.newFixedThreadInstance(serverConfig.getWorkerThread());
incrementalDumperExecuteEngine = ExecuteEngine.newCachedThreadInstance();
importerExecuteEngine = ExecuteEngine.newFixedThreadInstance(serverConfig.getWorkerThread());
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ServerConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ServerConfiguration.java
index bccb4e7..ab7ec67 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ServerConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ServerConfiguration.java
@@ -37,5 +37,7 @@ public final class ServerConfiguration {
private ShardingSphereAlgorithmConfiguration clusterAutoSwitchAlgorithm;
+ private ShardingSphereAlgorithmConfiguration dataConsistencyCheckAlgorithm;
+
private ModeConfiguration modeConfiguration;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
index 6c92f37..22ae919 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
@@ -23,11 +23,12 @@ import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.apache.shardingsphere.scaling.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.scaling.core.api.ScalingAPI;
import org.apache.shardingsphere.scaling.core.api.ScalingAPIFactory;
+import org.apache.shardingsphere.scaling.core.api.ScalingDataConsistencyCheckAlgorithm;
import org.apache.shardingsphere.scaling.core.common.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
+import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.job.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.scaling.core.util.ScalingTaskUtil;
-import org.apache.shardingsphere.scaling.core.util.ThreadUtil;
import java.util.Map;
@@ -60,11 +61,17 @@ public final class FinishedCheckJob implements SimpleJob {
private void trySwitch(final long jobId) {
// TODO lock proxy
- ThreadUtil.sleep(10 * 1000L);
- if (dataConsistencyCheck(jobId)) {
- scalingAPI.stop(jobId);
- //TODO auto switch configuration
+ ScalingDataConsistencyCheckAlgorithm dataConsistencyCheckAlgorithm = ScalingContext.getInstance().getDataConsistencyCheckAlgorithm();
+ if (null != dataConsistencyCheckAlgorithm) {
+ if (!dataConsistencyCheck(jobId)) {
+ log.error("data consistency check failed, job {}", jobId);
+ return;
+ }
+ } else {
+ log.info("dataConsistencyCheckAlgorithm is not configured, data consistency check will be ignored.");
}
+ scalingAPI.stop(jobId);
+ //TODO auto switch configuration
}
private boolean dataConsistencyCheck(final long jobId) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJobTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJobTest.java
index 5781adc..aed9058 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJobTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJobTest.java
@@ -29,7 +29,6 @@ import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
import org.apache.shardingsphere.scaling.core.config.WorkflowConfiguration;
import org.apache.shardingsphere.scaling.core.fixture.EmbedTestingServer;
-import org.apache.shardingsphere.scaling.core.job.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.scaling.core.util.ReflectionUtil;
import org.apache.shardingsphere.scaling.core.util.ResourceUtil;
import org.junit.AfterClass;
@@ -41,7 +40,6 @@ import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.Collections;
-import java.util.Map;
import java.util.Properties;
import static org.mockito.Mockito.when;
@@ -77,7 +75,6 @@ public final class FinishedCheckJobTest {
when(governanceRepositoryAPI.getChildrenKeys(ScalingConstant.SCALING_ROOT)).thenReturn(Collections.singletonList("1"));
when(scalingAPI.getJobConfig(1L)).thenReturn(mockJobConfigWithWorkflow());
when(scalingAPI.getProgress(1L)).thenReturn(Collections.emptyMap());
- when(scalingAPI.dataConsistencyCheck(1L)).thenReturn(mockDataConsistencyCheck());
finishedCheckJob.execute(null);
}
@@ -86,12 +83,6 @@ public final class FinishedCheckJobTest {
ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "serverConfig", null);
}
- private Map<String, DataConsistencyCheckResult> mockDataConsistencyCheck() {
- DataConsistencyCheckResult checkResult = new DataConsistencyCheckResult(1, 1);
- checkResult.setDataValid(true);
- return Collections.singletonMap("t_order", checkResult);
- }
-
private static ServerConfiguration mockServerConfig() {
ServerConfiguration result = new ServerConfiguration();
result.setClusterAutoSwitchAlgorithm(new ShardingSphereAlgorithmConfiguration("Fixture", new Properties()));