Posted to dev@activemq.apache.org by Matevz Tadel <ma...@cern.ch> on 2013/03/08 03:22:02 UTC

[amq-cpp] cms::ExceptionListener::onException() and what one can do there

Hi,

I'm using a single connection to an AMQ topic in a multi-threaded program that 
does many other things beyond sending data over AMQ. So, when an AMQ 
onException() happens I really do not want to exit the application (all demos 
seem to have "// exit(1);" in this handler :) ) and would like to reinitialize 
my single connection even if this means leaking some resources.

I'm getting exceptions like this since switching to 3.5.0 (but the load on 
the application is getting higher, so it doesn't have to be a change 
introduced in 3.5.0):
2013-03-06 01:51:51 ERR XrdFileCloseReporterAmq::onException Exception callback invoked:
     DataInputStream::readLong - Reached EOF
and the application now crashes when I close the connection (before, this used 
to trigger a true exception being thrown in my application thread that does AMQ 
calls).

Please see my connect and onException() implementations below. Full class is 
available here:
http://www.gled.org/viewvc/gled/trunk/libsets/XrdMon/Glasses/XrdFileCloseReporterAmq.h?revision=2860&view=markup
http://www.gled.org/viewvc/gled/trunk/libsets/XrdMon/Glasses/XrdFileCloseReporterAmq.cxx?revision=2862&view=markup

I guess the question is ... is there anything sane one can do after 
onException() is invoked, or does this actually mean that the internal state of 
amq-cpp is corrupted beyond repair? If this is indeed the case, the only option 
for me is to separate sending of AMQ messages from the main application.

Best,
Matevz


void XrdFileCloseReporterAmq::amq_connect()
{
   static const Exc_t _eh("XrdFileCloseReporterAmq::amq_connect ");

   TString uri;

   // 2012-07 Failover and Stomp don't splice ... or, they splice too well, making as
   // many threads as ulimit lets them on first error ... thrashing the machine.
   // uri.Form("failover://(tcp://%s:%hu?wireFormat=stomp)", mAmqHost.Data(), mAmqPort);

   uri.Form("tcp://%s:%hu?wireFormat=stomp", mAmqHost.Data(), mAmqPort);

   try
   {
     auto_ptr<cms::ConnectionFactory> conn_fac
       (new activemq::core::ActiveMQConnectionFactory(uri.Data(),
                                                      mAmqUser.Data(), mAmqPswd.Data()));

     mConn = conn_fac->createConnection();
     mConn->setExceptionListener(this);
     mConn->start();
   }
   catch (cms::CMSException& e)
   {
     throw _eh + "Exception during connection creation: " + e.getStackTraceString();
   }

   try
   {
     mSess = mConn->createSession(); // Default is AUTO_ACKNOWLEDGE
     mDest = mSess->createTopic(mAmqTopic.Data());
     mProd = mSess->createProducer(mDest);
     // Copied from examples, NFI.
     mProd->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT);
   }
   catch (cms::InvalidDestinationException& e)
   {
     throw _eh + "Invalid destination exception during producer creation: " +
       e.getStackTraceString();
   }
   catch (cms::CMSException& e)
   {
     throw _eh + "Exception during session, topic or message producer initialization: " +
       e.getStackTraceString();
   }
}


void XrdFileCloseReporterAmq::onException(const cms::CMSException &e)
{
   static const Exc_t _eh("XrdFileCloseReporterAmq::onException ");

   if (*mLog)
       mLog->Form(ZLog::L_Error, _eh, "Exception callback invoked:\n    %s",
                  e.getStackTraceString().c_str());

   cms::Connection *conn = mConn;
   mConn = 0;

   // This somehow results in proper exception getting thrown in the amq thread.
   // Go figure ... zen engineering or just a bug?
   //
   // 2013-03-07: Argh ... and this apparently crashes with activemq-cpp-3.5.0!
   // What to do?
   // Use mConn == 0 in main thread as a signal things should be reopened.
   // But do not call close :)
   if (conn)
   {
     conn->close();
     delete conn;
   }
}



Re: [amq-cpp] cms::ExceptionListener::onException() and what one can do there

Posted by "matevz.tadel" <ma...@cern.ch>.
I will definitely try 3.6.0 as soon as I get to it.

Yes, we use stomp, this is configured at the receiving side at CERN as many
(most, I think) message producers use python. I tried using failover with
stomp (with 3.4.4, I think) and it didn't end well, I got ulimit # of
threads on first error and got accused of ddosing the servers ... then I
also found a note about this on the web :)

I more or less implemented all the logic for failover, including
exponential cool off in retry times + a configurable-length buffer for
messages that cannot be sent. I will fix the onException() according to
your advice (close the connection instead of deleting it).
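
For readers who want to picture the hand-rolled failover logic described
above, here is a minimal sketch of its two pieces: a retry delay that doubles
up to a cap, and a fixed-capacity buffer for unsent messages. The class names
Backoff and PendingMessages, the millisecond units and the drop-oldest policy
are assumptions made for illustration; none of this is taken from the actual
XrdFileCloseReporterAmq code or from activemq-cpp.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>

// Hypothetical helper: retry delay that doubles after every failed attempt,
// capped at maxMs. Call reset() once a reconnect succeeds.
class Backoff {
public:
    Backoff(unsigned initialMs, unsigned maxMs)
        : initialMs_(initialMs), maxMs_(maxMs), currentMs_(initialMs) {
        assert(initialMs > 0 && initialMs <= maxMs);
    }

    // Returns the delay to wait before the next attempt, then grows it.
    unsigned nextDelayMs() {
        const unsigned delay = currentMs_;
        // Compare against maxMs_/2 so the doubling cannot overflow.
        currentMs_ = (currentMs_ > maxMs_ / 2) ? maxMs_ : currentMs_ * 2;
        return delay;
    }

    void reset() { currentMs_ = initialMs_; }

private:
    unsigned initialMs_;
    unsigned maxMs_;
    unsigned currentMs_;
};

// Hypothetical bounded buffer for messages that could not be sent.
// Assumed policy: when full, the oldest message is dropped.
class PendingMessages {
public:
    explicit PendingMessages(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }

    // Stores msg; returns true if an older message had to be dropped.
    bool push(const std::string& msg) {
        bool dropped = false;
        if (queue_.size() == capacity_) {
            queue_.pop_front();
            dropped = true;
        }
        queue_.push_back(msg);
        return dropped;
    }

    bool empty() const { return queue_.empty(); }
    std::size_t size() const { return queue_.size(); }
    const std::string& front() const { return queue_.front(); }
    void pop() { queue_.pop_front(); }

private:
    std::size_t capacity_;
    std::deque<std::string> queue_;
};
```

The sending thread would sleep for nextDelayMs() between reconnect attempts,
call reset() after a successful reconnect, and then drain the buffer from the
front. Whether dropping the oldest or the newest message is the right choice
depends on the application.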

I'll come back if I encounter the error again after I put in these changes.





Re: [amq-cpp] cms::ExceptionListener::onException() and what one can do there

Posted by Timothy Bish <ta...@gmail.com>.
First thing to do is switch to the new 3.6.0 release.
http://activemq.apache.org/cms/activemq-cpp-360-release.html

I noticed you are using Stomp, which I don't recommend; switching to 
openwire will allow you to use the failover transport.

You can call close on the connection from the Exception callback and it 
should kick any blocked threads waiting on sends and receives.  You 
can't delete your connection from that callback though, that'd be a bit of 
badness.  Once the onException method is called your application will 
have to tear down its CMS resources and recreate them; you can keep your 
ConnectionFactory, but the other stuff (Session, Connection) has to go.  
Handling all that is tricky enough that using the failover transport and 
openwire can save you a lot of grief.
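
The advice above can be sketched roughly as follows: the callback only closes
the connection and raises a flag, and the application's own thread later
deletes the old connection and creates a new one. To keep the sketch
compilable without activemq-cpp, FakeConnection is a stand-in for
cms::Connection, and Reporter, recoverIfNeeded() and demoRecovery() are
hypothetical names. A real implementation would also tear down and recreate
the Session, Destination and Producer at the same point, and would have to
check how its locking interacts with the library's threads.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <utility>

// Stand-in for cms::Connection (an assumption, so the sketch is self-contained).
struct FakeConnection {
    bool closed = false;
    void close() { closed = true; }
};

class Reporter {
public:
    Reporter() : needsReconnect_(false), connectCount_(0) {}

    // (Re)creates the connection; in real code the factory would be reused here.
    void connect() {
        std::lock_guard<std::mutex> lock(mutex_);
        conn_.reset(new FakeConnection());
        needsReconnect_ = false;
        ++connectCount_;
    }

    // Called on the library's thread: close and flag, but never delete here.
    void onException() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (conn_) conn_->close();
        needsReconnect_ = true;
    }

    // Called on the application's sending thread before each send.
    // Returns true if the connection was torn down and recreated.
    bool recoverIfNeeded() {
        std::unique_ptr<FakeConnection> old;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (!needsReconnect_) return false;
            old = std::move(conn_);
        }
        old.reset();   // delete outside the callback and outside the lock
        connect();
        return true;
    }

    FakeConnection* connection() {
        std::lock_guard<std::mutex> lock(mutex_);
        return conn_.get();
    }

    int connectCount() {
        std::lock_guard<std::mutex> lock(mutex_);
        return connectCount_;
    }

private:
    std::mutex mutex_;
    std::unique_ptr<FakeConnection> conn_;
    bool needsReconnect_;
    int connectCount_;
};

// Usage: simulate one failure and one recovery on the application thread.
void demoRecovery() {
    Reporter r;
    r.connect();
    assert(!r.recoverIfNeeded());        // nothing to do while healthy
    r.onException();                     // library reports a failure
    assert(r.connection()->closed);      // closed, but still alive
    assert(r.recoverIfNeeded());         // application thread rebuilds
    assert(!r.connection()->closed);
    assert(r.connectCount() == 2);
}
```

The point of the split is that nothing is destroyed while the library is
still inside its own callback; the delete happens later, on a thread the
application controls.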


-- 
Tim Bish
Sr Software Engineer | RedHat Inc.
tim.bish@redhat.com | www.fusesource.com | www.redhat.com
skype: tabish121 | twitter: @tabish121
blog: http://timbish.blogspot.com/