You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "Gary Tully (JIRA)" <ji...@apache.org> on 2018/01/26 12:45:00 UTC
[jira] [Commented] (AMQ-6891) Duplicated message in JMS
transaction, when jdbc persistence fails (Memory leak on Queue)
[ https://issues.apache.org/jira/browse/AMQ-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340992#comment-16340992 ]
Gary Tully commented on AMQ-6891:
---------------------------------
[~rkraus] thanks for the great test and analysis. I think you have identified the problem being the lack of work on the sync rollback. Am just wondering if the hash/equals of messageContext should be implemented such that indexOrderedCursorUpdates.remove() would be sufficient and am wondering about calling the completion in the rollback case, but it may be necessary for cleanup.
I will investigate some more. thanks again for the great bug report :-)
> Duplicated message in JMS transaction, when jdbc persistence fails (Memory leak on Queue)
> -----------------------------------------------------------------------------------------
>
> Key: AMQ-6891
> URL: https://issues.apache.org/jira/browse/AMQ-6891
> Project: ActiveMQ
> Issue Type: Bug
> Components: Broker
> Affects Versions: 5.15.2
> Reporter: Radek Kraus
> Assignee: Gary Tully
> Priority: Major
> Attachments: JmsTransactionCommitFailureTest.java
>
>
> I have following scenario (see attached test case):
> # Send 1 message in JMS transaction
> # Enable database problem simulation (throw {{SQLException}} in {{TransactionContext.executeBatch()}} method - the similar situation should happen, when commit fails)
> # Attempt to send 2 messages in one JMS transaction, send operation fails as is expected (only 1 message is in database from first send operation)
> # Disable database problem simulation ({{SQLException}} is not thrown from now)
> # Repeat the attempt to send "the same" 2 messages in one JMS transaction, send operation is successful now, how is expected (3 messages are in database)
> # Attempt to receive 3 messages 1, 2, 3, but 5 messages are received 1, 2, 3, 2, 3.
> I have suspicion, that problem is in {{org.apache.activemq.broker.region.Queue}}. It seems that reason is {{indexOrderedCursorUpdates}} list. The {{Queue.onAdd(MessageContext)}} method is invoked for each message by {{JDBCMessageStore.addMessage(ConnectionContext, Message) method}}, which adds {{MessageContext}} into this list. The added {{MessageContext}} is processed (and removed) in {{Queue.doPendingCursorAdditions()}} method, which is invoked only from "afterCommit synchronization" ({{Queue.CursorAddSync.afterCommit()}} method). But when commit operation fails, then "afterCommit" method is not invoked (but {{afterRollback}} method is invoked) and {{MessageContext}} entries stays in {{indexOrderedCursorUpdates}} list.
> Personaly I would expect, that some "remove" operation should be done in {{Queue.CursorAddSync.afterRolback()}} method. Probably the similar operation should be done in {{Queue.doMessageSend()}} method on place, where {{Exception}} from "addMessage" is handled in case when JMS transaction is not used. Or some different "completion" operation should be introduced, because {{MessageContext}} is only add into the list, but don't removed in case of failure.
> When I tried to register (and use) {{LeaseLockerIOExceptionHandler}} IOExceptionHandler, the transports was successfully restarted, but my "client" code was blocked in {{ActiveMQSession.commit()}} method. Is it expected behavior?
> When I tried to add following code into {{Queue.CursorAddSync.afterRollback()}}, I received only 3 expected messages (when JMS transaction is used), but it was only blind shot, sorry, because I don't understand the whole logic here.
> {code:java}
> @Override
> public void afterRollback() throws Exception {
> synchronized(indexOrderedCursorUpdates) {
> for(int i = indexOrderedCursorUpdates.size() - 1; i >= 0; i--) {
> MessageContext mc = indexOrderedCursorUpdates.get(i);
> if(mc.message.getMessageId().equals(messageContext.message.getMessageId())) {
> indexOrderedCursorUpdates.remove(mc);
> if(mc.onCompletion != null) {
> mc.onCompletion.run();
> }
> break;
> }
> }
> }
> messageContext.message.decrementReferenceCount();
> }
> {code}
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)