You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2021/05/08 00:19:35 UTC
[pulsar] branch master updated: [fix #10235] fix deadlock on Monitoring thread blocked by LeaderService.isLeader() (#10502)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fb1d5de [fix #10235] fix deadlock on Monitoring thread blocked by LeaderService.isLeader() (#10502)
fb1d5de is described below
commit fb1d5deb6fe7783672d53127c426ac6e319f9e06
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Sat May 8 08:19:07 2021 +0800
[fix #10235] fix deadlock on Monitoring thread blocked by LeaderService.isLeader() (#10502)
Fixes #10235
### Motivation
As reported in #10235, while `LeaderService` is changing its leadership status (e.g. losing leadership or becoming the leader), it is locked inside a `synchronized` block, which blocks any other thread that calls `LeaderService.isLeader()`. This PR changes `ClusterServiceCoordinator` and `WorkerStatsManager` to determine whether this worker is the leader via `MembershipManager` instead, so these callers are no longer blocked while `LeaderService` is inside its `synchronized` block.
Note that this PR does not resolve the root cause of #10235, because there is not enough context about the blocked reader of the `FunctionAssignmentTailer`.
---
.../apache/pulsar/functions/worker/ClusterServiceCoordinator.java | 8 +++++---
.../org/apache/pulsar/functions/worker/PulsarWorkerService.java | 6 +++++-
.../org/apache/pulsar/functions/worker/WorkerStatsManager.java | 6 +++++-
.../pulsar/functions/worker/ClusterServiceCoordinatorTest.java | 6 +++++-
4 files changed, 20 insertions(+), 6 deletions(-)
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java
index c2bde9d..c1682a8 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java
@@ -29,6 +29,7 @@ import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
+import java.util.function.Supplier;
@Slf4j
public class ClusterServiceCoordinator implements AutoCloseable {
@@ -49,10 +50,12 @@ public class ClusterServiceCoordinator implements AutoCloseable {
private final Map<String, TimerTaskInfo> tasks = new HashMap<>();
private final ScheduledExecutorService executor;
private final LeaderService leaderService;
+ private final Supplier<Boolean> isLeader;
- public ClusterServiceCoordinator(String workerId, LeaderService leaderService) {
+ public ClusterServiceCoordinator(String workerId, LeaderService leaderService, Supplier<Boolean> isLeader) {
this.workerId = workerId;
this.leaderService = leaderService;
+ this.isLeader = isLeader;
this.executor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("cluster-service-coordinator-timer").build());
}
@@ -67,8 +70,7 @@ public class ClusterServiceCoordinator implements AutoCloseable {
TimerTaskInfo timerTaskInfo = entry.getValue();
String taskName = entry.getKey();
this.executor.scheduleAtFixedRate(() -> {
- boolean isLeader = leaderService.isLeader();
- if (isLeader) {
+ if (isLeader.get()) {
try {
timerTaskInfo.getTask().run();
} catch (Exception e) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
index 1601a98..d15a594 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
@@ -32,6 +32,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Supplier;
import javax.ws.rs.core.Response;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -498,6 +499,7 @@ public class PulsarWorkerService implements WorkerService {
log.info("/** Initializing Runtime Manager **/");
MessageId lastAssignmentMessageId = functionRuntimeManager.initialize();
+ Supplier<Boolean> checkIsStillLeader = () -> membershipManager.getLeader().getWorkerId().equals(workerConfig.getWorkerId());
// Setting references to managers in scheduler
schedulerManager.setFunctionMetaDataManager(functionMetaDataManager);
@@ -520,7 +522,8 @@ public class PulsarWorkerService implements WorkerService {
// Starting cluster services
this.clusterServiceCoordinator = new ClusterServiceCoordinator(
workerConfig.getWorkerId(),
- leaderService);
+ leaderService,
+ checkIsStillLeader);
clusterServiceCoordinator.addTask("membership-monitor",
workerConfig.getFailureCheckFreqMs(),
@@ -561,6 +564,7 @@ public class PulsarWorkerService implements WorkerService {
workerStatsManager.setFunctionRuntimeManager(functionRuntimeManager);
workerStatsManager.setFunctionMetaDataManager(functionMetaDataManager);
workerStatsManager.setLeaderService(leaderService);
+ workerStatsManager.setIsLeader(checkIsStillLeader);
workerStatsManager.startupTimeEnd();
} catch (Throwable t) {
log.error("Error Starting up in worker", t);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
index 703b131..78cfdb1 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.functions.proto.Function;
import java.io.IOException;
import java.io.StringWriter;
import java.util.List;
+import java.util.function.Supplier;
public class WorkerStatsManager {
@@ -66,6 +67,9 @@ public class WorkerStatsManager {
@Setter
private LeaderService leaderService;
+ @Setter
+ private Supplier<Boolean> isLeader;
+
private CollectorRegistry collectorRegistry = new CollectorRegistry();
private final Summary statWorkerStartupTime;
@@ -279,7 +283,7 @@ public class WorkerStatsManager {
}
private void generateLeaderMetrics(StringWriter stream) {
- if (leaderService.isLeader()) {
+ if (isLeader.get()) {
List<Function.FunctionMetaData> metadata = functionMetaDataManager.getAllFunctionMetaData();
// get total number functions
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinatorTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinatorTest.java
index 7b44ba4..d0f612d 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinatorTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinatorTest.java
@@ -30,6 +30,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
import org.apache.pulsar.functions.worker.executor.MockExecutorController;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
@@ -56,6 +58,7 @@ public class ClusterServiceCoordinatorTest {
private ClusterServiceCoordinator coordinator;
private ScheduledExecutorService mockExecutor;
private MockExecutorController mockExecutorController;
+ private Supplier<Boolean> checkIsStillLeader;
@BeforeMethod
public void setup() throws Exception {
@@ -71,7 +74,8 @@ public class ClusterServiceCoordinatorTest {
).thenReturn(mockExecutor);
this.leaderService = mock(LeaderService.class);
- this.coordinator = new ClusterServiceCoordinator("test-coordinator", leaderService);
+ this.checkIsStillLeader = () -> leaderService.isLeader();
+ this.coordinator = new ClusterServiceCoordinator("test-coordinator", leaderService, checkIsStillLeader);
}