You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by az...@apache.org on 2023/05/19 03:39:43 UTC

[shardingsphere] branch master updated: Replace volatile to AtomicXxx in AbstractPipelineJob (#25780)

This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new e3bdd18f8e3 Replace volatile to AtomicXxx in AbstractPipelineJob (#25780)
e3bdd18f8e3 is described below

commit e3bdd18f8e36e8d243cfba868f775d505695afb8
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Fri May 19 11:39:36 2023 +0800

    Replace volatile to AtomicXxx in AbstractPipelineJob (#25780)
---
 .../pipeline/core/job/AbstractPipelineJob.java     | 33 ++++++++++++++++------
 1 file changed, 25 insertions(+), 8 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 0b0d9bb394c..e9ab3c47e9b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.data.pipeline.core.job;
 
 import lombok.AccessLevel;
 import lombok.Getter;
-import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
@@ -41,6 +40,8 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Abstract pipeline job.
@@ -54,11 +55,9 @@ public abstract class AbstractPipelineJob implements PipelineJob {
     @Getter(AccessLevel.PROTECTED)
     private final PipelineJobAPI jobAPI;
     
-    @Getter
-    private volatile boolean stopping;
+    private final AtomicBoolean stopping = new AtomicBoolean(false);
     
-    @Setter
-    private volatile JobBootstrap jobBootstrap;
+    private final AtomicReference<JobBootstrap> jobBootstrap = new AtomicReference<>();
     
     private final Map<Integer, PipelineTasksRunner> tasksRunnerMap = new ConcurrentHashMap<>();
     
@@ -67,6 +66,24 @@ public abstract class AbstractPipelineJob implements PipelineJob {
         jobAPI = TypedSPILoader.getService(PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(jobId).getTypeName());
     }
     
+    /**
+     * Is stopping.
+     *
+     * @return whether job is stopping
+     */
+    public boolean isStopping() {
+        return stopping.get();
+    }
+    
+    /**
+     * Set job bootstrap.
+     *
+     * @param jobBootstrap job bootstrap
+     */
+    public void setJobBootstrap(final JobBootstrap jobBootstrap) {
+        this.jobBootstrap.set(jobBootstrap);
+    }
+    
     protected void prepare(final PipelineJobItemContext jobItemContext) {
         try {
             doPrepare(jobItemContext);
@@ -124,15 +141,15 @@ public abstract class AbstractPipelineJob implements PipelineJob {
     }
     
     private void innerStop() {
-        stopping = true;
+        stopping.set(true);
         log.info("stop tasks runner, jobId={}", jobId);
         for (PipelineTasksRunner each : tasksRunnerMap.values()) {
             each.stop();
         }
         Optional<ElasticJobListener> pipelineJobListener = ElasticJobServiceLoader.getCachedTypedServiceInstance(ElasticJobListener.class, PipelineElasticJobListener.class.getName());
         pipelineJobListener.ifPresent(jobListener -> awaitJobStopped((PipelineElasticJobListener) jobListener, jobId, TimeUnit.SECONDS.toMillis(2)));
-        if (null != jobBootstrap) {
-            jobBootstrap.shutdown();
+        if (null != jobBootstrap.get()) {
+            jobBootstrap.get().shutdown();
         }
     }