You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "Radek Kraus (JIRA)" <ji...@apache.org> on 2018/01/25 16:22:00 UTC

[jira] [Created] (AMQ-6891) Duplicated message in JMS transaction, when jdbc persistence fails

Radek Kraus created AMQ-6891:
--------------------------------

             Summary: Duplicated message in JMS transaction, when jdbc persistence fails
                 Key: AMQ-6891
                 URL: https://issues.apache.org/jira/browse/AMQ-6891
             Project: ActiveMQ
          Issue Type: Bug
          Components: Broker
    Affects Versions: 5.15.2
            Reporter: Radek Kraus


I have following scenario (see attached test case):
 # Send 1 message in JMS transaction
 # Enable database problem simulation (throw {{SQLException}} in {{TransactionContext.executeBatch()}} method - the similar situation should happen, when commit fails)
 # Attempt to send 2 messages in one JMS transaction, send operation fails as is expected (only 1 message is in database from first send operation)
 # Disable database problem simulation ({{SQLException}} is not thrown from now)
 # Repeat the attempt to send "the same" 2 messages in one JMS transaction, send operation is successful now, how is expected (3 messages are in database)
 # Attempt to receive 3 messages 1, 2, 3, but 5 messages are received 1, 2, 3, 2, 3.

I have suspicion, that problem is in {{org.apache.activemq.broker.region.Queue}}. It seems that reason is {{indexOrderedCursorUpdates}} list. The {{Queue.onAdd(MessageContext)}} method is invoked for each message by {{JDBCMessageStore.addMessage(ConnectionContext, Message) method}}, which adds {{MessageContext}} into this list. The added {{MessageContext}} is processed (and removed) in {{Queue.doPendingCursorAdditions()}} method, which is invoked only from "afterCommit synchronization" ({{Queue.CursorAddSync.afterCommit()}} method). But when commit operation fails, then "afterCommit" method is not invoked (but {{afterRollback}} method is invoked) and {{MessageContext}} entries stays in {{indexOrderedCursorUpdates}} list.

Personaly I would expect, that some "remove" operation should be done in {{Queue.CursorAddSync.afterRolback()}} method. Probably the similar operation should be done in {{Queue.doMessageSend()}} method on place, where {{Exception}} from "addMessage" is handled in case when JMS transaction is not used. Or some different "completion" operation should be introduced, because {{MessageContext}} is only add into the list,  but don't removed in case of failure.

When I tried to register (and use) {{LeaseLockerIOExceptionHandler }}IOExceptionHandler, the transports was successfully restarted, but my "client" code was blocked in {{ActiveMQSession.commit()}} method. Is it expected behavior?

When I tried to add following code into {{Queue.CursorAddSync.afterRollback()}}, I received only 3 expected messages (when JMS transaction is used), but it was only blind shot, sorry, because I don't understand the whole logic here.
{code:java}
@Override
public void afterRollback() throws Exception {
  synchronized(indexOrderedCursorUpdates) {
    for(int i = indexOrderedCursorUpdates.size() - 1; i >= 0; i--) {
      MessageContext mc = indexOrderedCursorUpdates.get(i);
        if(mc.message.getMessageId().equals(messageContext.message.getMessageId())) {
        indexOrderedCursorUpdates.remove(mc);
        if(mc.onCompletion != null) {
          mc.onCompletion.run();
        }
        break;
      }
    }
  }
  messageContext.message.decrementReferenceCount();
}
{code}
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)