You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2007/03/19 13:50:35 UTC
svn commit: r519932 - in
/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core:
ActiveMQConsumer.cpp ActiveMQConsumer.h ActiveMQProducer.h
ActiveMQTransaction.h
Author: tabish
Date: Mon Mar 19 05:50:34 2007
New Revision: 519932
URL: http://svn.apache.org/viewvc?view=rev&rev=519932
Log:
Cleaning up some code.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.h
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?view=diff&rev=519932&r1=519931&r2=519932
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp Mon Mar 19 05:50:34 2007
@@ -41,12 +41,12 @@
__FILE__, __LINE__,
"ActiveMQConsumer::ActiveMQConsumer - Init with NULL Session");
}
-
+
// Init Producer Data
- this->session = session;
- this->consumerInfo = consumerInfo;
- this->listener = NULL;
- this->closed = false;
+ this->session = session;
+ this->consumerInfo = consumerInfo;
+ this->listener = NULL;
+ this->closed = false;
// Listen for our resource to close
this->consumerInfo->addListener( this );
@@ -66,51 +66,50 @@
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::close()
+void ActiveMQConsumer::close()
throw ( cms::CMSException )
{
try
{
if( !closed ) {
-
+
// Identifies any errors encountered during shutdown.
bool haveException = false;
- ActiveMQException error;
-
+ ActiveMQException error;
+
// Close the ConsumerInfo
if( !consumerInfo->isClosed() ) {
try{
- // We don't want a callback now
- this->consumerInfo->removeListener( this );
- this->consumerInfo->close();
+ // We don't want a callback now
+ this->consumerInfo->removeListener( this );
+ this->consumerInfo->close();
} catch( ActiveMQException& ex ){
- if( !haveException ){
- ex.setMark( __FILE__, __LINE__ );
+ if( !haveException ){
+ ex.setMark( __FILE__, __LINE__ );
error = ex;
haveException = true;
}
}
}
-
+
closed = true;
-
+
// Purge all the pending messages
try{
purgeMessages();
} catch ( ActiveMQException& ex ){
- if( !haveException ){
- ex.setMark( __FILE__, __LINE__ );
+ if( !haveException ){
+ ex.setMark( __FILE__, __LINE__ );
error = ex;
haveException = true;
}
}
-
+
// If we encountered an error, propagate it.
if( haveException ){
error.setMark( __FILE__, __LINE__ );
throw error;
}
-
}
}
AMQ_CATCH_RETHROW( ActiveMQException )
@@ -118,7 +117,7 @@
}
////////////////////////////////////////////////////////////////////////////////
-std::string ActiveMQConsumer::getMessageSelector() const
+std::string ActiveMQConsumer::getMessageSelector() const
throw ( cms::CMSException )
{
try
@@ -150,15 +149,16 @@
{
unconsumedMessages.wait();
}
-
+
// This will only happen when this object is being
// closed in another thread context - kind of
// scary.
if( closed ){
- throw ActiveMQException( __FILE__, __LINE__,
+ throw ActiveMQException(
+ __FILE__, __LINE__,
"Consumer is being closed in another thread" );
}
-
+
// Fetch the Message then copy it so it can be handed off
// to the user.
DispatchData data = unconsumedMessages.pop();
@@ -167,7 +167,7 @@
// The Message is cleaned up here if the Session is not
// transacted, otherwise we let the transaction clean up
- // this message as it will have already been ack'd and
+ // this message as it will have already been ack'd and
// stored for later redelivery.
destroyMessage( message );
@@ -181,7 +181,7 @@
}
////////////////////////////////////////////////////////////////////////////////
-cms::Message* ActiveMQConsumer::receive( int millisecs )
+cms::Message* ActiveMQConsumer::receive( int millisecs )
throw ( cms::CMSException )
{
try
@@ -197,23 +197,24 @@
{
// Check for empty, and wait if its not
if( !closed && unconsumedMessages.empty() ){
-
- unconsumedMessages.wait(millisecs);
+
+ unconsumedMessages.wait( millisecs );
// if its still empty...bail
if( unconsumedMessages.empty() ) {
return NULL;
}
}
-
+
// This will only happen when this object is being
// closed in another thread context - kind of
// scary.
if( closed ){
- throw ActiveMQException( __FILE__, __LINE__,
+ throw ActiveMQException(
+ __FILE__, __LINE__,
"Consumer is being closed in another thread" );
}
-
+
// Fetch the Message then copy it so it can be handed off
// to the user.
DispatchData data = unconsumedMessages.pop();
@@ -222,7 +223,7 @@
// The Message is cleaned up here if the Session is not
// transacted, otherwise we let the transaction clean up
- // this message as it will have already been ack'd and
+ // this message as it will have already been ack'd and
// stored for later redelivery.
destroyMessage( message );
@@ -236,7 +237,7 @@
}
////////////////////////////////////////////////////////////////////////////////
-cms::Message* ActiveMQConsumer::receiveNoWait()
+cms::Message* ActiveMQConsumer::receiveNoWait()
throw ( cms::CMSException )
{
try
@@ -260,14 +261,14 @@
// The Message is cleaned up here if the Session is not
// transacted, otherwise we let the transaction clean up
- // this message as it will have already been ack'd and
+ // this message as it will have already been ack'd and
// stored for later redelivery.
destroyMessage( message );
return result;
}
}
-
+
return NULL;
}
AMQ_CATCH_RETHROW( ActiveMQException )
@@ -287,23 +288,23 @@
}
this->listener = listener;
-
+
if( listener != NULL && session != NULL ) {
-
+
// Now that we have a valid message listener,
// redispatch all the messages that it missed.
-
+
bool wasStarted = session->isStarted();
if( wasStarted ) {
session->stop();
}
-
+
session->redispatch( unconsumedMessages );
-
+
if( wasStarted ) {
session->start();
}
- }
+ }
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
@@ -336,9 +337,8 @@
{
try
{
-
ActiveMQMessage* message = data.getMessage();
-
+
// Don't dispatch expired messages, ack it and then destroy it
if( message->isExpired() ) {
session->acknowledge( this, message );
@@ -347,27 +347,27 @@
// stop now, don't queue
return;
}
-
- // If the Session is in ClientAcknowledge mode, then we set the
+
+ // If the Session is in ClientAcknowledge mode, then we set the
// handler in the message to this object and send it out. Otherwise
// we ack it here for all the other Modes.
if( session->getAcknowledgeMode() == Session::CLIENT_ACKNOWLEDGE ) {
-
+
// Register ourself so that we can handle the Message's
// acknowledge method.
message->setAckHandler( this );
-
+
} else {
session->acknowledge( this, message );
}
-
+
// If we have a listener, send the message.
if( listener != NULL ) {
cms::Message* message = dynamic_cast<cms::Message*>(data.getMessage());
listener->onMessage( message );
destroyMessage( message );
} else {
-
+
// No listener, add it to the unconsumed messages list
synchronized( &unconsumedMessages ) {
unconsumedMessages.push( data );
@@ -389,7 +389,7 @@
while( !unconsumedMessages.empty() )
{
// destroy these messages if this is not a transacted
- // session, if it is then the tranasction will clean
+ // session, if it is then the tranasction will clean
// the messages up.
destroyMessage( dynamic_cast<cms::Message*>(unconsumedMessages.pop().getMessage()) );
}
@@ -401,7 +401,7 @@
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::destroyMessage( Message* message ) throw ( ActiveMQException ){
-
+
try
{
/**
@@ -412,7 +412,7 @@
{
delete message;
}
- }
+ }
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
@@ -444,4 +444,3 @@
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
-
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h?view=diff&rev=519932&r1=519931&r2=519932
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h Mon Mar 19 05:50:34 2007
@@ -34,29 +34,29 @@
class ActiveMQSession;
- class ActiveMQConsumer :
+ class ActiveMQConsumer :
public cms::MessageConsumer,
public ActiveMQAckHandler,
public Dispatcher,
- public connector::ConnectorResourceListener
+ public connector::ConnectorResourceListener
{
private:
-
+
// The session that owns this Consumer
ActiveMQSession* session;
-
+
// The Consumer info for this Consumer
connector::ConsumerInfo* consumerInfo;
-
+
// The Message Listener for this Consumer
cms::MessageListener* listener;
-
+
// Queue of unconsumed messages.
util::Queue<DispatchData> unconsumedMessages;
-
+
// Boolean that indicates if the consumer has been closed
bool closed;
-
+
public:
/**
@@ -77,7 +77,7 @@
* @throws CMSException
*/
virtual void close() throw ( cms::CMSException );
-
+
/**
* Synchronously Receive a Message
* @return new message
@@ -121,9 +121,9 @@
* @return This Consumer's selector expression or "".
* @throws cms::CMSException
*/
- virtual std::string getMessageSelector() const
+ virtual std::string getMessageSelector() const
throw ( cms::CMSException );
-
+
/**
* Method called to acknowledge the message passed
* @param message the Message to Acknowlegde
@@ -133,29 +133,18 @@
throw ( cms::CMSException );
public: // Dispatcher Methods
-
+
/**
* Called asynchronously by the session to dispatch a message.
* @param message object pointer
*/
virtual void dispatch( DispatchData& message );
-
- public: // ActiveMQSessionResource
-
- /**
- * Retrieve the Connector resource that is associated with
- * this Session resource.
- * @return pointer to a Connector Resource, can be NULL
- */
- virtual connector::ConnectorResource* getConnectorResource() {
- return consumerInfo;
- }
public: // ActiveMQConsumer Methods
/**
* Get the Consumer information for this consumer
- * @return Pointer to a Consumer Info Object
+ * @return Pointer to a Consumer Info Object
*/
virtual connector::ConsumerInfo* getConsumerInfo() {
return consumerInfo;
@@ -173,19 +162,19 @@
const connector::ConnectorResource* resource ) throw ( cms::CMSException );
protected:
-
+
/**
* Purges all messages currently in the queue. This can be as a
* result of a rollback, or of the consumer being shutdown.
*/
virtual void purgeMessages() throw (exceptions::ActiveMQException);
-
+
/**
* Destroys the message if the session is transacted, otherwise
* does nothing.
* @param message the message to destroy
*/
- virtual void destroyMessage( cms::Message* message )
+ virtual void destroyMessage( cms::Message* message )
throw (exceptions::ActiveMQException);
};
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h?view=diff&rev=519932&r1=519931&r2=519932
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h Mon Mar 19 05:50:34 2007
@@ -204,17 +204,6 @@
return defaultTimeToLive;
}
- public: // ActiveMQSessionResource
-
- /**
- * Retrieve the Connector resource that is associated with
- * this Session resource.
- * @return pointer to a Connector Resource, can be NULL
- */
- virtual connector::ConnectorResource* getConnectorResource() {
- return producerInfo;
- }
-
public:
/**
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.h?view=diff&rev=519932&r1=519931&r2=519932
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.h Mon Mar 19 05:50:34 2007
@@ -212,17 +212,6 @@
virtual void onTaskException( concurrent::Runnable* task,
exceptions::ActiveMQException& ex );
- public: // ActiveMQSessionResource
-
- /**
- * Retrieve the Connector resource that is associated with
- * this Session resource.
- * @return pointer to a Connector Resource, can be NULL
- */
- virtual connector::ConnectorResource* getConnectorResource(void) {
- return transactionInfo;
- }
-
protected:
/**