Posted to dev@activemq.apache.org by "Timothy Bish (JIRA)" <ji...@apache.org> on 2012/07/10 16:54:34 UTC
[jira] [Commented] (AMQCPP-413) Producer connection that causes
broker to reach its memory/disk limits doesn't get the 'all full' exception
even though the broker is configured to send it for Producer Flow Control.
[ https://issues.apache.org/jira/browse/AMQCPP-413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13410399#comment-13410399 ]
Timothy Bish commented on AMQCPP-413:
-------------------------------------
Please attach your broker config so this can be tested using your specific configuration.
> Producer connection that causes broker to reach its memory/disk limits doesn't get the 'all full' exception even though the broker is configured to send it for Producer Flow Control.
> --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: AMQCPP-413
> URL: https://issues.apache.org/jira/browse/AMQCPP-413
> Project: ActiveMQ C++ Client
> Issue Type: Bug
> Components: CMS Impl
> Affects Versions: 3.4.0, 3.4.4
> Environment: Linux
> Reporter: John Rocha
> Assignee: Timothy Bish
> Priority: Critical
> Labels: cpp, producer_flow_control
>
> A producer connection that causes the broker to reach its memory/disk limits
> doesn't get the 'all full' exception, even though the broker is configured to
> send it for Producer Flow Control.
> +Scenario #1+
> # Delete the broker data directory\\
> \\
> # Start the broker, configured to send an exception when there is no space\\
> \\
> # DO *+NOT+* START A CONSUMER.\\
> \\
> # Run a producer that does synchronous sends with the default sendTimeout of zero (0). It uses one connection and enters a loop that just sends messages.\\
> \\
> After a while the producer locks up and never recovers.\\
> \\
> # Start another producer in another window.\\
> \\
> It immediately fails with a 'broker full' exception.
> +Scenario #2+
> # Delete the broker data directory\\
> \\
> # Start the broker, configured to send an exception when there is no space\\
> \\
> # DO *+NOT+* START A CONSUMER.\\
> \\
> # Run a producer that does synchronous sends with a sendTimeout of 500 ms. It uses one connection and enters a loop that just sends messages.\\
> \\
> After a while the producer causes the broker to reach its limit, and then the send method starts timing out. It never gets a 'broker full' exception.\\
> \\
> # Start another producer in another window.\\
> \\
> It immediately fails with a 'broker full' exception.
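For reference, the two scenarios differ only in the connection URI handed to the client. The sketch below is not the reporter's code (simple_producer.cpp is not included in this report). It assumes the `connection.alwaysSyncSend` and `connection.sendTimeout` URI options and the `SEND_TO` environment variable that appear in the console output of this report. The helper names `buildBrokerUri` and `sendTimeoutFromEnv` are hypothetical.

```cpp
#include <cstdlib>
#include <sstream>
#include <string>

// Hypothetical helper: builds the broker URI used by the test producer.
// A timeout of 0 is treated as "no timeout", so the option is left out,
// which mirrors the Scenario #1 URI shown in the log.
std::string buildBrokerUri(const std::string& hostPort, long sendTimeoutMs) {
    std::ostringstream uri;
    uri << "tcp://" << hostPort << "?connection.alwaysSyncSend=true";
    if (sendTimeoutMs > 0) {
        uri << "&connection.sendTimeout=" << sendTimeoutMs;
    }
    return uri.str();
}

// Hypothetical helper: reads SEND_TO (milliseconds) from the environment.
// Returns 0 when the variable is unset, not a number, or not positive.
long sendTimeoutFromEnv() {
    const char* raw = std::getenv("SEND_TO");
    if (raw == nullptr) {
        return 0;
    }
    char* end = nullptr;
    long value = std::strtol(raw, &end, 10);
    return (end != raw && value > 0) ? value : 0;
}
```

Calling `buildBrokerUri("127.0.0.1:61616", sendTimeoutFromEnv())` with `SEND_TO` unset yields the Scenario #1 URI; with `SEND_TO=500` it appends `&connection.sendTimeout=500`, as in Scenario #2.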
> \\
> \\
> ----
> \\
> \\
> {noformat}
> ######################################################################
> ## SCENARIO #1
> ######################################################################
> ##
> ## Restart our tomcat service which restarts the AMQ Broker and view the
> ## directory size.
> ##
> root@psbu-jrr-lnx:# /usr/BWhttpd/bin/init_tomcat stop; \rm -rf /usr/BWhttpd/tomcat/activemq-data; /usr/BWhttpd/bin/init_tomcat start; du -sh /usr/BWhttpd/tomcat/activemq-data
> Stopping tomcat ... done
> Killing tomcat ... done.
> Starting tomcat ... done
> 44K /usr/BWhttpd/tomcat/activemq-data
> root@psbu-jrr-lnx:#
> ##
> ## View the activemq.xml configuration file used for starting ActiveMQ
> ##
> root@psbu-jrr-lnx:# cat /usr/BWhttpd/tomcat/webapps/amqbroker/WEB-INF/classes/conf/activemq.xml
> <?xml version="1.0" encoding="UTF-8"?>
> <beans xmlns="http://www.springframework.org/schema/beans"
> xmlns:amq="http://activemq.apache.org/schema/core"
> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> xsi:schemaLocation="http://www.springframework.org/schema/beans
> http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
> http://activemq.apache.org/schema/core
> http://activemq.apache.org/schema/core/activemq-core-5.3.2.xsd"
> default-autowire="byName">
> <!-- Allows to use system properties as variables in this configuration file -->
> <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
> </bean>
> <broker xmlns="http://activemq.apache.org/schema/core"
> brokerName="localhost"
> advisorySupport="true"
> dataDirectory="${catalina.home}/activemq-data"
> useJmx="false"
> useShutdownHook="false">
> <!-- Destination specific policies using destination names or wildcards
> -->
> <destinationPolicy>
> <policyMap>
> <policyEntries>
> <policyEntry queue=">" memoryLimit="5mb" />
> <policyEntry topic=">" memoryLimit="5mb" />
> </policyEntries>
> </policyMap>
> </destinationPolicy>
> <systemUsage>
> <systemUsage sendFailIfNoSpace="true">
> <memoryUsage>
> <memoryUsage limit="5 mb" />
> </memoryUsage>
> <storeUsage>
> <storeUsage limit="4 mb" />
> </storeUsage>
> <tempUsage>
> <tempUsage limit="1 mb" />
> </tempUsage>
> </systemUsage>
> </systemUsage>
> <!-- The transport connectors ActiveMQ will listen to -->
> <transportConnectors>
> <transportConnector name="tcp"
> uri="tcp://0.0.0.0:61616?wireFormat.maxInactivityDuration=0" />
> </transportConnectors>
> </broker>
> </beans>
> ##
> ## View the runtime environment to validate the library is 3.4.4, the latest.
> ## I cannot explain why the number is 14.0.4, but I observed that 3.4.0 used
> ## 14.0.0
> ##
> 242(TEST)jrr@[SUSE10.1]> ls $LD_LIBRARY_PATH/libactive*
> /usr/BWhttpd/lib//libactivemq-cpp.so*
> /usr/BWhttpd/lib//libactivemq-cpp.so.14*
> /usr/BWhttpd/lib//libactivemq-cpp.so.14.0.4*
> 243(TEST)jrr@[SUSE10.1]>
> ##
> ## Compile the simple producer
> ##
> Compiling simple_producer.o
> g++ -g -c -MD -Wall -Werror -I /views/LU-7.0-NEWAMQ/server/CommonLib/include/activemq-cpp/ -I /views/LU-7.0-NEWAMQ/server/CommonLib/include/ -I /views/LU-7.0-NEWAMQ/server/CommonLib/include/apr-1 simple_producer.cpp
> g++ -o simple_producer.exe simple_producer.o -lc -lrt \
> -lactivemq-cpp \
> -lboost_thread \
> -L /usr/BWhttpd/lib
> Compilation finished at Mon Jul 9 15:32:48
> ##
> ## Execute the test program. It creates a connection and then loops forever
> ## sending messages. For each message it creates a session, destination,
> ## producer, etc., sends the message, and then destroys all of those pieces.
> ##
> ## It locks up at 1775 messages
> ##
> 257(TEST)jrr@[SUSE10.1]> simple_producer.exe
> =====================================================
> Starting the example:
> -----------------------------------------------------
> Sending message #1
> Creating connection for tcp://127.0.0.1:61616?connection.alwaysSyncSend=true
> Sending message #2
> Sending message #3
> Sending message #4
> Sending message #5
> ...
> ...
> ...
> Sending message #1772
> Sending message #1773
> Sending message #1774
> Sending message #1775
> Sending message #1776
> ##
> ## View the ActiveMQ disk usage during the lock up
> ##
> du -sh activemq-data/
> 12M activemq-data/
> root@psbu-jrr-lnx:#
> ##
> ## Use GDB to see where the simple_producer is blocked
> ##
> psbu-jrr-lnx[SUSE10.1]:176> ps auxw | egrep simple_producer.exe
> jrr 16974 1.3 0.6 53980 6724 pts/12 Sl+ 17:58 0:00 simple_producer.exe
> jrr 16997 0.0 0.0 1864 660 pts/18 S+ 17:59 0:00 /bin/grep -E simple_producer.exe
> psbu-jrr-lnx[SUSE10.1]:177>
> psbu-jrr-lnx[SUSE10.1]:177> gdb simple_producer.exe
> GNU gdb 6.6
> Copyright (C) 2006 Free Software Foundation, Inc.
> GDB is free software, covered by the GNU General Public License, and you are
> welcome to change it and/or distribute copies of it under certain conditions.
> Type "show copying" to see the conditions.
> There is absolutely no warranty for GDB. Type "show warranty" for details.
> This GDB was configured as "i586-suse-linux"...
> Using host libthread_db library "/lib/libthread_db.so.1".
> (gdb) set print pretty
> (gdb) set pagination off
> (gdb) attach 16974
> Attaching to program: /views/TEST/AMQ/AMQ2/simple_producer.exe, process 16974
> Reading symbols from /lib/librt.so.1...done.
> Loaded symbols for /lib/librt.so.1
> Reading symbols from /usr/BWhttpd/lib/libactivemq-cpp.so.14...done.
> Loaded symbols for /usr/BWhttpd/lib/libactivemq-cpp.so.14
> Reading symbols from /usr/BWhttpd/lib/libboost_thread.so.1.43.0...done.
> Loaded symbols for /usr/BWhttpd/lib/libboost_thread.so.1.43.0
> Reading symbols from /usr/lib/libstdc++.so.6...done.
> Loaded symbols for /usr/lib/libstdc++.so.6
> Reading symbols from /lib/libm.so.6...done.
> Loaded symbols for /lib/libm.so.6
> Reading symbols from /lib/libc.so.6...done.
> Loaded symbols for /lib/libc.so.6
> Reading symbols from /lib/libgcc_s.so.1...done.
> Loaded symbols for /lib/libgcc_s.so.1
> Reading symbols from /lib/libpthread.so.0...done.
> [Thread debugging using libthread_db enabled]
> [New Thread -1221044560 (LWP 16974)]
> [New Thread -1254835296 (LWP 16981)]
> [New Thread -1246442592 (LWP 16978)]
> [New Thread -1238049888 (LWP 16977)]
> [New Thread -1229657184 (LWP 16976)]
> [New Thread -1221264480 (LWP 16975)]
> Loaded symbols for /lib/libpthread.so.0
> Reading symbols from /lib/ld-linux.so.2...done.
> Loaded symbols for /lib/ld-linux.so.2
> Reading symbols from /usr/BWhttpd/lib/libapr-1.so.0...done.
> Loaded symbols for /usr/BWhttpd/lib/libapr-1.so.0
> Reading symbols from /lib/libuuid.so.1...done.
> Loaded symbols for /lib/libuuid.so.1
> Reading symbols from /lib/libcrypt.so.1...done.
> Loaded symbols for /lib/libcrypt.so.1
> Reading symbols from /usr/BWhttpd/lib/libaprutil-1.so.0...done.
> Loaded symbols for /usr/BWhttpd/lib/libaprutil-1.so.0
> Reading symbols from /usr/BWhttpd/lib/libexpat.so.1...done.
> Loaded symbols for /usr/BWhttpd/lib/libexpat.so.1
> Reading symbols from /usr/lib/libssl.so.0.9.8...done.
> Loaded symbols for /usr/lib/libssl.so.0.9.8
> Reading symbols from /usr/lib/libcrypto.so.0.9.8...done.
> Loaded symbols for /usr/lib/libcrypto.so.0.9.8
> Reading symbols from /lib/libdl.so.2...done.
> Loaded symbols for /lib/libdl.so.2
> 0xffffe410 in __kernel_vsyscall ()
> (gdb) thread apply all where
> Thread 6 (Thread -1221264480 (LWP 16975)):
> #0 0xffffe410 in __kernel_vsyscall ()
> #1 0xb7595056 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libpthread.so.0
> #2 0xb767629d in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libc.so.6
> #3 0xb7d7841d in decaf::internal::util::concurrent::ConditionImpl::wait (condition=0x806b120) at decaf/internal/util/concurrent/unix/ConditionImpl.cpp:101
> #4 0xb7df07c3 in decaf::util::concurrent::Mutex::wait (this=0x806ae5c) at decaf/util/concurrent/Mutex.cpp:126
> #5 0xb7d77fc0 in decaf::internal::util::concurrent::SynchronizableImpl::wait (this=0x806ae58) at decaf/internal/util/concurrent/SynchronizableImpl.cpp:48
> #6 0xb7de6ba9 in decaf::util::TimerImpl::run (this=0x806ae50) at decaf/util/Timer.cpp:81
> #7 0xb7dac25a in decaf::lang::ThreadProperties::runCallback (properties=0x806aff8) at decaf/lang/Thread.cpp:137
> #8 0xb7daa47c in (anonymous namespace)::threadWorker (arg=0x806aff8) at decaf/lang/Thread.cpp:190
> #9 0xb75912ab in start_thread () from /lib/libpthread.so.0
> #10 0xb766aa4e in clone () from /lib/libc.so.6
> Thread 5 (Thread -1229657184 (LWP 16976)):
> #0 0xffffe410 in __kernel_vsyscall ()
> #1 0xb7595056 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libpthread.so.0
> #2 0xb767629d in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libc.so.6
> #3 0xb7d7841d in decaf::internal::util::concurrent::ConditionImpl::wait (condition=0x806b368) at decaf/internal/util/concurrent/unix/ConditionImpl.cpp:101
> #4 0xb7df07c3 in decaf::util::concurrent::Mutex::wait (this=0x806b1fc) at decaf/util/concurrent/Mutex.cpp:126
> #5 0xb7d77fc0 in decaf::internal::util::concurrent::SynchronizableImpl::wait (this=0x806b1f8) at decaf/internal/util/concurrent/SynchronizableImpl.cpp:48
> #6 0xb7de6ba9 in decaf::util::TimerImpl::run (this=0x806b1f0) at decaf/util/Timer.cpp:81
> #7 0xb7dac25a in decaf::lang::ThreadProperties::runCallback (properties=0x806b240) at decaf/lang/Thread.cpp:137
> #8 0xb7daa47c in (anonymous namespace)::threadWorker (arg=0x806b240) at decaf/lang/Thread.cpp:190
> #9 0xb75912ab in start_thread () from /lib/libpthread.so.0
> #10 0xb766aa4e in clone () from /lib/libc.so.6
> Thread 4 (Thread -1238049888 (LWP 16977)):
> #0 0xffffe410 in __kernel_vsyscall ()
> #1 0xb7595056 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libpthread.so.0
> #2 0xb767629d in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libc.so.6
> #3 0xb7d7841d in decaf::internal::util::concurrent::ConditionImpl::wait (condition=0x806c718) at decaf/internal/util/concurrent/unix/ConditionImpl.cpp:101
> #4 0xb7df07c3 in decaf::util::concurrent::Mutex::wait (this=0x806c2ec) at decaf/util/concurrent/Mutex.cpp:126
> #5 0xb7d77fc0 in decaf::internal::util::concurrent::SynchronizableImpl::wait (this=0x806c2e8) at decaf/internal/util/concurrent/SynchronizableImpl.cpp:48
> #6 0xb7de6ba9 in decaf::util::TimerImpl::run (this=0x806c2e0) at decaf/util/Timer.cpp:81
> #7 0xb7dac25a in decaf::lang::ThreadProperties::runCallback (properties=0x806c5c0) at decaf/lang/Thread.cpp:137
> #8 0xb7daa47c in (anonymous namespace)::threadWorker (arg=0x806c5c0) at decaf/lang/Thread.cpp:190
> #9 0xb75912ab in start_thread () from /lib/libpthread.so.0
> #10 0xb766aa4e in clone () from /lib/libc.so.6
> Thread 3 (Thread -1246442592 (LWP 16978)):
> #0 0xffffe410 in __kernel_vsyscall ()
> #1 0xb765ba8b in std::exception::what () from /lib/libc.so.6
> #2 0xb7580b7a in apr_socket_recv (sock=0x80649e8, buf=0x8066c10 "", len=0xb5b4c040) at network_io/unix/sendrecv.c:81
> #3 0xb7d49ebd in decaf::internal::net::tcp::TcpSocket::read (this=0x8064898, buffer=0x8066c10 "", size=8192, offset=0, length=8192) at decaf/internal/net/tcp/TcpSocket.cpp:649
> #4 0xb7d4d1c0 in decaf::internal::net::tcp::TcpSocketInputStream::doReadArrayBounded (this=0x8066c10, buffer=0x2000 <Address 0x2000 out of bounds>, size=8192, offset=0, length=8192) at decaf/internal/net/tcp/TcpSocketInputStream.cpp:108
> #5 0xb7d91d1f in decaf::io::InputStream::doReadArray (this=0x8066998, buffer=0x8066c10 "", size=8192) at decaf/io/InputStream.cpp:138
> #6 0xb7d92333 in decaf::io::InputStream::read (this=0x8066998, buffer=0x8066c10 "", size=8192) at decaf/io/InputStream.cpp:72
> #7 0xb7d866ef in decaf::io::BufferedInputStream::bufferData (this=0x8066b60, inputStream=0x8066998, buffer=@0xb5b4c1f8) at decaf/io/BufferedInputStream.cpp:326
> #8 0xb7d86d18 in decaf::io::BufferedInputStream::doReadArrayBounded (this=0x8066b60, buffer=0x80669ca "", size=4, offset=0, length=4) at decaf/io/BufferedInputStream.cpp:228
> #9 0xb7d92191 in decaf::io::InputStream::read (this=0x8066b60, buffer=0x80669ca "", size=4, offset=0, length=4) at decaf/io/InputStream.cpp:84
> #10 0xb7d8a757 in decaf::io::DataInputStream::readAllData (this=0x80669b8, buffer=0x80669ca "", length=4) at decaf/io/DataInputStream.cpp:492
> #11 0xb7d8c684 in decaf::io::DataInputStream::readInt (this=0x80669b8) at decaf/io/DataInputStream.cpp:124
> #12 0xb7cae1b8 in activemq::wireformat::openwire::OpenWireFormat::unmarshal (this=0x8063e10, transport=0x8064790, dis=0x80669b8) at activemq/wireformat/openwire/OpenWireFormat.cpp:245
> #13 0xb7c2c9f7 in activemq::transport::IOTransport::run (this=0x8064790) at activemq/transport/IOTransport.cpp:246
> #14 0xb7dac25a in decaf::lang::ThreadProperties::runCallback (properties=0x806c088) at decaf/lang/Thread.cpp:137
> #15 0xb7daa47c in (anonymous namespace)::threadWorker (arg=0x806c088) at decaf/lang/Thread.cpp:190
> #16 0xb75912ab in start_thread () from /lib/libpthread.so.0
> #17 0xb766aa4e in clone () from /lib/libc.so.6
> Thread 2 (Thread -1254835296 (LWP 16981)):
> #0 0xffffe410 in __kernel_vsyscall ()
> #1 0xb7595056 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libpthread.so.0
> #2 0xb767629d in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libc.so.6
> #3 0xb7d7841d in decaf::internal::util::concurrent::ConditionImpl::wait (condition=0x806cde8) at decaf/internal/util/concurrent/unix/ConditionImpl.cpp:101
> #4 0xb7df07c3 in decaf::util::concurrent::Mutex::wait (this=0x806d24c) at decaf/util/concurrent/Mutex.cpp:126
> #5 0xb7c1fada in activemq::threads::CompositeTaskRunner::run (this=0x806d208) at activemq/threads/CompositeTaskRunner.cpp:115
> #6 0xb7dac25a in decaf::lang::ThreadProperties::runCallback (properties=0x806cbc0) at decaf/lang/Thread.cpp:137
> #7 0xb7daa47c in (anonymous namespace)::threadWorker (arg=0x806cbc0) at decaf/lang/Thread.cpp:190
> #8 0xb75912ab in start_thread () from /lib/libpthread.so.0
> #9 0xb766aa4e in clone () from /lib/libc.so.6
> Thread 1 (Thread -1221044560 (LWP 16974)):
> #0 0xffffe410 in __kernel_vsyscall ()
> #1 0xb7595056 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libpthread.so.0
> #2 0xb767629d in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libc.so.6
> #3 0xb7d7841d in decaf::internal::util::concurrent::ConditionImpl::wait (condition=0x806f1a8) at decaf/internal/util/concurrent/unix/ConditionImpl.cpp:101
> #4 0xb7df07c3 in decaf::util::concurrent::Mutex::wait (this=0x806f13c) at decaf/util/concurrent/Mutex.cpp:126
> #5 0xb7de9395 in decaf::util::concurrent::CountDownLatch::await (this=0x806f134) at decaf/util/concurrent/CountDownLatch.cpp:53
> #6 0xb7c36025 in activemq::transport::correlator::FutureResponse::getResponse (this=0x806f130) at ./activemq/transport/correlator/FutureResponse.h:62
> #7 0xb7c33aa2 in activemq::transport::correlator::ResponseCorrelator::request (this=0x806b740, command=@0xbf9301cc) at activemq/transport/correlator/ResponseCorrelator.cpp:120
> #8 0xb7b649fa in activemq::core::ActiveMQConnection::syncRequest (this=0x806b858, command=@0xbf9301cc, timeout=0) at activemq/core/ActiveMQConnection.cpp:896
> #9 0xb7baf1d8 in activemq::core::ActiveMQSession::send (this=0x80635b8, message=0x806e640, producer=0x806e530, usage=0x0) at activemq/core/ActiveMQSession.cpp:921
> #10 0xb7ba1e71 in activemq::core::ActiveMQProducer::send (this=0x806e530, destination=0x806e3ec, message=0x806e640, deliveryMode=1, priority=4, timeToLive=0) at activemq/core/ActiveMQProducer.cpp:211
> #11 0xb7ba2b07 in activemq::core::ActiveMQProducer::send (this=0x806e530, destination=0x806e3ec, message=0x806e640) at activemq/core/ActiveMQProducer.cpp:152
> #12 0xb7ba3cab in activemq::core::ActiveMQProducer::send (this=0x806e530, message=0x806e640) at activemq/core/ActiveMQProducer.cpp:128
> #13 0x0804c5ac in AMQ_Producer::send (this=0xbf9305c8, msg=@0xbf930600, type=@0xbf9305f8) at simple_producer.cpp:720
> #14 0x0804ca45 in run_test () at simple_producer.cpp:767
> #15 0x0804cd47 in main () at simple_producer.cpp:798
> #0 0xffffe410 in __kernel_vsyscall ()
> (gdb) detach
> Detaching from program: /views/TEST/AMQ/AMQ2/simple_producer.exe, process 16974
> (gdb) quit
> psbu-jrr-lnx[SUSE10.1]:178>
> ##
> ## If we kill the program and execute it again, then this time it will
> ## terminate with the expected exception.
> ##
> 258(TEST)jrr@[SUSE10.1]> simple_producer.exe
> =====================================================
> Starting the example:
> -----------------------------------------------------
> Sending message #1
> Creating connection for tcp://127.0.0.1:61616?connection.alwaysSyncSend=true
> Exception[1]: Error while sending message [*** BEGIN SERVER-SIDE STACK TRACE ***
> Message: Usage Manager Temp Store is Full (01001622326f 1048576). Stopping producer (ID:psbu-jrr-lnx-53043-1341882066464-0:0:0:0) to prevent flooding queue://c.c.p.v.ms.events. See http://activemq.apache.org/producer-flow-control.html for more info
> Exception Class javax.jms.ResourceAllocationException
> [FILE: BaseDestination.java, LINE: 579] occurred in: org.apache.activemq.broker.region.BaseDestination.waitForSpace
> [FILE: BaseDestination.java, LINE: 573] occurred in: org.apache.activemq.broker.region.BaseDestination.waitForSpace
> [FILE: Queue.java, LINE: 757] occurred in: org.apache.activemq.broker.region.Queue.checkUsage
> [FILE: Queue.java, LINE: 674] occurred in: org.apache.activemq.broker.region.Queue.doMessageSend
> [FILE: Queue.java, LINE: 653] occurred in: org.apache.activemq.broker.region.Queue.send
> [FILE: AbstractRegion.java, LINE: 365] occurred in: org.apache.activemq.broker.region.AbstractRegion.send
> [FILE: RegionBroker.java, LINE: 523] occurred in: org.apache.activemq.broker.region.RegionBroker.send
> [FILE: BrokerFilter.java, LINE: 129] occurred in: org.apache.activemq.broker.BrokerFilter.send
> [FILE: CompositeDestinationBroker.java, LINE: 96] occurred in: org.apache.activemq.broker.CompositeDestinationBroker.send
> [FILE: TransactionBroker.java, LINE: 227] occurred in: org.apache.activemq.broker.TransactionBroker.send
> [FILE: MutableBrokerFilter.java, LINE: 135] occurred in: org.apache.activemq.broker.MutableBrokerFilter.send
> [FILE: TransportConnection.java, LINE: 458] occurred in: org.apache.activemq.broker.TransportConnection.processMessage
> [FILE: ActiveMQMessage.java, LINE: 681] occurred in: org.apache.activemq.command.ActiveMQMessage.visit
> [FILE: TransportConnection.java, LINE: 306] occurred in: org.apache.activemq.broker.TransportConnection.service
> [FILE: TransportConnection.java, LINE: 179] occurred in: org.apache.activemq.broker.TransportConnection$1.onCommand
> [FILE: TransportFilter.java, LINE: 69] occurred in: org.apache.activemq.transport.TransportFilter.onCommand
> [FILE: WireFormatNegotiator.java, LINE: 113] occurred in: org.apache.activemq.transport.WireFormatNegotiator.onCommand
> [FILE: InactivityMonitor.java, LINE: 227] occurred in: org.apache.activemq.transport.InactivityMonitor.onCommand
> [FILE: TransportSupport.java, LINE: 83] occurred in: org.apache.activemq.transport.TransportSupport.doConsume
> [FILE: TcpTransport.java, LINE: 220] occurred in: org.apache.activemq.transport.tcp.TcpTransport.doRun
> [FILE: TcpTransport.java, LINE: 202] occurred in: org.apache.activemq.transport.tcp.TcpTransport.run
> [FILE: , LINE: -1] occurred in: java.lang.Thread.run
> *** END SERVER-SIDE STACK TRACE ***]
> Sending message #2
> ...
> ...
> ...
> Sending message #10
> Exception[10]: Error while sending message [*** BEGIN SERVER-SIDE STACK TRACE ***
> Message: Usage Manager Temp Store is Full (01001622426f 1048576). Stopping producer (ID:psbu-jrr-lnx-53043-1341882066464-0:0:9:0) to prevent flooding queue://c.c.p.v.ms.events. See http://activemq.apache.org/producer-flow-control.html for more info
> Exception Class javax.jms.ResourceAllocationException
> [FILE: BaseDestination.java, LINE: 579] occurred in: org.apache.activemq.broker.region.BaseDestination.waitForSpace
> [FILE: BaseDestination.java, LINE: 573] occurred in: org.apache.activemq.broker.region.BaseDestination.waitForSpace
> [FILE: Queue.java, LINE: 757] occurred in: org.apache.activemq.broker.region.Queue.checkUsage
> [FILE: Queue.java, LINE: 674] occurred in: org.apache.activemq.broker.region.Queue.doMessageSend
> [FILE: Queue.java, LINE: 653] occurred in: org.apache.activemq.broker.region.Queue.send
> [FILE: AbstractRegion.java, LINE: 365] occurred in: org.apache.activemq.broker.region.AbstractRegion.send
> [FILE: RegionBroker.java, LINE: 523] occurred in: org.apache.activemq.broker.region.RegionBroker.send
> [FILE: BrokerFilter.java, LINE: 129] occurred in: org.apache.activemq.broker.BrokerFilter.send
> [FILE: CompositeDestinationBroker.java, LINE: 96] occurred in: org.apache.activemq.broker.CompositeDestinationBroker.send
> [FILE: TransactionBroker.java, LINE: 227] occurred in: org.apache.activemq.broker.TransactionBroker.send
> [FILE: MutableBrokerFilter.java, LINE: 135] occurred in: org.apache.activemq.broker.MutableBrokerFilter.send
> [FILE: TransportConnection.java, LINE: 458] occurred in: org.apache.activemq.broker.TransportConnection.processMessage
> [FILE: ActiveMQMessage.java, LINE: 681] occurred in: org.apache.activemq.command.ActiveMQMessage.visit
> [FILE: TransportConnection.java, LINE: 306] occurred in: org.apache.activemq.broker.TransportConnection.service
> [FILE: TransportConnection.java, LINE: 179] occurred in: org.apache.activemq.broker.TransportConnection$1.onCommand
> [FILE: TransportFilter.java, LINE: 69] occurred in: org.apache.activemq.transport.TransportFilter.onCommand
> [FILE: WireFormatNegotiator.java, LINE: 113] occurred in: org.apache.activemq.transport.WireFormatNegotiator.onCommand
> [FILE: InactivityMonitor.java, LINE: 227] occurred in: org.apache.activemq.transport.InactivityMonitor.onCommand
> [FILE: TransportSupport.java, LINE: 83] occurred in: org.apache.activemq.transport.TransportSupport.doConsume
> [FILE: TcpTransport.java, LINE: 220] occurred in: org.apache.activemq.transport.tcp.TcpTransport.doRun
> [FILE: TcpTransport.java, LINE: 202] occurred in: org.apache.activemq.transport.tcp.TcpTransport.run
> [FILE: , LINE: -1] occurred in: java.lang.Thread.run
> *** END SERVER-SIDE STACK TRACE ***]
> Exception limit (10) reached. Stopping test
> -----------------------------------------------------
> Finished with the example.
> =====================================================
> terminate called after throwing an instance of 'decaf::lang::exceptions::RuntimeException'
> what(): Unlock Failed, this thread is not the Lock Owner!
> Abort
> 259(TEST)jrr@[SUSE10.1]>
> {noformat}
> \\
> \\
> ----
> \\
> \\
> {noformat}
> ######################################################################
> ## SCENARIO #2
> ######################################################################
> ##
> ## Restart our tomcat service which restarts the AMQ Broker and view the
> ## directory size.
> ##
> root@psbu-jrr-lnx:# /usr/BWhttpd/bin/init_tomcat stop; \rm -rf /usr/BWhttpd/tomcat/activemq-data; /usr/BWhttpd/bin/init_tomcat start; du -sh /usr/BWhttpd/tomcat/activemq-data
> Stopping tomcat ... done
> Killing tomcat ... done.
> Starting tomcat ... done
> 44K /usr/BWhttpd/tomcat/activemq-data
> root@psbu-jrr-lnx:#
> ##
> ## Execute the test program, this time passing in a variable that sets the
> ## sendTimeout to 500 ms. It creates a connection and then loops forever sending
> ## messages. For each message it creates a session, destination,
> ## producer, etc., sends the message, and then destroys all of those pieces.
> ##
> ## Now at 1775 messages the send times out. Moreover, the test moves on trying
> ## more sends, and they all time out. After 10 failures the test exits. This
> ## sender, which causes the broker memory limit to be reached, never gets a
> ## 'broker full' exception.
> ##
> 262(TEST)jrr@[SUSE10.1]> env SEND_TO=500 simple_producer.exe
> =====================================================
> Starting the example:
> -----------------------------------------------------
> SEND_TO set to 500
> Sending message #1
> Creating connection for tcp://127.0.0.1:61616?connection.alwaysSyncSend=true&connection.sendTimeout=500
> SEND_TO set to 500
> Sending message #2
> SEND_TO set to 500
> Sending message #3
> SEND_TO set to 500
> Sending message #4
> ...
> ...
> ...
> Sending message #1776
> Exception[1]: Error while sending message [No valid response received for command: Message { commandId = 8879, responseRequired = true, ProducerId = ID:psbu-jrr-lnx-47169-1341882401921-0:0:1775:0, Destination = queue://c.c.p.v.ms.events, TransactionId = NULL, OriginalDestination = NULL, MessageId = ID:psbu-jrr-lnx-47169-1341882401921-0:0:1775:0:0:0, OriginalTransactionId = NULL, GroupID = , GroupSequence = 0, CorrelationId = , Persistent = false, Expiration = 0, Priority = 4, ReplyTo = NULL, Timestamp = 1341882406582, Type = TEST_TYPE, Content = [size=1045], MarshalledProperties = NULL, DataStructure = NULL, TargetConsumerId = NULL, Compressed = false, RedeliveryCounter = 0, BrokerPath = NULL, Arrival = 0, UserID = , RecievedByDFBridge = false, Droppable = false, Cluster = NULL, BrokerInTime = 0, BrokerOutTime = 0 }Text = 1024 characters folowed by a message.xxxxxxxx... world! 1776, check broker.]
> SEND_TO set to 500
> Sending message #1777
> ...
> ...
> ...
> Sending message #1785
> Exception[10]: Error while sending message [No valid response received for command: Message { commandId = 8924, responseRequired = true, ProducerId = ID:psbu-jrr-lnx-47169-1341882401921-0:0:1784:0, Destination = queue://c.c.p.v.ms.events, TransactionId = NULL, OriginalDestination = NULL, MessageId = ID:psbu-jrr-lnx-47169-1341882401921-0:0:1784:0:0:0, OriginalTransactionId = NULL, GroupID = , GroupSequence = 0, CorrelationId = , Persistent = false, Expiration = 0, Priority = 4, ReplyTo = NULL, Timestamp = 1341882411136, Type = TEST_TYPE, Content = [size=1045], MarshalledProperties = NULL, DataStructure = NULL, TargetConsumerId = NULL, Compressed = false, RedeliveryCounter = 0, BrokerPath = NULL, Arrival = 0, UserID = , RecievedByDFBridge = false, Droppable = false, Cluster = NULL, BrokerInTime = 0, BrokerOutTime = 0 }Text = 1024 characters folowed by a message.xxxxxxxx... world! 1785, check broker.]
> Exception limit (10) reached. Stopping test
> -----------------------------------------------------
> Finished with the example.
> =====================================================
> terminate called after throwing an instance of 'decaf::lang::exceptions::RuntimeException'
> what(): Unlock Failed, this thread is not the Lock Owner!
> Abort
> ##
> ##
> ## View the ActiveMQ disk usage during the lock up
> ##
> root@psbu-jrr-lnx:# !du
> du -sh activemq-data/
> 12M activemq-data/
> root@psbu-jrr-lnx:#
> ##
> ## If we re-execute the test now that the broker is full, then this time it
> ## will terminate with the expected broker full exception.
> ##
> 263(TEST)jrr@[SUSE10.1]> env SEND_TO=500 simple_producer.exe
> =====================================================
> Starting the example:
> -----------------------------------------------------
> SEND_TO set to 500
> Sending message #1
> Creating connection for tcp://127.0.0.1:61616?connection.alwaysSyncSend=true&connection.sendTimeout=500
> Exception[1]: Error while sending message [*** BEGIN SERVER-SIDE STACK TRACE ***
> Message: Usage Manager Temp Store is Full (01001622756f 1048576). Stopping producer (ID:psbu-jrr-lnx-32967-1341882542997-0:0:0:0) to prevent flooding queue://c.c.p.v.ms.events. See http://activemq.apache.org/producer-flow-control.html for more info
> Exception Class javax.jms.ResourceAllocationException
> [FILE: BaseDestination.java, LINE: 579] occurred in: org.apache.activemq.broker.region.BaseDestination.waitForSpace
> [FILE: BaseDestination.java, LINE: 573] occurred in: org.apache.activemq.broker.region.BaseDestination.waitForSpace
> [FILE: Queue.java, LINE: 757] occurred in: org.apache.activemq.broker.region.Queue.checkUsage
> [FILE: Queue.java, LINE: 674] occurred in: org.apache.activemq.broker.region.Queue.doMessageSend
> [FILE: Queue.java, LINE: 653] occurred in: org.apache.activemq.broker.region.Queue.send
> [FILE: AbstractRegion.java, LINE: 365] occurred in: org.apache.activemq.broker.region.AbstractRegion.send
> [FILE: RegionBroker.java, LINE: 523] occurred in: org.apache.activemq.broker.region.RegionBroker.send
> [FILE: BrokerFilter.java, LINE: 129] occurred in: org.apache.activemq.broker.BrokerFilter.send
> [FILE: CompositeDestinationBroker.java, LINE: 96] occurred in: org.apache.activemq.broker.CompositeDestinationBroker.send
> [FILE: TransactionBroker.java, LINE: 227] occurred in: org.apache.activemq.broker.TransactionBroker.send
> [FILE: MutableBrokerFilter.java, LINE: 135] occurred in: org.apache.activemq.broker.MutableBrokerFilter.send
> [FILE: TransportConnection.java, LINE: 458] occurred in: org.apache.activemq.broker.TransportConnection.processMessage
> [FILE: ActiveMQMessage.java, LINE: 681] occurred in: org.apache.activemq.command.ActiveMQMessage.visit
> [FILE: TransportConnection.java, LINE: 306] occurred in: org.apache.activemq.broker.TransportConnection.service
> [FILE: TransportConnection.java, LINE: 179] occurred in: org.apache.activemq.broker.TransportConnection$1.onCommand
> [FILE: TransportFilter.java, LINE: 69] occurred in: org.apache.activemq.transport.TransportFilter.onCommand
> [FILE: WireFormatNegotiator.java, LINE: 113] occurred in: org.apache.activemq.transport.WireFormatNegotiator.onCommand
> [FILE: InactivityMonitor.java, LINE: 227] occurred in: org.apache.activemq.transport.InactivityMonitor.onCommand
> [FILE: TransportSupport.java, LINE: 83] occurred in: org.apache.activemq.transport.TransportSupport.doConsume
> [FILE: TcpTransport.java, LINE: 220] occurred in: org.apache.activemq.transport.tcp.TcpTransport.doRun
> [FILE: TcpTransport.java, LINE: 202] occurred in: org.apache.activemq.transport.tcp.TcpTransport.run
> [FILE: , LINE: -1] occurred in: java.lang.Thread.run
> *** END SERVER-SIDE STACK TRACE ***]
> SEND_TO set to 500
> ...
> ...
> ...
> SEND_TO set to 500
> Sending message #10
> Exception[10]: Error while sending message [*** BEGIN SERVER-SIDE STACK TRACE ***
> Message: Usage Manager Temp Store is Full (01001623156f 1048576). Stopping producer (ID:psbu-jrr-lnx-32967-1341882542997-0:0:9:0) to prevent flooding queue://c.c.p.v.ms.events. See http://activemq.apache.org/producer-flow-control.html for more info
> Exception Class javax.jms.ResourceAllocationException
> [FILE: BaseDestination.java, LINE: 579] occurred in: org.apache.activemq.broker.region.BaseDestination.waitForSpace
> [FILE: BaseDestination.java, LINE: 573] occurred in: org.apache.activemq.broker.region.BaseDestination.waitForSpace
> [FILE: Queue.java, LINE: 757] occurred in: org.apache.activemq.broker.region.Queue.checkUsage
> [FILE: Queue.java, LINE: 674] occurred in: org.apache.activemq.broker.region.Queue.doMessageSend
> [FILE: Queue.java, LINE: 653] occurred in: org.apache.activemq.broker.region.Queue.send
> [FILE: AbstractRegion.java, LINE: 365] occurred in: org.apache.activemq.broker.region.AbstractRegion.send
> [FILE: RegionBroker.java, LINE: 523] occurred in: org.apache.activemq.broker.region.RegionBroker.send
> [FILE: BrokerFilter.java, LINE: 129] occurred in: org.apache.activemq.broker.BrokerFilter.send
> [FILE: CompositeDestinationBroker.java, LINE: 96] occurred in: org.apache.activemq.broker.CompositeDestinationBroker.send
> [FILE: TransactionBroker.java, LINE: 227] occurred in: org.apache.activemq.broker.TransactionBroker.send
> [FILE: MutableBrokerFilter.java, LINE: 135] occurred in: org.apache.activemq.broker.MutableBrokerFilter.send
> [FILE: TransportConnection.java, LINE: 458] occurred in: org.apache.activemq.broker.TransportConnection.processMessage
> [FILE: ActiveMQMessage.java, LINE: 681] occurred in: org.apache.activemq.command.ActiveMQMessage.visit
> [FILE: TransportConnection.java, LINE: 306] occurred in: org.apache.activemq.broker.TransportConnection.service
> [FILE: TransportConnection.java, LINE: 179] occurred in: org.apache.activemq.broker.TransportConnection$1.onCommand
> [FILE: TransportFilter.java, LINE: 69] occurred in: org.apache.activemq.transport.TransportFilter.onCommand
> [FILE: WireFormatNegotiator.java, LINE: 113] occurred in: org.apache.activemq.transport.WireFormatNegotiator.onCommand
> [FILE: InactivityMonitor.java, LINE: 227] occurred in: org.apache.activemq.transport.InactivityMonitor.onCommand
> [FILE: TransportSupport.java, LINE: 83] occurred in: org.apache.activemq.transport.TransportSupport.doConsume
> [FILE: TcpTransport.java, LINE: 220] occurred in: org.apache.activemq.transport.tcp.TcpTransport.doRun
> [FILE: TcpTransport.java, LINE: 202] occurred in: org.apache.activemq.transport.tcp.TcpTransport.run
> [FILE: , LINE: -1] occurred in: java.lang.Thread.run
> *** END SERVER-SIDE STACK TRACE ***]
> Exception limit (10) reached. Stopping test
> -----------------------------------------------------
> Finished with the example.
> =====================================================
> terminate called after throwing an instance of 'decaf::lang::exceptions::RuntimeException'
> what(): Unlock Failed, this thread is not the Lock Owner!
> Abort
> 264(TEST)jrr@[SUSE10.1]>
> {noformat}
> \\
> \\
> ----
> \\
> \\
> {code}
> //////////////////////////////////////////////////////////////////////
> // Not so simple producer.
> //
> // The Active MQ Client's simple producer code has been modified to allow for
> // investigation into client behaviour. I've integrated the product's engine for
> // obtaining a connection, session, queue, destination, sender and message.
> //
> // The code basically does the following:
> // - create a connection to be used over and over for the test
> // - loop forever
> // - create a session, queue, destination, sender and message
> // - send the message
> // - destroy the message, sender, destination, queue and session.
> //
> // ----------------------------------------
> // ENVIRONMENT VARIABLE OVERRIDES
> // ----------------------------------------
> //
> // SEND_TO : default is 0. If non-zero then it appends connection.sendTimeout
> // to the broker URI using the value set for SEND_TO. Refer to
> // the connection options at
> // http://activemq.apache.org/cms/configuring.html for details
> // regarding its values.
> //
> /*
> * Licensed to the Apache Software Foundation (ASF) under one or more
> * contributor license agreements. See the NOTICE file distributed with
> * this work for additional information regarding copyright ownership.
> * The ASF licenses this file to You under the Apache License, Version 2.0
> * (the "License"); you may not use this file except in compliance with
> * the License. You may obtain a copy of the License at
> *
> * http://www.apache.org/licenses/LICENSE-2.0
> *
> * Unless required by applicable law or agreed to in writing, software
> * distributed under the License is distributed on an "AS IS" BASIS,
> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> * See the License for the specific language governing permissions and
> * limitations under the License.
> */
> #include <decaf/lang/Thread.h>
> #include <decaf/lang/Runnable.h>
> #include <decaf/util/concurrent/CountDownLatch.h>
> #include <decaf/lang/Long.h>
> #include <decaf/util/Date.h>
> #include <activemq/core/ActiveMQConnectionFactory.h>
> #include <activemq/util/Config.h>
> #include <activemq/library/ActiveMQCPP.h>
> #include <cms/Connection.h>
> #include <cms/Session.h>
> #include <cms/TextMessage.h>
> #include <cms/BytesMessage.h>
> #include <cms/MapMessage.h>
> #include <cms/ExceptionListener.h>
> #include <cms/MessageListener.h>
> #include <stdlib.h>
> #include <stdio.h>
> #include <string.h>
> #include <iostream>
> #include <map>
> #include <memory>
> #include <sstream>
> #include <string>
> #include <boost/scoped_ptr.hpp>
> #include <boost/thread/thread.hpp>
> #include <boost/lexical_cast.hpp>
> using namespace activemq;
> using namespace activemq::core;
> using namespace decaf;
> using namespace decaf::lang;
> using namespace decaf::util;
> using namespace decaf::util::concurrent;
> using namespace cms;
> using namespace std;
> //
> // Override values with environment values if they exist
> //
> template <class DEST>
> bool GetEnv (const std::string var,
> DEST &dst,
> const DEST &def_value)
> {
> const char *val(::getenv(var.c_str()));
> dst = def_value;
> if (NULL == val) {
> return (false);
> }
> try {
> dst = boost::lexical_cast<DEST>(val);
> printf("%s set to %s\n", var.c_str(), val);
> return(true);
> }
> catch (std::exception &e) {
> printf("%s\n", e.what());
> printf("Cannot convert '%s' to the desired output format\n", val);
> }
> return(false);
> }
> // ---------------------------------------------------------------------------
> // EventEngineUtil.hxx
> // ---------------------------------------------------------------------------
> class ee_except : public std::exception
> {
> public:
> // ee_except(std::string)
> //
> // Create an exception using the specified string
> //
> explicit ee_except (const std::string &_msg) :
> msg(_msg)
> {};
> explicit ee_except (const std::stringstream &_msg) :
> msg(_msg.str())
> {};
> // ee_except(std::string, errNum)
> //
> // Create an exception using the specified string, and then append the
> // numeric and string representation of the specified errno
> //
> explicit ee_except (const std::string &_msg,
> int errNum) {
> ee_except_helper(_msg, errNum);
> }
> explicit ee_except (const std::stringstream &_msg,
> int errNum) {
> ee_except_helper(_msg.str(), errNum);
> }
> // destroy the exception
> ~ee_except() throw() {};
> // return the message, overloading the standard what method
> virtual const char *what() const throw() {
> return msg.c_str();
> }
> protected:
> void ee_except_helper (const std::string &_msg,
> int errNum) {
> std::stringstream ss_error;
> if (errNum) {
> ss_error << _msg << ", errno: " << errNum
> << ", " << strerror(errNum);
> msg = ss_error.str();
> } else {
> msg = _msg;
> }
> }
> private:
> std::string msg;
> };
> // ---------------------------------------------------------------------------
> // From EventEngineAMQProducer
> // ---------------------------------------------------------------------------
> #include <boost/thread/thread.hpp>
> #include <boost/shared_ptr.hpp>
> #include <cms/DeliveryMode.h>
> #include <cms/Connection.h>
> #include <activemq/library/ActiveMQCPP.h>
> // CLASS: ConnSessionManager
> //
> // Singleton class to manage Active MQ (AMQ) connections.
> //
> // getConnSession():
> //
> // This method gets a connection for the specified broker, creates a session
> // with the specified ackMode and populates the connSesn parameter with
> // shared pointers to the allocated resources.
> //
> // If a connection already exists for the brokerURI, it creates a session
> // for it and returns them; if the connection does not exist, it makes one,
> // attaches a session and returns them. If it cannot create the connection
> // (such as when the broker is not up), it throws an exception.
> //
> // The connection manager can manage multiple connections based on the
> // brokerURI. Each unique broker URI will be a different connection managed
> // by the connection manager.
> //
> //
> class ConnSessionManager {
> public:
> typedef boost::shared_ptr<cms::Connection> P_SHR_CONN_T;
> typedef boost::shared_ptr<cms::Session> P_SHR_SESSION_T;
> struct ConnSession {
> P_SHR_CONN_T p_conn;
> P_SHR_SESSION_T p_session;
> };
> static void getConnSession(ConnSession &connSesn,
> const std::string &brokerURI,
> cms::Session::AcknowledgeMode ackMode);
> protected:
> // ----------------------------------------------------------------------
> // P R O T E C T E D M E M B E R V A R I A B L E S
> typedef std::map<const std::string, P_SHR_CONN_T> CM_MAP_T;
> CM_MAP_T cm_map;
> static boost::mutex access_mtx;
> // ----------------------------------------------------------------------
> // P R O T E C T E D M E M B E R F U N C T I O N
> ConnSessionManager();
> ~ConnSessionManager();
> static ConnSessionManager &Singleton(void);
> void addConnection(ConnSession &connSesn,
> const std::string &brokerURI,
> cms::Session::AcknowledgeMode ackMode);
> };
> // CLASS: AMQ_Producer
> //
> // Active MQ (AMQ) message producer.
> //
> // The constructor creates an AMQ Producer object that has the ability to send
> // messages to the specified DESTination, HOST and PORT.
> //
> // The send() method causes the message to be sent. While creating the message,
> // the send() method will apply the properties (if any) that have been set.
> //
> // The initProperty() method clears the list of properties that have been
> // established for this producer.
> //
> // The setProperty() method sets an AMQ message property for the message to be
> // sent. A new property is added each time this command is invoked, unless it's
> // for an existing property, in which case the old property is overwritten with
> // the new property.
> //
> class AMQ_Producer {
> public:
> AMQ_Producer(const std::string &dest,
> const std::string &host,
> const std::string &port);
> ~AMQ_Producer() {};
> void send(const std::string &msg,
> const std::string &type = "");
> protected:
> const std::string m_dest;
> std::string m_broker_uri;
> std::map<const std::string, std::string> propertyInfo;
> static const cms::DeliveryMode::DELIVERY_MODE m_delivery_mode;
> };
> // ---------------------------------------------------------------------------
> // ---------------------------------------------------------------------------
> #include <cms/ConnectionFactory.h>
> #include <boost/thread/once.hpp>
> // access control to the singleton.
> boost::mutex ConnSessionManager::access_mtx;
> static boost::once_flag init_flag = BOOST_ONCE_INIT;
> /****************************************************************************
> **
> ** Name: ConnSessionManager::ConnSessionManager
> **
> ** Function: Create the connection manager object (singleton) and initialize
> ** the Active MQ library that we use, once.
> **
> ** Input Parms: None
> **
> ** Return Parm: None
> **
> ****************************************************************************/
> ConnSessionManager::ConnSessionManager () :
> cm_map()
> {
> // only call the library initialization ONCE.
> boost::call_once(activemq::library::ActiveMQCPP::initializeLibrary,
> init_flag);
> }
> /****************************************************************************
> **
> ** Name: ConnSessionManager::~ConnSessionManager
> **
> ** Function: Destroy the connection manager
> **
> ** Input Parms: None
> **
> ** Return Parm: None
> **
> ****************************************************************************/
> ConnSessionManager::~ConnSessionManager ()
> {
> activemq::library::ActiveMQCPP::shutdownLibrary();
> }
> /****************************************************************************
> **
> ** Name: ConnSessionManager::Singleton
> **
> ** Function: Return a reference to our singleton.
> **
> ** Input Parms: None
> **
> ** Return Parm: None
> **
> ****************************************************************************/
> ConnSessionManager &
> ConnSessionManager::Singleton (void)
> {
> static ConnSessionManager singleton;
> return (singleton);
> }
> /****************************************************************************
> **
> ** Name: ConnSessionManager::addConnection
> **
> ** Function: Add, or re-add a connection to the connection manager.
> ** Create a session on the connection and return them.
> **
> ** An exception is thrown if we cannot get a connection.
> **
> ** Input Parms: connSesn - ConnSession reference that is populated with a
> ** connection shared pointer and session shared
> ** pointer.
> **
> ** brokerURI - The Active MQ broker URI string to use to create
> ** the connection.
> **
> ** http://activemq.apache.org/cms/cms-api-overview.html
> ** http://activemq.apache.org/cms/configuring.html
> **
> ** ackMode - The kind of acknowledgement we want the session to
> ** have
> **
> ** http://activemq.apache.org/cms/api_docs/activemqcpp-3.4.0/html/classcms_1_1_session.html#ae2fd7b8b76928b465727760c78522185
> **
> **
> ** Return Parm: None
> **
> ****************************************************************************/
> void
> ConnSessionManager::addConnection (ConnSessionManager::ConnSession &connSesn,
> const std::string &brokerURI,
> cms::Session::AcknowledgeMode ackMode)
> {
> CM_MAP_T &map_ref = Singleton().cm_map;
> P_SHR_CONN_T &p_conn(connSesn.p_conn);
> P_SHR_SESSION_T &p_session(connSesn.p_session);
> // Create a ConnectionFactory that will automatically deallocate itself
> std::auto_ptr<cms::ConnectionFactory>
> connectionFactory(cms::ConnectionFactory::
> createCMSConnectionFactory(brokerURI));
> // Create the connection and attach to shared pointer
> p_conn.reset(connectionFactory->createConnection());
> // Start the connection
> p_conn->start();
> // Create the session
> p_session.reset(p_conn->createSession(ackMode));
> // Update the map
> map_ref.erase(brokerURI);
> map_ref[brokerURI] = p_conn;
> }
> /****************************************************************************
> **
> ** Name: ConnSessionManager::getConnSession
> **
> ** Function: Get a working Active MQ connection for the specified brokerURI
> ** and return it along with an allocated session or throw an
> ** exception.
> **
> ** This finds the existing AMQ connection for the specified
> ** brokerURI, validates that it's still connected, creates a
> ** session on it and returns them. If the connection isn't valid
> ** anymore (i.e. the broker restarted) or if the connection
> ** doesn't exist yet, we create it.
> **
> ** If we cannot create the connection we throw an exception.
> **
> ** WHY DO WE ALLOCATE A SESSION TOO? Why don't we just get the
> ** connection and return it? It's possible that the connection was
> ** reset since the last time we used it (i.e the broker
> ** restarted). The only way we can tell this is if the
> ** createSession call fails. Since we have to create a session to
> ** prove that the connection works, let's use it. The caller would
> ** do it once it had a connection anyway!
> **
> ** Why don't we just return a shared pointer to the session, why
> ** do we also return a shared pointer to the connection? Sessions
> ** are built on top of connections. It would be BAD if the
> ** connection was deleted before the session. So we keep a shared
> ** pointer of both together to ensure that they have the same
> ** lifecycle.
> **
> ** Input Parms: connSesn - ConnSession reference that is populated with a
> ** connection shared pointer and session shared
> ** pointer.
> **
> ** brokerURI - The Active MQ broker URI string to use to create
> ** the connection.
> **
> ** http://activemq.apache.org/cms/cms-api-overview.html
> ** http://activemq.apache.org/cms/configuring.html
> **
> ** ackMode - The kind of acknowledgement we want the session to
> ** have
> **
> ** http://activemq.apache.org/cms/api_docs/activemqcpp-3.4.0/html/classcms_1_1_session.html#ae2fd7b8b76928b465727760c78522185
> **
> **
> ** Return Parm: None (the connection and session are returned through connSesn)
> **
> ****************************************************************************/
> void
> ConnSessionManager::getConnSession (ConnSessionManager::ConnSession &connSesn,
> const std::string &brokerURI,
> cms::Session::AcknowledgeMode ackMode)
> {
> P_SHR_CONN_T &p_conn(connSesn.p_conn);
> P_SHR_SESSION_T &p_session(connSesn.p_session);
> ConnSessionManager &me = Singleton();
> CM_MAP_T &map_ref = me.cm_map;
> // restrict access
> boost::lock_guard<boost::mutex> lock(access_mtx);
> // get the entry
> p_conn = map_ref[brokerURI];
> // if it's not allocated yet, then allocate it, attach it and return it
> if (NULL == p_conn) {
> printf("Creating connection for %s\n",
> brokerURI.c_str());
> return (me.addConnection(connSesn, brokerURI, ackMode));
> }
> // Check if the broker restarted and reset our connection. If it has,
> // then create session will generate an exception and we know we then
> // need to make a new connection. If it works then use the session,
> // passing it back in the connSession
> try {
> p_session.reset(p_conn->createSession(ackMode));
> }
> catch (std::exception &e) {
> printf("Re-creating connection for %s: %s\n",
> brokerURI.c_str(), e.what());
> return (me.addConnection(connSesn, brokerURI, ackMode));
> }
> }
> // **********************************************************************
> // **********************************************************************
> // AMQ_Producer
> // **********************************************************************
> // **********************************************************************
> // m_delivery_mode
> //
> // The producer's delivery mode is set to NON_PERSISTENT. When persistent
> // it causes the broker to save the message to disk so that it is available
> // even if the broker restarts. However, this has a price: it greatly slows
> // down the send (220,475 microseconds with persistence, vs. 226
> // microseconds without). These measurements were taken WITH synchronous
> // sends. We could have investigated and used async sends to speed up the
> // send call and keep persistence, however, this wasn't deemed necessary.
> //
> // http://activemq.apache.org/cms/api_docs/activemqcpp-3.4.0/html/classcms_1_1_delivery_mode.html
> //
> // http://activemq.apache.org/persistence-questions.html
> //
> // http://activemq.apache.org/what-is-the-difference-between-persistent-and-non-persistent-delivery.html
> //
> const cms::DeliveryMode::DELIVERY_MODE
> AMQ_Producer::m_delivery_mode = cms::DeliveryMode::NON_PERSISTENT;
> /****************************************************************************
> **
> ** Name: AMQ_Producer::AMQ_Producer
> **
> ** Function: Create the Active MQ Producer client object, storing the
> ** destination and creating the URI string from the provided host
> ** name and port number.
> **
> ** Input Parms: dest: The destination queue/topic to communicate with
> **
> ** host: The host to communicate with
> **
> ** port: The port on the host to communicate with
> **
> ** Return Parm:
> **
> ****************************************************************************/
> AMQ_Producer::AMQ_Producer (const std::string &dest,
> const std::string &host,
> const std::string &port) :
> m_dest(dest),
> m_broker_uri("tcp://" + host + ":" + port +
> "?connection.alwaysSyncSend=true")
> {
> // add the send timeout if its environment variable is given and it's a
> // value other than the default, zero.
> int sendTimeout(0);
> GetEnv("SEND_TO", sendTimeout, 0);
> if (sendTimeout) {
> std::stringstream ss_sto;
> ss_sto << "&connection.sendTimeout=" << sendTimeout;
> m_broker_uri += ss_sto.str();
> }
> }
> /****************************************************************************
> **
> ** Name: AMQ_Producer::send
> **
> ** Function: Cause the message to be sent by this producer to the
> ** established message queue, host and port.
> **
> ** This gets a cached (or if needed new) connection which has a
> ** session created for it, it then creates the destination,
> ** producer and message. Finally it adds all of the properties
> ** that we want to the message and sends it.
> **
> ** This model is best explained at the Active MQ CPP example page
> ** at: http://activemq.apache.org/cms/cms-api-overview.html
> **
> ** Input Parms: msg - The message to send with this producer
> **
> ** type - (optional) The CMSType/JMSType to set the message to.
> **
> ** Return Parm: None
> **
> ****************************************************************************/
> void
> AMQ_Producer::send (const std::string &msg,
> const std::string &type)
> {
> std::string s_where;
> try {
> ConnSessionManager::ConnSession conSesn;
> // Create the Connection + Session
> s_where = "creating connection/session";
> ConnSessionManager::getConnSession(conSesn, m_broker_uri,
> cms::Session::AUTO_ACKNOWLEDGE);
> // Create the Destination (queue/topic)
> s_where = "creating destination";
> boost::shared_ptr<cms::Destination>
> p_destination(conSesn.p_session->createQueue(m_dest));
> // Create the Producer
> s_where = "creating message producer";
> boost::shared_ptr<cms::MessageProducer>
> p_producer(conSesn.p_session->createProducer(p_destination.get()));
> p_producer->setDeliveryMode(m_delivery_mode);
> // Create the Message
> s_where = "creating message";
> boost::shared_ptr<cms::TextMessage>
> p_msg(conSesn.p_session->createTextMessage(msg));
> // Set the Message's JMSType/CMSType
> if (!type.empty()) {
> p_msg->setCMSType(type);
> }
> std::map<const std::string, std::string>::iterator it;
> // run through all properties, adding them to the message
> for (it = propertyInfo.begin(); it != propertyInfo.end(); ++it) {
> s_where = "adding property " + (*it).first;
> p_msg->setStringProperty((*it).first, (*it).second);
> }
> // Send it!
> s_where = "sending message";
> p_producer->send(p_msg.get());
> }
> catch (std::exception &e) {
> std::stringstream ss_err;
> ss_err << "Error while " << s_where << " [" << e.what() << "]";
> throw ee_except(ss_err);
> }
> }
> // ---------------------------------------------------------------------------
> const std::string base_msg("1024 characters folowed by a message.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxHello world! ");
> void run_test ()
> {
> std::string host("127.0.0.1");
> std::string port("61616");
> std::string destURI("c.c.p.v.ms.events");
> unsigned int iteration(0);
> unsigned int ex_count(0);
> // Loop, creating producers and sending the message
> while (1) {
> iteration++;
> AMQ_Producer producer(destURI, host, port);
> std::stringstream msg;
> msg << base_msg << iteration;
> printf("Sending message #%u\n", iteration);
> try {
> producer.send(msg.str(), "TEST_TYPE");
> }
> catch (std::exception &e) {
> ex_count++;
> printf("\n\nException[%u]: %s\n", ex_count, e.what());
> if (ex_count >= 10 ) {
> printf("\n\nException limit (%u) reached. Stopping test\n\n\n",
> ex_count);
> break;
> }
> }
> }
> }
> ///////////////////////////////////////////////////////////////////////////////
> int main(int, char**) {
> activemq::library::ActiveMQCPP::initializeLibrary();
> std::cout << "=====================================================\n";
> std::cout << "Starting the example:" << std::endl;
> std::cout << "-----------------------------------------------------\n";
> run_test();
> std::cout << "-----------------------------------------------------\n";
> std::cout << "Finished with the example." << std::endl;
> std::cout << "=====================================================\n";
> activemq::library::ActiveMQCPP::shutdownLibrary();
> }
> {code}
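
For anyone reading the reproduction above without the Boost and ActiveMQ-CPP headers to hand, here is a minimal sketch, using only the standard library, of how the SEND_TO override ends up in the broker URI. The helper names buildBrokerUri and envInt are hypothetical and are not part of the attached program. They mirror what the AMQ_Producer constructor and GetEnv appear to do, on the assumption that trailing junk in the variable should be rejected roughly as boost::lexical_cast would.

```cpp
#include <cstdlib>
#include <sstream>
#include <string>

// Hypothetical helper: read an integer from the environment. Returns
// defaultValue when the variable is unset, is not a number, or has trailing
// non-whitespace characters after the number.
int envInt(const char *name, int defaultValue)
{
    const char *raw = std::getenv(name);
    if (raw == NULL) {
        return defaultValue;
    }
    std::istringstream iss(raw);
    int value = 0;
    char extra = 0;
    if (!(iss >> value) || (iss >> extra)) {
        return defaultValue;
    }
    return value;
}

// Hypothetical helper: build the broker URI the way the AMQ_Producer
// constructor in the report does. Synchronous sends are always requested;
// connection.sendTimeout is appended only for a non-zero timeout.
std::string buildBrokerUri(const std::string &host,
                           const std::string &port,
                           int sendTimeout)
{
    std::string uri("tcp://" + host + ":" + port +
                    "?connection.alwaysSyncSend=true");
    if (sendTimeout != 0) {
        std::ostringstream oss;
        oss << "&connection.sendTimeout=" << sendTimeout;
        uri += oss.str();
    }
    return uri;
}
```

Calling buildBrokerUri("127.0.0.1", "61616", envInt("SEND_TO", 0)) with SEND_TO=500 in the environment gives the URI used for the log above; with SEND_TO unset, only the alwaysSyncSend option is present.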