You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/05/09 00:38:28 UTC
[pulsar] branch branch-2.7 updated: [Branch-2.7] Fix deadlock on
Monitoring thread blocked by LeaderService.isLeader() (#10512)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new e24bf54 [Branch-2.7] Fix deadlock on Monitoring thread blocked by LeaderService.isLeader() (#10512)
e24bf54 is described below
commit e24bf549d06361db93234edb19f91fa90dadba8c
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Sun May 9 08:37:33 2021 +0800
[Branch-2.7] Fix deadlock on Monitoring thread blocked by LeaderService.isLeader() (#10512)
---
Fixes #10235
According to #10235, while `LeaderService` is changing its leadership status
(for example, losing leadership or becoming the leader), the `LeaderService` is
locked by a `synchronized` block, which blocks any other thread that
calls `LeaderService.isLeader()`. This PR changes `ClusterServiceCoordinator`
and `WorkerStatsManager` to check leadership through `MembershipManager` instead,
which does not block other threads while `LeaderService` is inside its `synchronized` block.
Note that this PR does not resolve the root cause of #10235, since there is
not enough context about the blocked reader in the `FunctionAssignmentTailer`.
- [ ] Make sure that the change passes the CI checks.
---
Original PR is https://github.com/apache/pulsar/pull/10502
---
.../apache/pulsar/functions/worker/ClusterServiceCoordinator.java | 8 +++++---
.../java/org/apache/pulsar/functions/worker/WorkerService.java | 7 ++++++-
.../org/apache/pulsar/functions/worker/WorkerStatsManager.java | 6 +++++-
.../pulsar/functions/worker/ClusterServiceCoordinatorTest.java | 6 +++++-
4 files changed, 21 insertions(+), 6 deletions(-)
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java
index c2bde9d..c1682a8 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java
@@ -29,6 +29,7 @@ import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
+import java.util.function.Supplier;
@Slf4j
public class ClusterServiceCoordinator implements AutoCloseable {
@@ -49,10 +50,12 @@ public class ClusterServiceCoordinator implements AutoCloseable {
private final Map<String, TimerTaskInfo> tasks = new HashMap<>();
private final ScheduledExecutorService executor;
private final LeaderService leaderService;
+ private final Supplier<Boolean> isLeader;
- public ClusterServiceCoordinator(String workerId, LeaderService leaderService) {
+ public ClusterServiceCoordinator(String workerId, LeaderService leaderService, Supplier<Boolean> isLeader) {
this.workerId = workerId;
this.leaderService = leaderService;
+ this.isLeader = isLeader;
this.executor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("cluster-service-coordinator-timer").build());
}
@@ -67,8 +70,7 @@ public class ClusterServiceCoordinator implements AutoCloseable {
TimerTaskInfo timerTaskInfo = entry.getValue();
String taskName = entry.getKey();
this.executor.scheduleAtFixedRate(() -> {
- boolean isLeader = leaderService.isLeader();
- if (isLeader) {
+ if (isLeader.get()) {
try {
timerTaskInfo.getTask().run();
} catch (Exception e) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 4af8aad..d0218b4 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import java.net.URI;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Supplier;
/**
* A service component contains everything to run a worker except rest server.
@@ -228,6 +229,8 @@ public class WorkerService {
log.info("/** Initializing Runtime Manager **/");
MessageId lastAssignmentMessageId = functionRuntimeManager.initialize();
+ Supplier<Boolean> checkIsStillLeader =
+ () -> membershipManager.getLeader().getWorkerId().equals(workerConfig.getWorkerId());
// Setting references to managers in scheduler
schedulerManager.setFunctionMetaDataManager(functionMetaDataManager);
@@ -250,7 +253,8 @@ public class WorkerService {
// Starting cluster services
this.clusterServiceCoordinator = new ClusterServiceCoordinator(
workerConfig.getWorkerId(),
- leaderService);
+ leaderService,
+ checkIsStillLeader);
clusterServiceCoordinator.addTask("membership-monitor",
workerConfig.getFailureCheckFreqMs(),
@@ -291,6 +295,7 @@ public class WorkerService {
workerStatsManager.setFunctionRuntimeManager(functionRuntimeManager);
workerStatsManager.setFunctionMetaDataManager(functionMetaDataManager);
workerStatsManager.setLeaderService(leaderService);
+ workerStatsManager.setIsLeader(checkIsStillLeader);
workerStatsManager.startupTimeEnd();
} catch (Throwable t) {
log.error("Error Starting up in worker", t);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
index 703b131..78cfdb1 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.functions.proto.Function;
import java.io.IOException;
import java.io.StringWriter;
import java.util.List;
+import java.util.function.Supplier;
public class WorkerStatsManager {
@@ -66,6 +67,9 @@ public class WorkerStatsManager {
@Setter
private LeaderService leaderService;
+ @Setter
+ private Supplier<Boolean> isLeader;
+
private CollectorRegistry collectorRegistry = new CollectorRegistry();
private final Summary statWorkerStartupTime;
@@ -279,7 +283,7 @@ public class WorkerStatsManager {
}
private void generateLeaderMetrics(StringWriter stream) {
- if (leaderService.isLeader()) {
+ if (isLeader.get()) {
List<Function.FunctionMetaData> metadata = functionMetaDataManager.getAllFunctionMetaData();
// get total number functions
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinatorTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinatorTest.java
index a9e33da..63246e5 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinatorTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinatorTest.java
@@ -30,6 +30,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
import org.apache.pulsar.functions.worker.executor.MockExecutorController;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
@@ -56,6 +58,7 @@ public class ClusterServiceCoordinatorTest {
private ClusterServiceCoordinator coordinator;
private ScheduledExecutorService mockExecutor;
private MockExecutorController mockExecutorController;
+ private Supplier<Boolean> checkIsStillLeader;
@BeforeMethod
public void setup() throws Exception {
@@ -71,7 +74,8 @@ public class ClusterServiceCoordinatorTest {
).thenReturn(mockExecutor);
this.leaderService = mock(LeaderService.class);
- this.coordinator = new ClusterServiceCoordinator("test-coordinator", leaderService);
+ this.checkIsStillLeader = () -> leaderService.isLeader();
+ this.coordinator = new ClusterServiceCoordinator("test-coordinator", leaderService, checkIsStillLeader);
}