Posted to users@activemq.apache.org by jasons <ja...@au.abb.com> on 2017/12/05 08:04:29 UTC
Re: ActiveMQ losing durable topic messages when JournalledJDBCPersistence enabled
Hi Tim, we managed to attach a debugger as requested, and the path you
identified does seem to be the one taken by the incorrect update to the
ACTIVEMQ_ACKS table for DC1. We took several stack dumps (text below) at the
debug points you requested, in case they also help.
Note that the code passed through the OracleJDBCAdapter.doSetLastAck method
twice: the first time updating LAST_ACKED_ID for DC1, and the second time, as
you'd more or less expected, updating LAST_ACKED_ID for DC2 (correct, as this
consumer was consuming and the selector matched). We also used the debugger
to inspect the values of the bind parameters on the UPDATE statement each
time, to confirm which DC was being updated and in what order. We further
used SQL Developer to identify when the change occurred.
Based on your earlier comment, I guess the next step is to create a JIRA,
which I'll do now. Never having done this, I assume I just copy the history
of this forum topic into the JIRA background. Please let me know if there is
anything else we should do on our end.
Cheers
Jason.
Here are the stack traces, in the order the breakpoints fired, captured from
the debug session for the thread that hit each breakpoint:
1st BP:
Daemon Thread [ActiveMQ Transport: tcp:///127.0.0.1:35880@61616] (Suspended (entry into method unmatched in DurableTopicSubscription))
    owns: Topic (id=72)
    DurableTopicSubscription.unmatched(MessageReference) line: 98
    SimpleDispatchPolicy.dispatch(MessageReference, MessageEvaluationContext, List<Subscription>) line: 44
    Topic.dispatch(ConnectionContext, Message) line: 769
    Topic.doMessageSend(ProducerBrokerExchange, Message) line: 554
    Topic.send(ProducerBrokerExchange, Message) line: 482
    ManagedTopicRegion(AbstractRegion).send(ProducerBrokerExchange, Message) line: 503
    ManagedRegionBroker(RegionBroker).send(ProducerBrokerExchange, Message) line: 468
    ManagedRegionBroker.send(ProducerBrokerExchange, Message) line: 293
    SchedulerBroker(BrokerFilter).send(ProducerBrokerExchange, Message) line: 153
    SchedulerBroker.send(ProducerBrokerExchange, Message) line: 195
    AdvisoryBroker(BrokerFilter).send(ProducerBrokerExchange, Message) line: 153
    CompositeDestinationBroker.send(ProducerBrokerExchange, Message) line: 96
    TransactionBroker.send(ProducerBrokerExchange, Message) line: 293
    RedeliveryPlugin(MutableBrokerFilter).send(ProducerBrokerExchange, Message) line: 158
    LoggingBrokerPlugin(MutableBrokerFilter).send(ProducerBrokerExchange, Message) line: 158
    LoggingBrokerPlugin.send(ProducerBrokerExchange, Message) line: 275
    BrokerService$6(MutableBrokerFilter).send(ProducerBrokerExchange, Message) line: 158
    ManagedTransportConnection(TransportConnection).processMessage(Message) line: 581
    ActiveMQTextMessage(ActiveMQMessage).visit(CommandVisitor) line: 768
    ManagedTransportConnection(TransportConnection).service(Command) line: 336
    TransportConnection$1.onCommand(Object) line: 200
    MutexTransport.onCommand(Object) line: 50
    WireFormatNegotiator.onCommand(Object) line: 125
    InactivityMonitor(AbstractInactivityMonitor).onCommand(Object) line: 301
    TcpTransport(TransportSupport).doConsume(Object) line: 83
    TcpTransport.doRun() line: 233
    TcpTransport.run() line: 215
    Thread.run() line: 745
2nd BP:
Daemon Thread [ActiveMQ Transport: tcp:///127.0.0.1:35875@61616] (Suspended (breakpoint at line 325 in DurableTopicSubscription))
    owns: Object (id=130)
    DurableTopicSubscription.acknowledge(ConnectionContext, MessageAck, MessageReference) line: 325
    DurableTopicSubscription(PrefetchSubscription).acknowledge(ConnectionContext, MessageAck) line: 233
    ManagedTopicRegion(AbstractRegion).acknowledge(ConsumerBrokerExchange, MessageAck) line: 526
    ManagedRegionBroker(RegionBroker).acknowledge(ConsumerBrokerExchange, MessageAck) line: 484
    SchedulerBroker(BrokerFilter).acknowledge(ConsumerBrokerExchange, MessageAck) line: 88
    AdvisoryBroker(BrokerFilter).acknowledge(ConsumerBrokerExchange, MessageAck) line: 88
    CompositeDestinationBroker(BrokerFilter).acknowledge(ConsumerBrokerExchange, MessageAck) line: 88
    TransactionBroker.acknowledge(ConsumerBrokerExchange, MessageAck) line: 276
    RedeliveryPlugin(MutableBrokerFilter).acknowledge(ConsumerBrokerExchange, MessageAck) line: 98
    LoggingBrokerPlugin(MutableBrokerFilter).acknowledge(ConsumerBrokerExchange, MessageAck) line: 98
    LoggingBrokerPlugin.acknowledge(ConsumerBrokerExchange, MessageAck) line: 162
    BrokerService$6(MutableBrokerFilter).acknowledge(ConsumerBrokerExchange, MessageAck) line: 98
    ManagedTransportConnection(TransportConnection).processMessageAck(MessageAck) line: 590
    MessageAck.visit(CommandVisitor) line: 245
    ManagedTransportConnection(TransportConnection).service(Command) line: 336
    TransportConnection$1.onCommand(Object) line: 200
    MutexTransport.onCommand(Object) line: 50
    WireFormatNegotiator.onCommand(Object) line: 125
    InactivityMonitor(AbstractInactivityMonitor).onCommand(Object) line: 301
    TcpTransport(TransportSupport).doConsume(Object) line: 83
    TcpTransport.doRun() line: 233
    TcpTransport.run() line: 215
    Thread.run() line: 745
3rd BP:
Daemon Thread [Journal checkpoint worker] (Suspended)
    OracleJDBCAdapter(DefaultJDBCAdapter).doSetLastAck(TransactionContext, ActiveMQDestination, XATransactionId, String, String, long, long) line: 526
    JDBCTopicMessageStore.acknowledge(ConnectionContext, String, String, MessageId, MessageAck) line: 86
    JournalTopicMessageStore$2.execute() line: 191
    JournalMessageStore$3.execute() line: 306
    TransactionTemplate.run(Callback) line: 44
    JournalTopicMessageStore(JournalMessageStore).checkpoint(Callback) line: 262
    JournalTopicMessageStore.checkpoint() line: 182
    JournalPersistenceAdapter$5.call() line: 455
    JournalPersistenceAdapter$5.call() line: 452
    FutureTask<V>.run() line: 262
    ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1145
    ThreadPoolExecutor$Worker.run() line: 615
    Thread.run() line: 745
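Read together, the three traces suggest this flow: transport threads record
acknowledgements (through unmatched for DC1 and acknowledge for DC2), and the
journal checkpoint worker later writes them to ACTIVEMQ_ACKS through
doSetLastAck. The Java sketch below is only a toy model of that flow, not
ActiveMQ code. The classes Journal, AckTable and PendingAck, the message ids,
and the idea that each write simply overwrites LAST_ACKED_ID are assumptions
made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model only; none of these classes belong to ActiveMQ. */
final class LastAckSketch {

    /** One acknowledgement waiting in the (hypothetical) journal. */
    static final class PendingAck {
        final String subscriber;
        final long messageId;

        PendingAck(String subscriber, long messageId) {
            this.subscriber = subscriber;
            this.messageId = messageId;
        }
    }

    /** Stand-in for the ACTIVEMQ_ACKS table: subscriber -> LAST_ACKED_ID. */
    static final class AckTable {
        final Map<String, Long> lastAckedId = new LinkedHashMap<>();

        // Assumed semantics: a plain overwrite, like one UPDATE per ack.
        void setLastAck(String subscriber, long messageId) {
            lastAckedId.put(subscriber, messageId);
        }
    }

    /** Stand-in for the journal: buffers acks until a checkpoint flushes them. */
    static final class Journal {
        private final List<PendingAck> pending = new ArrayList<>();

        void record(String subscriber, long messageId) {
            pending.add(new PendingAck(subscriber, messageId));
        }

        // In the traces this step runs later, on the checkpoint worker thread.
        void checkpoint(AckTable table) {
            for (PendingAck ack : pending) {
                table.setLastAck(ack.subscriber, ack.messageId);
            }
            pending.clear();
        }
    }

    /** Replays the observed order: unmatched (DC1), acknowledge (DC2), checkpoint. */
    static Map<String, Long> replay() {
        AckTable table = new AckTable();
        table.setLastAck("DC1", 0L);
        table.setLastAck("DC2", 0L);
        Journal journal = new Journal();

        // Hypothetical: message 1 matched DC1's selector but DC1 has not
        // consumed it yet, so nothing is recorded for it.
        // 1st BP: message 2 does not match DC1; an ack is recorded for DC1 anyway.
        journal.record("DC1", 2L);
        // 2nd BP: DC2 consumes message 2 and acknowledges it.
        journal.record("DC2", 2L);
        // 3rd BP: the checkpoint writes both acks, DC1 first and then DC2.
        journal.checkpoint(table);
        return table.lastAckedId;
    }

    public static void main(String[] args) {
        System.out.println(replay()); // prints {DC1=2, DC2=2}
    }
}
```

In this model DC1 ends up with LAST_ACKED_ID = 2 even though it never consumed
message 1. If the broker treats LAST_ACKED_ID as a high-water mark when DC1
resumes (an assumption this thread does not confirm), message 1 would no
longer be delivered to DC1, which would be consistent with the message loss
in the subject line.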
--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html
Re: ActiveMQ losing durable topic messages when JournalledJDBCPersistence enabled
Posted by Tim Bain <tb...@alumni.duke.edu>.
Jason,
Thanks for the very thorough investigation; it's very much appreciated.
Yes, when you submit the bug in JIRA (you'll need to sign up for a JIRA
account), please describe it in enough detail that an implementor can
understand the problem without referring back to this thread, but also add a
link to the thread (on Nabble.com) in case there are questions. Note that
I've recently had poor results using the link embedded at the bottom of
emails, so please make sure that the link actually loads this specific
thread.
Thanks again for all that you've done.
Tim