Posted to issues@activemq.apache.org by "Gary Tully (JIRA)" <ji...@apache.org> on 2018/01/26 17:12:00 UTC

[jira] [Reopened] (AMQ-6891) Duplicated message in JMS transaction, when jdbc persistence fails (Memory leak on Queue)

     [ https://issues.apache.org/jira/browse/AMQ-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gary Tully reopened AMQ-6891:
-----------------------------

The non-transacted case has this issue as well. Most often, store IOErrors result in the broker stopping. However, there are cases where we want to let these errors bubble back to the producers, and in those cases we still have a leak in the non-transacted case.

> Duplicated message in JMS transaction, when jdbc persistence fails (Memory leak on Queue)
> -----------------------------------------------------------------------------------------
>
>                 Key: AMQ-6891
>                 URL: https://issues.apache.org/jira/browse/AMQ-6891
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.15.2
>            Reporter: Radek Kraus
>            Assignee: Gary Tully
>            Priority: Major
>             Fix For: 5.16.0
>
>         Attachments: JmsTransactionCommitFailureTest.java
>
>
> I have the following scenario (see attached test case):
>  # Send 1 message in a JMS transaction
>  # Enable database problem simulation (throw {{SQLException}} in the {{TransactionContext.executeBatch()}} method; a similar situation should occur when the commit fails)
>  # Attempt to send 2 messages in one JMS transaction; the send operation fails, as expected (only 1 message, from the first send operation, is in the database)
>  # Disable database problem simulation ({{SQLException}} is no longer thrown)
>  # Repeat the attempt to send "the same" 2 messages in one JMS transaction; the send operation now succeeds, as expected (3 messages are in the database)
>  # Attempt to receive 3 messages (1, 2, 3), but 5 messages are received (1, 2, 3, 2, 3).
> I suspect that the problem is in {{org.apache.activemq.broker.region.Queue}}. The cause seems to be the {{indexOrderedCursorUpdates}} list. The {{Queue.onAdd(MessageContext)}} method is invoked for each message by the {{JDBCMessageStore.addMessage(ConnectionContext, Message)}} method, and it adds the {{MessageContext}} to this list. The added {{MessageContext}} is processed (and removed) in the {{Queue.doPendingCursorAdditions()}} method, which is invoked only from the "afterCommit synchronization" (the {{Queue.CursorAddSync.afterCommit()}} method). But when the commit operation fails, the "afterCommit" method is not invoked (the {{afterRollback}} method is invoked instead), and the {{MessageContext}} entries stay in the {{indexOrderedCursorUpdates}} list.
> Personally, I would expect some "remove" operation to be done in the {{Queue.CursorAddSync.afterRollback()}} method. A similar operation should probably be done in the {{Queue.doMessageSend()}} method, at the place where the {{Exception}} from "addMessage" is handled when a JMS transaction is not used. Alternatively, some different "completion" operation should be introduced, because the {{MessageContext}} is only added to the list but is not removed in case of failure.
> When I tried to register (and use) {{LeaseLockerIOExceptionHandler}} as the IOExceptionHandler, the transports were successfully restarted, but my "client" code was blocked in the {{ActiveMQSession.commit()}} method. Is this expected behavior?
> When I tried adding the following code to {{Queue.CursorAddSync.afterRollback()}}, I received only the 3 expected messages (when a JMS transaction is used), but it was only a shot in the dark, sorry, because I don't understand the whole logic here.
> {code:java}
> @Override
> public void afterRollback() throws Exception {
>   synchronized(indexOrderedCursorUpdates) {
>     for(int i = indexOrderedCursorUpdates.size() - 1; i >= 0; i--) {
>       MessageContext mc = indexOrderedCursorUpdates.get(i);
>       if(mc.message.getMessageId().equals(messageContext.message.getMessageId())) {
>         indexOrderedCursorUpdates.remove(mc);
>         if(mc.onCompletion != null) {
>           mc.onCompletion.run();
>         }
>         break;
>       }
>     }
>   }
>   messageContext.message.decrementReferenceCount();
> }
> {code}
>  
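
The mechanism described in the report can be illustrated with a small standalone model. This is only a sketch under stated assumptions: {{PendingCursorSketch}}, {{sendInTransaction}}, and {{run}} are hypothetical names invented for illustration and are not ActiveMQ classes or methods; the two lists merely stand in for {{indexOrderedCursorUpdates}} and the queue cursor. It shows how entries left behind after a rollback could be replayed by the next successful commit, and how removing them on rollback would avoid the duplicates.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: none of these types or methods are ActiveMQ code.
class PendingCursorSketch {
    // Stands in for Queue.indexOrderedCursorUpdates.
    private final List<Integer> pendingCursorUpdates = new ArrayList<>();
    // Stands in for the queue cursor, i.e. what consumers eventually receive.
    private final List<Integer> cursor = new ArrayList<>();
    private final boolean cleanupOnRollback;

    PendingCursorSketch(boolean cleanupOnRollback) {
        this.cleanupOnRollback = cleanupOnRollback;
    }

    // Sends a batch in one "transaction"; storeFails simulates the SQLException.
    void sendInTransaction(boolean storeFails, int... messageIds) {
        List<Integer> addedByThisTx = new ArrayList<>();
        for (int id : messageIds) {
            pendingCursorUpdates.add(id); // analogous to the onAdd step in the report
            addedByThisTx.add(id);
        }
        if (storeFails) {
            afterRollback(addedByThisTx);
        } else {
            afterCommit();
        }
    }

    // Drains every pending entry into the cursor, including stale ones.
    private void afterCommit() {
        cursor.addAll(pendingCursorUpdates);
        pendingCursorUpdates.clear();
    }

    // Without cleanup, the failed transaction's entries stay in the pending list.
    private void afterRollback(List<Integer> addedByThisTx) {
        if (cleanupOnRollback) {
            for (Integer id : addedByThisTx) {
                pendingCursorUpdates.remove(id); // remove(Object): drops one matching entry
            }
        }
    }

    // Replays the reported scenario and returns the ids a consumer would see.
    static List<Integer> run(boolean cleanupOnRollback) {
        PendingCursorSketch queue = new PendingCursorSketch(cleanupOnRollback);
        queue.sendInTransaction(false, 1);    // step 1: commit succeeds
        queue.sendInTransaction(true, 2, 3);  // step 3: store failure, rollback
        queue.sendInTransaction(false, 2, 3); // step 5: retry succeeds
        return queue.cursor;
    }

    public static void main(String[] args) {
        System.out.println("no cleanup:   " + run(false)); // [1, 2, 3, 2, 3]
        System.out.println("with cleanup: " + run(true));  // [1, 2, 3]
    }
}
```

In this model the duplicates appear only because the commit path drains the whole pending list; whether the real broker should clean up in the rollback callback, in the non-transacted exception path, or in both is a design decision for the actual fix.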



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)