You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2020/11/27 11:49:53 UTC
[shardingsphere] branch master updated: Split taskExecuteEngine to inventory and incremental DumperExecuteEngine (#8381)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 02b213e Split taskExecuteEngine to inventory and incremental DumperExecuteEngine (#8381)
02b213e is described below
commit 02b213e640f3d4f03df9bbc948e04089a920733c
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Fri Nov 27 19:49:28 2020 +0800
Split taskExecuteEngine to inventory and incremental DumperExecuteEngine (#8381)
* Split taskExecuteEngine to inventoryDumperExecuteEngine and incrementalDumperExecuteEngine
* Optimize ScalingTaskScheduler
* Optimize ScalingTaskScheduler
Co-authored-by: qiulu3 <Lucas209910>
---
.../scaling/web/HttpServerHandlerTest.java | 2 +-
.../scaling/web/HttpServerInitializerTest.java | 2 +-
.../scaling/core/config/ScalingContext.java | 7 ++++--
.../incremental/IncrementalDataScalingTask.java | 2 +-
.../core/schedule/ScalingTaskScheduler.java | 27 +++++++++++++---------
.../impl/StandaloneScalingJobServiceTest.java | 2 +-
6 files changed, 25 insertions(+), 17 deletions(-)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerHandlerTest.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerHandlerTest.java
index 522f1f3..2b0462e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerHandlerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerHandlerTest.java
@@ -62,7 +62,7 @@ public final class HttpServerHandlerTest {
@SneakyThrows(ReflectiveOperationException.class)
public void setUp() {
ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "serverConfig", new ServerConfiguration());
- ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "taskExecuteEngine", mock(ShardingScalingExecuteEngine.class));
+ ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "inventoryDumperExecuteEngine", mock(ShardingScalingExecuteEngine.class));
httpServerHandler = new HttpServerHandler();
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerInitializerTest.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerInitializerTest.java
index 69ae3ca..8387749 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerInitializerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerInitializerTest.java
@@ -50,7 +50,7 @@ public final class HttpServerInitializerTest {
@SneakyThrows(ReflectiveOperationException.class)
public void setUp() {
ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "serverConfig", new ServerConfiguration());
- ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "taskExecuteEngine", mock(ShardingScalingExecuteEngine.class));
+ ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "inventoryDumperExecuteEngine", mock(ShardingScalingExecuteEngine.class));
when(socketChannel.pipeline()).thenReturn(channelPipeline);
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ScalingContext.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ScalingContext.java
index ccd4b04..a4bdda0 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ScalingContext.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ScalingContext.java
@@ -33,7 +33,9 @@ public final class ScalingContext {
private ServerConfiguration serverConfig;
- private ShardingScalingExecuteEngine taskExecuteEngine;
+ private ShardingScalingExecuteEngine inventoryDumperExecuteEngine;
+
+ private ShardingScalingExecuteEngine incrementalDumperExecuteEngine;
private ShardingScalingExecuteEngine importerExecuteEngine;
@@ -53,7 +55,8 @@ public final class ScalingContext {
*/
public void init(final ServerConfiguration serverConfig) {
this.serverConfig = serverConfig;
- taskExecuteEngine = ShardingScalingExecuteEngine.newCachedThreadInstance();
+ inventoryDumperExecuteEngine = ShardingScalingExecuteEngine.newFixedThreadInstance(serverConfig.getWorkerThread());
+ incrementalDumperExecuteEngine = ShardingScalingExecuteEngine.newCachedThreadInstance();
importerExecuteEngine = ShardingScalingExecuteEngine.newFixedThreadInstance(serverConfig.getWorkerThread());
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java
index 865b87f..488abfa 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java
@@ -73,7 +73,7 @@ public final class IncrementalDataScalingTask extends AbstractShardingScalingExe
dumper = DumperFactory.newInstanceLogDumper(dumperConfig, getPositionManager().getPosition());
Collection<Importer> importers = instanceImporters();
instanceChannel(importers);
- Future<?> future = ScalingContext.getInstance().getTaskExecuteEngine().submitAll(importers, new ExecuteCallback() {
+ Future<?> future = ScalingContext.getInstance().getIncrementalDumperExecuteEngine().submitAll(importers, new ExecuteCallback() {
@Override
public void onSuccess() {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
index a0486b2..1e88663 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
@@ -62,16 +62,22 @@ public final class ScalingTaskScheduler implements Runnable {
@Override
public void run() {
- shardingScalingJob.setStatus(SyncTaskControlStatus.MIGRATE_INVENTORY_DATA.name());
- if (ScalingTaskUtil.allInventoryTasksFinished(shardingScalingJob.getInventoryDataTasks())) {
+ if (executeInventoryDataSyncTask()) {
executeIncrementalDataSyncTask();
- return;
}
- log.info("Start inventory data sync task.");
+ }
+
+ private synchronized boolean executeInventoryDataSyncTask() {
+ log.info("-------------- Start inventory data sync task --------------");
+ if (ScalingTaskUtil.allInventoryTasksFinished(shardingScalingJob.getInventoryDataTasks())) {
+ return true;
+ }
+ shardingScalingJob.setStatus(SyncTaskControlStatus.MIGRATE_INVENTORY_DATA.name());
ExecuteCallback inventoryDataTaskCallback = createInventoryDataTaskCallback();
for (ScalingTask each : shardingScalingJob.getInventoryDataTasks()) {
- ScalingContext.getInstance().getTaskExecuteEngine().submit(each, inventoryDataTaskCallback);
+ ScalingContext.getInstance().getInventoryDumperExecuteEngine().submit(each, inventoryDataTaskCallback);
}
+ return false;
}
private ExecuteCallback createInventoryDataTaskCallback() {
@@ -93,17 +99,16 @@ public final class ScalingTaskScheduler implements Runnable {
};
}
- private void executeIncrementalDataSyncTask() {
- log.info("Start incremental data sync task.");
- if (!SyncTaskControlStatus.MIGRATE_INVENTORY_DATA.name().equals(shardingScalingJob.getStatus())) {
- shardingScalingJob.setStatus(SyncTaskControlStatus.STOPPED.name());
+ private synchronized void executeIncrementalDataSyncTask() {
+ log.info("-------------- Start incremental data sync task --------------");
+ if (SyncTaskControlStatus.SYNCHRONIZE_INCREMENTAL_DATA.name().equals(shardingScalingJob.getStatus())) {
return;
}
+ shardingScalingJob.setStatus(SyncTaskControlStatus.SYNCHRONIZE_INCREMENTAL_DATA.name());
ExecuteCallback incrementalDataTaskCallback = createIncrementalDataTaskCallback();
for (ScalingTask each : shardingScalingJob.getIncrementalDataTasks()) {
- ScalingContext.getInstance().getTaskExecuteEngine().submit(each, incrementalDataTaskCallback);
+ ScalingContext.getInstance().getIncrementalDumperExecuteEngine().submit(each, incrementalDataTaskCallback);
}
- shardingScalingJob.setStatus(SyncTaskControlStatus.SYNCHRONIZE_INCREMENTAL_DATA.name());
}
private ExecuteCallback createIncrementalDataTaskCallback() {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java
index d200f7a..40accb2 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java
@@ -55,7 +55,7 @@ public final class StandaloneScalingJobServiceTest {
@SneakyThrows(ReflectiveOperationException.class)
public void setUp() {
ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "serverConfig", new ServerConfiguration());
- ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "taskExecuteEngine", mock(ShardingScalingExecuteEngine.class));
+ ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "inventoryDumperExecuteEngine", mock(ShardingScalingExecuteEngine.class));
}
@Test