Posted to users@activemq.apache.org by jasons <ja...@au.abb.com> on 2017/12/05 08:04:29 UTC

Re: ActiveMQ losing durable topic messages when JournalledJDBCPersistence enabled

Hi Tim, we managed to attach a debugger as requested, and the path you
identified does seem to be the one taken by the incorrect update to the
ACTIVEMQ_ACKS table for DC1.  We took several stack dumps (text below) at
the debug points you requested, in case they also help.

Note that the code passed through the OracleJDBCAdapter.doSetLastAck method
twice: the first time updating LAST_ACKED_ID for DC1, and the second time, as
you'd more or less expected, updating LAST_ACKED_ID for DC2 (correct, as this
consumer was consuming and the selector matched).  We also used the debugger
to inspect the bind parameters on the UPDATE statement each time, to confirm
which of DC1 and DC2 was being updated and in which order.  We further used
SQL Developer to identify when the change had occurred.

Based on your earlier comment, I guess we move on to creating a JIRA, which
I'll do now.  Never having done this, I assume I just bring the history of
this forum topic into the JIRA as background.  Please let me know if there is
anything else to do on our end.
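
To make the concern about the first doSetLastAck call concrete, here is a
small toy model in Java.  It is not ActiveMQ code, and the mechanism it shows
is only an assumption: it supposes that a durable subscriber's progress is
tracked by a single "last acked" pointer (loosely like LAST_ACKED_ID), so that
moving the pointer for a message the selector did not match could also skip
earlier messages that were still pending.  The class and method names
(LastAckPointerSketch, store, setLastAck, pending) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: this is NOT ActiveMQ code. It assumes (hypothetically) that a
// durable subscriber's progress is kept as one "last acked" high-water mark,
// loosely analogous to a LAST_ACKED_ID column.
class LastAckPointerSketch {
    private final List<Long> storedIds = new ArrayList<>(); // message sequence ids in the store
    private long lastAckedId = 0;                            // single pointer for this subscriber

    void store(long id) {
        storedIds.add(id);
    }

    // Stand-in for an "UPDATE ... SET LAST_ACKED_ID = ?" style write.
    void setLastAck(long id) {
        lastAckedId = id;
    }

    // Messages the subscriber would still receive: everything above the pointer.
    List<Long> pending() {
        List<Long> result = new ArrayList<>();
        for (long id : storedIds) {
            if (id > lastAckedId) {
                result.add(id);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        LastAckPointerSketch dc1 = new LastAckPointerSketch();
        dc1.store(1); // assumed to match DC1's selector, not yet consumed
        dc1.store(2); // assumed to match DC1's selector, not yet consumed
        dc1.store(3); // assumed NOT to match DC1's selector

        System.out.println("pending before: " + dc1.pending());
        // Acking the unmatched message on DC1's behalf moves the pointer past 1 and 2.
        dc1.setLastAck(3);
        System.out.println("pending after:  " + dc1.pending());
    }
}
```

In this model the pending list is empty after the single setLastAck(3) call,
even though messages 1 and 2 were never delivered.  Whether the broker really
behaves this way is for the JIRA to establish; the sketch only shows why an
update for DC1 on the unmatched path looked suspicious to us.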

Cheers

Jason.


Here are the stack traces in the order they fired and were captured from the
debug session for the thread hitting the breakpoint: 

1st BP:

Daemon Thread [ActiveMQ Transport: tcp:///127.0.0.1:35880@61616] (Suspended (entry into method unmatched in DurableTopicSubscription))
	owns: Topic  (id=72)
	DurableTopicSubscription.unmatched(MessageReference) line: 98
	SimpleDispatchPolicy.dispatch(MessageReference, MessageEvaluationContext, List<Subscription>) line: 44
	Topic.dispatch(ConnectionContext, Message) line: 769
	Topic.doMessageSend(ProducerBrokerExchange, Message) line: 554
	Topic.send(ProducerBrokerExchange, Message) line: 482
	ManagedTopicRegion(AbstractRegion).send(ProducerBrokerExchange, Message) line: 503
	ManagedRegionBroker(RegionBroker).send(ProducerBrokerExchange, Message) line: 468
	ManagedRegionBroker.send(ProducerBrokerExchange, Message) line: 293
	SchedulerBroker(BrokerFilter).send(ProducerBrokerExchange, Message) line: 153
	SchedulerBroker.send(ProducerBrokerExchange, Message) line: 195
	AdvisoryBroker(BrokerFilter).send(ProducerBrokerExchange, Message) line: 153
	CompositeDestinationBroker.send(ProducerBrokerExchange, Message) line: 96
	TransactionBroker.send(ProducerBrokerExchange, Message) line: 293
	RedeliveryPlugin(MutableBrokerFilter).send(ProducerBrokerExchange, Message) line: 158
	LoggingBrokerPlugin(MutableBrokerFilter).send(ProducerBrokerExchange, Message) line: 158
	LoggingBrokerPlugin.send(ProducerBrokerExchange, Message) line: 275
	BrokerService$6(MutableBrokerFilter).send(ProducerBrokerExchange, Message) line: 158
	ManagedTransportConnection(TransportConnection).processMessage(Message) line: 581
	ActiveMQTextMessage(ActiveMQMessage).visit(CommandVisitor) line: 768
	ManagedTransportConnection(TransportConnection).service(Command) line: 336
	TransportConnection$1.onCommand(Object) line: 200
	MutexTransport.onCommand(Object) line: 50
	WireFormatNegotiator.onCommand(Object) line: 125
	InactivityMonitor(AbstractInactivityMonitor).onCommand(Object) line: 301
	TcpTransport(TransportSupport).doConsume(Object) line: 83
	TcpTransport.doRun() line: 233
	TcpTransport.run() line: 215
	Thread.run() line: 745

2nd BP:

Daemon Thread [ActiveMQ Transport: tcp:///127.0.0.1:35875@61616] (Suspended (breakpoint at line 325 in DurableTopicSubscription))
	owns: Object  (id=130)
	DurableTopicSubscription.acknowledge(ConnectionContext, MessageAck, MessageReference) line: 325
	DurableTopicSubscription(PrefetchSubscription).acknowledge(ConnectionContext, MessageAck) line: 233
	ManagedTopicRegion(AbstractRegion).acknowledge(ConsumerBrokerExchange, MessageAck) line: 526
	ManagedRegionBroker(RegionBroker).acknowledge(ConsumerBrokerExchange, MessageAck) line: 484
	SchedulerBroker(BrokerFilter).acknowledge(ConsumerBrokerExchange, MessageAck) line: 88
	AdvisoryBroker(BrokerFilter).acknowledge(ConsumerBrokerExchange, MessageAck) line: 88
	CompositeDestinationBroker(BrokerFilter).acknowledge(ConsumerBrokerExchange, MessageAck) line: 88
	TransactionBroker.acknowledge(ConsumerBrokerExchange, MessageAck) line: 276
	RedeliveryPlugin(MutableBrokerFilter).acknowledge(ConsumerBrokerExchange, MessageAck) line: 98
	LoggingBrokerPlugin(MutableBrokerFilter).acknowledge(ConsumerBrokerExchange, MessageAck) line: 98
	LoggingBrokerPlugin.acknowledge(ConsumerBrokerExchange, MessageAck) line: 162
	BrokerService$6(MutableBrokerFilter).acknowledge(ConsumerBrokerExchange, MessageAck) line: 98
	ManagedTransportConnection(TransportConnection).processMessageAck(MessageAck) line: 590
	MessageAck.visit(CommandVisitor) line: 245
	ManagedTransportConnection(TransportConnection).service(Command) line: 336
	TransportConnection$1.onCommand(Object) line: 200
	MutexTransport.onCommand(Object) line: 50
	WireFormatNegotiator.onCommand(Object) line: 125
	InactivityMonitor(AbstractInactivityMonitor).onCommand(Object) line: 301
	TcpTransport(TransportSupport).doConsume(Object) line: 83
	TcpTransport.doRun() line: 233
	TcpTransport.run() line: 215
	Thread.run() line: 745


3rd BP:

Daemon Thread [Journal checkpoint worker] (Suspended)
	OracleJDBCAdapter(DefaultJDBCAdapter).doSetLastAck(TransactionContext, ActiveMQDestination, XATransactionId, String, String, long, long) line: 526
	JDBCTopicMessageStore.acknowledge(ConnectionContext, String, String, MessageId, MessageAck) line: 86
	JournalTopicMessageStore$2.execute() line: 191
	JournalMessageStore$3.execute() line: 306
	TransactionTemplate.run(Callback) line: 44
	JournalTopicMessageStore(JournalMessageStore).checkpoint(Callback) line: 262
	JournalTopicMessageStore.checkpoint() line: 182
	JournalPersistenceAdapter$5.call() line: 455
	JournalPersistenceAdapter$5.call() line: 452
	FutureTask<V>.run() line: 262
	ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1145
	ThreadPoolExecutor$Worker.run() line: 615
	Thread.run() line: 745

--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html

Re: ActiveMQ losing durable topic messages when JournalledJDBCPersistence enabled

Posted by Tim Bain <tb...@alumni.duke.edu>.
Jason,

Thanks for the very thorough investigation; it's very much appreciated.

Yes, when you submit the bug in JIRA (you'll need to sign up for a JIRA
account), please describe the bug in enough detail that an implementor can
understand the problem without referring back to this thread, but also add
a link to the thread (on Nabble.com) in case there are questions. Note that
I've recently had poor results using the link embedded at the bottom of
emails, so please make sure that the link actually loads this specific
thread.

Thanks again for all that you've done.

Tim
