You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/07/30 12:54:19 UTC

[rocketmq] branch develop updated: [ISSUE #4734]Fix DLedgerController startScheduling concurrency problem (#4735)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new c489a70ba [ISSUE #4734]Fix DLedgerController startScheduling concurrency problem (#4735)
c489a70ba is described below

commit c489a70baaccfd8a6f65fad5a0f7064f9cc030a2
Author: mxsm <lj...@gmail.com>
AuthorDate: Sat Jul 30 20:54:06 2022 +0800

    [ISSUE #4734]Fix DLedgerController startScheduling concurrency problem (#4735)
---
 .../org/apache/rocketmq/controller/impl/DLedgerController.java   | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
index f4a4a00de..3a5480140 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiPredicate;
 import java.util.function.Supplier;
 import org.apache.rocketmq.common.ServiceThread;
@@ -76,7 +77,7 @@ public class DLedgerController implements Controller {
     private final DLedgerControllerStateMachine statemachine;
     // Usr for checking whether the broker is alive
     private BiPredicate<String, String> brokerAlivePredicate;
-    private volatile boolean isScheduling = false;
+    private AtomicBoolean isScheduling = new AtomicBoolean(false);
 
     public DLedgerController(final ControllerConfig config, final BiPredicate<String, String> brokerAlivePredicate) {
         this(config, brokerAlivePredicate, null, null, null);
@@ -119,18 +120,16 @@ public class DLedgerController implements Controller {
 
     @Override
     public void startScheduling() {
-        if (!this.isScheduling) {
+        if (this.isScheduling.compareAndSet(false, true)) {
             log.info("Start scheduling controller events");
-            this.isScheduling = true;
             this.scheduler.start();
         }
     }
 
     @Override
     public void stopScheduling() {
-        if (this.isScheduling) {
+        if (this.isScheduling.compareAndSet(true, false)) {
             log.info("Stop scheduling controller events");
-            this.isScheduling = false;
             this.scheduler.shutdown(true);
         }
     }