Posted to commits@pulsar.apache.org by "lhotari (via GitHub)" <gi...@apache.org> on 2024/03/26 10:29:13 UTC

[PR] [fix][misc] Make ConcurrentBitSet thread safe [pulsar]

lhotari opened a new pull request, #22361:
URL: https://github.com/apache/pulsar/pull/22361

   Fixes #22360
   
   ### Motivation
   
   ConcurrentBitSet isn't thread safe, although its javadoc claims that it is. This is causing concurrency issues in Pulsar. See #22360 for examples.
   
   ### Modifications
   
   - Fix usage of StampedLock in ConcurrentBitSet
     - modifications must use the write lock
     - read operations must use the read lock
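
   As a rough illustration of the rule above, here is a minimal sketch of a `BitSet` subclass guarded by a `StampedLock`. The class name `LockedBitSet` is hypothetical and this is not the actual `ConcurrentBitSet` code from this PR. It overrides only a few methods to show the pattern: mutating methods take the write lock, and reading methods take the read lock.

   ```java
   import java.util.BitSet;
   import java.util.concurrent.locks.StampedLock;

   // Hypothetical illustration of the locking rule (not Pulsar's ConcurrentBitSet):
   // writes take the exclusive write lock, reads take the shared read lock.
   class LockedBitSet extends BitSet {
       private final StampedLock rwLock = new StampedLock();

       @Override
       public void set(int bitIndex) {
           long stamp = rwLock.writeLock(); // modification: exclusive access
           try {
               super.set(bitIndex);
           } finally {
               rwLock.unlockWrite(stamp);
           }
       }

       @Override
       public void clear(int bitIndex) {
           long stamp = rwLock.writeLock(); // modification: exclusive access
           try {
               super.clear(bitIndex);
           } finally {
               rwLock.unlockWrite(stamp);
           }
       }

       @Override
       public boolean get(int bitIndex) {
           long stamp = rwLock.readLock(); // read: shared access
           try {
               return super.get(bitIndex);
           } finally {
               rwLock.unlockRead(stamp);
           }
       }

       @Override
       public int cardinality() {
           long stamp = rwLock.readLock(); // read: shared access
           try {
               return super.cardinality();
           } finally {
               rwLock.unlockRead(stamp);
           }
       }
   }
   ```

   With this pattern, several threads can call `set` concurrently and a later `cardinality()` sees every bit. A complete implementation would need to guard every inherited method that reads or writes the bits in the same way, since any method left unguarded reintroduces the race.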
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc` <!-- Your PR contains doc changes. -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
   - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [fix][misc] Make ConcurrentBitSet thread safe [pulsar]

Posted by "lhotari (via GitHub)" <gi...@apache.org>.
lhotari merged PR #22361:
URL: https://github.com/apache/pulsar/pull/22361


Re: [PR] [fix][misc] Make ConcurrentBitSet thread safe [pulsar]

Posted by "lhotari (via GitHub)" <gi...@apache.org>.
lhotari commented on PR #22361:
URL: https://github.com/apache/pulsar/pull/22361#issuecomment-2020210772

   To resolve the deadlock, I removed the locking from the `length()` method and documented this.
   There currently aren't any external calls to `length()`, so I think this is a reasonable solution.
   Replacing `StampedLock` with `ReentrantReadWriteLock` would have been the other option, but since this code is in one of Pulsar's performance hot spots, it could cause performance regressions. That's why I chose to keep `StampedLock` and leave the `length()` method without an explicit lock. It is currently called only from within other methods, which is fine as long as `length()` isn't called externally.
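
   A minimal sketch of this approach, assuming a `BitSet` subclass guarded by a `StampedLock` (the class name `NonReentrantSafeBitSet` is hypothetical, not the PR's code): `length()` is overridden without a lock, so when `previousSetBit` holds the read lock and the inherited `BitSet.previousSetBit` calls `this.length()`, the same non-reentrant lock is not acquired a second time.

   ```java
   import java.util.BitSet;
   import java.util.concurrent.locks.StampedLock;

   // Hypothetical sketch (not Pulsar's code): length() takes no lock so that
   // inherited BitSet methods calling it internally don't re-acquire the lock.
   class NonReentrantSafeBitSet extends BitSet {
       private final StampedLock rwLock = new StampedLock();

       @Override
       public void set(int bitIndex) {
           long stamp = rwLock.writeLock();
           try {
               super.set(bitIndex);
           } finally {
               rwLock.unlockWrite(stamp);
           }
       }

       // Intentionally lock-free: only safe when invoked from a method that
       // already holds the lock. External code must not call it directly.
       @Override
       public int length() {
           return super.length();
       }

       @Override
       public int previousSetBit(int fromIndex) {
           long stamp = rwLock.readLock();
           try {
               // BitSet.previousSetBit may call this.length(); since length()
               // doesn't lock, the StampedLock is acquired only once here.
               return super.previousSetBit(fromIndex);
           } finally {
               rwLock.unlockRead(stamp);
           }
       }
   }
   ```

   The trade-off is the one stated in the comment: the lock-free `length()` is only correct when called from a method that already holds the lock, so external callers must not use it directly.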


Re: [PR] [fix][misc] Make ConcurrentBitSet thread safe [pulsar]

Posted by "lhotari (via GitHub)" <gi...@apache.org>.
lhotari commented on PR #22361:
URL: https://github.com/apache/pulsar/pull/22361#issuecomment-2020155347

   StampedLock isn't a reentrant lock. There are deadlocks with stack traces such as:
   ```
   "pulsar-io-8-4" #109 prio=5 os_prio=31 cpu=59.46ms elapsed=14.23s tid=0x0000000158d1a600 nid=0x12307 waiting on condition  [0x000000058bd60000]
      java.lang.Thread.State: WAITING (parking)
   	at jdk.internal.misc.Unsafe.park(java.base@17.0.10/Native Method)
   	- parking to wait for  <0x000020001d61b2a0> (a java.util.concurrent.locks.StampedLock)
   	at java.util.concurrent.locks.LockSupport.park(java.base@17.0.10/LockSupport.java:211)
   	at java.util.concurrent.locks.StampedLock.acquireRead(java.base@17.0.10/StampedLock.java:1378)
   	at java.util.concurrent.locks.StampedLock.readLock(java.base@17.0.10/StampedLock.java:554)
   	at org.apache.pulsar.common.util.collections.ConcurrentBitSet.length(ConcurrentBitSet.java:299)
   	at java.util.BitSet.previousSetBit(java.base@17.0.10/BitSet.java:799)
   	at org.apache.pulsar.common.util.collections.ConcurrentBitSet.previousSetBit(ConcurrentBitSet.java:141)
   	at org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet.lambda$forEachRawRange$4(ConcurrentOpenLongPairRangeSet.java:219)
   	at org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet$$Lambda$1521/0x0000007000b627d0.accept(Unknown Source)
   	at java.util.concurrent.ConcurrentSkipListMap.forEach(java.base@17.0.10/ConcurrentSkipListMap.java:3030)
   	at org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet.forEachRawRange(ConcurrentOpenLongPairRangeSet.java:211)
   	at org.apache.bookkeeper.mledger.impl.RangeSetWrapper.forEachRawRange(RangeSetWrapper.java:126)
   	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.buildIndividualDeletedMessageRanges(ManagedCursorImpl.java:3022)
   	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.persistPositionToLedger(ManagedCursorImpl.java:3088)
   	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.internalMarkDelete(ManagedCursorImpl.java:2185)
   	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.internalAsyncMarkDelete(ManagedCursorImpl.java:2066)
   	- locked <0x000020002de7a600> (a java.util.ArrayDeque)
   	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncDelete(ManagedCursorImpl.java:2392)
   	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.acknowledgeMessage(PersistentSubscription.java:391)
   	at org.apache.pulsar.broker.service.Consumer.individualAckNormal(Consumer.java:535)
   	at org.apache.pulsar.broker.service.Consumer.messageAcked(Consumer.java:485)
   	at org.apache.pulsar.broker.service.ServerCnx.handleAck(ServerCnx.java:1898)
   	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:160)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
   	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
   	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:202)
   	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:164)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
   	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
   	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
   	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
   	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
   	at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
   	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
   	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
   	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
   	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
   	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
   	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
   	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
   	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
   	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
   	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
   	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
   	at java.lang.Thread.run(java.base@17.0.10/Thread.java:840)
   ```
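
   The nesting is visible in the trace: `ConcurrentBitSet.previousSetBit` delegates to `BitSet.previousSetBit`, which calls the overridden `ConcurrentBitSet.length()`, which then parks in `StampedLock.readLock()`. The sketch below (the `ReentrancyDemo` class is illustrative, not Pulsar code) shows the non-reentrancy in its simplest deterministic form, using the non-blocking `tryReadLock()` so it returns instead of hanging: a thread holding a `StampedLock` write lock cannot also take its read lock, whereas a thread holding a `ReentrantReadWriteLock` write lock can.

   ```java
   import java.util.concurrent.locks.ReentrantReadWriteLock;
   import java.util.concurrent.locks.StampedLock;

   // Illustrative sketch: compares nested acquisition on the same thread.
   // tryXxx methods are used so the demo fails fast instead of deadlocking.
   class ReentrancyDemo {

       // Can the thread holding StampedLock's write lock also get the read lock?
       static boolean stampedReadWhileWriting() {
           StampedLock lock = new StampedLock();
           long writeStamp = lock.writeLock();
           try {
               long readStamp = lock.tryReadLock(); // 0 means "not acquired"
               if (readStamp != 0L) {
                   lock.unlockRead(readStamp);
                   return true;
               }
               return false;
           } finally {
               lock.unlockWrite(writeStamp);
           }
       }

       // Same question for ReentrantReadWriteLock, which tracks the owning thread.
       static boolean reentrantReadWhileWriting() {
           ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
           lock.writeLock().lock();
           try {
               boolean acquired = lock.readLock().tryLock(); // allowed for the writer
               if (acquired) {
                   lock.readLock().unlock();
               }
               return acquired;
           } finally {
               lock.writeLock().unlock();
           }
       }
   }
   ```

   `stampedReadWhileWriting()` returns `false` and `reentrantReadWhileWriting()` returns `true`. With the blocking `readLock()` in place of `tryReadLock()`, the first case would park forever, a self-deadlock similar in kind to the hang in the trace.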

