Posted to dev@activemq.apache.org by "Jesse Fugitt (JIRA)" <ji...@apache.org> on 2014/09/08 23:07:28 UTC

[jira] [Updated] (AMQ-5347) persistJMSRedelivered flag doesn't work correctly when exceptions occur

     [ https://issues.apache.org/jira/browse/AMQ-5347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jesse Fugitt updated AMQ-5347:
------------------------------
    Attachment: RedeliveryRestartWithExceptionTest.java
                AMQ5347.patch

Attached a proposed patch against the latest code on github for the persistJMSRedelivered issue.

Also, attached a new unit test that passes when run from activemq-unit-tests after the fix is applied.

> persistJMSRedelivered flag doesn't work correctly when exceptions occur
> -----------------------------------------------------------------------
>
>                 Key: AMQ-5347
>                 URL: https://issues.apache.org/jira/browse/AMQ-5347
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.10.0
>            Reporter: Jesse Fugitt
>         Attachments: AMQ5347.patch, RedeliveryRestartWithExceptionTest.java
>
>
> The new flag in 5.10 that ensures the JMSRedelivered flag persists across broker restarts does not work correctly when an exception occurs while attempting to write the "message update" to disk before the restart.  In that case, messages can be assigned to receivers, the broker can be restarted, and the messages are then re-assigned to receivers without the JMSRedelivered flag that would be expected.  I will attach a unit test and proposed fix to illustrate the problem.
> Also, here is additional information I had sent to the mailing list:
> When using the new option persistJMSRedelivered (to ensure the redelivered flag is set correctly on potentially duplicate messages that are re-dispatched by the broker even after a restart):
>     <policyEntry queue=">" persistJMSRedelivered="true"></policyEntry>
> there is still a case where a message can be re-sent and will not be marked as redelivered.  I can open a JIRA and probably create a unit test but it is pretty clear from the pasted code below where the exception is getting swallowed.  Would the preferred fix be to update the broker interface and make preProcessDispatch throw an IOException or would it be better to add a new field to the MessageDispatch class to indicate an exception occurred and leave the interface alone?
> The specific case when this can happen is when a MessageStore throws an exception during the updateMessage call. The exception is then swallowed (an ERROR is logged) and the message is still dispatched to the consumer.  The exception seems like it should actually propagate out of the preProcessDispatch function in RegionBroker as shown below, but this would require changing the Broker interface and making the void preProcessDispatch function throw an IOException.
> //RegionBroker.java
>     @Override
>     public void preProcessDispatch(MessageDispatch messageDispatch) {
>         Message message = messageDispatch.getMessage();
>         if (message != null) {
>             long endTime = System.currentTimeMillis();
>             message.setBrokerOutTime(endTime);
>             if (getBrokerService().isEnableStatistics()) {
>                 long totalTime = endTime - message.getBrokerInTime();
>                 ((Destination) message.getRegionDestination()).getDestinationStatistics().getProcessTime().addTime(totalTime);
>             }
>             if (((BaseDestination) message.getRegionDestination()).isPersistJMSRedelivered() && !message.isRedelivered() && message.isPersistent()) {
>                 final int originalValue = message.getRedeliveryCounter();
>                 message.incrementRedeliveryCounter();
>                 try {
>                     ((BaseDestination) message.getRegionDestination()).getMessageStore().updateMessage(message);
>                 } catch (IOException error) {
>                     LOG.error("Failed to persist JMSRedeliveryFlag on {} in {}", message.getMessageId(), message.getDestination(), error);
>                 } finally {
>                     message.setRedeliveryCounter(originalValue);
>                 }
>             }
>         }
>     }
> //TransportConnection.java
>     protected void processDispatch(Command command) throws IOException {
>         MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null);
>         try {
>             if (!stopping.get()) {
>                 if (messageDispatch != null) {
>                     broker.preProcessDispatch(messageDispatch);
>                 }
>                 dispatch(command);  //This code will dispatch the message whether or not the updateMessage function actually worked
>             }
>         ...
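
The difference between swallowing the store failure and letting it propagate can be illustrated with a minimal, self-contained sketch. Everything in it is hypothetical: the Store interface, the method names, and the message ids are stand-ins invented for illustration, not ActiveMQ classes, and it does not reproduce the attached AMQ5347.patch. It only shows that when the pre-dispatch hook swallows the IOException the caller cannot tell the update failed, whereas a hook that throws lets the caller skip the dispatch.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class RedeliveryDispatchSketch {

    /** Hypothetical stand-in for a message store whose update can fail. */
    interface Store {
        void updateMessage(String messageId) throws IOException;
    }

    /** Ids of messages that reached the (imaginary) consumer. */
    static final List<String> dispatched = new ArrayList<>();

    /** Mirrors the reported behavior: the failure is logged and swallowed. */
    static void preProcessSwallowing(Store store, String messageId) {
        try {
            store.updateMessage(messageId);
        } catch (IOException error) {
            System.err.println("Failed to persist redelivery flag on "
                    + messageId + ": " + error.getMessage());
        }
    }

    /** Alternative: the failure propagates to the caller. */
    static void preProcessPropagating(Store store, String messageId) throws IOException {
        store.updateMessage(messageId);
    }

    /** Dispatch path on top of the swallowing hook: always dispatches. */
    static boolean dispatchSwallowing(Store store, String messageId) {
        preProcessSwallowing(store, messageId);
        dispatched.add(messageId);
        return true;
    }

    /** Dispatch path on top of the propagating hook: skips dispatch on failure. */
    static boolean dispatchPropagating(Store store, String messageId) {
        try {
            preProcessPropagating(store, messageId);
        } catch (IOException error) {
            // Not dispatched; what happens next is a policy decision
            // outside the scope of this sketch.
            return false;
        }
        dispatched.add(messageId);
        return true;
    }

    public static void main(String[] args) {
        // A store that always fails, simulating a disk write error.
        Store failing = id -> { throw new IOException("disk write failed"); };
        System.out.println("swallowing dispatched:  " + dispatchSwallowing(failing, "msg-1"));
        System.out.println("propagating dispatched: " + dispatchPropagating(failing, "msg-2"));
        System.out.println("delivered ids: " + dispatched);
    }
}
```

In the swallowing variant the message goes out even though the redelivery update never reached the store, which is the situation described in this issue. The propagating variant corresponds to the first option raised above (changing the hook to throw); the second option, a failure field on the dispatch object, would give the caller the same information without a signature change.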


