Posted to issues@activemq.apache.org by "Jean-Baptiste Onofré (Jira)" <ji...@apache.org> on 2023/02/02 13:37:00 UTC
[jira] [Assigned] (AMQ-9199) Race condition in creating store directory for new queues
[ https://issues.apache.org/jira/browse/AMQ-9199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jean-Baptiste Onofré reassigned AMQ-9199:
-----------------------------------------
Assignee: Jean-Baptiste Onofré
> Race condition in creating store directory for new queues
> ---------------------------------------------------------
>
> Key: AMQ-9199
> URL: https://issues.apache.org/jira/browse/AMQ-9199
> Project: ActiveMQ
> Issue Type: Bug
> Components: KahaDB
> Affects Versions: 5.16.5, 5.17.3
> Environment: * ActiveMQ 5.17.3
> * JBoss EAP 7.4
> * Amazon Linux 2
> Reporter: Michal Janczykowski
> Assignee: Jean-Baptiste Onofré
> Priority: Critical
> Time Spent: 10m
> Remaining Estimate: 0h
>
> After introducing dynamically created STOMP queues in our production system, we observed exceptions thrown from {{IOHelper.mkdirs()}}, caused by a race condition when creating the store directory for a new STOMP queue. The exception is thrown in two different situations:
> # when processing a STOMP subscription
> # when sending a persistent JMS message to any existing queue, at the point where {{store.size()}} is called
>
> *Stack traces:*
> STOMP subscription (from ActiveMQ logs):
> {code:java}
> 2022-12-29 09:09:14,704 | ERROR | java.lang.RuntimeException: Failed to start per destination persistence adapter for destination: queue://STOMP.x-queue-userpn4ckpkl, (...)java.io.IOException: Failed to create directory '(...)/activemq/data/kahadb/queue#3a#2f#2fSTOMP.x-queue-userpn4ckpkl'
> at org.apache.activemq.util.IOHelper.mkdirs(IOHelper.java:331)
> at org.apache.activemq.store.kahadb.MessageDatabase.load(MessageDatabase.java:494)
> at org.apache.activemq.store.kahadb.MessageDatabase.doStart(MessageDatabase.java:309)
> at org.apache.activemq.store.kahadb.KahaDBStore.doStart(KahaDBStore.java:212)
> at org.apache.activemq.util.ServiceSupport.start(ServiceSupport.java:55)
> at org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter.doStart(KahaDBPersistenceAdapter.java:232)
> at org.apache.activemq.util.ServiceSupport.start(ServiceSupport.java:55)
> at org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter.startAdapter(MultiKahaDBPersistenceAdapter.java:211)
> at org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter.getMatchingPersistenceAdapter(MultiKahaDBPersistenceAdapter.java:204)
> at org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter.createQueueMessageStore(MultiKahaDBPersistenceAdapter.java:188)
> at org.apache.activemq.broker.region.DestinationFactoryImpl.createDestination(DestinationFactoryImpl.java:84)
> at org.apache.activemq.broker.region.AbstractRegion.createDestination(AbstractRegion.java:629)
> at org.apache.activemq.broker.jmx.ManagedQueueRegion.createDestination(ManagedQueueRegion.java:56)
> at org.apache.activemq.broker.region.AbstractRegion.addDestination(AbstractRegion.java:155)
> at org.apache.activemq.broker.region.RegionBroker.addDestination(RegionBroker.java:357)
> at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:174)
> at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:174)
> at org.apache.activemq.advisory.AdvisoryBroker.addDestination(AdvisoryBroker.java:244)
> at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:174)
> at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:174)
> at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:174)
> at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:174)
> at org.apache.activemq.security.AuthorizationBroker.addDestination(AuthorizationBroker.java:118)
> at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:174)
> at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:174)
> at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:174)
> at org.apache.activemq.broker.region.AbstractRegion.lookup(AbstractRegion.java:561)
> at org.apache.activemq.broker.region.AbstractRegion.addConsumer(AbstractRegion.java:346)
> at org.apache.activemq.broker.region.RegionBroker.addConsumer(RegionBroker.java:436)
> at org.apache.activemq.broker.jmx.ManagedRegionBroker.addConsumer(ManagedRegionBroker.java:243)
> at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:104)
> at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:104)
> at org.apache.activemq.advisory.AdvisoryBroker.addConsumer(AdvisoryBroker.java:131)
> at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:104)
> at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:104)
> at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:104)
> at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:104)
> at org.apache.activemq.security.AuthorizationBroker.addConsumer(AuthorizationBroker.java:183)
> at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:104)
> at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:104)
> at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:104)
> at org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:705)
> at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:352)
> at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:335)
> at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:200)
> at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:45)
> at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:302)
> at org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:97)
> at org.apache.activemq.transport.stomp.ProtocolConverter.sendToActiveMQ(ProtocolConverter.java:179)
> at org.apache.activemq.transport.stomp.ProtocolConverter.onStompSubscribe(ProtocolConverter.java:679)
> at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:249)
> at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:85)
> at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
> at org.apache.activemq.transport.tcp.SslTransport.doConsume(SslTransport.java:172)
> at org.apache.activemq.transport.stomp.StompSslTransportFactory$1$1.doConsume(StompSslTransportFactory.java:73)
> at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
> at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
> at java.base/java.lang.Thread.run(Thread.java:829)
> {code}
>
> JMS message send (from the application running on JBoss):
> {code:java}
> 2023-01-12T09:57:47,360Z WARN [org.jboss.jca.core.connectionmanager.listener.TxConnectionListener] (...): javax.jms.JMSException: java.io.IOException: Failed to create directory '(...)/activemq/kahadb/queue#3a#2f#2fSTOMP.x-queue-useref42egyx'
> at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:56)
> at org.apache.activemq.ActiveMQConnection.onAsyncException(ActiveMQConnection.java:1973)
> at org.apache.activemq.ActiveMQConnection$3$1.run(ActiveMQConnection.java:1882)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)
> Caused by: java.lang.RuntimeException: java.io.IOException: Failed to create directory '(...)/activemq/kahadb/queue#3a#2f#2fSTOMP.x-queue-useref42egyx'
> at org.apache.activemq.store.kahadb.KahaDBStore.size(KahaDBStore.java:1379)
> at org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter.size(KahaDBPersistenceAdapter.java:223)
> at org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter.size(MultiKahaDBPersistenceAdapter.java:366)
> at org.apache.activemq.usage.StoreUsage.retrieveUsage(StoreUsage.java:53)
> at org.apache.activemq.usage.Usage.caclPercentUsage(Usage.java:283)
> at org.apache.activemq.usage.Usage.isFull(Usage.java:126)
> at org.apache.activemq.broker.region.Queue.checkUsage(Queue.java:939)
> at org.apache.activemq.broker.region.Queue.doMessageSend(Queue.java:875)
> at org.apache.activemq.broker.region.Queue.send(Queue.java:756)
> at org.apache.activemq.broker.region.DestinationFilter.send(DestinationFilter.java:138)
> at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:511)
> at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:477)
> at org.apache.activemq.broker.jmx.ManagedRegionBroker.send(ManagedRegionBroker.java:293)
> at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:154)
> at org.apache.activemq.broker.scheduler.SchedulerBroker.send(SchedulerBroker.java:320)
> at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:154)
> at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:96)
> at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:295)
> at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:154)
> at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:154)
> at org.apache.activemq.security.AuthorizationBroker.send(AuthorizationBroker.java:226)
> at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:154)
> at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:154)
> at org.apache.activemq.plugin.StatisticsBroker.send(StatisticsBroker.java:229)
> at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:154)
> at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:580)
> at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:769)
> at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:335)
> at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:200)
> at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
> at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:125)
> at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:301)
> at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
> at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
> at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
> ... 1 more
> Caused by: java.io.IOException: Failed to create directory '(...)/activemq/kahadb/queue#3a#2f#2fSTOMP.x-queue-useref42egyx'
> at org.apache.activemq.util.IOHelper.mkdirs(IOHelper.java:331)
> at org.apache.activemq.store.kahadb.MessageDatabase.createPageFile(MessageDatabase.java:3381)
> at org.apache.activemq.store.kahadb.MessageDatabase.getPageFile(MessageDatabase.java:3556)
> at org.apache.activemq.store.kahadb.KahaDBStore.size(KahaDBStore.java:1377)
> ... 35 more {code}
>
> *Background*
> We had KahaDB configured with {{preallocationStrategy=os_kernel_copy}} and the size set to 32MB, which probably slowed down store creation. After disabling preallocation for STOMP queues we no longer observe the issue, but this only makes the race less likely; it does not remove it.
>
> A pull request with a test case reproducing the issue and a proposed fix is available: [https://github.com/apache/activemq/pull/957]
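
For readers unfamiliar with this class of bug, the following is a minimal sketch of how a check-then-create directory helper can fail when several threads create the same directory, and one way to make it tolerant. It is an illustration under assumptions, not the code of {{IOHelper.mkdirs()}} and not the change proposed in the pull request. The class name MkdirsSketch and the methods mkdirsRacy and mkdirsTolerant are hypothetical.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class; not part of ActiveMQ.
final class MkdirsSketch {

    // Check-then-create. If another thread creates the directory between
    // exists() and mkdirs(), File.mkdirs() returns false and this method
    // throws even though the directory is present afterwards.
    static void mkdirsRacy(File dir) throws IOException {
        if (dir.exists()) {
            if (!dir.isDirectory()) {
                throw new IOException("Failed to create directory '" + dir
                        + "', a regular file already exists with that name");
            }
        } else if (!dir.mkdirs()) {
            throw new IOException("Failed to create directory '" + dir + "'");
        }
    }

    // Tolerant variant: a false return from mkdirs() is treated as an error
    // only if the path is still not a directory afterwards.
    static void mkdirsTolerant(File dir) throws IOException {
        if (!dir.mkdirs() && !dir.isDirectory()) {
            throw new IOException("Failed to create directory '" + dir + "'");
        }
    }

    public static void main(String[] args) throws Exception {
        File base = Files.createTempDirectory("mkdirs-sketch").toFile();
        File target = new File(base, "queue-dir");

        int threads = 8;
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(threads);
        AtomicInteger failures = new AtomicInteger();

        for (int i = 0; i < threads; i++) {
            new Thread(() -> {
                try {
                    start.await();          // release all threads together
                    mkdirsTolerant(target); // same directory from every thread
                } catch (Exception e) {
                    failures.incrementAndGet();
                } finally {
                    done.countDown();
                }
            }).start();
        }

        start.countDown();
        done.await();
        System.out.println("failures: " + failures.get()
                + ", directory exists: " + target.isDirectory());
    }
}
```

Swapping mkdirsTolerant for mkdirsRacy in main may occasionally report failures, depending on timing. {{java.nio.file.Files.createDirectories()}} is another option, since it does not throw when the directory already exists.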