You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "Eric X (JIRA)" <ji...@apache.org> on 2015/08/19 22:19:45 UTC

[jira] [Created] (AMQ-5934) ActiveMQ durable topic subscribers not able to receive new messages after running for awhile

Eric X created AMQ-5934:
---------------------------

             Summary: ActiveMQ durable topic subscribers not able to receive new messages after running for awhile
                 Key: AMQ-5934
                 URL: https://issues.apache.org/jira/browse/AMQ-5934
             Project: ActiveMQ
          Issue Type: Bug
    Affects Versions: 5.10.2
         Environment: ActiveMQ 5.10.2 broker and a topic subscriber with ActiveMQ 5.10 client libraries.
            Reporter: Eric X
            Priority: Critical


We have set up an environment which uses ActiveMQ 5.10.2 as broker and a test durable topic subscriber that connects to the broker using tcp protocol.  The client connection string as follows:
failover:(tcp://host1:61617?keepAlive=true&wireFormat.tightEncodingEnabled=false,tcp://host1:61617?keepAlive=true&wireFormat.tightEncodingEnabled=false)?randomize=false&initialReconnectDelay=500&timeout=2000&useExponentialBackOff=false&jms.watchTopicAdvisories=false

At first, we started five test clients with aforementioned connection string and make sure every client is connected to the broker.  After that, we use a test client to publish 1000000 messages to the topic that five clients subscribed.  After running for a few hours, two of these five clients failed.  In these two clients, there are 636786 and 93915 messages in pending queue, 1000 messages in dispatched queue.  The other three received all the messages from topic.  We have been able to consistently reproduce the issue.  Actually, we have reported the same issue in our production environment.  These stale clients appears connect to the broker but all these messages going to pending queue after an indeterminate time period.  

Here is the activemq conf file for this issue


 <!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at
   
    http://www.apache.org/licenses/LICENSE-2.0
   
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.base}/conf/credentials.properties</value> 
        </property>      
    </bean> 

    <!-- 
        The <broker> element is used to configure the ActiveMQ broker. 
    -->
    <broker xmlns="http://activemq.apache.org/schema/core" advisorySupport="false" useJmx="true" brokerName="dev.masterbroker-1" dataDirectory="${activemq.data}" >

        <!-- 
            The managementContext is used to configure how ActiveMQ is exposed in 
            JMX. By default, ActiveMQ uses the MBean server that is started by 
            the JVM. For more information, see: 
            
            http://activemq.apache.org/jmx.html 
        -->
        <managementContext>
            <managementContext createConnector="true" connectorPort="2099"/>
        </managementContext>

	<plugins>


        <jaasDualAuthenticationPlugin configuration="activemq-ssl-domain"/>
        <jaasDualAuthenticationPlugin configuration="activemq-domain"/>
 
        <!-- <simpleAuthenticationPlugin>
	<users>
		<authenticationUser username="system" password="manager"
			groups="users,admins"/>
		<authenticationUser username="user" password="password"
			groups="users"/>
		<authenticationUser username="guest" password="password" groups="guests"/>
	</users>
      </simpleAuthenticationPlugin>   -->
 
      <authorizationPlugin>
        <map>
          <authorizationMap>
            <authorizationEntries>
              <authorizationEntry queue=">" read="admins,users,guests" write="admins,users,guests" admin="admins" />
              <authorizationEntry queue="USERS.>" read="users" write="users" admin="users" />
              <authorizationEntry queue="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
              
              <authorizationEntry topic=">" read="admins,users,guests" write="admins,users,guests" admin="admins" />
              <authorizationEntry topic="USERS.>" read="users" write="users" admin="users" />
              <authorizationEntry topic="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
              
              <authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users" write="guests,users" admin="guests,users"/>
            </authorizationEntries>
            
            <tempDestinationAuthorizationEntry>  
             <tempDestinationAuthorizationEntry read="tempDestinationAdmins" write="tempDestinationAdmins" admin="tempDestinationAdmins"/>
           </tempDestinationAuthorizationEntry>               
          </authorizationMap>
        </map>
      </authorizationPlugin>

      <redeliveryPlugin fallbackToDeadLetter="false" sendToDlqIfMaxRetriesExceeded="false">
              <redeliveryPolicyMap>
                  <redeliveryPolicyMap>
                      <redeliveryPolicyEntries>
                          <!-- a destination specific policy -->
                          <redeliveryPolicy topic="TP5" maximumRedeliveries="-1" redeliveryDelay="10000" />
                      </redeliveryPolicyEntries>
                      <!-- the fallback policy for all other destinations -->
                      <defaultEntry>
                          <redeliveryPolicy maximumRedeliveries="4" initialRedeliveryDelay="5000" redeliveryDelay="10000" />
                      </defaultEntry>
                  </redeliveryPolicyMap>
              </redeliveryPolicyMap>
       </redeliveryPlugin>

	</plugins> 

        <!-- 
            Configure message persistence for the broker. The default persistence
            mechanism is the KahaDB store (identified by the kahaDB tag). 
            For more information, see: 
            
            http://activemq.apache.org/persistence.html 
        -->
        <!--
        <persistenceAdapter>
       	            <kahaDB directory="${activemq.data}/kahadb"
       	                    ignoreMissingJournalfiles="true"
       	                    checkForCorruptJournalFiles="true"
       	                    checksumJournalFiles="true"
       	                    journalMaxFileLength="256mb"/>
        </persistenceAdapter>
        -->

        <persistenceAdapter>
                <mKahaDB directory="${activemq.data}/kahadb">

                        <filteredPersistenceAdapters>
                                <!-- match all queues -->
                                <filteredKahaDB queue=">">
                                        <persistenceAdapter>
                                                <kahaDB journalMaxFileLength="64mb"
                                                        ignoreMissingJournalfiles="true"
                                                        checkForCorruptJournalFiles="true"
                                                        checksumJournalFiles="true" />
                                        </persistenceAdapter>
                                </filteredKahaDB>

                                <!-- match all destinations -->
                                <filteredKahaDB topic=">">
                                        <persistenceAdapter>
                                                <kahaDB journalMaxFileLength="64mb"
                                                        ignoreMissingJournalfiles="true"
                                                        checkForCorruptJournalFiles="true"
                                                        checksumJournalFiles="true" />
                                        </persistenceAdapter>
                                </filteredKahaDB>
                        </filteredPersistenceAdapters>
                </mKahaDB>
        </persistenceAdapter>

        <!--
            It's advisable to turn on producer flow control in the production system

            The systemUsage controls the maximum amount of space the broker will 
            use before slowing down producers. For more information, see:
            
            http://activemq.apache.org/producer-flow-control.html -->
              
           <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" optimizedDispatch="true" producerFlowControl="false" memoryLimit="200mb" expireMessagesPeriod="300000">
                    <!-- The constantPendingMessageLimitStrategy is used to prevent
                         slow topic consumers to block producers and affect other consumers
                         by limiting the number of messages that are retained
                         For more information, see:

                         http://activemq.apache.org/slow-consumer-handling.html

                    -->
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
                <policyEntry queue=">" optimizedDispatch="true" producerFlowControl="false" memoryLimit="100mb">
                  <!-- Use VM cursor for better latency
                       For more information, see:

                       http://activemq.apache.org/message-cursors.html

                  <pendingQueuePolicy>
                    <vmQueueCursor/>
                  </pendingQueuePolicy>
                  -->
                </policyEntry>


           <policyEntry queue="Que1" producerFlowControl="false" maxPageSize="5000" >
            <dispatchPolicy>
              <strictOrderDispatchPolicy/>
            </dispatchPolicy>
            <subscriptionRecoveryPolicy>
              <lastImageSubscriptionRecoveryPolicy/>
            </subscriptionRecoveryPolicy>
          </policyEntry>

          <policyEntry queue="Que2" producerFlowControl="false" maxPageSize="5000" >
            <dispatchPolicy>
              <strictOrderDispatchPolicy/>
            </dispatchPolicy>
            <subscriptionRecoveryPolicy>
              <lastImageSubscriptionRecoveryPolicy/>
            </subscriptionRecoveryPolicy>
          </policyEntry>

          <policyEntry queue="Que3" producerFlowControl="false" maxPageSize="5000" >
            <dispatchPolicy>
              <strictOrderDispatchPolicy/>
            </dispatchPolicy>
            <subscriptionRecoveryPolicy>
              <lastImageSubscriptionRecoveryPolicy/>
            </subscriptionRecoveryPolicy>
          </policyEntry>

         <policyEntry queue="Que4" producerFlowControl="false" maxPageSize="5000" >
            <dispatchPolicy>
              <strictOrderDispatchPolicy/>
            </dispatchPolicy>
            <subscriptionRecoveryPolicy>
              <lastImageSubscriptionRecoveryPolicy/>
            </subscriptionRecoveryPolicy>
          </policyEntry>

          <policyEntry queue="Que5" producerFlowControl="false" maxPageSize="5000" >
            <dispatchPolicy>
              <strictOrderDispatchPolicy/>
            </dispatchPolicy>
            <subscriptionRecoveryPolicy>
              <lastImageSubscriptionRecoveryPolicy/>
            </subscriptionRecoveryPolicy>
          </policyEntry>

          <policyEntry queue="Que6" producerFlowControl="false" maxPageSize="5000" >
            <dispatchPolicy>
              <strictOrderDispatchPolicy/>
            </dispatchPolicy>
            <subscriptionRecoveryPolicy>
              <lastImageSubscriptionRecoveryPolicy/>
            </subscriptionRecoveryPolicy>
          </policyEntry>


          <policyEntry queue="Que7" producerFlowControl="false" maxPageSize="5000" >
            <dispatchPolicy>
              <strictOrderDispatchPolicy/>
            </dispatchPolicy>
            <subscriptionRecoveryPolicy>
              <lastImageSubscriptionRecoveryPolicy/>
            </subscriptionRecoveryPolicy>
          </policyEntry>

          <policyEntry queue="Que8" producerFlowControl="false" maxPageSize="5000" >
            <dispatchPolicy>
              <strictOrderDispatchPolicy/>
            </dispatchPolicy>
            <subscriptionRecoveryPolicy>
              <lastImageSubscriptionRecoveryPolicy/>
            </subscriptionRecoveryPolicy>
          </policyEntry>

          <policyEntry queue="Que9" producerFlowControl="false" maxPageSize="5000" >
            <dispatchPolicy>
              <strictOrderDispatchPolicy/>
            </dispatchPolicy>
            <subscriptionRecoveryPolicy>
              <lastImageSubscriptionRecoveryPolicy/>
            </subscriptionRecoveryPolicy>
          </policyEntry>

          <policyEntry queue="Que10" producerFlowControl="false" maxPageSize="5000"  expireMessagesPeriod="300000" >
            <dispatchPolicy>
              <strictOrderDispatchPolicy/>
            </dispatchPolicy>
            <subscriptionRecoveryPolicy>
              <lastImageSubscriptionRecoveryPolicy/>
            </subscriptionRecoveryPolicy>
          </policyEntry>

          



          <policyEntry topic="TP1" producerFlowControl="false" maxPageSize="5000" >
            <dispatchPolicy>
              <roundRobinDispatchPolicy />
            </dispatchPolicy>
            <subscriptionRecoveryPolicy>
              <lastImageSubscriptionRecoveryPolicy />
            </subscriptionRecoveryPolicy>
          </policyEntry>

          <policyEntry topic="TP2" producerFlowControl="false" maxPageSize="5000" >
            <dispatchPolicy>
              <roundRobinDispatchPolicy />
            </dispatchPolicy>
            <subscriptionRecoveryPolicy>
              <lastImageSubscriptionRecoveryPolicy />
            </subscriptionRecoveryPolicy>
          </policyEntry>

          <policyEntry topic="TP3" producerFlowControl="false" maxPageSize="5000" >
            <dispatchPolicy>
              <roundRobinDispatchPolicy />
            </dispatchPolicy>
            <subscriptionRecoveryPolicy>
              <lastImageSubscriptionRecoveryPolicy />
            </subscriptionRecoveryPolicy>
          </policyEntry>

          <policyEntry topic="TP5" producerFlowControl="false" maxPageSize="5000" >
            <dispatchPolicy>
              <roundRobinDispatchPolicy />
            </dispatchPolicy>
            <subscriptionRecoveryPolicy>
              <lastImageSubscriptionRecoveryPolicy />
            </subscriptionRecoveryPolicy>
          </policyEntry>

          <policyEntry topic="TP6" producerFlowControl="false" maxPageSize="5000" >
            <dispatchPolicy>
              <roundRobinDispatchPolicy />
            </dispatchPolicy>
            <subscriptionRecoveryPolicy>
              <lastImageSubscriptionRecoveryPolicy />
            </subscriptionRecoveryPolicy>
          </policyEntry>

          <policyEntry topic="TP7" producerFlowControl="false" maxPageSize="5000" >
            <dispatchPolicy>
              <roundRobinDispatchPolicy />
            </dispatchPolicy>
            <subscriptionRecoveryPolicy>
              <lastImageSubscriptionRecoveryPolicy />
            </subscriptionRecoveryPolicy>
          </policyEntry>

          <policyEntry topic="TP8" producerFlowControl="false" maxPageSize="5000" >
            <dispatchPolicy>
              <roundRobinDispatchPolicy />
            </dispatchPolicy>
            <subscriptionRecoveryPolicy>
              <lastImageSubscriptionRecoveryPolicy />
            </subscriptionRecoveryPolicy>
          </policyEntry>


          <policyEntry topic="TP9" producerFlowControl="false" maxPageSize="5000" >
            <dispatchPolicy>
              <roundRobinDispatchPolicy />
            </dispatchPolicy>
            <subscriptionRecoveryPolicy>
              <lastImageSubscriptionRecoveryPolicy />
            </subscriptionRecoveryPolicy>
          </policyEntry>

          <policyEntry topic="TP10" producerFlowControl="false" maxPageSize="5000" >
            <dispatchPolicy>
              <roundRobinDispatchPolicy />
            </dispatchPolicy>
            <subscriptionRecoveryPolicy>
              <lastImageSubscriptionRecoveryPolicy />
            </subscriptionRecoveryPolicy>
          </policyEntry>

              </policyEntries>
            </policyMap>
        </destinationPolicy>


    <destinations>

      <queue physicalName="Que1" />
      <queue physicalName="Que2" />
      <queue physicalName="Que3" />
      <queue physicalName="Que4" />
      <queue physicalName="Que5" />
      <queue physicalName="Que6" />
      <queue physicalName="Que7" />
      <queue physicalName="Que8" />
      <queue physicalName="Que9" />
      <queue physicalName="Que10" />
      <topic physicalName="TP1" />
      <topic physicalName="TP2" />
      <topic physicalName="TP3" />
      <topic physicalName="TP4" />
      <topic physicalName="TP5" />
      <topic physicalName="TP6" />
      <topic physicalName="TP7" />
      <topic physicalName="TP8" />
      <topic physicalName="TP9" />
      <topic physicalName="TP10" />
    </destinations>

 
        <!--
            The sslContext can be used to configure broker-specific SSL properties.
        -->
        <sslContext>
            <sslContext keyStore="file:${activemq.base}/conf/broker.ks"
              keyStorePassword="password" trustStore="file:${activemq.base}/conf/broker.ts"
              trustStorePassword="password"/>
        </sslContext>
      
        <plugins>
            <loggingBrokerPlugin logAll="false" logConnectionEvents="false" logSessionEvents="false"/>
            <timeStampingBrokerPlugin zeroExpirationOverride="0" ttlCeiling="0" futureOnly="true"/>
            <traceBrokerPathPlugin/>
        </plugins>
  
        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="6 gb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="20 gb" name="foo"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="2 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <!-- The store and forward broker networks ActiveMQ will listen to -->
        <networkConnectors>
            <!-- <networkConnector name="default-nc" uri="multicast://default"/> -->
        </networkConnectors>


        <!-- 
            The transport connectors expose ActiveMQ over a given protocol to
            clients and other brokers. For more information, see: 
            
            http://activemq.apache.org/configuring-transports.html 
        -->

        <transportConnectors>

            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->

            <transportConnector name="openwire" uri="tcp://localhost:61636?wireFormat.maxInactivityDurationInitalDelay=30000"/>
            
	         <transportConnector name="nio+ssl" uri="nio+ssl://localhost:61639?needClientAuth=true&amp;wireFormat.maxInactivityDurationInitalDelay=30000"/>

            <transportConnector name="ssl" uri="ssl://localhost:61637?needClientAuth=true&amp;wireFormat.maxInactivityDurationInitalDelay=30000"/>

        </transportConnectors>

        <!-- destroy the spring context on shutdown to stop jetty -->
        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>


    </broker>
<!--
    <commandAgent xmlns="http://activemq.apache.org/schema/core" brokerUrl="vm://asb-jms-01-vm.eng.rr.com" username="system" password="password"/>
-->

    <!-- 
        Uncomment to enable Camel
        Take a look at activemq-camel.xml for more details
    <import resource="activemq-camel.xml"/> -->

    <!-- 
        Enable web consoles, REST and Ajax APIs and demos
        Take a look at activemq-jetty.xml for more details -->
    <import resource="jetty.xml"/> 
    
</beans>
    




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)