You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by benzagel <cf...@gmail.com> on 2017/09/08 17:28:51 UTC

ActiveMQ Config to purge 'persistent' messages in DLQ.

Hi everyone!

I am having an issue where a subscriber is not running and messages get sent
to the DLQ. The messages are marked 'Persistent'. Is there a way I can make
the DLQ 'persistent' messages to get deleted after a 24h period?

I don't care about redelivery or any kind of checking at this point.

Here is the current config I have:



<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
  http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd">

    
    <bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.conf}/credentials.properties</value>
        </property>
    </bean>

   
    <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
          lazy-init="false" scope="singleton"
          init-method="start" destroy-method="stop">
    </bean>

    
    <broker xmlns="http://activemq.apache.org/schema/core"
brokerName="localhost" dataDirectory="${activemq.data}">

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" >
                    
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
				</policyEntry> 
				<policyEntry queue=">"><deadLetterStrategy><sharedDeadLetterStrategy
processExpired="true" expiration="86400000"
processNonPersistent="true"/></deadLetterStrategy></policyEntry> 
              </policyEntries>
            </policyMap>
        </destinationPolicy>


        
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        
        <persistenceAdapter>
            <kahaDB journalMaxFileLength="1mb"
directory="${activemq.data}/kahadb"/>
        </persistenceAdapter>


          
          <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="70" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="96 mb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="32 mb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        
        <transportConnectors>
            
            <transportConnector name="openwire"
uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="amqp"
uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp"
uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt"
uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws"
uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>

        
        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans"
class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>

    </broker>

    
    <import resource="jetty.xml"/>

</beans>





--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html

Re: ActiveMQ Config to purge 'persistent' messages in DLQ.

Posted by benzagel <cf...@gmail.com>.
Thanks for your help Tim!

The system has been smooth since.



--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html

Re: ActiveMQ Config to purge 'persistent' messages in DLQ.

Posted by Tim Bain <tb...@alumni.duke.edu>.
You've probably already tried this and found it out, but persistent
messages are the normal case for the SharedDeadLetterStrategy, so they'll
be processed even if processExpired is false. Non-persistent messages are
not sent to the DLQ by default, which is why you've explicitly enabled that
via processNonPersistent, but persistent ones are always processed if
you've enabled the SharedDeadLetterStrategy.

Tim

On Sep 18, 2017 8:41 AM, "benzagel" <cf...@gmail.com> wrote:

> Will this work on messages that are marked as 'Persistent'?
>
> I have changed the config to now have processexpired to false. I understand
> why the previous one would be circular in processing.
>
> Just wondering about persistent messages.
>
> Chris
>
>
>
> --
> Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-
> f2341805.html
>

Re: ActiveMQ Config to purge 'persistent' messages in DLQ.

Posted by benzagel <cf...@gmail.com>.
Will this work on messages that are marked as 'Persistent'?

I have changed the config to now have processexpired to false. I understand
why the previous one would be circular in processing.

Just wondering about persistent messages.

Chris



--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html

Re: ActiveMQ Config to purge 'persistent' messages in DLQ.

Posted by Tim Bain <tb...@alumni.duke.edu>.
I believe the problem is that your current configuration has your messages
expire after 24 hours on the DLQ, only to be deleted and sent to..... the
DLQ. I believe this is what is referred to by the warning in the Setting
Expiration on Messages in the DLQ section of
http://activemq.apache.org/message-redelivery-and-dlq-handling.html. So to
make this work they way you want, we need to prevent messages that expire
from the DLQ from being sent to the DLQ.

If you don't need to send expired messages from your normal queues to the
DLQ (only ones that fail processing), I think you can just set
processExpired="false" in your existing config to get the behavior you're
looking for.

If you do need to send messages that expire from non-DLQ destinations to
the DLQ, and then want to expire them from the DLQ 24 hours later, I think
you can simply set an additional policyEntry for the DLQ (before your
existing one for queue=">") along the lines of the following:

<policyEntry queue="DLQ"><deadLetterStrategy><sharedDeadLetterStrategy
processExpired="false" expiration="86400000" processNonPersistent="true"/><
/deadLetterStrategy></policyEntry>

Tim

On Fri, Sep 8, 2017 at 11:28 AM, benzagel <cf...@gmail.com> wrote:

> Hi everyone!
>
> I am having an issue where a subscriber is not running and messages get
> sent
> to the DLQ. The messages are marked 'Persistent'. Is there a way I can make
> the DLQ 'persistent' messages to get deleted after a 24h period?
>
> I don't care about redelivery or any kind of checking at this point.
>
> Here is the current config I have:
>
>
>
> <beans
>   xmlns="http://www.springframework.org/schema/beans"
>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>   xsi:schemaLocation="http://www.springframework.org/schema/beans
> http://www.springframework.org/schema/beans/spring-beans.xsd
>   http://activemq.apache.org/schema/core
> http://activemq.apache.org/schema/core/activemq-core.xsd">
>
>
>     <bean
> class="org.springframework.beans.factory.config.
> PropertyPlaceholderConfigurer">
>         <property name="locations">
>             <value>file:${activemq.conf}/credentials.properties</value>
>         </property>
>     </bean>
>
>
>     <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
>           lazy-init="false" scope="singleton"
>           init-method="start" destroy-method="stop">
>     </bean>
>
>
>     <broker xmlns="http://activemq.apache.org/schema/core"
> brokerName="localhost" dataDirectory="${activemq.data}">
>
>         <destinationPolicy>
>             <policyMap>
>               <policyEntries>
>                 <policyEntry topic=">" >
>
>                   <pendingMessageLimitStrategy>
>                     <constantPendingMessageLimitStrategy limit="1000"/>
>                   </pendingMessageLimitStrategy>
>                                 </policyEntry>
>                                 <policyEntry queue=">"><deadLetterStrategy>
> <sharedDeadLetterStrategy
> processExpired="true" expiration="86400000"
> processNonPersistent="true"/></deadLetterStrategy></policyEntry>
>               </policyEntries>
>             </policyMap>
>         </destinationPolicy>
>
>
>
>         <managementContext>
>             <managementContext createConnector="false"/>
>         </managementContext>
>
>
>         <persistenceAdapter>
>             <kahaDB journalMaxFileLength="1mb"
> directory="${activemq.data}/kahadb"/>
>         </persistenceAdapter>
>
>
>
>           <systemUsage>
>             <systemUsage>
>                 <memoryUsage>
>                     <memoryUsage percentOfJvmHeap="70" />
>                 </memoryUsage>
>                 <storeUsage>
>                     <storeUsage limit="96 mb"/>
>                 </storeUsage>
>                 <tempUsage>
>                     <tempUsage limit="32 mb"/>
>                 </tempUsage>
>             </systemUsage>
>         </systemUsage>
>
>
>         <transportConnectors>
>
>             <transportConnector name="openwire"
> uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;
> wireFormat.maxFrameSize=104857600"/>
>             <transportConnector name="amqp"
> uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;
> wireFormat.maxFrameSize=104857600"/>
>             <transportConnector name="stomp"
> uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;
> wireFormat.maxFrameSize=104857600"/>
>             <transportConnector name="mqtt"
> uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;
> wireFormat.maxFrameSize=104857600"/>
>             <transportConnector name="ws"
> uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;
> wireFormat.maxFrameSize=104857600"/>
>         </transportConnectors>
>
>
>         <shutdownHooks>
>             <bean xmlns="http://www.springframework.org/schema/beans"
> class="org.apache.activemq.hooks.SpringContextHook" />
>         </shutdownHooks>
>
>     </broker>
>
>
>     <import resource="jetty.xml"/>
>
> </beans>
>
>
>
>
>
> --
> Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-
> f2341805.html
>