You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/04/14 00:12:00 UTC
[pulsar] branch master updated: Fix deadlock by consumer and reader (#6728)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6d30414 Fix deadlock by consumer and reader (#6728)
6d30414 is described below
commit 6d304140b6205c6a2e94ad34bfd3cc5d16aca5d1
Author: k2la <mz...@gmail.com>
AuthorDate: Tue Apr 14 09:11:51 2020 +0900
Fix deadlock by consumer and reader (#6728)
### Motivation
Brokers stopped accepting client connections when a consumer and a reader subscribed to the same topic at almost the same time.
This happened in v2.4.2 and on the master branch.
The following thread dump was captured at the time:
```
"bookkeeper-ml-workers-OrderedExecutor-5-0" #52 prio=5 os_prio=0 tid=0x00007ff425fd0800 nid=0x28bf waiting on condition [0x00007ff3478f6000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000750c51a00> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:245)
at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
at org.apache.pulsar.broker.service.persistent.PersistentTopic$2.openCursorComplete(PersistentTopic.java:638)
at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncOpenCursor(ManagedLedgerImpl.java:712)
- locked <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
at org.apache.pulsar.broker.service.persistent.PersistentTopic.getDurableSubscription(PersistentTopic.java:631)
at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:578)
at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.pulsar.broker.service.BrokerService$2.lambda$openLedgerComplete$1(BrokerService.java:687)
at org.apache.pulsar.broker.service.BrokerService$2$$Lambda$229/1013432130.run(Unknown Source)
at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719)
at java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731)
at java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023)
at org.apache.pulsar.broker.service.BrokerService$2.openLedgerComplete(BrokerService.java:680)
at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.lambda$asyncOpen$7(ManagedLedgerFactoryImpl.java:328)
at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$$Lambda$184/272111809.accept(Unknown Source)
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$2.initializeComplete(ManagedLedgerFactoryImpl.java:316)
at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$3$1.operationComplete(ManagedLedgerImpl.java:464)
at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:276)
at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:249)
at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.lambda$null$7(MetaStoreImplZookeeper.java:241)
at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper$$Lambda$584/1125537287.run(Unknown Source)
at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
...
"ForkJoinPool.commonPool-worker-36" #1043 daemon prio=5 os_prio=0 tid=0x00007ff34c0ce800 nid=0x26f2 waiting for monitor entry [0x00007ff32d2eb000]
java.lang.Thread.State: BLOCKED (on object monitor)
at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.newNonDurableCursor(ManagedLedgerImpl.java:856)
- waiting to lock <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$getNonDurableSubscription$13(PersistentTopic.java:684)
at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$572/174683985.apply(Unknown Source)
at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:274)
at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
at org.apache.pulsar.broker.service.persistent.PersistentTopic.getNonDurableSubscription(PersistentTopic.java:667)
at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:579)
at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source)
at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682)
at org.apache.pulsar.broker.service.ServerCnx$$Lambda$458/375938934.apply(Unknown Source)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
```
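`PersistentTopic#getDurableSubscription` holds the `ManagedLedgerImpl` monitor, acquired in `ManagedLedgerImpl.asyncOpenCursor`. Inside its `openCursorComplete` callback, it then waits for the write lock of a `ConcurrentOpenHashMap` section in `subscriptions.computeIfAbsent`.
(lock order: `ManagedLedgerImpl` => `ConcurrentOpenHashMap`)
Meanwhile, `PersistentTopic#getNonDurableSubscription` holds the `ConcurrentOpenHashMap` section lock inside `subscriptions.computeIfAbsent`. From within that call, it waits for the `ManagedLedgerImpl` monitor in `ManagedLedgerImpl.newNonDurableCursor`.
(lock order: `ConcurrentOpenHashMap` => `ManagedLedgerImpl`)
Because the two threads acquire the same two locks in opposite order, each waits forever for the lock the other holds, which is a deadlock.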
### Modifications
Changed `PersistentTopic#getNonDurableSubscription` to acquire the `ManagedLedgerImpl` monitor (via `synchronized (ledger)`) before calling `subscriptions.computeIfAbsent`. Both code paths now acquire the locks in the same order (`ManagedLedgerImpl` => `ConcurrentOpenHashMap`).
---
.../broker/service/persistent/PersistentTopic.java | 76 +++++++++++-----------
1 file changed, 39 insertions(+), 37 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 1b2822f..c5c5cee 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -692,49 +692,51 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
log.info("[{}][{}] Creating non-durable subscription at msg id {}", topic, subscriptionName, startMessageId);
- // Create a new non-durable cursor only for the first consumer that connects
- Subscription subscription = subscriptions.computeIfAbsent(subscriptionName, name -> {
- MessageIdImpl msgId = startMessageId != null ? (MessageIdImpl) startMessageId
- : (MessageIdImpl) MessageId.latest;
-
- long ledgerId = msgId.getLedgerId();
- long entryId = msgId.getEntryId();
- if (ledgerId >= 0
- && msgId instanceof BatchMessageIdImpl) {
- // When the start message is relative to a batch, we need to take one step back on the previous message,
- // because the "batch" might not have been consumed in its entirety.
- // The client will then be able to discard the first messages if needed.
- entryId = msgId.getEntryId() - 1;
- }
+ synchronized (ledger) {
+ // Create a new non-durable cursor only for the first consumer that connects
+ Subscription subscription = subscriptions.computeIfAbsent(subscriptionName, name -> {
+ MessageIdImpl msgId = startMessageId != null ? (MessageIdImpl) startMessageId
+ : (MessageIdImpl) MessageId.latest;
+
+ long ledgerId = msgId.getLedgerId();
+ long entryId = msgId.getEntryId();
+ if (ledgerId >= 0
+ && msgId instanceof BatchMessageIdImpl) {
+ // When the start message is relative to a batch, we need to take one step back on the previous message,
+ // because the "batch" might not have been consumed in its entirety.
+ // The client will then be able to discard the first messages if needed.
+ entryId = msgId.getEntryId() - 1;
+ }
- Position startPosition = new PositionImpl(ledgerId, entryId);
- ManagedCursor cursor = null;
- try {
- cursor = ledger.newNonDurableCursor(startPosition, subscriptionName);
- } catch (ManagedLedgerException e) {
- subscriptionFuture.completeExceptionally(e);
- }
+ Position startPosition = new PositionImpl(ledgerId, entryId);
+ ManagedCursor cursor = null;
+ try {
+ cursor = ledger.newNonDurableCursor(startPosition, subscriptionName);
+ } catch (ManagedLedgerException e) {
+ subscriptionFuture.completeExceptionally(e);
+ }
- return new PersistentSubscription(this, subscriptionName, cursor, false);
- });
+ return new PersistentSubscription(this, subscriptionName, cursor, false);
+ });
- if (!subscriptionFuture.isDone()) {
- if (startMessageRollbackDurationSec > 0) {
- long timestamp = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(startMessageRollbackDurationSec);
- subscription.resetCursor(timestamp).handle((s, ex) -> {
- if (ex != null) {
- log.warn("[{}] Failed to reset cursor {} position at timestamp {}", topic, subscriptionName,
- startMessageRollbackDurationSec);
- }
+ if (!subscriptionFuture.isDone()) {
+ if (startMessageRollbackDurationSec > 0) {
+ long timestamp = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(startMessageRollbackDurationSec);
+ subscription.resetCursor(timestamp).handle((s, ex) -> {
+ if (ex != null) {
+ log.warn("[{}] Failed to reset cursor {} position at timestamp {}", topic, subscriptionName,
+ startMessageRollbackDurationSec);
+ }
+ subscriptionFuture.complete(subscription);
+ return null;
+ });
+ } else {
subscriptionFuture.complete(subscription);
- return null;
- });
+ }
} else {
- subscriptionFuture.complete(subscription);
+ // failed to initialize managed-cursor: clean up created subscription
+ subscriptions.remove(subscriptionName);
}
- } else {
- // failed to initialize managed-cursor: clean up created subscription
- subscriptions.remove(subscriptionName);
}
return subscriptionFuture;