Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/12/09 11:54:51 UTC

[GitHub] [pulsar] massakam opened a new pull request #8877: [broker] Copy list of subscriptions when checking for message expiration to avoid deadlocks

massakam opened a new pull request #8877:
URL: https://github.com/apache/pulsar/pull/8877


   ### Motivation
   
   Some of our broker servers experienced what appears to be a deadlock. The following is the thread dump at that time.
   
   [threaddump.txt.zip](https://github.com/apache/pulsar/files/5665572/threaddump.txt.zip)
   
   The thread "ForkJoinPool.commonPool-worker-120" held the lock on an instance of `ManagedLedgerImpl` while it appeared to be waiting for the write lock on a section of `subscriptions`, which is an instance of `ConcurrentOpenHashMap`. Because the lock on the `ManagedLedgerImpl` instance was never released, many other threads were blocked.
   
   ```
   "ForkJoinPool.commonPool-worker-120" #903 daemon prio=5 os_prio=0 tid=0x00007f9aa0010000 nid=0x12b59 waiting on condition [0x00007f9528cc3000]
      java.lang.Thread.State: WAITING (parking)
           at sun.misc.Unsafe.park(Native Method)
           - parking to wait for  <0x00007fa20b3e5eb0> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
           at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
           at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
           at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:245)
           at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
           at org.apache.pulsar.broker.service.persistent.PersistentTopic$2.openCursorComplete(PersistentTopic.java:650)
           at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncOpenCursor(ManagedLedgerImpl.java:720)
           - locked <0x00007fa20512f968> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
           at org.apache.pulsar.broker.service.persistent.PersistentTopic.getDurableSubscription(PersistentTopic.java:643)
           at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:590)
           at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
           at org.apache.pulsar.broker.service.ServerCnx$$Lambda$476/1880414247.apply(Unknown Source)
           at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
           at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
           at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682)
           at org.apache.pulsar.broker.service.ServerCnx$$Lambda$475/707554512.apply(Unknown Source)
           at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
           at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
           at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
           at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
           at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
           at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
           at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
           at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
           at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
           at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
   ```
   
   The thread that locked `subscriptions` seems to be "pulsar-msg-expiry-monitor-24-1".
   ```
   "pulsar-msg-expiry-monitor-24-1" #304 prio=5 os_prio=0 tid=0x00007f99602dd000 nid=0x12036 waiting on condition [0x00007f998d47c000]
      java.lang.Thread.State: TIMED_WAITING (parking)
           at sun.misc.Unsafe.park(Native Method)
           - parking to wait for  <0x00007fca4361dfb0> (a java.util.concurrent.CountDownLatch$Sync)
           at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
           at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
           at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
           at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
           at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.getNthEntry(ManagedCursorImpl.java:537)
           at org.apache.pulsar.broker.service.persistent.PersistentTopic.isOldestMessageExpired(PersistentTopic.java:1820)
           at org.apache.pulsar.broker.service.persistent.PersistentSubscription.expireMessages(PersistentSubscription.java:901)
           at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$checkMessageExpiry$36(PersistentTopic.java:1102)
           at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$1011/2104832020.accept(Unknown Source)
           at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
           at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
           at org.apache.pulsar.broker.service.persistent.PersistentTopic.checkMessageExpiry(PersistentTopic.java:1102)
           at org.apache.pulsar.broker.service.BrokerService$$Lambda$1009/2005752676.accept(Unknown Source)
           at org.apache.pulsar.broker.service.BrokerService.lambda$forEachTopic$32(BrokerService.java:951)
           at org.apache.pulsar.broker.service.BrokerService$$Lambda$779/1852910990.accept(Unknown Source)
           at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
           at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
           at org.apache.pulsar.broker.service.BrokerService.forEachTopic(BrokerService.java:948)
           at org.apache.pulsar.broker.service.BrokerService.checkMessageExpiry(BrokerService.java:925)
           at org.apache.pulsar.broker.service.BrokerService$$Lambda$108/203149502.run(Unknown Source)
           at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
           at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
           at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
           at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
           at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
           at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
           at java.lang.Thread.run(Thread.java:748)
   ```
   
   I do not understand why "pulsar-msg-expiry-monitor-24-1" was stalled; the dump only shows it waiting on a `CountDownLatch` in `ManagedCursorImpl.getNthEntry`. However, it seems that this deadlock can be avoided if `subscriptions` is not locked while checking for message expiration, so I created this PR. If anyone understands why that thread was stalled, please let me know.
   
   ### Modifications
   
   When expiring messages for each subscription, copy the values of `subscriptions` into a `List` and call `forEach()` on that `List` instance, so that the map is not locked while each subscription is processed.
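   
   The difference between the two iteration styles can be illustrated with a minimal sketch. `LockedMap` below is a hypothetical stand-in for a single section of `ConcurrentOpenHashMap` (it is not the Pulsar class and not the actual patch); it assumes only that `forEach()` runs the callback while a `StampedLock` read lock is held and that `values()` returns a copy.
   
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.locks.StampedLock;
   import java.util.function.BiConsumer;
   
   // Hypothetical stand-in for one lock-guarded section of a concurrent map.
   final class LockedMap<K, V> {
       private final StampedLock lock = new StampedLock();
       private final Map<K, V> entries = new HashMap<>();
   
       // Writers need the exclusive lock, so they wait for every reader to finish.
       void put(K key, V value) {
           long stamp = lock.writeLock();
           try {
               entries.put(key, value);
           } finally {
               lock.unlockWrite(stamp);
           }
       }
   
       // The callback runs while the read lock is held: a slow or blocked
       // callback keeps all writers waiting for as long as it runs.
       void forEach(BiConsumer<K, V> action) {
           long stamp = lock.readLock();
           try {
               entries.forEach(action);
           } finally {
               lock.unlockRead(stamp);
           }
       }
   
       // The read lock is held only for the duration of the copy; callers
       // then iterate over the snapshot without holding any lock.
       List<V> values() {
           long stamp = lock.readLock();
           try {
               return new ArrayList<>(entries.values());
           } finally {
               lock.unlockRead(stamp);
           }
       }
   
       // Exposed only so the sketch can show when the lock is held.
       boolean isReadLocked() {
           return lock.isReadLocked();
       }
   }
   ```
   
   With this stand-in, `map.forEach((name, sub) -> ...)` holds the read lock for the whole callback, whereas `map.values().forEach(sub -> ...)` releases it before the first callback runs, so a concurrent `put()` is not blocked by a stalled callback. The trade-off is that the snapshot may miss entries added after the copy was taken.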


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sijie merged pull request #8877: [broker] Copy list of subscriptions when checking for message expiration to avoid deadlocks

Posted by GitBox <gi...@apache.org>.
sijie merged pull request #8877:
URL: https://github.com/apache/pulsar/pull/8877


   

