You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/05/09 00:38:28 UTC

[pulsar] branch branch-2.7 updated: [Branch-2.7] Fix deadlock on Monitoring thread blocked by LeaderService.isLeader() (#10512)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new e24bf54  [Branch-2.7] Fix deadlock on Monitoring thread blocked by LeaderService.isLeader() (#10512)
e24bf54 is described below

commit e24bf549d06361db93234edb19f91fa90dadba8c
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Sun May 9 08:37:33 2021 +0800

    [Branch-2.7] Fix deadlock on Monitoring thread blocked by LeaderService.isLeader() (#10512)
    
    ---
    
    Fixes #10235
    
    According to #10235, when `LeaderService` is changing leadership status
    (for example, losing leadership or becoming the leader), the `LeaderService`
    is locked by a `synchronized` block, which blocks any other thread that
    calls `LeaderService.isLeader()`. This PR changes `ClusterServiceCoordinator`
    and `WorkerStatsManager` to check leadership through `MembershipManager`
    instead, which does not block other threads while `LeaderService` is inside
    its `synchronized` block.
    
    Note that this PR does not resolve the root cause of #10235, since there is
    a lack of context about the blocked reader for the `FunctionAssignmentTailer`.
    
    - [ ] Make sure that the change passes the CI checks.
    
    ---
    Original PR is https://github.com/apache/pulsar/pull/10502
---
 .../apache/pulsar/functions/worker/ClusterServiceCoordinator.java | 8 +++++---
 .../java/org/apache/pulsar/functions/worker/WorkerService.java    | 7 ++++++-
 .../org/apache/pulsar/functions/worker/WorkerStatsManager.java    | 6 +++++-
 .../pulsar/functions/worker/ClusterServiceCoordinatorTest.java    | 6 +++++-
 4 files changed, 21 insertions(+), 6 deletions(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java
index c2bde9d..c1682a8 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java
@@ -29,6 +29,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.function.Supplier;
 
 @Slf4j
 public class ClusterServiceCoordinator implements AutoCloseable {
@@ -49,10 +50,12 @@ public class ClusterServiceCoordinator implements AutoCloseable {
     private final Map<String, TimerTaskInfo> tasks = new HashMap<>();
     private final ScheduledExecutorService executor;
     private final LeaderService leaderService;
+    private final Supplier<Boolean> isLeader;
 
-    public ClusterServiceCoordinator(String workerId, LeaderService leaderService) {
+    public ClusterServiceCoordinator(String workerId, LeaderService leaderService, Supplier<Boolean> isLeader) {
         this.workerId = workerId;
         this.leaderService = leaderService;
+        this.isLeader = isLeader;
         this.executor = Executors.newSingleThreadScheduledExecutor(
             new ThreadFactoryBuilder().setNameFormat("cluster-service-coordinator-timer").build());
     }
@@ -67,8 +70,7 @@ public class ClusterServiceCoordinator implements AutoCloseable {
             TimerTaskInfo timerTaskInfo = entry.getValue();
             String taskName = entry.getKey();
             this.executor.scheduleAtFixedRate(() -> {
-                boolean isLeader = leaderService.isLeader();
-                if (isLeader) {
+                if (isLeader.get()) {
                     try {
                         timerTaskInfo.getTask().run();
                     } catch (Exception e) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 4af8aad..d0218b4 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import java.net.URI;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Supplier;
 
 /**
  * A service component contains everything to run a worker except rest server.
@@ -228,6 +229,8 @@ public class WorkerService {
             log.info("/** Initializing Runtime Manager **/");
 
             MessageId lastAssignmentMessageId = functionRuntimeManager.initialize();
+            Supplier<Boolean> checkIsStillLeader =
+                () -> membershipManager.getLeader().getWorkerId().equals(workerConfig.getWorkerId());
 
             // Setting references to managers in scheduler
             schedulerManager.setFunctionMetaDataManager(functionMetaDataManager);
@@ -250,7 +253,8 @@ public class WorkerService {
             // Starting cluster services
             this.clusterServiceCoordinator = new ClusterServiceCoordinator(
                     workerConfig.getWorkerId(),
-                    leaderService);
+                    leaderService,
+                    checkIsStillLeader);
 
             clusterServiceCoordinator.addTask("membership-monitor",
                     workerConfig.getFailureCheckFreqMs(),
@@ -291,6 +295,7 @@ public class WorkerService {
             workerStatsManager.setFunctionRuntimeManager(functionRuntimeManager);
             workerStatsManager.setFunctionMetaDataManager(functionMetaDataManager);
             workerStatsManager.setLeaderService(leaderService);
+            workerStatsManager.setIsLeader(checkIsStillLeader);
             workerStatsManager.startupTimeEnd();
         } catch (Throwable t) {
             log.error("Error Starting up in worker", t);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
index 703b131..78cfdb1 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.functions.proto.Function;
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.List;
+import java.util.function.Supplier;
 
 public class WorkerStatsManager {
 
@@ -66,6 +67,9 @@ public class WorkerStatsManager {
   @Setter
   private LeaderService leaderService;
 
+  @Setter
+  private Supplier<Boolean> isLeader;
+
   private CollectorRegistry collectorRegistry = new CollectorRegistry();
 
   private final Summary statWorkerStartupTime;
@@ -279,7 +283,7 @@ public class WorkerStatsManager {
   }
 
   private void generateLeaderMetrics(StringWriter stream) {
-    if (leaderService.isLeader()) {
+    if (isLeader.get()) {
 
       List<Function.FunctionMetaData> metadata = functionMetaDataManager.getAllFunctionMetaData();
       // get total number functions
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinatorTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinatorTest.java
index a9e33da..63246e5 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinatorTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinatorTest.java
@@ -30,6 +30,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
 import org.apache.pulsar.functions.worker.executor.MockExecutorController;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
@@ -56,6 +58,7 @@ public class ClusterServiceCoordinatorTest {
     private ClusterServiceCoordinator coordinator;
     private ScheduledExecutorService mockExecutor;
     private MockExecutorController mockExecutorController;
+    private Supplier<Boolean> checkIsStillLeader;
 
     @BeforeMethod
     public void setup() throws Exception {
@@ -71,7 +74,8 @@ public class ClusterServiceCoordinatorTest {
         ).thenReturn(mockExecutor);
 
         this.leaderService = mock(LeaderService.class);
-        this.coordinator = new ClusterServiceCoordinator("test-coordinator", leaderService);
+        this.checkIsStillLeader = () -> leaderService.isLeader();
+        this.coordinator = new ClusterServiceCoordinator("test-coordinator", leaderService, checkIsStillLeader);
     }