Posted to users@activemq.apache.org by coryplusplus <co...@gmail.com> on 2014/10/03 18:08:03 UTC

Remove Unacknowledged Heartbeat Message

Hi,
Currently I am using apache-activemq-5.5.1 and am having some trouble with
memory usage filling up for a particular topic.

2014-08-12 22:33:44,881 | INFO  | Usage Manager memory limit reached.
Stopping producer (XXXXX) to prevent flooding topic://SERVER_PONG. See
http://activemq.apache.org/producer-flow-control.html for more info
(blocking for: 1s) | org.apache.activemq.broker.region.Topic | ActiveMQ
Transport: tcp:///127.0.0.1:51252

This blocking continues until ActiveMQ is restarted.

We basically have a client/server application where ActiveMQ is hosted on
the same machine as the server.
The server has multiple components, each of which produces a SERVER_PONG
message in response to a SERVER_PING message from the client. Our
current theory is that somehow our client is getting into an unexpected
state where it is able to send SERVER_PING messages but unable to consume
SERVER_PONG messages. This results in the SERVER_PONG messages backing up
and eventually exhausting our memory resources, as shown in the log message
above.

Our first attempt at a solution was to set timeToLive on each SERVER_PONG
message that was sent. That would have ensured that we weren't keeping
unacknowledged SERVER_PONG messages around, and would have stopped us from
flooding the broker. However, this did not seem possible, as our clients
can be based in different time zones than our server. I saw a lot of posts
recommending the timestamp plugin, but that does not work in our case, since
our producer and broker are on the same server and our consumer is the one
checking the expiration time before processing. (see
http://activemq.2283324.n4.nabble.com/timeToLive-with-out-of-sync-clocks-td3009739.html)

So the question now is: is there a way to remove these SERVER_PONG
messages that have not been acknowledged after x amount of time? Does
ActiveMQ provide an API that will allow us to check the number of inFlight
messages per topic and remove them if we have too many?

Here is our activemq.xml configuration file if needed.


<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.base}/conf/credentials.properties</value>
        </property>
    </bean>

    <broker xmlns="http://activemq.apache.org/schema/core"
            brokerName="localhost" dataDirectory="${activemq.base}/data"
            persistent="false">

        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry topic="SERVER_PONG.>">
                        <pendingMessageLimitStrategy>
                            <constantPendingMessageLimitStrategy limit="50" />
                        </pendingMessageLimitStrategy>
                    </policyEntry>
                    <policyEntry topic="FROM_STATUS_SERVER.>">
                        <pendingMessageLimitStrategy>
                            <constantPendingMessageLimitStrategy limit="5000" />
                        </pendingMessageLimitStrategy>
                    </policyEntry>
                    <policyEntry topic=">" producerFlowControl="false" memoryLimit="1 kb">
                        <pendingSubscriberPolicy>
                            <vmCursor />
                        </pendingSubscriberPolicy>
                    </policyEntry>
                    <policyEntry queue=">" producerFlowControl="false" memoryLimit="1 kb">
                    </policyEntry>
                </policyEntries>
            </policyMap>
        </destinationPolicy>

        <managementContext>
            <managementContext createConnector="true"/>
        </managementContext>

        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="3072 mb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="5 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="8 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <transportConnectors>
            <transportConnector name="openwire"
                uri="tcp://0.0.0.0:60000?transport.keepAlive=true&amp;wireformat.maxInactivityDuration=0&amp;transport.connectionTimeout=0"/>
        </transportConnectors>

    </broker>

</beans>





--
View this message in context: http://activemq.2283324.n4.nabble.com/Remove-Unacknowledged-Heartbeat-Message-tp4686129.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Remove Unacknowledged Heartbeat Message

Posted by coryplusplus <co...@gmail.com>.
Thank you so much for the comment, and you are right. Timezone differences
do not seem to make a difference. The information I was reading online had
given me the impression that they did.
I appreciate the help.




Re: Remove Unacknowledged Heartbeat Message

Posted by Tim Bain <tb...@alumni.duke.edu>.
Why are consumers in other timezones a concern?  The expiration timestamp
is measured in milliseconds UTC, so the same instant in time will have the
same value in all timezones.  As long as all clocks are relatively close to
being in sync, it shouldn't matter what timezone anyone's located in...
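
To make the UTC point concrete, here is a minimal Java sketch using only the
standard library. It is not ActiveMQ code: the class and method names
(ExpirationAcrossZones, expirationMillis, epochMillisIn) and the 30-second
timeToLive are made up for illustration. It computes an expiration as send
time plus timeToLive, then reads that same instant through several time
zones.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

// Hypothetical helper class, for illustration only (not part of ActiveMQ).
final class ExpirationAcrossZones {

    /** Absolute expiration = send time (epoch millis, UTC-based) + time to live. */
    static long expirationMillis(long sentAtMillis, long timeToLiveMillis) {
        return sentAtMillis + timeToLiveMillis;
    }

    /** Epoch millis of the given instant after viewing it in the given zone. */
    static long epochMillisIn(Instant instant, String zoneId) {
        ZonedDateTime local = ZonedDateTime.ofInstant(instant, ZoneId.of(zoneId));
        return local.toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        // Send time taken from the date of the original post; TTL is an assumed value.
        Instant sent = Instant.parse("2014-10-03T18:08:03Z");
        long expiration = expirationMillis(sent.toEpochMilli(), 30_000L);
        Instant expiresAt = Instant.ofEpochMilli(expiration);

        // The wall-clock time differs per zone, but the epoch millis do not.
        for (String zone : new String[] {"America/Chicago", "Europe/Berlin", "Asia/Tokyo"}) {
            ZonedDateTime local = expiresAt.atZone(ZoneId.of(zone));
            System.out.println(zone + " wall clock: " + local.toLocalDateTime()
                    + ", epoch millis: " + epochMillisIn(expiresAt, zone));
        }
    }
}
```

Each zone prints a different wall-clock time but the same epoch-millisecond
value, which is the value an expiration comparison would use.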

Also, your statement that the consumer is the one checking the expiration
time is correct but incomplete; all brokers will also check expiration
times of messages they hold, and DLQ (or delete) any that are expired.
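
As a rough sketch of what such a check amounts to, the Java below compares an
expiration against "now" as seen by two different clocks. It is not the
broker's actual code: ExpiryCheck, expirationFor, and isExpired are
hypothetical names, and the 30-second timeToLive and 60-second clock skew
are assumed values. The only rule it relies on is the JMS convention that
expiration is send time plus timeToLive, with 0 meaning "never expires".

```java
// Hypothetical sketch of an expiration check (not ActiveMQ's implementation).
final class ExpiryCheck {

    /** JMS convention: a timeToLive of 0 yields expiration 0, i.e. never expires. */
    static long expirationFor(long sentAtMillis, long timeToLiveMillis) {
        return timeToLiveMillis > 0 ? sentAtMillis + timeToLiveMillis : 0L;
    }

    /** A message is expired once the checking clock has passed a non-zero expiration. */
    static boolean isExpired(long expirationMillis, long nowMillis) {
        return expirationMillis > 0 && nowMillis > expirationMillis;
    }

    public static void main(String[] args) {
        long sentAt = System.currentTimeMillis();      // producer clock
        long expiration = expirationFor(sentAt, 30_000L);

        // Broker on the same host as the producer shares its clock.
        long brokerNow = sentAt + 31_000L;             // 31 s after the send
        // Assumed: a consumer whose clock runs 60 s behind, at the same real moment.
        long consumerNow = brokerNow - 60_000L;

        System.out.println("broker sees expired:   " + isExpired(expiration, brokerNow));
        System.out.println("consumer sees expired: " + isExpired(expiration, consumerNow));
    }
}
```

In this scenario the broker's clock is the one that agrees with the
producer's, so a check made on the broker side does not depend on how far
off the consumer's clock is.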
