You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2021/09/03 10:54:23 UTC

[shardingsphere] branch master updated: Add dataConsistencyCheckAlgorithm interface for scaling (#12179)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 4558290  Add dataConsistencyCheckAlgorithm interface for scaling (#12179)
4558290 is described below

commit 4558290ad064ca89cf2619867492f234c3759d87
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Fri Sep 3 18:53:30 2021 +0800

    Add dataConsistencyCheckAlgorithm interface for scaling (#12179)
    
    * Add dataConsistencyCheckAlgorithm interface and config
    
    * Integrate dataConsistencyCheckAlgorithm
    
    * Unit test
---
 .../governance/yaml/YamlScalingConfiguration.java  |  2 ++
 .../ScalingDataConsistencyCheckAlgorithm.java}     | 23 +++++-----------------
 .../scaling/core/config/ScalingContext.java        |  7 +++++++
 .../scaling/core/config/ServerConfiguration.java   |  2 ++
 .../scaling/core/job/FinishedCheckJob.java         | 17 +++++++++++-----
 .../scaling/core/job/FinishedCheckJobTest.java     |  9 ---------
 6 files changed, 28 insertions(+), 32 deletions(-)

diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/governance/yaml/YamlScalingConfiguration.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/governance/yaml/YamlScalingConfiguration.java
index f1b60c2..7f4a546 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/governance/yaml/YamlScalingConfiguration.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/governance/yaml/YamlScalingConfiguration.java
@@ -34,4 +34,6 @@ public final class YamlScalingConfiguration implements YamlConfiguration {
     private int workerThread;
     
     private YamlShardingSphereAlgorithmConfiguration clusterAutoSwitchAlgorithm;
+    
+    private YamlShardingSphereAlgorithmConfiguration dataConsistencyCheckAlgorithm;
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ServerConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingDataConsistencyCheckAlgorithm.java
similarity index 62%
copy from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ServerConfiguration.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingDataConsistencyCheckAlgorithm.java
index bccb4e7..5cdc834 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ServerConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingDataConsistencyCheckAlgorithm.java
@@ -15,27 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.core.config;
+package org.apache.shardingsphere.scaling.core.api;
 
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
-import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithm;
+import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmPostProcessor;
 
 /**
- * Global server configuration.
+ * Scaling data consistency check algorithm for SPI.
  */
-@Getter
-@Setter
-public final class ServerConfiguration {
+public interface ScalingDataConsistencyCheckAlgorithm extends ShardingSphereAlgorithm, ShardingSphereAlgorithmPostProcessor {
     
-    private int port = 8080;
-    
-    private int blockQueueSize = 10000;
-    
-    private int workerThread = 30;
-    
-    private ShardingSphereAlgorithmConfiguration clusterAutoSwitchAlgorithm;
-    
-    private ModeConfiguration modeConfiguration;
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ScalingContext.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ScalingContext.java
index c5dfe21..96805c4 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ScalingContext.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ScalingContext.java
@@ -22,6 +22,7 @@ import lombok.Getter;
 import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmFactory;
 import org.apache.shardingsphere.scaling.core.api.ScalingClusterAutoSwitchAlgorithm;
+import org.apache.shardingsphere.scaling.core.api.ScalingDataConsistencyCheckAlgorithm;
 import org.apache.shardingsphere.scaling.core.executor.engine.ExecuteEngine;
 import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
 
@@ -34,6 +35,7 @@ public final class ScalingContext {
     
     static {
         ShardingSphereServiceLoader.register(ScalingClusterAutoSwitchAlgorithm.class);
+        ShardingSphereServiceLoader.register(ScalingDataConsistencyCheckAlgorithm.class);
     }
     
     private static final ScalingContext INSTANCE = new ScalingContext();
@@ -42,6 +44,8 @@ public final class ScalingContext {
     
     private volatile ScalingClusterAutoSwitchAlgorithm clusterAutoSwitchAlgorithm;
     
+    private volatile ScalingDataConsistencyCheckAlgorithm dataConsistencyCheckAlgorithm;
+    
     private ExecuteEngine inventoryDumperExecuteEngine;
     
     private ExecuteEngine incrementalDumperExecuteEngine;
@@ -70,6 +74,9 @@ public final class ScalingContext {
         if (null != serverConfig.getClusterAutoSwitchAlgorithm()) {
             clusterAutoSwitchAlgorithm = ShardingSphereAlgorithmFactory.createAlgorithm(serverConfig.getClusterAutoSwitchAlgorithm(), ScalingClusterAutoSwitchAlgorithm.class);
         }
+        if (null != serverConfig.getDataConsistencyCheckAlgorithm()) {
+            dataConsistencyCheckAlgorithm = ShardingSphereAlgorithmFactory.createAlgorithm(serverConfig.getDataConsistencyCheckAlgorithm(), ScalingDataConsistencyCheckAlgorithm.class);
+        }
         inventoryDumperExecuteEngine = ExecuteEngine.newFixedThreadInstance(serverConfig.getWorkerThread());
         incrementalDumperExecuteEngine = ExecuteEngine.newCachedThreadInstance();
         importerExecuteEngine = ExecuteEngine.newFixedThreadInstance(serverConfig.getWorkerThread());
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ServerConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ServerConfiguration.java
index bccb4e7..ab7ec67 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ServerConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ServerConfiguration.java
@@ -37,5 +37,7 @@ public final class ServerConfiguration {
     
     private ShardingSphereAlgorithmConfiguration clusterAutoSwitchAlgorithm;
     
+    private ShardingSphereAlgorithmConfiguration dataConsistencyCheckAlgorithm;
+    
     private ModeConfiguration modeConfiguration;
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
index 6c92f37..22ae919 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
@@ -23,11 +23,12 @@ import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
 import org.apache.shardingsphere.scaling.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.scaling.core.api.ScalingAPI;
 import org.apache.shardingsphere.scaling.core.api.ScalingAPIFactory;
+import org.apache.shardingsphere.scaling.core.api.ScalingDataConsistencyCheckAlgorithm;
 import org.apache.shardingsphere.scaling.core.common.constant.ScalingConstant;
 import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
+import org.apache.shardingsphere.scaling.core.config.ScalingContext;
 import org.apache.shardingsphere.scaling.core.job.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.scaling.core.util.ScalingTaskUtil;
-import org.apache.shardingsphere.scaling.core.util.ThreadUtil;
 
 import java.util.Map;
 
@@ -60,11 +61,17 @@ public final class FinishedCheckJob implements SimpleJob {
     
     private void trySwitch(final long jobId) {
         // TODO lock proxy
-        ThreadUtil.sleep(10 * 1000L);
-        if (dataConsistencyCheck(jobId)) {
-            scalingAPI.stop(jobId);
-            //TODO auto switch configuration
+        ScalingDataConsistencyCheckAlgorithm dataConsistencyCheckAlgorithm = ScalingContext.getInstance().getDataConsistencyCheckAlgorithm();
+        if (null != dataConsistencyCheckAlgorithm) {
+            if (!dataConsistencyCheck(jobId)) {
+                log.error("data consistency check failed, job {}", jobId);
+                return;
+            }
+        } else {
+            log.info("dataConsistencyCheckAlgorithm is not configured, data consistency check will be ignored.");
         }
+        scalingAPI.stop(jobId);
+        //TODO auto switch configuration
     }
     
     private boolean dataConsistencyCheck(final long jobId) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJobTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJobTest.java
index 5781adc..aed9058 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJobTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJobTest.java
@@ -29,7 +29,6 @@ import org.apache.shardingsphere.scaling.core.config.ScalingContext;
 import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
 import org.apache.shardingsphere.scaling.core.config.WorkflowConfiguration;
 import org.apache.shardingsphere.scaling.core.fixture.EmbedTestingServer;
-import org.apache.shardingsphere.scaling.core.job.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.scaling.core.util.ReflectionUtil;
 import org.apache.shardingsphere.scaling.core.util.ResourceUtil;
 import org.junit.AfterClass;
@@ -41,7 +40,6 @@ import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import java.util.Collections;
-import java.util.Map;
 import java.util.Properties;
 
 import static org.mockito.Mockito.when;
@@ -77,7 +75,6 @@ public final class FinishedCheckJobTest {
         when(governanceRepositoryAPI.getChildrenKeys(ScalingConstant.SCALING_ROOT)).thenReturn(Collections.singletonList("1"));
         when(scalingAPI.getJobConfig(1L)).thenReturn(mockJobConfigWithWorkflow());
         when(scalingAPI.getProgress(1L)).thenReturn(Collections.emptyMap());
-        when(scalingAPI.dataConsistencyCheck(1L)).thenReturn(mockDataConsistencyCheck());
         finishedCheckJob.execute(null);
     }
     
@@ -86,12 +83,6 @@ public final class FinishedCheckJobTest {
         ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "serverConfig", null);
     }
     
-    private Map<String, DataConsistencyCheckResult> mockDataConsistencyCheck() {
-        DataConsistencyCheckResult checkResult = new DataConsistencyCheckResult(1, 1);
-        checkResult.setDataValid(true);
-        return Collections.singletonMap("t_order", checkResult);
-    }
-    
     private static ServerConfiguration mockServerConfig() {
         ServerConfiguration result = new ServerConfiguration();
         result.setClusterAutoSwitchAlgorithm(new ShardingSphereAlgorithmConfiguration("Fixture", new Properties()));