Posted to users@activemq.apache.org by dlaio <d....@gmail.com> on 2011/07/15 21:08:21 UTC

Activemq CMS - high data rate lockup

Perhaps my search-fu is weak, but I have been unable to find any recent
examples of people sending high data rates over ActiveMQ CMS.
http://www.mostly-useless.com/blog/2007/12/27/playing-with-activemq/ -> has
some good data (although from 2007).

I have found some references to CMS 2.2 through the Nabble search, but I am
coming up short at the moment for more recent applications.

I have a pretty simple data-driven signal processing system; the processing
is essentially linear:

A-->B-->C-->D

Data rates are highish (~110 Mbit/sec) but nothing astronomical. Messages
vary in size but are generally pretty big (average ~200 kB, but can be up
to 1 MB).

I moved from simple sockets to ActiveMQ (for portability reasons). I am
using ActiveMQ 5.4.2 and CMS 3.4.2 on a pretty peppy RHEL 5.3 64-bit machine
(2.6 GHz 8-core Xeon with 12 GB RAM). The versions and OS that I use are
defined for me, so updating versions is a biggish deal (I know 5.5 is out).

I can slow down the processing, and running at anything close to 40 Mb/sec
results in the processes consuming a full CPU each (whereas without
ActiveMQ each process consumes ~20% of a processor), so I am concerned about
ActiveMQ's overhead (I suppose it is more likely the overhead of the CMS
bindings).

I don't care about persistence; the data is time sensitive, so if the
processing goes down the data is of no value to me (video-like data). I have
disabled producerFlowControl (unsure if that was a good idea), but as a
data-driven processing chain, if I am not able to keep up with the data rate
then I am dead in the water. I am going to try nio for the transport, as I
have seen that mentioned in a few places for high-throughput usage.

I seem to get into a deadlock state where the process is waiting on a mutex:

__kernel_vsyscall() at 0xffffe410	
pthread_cond_timedwait@@GLIBC_2.3.2() at 0xf7b5bd12	
decaf::internal::util::concurrent::ConditionImpl::wait() at 0xf77ce9f3	
decaf::util::concurrent::Mutex::wait() at 0xf784d767	
decaf::util::concurrent::Mutex::wait() at 0xf784d5f2	
activemq::transport::failover::FailoverTransport::oneway() at 0xf7540ed7	
activemq::transport::correlator::ResponseCorrelator::oneway() at 0xf752a2b8	
activemq::core::ActiveMQConnection::oneway() at 0xf746af12	
activemq::core::ActiveMQSession::send() at 0xf74bb9e2	
activemq::core::ActiveMQProducer::send() at 0xf74aa764	
activemq::core::ActiveMQProducer::send() at 0xf74a7dd1	
activemq::core::ActiveMQProducer::send() at 0xf74a9585	
activeMqProducer::send() at ioActiveMqProducer.cpp:116 0x805a235	
io::send() at io.cpp:162 0x8057614		
process_data_msg() at main.cpp:814 0x804cf7c	
process_message() at main.cpp:891 0x804d272	
main() at main.cpp:1,097 0x804e481	
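
A note on reading the trace: the innermost frames are a timed condition wait
(pthread_cond_timedwait) reached from FailoverTransport::oneway(). One
possible reading, which is an assumption the trace alone cannot confirm, is
that the send keeps waiting for some state to change (for example, for a
usable connection) rather than holding a lock forever. The generic shape of
such a wait is sketched below; LinkState and its methods are made-up names
and are not part of CMS.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical stand-in for "is there a usable connection?" state.
// This is NOT the CMS FailoverTransport; it only mirrors the shape of the
// wait in the trace: a condition wait with a timeout, retried in a loop.
class LinkState
{
public:
    LinkState() : connected(false) {}

    // Called by whichever thread (re)establishes the connection.
    void markConnected()
    {
        std::lock_guard<std::mutex> lock(stateMutex);
        connected = true;
        changed.notify_all();
    }

    // Blocks the caller until connected, waking every pollInterval to
    // re-check. Gives up and returns false once maxWait has elapsed.
    bool waitUntilConnected(std::chrono::milliseconds pollInterval,
                            std::chrono::milliseconds maxWait)
    {
        const auto deadline = std::chrono::steady_clock::now() + maxWait;
        std::unique_lock<std::mutex> lock(stateMutex);
        while (!connected) {
            if (std::chrono::steady_clock::now() >= deadline) {
                return false;  // never became connected in time
            }
            changed.wait_for(lock, pollInterval);
        }
        return true;
    }

private:
    std::mutex stateMutex;
    std::condition_variable changed;
    bool connected;
};
```

If maxWait were unbounded and nothing ever called markConnected(), a caller
of waitUntilConnected() would look stuck in a debugger even though no lock is
held indefinitely, which is why a backtrace of every thread is more
informative than the sender's stack alone.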

I essentially used the cpp samples when I created my io library (just a
simple wrapper).

Consumer code (Sanitised - so may not be syntactically correct) :

void activeMqConsumer::runConsumer()
{

	try {

		// Create a ConnectionFactory
		ActiveMQConnectionFactory* connectionFactory =
			new ActiveMQConnectionFactory( brokerURI );

		// Create a Connection
		connection = connectionFactory->createConnection();
		delete connectionFactory;

		ActiveMQConnection* amqConnection =
			dynamic_cast<ActiveMQConnection*>( connection );
		if( amqConnection != NULL ) {
			amqConnection->addTransportListener( this );
		}

		connection->setExceptionListener(this);

		// Create a Session
		if( clientAck ) {
			session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );
		} else {
			session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
		}

		// Create the destination (Topic or Queue)
		if( useTopic ) {
			destination = session->createTopic( destURI );
		} else {
			destination = session->createQueue( destURI );
		}

		// Create a MessageConsumer from the Session to the Topic or Queue
		consumer = session->createConsumer( destination );
		consumer->setMessageListener( this );

	} catch (CMSException& e) {
		e.printStackTrace();
	}
}


// Called from the consumer since this class is a registered MessageListener.
void activeMqConsumer::onMessage( const Message* message )
{
	INT32 msgSize;
	INT32 bytesRead = 0;
	FLOAT64 latency = 0;
	TIME currentTime;
	INT32 rval;

	rval = SDL_SemWait(this->pElementAvailable);
	if (rval == -1)
	{
		printf("(onMessage) error: SDL_SemWait failed\n");
	}

	try
	{
		// cast the activeMq "message" into an array of bytes
		const cms::BytesMessage *bytesMessage =
			dynamic_cast<const cms::BytesMessage *>(message);
		if(bytesMessage != NULL)
		{
			msgSize = bytesMessage->getBodyLength();
			bytesRead = bytesMessage->readBytes(
				(unsigned char*)this->pInputMsg, msgSize);
			totalBytesRecvd += msgSize;
			totalMsgsRecvd++;
			// the assumption here is that you receive messages after they
			// have been sent; if the times between processing modules are
			// not synchronized all bets are off.
			getSystemTime(&(currentTime));
			latency = (FLOAT64)currentTime - (FLOAT64)this->pInputMsg->hdr.time;
			this->accruedLatency += latency;

			if(clientAck == TRUE)
			{
				message->acknowledge();
			}
		}
	}
	catch(cms::CMSException &exception)
	{
		exception.printStackTrace();
	}

	if(startTime == 0)
	{
		getSystemTime(&(startTime));
	}

	// indicate that there is data available
	rval = SDL_SemPost(this->pDataAvailable);
	if (rval == -1)
	{
		printf("(enqueue) error: SDL_SemPost failed\n");
	}

}

Producer code:

activeMqProducer::activeMqProducer( const std::string& brokerURI,
                         const std::string& destURI,
                         BOOLEAN useTopic)
{
		// Create a ConnectionFactory
	    auto_ptr<ConnectionFactory> connectionFactory(
	        ConnectionFactory::createCMSConnectionFactory( brokerURI ) );

	    // Create a Connection
	    connection = connectionFactory->createConnection();

	    sessionTransacted = false;

	    // Create a Session
	    if( sessionTransacted )
	    {
	        session = connection->createSession( Session::SESSION_TRANSACTED );
	    }
	    else
	    {
	        session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
	    }

	    // Create the destination (Topic or Queue)
	    if( useTopic )
	    {
	        destination = session->createTopic( destURI );
	    }
	    else
	    {
	        destination = session->createQueue( destURI );
	    }

	    // Create a MessageProducer from the Session to the Topic or Queue
	    producer = session->createProducer( destination );
	    producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );

	    message = session->createBytesMessage();

}


void activeMqProducer::send(IO_MSG *pOutputMsg)
{
	// Set the send time in the IO message
	getSystemTime(&(pOutputMsg->hdr.time));
	// Convert the message into an activeMQ bytesMessage
	message->setBodyBytes( (const unsigned char*)pOutputMsg,
	                       pOutputMsg->hdr.msgSize + sizeof(MSG_HDR));
	// Send to broker
	producer->send( message );
}
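
Since send() ships the raw header plus payload bytes and onMessage() reads
the body straight into pInputMsg, the receive buffer has to be at least as
large as the biggest body sent. A minimal sketch of that size relationship
with a bounds check on the receiving side; the MSG_HDR and IO_MSG layouts
below are invented for illustration (the real ones are not shown), as are the
two helper functions:

```cpp
#include <cstddef>
#include <cstring>

// Hypothetical layouts: the real MSG_HDR / IO_MSG are not shown in the post.
struct MSG_HDR
{
    double time;     // send timestamp
    int    msgSize;  // payload size in bytes, header excluded
};

struct IO_MSG
{
    MSG_HDR       hdr;
    unsigned char payload[1024];  // made-up capacity for this sketch
};

// Number of bytes the producer hands to setBodyBytes().
std::size_t wireSize(const IO_MSG& msg)
{
    return sizeof(MSG_HDR) + static_cast<std::size_t>(msg.hdr.msgSize);
}

// Copies a received body into 'out' only if it fits; returns false otherwise
// so the caller can drop or log the message instead of overrunning the buffer.
bool copyIntoMessage(IO_MSG& out, const unsigned char* body, std::size_t bodyLen)
{
    if (bodyLen < sizeof(MSG_HDR) || bodyLen > sizeof(IO_MSG)) {
        return false;
    }
    std::memcpy(&out, body, bodyLen);
    return true;
}
```

In the consumer, the equivalent check would compare getBodyLength() against
the size of the buffer behind pInputMsg before calling readBytes().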

Any suggestions on how to speed up the data rates and stop the producer from
locking up would be greatly appreciated. 

V/R
~Joe

--
View this message in context: http://activemq.2283324.n4.nabble.com/Activemq-CMS-high-data-rate-lockup-tp3670718p3670718.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Activemq CMS - high data rate lockup

Posted by Timothy Bish <ta...@gmail.com>.
On Fri, 2011-07-15 at 12:08 -0700, dlaio wrote:
> Seem to get into a deadlock state where the process is waiting on a mutex:
> 
> __kernel_vsyscall() at 0xffffe410	
> pthread_cond_timedwait@@GLIBC_2.3.2() at 0xf7b5bd12	
> decaf::internal::util::concurrent::ConditionImpl::wait() at 0xf77ce9f3	
> decaf::util::concurrent::Mutex::wait() at 0xf784d767	
> decaf::util::concurrent::Mutex::wait() at 0xf784d5f2	
> activemq::transport::failover::FailoverTransport::oneway() at 0xf7540ed7	
> activemq::transport::correlator::ResponseCorrelator::oneway() at 0xf752a2b8	
> activemq::core::ActiveMQConnection::oneway() at 0xf746af12	
> activemq::core::ActiveMQSession::send() at 0xf74bb9e2	
> activemq::core::ActiveMQProducer::send() at 0xf74aa764	
> activemq::core::ActiveMQProducer::send() at 0xf74a7dd1	
> activemq::core::ActiveMQProducer::send() at 0xf74a9585	
> activeMqProducer::send() at ioActiveMqProducer.cpp:116 0x805a235	
> io::send() at io.cpp:162 0x8057614		
> process_data_msg() at main.cpp:814 0x804cf7c	
> process_message() at main.cpp:891 0x804d272	
> main() at main.cpp:1,097 0x804e481	
> 

If you are experiencing deadlocks in the code the best way to help is to
create a new Jira ticket and attach a complete backtrace of all threads
so that we can see where the deadlock is occurring.  Also a unit test
that reproduces the issue would be helpful.  

Regards
Tim.


-- 
Tim Bish
------------
FuseSource
Email: tim.bish@fusesource.com
Web: http://fusesource.com
Twitter: tabish121
Blog: http://timbish.blogspot.com/