You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "Eric X (JIRA)" <ji...@apache.org> on 2015/08/19 22:19:45 UTC
[jira] [Created] (AMQ-5934) ActiveMQ durable topic subscribers not
able to receive new messages after running for awhile
Eric X created AMQ-5934:
---------------------------
Summary: ActiveMQ durable topic subscribers not able to receive new messages after running for awhile
Key: AMQ-5934
URL: https://issues.apache.org/jira/browse/AMQ-5934
Project: ActiveMQ
Issue Type: Bug
Affects Versions: 5.10.2
Environment: ActiveMQ 5.10.2 broker and a topic subscriber with ActiveMQ 5.10 client libraries.
Reporter: Eric X
Priority: Critical
We have set up an environment which uses ActiveMQ 5.10.2 as broker and a test durable topic subscriber that connects to the broker using tcp protocol. The client connection string as follows:
failover:(tcp://host1:61617?keepAlive=true&wireFormat.tightEncodingEnabled=false,tcp://host1:61617?keepAlive=true&wireFormat.tightEncodingEnabled=false)?randomize=false&initialReconnectDelay=500&timeout=2000&useExponentialBackOff=false&jms.watchTopicAdvisories=false
At first, we started five test clients with aforementioned connection string and make sure every client is connected to the broker. After that, we use a test client to publish 1000000 messages to the topic that five clients subscribed. After running for a few hours, two of these five clients failed. In these two clients, there are 636786 and 93915 messages in pending queue, 1000 messages in dispatched queue. The other three received all the messages from topic. We have been able to consistently reproduce the issue. Actually, we have reported the same issue in our production environment. These stale clients appears connect to the broker but all these messages going to pending queue after an indeterminate time period.
Here is the activemq conf file for this issue
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- Allows us to use system properties as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.base}/conf/credentials.properties</value>
</property>
</bean>
<!--
The <broker> element is used to configure the ActiveMQ broker.
-->
<broker xmlns="http://activemq.apache.org/schema/core" advisorySupport="false" useJmx="true" brokerName="dev.masterbroker-1" dataDirectory="${activemq.data}" >
<!--
The managementContext is used to configure how ActiveMQ is exposed in
JMX. By default, ActiveMQ uses the MBean server that is started by
the JVM. For more information, see:
http://activemq.apache.org/jmx.html
-->
<managementContext>
<managementContext createConnector="true" connectorPort="2099"/>
</managementContext>
<plugins>
<jaasDualAuthenticationPlugin configuration="activemq-ssl-domain"/>
<jaasDualAuthenticationPlugin configuration="activemq-domain"/>
<!-- <simpleAuthenticationPlugin>
<users>
<authenticationUser username="system" password="manager"
groups="users,admins"/>
<authenticationUser username="user" password="password"
groups="users"/>
<authenticationUser username="guest" password="password" groups="guests"/>
</users>
</simpleAuthenticationPlugin> -->
<authorizationPlugin>
<map>
<authorizationMap>
<authorizationEntries>
<authorizationEntry queue=">" read="admins,users,guests" write="admins,users,guests" admin="admins" />
<authorizationEntry queue="USERS.>" read="users" write="users" admin="users" />
<authorizationEntry queue="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
<authorizationEntry topic=">" read="admins,users,guests" write="admins,users,guests" admin="admins" />
<authorizationEntry topic="USERS.>" read="users" write="users" admin="users" />
<authorizationEntry topic="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
<authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users" write="guests,users" admin="guests,users"/>
</authorizationEntries>
<tempDestinationAuthorizationEntry>
<tempDestinationAuthorizationEntry read="tempDestinationAdmins" write="tempDestinationAdmins" admin="tempDestinationAdmins"/>
</tempDestinationAuthorizationEntry>
</authorizationMap>
</map>
</authorizationPlugin>
<redeliveryPlugin fallbackToDeadLetter="false" sendToDlqIfMaxRetriesExceeded="false">
<redeliveryPolicyMap>
<redeliveryPolicyMap>
<redeliveryPolicyEntries>
<!-- a destination specific policy -->
<redeliveryPolicy topic="TP5" maximumRedeliveries="-1" redeliveryDelay="10000" />
</redeliveryPolicyEntries>
<!-- the fallback policy for all other destinations -->
<defaultEntry>
<redeliveryPolicy maximumRedeliveries="4" initialRedeliveryDelay="5000" redeliveryDelay="10000" />
</defaultEntry>
</redeliveryPolicyMap>
</redeliveryPolicyMap>
</redeliveryPlugin>
</plugins>
<!--
Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag).
For more information, see:
http://activemq.apache.org/persistence.html
-->
<!--
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"
ignoreMissingJournalfiles="true"
checkForCorruptJournalFiles="true"
checksumJournalFiles="true"
journalMaxFileLength="256mb"/>
</persistenceAdapter>
-->
<persistenceAdapter>
<mKahaDB directory="${activemq.data}/kahadb">
<filteredPersistenceAdapters>
<!-- match all queues -->
<filteredKahaDB queue=">">
<persistenceAdapter>
<kahaDB journalMaxFileLength="64mb"
ignoreMissingJournalfiles="true"
checkForCorruptJournalFiles="true"
checksumJournalFiles="true" />
</persistenceAdapter>
</filteredKahaDB>
<!-- match all destinations -->
<filteredKahaDB topic=">">
<persistenceAdapter>
<kahaDB journalMaxFileLength="64mb"
ignoreMissingJournalfiles="true"
checkForCorruptJournalFiles="true"
checksumJournalFiles="true" />
</persistenceAdapter>
</filteredKahaDB>
</filteredPersistenceAdapters>
</mKahaDB>
</persistenceAdapter>
<!--
It's advisable to turn on producer flow control in the production system
The systemUsage controls the maximum amount of space the broker will
use before slowing down producers. For more information, see:
http://activemq.apache.org/producer-flow-control.html -->
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" optimizedDispatch="true" producerFlowControl="false" memoryLimit="200mb" expireMessagesPeriod="300000">
<!-- The constantPendingMessageLimitStrategy is used to prevent
slow topic consumers to block producers and affect other consumers
by limiting the number of messages that are retained
For more information, see:
http://activemq.apache.org/slow-consumer-handling.html
-->
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
<policyEntry queue=">" optimizedDispatch="true" producerFlowControl="false" memoryLimit="100mb">
<!-- Use VM cursor for better latency
For more information, see:
http://activemq.apache.org/message-cursors.html
<pendingQueuePolicy>
<vmQueueCursor/>
</pendingQueuePolicy>
-->
</policyEntry>
<policyEntry queue="Que1" producerFlowControl="false" maxPageSize="5000" >
<dispatchPolicy>
<strictOrderDispatchPolicy/>
</dispatchPolicy>
<subscriptionRecoveryPolicy>
<lastImageSubscriptionRecoveryPolicy/>
</subscriptionRecoveryPolicy>
</policyEntry>
<policyEntry queue="Que2" producerFlowControl="false" maxPageSize="5000" >
<dispatchPolicy>
<strictOrderDispatchPolicy/>
</dispatchPolicy>
<subscriptionRecoveryPolicy>
<lastImageSubscriptionRecoveryPolicy/>
</subscriptionRecoveryPolicy>
</policyEntry>
<policyEntry queue="Que3" producerFlowControl="false" maxPageSize="5000" >
<dispatchPolicy>
<strictOrderDispatchPolicy/>
</dispatchPolicy>
<subscriptionRecoveryPolicy>
<lastImageSubscriptionRecoveryPolicy/>
</subscriptionRecoveryPolicy>
</policyEntry>
<policyEntry queue="Que4" producerFlowControl="false" maxPageSize="5000" >
<dispatchPolicy>
<strictOrderDispatchPolicy/>
</dispatchPolicy>
<subscriptionRecoveryPolicy>
<lastImageSubscriptionRecoveryPolicy/>
</subscriptionRecoveryPolicy>
</policyEntry>
<policyEntry queue="Que5" producerFlowControl="false" maxPageSize="5000" >
<dispatchPolicy>
<strictOrderDispatchPolicy/>
</dispatchPolicy>
<subscriptionRecoveryPolicy>
<lastImageSubscriptionRecoveryPolicy/>
</subscriptionRecoveryPolicy>
</policyEntry>
<policyEntry queue="Que6" producerFlowControl="false" maxPageSize="5000" >
<dispatchPolicy>
<strictOrderDispatchPolicy/>
</dispatchPolicy>
<subscriptionRecoveryPolicy>
<lastImageSubscriptionRecoveryPolicy/>
</subscriptionRecoveryPolicy>
</policyEntry>
<policyEntry queue="Que7" producerFlowControl="false" maxPageSize="5000" >
<dispatchPolicy>
<strictOrderDispatchPolicy/>
</dispatchPolicy>
<subscriptionRecoveryPolicy>
<lastImageSubscriptionRecoveryPolicy/>
</subscriptionRecoveryPolicy>
</policyEntry>
<policyEntry queue="Que8" producerFlowControl="false" maxPageSize="5000" >
<dispatchPolicy>
<strictOrderDispatchPolicy/>
</dispatchPolicy>
<subscriptionRecoveryPolicy>
<lastImageSubscriptionRecoveryPolicy/>
</subscriptionRecoveryPolicy>
</policyEntry>
<policyEntry queue="Que9" producerFlowControl="false" maxPageSize="5000" >
<dispatchPolicy>
<strictOrderDispatchPolicy/>
</dispatchPolicy>
<subscriptionRecoveryPolicy>
<lastImageSubscriptionRecoveryPolicy/>
</subscriptionRecoveryPolicy>
</policyEntry>
<policyEntry queue="Que10" producerFlowControl="false" maxPageSize="5000" expireMessagesPeriod="300000" >
<dispatchPolicy>
<strictOrderDispatchPolicy/>
</dispatchPolicy>
<subscriptionRecoveryPolicy>
<lastImageSubscriptionRecoveryPolicy/>
</subscriptionRecoveryPolicy>
</policyEntry>
<policyEntry topic="TP1" producerFlowControl="false" maxPageSize="5000" >
<dispatchPolicy>
<roundRobinDispatchPolicy />
</dispatchPolicy>
<subscriptionRecoveryPolicy>
<lastImageSubscriptionRecoveryPolicy />
</subscriptionRecoveryPolicy>
</policyEntry>
<policyEntry topic="TP2" producerFlowControl="false" maxPageSize="5000" >
<dispatchPolicy>
<roundRobinDispatchPolicy />
</dispatchPolicy>
<subscriptionRecoveryPolicy>
<lastImageSubscriptionRecoveryPolicy />
</subscriptionRecoveryPolicy>
</policyEntry>
<policyEntry topic="TP3" producerFlowControl="false" maxPageSize="5000" >
<dispatchPolicy>
<roundRobinDispatchPolicy />
</dispatchPolicy>
<subscriptionRecoveryPolicy>
<lastImageSubscriptionRecoveryPolicy />
</subscriptionRecoveryPolicy>
</policyEntry>
<policyEntry topic="TP5" producerFlowControl="false" maxPageSize="5000" >
<dispatchPolicy>
<roundRobinDispatchPolicy />
</dispatchPolicy>
<subscriptionRecoveryPolicy>
<lastImageSubscriptionRecoveryPolicy />
</subscriptionRecoveryPolicy>
</policyEntry>
<policyEntry topic="TP6" producerFlowControl="false" maxPageSize="5000" >
<dispatchPolicy>
<roundRobinDispatchPolicy />
</dispatchPolicy>
<subscriptionRecoveryPolicy>
<lastImageSubscriptionRecoveryPolicy />
</subscriptionRecoveryPolicy>
</policyEntry>
<policyEntry topic="TP7" producerFlowControl="false" maxPageSize="5000" >
<dispatchPolicy>
<roundRobinDispatchPolicy />
</dispatchPolicy>
<subscriptionRecoveryPolicy>
<lastImageSubscriptionRecoveryPolicy />
</subscriptionRecoveryPolicy>
</policyEntry>
<policyEntry topic="TP8" producerFlowControl="false" maxPageSize="5000" >
<dispatchPolicy>
<roundRobinDispatchPolicy />
</dispatchPolicy>
<subscriptionRecoveryPolicy>
<lastImageSubscriptionRecoveryPolicy />
</subscriptionRecoveryPolicy>
</policyEntry>
<policyEntry topic="TP9" producerFlowControl="false" maxPageSize="5000" >
<dispatchPolicy>
<roundRobinDispatchPolicy />
</dispatchPolicy>
<subscriptionRecoveryPolicy>
<lastImageSubscriptionRecoveryPolicy />
</subscriptionRecoveryPolicy>
</policyEntry>
<policyEntry topic="TP10" producerFlowControl="false" maxPageSize="5000" >
<dispatchPolicy>
<roundRobinDispatchPolicy />
</dispatchPolicy>
<subscriptionRecoveryPolicy>
<lastImageSubscriptionRecoveryPolicy />
</subscriptionRecoveryPolicy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<destinations>
<queue physicalName="Que1" />
<queue physicalName="Que2" />
<queue physicalName="Que3" />
<queue physicalName="Que4" />
<queue physicalName="Que5" />
<queue physicalName="Que6" />
<queue physicalName="Que7" />
<queue physicalName="Que8" />
<queue physicalName="Que9" />
<queue physicalName="Que10" />
<topic physicalName="TP1" />
<topic physicalName="TP2" />
<topic physicalName="TP3" />
<topic physicalName="TP4" />
<topic physicalName="TP5" />
<topic physicalName="TP6" />
<topic physicalName="TP7" />
<topic physicalName="TP8" />
<topic physicalName="TP9" />
<topic physicalName="TP10" />
</destinations>
<!--
The sslContext can be used to configure broker-specific SSL properties.
-->
<sslContext>
<sslContext keyStore="file:${activemq.base}/conf/broker.ks"
keyStorePassword="password" trustStore="file:${activemq.base}/conf/broker.ts"
trustStorePassword="password"/>
</sslContext>
<plugins>
<loggingBrokerPlugin logAll="false" logConnectionEvents="false" logSessionEvents="false"/>
<timeStampingBrokerPlugin zeroExpirationOverride="0" ttlCeiling="0" futureOnly="true"/>
<traceBrokerPathPlugin/>
</plugins>
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="6 gb"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="20 gb" name="foo"/>
</storeUsage>
<tempUsage>
<tempUsage limit="2 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<!-- The store and forward broker networks ActiveMQ will listen to -->
<networkConnectors>
<!-- <networkConnector name="default-nc" uri="multicast://default"/> -->
</networkConnectors>
<!--
The transport connectors expose ActiveMQ over a given protocol to
clients and other brokers. For more information, see:
http://activemq.apache.org/configuring-transports.html
-->
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://localhost:61636?wireFormat.maxInactivityDurationInitalDelay=30000"/>
<transportConnector name="nio+ssl" uri="nio+ssl://localhost:61639?needClientAuth=true&wireFormat.maxInactivityDurationInitalDelay=30000"/>
<transportConnector name="ssl" uri="ssl://localhost:61637?needClientAuth=true&wireFormat.maxInactivityDurationInitalDelay=30000"/>
</transportConnectors>
<!-- destroy the spring context on shutdown to stop jetty -->
<shutdownHooks>
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
</shutdownHooks>
</broker>
<!--
<commandAgent xmlns="http://activemq.apache.org/schema/core" brokerUrl="vm://asb-jms-01-vm.eng.rr.com" username="system" password="password"/>
-->
<!--
Uncomment to enable Camel
Take a look at activemq-camel.xml for more details
<import resource="activemq-camel.xml"/> -->
<!--
Enable web consoles, REST and Ajax APIs and demos
Take a look at activemq-jetty.xml for more details -->
<import resource="jetty.xml"/>
</beans>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)