Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:33:52 UTC

[pulsar] 06/38: Fix deadlock by consumer and reader (#6728)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0fd02bd7569cbeecd8e512b05ab958443738d308
Author: k2la <mz...@gmail.com>
AuthorDate: Tue Apr 14 09:11:51 2020 +0900

    Fix deadlock by consumer and reader (#6728)
    
    ### Motivation
    
    Brokers stopped accepting client connections when consumers and readers connected to them at almost the same time.
    This happened in v2.4.2 and on the master branch.
    
    The following thread dump was taken at that time:
    ```
    "bookkeeper-ml-workers-OrderedExecutor-5-0" #52 prio=5 os_prio=0 tid=0x00007ff425fd0800 nid=0x28bf waiting on condition [0x00007ff3478f6000]
       java.lang.Thread.State: WAITING (parking)
            at sun.misc.Unsafe.park(Native Method)
            - parking to wait for  <0x0000000750c51a00> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
            at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
            at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
            at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:245)
            at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
            at org.apache.pulsar.broker.service.persistent.PersistentTopic$2.openCursorComplete(PersistentTopic.java:638)
            at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncOpenCursor(ManagedLedgerImpl.java:712)
            - locked <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
            at org.apache.pulsar.broker.service.persistent.PersistentTopic.getDurableSubscription(PersistentTopic.java:631)
            at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:578)
            at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
            at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source)
            at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
            at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
            at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
            at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
            at org.apache.pulsar.broker.service.BrokerService$2.lambda$openLedgerComplete$1(BrokerService.java:687)
            at org.apache.pulsar.broker.service.BrokerService$2$$Lambda$229/1013432130.run(Unknown Source)
            at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719)
            at java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731)
            at java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023)
            at org.apache.pulsar.broker.service.BrokerService$2.openLedgerComplete(BrokerService.java:680)
            at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.lambda$asyncOpen$7(ManagedLedgerFactoryImpl.java:328)
            at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$$Lambda$184/272111809.accept(Unknown Source)
            at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
            at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
            at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
            at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
            at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$2.initializeComplete(ManagedLedgerFactoryImpl.java:316)
            at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$3$1.operationComplete(ManagedLedgerImpl.java:464)
            at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:276)
            at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:249)
            at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.lambda$null$7(MetaStoreImplZookeeper.java:241)
            at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper$$Lambda$584/1125537287.run(Unknown Source)
            at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
            at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
            at java.lang.Thread.run(Thread.java:748)
    
    ...
    
    "ForkJoinPool.commonPool-worker-36" #1043 daemon prio=5 os_prio=0 tid=0x00007ff34c0ce800 nid=0x26f2 waiting for monitor entry [0x00007ff32d2eb000]
       java.lang.Thread.State: BLOCKED (on object monitor)
            at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.newNonDurableCursor(ManagedLedgerImpl.java:856)
            - waiting to lock <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
            at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$getNonDurableSubscription$13(PersistentTopic.java:684)
            at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$572/174683985.apply(Unknown Source)
            at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:274)
            at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
            at org.apache.pulsar.broker.service.persistent.PersistentTopic.getNonDurableSubscription(PersistentTopic.java:667)
            at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:579)
            at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
            at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source)
            at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
            at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
            at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682)
            at org.apache.pulsar.broker.service.ServerCnx$$Lambda$458/375938934.apply(Unknown Source)
            at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
            at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
            at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
            at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
            at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
            at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
            at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
            at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
            at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
            at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
    ```
    `PersistentTopic#getDurableSubscription` tried to lock `ConcurrentOpenHashMap` after locking `ManagedLedgerImpl`.
    ( `ManagedLedgerImpl` => `ConcurrentOpenHashMap`)
    
    On the other hand, `PersistentTopic#getNonDurableSubscription` tried to lock `ManagedLedgerImpl` after locking `ConcurrentOpenHashMap`.
    ( `ConcurrentOpenHashMap` => `ManagedLedgerImpl`)
    
    Each thread therefore holds the lock the other one is waiting for, which results in a deadlock.
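    
    The inversion can be reproduced in isolation. The following is a minimal sketch, not Pulsar code: `ledgerLock` and `mapLock` are hypothetical stand-ins for the `ManagedLedgerImpl` monitor and the `ConcurrentOpenHashMap` section lock, and `tryLock` with a timeout replaces a blocking acquire so that the example terminates instead of hanging.
    
    ```java
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.locks.ReentrantLock;

    // Hypothetical stand-ins: ledgerLock models the ManagedLedgerImpl monitor,
    // mapLock models the ConcurrentOpenHashMap section lock.
    final class LockOrderInversionSketch {

        /** Runs both paths once and returns how many threads obtained their second lock. */
        static int secondLocksAcquired() {
            ReentrantLock ledgerLock = new ReentrantLock();
            ReentrantLock mapLock = new ReentrantLock();
            CountDownLatch firstHeld = new CountDownLatch(2);
            CountDownLatch attemptsDone = new CountDownLatch(2);
            AtomicInteger acquired = new AtomicInteger();

            // Durable path: ledger first, then map.
            Thread durable = new Thread(
                    () -> attempt(ledgerLock, mapLock, firstHeld, attemptsDone, acquired));
            // Non-durable path (before the fix): map first, then ledger.
            Thread nonDurable = new Thread(
                    () -> attempt(mapLock, ledgerLock, firstHeld, attemptsDone, acquired));

            durable.start();
            nonDurable.start();
            try {
                durable.join();
                nonDurable.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return acquired.get();
        }

        private static void attempt(ReentrantLock first, ReentrantLock second,
                                    CountDownLatch firstHeld, CountDownLatch attemptsDone,
                                    AtomicInteger acquired) {
            first.lock();
            try {
                firstHeld.countDown();
                firstHeld.await(); // both threads now hold their first lock
                if (second.tryLock(200, TimeUnit.MILLISECONDS)) {
                    acquired.incrementAndGet();
                    second.unlock();
                }
                attemptsDone.countDown();
                attemptsDone.await(); // keep the first lock until both attempts are over
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                first.unlock();
            }
        }

        public static void main(String[] args) {
            System.out.println("second locks acquired: " + secondLocksAcquired());
        }
    }
    ```
    
    Neither thread obtains its second lock here, because each one still holds the lock the other needs. With blocking acquires, as in the thread dump, both threads would wait forever.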
    
    ### Modifications
    Changed `PersistentTopic#getNonDurableSubscription` to lock `ManagedLedgerImpl` before locking `ConcurrentOpenHashMap`, so that both code paths acquire the locks in the same order. ( `ManagedLedgerImpl` => `ConcurrentOpenHashMap`)
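    
    The effect of the reordering can be sketched outside Pulsar. This is an illustration under assumptions, not the actual implementation: `LockOrderFixSketch`, its `ledger` object and its `subscriptions` map (a plain `ConcurrentHashMap` standing in for `ConcurrentOpenHashMap`) are hypothetical. Both paths take the `ledger` monitor first and only then touch the map; the nested `synchronized (ledger)` inside the mapping function is safe because Java monitors are reentrant.
    
    ```java
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical model of the fixed lock order: ledger monitor => map lock.
    final class LockOrderFixSketch {
        private final Object ledger = new Object();
        private final ConcurrentHashMap<String, String> subscriptions = new ConcurrentHashMap<>();

        // Durable path: the map is updated while the ledger monitor is held.
        String getDurableSubscription(String name) {
            synchronized (ledger) {
                return subscriptions.computeIfAbsent(name, n -> "durable:" + n);
            }
        }

        // Non-durable path after the fix: ledger monitor first, then the map.
        String getNonDurableSubscription(String name) {
            synchronized (ledger) {
                return subscriptions.computeIfAbsent(name, this::newNonDurableCursor);
            }
        }

        // Models a synchronized method on the ledger; re-entering the monitor is allowed.
        private String newNonDurableCursor(String name) {
            synchronized (ledger) {
                return "non-durable:" + name;
            }
        }

        /** Runs both paths concurrently and returns the number of subscriptions created. */
        static int run(int rounds) {
            LockOrderFixSketch topic = new LockOrderFixSketch();
            Thread consumers = new Thread(() -> {
                for (int i = 0; i < rounds; i++) {
                    topic.getDurableSubscription("sub-" + i);
                }
            });
            Thread readers = new Thread(() -> {
                for (int i = 0; i < rounds; i++) {
                    topic.getNonDurableSubscription("reader-" + i);
                }
            });
            consumers.start();
            readers.start();
            try {
                consumers.join();
                readers.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return topic.subscriptions.size();
        }

        public static void main(String[] args) {
            System.out.println("subscriptions created: " + run(1000));
        }
    }
    ```
    
    Because no thread ever waits for `ledger` while holding a map lock, the circular wait cannot form and both threads always finish.
    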
    (cherry picked from commit 6d304140b6205c6a2e94ad34bfd3cc5d16aca5d1)
---
 .../broker/service/persistent/PersistentTopic.java | 76 +++++++++++-----------
 1 file changed, 39 insertions(+), 37 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 13a076a..18d371b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -671,49 +671,51 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
         log.info("[{}][{}] Creating non-durable subscription at msg id {}", topic, subscriptionName, startMessageId);
 
-        // Create a new non-durable cursor only for the first consumer that connects
-        Subscription subscription = subscriptions.computeIfAbsent(subscriptionName, name -> {
-            MessageIdImpl msgId = startMessageId != null ? (MessageIdImpl) startMessageId
-                    : (MessageIdImpl) MessageId.latest;
-
-            long ledgerId = msgId.getLedgerId();
-            long entryId = msgId.getEntryId();
-            if (ledgerId >= 0
-                && msgId instanceof BatchMessageIdImpl) {
-                // When the start message is relative to a batch, we need to take one step back on the previous message,
-                // because the "batch" might not have been consumed in its entirety.
-                // The client will then be able to discard the first messages if needed.
-                entryId = msgId.getEntryId() - 1;
-            }
+        synchronized (ledger) {
+            // Create a new non-durable cursor only for the first consumer that connects
+            Subscription subscription = subscriptions.computeIfAbsent(subscriptionName, name -> {
+                MessageIdImpl msgId = startMessageId != null ? (MessageIdImpl) startMessageId
+                        : (MessageIdImpl) MessageId.latest;
+
+                long ledgerId = msgId.getLedgerId();
+                long entryId = msgId.getEntryId();
+                if (ledgerId >= 0
+                        && msgId instanceof BatchMessageIdImpl) {
+                    // When the start message is relative to a batch, we need to take one step back on the previous message,
+                    // because the "batch" might not have been consumed in its entirety.
+                    // The client will then be able to discard the first messages if needed.
+                    entryId = msgId.getEntryId() - 1;
+                }
 
-            Position startPosition = new PositionImpl(ledgerId, entryId);
-            ManagedCursor cursor = null;
-            try {
-                cursor = ledger.newNonDurableCursor(startPosition, subscriptionName);
-            } catch (ManagedLedgerException e) {
-                subscriptionFuture.completeExceptionally(e);
-            }
+                Position startPosition = new PositionImpl(ledgerId, entryId);
+                ManagedCursor cursor = null;
+                try {
+                    cursor = ledger.newNonDurableCursor(startPosition, subscriptionName);
+                } catch (ManagedLedgerException e) {
+                    subscriptionFuture.completeExceptionally(e);
+                }
 
-            return new PersistentSubscription(this, subscriptionName, cursor, false);
-        });
+                return new PersistentSubscription(this, subscriptionName, cursor, false);
+            });
 
-        if (!subscriptionFuture.isDone()) {
-            if (startMessageRollbackDurationSec > 0) {
-                long timestamp = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(startMessageRollbackDurationSec);
-                subscription.resetCursor(timestamp).handle((s, ex) -> {
-                    if (ex != null) {
-                        log.warn("[{}] Failed to reset cursor {} position at timestamp {}", topic, subscriptionName,
-                                startMessageRollbackDurationSec);
-                    }
+            if (!subscriptionFuture.isDone()) {
+                if (startMessageRollbackDurationSec > 0) {
+                    long timestamp = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(startMessageRollbackDurationSec);
+                    subscription.resetCursor(timestamp).handle((s, ex) -> {
+                        if (ex != null) {
+                            log.warn("[{}] Failed to reset cursor {} position at timestamp {}", topic, subscriptionName,
+                                    startMessageRollbackDurationSec);
+                        }
+                        subscriptionFuture.complete(subscription);
+                        return null;
+                    });
+                } else {
                     subscriptionFuture.complete(subscription);
-                    return null;
-                });
+                }
             } else {
-                subscriptionFuture.complete(subscription);
+                // failed to initialize managed-cursor: clean up created subscription
+                subscriptions.remove(subscriptionName);
             }
-        } else {
-            // failed to initialize managed-cursor: clean up created subscription
-            subscriptions.remove(subscriptionName);
         }
 
         return subscriptionFuture;