You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/08/09 12:47:37 UTC
[shardingsphere] branch master updated: Decouple RuleAlteredJobScheduler and RuleAlteredJobContext for common usage (#19971)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f5e90f7c564 Decouple RuleAlteredJobScheduler and RuleAlteredJobContext for common usage (#19971)
f5e90f7c564 is described below
commit f5e90f7c56488bd5fd592ad57ca28f868bfe4daa
Author: Da Xiang Huang <lo...@foxmail.com>
AuthorDate: Tue Aug 9 20:47:31 2022 +0800
Decouple RuleAlteredJobScheduler and RuleAlteredJobContext for common usage (#19971)
---
.../pipeline/api/context/PipelineJobContext.java | 12 +++++
.../scenario/rulealtered/RuleAlteredJob.java | 2 +-
.../rulealtered/RuleAlteredJobScheduler.java | 60 +++++++++++++++++-----
3 files changed, 61 insertions(+), 13 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineJobContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineJobContext.java
index b2d92bccf0b..b4b621eeeeb 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineJobContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineJobContext.java
@@ -66,4 +66,16 @@ public interface PipelineJobContext {
* @return job process context
*/
PipelineProcessContext getJobProcessContext();
+
+ /**
+ * Set stopping.
+ * @param stopping stopping
+ */
+ void setStopping(boolean stopping);
+
+ /**
+ * Get stopping.
+ * @return stopping
+ */
+ boolean isStopping();
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
index 236a3f17881..600533f1041 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
@@ -63,7 +63,7 @@ public final class RuleAlteredJob extends AbstractPipelineJob implements SimpleJ
return;
}
log.info("start RuleAlteredJobScheduler, jobId={}, shardingItem={}", getJobId(), shardingItem);
- RuleAlteredJobScheduler jobScheduler = new RuleAlteredJobScheduler(jobContext);
+ RuleAlteredJobScheduler jobScheduler = new RuleAlteredJobScheduler(jobContext, jobContext.getInventoryTasks(), jobContext.getIncrementalTasks());
runInBackground(() -> {
prepare(jobContext);
jobScheduler.start();
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
index 7ae9c65e99b..9290bb2b9f3 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
@@ -19,16 +19,23 @@ package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.concurrent.ConcurrentException;
+import org.apache.commons.lang3.concurrent.LazyInitializer;
import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobContext;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+import java.util.Collection;
+
/**
* Rule altered job scheduler.
*/
@@ -37,8 +44,27 @@ import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
@Getter
public final class RuleAlteredJobScheduler implements PipelineTasksRunner {
- private final RuleAlteredJobContext jobContext;
-
+ private final PipelineJobContext jobContext;
+
+ private final Collection<InventoryTask> inventoryTasks;
+
+ private final Collection<IncrementalTask> incrementalTasks;
+
+ private final LazyInitializer<ExecuteEngine> inventoryDumperExecuteEngineLazyInitializer = new LazyInitializer<ExecuteEngine>() {
+
+ @Override
+ protected ExecuteEngine initialize() {
+ return ExecuteEngine.newCachedThreadInstance("Inventory-" + jobContext.getJobId());
+ }
+ };
+
+ private final LazyInitializer<ExecuteEngine> incrementalDumperExecuteEngineLazyInitializer = new LazyInitializer<ExecuteEngine>() {
+ @Override
+ protected ExecuteEngine initialize() {
+ return ExecuteEngine.newCachedThreadInstance("Incremental-" + jobContext.getJobId());
+ }
+ };
+
/**
* Stop all task.
*/
@@ -46,12 +72,12 @@ public final class RuleAlteredJobScheduler implements PipelineTasksRunner {
jobContext.setStopping(true);
log.info("stop, jobId={}, shardingItem={}", jobContext.getJobId(), jobContext.getShardingItem());
// TODO blocking stop
- for (InventoryTask each : jobContext.getInventoryTasks()) {
+ for (InventoryTask each : getInventoryTasks()) {
log.info("stop inventory task {} - {}", jobContext.getJobId(), each.getTaskId());
each.stop();
each.close();
}
- for (IncrementalTask each : jobContext.getIncrementalTasks()) {
+ for (IncrementalTask each : getIncrementalTasks()) {
log.info("stop incremental task {} - {}", jobContext.getJobId(), each.getTaskId());
each.stop();
each.close();
@@ -75,28 +101,33 @@ public final class RuleAlteredJobScheduler implements PipelineTasksRunner {
}
private synchronized boolean executeInventoryTask() {
- if (PipelineJobProgressDetector.allInventoryTasksFinished(jobContext.getInventoryTasks())) {
+ if (PipelineJobProgressDetector.allInventoryTasksFinished(getInventoryTasks())) {
log.info("All inventory tasks finished.");
return true;
}
log.info("-------------- Start inventory task --------------");
jobContext.setStatus(JobStatus.EXECUTE_INVENTORY_TASK);
ExecuteCallback inventoryTaskCallback = createInventoryTaskCallback();
- for (InventoryTask each : jobContext.getInventoryTasks()) {
+ for (InventoryTask each : getInventoryTasks()) {
if (each.getProgress().getPosition() instanceof FinishedPosition) {
continue;
}
- jobContext.getJobProcessContext().getInventoryDumperExecuteEngine().submit(each, inventoryTaskCallback);
+ getInventoryDumperExecuteEngine().submit(each, inventoryTaskCallback);
}
return false;
}
-
+
+ @SneakyThrows(ConcurrentException.class)
+ private ExecuteEngine getInventoryDumperExecuteEngine() {
+ return inventoryDumperExecuteEngineLazyInitializer.get();
+ }
+
private ExecuteCallback createInventoryTaskCallback() {
return new ExecuteCallback() {
@Override
public void onSuccess() {
- if (PipelineJobProgressDetector.allInventoryTasksFinished(jobContext.getInventoryTasks())) {
+ if (PipelineJobProgressDetector.allInventoryTasksFinished(getInventoryTasks())) {
log.info("onSuccess, all inventory tasks finished.");
executeIncrementalTask();
}
@@ -119,14 +150,19 @@ public final class RuleAlteredJobScheduler implements PipelineTasksRunner {
log.info("-------------- Start incremental task --------------");
jobContext.setStatus(JobStatus.EXECUTE_INCREMENTAL_TASK);
ExecuteCallback incrementalTaskCallback = createIncrementalTaskCallback();
- for (IncrementalTask each : jobContext.getIncrementalTasks()) {
+ for (IncrementalTask each : getIncrementalTasks()) {
if (each.getProgress().getPosition() instanceof FinishedPosition) {
continue;
}
- jobContext.getJobProcessContext().getIncrementalDumperExecuteEngine().submit(each, incrementalTaskCallback);
+ getIncrementalDumperExecuteEngine().submit(each, incrementalTaskCallback);
}
}
-
+
+ @SneakyThrows(ConcurrentException.class)
+ private ExecuteEngine getIncrementalDumperExecuteEngine() {
+ return incrementalDumperExecuteEngineLazyInitializer.get();
+ }
+
private ExecuteCallback createIncrementalTaskCallback() {
return new ExecuteCallback() {