Posted to dev@activemq.apache.org by Gary Tully <ga...@gmail.com> on 2009/11/03 10:51:55 UTC

Re: Broker get stuck with high amount of persistent messages for a given destination

Can you verify whether you still see the problem with the latest snapshot:
https://repository.apache.org/content/repositories/snapshots/org/apache/activemq/apache-activemq/5.4-SNAPSHOT/


2009/11/3 Manuel Teira <mt...@tid.es>:
> Hello again.
> Let me insist on this problem, since it is hitting our production environment
> every other day. Whenever, for whatever reason, one of the system queues
> goes over ~60,000 pending persistent messages, we are unable to restart the
> broker properly, and we run into the problems I tried to detail in my
> previous message.
> Could you be so kind as to take a look at it? If you need any further
> information, please let me know.
>
> Best regards.
>
>
> Manuel Teira wrote:
>>
>> Hello.
>>
>> We are embedding an ActiveMQ 4.2 broker in our application, running on a
>> Sun JVM 5, and using an Oracle 10g database as the persistent message store
>> (without a journal, since we are using a JDBC master-slave cluster).
>> On a couple of occasions, we were unable to get the broker started in our
>> production environment. Analyzing the situation, it seems that the cause was
>> the large number of persistent messages for a given queue (over 75,000),
>> which was blocking the attempt to create consumers on that destination.
>>
>> To reproduce the problem with a more recent ActiveMQ version, we set up
>> a 5.2.0 standalone broker to resemble our scenario, and a client that
>> tries to create a set of consumers on the loaded queue. We exported the
>> table with messages from our production environment and loaded it into the
>> test database. I'm attaching both the ActiveMQ configuration and the
>> client used for testing.
>>
>> What we found was:
>>
>> The broker started normally, since no attempt was made to consume messages
>> from the "loaded" destination.
>> Once the client started, we observed that:
>>  - We tried to start 10 consumers on the queue. Only one of those
>> consumers returned from session.createConsumer(); the others were blocked
>> (note that all of them share the same connection).
>>  - The only started consumer was unable to get any message; it was blocked
>> in the getMessage() attempt.
>>  - The broker seems to be trying to load all the messages from the
>> database, showing the following stack dump (just the involved threads):
>>
>> "ActiveMQ Transport: tcp:///127.0.0.1:33554" daemon prio=4 tid=0x00ac2ef8 nid=0x3c waiting on condition [0xaae7e000..0xaae7fbf0]
>>        at sun.misc.Unsafe.park(Native Method)
>>        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:118)
>>        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:681)
>>        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:711)
>>        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1041)
>>        at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:184)
>>        at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:256)
>>        at org.apache.activemq.broker.region.Queue.addSubscription(Queue.java:217)
>>        at org.apache.activemq.broker.region.AbstractRegion.addConsumer(AbstractRegion.java:275)
>>        - locked <0xe35bbb40> (a java.lang.Object)
>>        at org.apache.activemq.broker.region.RegionBroker.addConsumer(RegionBroker.java:372)
>>        at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:86)
>>        at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:86)
>>        at org.apache.activemq.advisory.AdvisoryBroker.addConsumer(AdvisoryBroker.java:83)
>>        at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:86)
>>        at org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:93)
>>        at org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:541)
>>        at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:345)
>>        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
>>        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
>>        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
>>        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
>>        at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
>>        - locked <0xb8a0d5d0> (a org.apache.activemq.transport.InactivityMonitor$1)
>>        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
>>        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
>>        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
>>        at java.lang.Thread.run(Thread.java:595)
>>
>> "QueueThread:queue://TaskManagerQueue" daemon prio=10 tid=0x005e1bd8 nid=0x42 runnable [0xaac7e000..0xaac7faf0]
>>        at oracle.jdbc.driver.T2CStatement.t2cDefineFetch(Native Method)
>>        at oracle.jdbc.driver.T2CPreparedStatement.doDefineFetch(T2CPreparedStatement.java:827)
>>        at oracle.jdbc.driver.T2CPreparedStatement.executeForRows(T2CPreparedStatement.java:768)
>>        at oracle.jdbc.driver.OracleStatement.executeMaybeDescribe(OracleStatement.java:1062)
>>        at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1132)
>>        at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3285)
>>        at oracle.jdbc.driver.OraclePreparedStatement.executeQuery(OraclePreparedStatement.java:3329)
>>        - locked <0xe37a09a0> (a oracle.jdbc.driver.T2CPreparedStatement)
>>        - locked <0xe379c398> (a oracle.jdbc.driver.T2CConnection)
>>        at org.apache.commons.dbcp.DelegatingPreparedStatement.executeQuery(DelegatingPreparedStatement.java:91)
>>        at org.apache.commons.dbcp.DelegatingPreparedStatement.executeQuery(DelegatingPreparedStatement.java:91)
>>        at org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.doRecoverNextMessages(DefaultJDBCAdapter.java:709)
>>        at org.apache.activemq.store.jdbc.JDBCMessageStore.recoverNextMessages(JDBCMessageStore.java:230)
>>        at org.apache.activemq.store.ProxyMessageStore.recoverNextMessages(ProxyMessageStore.java:83)
>>        at org.apache.activemq.broker.region.cursors.QueueStorePrefetch.doFillBatch(QueueStorePrefetch.java:75)
>>        at org.apache.activemq.broker.region.cursors.AbstractStoreCursor.fillBatch(AbstractStoreCursor.java:227)
>>        - locked <0xb8a12f30> (a org.apache.activemq.broker.region.cursors.QueueStorePrefetch)
>>        at org.apache.activemq.broker.region.cursors.AbstractStoreCursor.reset(AbstractStoreCursor.java:100)
>>        at org.apache.activemq.broker.region.cursors.StoreQueueCursor.reset(StoreQueueCursor.java:157)
>>        - locked <0xb8a13648> (a org.apache.activemq.broker.region.cursors.StoreQueueCursor)
>>        at org.apache.activemq.broker.region.Queue.doPageIn(Queue.java:1179)
>>        - locked <0xb8a13648> (a org.apache.activemq.broker.region.cursors.StoreQueueCursor)
>>        at org.apache.activemq.broker.region.Queue.pageInMessages(Queue.java:1308)
>>        at org.apache.activemq.broker.region.Queue.iterate(Queue.java:1011)
>>        - locked <0xb8a13748> (a org.apache.activemq.broker.region.Queue$2)
>>        at org.apache.activemq.thread.DeterministicTaskRunner.runTask(DeterministicTaskRunner.java:84)
>>        at org.apache.activemq.thread.DeterministicTaskRunner$1.run(DeterministicTaskRunner.java:41)
>>        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
>>        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
>>        at java.lang.Thread.run(Thread.java:595)
>>
>> We waited for 9 or 10 hours without any apparent change: only one consumer
>> had been created, the other threads were still waiting, and no message had
>> been consumed.
>> In the production environment, the only way we found to recover from the
>> situation was to move the messages from ACTIVEMQ_MSGS to another table,
>> start the broker with ACTIVEMQ_MSGS empty, and write an application to
>> deserialize the messages from that table and inject them into the queue
>> using the JMS API.
>>
>> We consider over 10 hours of service unavailability a serious
>> problem. It seems to be related to the broker trying to load all
>> the messages from the table at once whenever a consumer for a given
>> destination is created. Is this a known problem? Is there any way to improve
>> the situation?
>>
>> Best regards.
>>
>>
>> --
>> Manuel.
>>
>>
>
>
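
For reference, the recovery workaround described in the quoted report (park the backlog in another table, start the broker with ACTIVEMQ_MSGS empty, then feed the messages back through the JMS API) could be structured roughly as below. This is only a sketch, not the application Manuel wrote: the class name BatchReinjector, the batch size, and the in-memory stand-ins are all made up for illustration. In a real setup the source would presumably iterate over rows of the backup table and the sink would wrap a JMS producer's send call. It also uses lambdas, so it assumes a newer JDK than the Java 5 runtime mentioned in the thread.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical helper: forwards messages from a backup source to a sink in
 * bounded batches, so the whole backlog is never held in memory at once.
 */
class BatchReinjector {

    /**
     * Drains the source, handing messages to the sink in batches of at most
     * batchSize, preserving order. Returns the number of messages forwarded.
     */
    static <T> int reinject(Iterator<T> source, Consumer<T> sink, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int total = 0;
        List<T> batch = new ArrayList<>(batchSize);
        while (source.hasNext()) {
            batch.add(source.next());
            if (batch.size() == batchSize) {
                total += flush(batch, sink);
            }
        }
        // Forward whatever is left in the final, possibly partial, batch.
        total += flush(batch, sink);
        return total;
    }

    /** Sends every message of the batch to the sink, then empties the batch. */
    private static <T> int flush(List<T> batch, Consumer<T> sink) {
        int n = batch.size();
        for (T message : batch) {
            sink.accept(message);
        }
        // Assumption: a real implementation would commit its transacted
        // session (or similar) here, once per batch.
        batch.clear();
        return n;
    }

    public static void main(String[] args) {
        // In-memory stand-ins for the backup table and the target queue.
        List<String> backup = new ArrayList<>();
        for (int i = 0; i < 7; i++) {
            backup.add("msg-" + i);
        }
        List<String> queue = new ArrayList<>();

        int sent = reinject(backup.iterator(), queue::add, 3);
        System.out.println(sent + " messages re-injected");
    }
}
```

Keeping the batch small bounds how much of the backlog is in flight at any time, which is the opposite of the load-everything-at-once behaviour the report suspects.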



-- 
http://blog.garytully.com

Open Source Integration
http://fusesource.com