Posted to issues@activemq.apache.org by "Eric X (JIRA)" <ji...@apache.org> on 2015/09/04 15:44:45 UTC

[jira] [Updated] (AMQ-5934) ActiveMQ durable topic subscribers not able to receive new messages after running for awhile

     [ https://issues.apache.org/jira/browse/AMQ-5934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Eric X updated AMQ-5934:
------------------------
    Attachment: activemq.dump

Thanks, Chris!
I have attached the thread dump file to this issue for your reference. The problem occurs in my test environment as well as in another environment whose network is quite stable. Please let me know if you need any other data about this issue. I look forward to hearing from you soon.
Regards,
-Eric

 


     On Monday, August 24, 2015 9:21 AM, Christopher L. Shannon (JIRA) <ji...@apache.org> wrote:
   

 
    [ https://issues.apache.org/jira/browse/AMQ-5934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709267#comment-14709267 ] 

Christopher L. Shannon commented on AMQ-5934:
---------------------------------------------

Many times these issues can be tied to a network problem or to something in the specific environment where the broker is set up.  That is why I wanted to see the thread dump: it should show what your specific clients were doing when they hung.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)




> ActiveMQ durable topic subscribers not able to receive new messages after running for awhile
> --------------------------------------------------------------------------------------------
>
>                 Key: AMQ-5934
>                 URL: https://issues.apache.org/jira/browse/AMQ-5934
>             Project: ActiveMQ
>          Issue Type: Bug
>    Affects Versions: 5.10.2, 5.11.1
>         Environment: ActiveMQ 5.10.2 broker and a topic subscriber with ActiveMQ 5.10 client libraries.
>            Reporter: Eric X
>            Priority: Critical
>         Attachments: activemq.dump
>
>
> We have set up an environment that uses ActiveMQ 5.10.2 as the broker and a test durable topic subscriber that connects to the broker over TCP.  The client connection string is as follows:
> failover:(tcp://host1:61617?keepAlive=true&wireFormat.tightEncodingEnabled=false,tcp://host1:61617?keepAlive=true&wireFormat.tightEncodingEnabled=false)?randomize=false&initialReconnectDelay=500&timeout=2000&useExponentialBackOff=false&jms.watchTopicAdvisories=false
> At first, we started five test clients (durable topic subscribers) with the aforementioned connection string and made sure every subscriber was connected to the broker.  After that, we used a test client to publish 1000000 messages to the topic that the five clients subscribed to.  After running for a few hours, two of these five clients hung (stopped receiving messages from the broker).  For these two clients, there are 636786 and 93915 messages in the pending queue and 1000 messages in the dispatched queue.  The other three received all the messages from the topic.  We have been able to reproduce the issue consistently.  The stale clients appear to be connected to the broker but have not received any new messages since they hung.
> Can someone look into this issue?  It is very important for us.  We have tried various configurations, including jms.prefetchPolicy.topicPrefetch=1, but none of them works.
> Here is the ActiveMQ configuration file (activemq.xml) used for this issue.
>  <!--
>     Licensed to the Apache Software Foundation (ASF) under one or more
>     contributor license agreements.  See the NOTICE file distributed with
>     this work for additional information regarding copyright ownership.
>     The ASF licenses this file to You under the Apache License, Version 2.0
>     (the "License"); you may not use this file except in compliance with
>     the License.  You may obtain a copy of the License at
>    
>     http://www.apache.org/licenses/LICENSE-2.0
>    
>     Unless required by applicable law or agreed to in writing, software
>     distributed under the License is distributed on an "AS IS" BASIS,
>     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>     See the License for the specific language governing permissions and
>     limitations under the License.
> -->
> <beans
>   xmlns="http://www.springframework.org/schema/beans"
>   xmlns:amq="http://activemq.apache.org/schema/core"
>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>   xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
>   http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
>     <!-- Allows us to use system properties as variables in this configuration file -->
>     <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>         <property name="locations">
>             <value>file:${activemq.base}/conf/credentials.properties</value> 
>         </property>      
>     </bean> 
>     <!-- 
>         The <broker> element is used to configure the ActiveMQ broker. 
>     -->
>     <broker xmlns="http://activemq.apache.org/schema/core" advisorySupport="false" useJmx="true" brokerName="dev.masterbroker-1" dataDirectory="${activemq.data}" >
>         <!-- 
>             The managementContext is used to configure how ActiveMQ is exposed in 
>             JMX. By default, ActiveMQ uses the MBean server that is started by 
>             the JVM. For more information, see: 
>             
>             http://activemq.apache.org/jmx.html 
>         -->
>         <managementContext>
>             <managementContext createConnector="true" connectorPort="2099"/>
>         </managementContext>
> 	<plugins>
>         <jaasDualAuthenticationPlugin configuration="activemq-ssl-domain"/>
>         <jaasDualAuthenticationPlugin configuration="activemq-domain"/>
>  
>         <!-- <simpleAuthenticationPlugin>
> 	<users>
> 		<authenticationUser username="system" password="manager"
> 			groups="users,admins"/>
> 		<authenticationUser username="user" password="password"
> 			groups="users"/>
> 		<authenticationUser username="guest" password="password" groups="guests"/>
> 	</users>
>       </simpleAuthenticationPlugin>   -->
>  
>       <authorizationPlugin>
>         <map>
>           <authorizationMap>
>             <authorizationEntries>
>               <authorizationEntry queue=">" read="admins,users,guests" write="admins,users,guests" admin="admins" />
>               <authorizationEntry queue="USERS.>" read="users" write="users" admin="users" />
>               <authorizationEntry queue="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
>               
>               <authorizationEntry topic=">" read="admins,users,guests" write="admins,users,guests" admin="admins" />
>               <authorizationEntry topic="USERS.>" read="users" write="users" admin="users" />
>               <authorizationEntry topic="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
>               
>               <authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users" write="guests,users" admin="guests,users"/>
>             </authorizationEntries>
>             
>             <tempDestinationAuthorizationEntry>  
>              <tempDestinationAuthorizationEntry read="tempDestinationAdmins" write="tempDestinationAdmins" admin="tempDestinationAdmins"/>
>            </tempDestinationAuthorizationEntry>               
>           </authorizationMap>
>         </map>
>       </authorizationPlugin>
>       <redeliveryPlugin fallbackToDeadLetter="false" sendToDlqIfMaxRetriesExceeded="false">
>               <redeliveryPolicyMap>
>                   <redeliveryPolicyMap>
>                       <redeliveryPolicyEntries>
>                           <!-- a destination specific policy -->
>                           <redeliveryPolicy topic="TP5" maximumRedeliveries="-1" redeliveryDelay="10000" />
>                       </redeliveryPolicyEntries>
>                       <!-- the fallback policy for all other destinations -->
>                       <defaultEntry>
>                           <redeliveryPolicy maximumRedeliveries="4" initialRedeliveryDelay="5000" redeliveryDelay="10000" />
>                       </defaultEntry>
>                   </redeliveryPolicyMap>
>               </redeliveryPolicyMap>
>        </redeliveryPlugin>
> 	</plugins> 
>         <!-- 
>             Configure message persistence for the broker. The default persistence
>             mechanism is the KahaDB store (identified by the kahaDB tag). 
>             For more information, see: 
>             
>             http://activemq.apache.org/persistence.html 
>         -->
>         <!--
>         <persistenceAdapter>
>        	            <kahaDB directory="${activemq.data}/kahadb"
>        	                    ignoreMissingJournalfiles="true"
>        	                    checkForCorruptJournalFiles="true"
>        	                    checksumJournalFiles="true"
>        	                    journalMaxFileLength="256mb"/>
>         </persistenceAdapter>
>         -->
>         <persistenceAdapter>
>                 <mKahaDB directory="${activemq.data}/kahadb">
>                         <filteredPersistenceAdapters>
>                                 <!-- match all queues -->
>                                 <filteredKahaDB queue=">">
>                                         <persistenceAdapter>
>                                                 <kahaDB journalMaxFileLength="64mb"
>                                                         ignoreMissingJournalfiles="true"
>                                                         checkForCorruptJournalFiles="true"
>                                                         checksumJournalFiles="true" />
>                                         </persistenceAdapter>
>                                 </filteredKahaDB>
>                                 <!-- match all topics -->
>                                 <filteredKahaDB topic=">">
>                                         <persistenceAdapter>
>                                                 <kahaDB journalMaxFileLength="64mb"
>                                                         ignoreMissingJournalfiles="true"
>                                                         checkForCorruptJournalFiles="true"
>                                                         checksumJournalFiles="true" />
>                                         </persistenceAdapter>
>                                 </filteredKahaDB>
>                         </filteredPersistenceAdapters>
>                 </mKahaDB>
>         </persistenceAdapter>
>         <!--
>             It's advisable to turn on producer flow control in the production system
>             The systemUsage controls the maximum amount of space the broker will 
>             use before slowing down producers. For more information, see:
>             
>             http://activemq.apache.org/producer-flow-control.html -->
>               
>            <destinationPolicy>
>             <policyMap>
>               <policyEntries>
>                 <policyEntry topic=">" optimizedDispatch="true" producerFlowControl="false" memoryLimit="200mb" expireMessagesPeriod="300000">
>                     <!-- The constantPendingMessageLimitStrategy is used to prevent
>                          slow topic consumers to block producers and affect other consumers
>                          by limiting the number of messages that are retained
>                          For more information, see:
>                          http://activemq.apache.org/slow-consumer-handling.html
>                     -->
>                   <pendingMessageLimitStrategy>
>                     <constantPendingMessageLimitStrategy limit="1000"/>
>                   </pendingMessageLimitStrategy>
>                 </policyEntry>
>                 <policyEntry queue=">" optimizedDispatch="true" producerFlowControl="false" memoryLimit="100mb">
>                   <!-- Use VM cursor for better latency
>                        For more information, see:
>                        http://activemq.apache.org/message-cursors.html
>                   <pendingQueuePolicy>
>                     <vmQueueCursor/>
>                   </pendingQueuePolicy>
>                   -->
>                 </policyEntry>
>            <policyEntry queue="Que1" producerFlowControl="false" maxPageSize="5000" >
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry queue="Que2" producerFlowControl="false" maxPageSize="5000" >
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry queue="Que3" producerFlowControl="false" maxPageSize="5000" >
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>          <policyEntry queue="Que4" producerFlowControl="false" maxPageSize="5000" >
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry queue="Que5" producerFlowControl="false" maxPageSize="5000" >
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry queue="Que6" producerFlowControl="false" maxPageSize="5000" >
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry queue="Que7" producerFlowControl="false" maxPageSize="5000" >
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry queue="Que8" producerFlowControl="false" maxPageSize="5000" >
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry queue="Que9" producerFlowControl="false" maxPageSize="5000" >
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry queue="Que10" producerFlowControl="false" maxPageSize="5000"  expireMessagesPeriod="300000" >
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           
>           <policyEntry topic="TP1" producerFlowControl="false" maxPageSize="5000" >
>             <dispatchPolicy>
>               <roundRobinDispatchPolicy />
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy />
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry topic="TP2" producerFlowControl="false" maxPageSize="5000" >
>             <dispatchPolicy>
>               <roundRobinDispatchPolicy />
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy />
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry topic="TP3" producerFlowControl="false" maxPageSize="5000" >
>             <dispatchPolicy>
>               <roundRobinDispatchPolicy />
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy />
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry topic="TP5" producerFlowControl="false" maxPageSize="5000" >
>             <dispatchPolicy>
>               <roundRobinDispatchPolicy />
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy />
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry topic="TP6" producerFlowControl="false" maxPageSize="5000" >
>             <dispatchPolicy>
>               <roundRobinDispatchPolicy />
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy />
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry topic="TP7" producerFlowControl="false" maxPageSize="5000" >
>             <dispatchPolicy>
>               <roundRobinDispatchPolicy />
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy />
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry topic="TP8" producerFlowControl="false" maxPageSize="5000" >
>             <dispatchPolicy>
>               <roundRobinDispatchPolicy />
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy />
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry topic="TP9" producerFlowControl="false" maxPageSize="5000" >
>             <dispatchPolicy>
>               <roundRobinDispatchPolicy />
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy />
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry topic="TP10" producerFlowControl="false" maxPageSize="5000" >
>             <dispatchPolicy>
>               <roundRobinDispatchPolicy />
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy />
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>               </policyEntries>
>             </policyMap>
>         </destinationPolicy>
>     <destinations>
>       <queue physicalName="Que1" />
>       <queue physicalName="Que2" />
>       <queue physicalName="Que3" />
>       <queue physicalName="Que4" />
>       <queue physicalName="Que5" />
>       <queue physicalName="Que6" />
>       <queue physicalName="Que7" />
>       <queue physicalName="Que8" />
>       <queue physicalName="Que9" />
>       <queue physicalName="Que10" />
>       <topic physicalName="TP1" />
>       <topic physicalName="TP2" />
>       <topic physicalName="TP3" />
>       <topic physicalName="TP4" />
>       <topic physicalName="TP5" />
>       <topic physicalName="TP6" />
>       <topic physicalName="TP7" />
>       <topic physicalName="TP8" />
>       <topic physicalName="TP9" />
>       <topic physicalName="TP10" />
>     </destinations>
>  
>         <!--
>             The sslContext can be used to configure broker-specific SSL properties.
>         -->
>         <sslContext>
>             <sslContext keyStore="file:${activemq.base}/conf/broker.ks"
>               keyStorePassword="password" trustStore="file:${activemq.base}/conf/broker.ts"
>               trustStorePassword="password"/>
>         </sslContext>
>       
>         <plugins>
>             <loggingBrokerPlugin logAll="false" logConnectionEvents="false" logSessionEvents="false"/>
>             <timeStampingBrokerPlugin zeroExpirationOverride="0" ttlCeiling="0" futureOnly="true"/>
>             <traceBrokerPathPlugin/>
>         </plugins>
>   
>         <systemUsage>
>             <systemUsage>
>                 <memoryUsage>
>                     <memoryUsage limit="6 gb"/>
>                 </memoryUsage>
>                 <storeUsage>
>                     <storeUsage limit="20 gb" name="foo"/>
>                 </storeUsage>
>                 <tempUsage>
>                     <tempUsage limit="2 gb"/>
>                 </tempUsage>
>             </systemUsage>
>         </systemUsage>
>         <!-- The store and forward broker networks ActiveMQ will listen to -->
>         <networkConnectors>
>             <!-- <networkConnector name="default-nc" uri="multicast://default"/> -->
>         </networkConnectors>
>         <!-- 
>             The transport connectors expose ActiveMQ over a given protocol to
>             clients and other brokers. For more information, see: 
>             
>             http://activemq.apache.org/configuring-transports.html 
>         -->
>         <transportConnectors>
>             <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
>             <transportConnector name="openwire" uri="tcp://localhost:61636?wireFormat.maxInactivityDurationInitalDelay=30000"/>
>             
> 	         <transportConnector name="nio+ssl" uri="nio+ssl://localhost:61639?needClientAuth=true&amp;wireFormat.maxInactivityDurationInitalDelay=30000"/>
>             <transportConnector name="ssl" uri="ssl://localhost:61637?needClientAuth=true&amp;wireFormat.maxInactivityDurationInitalDelay=30000"/>
>         </transportConnectors>
>         <!-- destroy the spring context on shutdown to stop jetty -->
>         <shutdownHooks>
>             <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
>         </shutdownHooks>
>     </broker>
> <!--
>     <commandAgent xmlns="http://activemq.apache.org/schema/core" brokerUrl="vm://asb-jms-01-vm.eng.rr.com" username="system" password="password"/>
> -->
>     <!-- 
>         Uncomment to enable Camel
>         Take a look at activemq-camel.xml for more details
>     <import resource="activemq-camel.xml"/> -->
>     <!-- 
>         Enable web consoles, REST and Ajax APIs and demos
>         Take a look at activemq-jetty.xml for more details -->
>     <import resource="jetty.xml"/> 
>     
> </beans>
>     
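
For readers following the report, the client connection string quoted above mixes three kinds of options. As far as the ActiveMQ documentation describes it, options inside the parentheses apply to each tcp transport, options after the closing parenthesis apply to the failover transport, and options prefixed with "jms." are passed on to the connection factory. The sketch below only splits the string to make that grouping visible. FailoverUriOptions and its methods are hypothetical names written for this illustration with the Java standard library; they are not part of the ActiveMQ client API, and the code assumes the URI contains a parenthesised transport list.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative helper, not part of ActiveMQ: splits a failover URI into its parts.
final class FailoverUriOptions {

    // Parses "k1=v1&k2=v2" into an insertion-ordered map.
    static Map<String, String> parseQuery(String query) {
        Map<String, String> options = new LinkedHashMap<>();
        if (query == null || query.isEmpty()) {
            return options;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq < 0) {
                options.put(pair, "");
            } else {
                options.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return options;
    }

    // Returns the comma-separated transport URIs between the parentheses.
    // Assumes the URI has the form failover:(uri1,uri2,...)?options.
    static List<String> transportUris(String failoverUri) {
        int open = failoverUri.indexOf('(');
        int close = failoverUri.lastIndexOf(')');
        List<String> uris = new ArrayList<>();
        for (String uri : failoverUri.substring(open + 1, close).split(",")) {
            uris.add(uri.trim());
        }
        return uris;
    }

    // Returns the options that follow the closing parenthesis.
    static Map<String, String> failoverOptions(String failoverUri) {
        String tail = failoverUri.substring(failoverUri.lastIndexOf(')') + 1);
        return parseQuery(tail.startsWith("?") ? tail.substring(1) : tail);
    }

    public static void main(String[] args) {
        // Connection string copied from the issue description.
        String uri = "failover:(tcp://host1:61617?keepAlive=true&wireFormat.tightEncodingEnabled=false,"
                + "tcp://host1:61617?keepAlive=true&wireFormat.tightEncodingEnabled=false)"
                + "?randomize=false&initialReconnectDelay=500&timeout=2000"
                + "&useExponentialBackOff=false&jms.watchTopicAdvisories=false";

        // Per-transport options (inside the parentheses).
        for (String transport : transportUris(uri)) {
            int q = transport.indexOf('?');
            String address = q < 0 ? transport : transport.substring(0, q);
            String query = q < 0 ? "" : transport.substring(q + 1);
            System.out.println("transport " + address + " -> " + parseQuery(query));
        }

        // Options after the parentheses, grouped by the "jms." prefix.
        failoverOptions(uri).forEach((name, value) -> {
            String scope = name.startsWith("jms.") ? "connection factory" : "failover transport";
            System.out.println(scope + ": " + name + "=" + value);
        });
    }
}
```

Running it against the string from the report lists the same tcp://host1:61617 address twice, which may be worth double-checking if two distinct brokers were intended.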


