You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by dlaio <d....@gmail.com> on 2011/07/15 21:08:21 UTC
Activemq CMS - high data rate lockup
Perhaps my search foo is weak, but I have been unable to find any recent
examples of people sending high data data rates over activemq CMS.
http://www.mostly-useless.com/blog/2007/12/27/playing-with-activemq/ -> has
some good data (although from 2007).
Have found some references to CMS 2.2 through the nabble search, but coming
up short at the moment for more recent applications.
I have a pretty simple data driven signal processing system, the processing
is essentially linear:
A-->B-->C-->D
Data rates are highish (~110 Mbit/sec) but nothing astronomical, messages
vary in size, but are generally pretty big (average ~ 200kB, but can be up
to a 1MB).
I moved from a simple sockets to activemq (for portability reasons), I am
using activemq 5.4.2 and CMS 3.4.2 on a pretty peppy RHEL 5.3 64 bit machine
(2.6 Ghz 8 core Xeon with 12GB RAM) (the versions/OS that I use is defined
for me, so updating versions is a bigish deal (I know 5.5 is out)).
I can slow down the processing, and running with anything close to 40Mb/sec
results in the processes consuming a full CPU each (where as running without
activemq each process consumes ~ 20% of a processor) so I am concerned about
activemq's overhead (I suppose it is more likely the overhead of the CMS
bindings).
I don't care about persistence, the data is time sensitive so if the
processing goes down it is of no value for me (video like data). I have
disabled producerFlowControl (unsure if that was a good idea), but as a data
driven processing chain if I am not able to keep up with the data rate then
I am dead in the water. Going to try nio for the transport as I have seen
that mentioned a few places for high throughput usage.
Seem to get into a deadlock state where the process is waiting on a mutex:
__kernel_vsyscall() at 0xffffe410
pthread_cond_timedwait@@GLIBC_2.3.2() at 0xf7b5bd12
decaf::internal::util::concurrent::ConditionImpl::wait() at 0xf77ce9f3
decaf::util::concurrent::Mutex::wait() at 0xf784d767
decaf::util::concurrent::Mutex::wait() at 0xf784d5f2
activemq::transport::failover::FailoverTransport::oneway() at 0xf7540ed7
activemq::transport::correlator::ResponseCorrelator::oneway() at 0xf752a2b8
activemq::core::ActiveMQConnection::oneway() at 0xf746af12
activemq::core::ActiveMQSession::send() at 0xf74bb9e2
activemq::core::ActiveMQProducer::send() at 0xf74aa764
activemq::core::ActiveMQProducer::send() at 0xf74a7dd1
activemq::core::ActiveMQProducer::send() at 0xf74a9585
activeMqProducer::send() at ioActiveMqProducer.cpp:116 0x805a235
io::send() at io.cpp:162 0x8057614
process_data_msg() at main.cpp:814 0x804cf7c
process_message() at main.cpp:891 0x804d272
main() at main.cpp:1,097 0x804e481
I esentially used the cpp samples when I created my io library (just a
simple wrapper).
Consumer code (Sanitised - so may not be syntactically correct) :
void activeMqConsumer::runConsumer()
{
try {
// Create a ConnectionFactory
ActiveMQConnectionFactory* connectionFactory = new
ActiveMQConnectionFactory( brokerURI );
// Create a Connection
connection = connectionFactory->createConnection();
delete connectionFactory;
ActiveMQConnection* amqConnection =
dynamic_cast<ActiveMQConnection*>( connection );
if( amqConnection != NULL ) {
amqConnection->addTransportListener( this );
}
connection->setExceptionListener(this);
// Create a Session
if( clientAck ) {
session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );
} else {
session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
}
// Create the destination (Topic or Queue)
if( useTopic ) {
destination = session->createTopic( destURI );
} else {
destination = session->createQueue( destURI );
}
// Create a MessageConsumer from the Session to the Topic or Queue
consumer = session->createConsumer( destination );
consumer->setMessageListener( this );
} catch (CMSException& e) {
e.printStackTrace();
}
}
// Called from the consumer since this class is a registered
MessageListener.
void activeMqConsumer::onMessage( const Message* message )
{
INT32 msgSize;
INT32 bytesRead = 0;
FLOAT64 latency = 0;
TIME currentTime;
INT32 rval;
rval = SDL_SemWait(this->pElementAvailable);
if (rval == -1)
{
printf("(onMessage) error: SDL_SemWait failed\n");
}
try
{
// cast the activeMq "message" into an array of bytes
const cms::BytesMessage *bytesMessage = dynamic_cast<const
cms::BytesMessage *>(message);
if(bytesMessage != NULL)
{
msgSize = bytesMessage->getBodyLength();
bytesRead = bytesMessage->readBytes((unsigned char*)this->pInputMsg,
msgSize);
totalBytesRecvd += msgSize;
totalMsgsRecvd++;
// the assumption here is that you receive messages after they have been
sent
// if the times between processing modules are not synchronized all bets
are off.
getSystemTime(&(currentTime));
latency = (FLOAT64)currentTime - (FLOAT64)this->pInputMsg->hdr.time;
this->accruedLatency += latency;
if(clientAck == TRUE)
{
message->acknowledge();
}
}
}
catch(cms::CMSException &exception)
{
exception.printStackTrace();
}
if(startTime == 0)
{
getSystemTime(&(startTime));
}
// indicate that there is data available
rval = SDL_SemPost(this->pDataAvailable);
if (rval == -1)
{
printf("(enqueue) error: SDL_SemPost failed\n");
}
}
Producer code:
activeMqProducer::activeMqProducer( const std::string& brokerURI,
const std::string& destURI,
BOOLEAN useTopic)
{
// Create a ConnectionFactory
auto_ptr<ConnectionFactory>
connectionFactory(ConnectionFactory::createCMSConnectionFactory( brokerURI )
);
// Create a Connection
connection = connectionFactory->createConnection();
sessionTransacted = false;
// Create a Session
if( sessionTransacted )
{
session = connection->createSession( Session::SESSION_TRANSACTED );
}
else
{
session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
}
// Create the destination (Topic or Queue)
if( useTopic )
{
destination = session->createTopic( destURI );
}
else
{
destination = session->createQueue( destURI );
}
// Create a MessageProducer from the Session to the Topic or Queue
producer = session->createProducer( destination );
producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
message = session->createBytesMessage();
}
void activeMqProducer::send(IO_MSG *pOutputMsg)
{
// Set the send time in the IO message
getSystemTime(&(pOutputMsg->hdr.time));
// Convert the message into an activeMQ bytesMessage
message->setBodyBytes( (const unsigned char*)pOutputMsg,
pOutputMsg->hdr.msgSize + sizeof(MSG_HDR));
// Send to broker
producer->send( message );
}
Any suggestions on how to speed up the data rates and stop the producer from
locking up would be greatly appreciated.
V/R
~Joe
--
View this message in context: http://activemq.2283324.n4.nabble.com/Activemq-CMS-high-data-rate-lockup-tp3670718p3670718.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
Re: Activemq CMS - high data rate lockup
Posted by Timothy Bish <ta...@gmail.com>.
On Fri, 2011-07-15 at 12:08 -0700, dlaio wrote:
> Perhaps my search foo is weak, but I have been unable to find any recent
> examples of people sending high data data rates over activemq CMS.
> http://www.mostly-useless.com/blog/2007/12/27/playing-with-activemq/ -> has
> some good data (although from 2007).
>
> Have found some references to CMS 2.2 through the nabble search, but coming
> up short at the moment for more recent applications.
>
> I have a pretty simple data driven signal processing system, the processing
> is essentially linear:
>
> A-->B-->C-->D
>
> Data rates are highish (~110 Mbit/sec) but nothing astronomical, messages
> vary in size, but are generally pretty big (average ~ 200kB, but can be up
> to a 1MB).
>
> I moved from a simple sockets to activemq (for portability reasons), I am
> using activemq 5.4.2 and CMS 3.4.2 on a pretty peppy RHEL 5.3 64 bit machine
> (2.6 Ghz 8 core Xeon with 12GB RAM) (the versions/OS that I use is defined
> for me, so updating versions is a bigish deal (I know 5.5 is out)).
>
> I can slow down the processing, and running with anything close to 40Mb/sec
> results in the processes consuming a full CPU each (where as running without
> activemq each process consumes ~ 20% of a processor) so I am concerned about
> activemq's overhead (I suppose it is more likely the overhead of the CMS
> bindings).
>
> I don't care about persistence, the data is time sensitive so if the
> processing goes down it is of no value for me (video like data). I have
> disabled producerFlowControl (unsure if that was a good idea), but as a data
> driven processing chain if I am not able to keep up with the data rate then
> I am dead in the water. Going to try nio for the transport as I have seen
> that mentioned a few places for high throughput usage.
>
> Seem to get into a deadlock state where the process is waiting on a mutex:
>
> __kernel_vsyscall() at 0xffffe410
> pthread_cond_timedwait@@GLIBC_2.3.2() at 0xf7b5bd12
> decaf::internal::util::concurrent::ConditionImpl::wait() at 0xf77ce9f3
> decaf::util::concurrent::Mutex::wait() at 0xf784d767
> decaf::util::concurrent::Mutex::wait() at 0xf784d5f2
> activemq::transport::failover::FailoverTransport::oneway() at 0xf7540ed7
> activemq::transport::correlator::ResponseCorrelator::oneway() at 0xf752a2b8
> activemq::core::ActiveMQConnection::oneway() at 0xf746af12
> activemq::core::ActiveMQSession::send() at 0xf74bb9e2
> activemq::core::ActiveMQProducer::send() at 0xf74aa764
> activemq::core::ActiveMQProducer::send() at 0xf74a7dd1
> activemq::core::ActiveMQProducer::send() at 0xf74a9585
> activeMqProducer::send() at ioActiveMqProducer.cpp:116 0x805a235
> io::send() at io.cpp:162 0x8057614
> process_data_msg() at main.cpp:814 0x804cf7c
> process_message() at main.cpp:891 0x804d272
> main() at main.cpp:1,097 0x804e481
>
> I esentially used the cpp samples when I created my io library (just a
> simple wrapper).
>
If you are experiencing deadlocks in the code the best way to help is to
create a new Jira ticket and attach a complete backtrace of all threads
so that we can see where the deadlock is occurring. Also a unit test
that reproduces the issue would be helpful.
Regards
Tim.
> Consumer code (Sanitised - so may not be syntactically correct) :
>
> void activeMqConsumer::runConsumer()
> {
>
> try {
>
> // Create a ConnectionFactory
> ActiveMQConnectionFactory* connectionFactory = new
> ActiveMQConnectionFactory( brokerURI );
>
> // Create a Connection
> connection = connectionFactory->createConnection();
> delete connectionFactory;
>
> ActiveMQConnection* amqConnection =
> dynamic_cast<ActiveMQConnection*>( connection );
> if( amqConnection != NULL ) {
> amqConnection->addTransportListener( this );
> }
>
> connection->setExceptionListener(this);
>
> // Create a Session
> if( clientAck ) {
> session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );
> } else {
> session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
> }
>
> // Create the destination (Topic or Queue)
> if( useTopic ) {
> destination = session->createTopic( destURI );
> } else {
> destination = session->createQueue( destURI );
> }
>
> // Create a MessageConsumer from the Session to the Topic or Queue
> consumer = session->createConsumer( destination );
> consumer->setMessageListener( this );
>
> } catch (CMSException& e) {
> e.printStackTrace();
> }
> }
>
>
> // Called from the consumer since this class is a registered
> MessageListener.
> void activeMqConsumer::onMessage( const Message* message )
> {
> INT32 msgSize;
> INT32 bytesRead = 0;
> FLOAT64 latency = 0;
> TIME currentTime;
> INT32 rval;
>
> rval = SDL_SemWait(this->pElementAvailable);
> if (rval == -1)
> {
> printf("(onMessage) error: SDL_SemWait failed\n");
> }
>
> try
> {
> // cast the activeMq "message" into an array of bytes
> const cms::BytesMessage *bytesMessage = dynamic_cast<const
> cms::BytesMessage *>(message);
> if(bytesMessage != NULL)
> {
> msgSize = bytesMessage->getBodyLength();
> bytesRead = bytesMessage->readBytes((unsigned char*)this->pInputMsg,
> msgSize);
> totalBytesRecvd += msgSize;
> totalMsgsRecvd++;
> // the assumption here is that you receive messages after they have been
> sent
> // if the times between processing modules are not synchronized all bets
> are off.
> getSystemTime(&(currentTime));
> latency = (FLOAT64)currentTime - (FLOAT64)this->pInputMsg->hdr.time;
> this->accruedLatency += latency;
>
> if(clientAck == TRUE)
> {
> message->acknowledge();
> }
> }
> }
> catch(cms::CMSException &exception)
> {
> exception.printStackTrace();
> }
>
> if(startTime == 0)
> {
> getSystemTime(&(startTime));
> }
>
> // indicate that there is data available
> rval = SDL_SemPost(this->pDataAvailable);
> if (rval == -1)
> {
> printf("(enqueue) error: SDL_SemPost failed\n");
> }
>
> }
>
> Producer code:
>
> activeMqProducer::activeMqProducer( const std::string& brokerURI,
> const std::string& destURI,
> BOOLEAN useTopic)
> {
> // Create a ConnectionFactory
> auto_ptr<ConnectionFactory>
> connectionFactory(ConnectionFactory::createCMSConnectionFactory( brokerURI )
> );
>
> // Create a Connection
> connection = connectionFactory->createConnection();
>
> sessionTransacted = false;
>
> // Create a Session
> if( sessionTransacted )
> {
> session = connection->createSession( Session::SESSION_TRANSACTED );
> }
> else
> {
> session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
> }
>
> // Create the destination (Topic or Queue)
> if( useTopic )
> {
> destination = session->createTopic( destURI );
> }
> else
> {
> destination = session->createQueue( destURI );
> }
>
> // Create a MessageProducer from the Session to the Topic or Queue
> producer = session->createProducer( destination );
> producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
>
> message = session->createBytesMessage();
>
> }
>
>
> void activeMqProducer::send(IO_MSG *pOutputMsg)
> {
> // Set the send time in the IO message
> getSystemTime(&(pOutputMsg->hdr.time));
> // Convert the message into an activeMQ bytesMessage
> message->setBodyBytes( (const unsigned char*)pOutputMsg,
> pOutputMsg->hdr.msgSize + sizeof(MSG_HDR));
> // Send to broker
> producer->send( message );
> }
>
> Any suggestions on how to speed up the data rates and stop the producer from
> locking up would be greatly appreciated.
>
> V/R
> ~Joe
>
> --
> View this message in context: http://activemq.2283324.n4.nabble.com/Activemq-CMS-high-data-rate-lockup-tp3670718p3670718.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
--
Tim Bish
------------
FuseSource
Email: tim.bish@fusesource.com
Web: http://fusesource.com
Twitter: tabish121
Blog: http://timbish.blogspot.com/