You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2020/11/27 11:49:53 UTC

[shardingsphere] branch master updated: Split taskExecuteEngine to inventory and incremental DumperExecuteEngine (#8381)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 02b213e  Split taskExecuteEngine to inventory and incremental DumperExecuteEngine (#8381)
02b213e is described below

commit 02b213e640f3d4f03df9bbc948e04089a920733c
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Fri Nov 27 19:49:28 2020 +0800

    Split taskExecuteEngine to inventory and incremental DumperExecuteEngine (#8381)
    
    * Split taskExecuteEngine to inventoryDumperExecuteEngine and incrementalDumperExecuteEngine
    
    * Optimize ScalingTaskScheduler
    
    * Optimize ScalingTaskScheduler
    
    Co-authored-by: qiulu3 <Lucas209910>
---
 .../scaling/web/HttpServerHandlerTest.java         |  2 +-
 .../scaling/web/HttpServerInitializerTest.java     |  2 +-
 .../scaling/core/config/ScalingContext.java        |  7 ++++--
 .../incremental/IncrementalDataScalingTask.java    |  2 +-
 .../core/schedule/ScalingTaskScheduler.java        | 27 +++++++++++++---------
 .../impl/StandaloneScalingJobServiceTest.java      |  2 +-
 6 files changed, 25 insertions(+), 17 deletions(-)

diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerHandlerTest.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerHandlerTest.java
index 522f1f3..2b0462e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerHandlerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerHandlerTest.java
@@ -62,7 +62,7 @@ public final class HttpServerHandlerTest {
     @SneakyThrows(ReflectiveOperationException.class)
     public void setUp() {
         ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "serverConfig", new ServerConfiguration());
-        ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "taskExecuteEngine", mock(ShardingScalingExecuteEngine.class));
+        ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "inventoryDumperExecuteEngine", mock(ShardingScalingExecuteEngine.class));
         httpServerHandler = new HttpServerHandler();
     }
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerInitializerTest.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerInitializerTest.java
index 69ae3ca..8387749 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerInitializerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerInitializerTest.java
@@ -50,7 +50,7 @@ public final class HttpServerInitializerTest {
     @SneakyThrows(ReflectiveOperationException.class)
     public void setUp() {
         ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "serverConfig", new ServerConfiguration());
-        ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "taskExecuteEngine", mock(ShardingScalingExecuteEngine.class));
+        ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "inventoryDumperExecuteEngine", mock(ShardingScalingExecuteEngine.class));
         when(socketChannel.pipeline()).thenReturn(channelPipeline);
     }
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ScalingContext.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ScalingContext.java
index ccd4b04..a4bdda0 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ScalingContext.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ScalingContext.java
@@ -33,7 +33,9 @@ public final class ScalingContext {
     
     private ServerConfiguration serverConfig;
     
-    private ShardingScalingExecuteEngine taskExecuteEngine;
+    private ShardingScalingExecuteEngine inventoryDumperExecuteEngine;
+    
+    private ShardingScalingExecuteEngine incrementalDumperExecuteEngine;
     
     private ShardingScalingExecuteEngine importerExecuteEngine;
     
@@ -53,7 +55,8 @@ public final class ScalingContext {
      */
     public void init(final ServerConfiguration serverConfig) {
         this.serverConfig = serverConfig;
-        taskExecuteEngine = ShardingScalingExecuteEngine.newCachedThreadInstance();
+        inventoryDumperExecuteEngine = ShardingScalingExecuteEngine.newFixedThreadInstance(serverConfig.getWorkerThread());
+        incrementalDumperExecuteEngine = ShardingScalingExecuteEngine.newCachedThreadInstance();
         importerExecuteEngine = ShardingScalingExecuteEngine.newFixedThreadInstance(serverConfig.getWorkerThread());
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java
index 865b87f..488abfa 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java
@@ -73,7 +73,7 @@ public final class IncrementalDataScalingTask extends AbstractShardingScalingExe
         dumper = DumperFactory.newInstanceLogDumper(dumperConfig, getPositionManager().getPosition());
         Collection<Importer> importers = instanceImporters();
         instanceChannel(importers);
-        Future<?> future = ScalingContext.getInstance().getTaskExecuteEngine().submitAll(importers, new ExecuteCallback() {
+        Future<?> future = ScalingContext.getInstance().getIncrementalDumperExecuteEngine().submitAll(importers, new ExecuteCallback() {
             
             @Override
             public void onSuccess() {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
index a0486b2..1e88663 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
@@ -62,16 +62,22 @@ public final class ScalingTaskScheduler implements Runnable {
     
     @Override
     public void run() {
-        shardingScalingJob.setStatus(SyncTaskControlStatus.MIGRATE_INVENTORY_DATA.name());
-        if (ScalingTaskUtil.allInventoryTasksFinished(shardingScalingJob.getInventoryDataTasks())) {
+        if (executeInventoryDataSyncTask()) {
             executeIncrementalDataSyncTask();
-            return;
         }
-        log.info("Start inventory data sync task.");
+    }
+    
+    private synchronized boolean executeInventoryDataSyncTask() {
+        log.info("-------------- Start inventory data sync task --------------");
+        if (ScalingTaskUtil.allInventoryTasksFinished(shardingScalingJob.getInventoryDataTasks())) {
+            return true;
+        }
+        shardingScalingJob.setStatus(SyncTaskControlStatus.MIGRATE_INVENTORY_DATA.name());
         ExecuteCallback inventoryDataTaskCallback = createInventoryDataTaskCallback();
         for (ScalingTask each : shardingScalingJob.getInventoryDataTasks()) {
-            ScalingContext.getInstance().getTaskExecuteEngine().submit(each, inventoryDataTaskCallback);
+            ScalingContext.getInstance().getInventoryDumperExecuteEngine().submit(each, inventoryDataTaskCallback);
         }
+        return false;
     }
     
     private ExecuteCallback createInventoryDataTaskCallback() {
@@ -93,17 +99,16 @@ public final class ScalingTaskScheduler implements Runnable {
         };
     }
     
-    private void executeIncrementalDataSyncTask() {
-        log.info("Start incremental data sync task.");
-        if (!SyncTaskControlStatus.MIGRATE_INVENTORY_DATA.name().equals(shardingScalingJob.getStatus())) {
-            shardingScalingJob.setStatus(SyncTaskControlStatus.STOPPED.name());
+    private synchronized void executeIncrementalDataSyncTask() {
+        log.info("-------------- Start incremental data sync task --------------");
+        if (SyncTaskControlStatus.SYNCHRONIZE_INCREMENTAL_DATA.name().equals(shardingScalingJob.getStatus())) {
             return;
         }
+        shardingScalingJob.setStatus(SyncTaskControlStatus.SYNCHRONIZE_INCREMENTAL_DATA.name());
         ExecuteCallback incrementalDataTaskCallback = createIncrementalDataTaskCallback();
         for (ScalingTask each : shardingScalingJob.getIncrementalDataTasks()) {
-            ScalingContext.getInstance().getTaskExecuteEngine().submit(each, incrementalDataTaskCallback);
+            ScalingContext.getInstance().getIncrementalDumperExecuteEngine().submit(each, incrementalDataTaskCallback);
         }
-        shardingScalingJob.setStatus(SyncTaskControlStatus.SYNCHRONIZE_INCREMENTAL_DATA.name());
     }
     
     private ExecuteCallback createIncrementalDataTaskCallback() {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java
index d200f7a..40accb2 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java
@@ -55,7 +55,7 @@ public final class StandaloneScalingJobServiceTest {
     @SneakyThrows(ReflectiveOperationException.class)
     public void setUp() {
         ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "serverConfig", new ServerConfiguration());
-        ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "taskExecuteEngine", mock(ShardingScalingExecuteEngine.class));
+        ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "inventoryDumperExecuteEngine", mock(ShardingScalingExecuteEngine.class));
     }
     
     @Test