Posted to dev@activemq.apache.org by "Fugitt, Jesse" <jf...@informatica.com> on 2014/09/04 22:26:56 UTC

problem when MessageStore updateMessage function throws an exception

When using the new persistJMSRedelivered option (which ensures the redelivered flag is set correctly on potentially duplicate messages that the broker re-dispatches, even after a restart):

<policyEntry queue=">" persistJMSRedelivered="true"></policyEntry>

there is still a case where a message can be re-sent without being marked as redelivered.  I can open a JIRA and probably create a unit test, but the pasted code below makes it pretty clear where the exception is getting swallowed.  Would the preferred fix be to update the Broker interface so that preProcessDispatch throws an IOException, or would it be better to add a new field to the MessageDispatch class to indicate that an exception occurred and leave the interface alone?

This happens when a MessageStore throws an exception during the updateMessage call.  The exception is swallowed (an ERROR is logged) and the message is still dispatched to the consumer.  It seems the exception should instead propagate out of preProcessDispatch in RegionBroker, shown below, but that would require changing the Broker interface so that the void preProcessDispatch method declares IOException.

//RegionBroker.java
    @Override
    public void preProcessDispatch(MessageDispatch messageDispatch) {
        Message message = messageDispatch.getMessage();
        if (message != null) {
            long endTime = System.currentTimeMillis();
            message.setBrokerOutTime(endTime);
            if (getBrokerService().isEnableStatistics()) {
                long totalTime = endTime - message.getBrokerInTime();
                ((Destination) message.getRegionDestination()).getDestinationStatistics().getProcessTime().addTime(totalTime);
            }
            if (((BaseDestination) message.getRegionDestination()).isPersistJMSRedelivered() && !message.isRedelivered() && message.isPersistent()) {
                final int originalValue = message.getRedeliveryCounter();
                message.incrementRedeliveryCounter();
                try {
                    ((BaseDestination) message.getRegionDestination()).getMessageStore().updateMessage(message);
                } catch (IOException error) {
                    LOG.error("Failed to persist JMSRedeliveryFlag on {} in {}", message.getMessageId(), message.getDestination(), error);
                } finally {
                    message.setRedeliveryCounter(originalValue);
                }
            }
        }
    }

//TransportConnection.java
    protected void processDispatch(Command command) throws IOException {
        MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null);
        try {
            if (!stopping.get()) {
                if (messageDispatch != null) {
                    broker.preProcessDispatch(messageDispatch);
                }
                dispatch(command);  //This code will dispatch the message whether or not the updateMessage function actually worked
            }
        ...
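
For illustration, the first option (letting the exception propagate) could be sketched roughly as below. This is only a minimal model under stated assumptions: SketchStore, SketchBroker, SketchConnection and PropagateSketch are hypothetical stand-ins, not the real ActiveMQ classes, and message IDs are plain strings. The point it shows is that once preProcessDispatch declares IOException, a failed updateMessage exits processDispatch before the dispatch call is reached.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for MessageStore; only the call that can fail is modelled.
interface SketchStore {
    void updateMessage(String messageId) throws IOException;
}

// Hypothetical stand-in for RegionBroker.
class SketchBroker {
    private final SketchStore store;

    SketchBroker(SketchStore store) {
        this.store = store;
    }

    // Assumed change: the method declares IOException instead of catching it.
    void preProcessDispatch(String messageId) throws IOException {
        store.updateMessage(messageId);
    }
}

// Hypothetical stand-in for TransportConnection.
class SketchConnection {
    private final SketchBroker broker;
    final List<String> dispatched = new ArrayList<>();

    SketchConnection(SketchBroker broker) {
        this.broker = broker;
    }

    void processDispatch(String messageId) throws IOException {
        broker.preProcessDispatch(messageId); // a store failure exits here
        dispatched.add(messageId);            // reached only after a successful update
    }
}

class PropagateSketch {
    // Returns true if the message was dispatched, false if a store failure stopped it.
    static boolean tryDispatch(SketchConnection connection, String messageId) {
        try {
            connection.processDispatch(messageId);
            return true;
        } catch (IOException error) {
            return false;
        }
    }

    public static void main(String[] args) {
        SketchConnection healthy = new SketchConnection(new SketchBroker(id -> { }));
        SketchConnection failing = new SketchConnection(new SketchBroker(id -> {
            throw new IOException("store unavailable");
        }));
        System.out.println(tryDispatch(healthy, "m1") + " " + healthy.dispatched);
        System.out.println(tryDispatch(failing, "m2") + " " + failing.dispatched);
    }
}
```

The sketch does not say what the caller of processDispatch should do with the exception; that part depends on the real connection code.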



I wanted to get input on this issue before proceeding further with it.

Thanks,
Jesse

RE: problem when MessageStore updateMessage function throws an exception

Posted by "Fugitt, Jesse" <jf...@informatica.com>.
Yes, the fix in the JIRA is working for us now, using a broker rebuilt with that fix (other fixes that don't change the Broker interface might be possible, but the attached one seemed the most straightforward).  For us the problem typically occurs during broker shutdown, when the message store's updateMessage throws the exception.  We expect the broker never to hand a message to a consumer if updateMessage did not complete successfully; otherwise duplicates that are not marked redelivered can be sent when the broker restarts.  The attached unit test illustrates the expected behavior.
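
As a rough illustration of the kind of alternative that leaves the Broker interface alone, a failure flag on the dispatch object might look like the sketch below. It is not the attached fix or the attached unit test. All names (FlagStore, FlagDispatch, FlagBroker, FlagConnection, FlagSketch, and the preDispatchError field) are hypothetical stand-ins, and message IDs are plain strings. It only shows the expected behavior: a message whose update failed is not handed to the consumer.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for MessageStore.
interface FlagStore {
    void updateMessage(String messageId) throws IOException;
}

// Hypothetical stand-in for MessageDispatch with one assumed extra field.
class FlagDispatch {
    final String messageId;
    IOException preDispatchError; // null means the store update succeeded

    FlagDispatch(String messageId) {
        this.messageId = messageId;
    }
}

// Hypothetical stand-in for RegionBroker.
class FlagBroker {
    private final FlagStore store;

    FlagBroker(FlagStore store) {
        this.store = store;
    }

    // The signature stays void with no throws clause, so the interface is unchanged.
    void preProcessDispatch(FlagDispatch dispatch) {
        try {
            store.updateMessage(dispatch.messageId);
        } catch (IOException error) {
            dispatch.preDispatchError = error; // record the failure instead of only logging it
        }
    }
}

// Hypothetical stand-in for TransportConnection.
class FlagConnection {
    private final FlagBroker broker;
    final List<String> dispatched = new ArrayList<>();

    FlagConnection(FlagBroker broker) {
        this.broker = broker;
    }

    // Returns true if the message was dispatched, false if it was held back.
    boolean processDispatch(FlagDispatch dispatch) {
        broker.preProcessDispatch(dispatch);
        if (dispatch.preDispatchError != null) {
            return false; // the redelivery update was not persisted, so do not dispatch
        }
        dispatched.add(dispatch.messageId);
        return true;
    }
}

class FlagSketch {
    public static void main(String[] args) {
        FlagConnection failing = new FlagConnection(new FlagBroker(id -> {
            throw new IOException("store shutting down");
        }));
        boolean sent = failing.processDispatch(new FlagDispatch("m1"));
        System.out.println(sent + " " + failing.dispatched);
    }
}
```

What should happen to the held-back message afterwards is not covered by this sketch.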

Thanks,
Jesse

-----Original Message-----
From: Gary Tully [mailto:gary.tully@gmail.com] 
Sent: Thursday, September 11, 2014 4:51 PM
To: dev@activemq.apache.org
Subject: Re: problem when MessageStore updateMessage function throws an exception

ah, I see your jira and patch. will try and have a peek tomorrow.

On 11 September 2014 22:47, Gary Tully <ga...@gmail.com> wrote:
> A store failure is typically fatal. There is an ioexception handler in 
> the loop for message sends and the default behaviour is to stop the 
> broker.
> One option is to pull in the ioexception handler in this case also, 
> allowing the broker stop behaviour.
>
> at this point, predispatch, the message is pending an ack and won't 
> get dispatched again till the consumer/subscription closes so not 
> dispatching will leave it dangling.
>
> What is the ideal behaviour for a message in this case?
>
>
--
http://redhat.com
http://blog.garytully.com

Re: problem when MessageStore updateMessage function throws an exception

Posted by Gary Tully <ga...@gmail.com>.
Ah, I see your JIRA and patch. Will try to have a peek tomorrow.

On 11 September 2014 22:47, Gary Tully <ga...@gmail.com> wrote:
> A store failure is typically fatal. There is an ioexception handler in
> the loop for message sends and the default behaviour is to stop the
> broker.
> One option is to pull in the ioexception handler in this case also,
> allowing the broker stop behaviour.
>
> at this point, predispatch, the message is pending an ack and won't
> get dispatched again till the consumer/subscription closes so not
> dispatching will leave it dangling.
>
> What is the ideal behaviour for a message in this case?
>
>
-- 
http://redhat.com
http://blog.garytully.com

Re: problem when MessageStore updateMessage function throws an exception

Posted by Gary Tully <ga...@gmail.com>.
A store failure is typically fatal. There is an IOException handler in
the loop for message sends, and the default behaviour is to stop the
broker.
One option is to pull in the IOException handler in this case also,
allowing the broker stop behaviour.

At this point, pre-dispatch, the message is pending an ack and won't
get dispatched again until the consumer/subscription closes, so not
dispatching will leave it dangling.

What is the ideal behaviour for a message in this case?
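
The handler option might be sketched roughly as follows. This is a simplified model, not the broker's real handler code: HandlerStore, StoreFailureHandler, HandlerBroker, HandlerConnection and HandlerSketch are hypothetical names, "stopping the broker" is reduced to setting a flag, and the check of that flag after preProcessDispatch is an assumption about where the connection would notice the stop.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for MessageStore.
interface HandlerStore {
    void updateMessage(String messageId) throws IOException;
}

// Hypothetical handler contract, loosely modelled on an IOException handler.
interface StoreFailureHandler {
    void handle(IOException error);
}

// Hypothetical stand-in for the broker.
class HandlerBroker {
    final AtomicBoolean stopping = new AtomicBoolean(false);
    private final HandlerStore store;
    private final StoreFailureHandler handler;

    HandlerBroker(HandlerStore store) {
        this.store = store;
        // Assumed default: treat a store failure as fatal and begin stopping.
        this.handler = error -> stopping.set(true);
    }

    // Still void: the failure is routed to the handler rather than rethrown.
    void preProcessDispatch(String messageId) {
        try {
            store.updateMessage(messageId);
        } catch (IOException error) {
            handler.handle(error);
        }
    }
}

// Hypothetical stand-in for TransportConnection.
class HandlerConnection {
    private final HandlerBroker broker;
    final List<String> dispatched = new ArrayList<>();

    HandlerConnection(HandlerBroker broker) {
        this.broker = broker;
    }

    // Returns true if the message was dispatched.
    boolean processDispatch(String messageId) {
        if (broker.stopping.get()) {
            return false; // already stopping, nothing is dispatched
        }
        broker.preProcessDispatch(messageId);
        if (broker.stopping.get()) {
            return false; // the handler reacted to a store failure; skip dispatch
        }
        dispatched.add(messageId);
        return true;
    }
}

class HandlerSketch {
    public static void main(String[] args) {
        HandlerBroker broker = new HandlerBroker(id -> {
            throw new IOException("store failure");
        });
        HandlerConnection connection = new HandlerConnection(broker);
        System.out.println(connection.processDispatch("m1") + " " + broker.stopping.get());
    }
}
```

In this model the undelivered message stays pending until the broker goes down, which is one possible answer to the dangling-message concern, assuming a restart then re-dispatches it.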


-- 
http://redhat.com
http://blog.garytully.com