Posted to users@activemq.apache.org by Michael La Budde <pa...@hotmail.com> on 2011/03/07 19:56:41 UTC

Best Practices for Reliability, Availability, and (most importantly) no Duplicate Messages

All:
I have done quite a bit of reading: the ActiveMQ documentation, the ActiveMQ in Action book (MEAP), the ActiveMQ forum, and many other internet postings.
We are using ActiveMQ 5.3.1 in production and recently experienced a case where 849 messages were duplicated within a 70-second period.
My environment:
Linux; JDK 1.6; ActiveMQ 5.3.1; Camel 2.4
My current configuration is shared-nothing master/slave. Here's the master config:
<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at


    http://www.apache.org/licenses/LICENSE-2.0


    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">


    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.base}/conf/credentials.properties</value>
        </property>
    </bean>


    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core"
			brokerName="emsp-primary"
			dataDirectory="${activemq.base}/data"
			destroyApplicationContextOnStop="true" >


        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" producerFlowControl="true" memoryLimit="5mb" useCache="false">
                  <pendingSubscriberPolicy>
                    <vmCursor />
                  </pendingSubscriberPolicy>
                </policyEntry>
                <policyEntry queue=">" producerFlowControl="true" memoryLimit="5mb" useCache="false">
                  <!-- Use VM cursor for better latency
                       For more information, see:


                       http://activemq.apache.org/message-cursors.html


                  <pendingQueuePolicy>
                    <vmQueueCursor/>
                  </pendingQueuePolicy>
                  -->
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>


        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>


		<networkConnectors>
		</networkConnectors>


		<persistenceAdapter>
			<kahaDB directory="${activemq.base}/data/kahadb"
					indexWriteBatchSize="100"
					journalMaxFileLength="33554432"
					enableIndexWriteAsync="true"
					enableJournalDiskSyncs="false" />
		</persistenceAdapter>


		<systemUsage>
			<systemUsage>
				<memoryUsage>
					<memoryUsage limit="512 mb"/>
				</memoryUsage>
				<storeUsage>
					<storeUsage limit="2 gb" name="data-store"/>
				</storeUsage>
				<tempUsage>
					<tempUsage limit="100 mb"/>
				</tempUsage>
			</systemUsage>
		</systemUsage>


        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
        </transportConnectors>


    </broker>


    <import resource="jetty.xml"/>


</beans>
And the Failover config:
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">


    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.base}/conf/credentials.properties</value>
        </property>
    </bean>


    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core"
			brokerName="emsp-failover"
			dataDirectory="${activemq.base}/data"
			destroyApplicationContextOnStop="true" >


        <!--
			For better performances use VM cursor and small memory limit.
			For more information, see:


            http://activemq.apache.org/message-cursors.html


            Also, if your producer is "hanging", it's probably due to producer flow control.
            For more information, see:
            http://activemq.apache.org/producer-flow-control.html
        -->


        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" producerFlowControl="true" memoryLimit="5mb" useCache="false">
                  <pendingSubscriberPolicy>
                    <vmCursor />
                  </pendingSubscriberPolicy>
                </policyEntry>
                <policyEntry queue=">" producerFlowControl="true" memoryLimit="5mb" useCache="false">
                  <!-- Use VM cursor for better latency
                       For more information, see:


                       http://activemq.apache.org/message-cursors.html


                  <pendingQueuePolicy>
                    <vmQueueCursor/>
                  </pendingQueuePolicy>
                  -->
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>


        <!--
            The managementContext is used to configure how ActiveMQ is exposed in
            JMX. By default, ActiveMQ uses the MBean server that is started by
            the JVM. For more information, see:


            http://activemq.apache.org/jmx.html
        -->
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>


		<!-- The store and forward broker networks ActiveMQ will listen to -->
		<networkConnectors>
			<networkConnector name="bridge-to-primary"
						   uri="static:(tcp://10.9.8.7:61616)"
						   dynamicOnly="false"
						   duplex="true"
						   prefetchSize="1">
				<excludedDestinations>
					<queue physicalName="Consumer.*.VirtualTopic.>"/>
				</excludedDestinations>
				<staticallyIncludedDestinations>
					<topic physicalName="VirtualTopic.MyCompany.SharedServices.DirectoryServices"/>
				</staticallyIncludedDestinations>
			</networkConnector>
		</networkConnectors>


		<persistenceAdapter>
			<kahaDB directory="${activemq.base}/data/kahadb"
					indexWriteBatchSize="100"
					journalMaxFileLength="33554432"
					enableIndexWriteAsync="true"
					enableJournalDiskSyncs="false" />
		</persistenceAdapter>


		<systemUsage>
			<systemUsage>
				<memoryUsage>
					<memoryUsage limit="512 mb"/>
				</memoryUsage>
				<storeUsage>
					<storeUsage limit="2 gb" name="data-store"/>
				</storeUsage>
				<tempUsage>
					<tempUsage limit="100 mb"/>
				</tempUsage>
			</systemUsage>
		</systemUsage>


        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
        </transportConnectors>


    </broker>


    <import resource="jetty.xml"/>


</beans>
Finally, here are portions of the ActiveMQ log files:
Log Messages from Primary
2011-02-16 05:18:39,197 | WARN  | Ignoring ack received before dispatch; result of failover with an outstanding ack. Acked messages will be replayed if present on this broker. Ignored ack: MessageAck {commandId = 1166, responseRequired = false, ackType = 2, consumerId = ID:prod-interop-mq2.mycompany.com-52485-1296046376681-0:4:3:1, firstMessageId = ID:prod-interop-mq1.mycompany.com-48185-1296047391209-0:2:3:1540:1, lastMessageId = ID:prod-interop-mq1.mycompany.com-48185-1296047391209-0:2:3:1540:1, destination = queue://MyCompany.CAM.Import, transactionId = null, messageCount = 1} | org.apache.activemq.broker.region.PrefetchSubscription | ActiveMQ Transport: tcp:///10.9.8.7:37151
2011-02-16 05:18:45,861 | INFO  | Network connection between vm://emsp-failover#0 and tcp:///10.9.8.7:61616 shutdown due to a local error: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too long: /10.9.8.7:61616 | org.apache.activemq.network.DemandForwardingBridge | InactivityMonitor Async Task: java.util.concurrent.ThreadPoolExecutor$Worker@e75737
2011-02-16 05:18:45,905 | INFO  | Connector vm://emsp-failover Stopped | org.apache.activemq.broker.TransportConnector | InactivityMonitor Async Task: java.util.concurrent.ThreadPoolExecutor$Worker@e75737
2011-02-16 05:18:45,905 | INFO  | emsp-failover bridge to emsp-primary stopped | org.apache.activemq.network.DemandForwardingBridge | InactivityMonitor Async Task: java.util.concurrent.ThreadPoolExecutor$Worker@e75737
2011-02-16 05:18:45,908 | INFO  | Establishing network connection from vm://emsp-failover to tcp://10.9.8.7:61616 | org.apache.activemq.network.DiscoveryNetworkConnector | Simple Discovery Agent: java.util.concurrent.ThreadPoolExecutor$Worker@504fe4
2011-02-16 05:18:45,911 | INFO  | Connector vm://emsp-failover Started | org.apache.activemq.broker.TransportConnector | Simple Discovery Agent: java.util.concurrent.ThreadPoolExecutor$Worker@504fe4
2011-02-16 05:19:30,738 | INFO  | Usage Manager Memory Limit reached on queue://MyCompany.CAM.Import. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info | org.apache.activemq.broker.region.Queue | ActiveMQ Transport: tcp:///10.9.8.8:44634
2011-02-16 05:19:38,877 | INFO  | Network connection between vm://emsp-failover#6 and tcp:///10.9.8.7:61616(emsp-primary) has been established. | org.apache.activemq.network.DemandForwardingBridge | StartLocalBridge: localBroker=vm://emsp-failover#6

Log Messages from Failover
2011-02-16 05:19:30,361 | WARN  | Caught an exception processing local command | org.apache.activemq.network.DemandForwardingBridge | ActiveMQ Connection Dispatcher: vm://emsp-primary#6
org.apache.activemq.transport.InactivityIOException: Channel was inactive for too long: /10.9.8.8:49724
	at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:235)
	at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:83)
	at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:104)
	at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
	at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
	at org.apache.activemq.network.DemandForwardingBridgeSupport.serviceLocalCommand(DemandForwardingBridgeSupport.java:702)
	at org.apache.activemq.network.DemandForwardingBridgeSupport$1.onCommand(DemandForwardingBridgeSupport.java:158)
	at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
	at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
	at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:118)
	at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
	at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
	at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1205)
	at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:790)
	at org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:826)
	at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:98)
	at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:36)
2011-02-16 05:19:30,334 | WARN  | Network connection between vm://emsp-primary#6 and tcp:///10.9.8.8:49724 shutdown due to a remote error: java.net.SocketException: Broken pipe | org.apache.activemq.network.DemandForwardingBridge | InactivityMonitor Async Task: java.util.concurrent.ThreadPoolExecutor$Worker@9ec332
2011-02-16 05:19:28,418 | WARN  | KahaDB PageFile flush: 6 queued writes, latch wait took 732 | org.apache.kahadb.page.PageFile | ActiveMQ Journal Checkpoint Worker
2011-02-16 05:19:27,385 | INFO  | Slow KahaDB access: Journal append took: 411 ms, Index update took 2368 ms | org.apache.activemq.store.kahadb.MessageDatabase | ActiveMQ Transport: tcp:///10.9.8.7:60899
2011-02-16 05:19:30,915 | INFO  | Network connection between vm://emsp-primary#6 and tcp:///10.9.8.8:49724 shutdown due to a local error: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too long: /10.9.8.8:49724 | org.apache.activemq.network.DemandForwardingBridge | ActiveMQ Connection Dispatcher: vm://emsp-primary#6
2011-02-16 05:19:32,153 | INFO  | Slow KahaDB access: cleanup took 9200 | org.apache.activemq.store.kahadb.MessageDatabase | ActiveMQ Journal Checkpoint Worker
2011-02-16 05:19:33,470 | INFO  | Slow KahaDB access: Journal append took: 0 ms, Index update took 7002 ms | org.apache.activemq.store.kahadb.MessageDatabase | ActiveMQ Transport: tcp:///10.9.8.8:47708
2011-02-16 05:19:33,887 | WARN  | KahaDB PageFile flush: 4 queued writes, latch wait took 410 | org.apache.kahadb.page.PageFile | ActiveMQ Journal Checkpoint Worker
2011-02-16 05:19:38,048 | INFO  | Connector vm://emsp-primary Stopped | org.apache.activemq.broker.TransportConnector | ActiveMQ Task
2011-02-16 05:19:38,068 | INFO  | emsp-primary bridge to emsp-failover stopped | org.apache.activemq.network.DemandForwardingBridge | ActiveMQ Task
2011-02-16 05:19:38,048 | INFO  | The connection to '/10.9.8.8:49724' is taking a long time to shutdown. | org.apache.activemq.broker.TransportConnection | NetworkBridge
2011-02-16 05:19:52,288 | INFO  | Connector vm://emsp-primary Started | org.apache.activemq.broker.TransportConnector | ActiveMQ Transport: tcp:///10.9.8.8:38626
2011-02-16 05:19:55,082 | INFO  | Slow KahaDB access: cleanup took 1839 | org.apache.activemq.store.kahadb.MessageDatabase | ActiveMQ Journal Checkpoint Worker
2011-02-16 05:19:55,554 | INFO  | Created Duplex Bridge back to emsp-failover | org.apache.activemq.broker.TransportConnection | ActiveMQ Transport: tcp:///10.9.8.8:38626
2011-02-16 05:19:55,641 | INFO  | Network connection between vm://emsp-primary#8 and tcp:///10.9.8.8:38626(emsp-failover) has been established. | org.apache.activemq.network.DemandForwardingBridge | StartLocalBridge: localBroker=vm://emsp-primary#8
2011-02-16 05:19:59,225 | INFO  | Slow KahaDB access: cleanup took 610 | org.apache.activemq.store.kahadb.MessageDatabase | ActiveMQ Journal Checkpoint Worker
I should point out that all clients connect using the failover transport with randomize=false. In other words, our second broker is there only to provide failover, not load balancing.
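To make the client side concrete, here is a rough sketch of what such a connection factory looks like when declared as a Spring bean. The bean id is made up for illustration, and the broker addresses are an assumption pieced together from the configs and logs above (10.9.8.7 as the primary, 10.9.8.8 as the failover broker, both on port 61616); the actual client definitions are not shown in this post.

```xml
<!-- Sketch only: the bean id is hypothetical; the addresses are assumed from the logs above -->
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <!-- randomize=false makes clients try the brokers in the listed order,
         so the second broker is used only when the first is unreachable -->
    <property name="brokerURL"
              value="failover:(tcp://10.9.8.7:61616,tcp://10.9.8.8:61616)?randomize=false"/>
</bean>
```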
The reliability and availability (i.e. failover) of ActiveMQ have been outstanding. However, producing (and then processing) 849 duplicate messages is unacceptable. 
Is the (occasional) production of duplicate messages known and customary behavior?
Is there something in my configuration that can be changed to prevent duplicate messages?
Would using a "shared database" strategy prevent this? (Only one broker can be active at a time, and the config would be much simpler, i.e. all brokers could use the same configuration file.)
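For reference, my understanding is that the shared-database approach would replace the kahaDB persistenceAdapter in both broker configs with a JDBC adapter pointing at one common database, roughly as sketched below. This is untested on my side. The data source id, driver, JDBC URL, and credentials are hypothetical placeholders, and the choice of PostgreSQL is only an example.

```xml
<!-- Sketch only: inside <broker>, in place of the kahaDB adapter.
     "#shared-ds" refers to the hypothetical data source bean defined below. -->
<persistenceAdapter>
    <jdbcPersistenceAdapter dataSource="#shared-ds"/>
</persistenceAdapter>

<!-- Sketch only: outside <broker>, as a sibling bean inside <beans>.
     Host, database name, and credentials are placeholders. -->
<bean id="shared-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="org.postgresql.Driver"/>
    <property name="url" value="jdbc:postgresql://db.example.com/activemq"/>
    <property name="username" value="activemq"/>
    <property name="password" value="changeme"/>
    <property name="poolPreparedStatements" value="true"/>
</bean>
```

With this layout both brokers could run the same file, and whichever one obtains the database lock first would act as master while the other waits.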
Any other suggestions/advice would be quite welcome!
TIA,
Mike L. (aka patzerbud)