You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "Gary Tully (JIRA)" <ji...@apache.org> on 2009/02/25 23:56:00 UTC

[jira] Commented: (AMQ-2123) Intermittent Test failure: DuplexNetworkTest.testDurableStoreAndForward (org.apache.activemq.network) - java.lang.IllegalStateException: Message id ID:... could not be recovered from the data store - already dispatched

    [ https://issues.apache.org/activemq/browse/AMQ-2123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=49997#action_49997 ] 

Gary Tully commented on AMQ-2123:
---------------------------------

the problem relates to the number of consumers associated with a topic message send. If 10 consumers are present when the message is persisted, and 11 when the message is dispatched, the ack from the 11th will remove the message. redispatch to any of the others can then fail with the above exception.

With the AMQPersistenceAdapter, writes are batched, so the reference store is updated async. At the point of update, the reference store prepares the required acks for each subscriber in order to keep the message reference around till all subscribers have acked.
The problem arises when the consumer list is updated and another consumer (one that is not in the count that is persisted) gets the message. The set of subscribers used during dispatch is independent of the set persisted. This is a problem. The logic that sets up the acks based on the subscription list is at org.apache.activemq.store.kahadaptor.KahaTopicReferenceStore.addMessageReference(ConnectionContext, MessageId, ReferenceData)
One fix is to serialize dispatch with a flush to the store and with subscriber additions.I think this will lock up the dispatch logic quite a bit.

The logic in org.apache.activemq.store.kahadaptor.KahaTopicReferenceStore.acknowledgeReference(ConnectionContext, String, String, MessageId) can deal with no reference, the case where a message has not been persisted, but it cannot deal with the case of a persisted message and an additional subscriber. Adding the logic to not remove a reference if it is referenced from another subscription resolves the issue. 

> Intermittent Test failure: DuplexNetworkTest.testDurableStoreAndForward  (org.apache.activemq.network) - java.lang.IllegalStateException: Message id ID:... could not be recovered from the data store - already dispatched
> ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-2123
>                 URL: https://issues.apache.org/activemq/browse/AMQ-2123
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.2.0
>         Environment: linux rh
>            Reporter: Gary Tully
>            Assignee: Gary Tully
>             Fix For: 5.3.0
>
>
> {code}
> DuplexNetworkTest.testDurableStoreAndForward  (org.apache.activemq.network)
>     javax.jms.JMSException: java.lang.RuntimeException: java.lang.IllegalStateException: Message id ID:pdrhas4_32-40202-1234575036513-14:8:1:1:3 could not be recovered from the data store - already dispatched
>     at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:49)
>     at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1255)
>     Show details »
>     « Hide details
>     javax.jms.JMSException: java.lang.RuntimeException: java.lang.IllegalStateException: Message id ID:pdrhas4_32-40202-1234575036513-14:8:1:1:3 could not be recovered from the data store - already dispatched
>     at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:49)
>     at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1255)
>     at org.apache.activemq.ActiveMQSession.syncSendPacket(ActiveMQSession.java:1805)
>     at org.apache.activemq.ActiveMQMessageConsumer.&init&(ActiveMQMessageConsumer.java:225)
>     at org.apache.activemq.ActiveMQTopicSubscriber.&init&(ActiveMQTopicSubscriber.java:117)
>     at org.apache.activemq.ActiveMQSession.createDurableSubscriber(ActiveMQSession.java:1207)
>     at org.apache.activemq.ActiveMQSession.createDurableSubscriber(ActiveMQSession.java:1152)
>     at org.apache.activemq.network.SimpleNetworkTest.testDurableStoreAndForward(SimpleNetworkTest.java:127)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>     at java.lang.reflect.Method.invoke(Method.java:585)
>     at junit.framework.TestCase.runTest(TestCase.java:154)
>     at junit.framework.TestCase.runBare(TestCase.java:127)
>     at junit.framework.TestResult$1.protect(TestResult.java:106)
>     at junit.framework.TestResult.runProtected(TestResult.java:124)
>     at junit.framework.TestResult.run(TestResult.java:109)
>     at junit.framework.TestCase.run(TestCase.java:118)
>     at junit.framework.TestSuite.runTest(TestSuite.java:208)
>     at junit.framework.TestSuite.run(TestSuite.java:203)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>     at java.lang.reflect.Method.invoke(Method.java:585)
>     at org.apache.maven.surefire.junit.JUnitTestSet.execute(JUnitTestSet.java:210)
>     at org.apache.maven.surefire.suite.AbstractDirectoryTestSuite.executeTestSet(AbstractDirectoryTestSuite.java:135)
>     at org.apache.maven.surefire.suite.AbstractDirectoryTestSuite.execute(AbstractDirectoryTestSuite.java:160)
>     at org.apache.maven.surefire.Surefire.run(Surefire.java:81)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>     at java.lang.reflect.Method.invoke(Method.java:585)
>     at org.apache.maven.surefire.booter.SurefireBooter.runSuitesInProcess(SurefireBooter.java:182)
>     at org.apache.maven.surefire.booter.SurefireBooter.main(SurefireBooter.java:743)
>     Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.IllegalStateException: Message id ID:pdrhas4_32-40202-1234575036513-14:8:1:1:3 could not be recovered from the data store - already dispatched
>     at org.apache.activemq.broker.region.cursors.AbstractStoreCursor.reset(AbstractStoreCursor.java:104)
>     at org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor.reset(StoreDurableSubscriberCursor.java:225)
>     at org.apache.activemq.broker.region.PrefetchSubscription.dispatchPending(PrefetchSubscription.java:560)
>     at org.apache.activemq.broker.region.DurableTopicSubscription.activate(DurableTopicSubscription.java:130)
>     at org.apache.activemq.broker.region.TopicRegion.addConsumer(TopicRegion.java:105)
>     at org.apache.activemq.broker.region.RegionBroker.addConsumer(RegionBroker.java:376)
>     at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:86)
>     at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:86)
>     at org.apache.activemq.advisory.AdvisoryBroker.addConsumer(AdvisoryBroker.java:83)
>     at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:86)
>     at org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:93)
>     at org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:546)
>     at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:349)
>     at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:308)
>     at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:182)
>     at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:104)
>     at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
>     at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:204)
>     at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:122)
>     at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
>     at java.lang.Thread.run(Thread.java:595)
>     Caused by: java.lang.RuntimeException: java.lang.IllegalStateException: Message id ID:pdrhas4_32-40202-1234575036513-14:8:1:1:3 could not be recovered from the data store - already dispatched
>     at org.apache.activemq.broker.region.cursors.AbstractStoreCursor.fillBatch(AbstractStoreCursor.java:239)
>     at org.apache.activemq.broker.region.cursors.AbstractStoreCursor.reset(AbstractStoreCursor.java:101)
>     ... 22 more
>     Caused by: java.lang.IllegalStateException: Message id ID:pdrhas4_32-40202-1234575036513-14:8:1:1:3 could not be recovered from the data store - already dispatched
>     at org.apache.activemq.store.amq.RecoveryListenerAdapter.recoverMessageReference(RecoveryListenerAdapter.java:58)
>     at org.apache.activemq.store.kahadaptor.KahaReferenceStore.recoverReference(KahaReferenceStore.java:82)
>     at org.apache.activemq.store.kahadaptor.KahaTopicReferenceStore.recoverNextMessages(KahaTopicReferenceStore.java:262)
>     at org.apache.activemq.store.amq.AMQTopicMessageStore.recoverNextMessages(AMQTopicMessageStore.java:59)
>     at org.apache.activemq.broker.region.cursors.TopicStorePrefetch.doFillBatch(TopicStorePrefetch.java:91)
>     at org.apache.activemq.broker.region.cursors.AbstractStoreCursor.fillBatch(AbstractStoreCursor.java:236)
>     ... 23 more
>     « Hide details 
> {code}

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.