You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by nm...@apache.org on 2007/03/18 20:40:53 UTC
svn commit: r519679 - in
/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core:
ActiveMQConnection.cpp ActiveMQConnection.h ActiveMQSession.cpp
ActiveMQSession.h ActiveMQSessionExecutor.cpp
Author: nmittler
Date: Sun Mar 18 12:40:52 2007
New Revision: 519679
URL: http://svn.apache.org/viewvc?view=rev&rev=519679
Log:
AMQCPP-90 - simplifying the execution of session threads
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?view=diff&rev=519679&r1=519678&r2=519679
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Sun Mar 18 12:40:52 2007
@@ -39,9 +39,6 @@
this->closed = false;
this->exceptionListener = NULL;
- alwaysSessionAsync = Boolean::parseBoolean(
- connectionData->getProperties().getProperty( "alwaysSessionAsync", "true" ) );
-
// Register for messages and exceptions from the connector.
Connector* connector = connectionData->getConnector();
connector->setConsumerMessageListener( this );
@@ -97,16 +94,12 @@
{
try
{
- // Determine whether or not to make dispatch for this session asynchronous
- bool doSessionAsync = alwaysSessionAsync || !activeSessions.isEmpty() ||
- ackMode==Session::SESSION_TRANSACTED || ackMode==Session::CLIENT_ACKNOWLEDGE;
// Create the session instance.
ActiveMQSession* session = new ActiveMQSession(
connectionData->getConnector()->createSession( ackMode ),
connectionData->getProperties(),
- this,
- doSessionAsync );
+ this );
// Add the session to the set of active sessions.
synchronized( &activeSessions ) {
@@ -224,28 +217,18 @@
synchronized( &dispatchers )
{
dispatcher = dispatchers.getValue(consumer->getConsumerId());
- }
- // If we have no registered dispatcher, this is bad!! (should never happen)
- if( dispatcher == NULL )
- {
- // Indicate to Broker that we received the message, but it
- // was not consumed.
- connectionData->getConnector()->acknowledge(
- consumer->getSessionInfo(),
- consumer,
- dynamic_cast<Message*>( message ),
- Connector::DeliveredAck );
-
- // Delete the message here
- delete message;
-
- throw ActiveMQException(__FILE__, __LINE__, "no dispatcher registered for consumer" );
- }
-
- // Dispatch the message.
- DispatchData data( consumer, message );
- dispatcher->dispatch( data );
+ // If we have no registered dispatcher, the consumer was probably
+ // just closed. Just delete the message.
+ if( dispatcher == NULL ) {
+ delete message;
+ } else {
+
+ // Dispatch the message.
+ DispatchData data( consumer, message );
+ dispatcher->dispatch( data );
+ }
+ }
}
catch( exceptions::ActiveMQException& ex )
{
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?view=diff&rev=519679&r1=519678&r2=519679
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Sun Mar 18 12:40:52 2007
@@ -83,12 +83,6 @@
* Maintain the set of all active sessions.
*/
util::Set<ActiveMQSession*> activeSessions;
-
- /**
- * If true, dispatch for all sessions will be asynchronous to the
- * transport.
- */
- bool alwaysSessionAsync;
public:
@@ -210,7 +204,7 @@
virtual void onConsumerMessage( connector::ConsumerInfo* consumer,
core::ActiveMQMessage* message );
- private:
+ public:
/**
* Notify the exception listener
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?view=diff&rev=519679&r1=519678&r2=519679
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Sun Mar 18 12:40:52 2007
@@ -41,8 +41,7 @@
////////////////////////////////////////////////////////////////////////////////
ActiveMQSession::ActiveMQSession( SessionInfo* sessionInfo,
const Properties& properties,
- ActiveMQConnection* connection,
- bool sessionAsyncDispatch )
+ ActiveMQConnection* connection )
{
if( sessionInfo == NULL || connection == NULL )
{
@@ -56,7 +55,6 @@
this->connection = connection;
this->closed = false;
this->asyncThread = NULL;
- this->sessionAsyncDispatch = sessionAsyncDispatch;
this->useAsyncSend = Boolean::parseBoolean(
properties.getProperty( "useAsyncSend", "false" ) );
@@ -90,6 +88,13 @@
}
////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::fire( exceptions::ActiveMQException& ex ) {
+ if( connection != NULL ) {
+ connection->fire( ex );
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
void ActiveMQSession::close() throw ( cms::CMSException )
{
// If we're already closed, just get outta' here.
@@ -717,6 +722,11 @@
if( consumer != NULL )
{
+ bool wasStarted = isStarted();
+ if( wasStarted ) {
+ stop();
+ }
+
// Remove the dispatcher for the Connection
connection->removeDispatcher( consumer );
@@ -730,6 +740,10 @@
// Remove this consumer from the consumers map.
synchronized( &consumers ) {
consumers.remove( consumer->getConsumerId() );
+ }
+
+ if( wasStarted ) {
+ start();
}
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?view=diff&rev=519679&r1=519678&r2=519679
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h Sun Mar 18 12:40:52 2007
@@ -89,11 +89,6 @@
* Is this Session using Async Sends.
*/
bool useAsyncSend;
-
- /**
- * Indicates whether or not dispatching should be done asynchronously.
- */
- bool sessionAsyncDispatch;
/**
* Outgoing Message Queue
@@ -106,18 +101,10 @@
ActiveMQSession( connector::SessionInfo* sessionInfo,
const util::Properties& properties,
- ActiveMQConnection* connection,
- bool sessionAsyncDispatch );
+ ActiveMQConnection* connection );
virtual ~ActiveMQSession();
- /**
- * Indicates whether or not dispatching should be done asynchronously.
- */
- bool isSessionAsyncDispatch() const {
- return sessionAsyncDispatch;
- }
-
util::Map<long long, ActiveMQConsumer*>& getConsumers() {
return consumers;
}
@@ -143,6 +130,11 @@
* state.
*/
bool isStarted() const;
+
+ /**
+ * Fires the given exception to the exception listener of the connection
+ */
+ void fire( exceptions::ActiveMQException& ex );
public: // Methods from ActiveMQMessageDispatcher
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp?view=diff&rev=519679&r1=519678&r2=519679
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp Sun Mar 18 12:40:52 2007
@@ -47,32 +47,20 @@
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionExecutor::execute( DispatchData& data ) {
- // If dispatch async is off, just dispatch in the context of the transport
- // thread.
- if ( !session->isSessionAsyncDispatch() ){
- dispatch(data);
- }else {
-
- synchronized( &messageQueue ) {
- messageQueue.push(data);
- wakeup();
- }
+ // Add the data to the queue.
+ synchronized( &messageQueue ) {
+ messageQueue.push(data);
+ wakeup();
}
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionExecutor::executeFirst( DispatchData& data ) {
- // If dispatch async is off, just dispatch in the context of the transport
- // thread.
- if ( !session->isSessionAsyncDispatch() && started ){
- dispatch(data);
- }else {
-
- synchronized( &messageQueue ) {
- messageQueue.enqueueFront(data);
- wakeup();
- }
+ // Add the data to the front of the queue.
+ synchronized( &messageQueue ) {
+ messageQueue.enqueueFront(data);
+ wakeup();
}
}
@@ -83,7 +71,7 @@
started = true;
// Don't create the thread unless we need to.
- if( session->isSessionAsyncDispatch() && thread == NULL ) {
+ if( thread == NULL ) {
thread = new Thread( this );
thread->start();
}
@@ -126,19 +114,32 @@
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionExecutor::dispatch( DispatchData& data ) {
- ActiveMQConsumer* consumer = NULL;
- util::Map<long long, ActiveMQConsumer*>& consumers = session->getConsumers();
-
- synchronized(&consumers) {
- consumer = consumers.getValue( data.getConsumer()->getConsumerId() );
- }
-
- if( consumer == NULL ) {
- execute( data );
- } else {
- consumer->dispatch( data );
- }
+ try {
+ ActiveMQConsumer* consumer = NULL;
+ util::Map<long long, ActiveMQConsumer*>& consumers = session->getConsumers();
+
+ synchronized(&consumers) {
+ consumer = consumers.getValue( data.getConsumer()->getConsumerId() );
+ }
+
+ // If the consumer is not available, just delete the message.
+ // Otherwise, dispatch the message to the consumer.
+ if( consumer == NULL ) {
+ delete data.getMessage();
+ } else {
+ consumer->dispatch( data );
+ }
+ } catch( ActiveMQException& ex ) {
+ ex.setMark(__FILE__, __LINE__ );
+ ex.printStackTrace();
+ } catch( std::exception& ex ) {
+ ActiveMQException amqex( __FILE__, __LINE__, ex.what() );
+ amqex.printStackTrace();
+ } catch( ... ) {
+ ActiveMQException amqex( __FILE__, __LINE__, "caught unknown exception" );
+ amqex.printStackTrace();
+ }
}
////////////////////////////////////////////////////////////////////////////////
@@ -162,10 +163,14 @@
}
} catch( ActiveMQException& ex ) {
- ex.printStackTrace();
+ ex.setMark(__FILE__, __LINE__ );
+ session->fire( ex );
+ } catch( std::exception& stdex ) {
+ ActiveMQException ex( __FILE__, __LINE__, stdex.what() );
+ session->fire( ex );
} catch( ... ) {
ActiveMQException ex(__FILE__, __LINE__, "caught unknown exception" );
- ex.printStackTrace();
+ session->fire( ex );
}
}
@@ -190,12 +195,8 @@
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionExecutor::wakeup() {
- if( session->isSessionAsyncDispatch() ) {
- synchronized( &messageQueue ) {
- messageQueue.notifyAll();
- }
- } else if( started ){
- dispatchAll();
+ synchronized( &messageQueue ) {
+ messageQueue.notifyAll();
}
}