You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by az...@apache.org on 2023/05/19 03:39:43 UTC
[shardingsphere] branch master updated: Replace volatile to AtomicXxx in AbstractPipelineJob (#25780)
This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new e3bdd18f8e3 Replace volatile to AtomicXxx in AbstractPipelineJob (#25780)
e3bdd18f8e3 is described below
commit e3bdd18f8e36e8d243cfba868f775d505695afb8
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Fri May 19 11:39:36 2023 +0800
Replace volatile to AtomicXxx in AbstractPipelineJob (#25780)
---
.../pipeline/core/job/AbstractPipelineJob.java | 33 ++++++++++++++++------
1 file changed, 25 insertions(+), 8 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 0b0d9bb394c..e9ab3c47e9b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.data.pipeline.core.job;
import lombok.AccessLevel;
import lombok.Getter;
-import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
@@ -41,6 +40,8 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Abstract pipeline job.
@@ -54,11 +55,9 @@ public abstract class AbstractPipelineJob implements PipelineJob {
@Getter(AccessLevel.PROTECTED)
private final PipelineJobAPI jobAPI;
- @Getter
- private volatile boolean stopping;
+ private final AtomicBoolean stopping = new AtomicBoolean(false);
- @Setter
- private volatile JobBootstrap jobBootstrap;
+ private final AtomicReference<JobBootstrap> jobBootstrap = new AtomicReference<>();
private final Map<Integer, PipelineTasksRunner> tasksRunnerMap = new ConcurrentHashMap<>();
@@ -67,6 +66,24 @@ public abstract class AbstractPipelineJob implements PipelineJob {
jobAPI = TypedSPILoader.getService(PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(jobId).getTypeName());
}
+ /**
+ * Is stopping.
+ *
+ * @return whether job is stopping
+ */
+ public boolean isStopping() {
+ return stopping.get();
+ }
+
+ /**
+ * Set job bootstrap.
+ *
+ * @param jobBootstrap job bootstrap
+ */
+ public void setJobBootstrap(final JobBootstrap jobBootstrap) {
+ this.jobBootstrap.set(jobBootstrap);
+ }
+
protected void prepare(final PipelineJobItemContext jobItemContext) {
try {
doPrepare(jobItemContext);
@@ -124,15 +141,15 @@ public abstract class AbstractPipelineJob implements PipelineJob {
}
private void innerStop() {
- stopping = true;
+ stopping.set(true);
log.info("stop tasks runner, jobId={}", jobId);
for (PipelineTasksRunner each : tasksRunnerMap.values()) {
each.stop();
}
Optional<ElasticJobListener> pipelineJobListener = ElasticJobServiceLoader.getCachedTypedServiceInstance(ElasticJobListener.class, PipelineElasticJobListener.class.getName());
pipelineJobListener.ifPresent(jobListener -> awaitJobStopped((PipelineElasticJobListener) jobListener, jobId, TimeUnit.SECONDS.toMillis(2)));
- if (null != jobBootstrap) {
- jobBootstrap.shutdown();
+ if (null != jobBootstrap.get()) {
+ jobBootstrap.get().shutdown();
}
}