Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/08/09 12:47:37 UTC

[shardingsphere] branch master updated: Decouple RuleAlteredJobScheduler and RuleAlteredJobContext for common usage (#19971)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f5e90f7c564 Decouple RuleAlteredJobScheduler and RuleAlteredJobContext for common usage (#19971)
f5e90f7c564 is described below

commit f5e90f7c56488bd5fd592ad57ca28f868bfe4daa
Author: Da Xiang Huang <lo...@foxmail.com>
AuthorDate: Tue Aug 9 20:47:31 2022 +0800

    Decouple RuleAlteredJobScheduler and RuleAlteredJobContext for common usage (#19971)
---
 .../pipeline/api/context/PipelineJobContext.java   | 12 +++++
 .../scenario/rulealtered/RuleAlteredJob.java       |  2 +-
 .../rulealtered/RuleAlteredJobScheduler.java       | 60 +++++++++++++++++-----
 3 files changed, 61 insertions(+), 13 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineJobContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineJobContext.java
index b2d92bccf0b..b4b621eeeeb 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineJobContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineJobContext.java
@@ -66,4 +66,16 @@ public interface PipelineJobContext {
      * @return job process context
      */
     PipelineProcessContext getJobProcessContext();
+
+    /**
+     * Set stopping.
+     * @param stopping stopping
+     */
+    void setStopping(boolean stopping);
+
+    /**
+     * Get stopping.
+     * @return stopping
+     */
+    boolean isStopping();
 }
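
The two new interface methods let callers signal and query cancellation through PipelineJobContext itself, without depending on a concrete context class. As a rough sketch of what an implementation might look like, the flag could be backed by an AtomicBoolean; the concrete RuleAlteredJobContext implementation is not part of this diff, so the class below is illustrative only:

    import java.util.concurrent.atomic.AtomicBoolean;

    // Hypothetical sketch, not part of this commit: a thread-safe stopping flag
    // behind the new setStopping/isStopping interface methods.
    public final class StoppingFlagSketch {

        private final AtomicBoolean stopping = new AtomicBoolean(false);

        public void setStopping(final boolean stopping) {
            this.stopping.set(stopping);
        }

        public boolean isStopping() {
            return stopping.get();
        }
    }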
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
index 236a3f17881..600533f1041 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
@@ -63,7 +63,7 @@ public final class RuleAlteredJob extends AbstractPipelineJob implements SimpleJ
             return;
         }
         log.info("start RuleAlteredJobScheduler, jobId={}, shardingItem={}", getJobId(), shardingItem);
-        RuleAlteredJobScheduler jobScheduler = new RuleAlteredJobScheduler(jobContext);
+        RuleAlteredJobScheduler jobScheduler = new RuleAlteredJobScheduler(jobContext, jobContext.getInventoryTasks(), jobContext.getIncrementalTasks());
         runInBackground(() -> {
             prepare(jobContext);
             jobScheduler.start();
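
The call above relies on the scheduler now exposing a three-argument constructor. Assuming the class keeps the Lombok @RequiredArgsConstructor annotation whose import appears in the unchanged context below, that constructor is generated from the three final fields; an equivalent hand-written sketch would be:

    // Sketch only: the equivalent of what Lombok's @RequiredArgsConstructor
    // generates for the scheduler's three final fields (the generated code
    // itself does not appear in this diff).
    public RuleAlteredJobScheduler(final PipelineJobContext jobContext,
                                   final Collection<InventoryTask> inventoryTasks,
                                   final Collection<IncrementalTask> incrementalTasks) {
        this.jobContext = jobContext;
        this.inventoryTasks = inventoryTasks;
        this.incrementalTasks = incrementalTasks;
    }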
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
index 7ae9c65e99b..9290bb2b9f3 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
@@ -19,16 +19,23 @@ package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.concurrent.ConcurrentException;
+import org.apache.commons.lang3.concurrent.LazyInitializer;
 import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobContext;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 
+import java.util.Collection;
+
 /**
  * Rule altered job scheduler.
  */
@@ -37,8 +44,27 @@ import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 @Getter
 public final class RuleAlteredJobScheduler implements PipelineTasksRunner {
     
-    private final RuleAlteredJobContext jobContext;
-    
+    private final PipelineJobContext jobContext;
+
+    private final Collection<InventoryTask> inventoryTasks;
+
+    private final Collection<IncrementalTask> incrementalTasks;
+
+    private final LazyInitializer<ExecuteEngine> inventoryDumperExecuteEngineLazyInitializer = new LazyInitializer<ExecuteEngine>() {
+
+        @Override
+        protected ExecuteEngine initialize() {
+            return ExecuteEngine.newCachedThreadInstance("Inventory-" + jobContext.getJobId());
+        }
+    };
+
+    private final LazyInitializer<ExecuteEngine> incrementalDumperExecuteEngineLazyInitializer = new LazyInitializer<ExecuteEngine>() {
+        @Override
+        protected ExecuteEngine initialize() {
+            return ExecuteEngine.newCachedThreadInstance("Incremental-" + jobContext.getJobId());
+        }
+    };
+
     /**
      * Stop all task.
      */
@@ -46,12 +72,12 @@ public final class RuleAlteredJobScheduler implements PipelineTasksRunner {
         jobContext.setStopping(true);
         log.info("stop, jobId={}, shardingItem={}", jobContext.getJobId(), jobContext.getShardingItem());
         // TODO blocking stop
-        for (InventoryTask each : jobContext.getInventoryTasks()) {
+        for (InventoryTask each : getInventoryTasks()) {
             log.info("stop inventory task {} - {}", jobContext.getJobId(), each.getTaskId());
             each.stop();
             each.close();
         }
-        for (IncrementalTask each : jobContext.getIncrementalTasks()) {
+        for (IncrementalTask each : getIncrementalTasks()) {
             log.info("stop incremental task {} - {}", jobContext.getJobId(), each.getTaskId());
             each.stop();
             each.close();
@@ -75,28 +101,33 @@ public final class RuleAlteredJobScheduler implements PipelineTasksRunner {
     }
     
     private synchronized boolean executeInventoryTask() {
-        if (PipelineJobProgressDetector.allInventoryTasksFinished(jobContext.getInventoryTasks())) {
+        if (PipelineJobProgressDetector.allInventoryTasksFinished(getInventoryTasks())) {
             log.info("All inventory tasks finished.");
             return true;
         }
         log.info("-------------- Start inventory task --------------");
         jobContext.setStatus(JobStatus.EXECUTE_INVENTORY_TASK);
         ExecuteCallback inventoryTaskCallback = createInventoryTaskCallback();
-        for (InventoryTask each : jobContext.getInventoryTasks()) {
+        for (InventoryTask each : getInventoryTasks()) {
             if (each.getProgress().getPosition() instanceof FinishedPosition) {
                 continue;
             }
-            jobContext.getJobProcessContext().getInventoryDumperExecuteEngine().submit(each, inventoryTaskCallback);
+            getInventoryDumperExecuteEngine().submit(each, inventoryTaskCallback);
         }
         return false;
     }
-    
+
+    @SneakyThrows(ConcurrentException.class)
+    private ExecuteEngine getInventoryDumperExecuteEngine() {
+        return inventoryDumperExecuteEngineLazyInitializer.get();
+    }
+
     private ExecuteCallback createInventoryTaskCallback() {
         return new ExecuteCallback() {
             
             @Override
             public void onSuccess() {
-                if (PipelineJobProgressDetector.allInventoryTasksFinished(jobContext.getInventoryTasks())) {
+                if (PipelineJobProgressDetector.allInventoryTasksFinished(getInventoryTasks())) {
                     log.info("onSuccess, all inventory tasks finished.");
                     executeIncrementalTask();
                 }
@@ -119,14 +150,19 @@ public final class RuleAlteredJobScheduler implements PipelineTasksRunner {
         log.info("-------------- Start incremental task --------------");
         jobContext.setStatus(JobStatus.EXECUTE_INCREMENTAL_TASK);
         ExecuteCallback incrementalTaskCallback = createIncrementalTaskCallback();
-        for (IncrementalTask each : jobContext.getIncrementalTasks()) {
+        for (IncrementalTask each : getIncrementalTasks()) {
             if (each.getProgress().getPosition() instanceof FinishedPosition) {
                 continue;
             }
-            jobContext.getJobProcessContext().getIncrementalDumperExecuteEngine().submit(each, incrementalTaskCallback);
+            getIncrementalDumperExecuteEngine().submit(each, incrementalTaskCallback);
         }
     }
-    
+
+    @SneakyThrows(ConcurrentException.class)
+    private ExecuteEngine getIncrementalDumperExecuteEngine() {
+        return incrementalDumperExecuteEngineLazyInitializer.get();
+    }
+
     private ExecuteCallback createIncrementalTaskCallback() {
         return new ExecuteCallback() {
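
With this change the scheduler builds each ExecuteEngine on first use through Apache Commons Lang's LazyInitializer instead of fetching it from the job process context. A standalone sketch of that pattern, with illustrative names (only LazyInitializer and ConcurrentException come from the library):

    import org.apache.commons.lang3.concurrent.ConcurrentException;
    import org.apache.commons.lang3.concurrent.LazyInitializer;

    // Standalone sketch of the lazy-initialization pattern adopted above: the
    // expensive resource (an ExecuteEngine in the real scheduler) is only built
    // the first time get() is called, so a scheduler that is stopped before it
    // ever runs a task never allocates a thread pool.
    public final class LazyEngineSketch {

        private final LazyInitializer<String> engine = new LazyInitializer<String>() {

            @Override
            protected String initialize() {
                // The real code calls ExecuteEngine.newCachedThreadInstance("Inventory-" + jobId).
                return "Inventory-example-job";
            }
        };

        public static void main(final String[] args) throws ConcurrentException {
            LazyEngineSketch sketch = new LazyEngineSketch();
            // First call triggers initialize(); subsequent calls return the cached instance.
            System.out.println(sketch.engine.get());
            System.out.println(sketch.engine.get());
        }
    }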