You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/02/07 00:11:39 UTC
svn commit: r741774 [5/17] - in /activemq/activemq-cpp/trunk/src: main/
main/activemq/commands/ main/activemq/core/ main/activemq/exceptions/
main/activemq/state/ main/activemq/wireformat/openwire/marshal/
main/activemq/wireformat/openwire/marshal/v1/ ...
Modified: activemq/activemq-cpp/trunk/src/main/activemq/commands/ReplayCommand.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/commands/ReplayCommand.cpp?rev=741774&r1=741773&r2=741774&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/commands/ReplayCommand.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/commands/ReplayCommand.cpp Fri Feb 6 23:11:28 2009
@@ -49,12 +49,12 @@
////////////////////////////////////////////////////////////////////////////////
ReplayCommand* ReplayCommand::cloneDataStructure() const {
- ReplayCommand* replayCommand = new ReplayCommand();
+ std::auto_ptr<ReplayCommand> replayCommand( new ReplayCommand() );
// Copy the data from the base class or classes
replayCommand->copyDataStructure( this );
- return replayCommand;
+ return replayCommand.release();
}
////////////////////////////////////////////////////////////////////////////////
@@ -124,7 +124,7 @@
}
////////////////////////////////////////////////////////////////////////////////
-commands::Command* ReplayCommand::visit( activemq::state::CommandVisitor* visitor )
+decaf::lang::Pointer<commands::Command> ReplayCommand::visit( activemq::state::CommandVisitor* visitor )
throw( exceptions::ActiveMQException ) {
return visitor->processReplayCommand( this );
Modified: activemq/activemq-cpp/trunk/src/main/activemq/commands/ReplayCommand.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/commands/ReplayCommand.h?rev=741774&r1=741773&r2=741774&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/commands/ReplayCommand.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/commands/ReplayCommand.h Fri Feb 6 23:11:28 2009
@@ -25,6 +25,7 @@
#include <activemq/util/Config.h>
#include <activemq/commands/BaseCommand.h>
+#include <decaf/lang/Pointer.h>
#include <vector>
#include <string>
@@ -104,7 +105,7 @@
*
* @return a Response to the visitor being called or NULL if no response.
*/
- virtual commands::Command* visit( activemq::state::CommandVisitor* visitor )
+ virtual decaf::lang::Pointer<commands::Command> visit( activemq::state::CommandVisitor* visitor )
throw( exceptions::ActiveMQException );
virtual int getFirstNakNumber() const;
Modified: activemq/activemq-cpp/trunk/src/main/activemq/commands/Response.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/commands/Response.cpp?rev=741774&r1=741773&r2=741774&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/commands/Response.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/commands/Response.cpp Fri Feb 6 23:11:28 2009
@@ -48,12 +48,12 @@
////////////////////////////////////////////////////////////////////////////////
Response* Response::cloneDataStructure() const {
- Response* response = new Response();
+ std::auto_ptr<Response> response( new Response() );
// Copy the data from the base class or classes
response->copyDataStructure( this );
- return response;
+ return response.release();
}
////////////////////////////////////////////////////////////////////////////////
@@ -118,7 +118,7 @@
}
////////////////////////////////////////////////////////////////////////////////
-commands::Command* Response::visit( activemq::state::CommandVisitor* visitor )
+decaf::lang::Pointer<commands::Command> Response::visit( activemq::state::CommandVisitor* visitor )
throw( exceptions::ActiveMQException ) {
return visitor->processResponse( this );
Modified: activemq/activemq-cpp/trunk/src/main/activemq/commands/Response.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/commands/Response.h?rev=741774&r1=741773&r2=741774&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/commands/Response.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/commands/Response.h Fri Feb 6 23:11:28 2009
@@ -25,6 +25,7 @@
#include <activemq/util/Config.h>
#include <activemq/commands/BaseCommand.h>
+#include <decaf/lang/Pointer.h>
#include <vector>
#include <string>
@@ -103,7 +104,7 @@
*
* @return a Response to the visitor being called or NULL if no response.
*/
- virtual commands::Command* visit( activemq::state::CommandVisitor* visitor )
+ virtual decaf::lang::Pointer<commands::Command> visit( activemq::state::CommandVisitor* visitor )
throw( exceptions::ActiveMQException );
virtual int getCorrelationId() const;
Modified: activemq/activemq-cpp/trunk/src/main/activemq/commands/SessionId.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/commands/SessionId.cpp?rev=741774&r1=741773&r2=741774&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/commands/SessionId.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/commands/SessionId.cpp Fri Feb 6 23:11:28 2009
@@ -55,12 +55,12 @@
////////////////////////////////////////////////////////////////////////////////
SessionId* SessionId::cloneDataStructure() const {
- SessionId* sessionId = new SessionId();
+ std::auto_ptr<SessionId> sessionId( new SessionId() );
// Copy the data from the base class or classes
sessionId->copyDataStructure( this );
- return sessionId;
+ return sessionId.release();
}
////////////////////////////////////////////////////////////////////////////////
Modified: activemq/activemq-cpp/trunk/src/main/activemq/commands/SessionId.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/commands/SessionId.h?rev=741774&r1=741773&r2=741774&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/commands/SessionId.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/commands/SessionId.h Fri Feb 6 23:11:28 2009
@@ -25,6 +25,7 @@
#include <activemq/util/Config.h>
#include <activemq/commands/BaseDataStructure.h>
+#include <decaf/lang/Pointer.h>
#include <decaf/lang/Comparable.h>
#include <vector>
#include <string>
@@ -52,6 +53,8 @@
const static unsigned char ID_SESSIONID = 121;
+ typedef decaf::lang::PointerComparator<SessionId> COMPARATOR;
+
public:
SessionId();
Modified: activemq/activemq-cpp/trunk/src/main/activemq/commands/SessionInfo.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/commands/SessionInfo.cpp?rev=741774&r1=741773&r2=741774&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/commands/SessionInfo.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/commands/SessionInfo.cpp Fri Feb 6 23:11:28 2009
@@ -38,23 +38,21 @@
////////////////////////////////////////////////////////////////////////////////
SessionInfo::SessionInfo() {
- this->sessionId = NULL;
}
////////////////////////////////////////////////////////////////////////////////
SessionInfo::~SessionInfo() {
- delete this->sessionId;
}
////////////////////////////////////////////////////////////////////////////////
SessionInfo* SessionInfo::cloneDataStructure() const {
- SessionInfo* sessionInfo = new SessionInfo();
+ std::auto_ptr<SessionInfo> sessionInfo( new SessionInfo() );
// Copy the data from the base class or classes
sessionInfo->copyDataStructure( this );
- return sessionInfo;
+ return sessionInfo.release();
}
////////////////////////////////////////////////////////////////////////////////
@@ -75,11 +73,7 @@
__FILE__, __LINE__,
"SessionInfo::copyDataStructure - src is NULL or invalid" );
}
- if( srcPtr->getSessionId() != NULL ) {
- this->setSessionId(
- dynamic_cast<SessionId*>(
- srcPtr->getSessionId()->cloneDataStructure() ) );
- }
+ this->setSessionId( srcPtr->getSessionId() );
}
////////////////////////////////////////////////////////////////////////////////
@@ -119,7 +113,7 @@
return false;
}
if( this->getSessionId() != NULL ) {
- if( !this->getSessionId()->equals( valuePtr->getSessionId() ) ) {
+ if( !this->getSessionId()->equals( valuePtr->getSessionId().get() ) ) {
return false;
}
} else if( valuePtr->getSessionId() != NULL ) {
@@ -132,24 +126,24 @@
}
////////////////////////////////////////////////////////////////////////////////
-commands::Command* SessionInfo::visit( activemq::state::CommandVisitor* visitor )
+decaf::lang::Pointer<commands::Command> SessionInfo::visit( activemq::state::CommandVisitor* visitor )
throw( exceptions::ActiveMQException ) {
return visitor->processSessionInfo( this );
}
////////////////////////////////////////////////////////////////////////////////
-const SessionId* SessionInfo::getSessionId() const {
+const decaf::lang::Pointer<SessionId>& SessionInfo::getSessionId() const {
return sessionId;
}
////////////////////////////////////////////////////////////////////////////////
-SessionId* SessionInfo::getSessionId() {
+decaf::lang::Pointer<SessionId>& SessionInfo::getSessionId() {
return sessionId;
}
////////////////////////////////////////////////////////////////////////////////
-void SessionInfo::setSessionId( SessionId* sessionId ) {
+void SessionInfo::setSessionId( const decaf::lang::Pointer<SessionId>& sessionId ) {
this->sessionId = sessionId;
}
Modified: activemq/activemq-cpp/trunk/src/main/activemq/commands/SessionInfo.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/commands/SessionInfo.h?rev=741774&r1=741773&r2=741774&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/commands/SessionInfo.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/commands/SessionInfo.h Fri Feb 6 23:11:28 2009
@@ -25,6 +25,7 @@
#include <activemq/util/Config.h>
#include <activemq/commands/BaseCommand.h>
+#include <decaf/lang/Pointer.h>
#include <activemq/commands/SessionId.h>
#include <vector>
#include <string>
@@ -45,7 +46,7 @@
class AMQCPP_API SessionInfo : public BaseCommand {
protected:
- SessionId* sessionId;
+ decaf::lang::Pointer<SessionId> sessionId;
protected:
@@ -104,12 +105,12 @@
*
* @return a Response to the visitor being called or NULL if no response.
*/
- virtual commands::Command* visit( activemq::state::CommandVisitor* visitor )
+ virtual decaf::lang::Pointer<commands::Command> visit( activemq::state::CommandVisitor* visitor )
throw( exceptions::ActiveMQException );
- virtual const SessionId* getSessionId() const;
- virtual SessionId* getSessionId();
- virtual void setSessionId( SessionId* sessionId );
+ virtual const decaf::lang::Pointer<SessionId>& getSessionId() const;
+ virtual decaf::lang::Pointer<SessionId>& getSessionId();
+ virtual void setSessionId( const decaf::lang::Pointer<SessionId>& sessionId );
};
Modified: activemq/activemq-cpp/trunk/src/main/activemq/commands/ShutdownInfo.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/commands/ShutdownInfo.cpp?rev=741774&r1=741773&r2=741774&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/commands/ShutdownInfo.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/commands/ShutdownInfo.cpp Fri Feb 6 23:11:28 2009
@@ -47,12 +47,12 @@
////////////////////////////////////////////////////////////////////////////////
ShutdownInfo* ShutdownInfo::cloneDataStructure() const {
- ShutdownInfo* shutdownInfo = new ShutdownInfo();
+ std::auto_ptr<ShutdownInfo> shutdownInfo( new ShutdownInfo() );
// Copy the data from the base class or classes
shutdownInfo->copyDataStructure( this );
- return shutdownInfo;
+ return shutdownInfo.release();
}
////////////////////////////////////////////////////////////////////////////////
@@ -112,7 +112,7 @@
}
////////////////////////////////////////////////////////////////////////////////
-commands::Command* ShutdownInfo::visit( activemq::state::CommandVisitor* visitor )
+decaf::lang::Pointer<commands::Command> ShutdownInfo::visit( activemq::state::CommandVisitor* visitor )
throw( exceptions::ActiveMQException ) {
return visitor->processShutdownInfo( this );
Modified: activemq/activemq-cpp/trunk/src/main/activemq/commands/ShutdownInfo.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/commands/ShutdownInfo.h?rev=741774&r1=741773&r2=741774&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/commands/ShutdownInfo.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/commands/ShutdownInfo.h Fri Feb 6 23:11:28 2009
@@ -25,6 +25,7 @@
#include <activemq/util/Config.h>
#include <activemq/commands/BaseCommand.h>
+#include <decaf/lang/Pointer.h>
#include <vector>
#include <string>
@@ -102,7 +103,7 @@
*
* @return a Response to the visitor being called or NULL if no response.
*/
- virtual commands::Command* visit( activemq::state::CommandVisitor* visitor )
+ virtual decaf::lang::Pointer<commands::Command> visit( activemq::state::CommandVisitor* visitor )
throw( exceptions::ActiveMQException );
};
Modified: activemq/activemq-cpp/trunk/src/main/activemq/commands/SubscriptionInfo.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/commands/SubscriptionInfo.cpp?rev=741774&r1=741773&r2=741774&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/commands/SubscriptionInfo.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/commands/SubscriptionInfo.cpp Fri Feb 6 23:11:28 2009
@@ -39,27 +39,23 @@
SubscriptionInfo::SubscriptionInfo() {
this->clientId = "";
- this->destination = NULL;
this->selector = "";
this->subcriptionName = "";
- this->subscribedDestination = NULL;
}
////////////////////////////////////////////////////////////////////////////////
SubscriptionInfo::~SubscriptionInfo() {
- delete this->destination;
- delete this->subscribedDestination;
}
////////////////////////////////////////////////////////////////////////////////
SubscriptionInfo* SubscriptionInfo::cloneDataStructure() const {
- SubscriptionInfo* subscriptionInfo = new SubscriptionInfo();
+ std::auto_ptr<SubscriptionInfo> subscriptionInfo( new SubscriptionInfo() );
// Copy the data from the base class or classes
subscriptionInfo->copyDataStructure( this );
- return subscriptionInfo;
+ return subscriptionInfo.release();
}
////////////////////////////////////////////////////////////////////////////////
@@ -81,18 +77,10 @@
"SubscriptionInfo::copyDataStructure - src is NULL or invalid" );
}
this->setClientId( srcPtr->getClientId() );
- if( srcPtr->getDestination() != NULL ) {
- this->setDestination(
- dynamic_cast<ActiveMQDestination*>(
- srcPtr->getDestination()->cloneDataStructure() ) );
- }
+ this->setDestination( srcPtr->getDestination() );
this->setSelector( srcPtr->getSelector() );
this->setSubcriptionName( srcPtr->getSubcriptionName() );
- if( srcPtr->getSubscribedDestination() != NULL ) {
- this->setSubscribedDestination(
- dynamic_cast<ActiveMQDestination*>(
- srcPtr->getSubscribedDestination()->cloneDataStructure() ) );
- }
+ this->setSubscribedDestination( srcPtr->getSubscribedDestination() );
}
////////////////////////////////////////////////////////////////////////////////
@@ -144,7 +132,7 @@
return false;
}
if( this->getDestination() != NULL ) {
- if( !this->getDestination()->equals( valuePtr->getDestination() ) ) {
+ if( !this->getDestination()->equals( valuePtr->getDestination().get() ) ) {
return false;
}
} else if( valuePtr->getDestination() != NULL ) {
@@ -157,7 +145,7 @@
return false;
}
if( this->getSubscribedDestination() != NULL ) {
- if( !this->getSubscribedDestination()->equals( valuePtr->getSubscribedDestination() ) ) {
+ if( !this->getSubscribedDestination()->equals( valuePtr->getSubscribedDestination().get() ) ) {
return false;
}
} else if( valuePtr->getSubscribedDestination() != NULL ) {
@@ -185,17 +173,17 @@
}
////////////////////////////////////////////////////////////////////////////////
-const ActiveMQDestination* SubscriptionInfo::getDestination() const {
+const decaf::lang::Pointer<ActiveMQDestination>& SubscriptionInfo::getDestination() const {
return destination;
}
////////////////////////////////////////////////////////////////////////////////
-ActiveMQDestination* SubscriptionInfo::getDestination() {
+decaf::lang::Pointer<ActiveMQDestination>& SubscriptionInfo::getDestination() {
return destination;
}
////////////////////////////////////////////////////////////////////////////////
-void SubscriptionInfo::setDestination( ActiveMQDestination* destination ) {
+void SubscriptionInfo::setDestination( const decaf::lang::Pointer<ActiveMQDestination>& destination ) {
this->destination = destination;
}
@@ -230,17 +218,17 @@
}
////////////////////////////////////////////////////////////////////////////////
-const ActiveMQDestination* SubscriptionInfo::getSubscribedDestination() const {
+const decaf::lang::Pointer<ActiveMQDestination>& SubscriptionInfo::getSubscribedDestination() const {
return subscribedDestination;
}
////////////////////////////////////////////////////////////////////////////////
-ActiveMQDestination* SubscriptionInfo::getSubscribedDestination() {
+decaf::lang::Pointer<ActiveMQDestination>& SubscriptionInfo::getSubscribedDestination() {
return subscribedDestination;
}
////////////////////////////////////////////////////////////////////////////////
-void SubscriptionInfo::setSubscribedDestination( ActiveMQDestination* subscribedDestination ) {
+void SubscriptionInfo::setSubscribedDestination( const decaf::lang::Pointer<ActiveMQDestination>& subscribedDestination ) {
this->subscribedDestination = subscribedDestination;
}
Modified: activemq/activemq-cpp/trunk/src/main/activemq/commands/SubscriptionInfo.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/commands/SubscriptionInfo.h?rev=741774&r1=741773&r2=741774&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/commands/SubscriptionInfo.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/commands/SubscriptionInfo.h Fri Feb 6 23:11:28 2009
@@ -25,6 +25,7 @@
#include <activemq/util/Config.h>
#include <activemq/commands/BaseDataStructure.h>
+#include <decaf/lang/Pointer.h>
#include <activemq/commands/ActiveMQDestination.h>
#include <activemq/commands/ActiveMQDestination.h>
#include <vector>
@@ -47,10 +48,10 @@
protected:
std::string clientId;
- ActiveMQDestination* destination;
+ decaf::lang::Pointer<ActiveMQDestination> destination;
std::string selector;
std::string subcriptionName;
- ActiveMQDestination* subscribedDestination;
+ decaf::lang::Pointer<ActiveMQDestination> subscribedDestination;
protected:
@@ -106,9 +107,9 @@
virtual std::string& getClientId();
virtual void setClientId( const std::string& clientId );
- virtual const ActiveMQDestination* getDestination() const;
- virtual ActiveMQDestination* getDestination();
- virtual void setDestination( ActiveMQDestination* destination );
+ virtual const decaf::lang::Pointer<ActiveMQDestination>& getDestination() const;
+ virtual decaf::lang::Pointer<ActiveMQDestination>& getDestination();
+ virtual void setDestination( const decaf::lang::Pointer<ActiveMQDestination>& destination );
virtual const std::string& getSelector() const;
virtual std::string& getSelector();
@@ -118,9 +119,9 @@
virtual std::string& getSubcriptionName();
virtual void setSubcriptionName( const std::string& subcriptionName );
- virtual const ActiveMQDestination* getSubscribedDestination() const;
- virtual ActiveMQDestination* getSubscribedDestination();
- virtual void setSubscribedDestination( ActiveMQDestination* subscribedDestination );
+ virtual const decaf::lang::Pointer<ActiveMQDestination>& getSubscribedDestination() const;
+ virtual decaf::lang::Pointer<ActiveMQDestination>& getSubscribedDestination();
+ virtual void setSubscribedDestination( const decaf::lang::Pointer<ActiveMQDestination>& subscribedDestination );
};
Modified: activemq/activemq-cpp/trunk/src/main/activemq/commands/TransactionId.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/commands/TransactionId.cpp?rev=741774&r1=741773&r2=741774&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/commands/TransactionId.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/commands/TransactionId.cpp Fri Feb 6 23:11:28 2009
@@ -53,12 +53,12 @@
////////////////////////////////////////////////////////////////////////////////
TransactionId* TransactionId::cloneDataStructure() const {
- TransactionId* transactionId = new TransactionId();
+ std::auto_ptr<TransactionId> transactionId( new TransactionId() );
// Copy the data from the base class or classes
transactionId->copyDataStructure( this );
- return transactionId;
+ return transactionId.release();
}
////////////////////////////////////////////////////////////////////////////////
Modified: activemq/activemq-cpp/trunk/src/main/activemq/commands/TransactionId.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/commands/TransactionId.h?rev=741774&r1=741773&r2=741774&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/commands/TransactionId.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/commands/TransactionId.h Fri Feb 6 23:11:28 2009
@@ -25,6 +25,7 @@
#include <activemq/util/Config.h>
#include <activemq/commands/BaseDataStructure.h>
+#include <decaf/lang/Pointer.h>
#include <decaf/lang/Comparable.h>
#include <vector>
#include <string>
@@ -50,6 +51,8 @@
const static unsigned char ID_TRANSACTIONID = 0;
+ typedef decaf::lang::PointerComparator<TransactionId> COMPARATOR;
+
public:
TransactionId();
Modified: activemq/activemq-cpp/trunk/src/main/activemq/commands/TransactionInfo.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/commands/TransactionInfo.cpp?rev=741774&r1=741773&r2=741774&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/commands/TransactionInfo.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/commands/TransactionInfo.cpp Fri Feb 6 23:11:28 2009
@@ -38,26 +38,22 @@
////////////////////////////////////////////////////////////////////////////////
TransactionInfo::TransactionInfo() {
- this->connectionId = NULL;
- this->transactionId = NULL;
this->type = 0;
}
////////////////////////////////////////////////////////////////////////////////
TransactionInfo::~TransactionInfo() {
- delete this->connectionId;
- delete this->transactionId;
}
////////////////////////////////////////////////////////////////////////////////
TransactionInfo* TransactionInfo::cloneDataStructure() const {
- TransactionInfo* transactionInfo = new TransactionInfo();
+ std::auto_ptr<TransactionInfo> transactionInfo( new TransactionInfo() );
// Copy the data from the base class or classes
transactionInfo->copyDataStructure( this );
- return transactionInfo;
+ return transactionInfo.release();
}
////////////////////////////////////////////////////////////////////////////////
@@ -78,16 +74,8 @@
__FILE__, __LINE__,
"TransactionInfo::copyDataStructure - src is NULL or invalid" );
}
- if( srcPtr->getConnectionId() != NULL ) {
- this->setConnectionId(
- dynamic_cast<ConnectionId*>(
- srcPtr->getConnectionId()->cloneDataStructure() ) );
- }
- if( srcPtr->getTransactionId() != NULL ) {
- this->setTransactionId(
- dynamic_cast<TransactionId*>(
- srcPtr->getTransactionId()->cloneDataStructure() ) );
- }
+ this->setConnectionId( srcPtr->getConnectionId() );
+ this->setTransactionId( srcPtr->getTransactionId() );
this->setType( srcPtr->getType() );
}
@@ -135,14 +123,14 @@
return false;
}
if( this->getConnectionId() != NULL ) {
- if( !this->getConnectionId()->equals( valuePtr->getConnectionId() ) ) {
+ if( !this->getConnectionId()->equals( valuePtr->getConnectionId().get() ) ) {
return false;
}
} else if( valuePtr->getConnectionId() != NULL ) {
return false;
}
if( this->getTransactionId() != NULL ) {
- if( !this->getTransactionId()->equals( valuePtr->getTransactionId() ) ) {
+ if( !this->getTransactionId()->equals( valuePtr->getTransactionId().get() ) ) {
return false;
}
} else if( valuePtr->getTransactionId() != NULL ) {
@@ -158,39 +146,39 @@
}
////////////////////////////////////////////////////////////////////////////////
-commands::Command* TransactionInfo::visit( activemq::state::CommandVisitor* visitor )
+decaf::lang::Pointer<commands::Command> TransactionInfo::visit( activemq::state::CommandVisitor* visitor )
throw( exceptions::ActiveMQException ) {
return visitor->processTransactionInfo( this );
}
////////////////////////////////////////////////////////////////////////////////
-const ConnectionId* TransactionInfo::getConnectionId() const {
+const decaf::lang::Pointer<ConnectionId>& TransactionInfo::getConnectionId() const {
return connectionId;
}
////////////////////////////////////////////////////////////////////////////////
-ConnectionId* TransactionInfo::getConnectionId() {
+decaf::lang::Pointer<ConnectionId>& TransactionInfo::getConnectionId() {
return connectionId;
}
////////////////////////////////////////////////////////////////////////////////
-void TransactionInfo::setConnectionId( ConnectionId* connectionId ) {
+void TransactionInfo::setConnectionId( const decaf::lang::Pointer<ConnectionId>& connectionId ) {
this->connectionId = connectionId;
}
////////////////////////////////////////////////////////////////////////////////
-const TransactionId* TransactionInfo::getTransactionId() const {
+const decaf::lang::Pointer<TransactionId>& TransactionInfo::getTransactionId() const {
return transactionId;
}
////////////////////////////////////////////////////////////////////////////////
-TransactionId* TransactionInfo::getTransactionId() {
+decaf::lang::Pointer<TransactionId>& TransactionInfo::getTransactionId() {
return transactionId;
}
////////////////////////////////////////////////////////////////////////////////
-void TransactionInfo::setTransactionId( TransactionId* transactionId ) {
+void TransactionInfo::setTransactionId( const decaf::lang::Pointer<TransactionId>& transactionId ) {
this->transactionId = transactionId;
}
Modified: activemq/activemq-cpp/trunk/src/main/activemq/commands/TransactionInfo.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/commands/TransactionInfo.h?rev=741774&r1=741773&r2=741774&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/commands/TransactionInfo.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/commands/TransactionInfo.h Fri Feb 6 23:11:28 2009
@@ -25,6 +25,7 @@
#include <activemq/util/Config.h>
#include <activemq/commands/BaseCommand.h>
+#include <decaf/lang/Pointer.h>
#include <activemq/commands/ConnectionId.h>
#include <activemq/commands/TransactionId.h>
#include <vector>
@@ -46,8 +47,8 @@
class AMQCPP_API TransactionInfo : public BaseCommand {
protected:
- ConnectionId* connectionId;
- TransactionId* transactionId;
+ decaf::lang::Pointer<ConnectionId> connectionId;
+ decaf::lang::Pointer<TransactionId> transactionId;
unsigned char type;
protected:
@@ -107,16 +108,16 @@
*
* @return a Response to the visitor being called or NULL if no response.
*/
- virtual commands::Command* visit( activemq::state::CommandVisitor* visitor )
+ virtual decaf::lang::Pointer<commands::Command> visit( activemq::state::CommandVisitor* visitor )
throw( exceptions::ActiveMQException );
- virtual const ConnectionId* getConnectionId() const;
- virtual ConnectionId* getConnectionId();
- virtual void setConnectionId( ConnectionId* connectionId );
-
- virtual const TransactionId* getTransactionId() const;
- virtual TransactionId* getTransactionId();
- virtual void setTransactionId( TransactionId* transactionId );
+ virtual const decaf::lang::Pointer<ConnectionId>& getConnectionId() const;
+ virtual decaf::lang::Pointer<ConnectionId>& getConnectionId();
+ virtual void setConnectionId( const decaf::lang::Pointer<ConnectionId>& connectionId );
+
+ virtual const decaf::lang::Pointer<TransactionId>& getTransactionId() const;
+ virtual decaf::lang::Pointer<TransactionId>& getTransactionId();
+ virtual void setTransactionId( const decaf::lang::Pointer<TransactionId>& transactionId );
virtual unsigned char getType() const;
virtual void setType( unsigned char type );
Modified: activemq/activemq-cpp/trunk/src/main/activemq/commands/WireFormatInfo.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/commands/WireFormatInfo.cpp?rev=741774&r1=741773&r2=741774&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/commands/WireFormatInfo.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/commands/WireFormatInfo.cpp Fri Feb 6 23:11:28 2009
@@ -163,7 +163,7 @@
}
////////////////////////////////////////////////////////////////////////////////
-commands::Command* WireFormatInfo::visit( activemq::state::CommandVisitor* visitor )
+decaf::lang::Pointer<commands::Command> WireFormatInfo::visit( activemq::state::CommandVisitor* visitor )
throw( exceptions::ActiveMQException ) {
return visitor->processWireFormat( this );
Modified: activemq/activemq-cpp/trunk/src/main/activemq/commands/WireFormatInfo.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/commands/WireFormatInfo.h?rev=741774&r1=741773&r2=741774&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/commands/WireFormatInfo.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/commands/WireFormatInfo.h Fri Feb 6 23:11:28 2009
@@ -29,6 +29,22 @@
namespace commands{
class AMQCPP_API WireFormatInfo : public BaseCommand {
+ private:
+
+ std::vector<unsigned char> magic;
+ std::vector<unsigned char> marshalledProperties;
+
+ /**
+ * WireFormatInfo Properties, unmarshaled from the marshaled
+ * properties on use.
+ */
+ util::PrimitiveMap properties;
+
+ /**
+ * OpenWire Protocol Version
+ */
+ int version;
+
public:
const static unsigned char ID_WIREFORMATINFO = 1;
@@ -41,13 +57,13 @@
/**
* Get the unique identifier that this object and its own
- * Marshaller share.
+ * Marshaler share.
* @returns new DataStructure type copy.
*/
virtual unsigned char getDataStructureType() const;
/**
- * Clone this obbject and return a new instance that the
+ * Clone this object and return a new instance that the
* caller now owns, this will be an exact copy of this one
* @returns new copy of this object.
*/
@@ -75,9 +91,9 @@
virtual bool equals( const DataStructure* value ) const;
/**
- * Indicates that this command is aware of Marshalling, and needs
- * to have its Marshalling methods invoked.
- * @returns boolean indicating desire to be in marshalling stages
+ * Indicates that this command is aware of Marshaling, and needs
+ * to have its Marshaling methods invoked.
+ * @returns boolean indicating desire to be in marshaling stages
*/
virtual bool isMarshalAware() const {
return true;
@@ -90,8 +106,8 @@
*
* @return a Response to the visitor being called or NULL if no response.
*/
- virtual commands::Command* visit( activemq::state::CommandVisitor* visitor )
- throw( exceptions::ActiveMQException );
+ virtual decaf::lang::Pointer<commands::Command> visit(
+ activemq::state::CommandVisitor* visitor ) throw( exceptions::ActiveMQException );
/**
* Get the current Wireformat Version
@@ -141,7 +157,7 @@
/**
* Sets if the cacheEnabled flag is on
- * @param cacheEnabled - ture to turn flag is on
+ * @param cacheEnabled - true to turn the flag on
*/
void setCacheEnabled( bool cacheEnabled );
@@ -153,7 +169,7 @@
/**
* Sets if the tightEncodingEnabled flag is on
- * @param tightEncodingEnabled - ture to turn flag is on
+ * @param tightEncodingEnabled - true to turn the flag on
*/
void setTightEncodingEnabled( bool tightEncodingEnabled );
@@ -165,7 +181,7 @@
/**
* Sets if the sizePrefixDisabled flag is on
- * @param sizePrefixDisabled - ture to turn flag is on
+ * @param sizePrefixDisabled - true to turn the flag on
*/
void setSizePrefixDisabled( bool sizePrefixDisabled );
@@ -230,10 +246,10 @@
public:
/**
- * Handles the marshalling of the objects properties into the
+ * Handles the marshaling of the objects properties into the
* internal byte array before the object is marshalled to the
* wire
- * @param wireFormat - the wireformatting controller
+ * @param wireFormat - the wire formatting controller
*/
virtual void beforeMarshal( wireformat::WireFormat* wireFormat AMQCPP_UNUSED )
throw ( decaf::io::IOException );
@@ -246,22 +262,6 @@
virtual void afterUnmarshal( wireformat::WireFormat* wireFormat AMQCPP_UNUSED )
throw ( decaf::io::IOException );
- private:
-
- std::vector<unsigned char> magic;
- std::vector<unsigned char> marshalledProperties;
-
- /**
- * WireFormatInfo Properties, unmarshalled from the marshalled
- * properties on use.
- */
- util::PrimitiveMap properties;
-
- /**
- * OpenWire Protocal Version
- */
- int version;
-
};
}}
Modified: activemq/activemq-cpp/trunk/src/main/activemq/commands/XATransactionId.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/commands/XATransactionId.cpp?rev=741774&r1=741773&r2=741774&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/commands/XATransactionId.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/commands/XATransactionId.cpp Fri Feb 6 23:11:28 2009
@@ -54,12 +54,12 @@
////////////////////////////////////////////////////////////////////////////////
XATransactionId* XATransactionId::cloneDataStructure() const {
- XATransactionId* xATransactionId = new XATransactionId();
+ std::auto_ptr<XATransactionId> xATransactionId( new XATransactionId() );
// Copy the data from the base class or classes
xATransactionId->copyDataStructure( this );
- return xATransactionId;
+ return xATransactionId.release();
}
////////////////////////////////////////////////////////////////////////////////
Modified: activemq/activemq-cpp/trunk/src/main/activemq/commands/XATransactionId.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/commands/XATransactionId.h?rev=741774&r1=741773&r2=741774&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/commands/XATransactionId.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/commands/XATransactionId.h Fri Feb 6 23:11:28 2009
@@ -25,6 +25,7 @@
#include <activemq/util/Config.h>
#include <activemq/commands/TransactionId.h>
+#include <decaf/lang/Pointer.h>
#include <decaf/lang/Comparable.h>
#include <vector>
#include <string>
@@ -53,6 +54,8 @@
const static unsigned char ID_XATRANSACTIONID = 112;
+ typedef decaf::lang::PointerComparator<XATransactionId> COMPARATOR;
+
public:
XATransactionId();
Modified: activemq/activemq-cpp/trunk/src/main/activemq/commands/srcmakefile.mk
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/commands/srcmakefile.mk?rev=741774&r1=741773&r2=741774&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/commands/srcmakefile.mk (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/commands/srcmakefile.mk Fri Feb 6 23:11:28 2009
@@ -85,6 +85,7 @@
activemq/commands/ActiveMQMapMessage.h \
activemq/commands/ActiveMQMessage.h \
activemq/commands/ActiveMQMessageBase.h \
+ activemq/commands/ActiveMQMessageTemplate.h \
activemq/commands/ActiveMQObjectMessage.h \
activemq/commands/ActiveMQQueue.h \
activemq/commands/ActiveMQStreamMessage.h \
Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQAckHandler.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQAckHandler.h?rev=741774&r1=741773&r2=741774&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQAckHandler.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQAckHandler.h Fri Feb 6 23:11:28 2009
@@ -19,12 +19,11 @@
#include <cms/CMSException.h>
#include <activemq/util/Config.h>
+#include <activemq/commands/Message.h>
namespace activemq{
namespace core{
- class ActiveMQMessage;
-
/**
* Interface class that is used to give CMS Messages an interface to
* Ack themselves with.
@@ -39,8 +38,8 @@
* @param message Message to Acknowledge
* @throw CMSException
*/
- virtual void acknowledgeMessage( const ActiveMQMessage* message )
- throw ( cms::CMSException ) = 0;
+ virtual void acknowledgeMessage( const commands::Message* message )
+ throw ( cms::CMSException ) = 0;
};
Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp?rev=741774&r1=741773&r2=741774&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp Fri Feb 6 23:11:28 2009
@@ -90,7 +90,8 @@
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::addDispatcher(
- const commands::ConsumerId& consumer, Dispatcher* dispatcher ) {
+ const decaf::lang::Pointer<commands::ConsumerId>& consumer, Dispatcher* dispatcher )
+ throw ( cms::CMSException ) {
// Add the consumer to the map.
synchronized( &dispatchers ) {
@@ -99,7 +100,9 @@
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::removeDispatcher( const commands::ConsumerId& consumer ) {
+void ActiveMQConnection::removeDispatcher(
+ const decaf::lang::Pointer<commands::ConsumerId>& consumer )
+ throw ( cms::CMSException ) {
// Remove the consumer from the map.
synchronized( &dispatchers ) {
@@ -127,10 +130,10 @@
// Create and initialize a new SessionInfo object
std::auto_ptr<commands::SessionInfo> sessionInfo( new commands::SessionInfo() );
- std::auto_ptr<commands::SessionId> sessionId( new commands::SessionId() );
+ decaf::lang::Pointer<commands::SessionId> sessionId( new commands::SessionId() );
sessionId->setConnectionId( connectionInfo.getConnectionId()->getValue() );
sessionId->setValue( this->getNextSessionId() );
- sessionInfo->setSessionId( sessionId.release() );
+ sessionInfo->setSessionId( sessionId );
// Send the subscription message to the broker.
syncRequest( sessionInfo.get() );
@@ -180,7 +183,7 @@
// Add this producer from the set of active consumer.
synchronized( &activeProducers ) {
- activeProducers.setValue( producer->getProducerId(), producer );
+ activeProducers.setValue( producer->getProducerInfo().getProducerId(), producer );
}
}
AMQ_CATCH_RETHROW( ActiveMQException )
@@ -189,14 +192,15 @@
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::removeProducer( ActiveMQProducer* producer )
- throw ( cms::CMSException ) {
+void ActiveMQConnection::removeProducer(
+ const decaf::lang::Pointer<commands::ProducerId>& producerId )
+ throw ( cms::CMSException ) {
try {
// Remove this producer from the set of active consumer.
synchronized( &activeProducers ) {
- activeProducers.remove( producer->getProducerId() );
+ activeProducers.remove( producerId );
}
}
AMQ_CATCH_RETHROW( ActiveMQException )
@@ -317,7 +321,7 @@
}
// Generate a connectionId
- commands::ConnectionId* connectionId = new commands::ConnectionId();
+ decaf::lang::Pointer<commands::ConnectionId> connectionId( new commands::ConnectionId() );
connectionId->setValue( UUID::randomUUID().toString() );
connectionInfo.setConnectionId( connectionId );
@@ -358,8 +362,8 @@
if( consumer->getPrefetchSize() == 0 ) {
commands::MessagePull messagePull;
- messagePull.setConsumerId( consumer->getConsumerId()->cloneDataStructure() );
- messagePull.setDestination( consumer->getDestination()->cloneDataStructure() );
+ messagePull.setConsumerId( consumer->getConsumerId() );
+ messagePull.setDestination( consumer->getDestination() );
messagePull.setTimeout( timeout );
this->oneway( &messagePull );
@@ -388,9 +392,11 @@
commands::DestinationInfo command;
- command.setConnectionId( connectionInfo.getConnectionId()->cloneDataStructure() );
+ command.setConnectionId( connectionInfo.getConnectionId() );
command.setOperationType( ActiveMQConstants::DESTINATION_REMOVE_OPERATION );
- command.setDestination( destination->cloneDataStructure() );
+ command.setDestination(
+ decaf::lang::Pointer<commands::ActiveMQDestination>(
+ destination->cloneDataStructure() ) );
// Send the message to the broker.
syncRequest( &command );
@@ -435,19 +441,15 @@
try{
+ std::auto_ptr<commands::Command> commandPtr( command );
+
if( typeid( *command ) == typeid( commands::MessageDispatch ) ) {
commands::MessageDispatch* dispatch =
dynamic_cast<commands::MessageDispatch*>( command );
- // Due to the severe suckiness of C++, in order to cast to
- // a type that is in a different branch of the inheritance hierarchy
- // we have to cast to the type at the "crotch" of the branch and then
- // we can implicitly cast up the other branch.
- core::ActiveMQMessage* message =
- dynamic_cast<core::ActiveMQMessage*>( dispatch->getMessage() );
- if( message == NULL ) {
- delete command;
+ // Check for an empty Message, shouldn't ever happen but who knows.
+ if( dispatch->getMessage() == NULL ) {
throw ActiveMQException(
__FILE__, __LINE__,
"ActiveMQConnection::onCommand - "
@@ -458,28 +460,18 @@
Dispatcher* dispatcher = NULL;
synchronized( &dispatchers ) {
- dispatcher = dispatchers.getValue( *( dispatch->getConsumerId() ) );
+ dispatcher = dispatchers.getValue( dispatch->getConsumerId() );
// If we have no registered dispatcher, the consumer was probably
- // just closed. Just delete the message.
- if( dispatcher == NULL ) {
- delete message;
- } else {
+ // just closed.
+ if( dispatcher != NULL ) {
// Dispatch the message.
- DispatchData data( *( dispatch->getConsumerId() ), message );
+ DispatchData data( dispatch->getConsumerId(), dispatch->getMessage() );
dispatcher->dispatch( data );
}
}
- // Clear the Message as we've passed it onto the
- // listener, who is responsible for deleting it at
- // the appropriate time, which depends on things like
- // the session being transacted etc.
- dispatch->setMessage( NULL );
-
- delete command;
-
} else if( typeid( *command ) == typeid( commands::ProducerAck ) ) {
commands::ProducerAck* producerAck =
@@ -488,20 +480,18 @@
// Get the consumer info object for this consumer.
ActiveMQProducer* producer = NULL;
synchronized( &this->activeProducers ) {
- producer = this->activeProducers.getValue( *( producerAck->getProducerId() ) );
+ producer = this->activeProducers.getValue( producerAck->getProducerId() );
if( producer != NULL ){
producer->onProducerAck( *producerAck );
}
}
- delete command;
-
} else if( typeid( *command ) == typeid( commands::WireFormatInfo ) ) {
this->brokerWireFormatInfo.reset(
- dynamic_cast<commands::WireFormatInfo*>( command ) );
+ dynamic_cast<commands::WireFormatInfo*>( commandPtr.release() ) );
} else if( typeid( *command ) == typeid( commands::BrokerInfo ) ) {
this->brokerInfo.reset(
- dynamic_cast<commands::BrokerInfo*>( command ) );
+ dynamic_cast<commands::BrokerInfo*>( commandPtr.release() ) );
} else if( typeid( *command ) == typeid( commands::KeepAliveInfo ) ) {
if( command->isResponseRequired() ) {
@@ -510,8 +500,6 @@
oneway( command );
}
- delete command;
-
} else if( typeid( *command ) == typeid( commands::ShutdownInfo ) ) {
try {
@@ -523,11 +511,8 @@
}
} catch( ... ) { /* do nothing */ }
- delete command;
-
} else {
//LOGDECAF_WARN( logger, "Received an unknown command" );
- delete command;
}
}
AMQ_CATCH_RETHROW( ActiveMQException )
@@ -590,8 +575,7 @@
// Create an exception to hold the error information.
commands::BrokerError* brokerError =
- dynamic_cast<commands::BrokerError*>(
- exceptionResponse->getException() );
+ exceptionResponse->getException()->cloneDataStructure();
BrokerException exception( __FILE__, __LINE__, brokerError );
// Throw the exception.
@@ -606,12 +590,12 @@
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::disposeOf( commands::DataStructure* objectId )
+void ActiveMQConnection::disposeOf( const decaf::lang::Pointer<commands::DataStructure>& objectId )
throw ( ActiveMQException ) {
try{
commands::RemoveInfo command;
- command.setObjectId( objectId->cloneDataStructure() );
+ command.setObjectId( objectId );
oneway( &command );
}
AMQ_CATCH_RETHROW( ActiveMQException )
@@ -620,13 +604,13 @@
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::disposeOf( commands::DataStructure* objectId,
+void ActiveMQConnection::disposeOf( const decaf::lang::Pointer<commands::DataStructure>& objectId,
unsigned int timeout )
throw ( ActiveMQException ) {
try{
commands::RemoveInfo command;
- command.setObjectId( objectId->cloneDataStructure() );
+ command.setObjectId( objectId );
this->syncRequest( &command, timeout );
}
AMQ_CATCH_RETHROW( ActiveMQException )
Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h?rev=741774&r1=741773&r2=741774&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h Fri Feb 6 23:11:28 2009
@@ -22,7 +22,6 @@
#include <activemq/util/Config.h>
#include <activemq/core/ActiveMQConnectionSupport.h>
#include <activemq/core/ActiveMQConnectionMetaData.h>
-#include <activemq/core/ActiveMQMessage.h>
#include <activemq/core/Dispatcher.h>
#include <activemq/commands/ActiveMQTempDestination.h>
#include <activemq/commands/BrokerInfo.h>
@@ -57,6 +56,16 @@
{
private:
+ typedef decaf::util::Map< decaf::lang::Pointer<commands::ConsumerId>,
+ Dispatcher*,
+ commands::ConsumerId::COMPARATOR > DispatcherMap;
+
+ typedef decaf::util::Map< decaf::lang::Pointer<commands::ProducerId>,
+ ActiveMQProducer*,
+ commands::ProducerId::COMPARATOR > ProducerMap;
+
+ private:
+
/**
* Sync object.
*/
@@ -81,12 +90,12 @@
/**
* Map of message dispatchers indexed by consumer id.
*/
- decaf::util::Map< commands::ConsumerId, Dispatcher* > dispatchers;
+ DispatcherMap dispatchers;
/**
- * Map of message dispatchers indexed by consumer id.
+ * Map of message producers indexed by producer id.
*/
- decaf::util::Map< commands::ProducerId, ActiveMQProducer* > activeProducers;
+ ProducerMap activeProducers;
/**
* Maintain the set of all active sessions.
@@ -136,34 +145,38 @@
virtual void removeSession( ActiveMQSession* session ) throw ( cms::CMSException );
/**
- * Adds an active consumer to the Set of known consumers
- * @param consumer - The consumer to add to the the known set.
+ * Adds an active Producer to the Set of known producers.
+ * @param producer - The Producer to add to the known set.
*/
virtual void addProducer( ActiveMQProducer* producer ) throw ( cms::CMSException );
/**
- * Removes an active consumer to the Set of known consumers
- * @param consumer - The consumer to remove from the the known set.
+ * Removes an active Producer from the Set of known producers.
+ * @param producerId - The ProducerId to remove from the known set.
*/
- virtual void removeProducer( ActiveMQProducer* producer ) throw ( cms::CMSException );
+ virtual void removeProducer( const decaf::lang::Pointer<commands::ProducerId>& producerId )
+ throw ( cms::CMSException );
/**
* Adds a dispatcher for a consumer.
* @param consumer - The consumer for which to register a dispatcher.
* @param dispatcher - The dispatcher to handle incoming messages for the consumer.
*/
- virtual void addDispatcher( const commands::ConsumerId& consumer, Dispatcher* dispatcher );
+ virtual void addDispatcher(
+ const decaf::lang::Pointer<commands::ConsumerId>& consumer, Dispatcher* dispatcher )
+ throw ( cms::CMSException );
/**
* Removes the dispatcher for a consumer.
* @param consumer - The consumer for which to remove the dispatcher.
*/
- virtual void removeDispatcher( const commands::ConsumerId& consumer );
+ virtual void removeDispatcher( const decaf::lang::Pointer<commands::ConsumerId>& consumer )
+ throw ( cms::CMSException );
/**
* If supported sends a message pull request to the service provider asking
* for the delivery of a new message. This is used in the case where the
- * service provider has been configured with a zero prefectch or is only
+ * service provider has been configured with a zero prefetch or is only
* capable of delivering messages on a pull basis.
* @param consumer - the ConsumerInfo for the requesting Consumer.
* @param timeout - the time that the client is willing to wait.
@@ -362,7 +375,7 @@
* @throw ConnectorException if any problems occur from sending
* the message.
*/
- void disposeOf( commands::DataStructure* objectId )
+ void disposeOf( const decaf::lang::Pointer<commands::DataStructure>& objectId )
throw ( activemq::exceptions::ActiveMQException );
/**
@@ -373,7 +386,7 @@
* @throw ConnectorException if any problems occur from sending
* the message.
*/
- void disposeOf( commands::DataStructure* objectId, unsigned int timeout )
+ void disposeOf( const decaf::lang::Pointer<commands::DataStructure>& objectId, unsigned int timeout )
throw ( activemq::exceptions::ActiveMQException );
/**
Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp?rev=741774&r1=741773&r2=741774&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp Fri Feb 6 23:11:28 2009
@@ -31,7 +31,6 @@
#include <activemq/core/ActiveMQConstants.h>
#include <activemq/core/ActiveMQSession.h>
#include <activemq/core/ActiveMQTransactionContext.h>
-#include <activemq/core/ActiveMQMessage.h>
#include <cms/ExceptionListener.h>
using namespace std;
@@ -136,7 +135,7 @@
}
////////////////////////////////////////////////////////////////////////////////
-ActiveMQMessage* ActiveMQConsumer::dequeue( int timeout )
+decaf::lang::Pointer<commands::Message> ActiveMQConsumer::dequeue( int timeout )
throw ( cms::CMSException ) {
try {
@@ -167,23 +166,23 @@
}
if( unconsumedMessages.empty() ) {
- return NULL;
+ break;
}
// Fetch the Message then copy it so it can be handed off
// to the user.
DispatchData data = unconsumedMessages.pop();
- // Get the message.
- ActiveMQMessage* message = data.getMessage();
+ decaf::lang::Pointer<commands::Message> message = data.getMessage();
// If it's expired, process the message and then go back to waiting.
- if( message->isExpired() ) {
+ if( dynamic_cast<commands::ActiveMQMessageBase*>( message.get() )->isExpired() ) {
- beforeMessageIsConsumed(message);
- afterMessageIsConsumed(message, true);
- if (timeout > 0) {
- timeout = std::max((int)(deadline - Date::getCurrentTimeMilliseconds()), 0);
+ beforeMessageIsConsumed( message );
+ afterMessageIsConsumed( message, true );
+ if( timeout > 0 ) {
+ timeout = std::max(
+ (int)( deadline - Date::getCurrentTimeMilliseconds() ), 0 );
}
// Go back to waiting for a non-expired message.
@@ -195,7 +194,7 @@
}
}
- return NULL;
+ return decaf::lang::Pointer<commands::Message>();
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -213,7 +212,7 @@
this->sendPullRequest( 0 );
// Wait for the next message.
- ActiveMQMessage* msg = dequeue( -1 );
+ decaf::lang::Pointer<commands::Message> msg = dequeue( -1 );
if( msg == NULL ) {
return NULL;
}
@@ -223,7 +222,8 @@
// Need to clone the message because the user is responsible for freeing
// its copy of the message.
- cms::Message* clonedMsg = dynamic_cast<cms::Message*>(msg)->clone();
+ cms::Message* clonedMsg =
+ dynamic_cast<cms::Message*>( msg->cloneDataStructure() );
// Post processing (may result in the message being deleted)
afterMessageIsConsumed( msg, false );
@@ -248,7 +248,7 @@
this->sendPullRequest( millisecs );
// Wait for the next message.
- ActiveMQMessage* msg = dequeue( millisecs );
+ decaf::lang::Pointer<commands::Message> msg = dequeue( millisecs );
if( msg == NULL ) {
return NULL;
}
@@ -258,7 +258,8 @@
// Need to clone the message because the user is responsible for freeing
// its copy of the message.
- cms::Message* clonedMsg = dynamic_cast<cms::Message*>(msg)->clone();
+ cms::Message* clonedMsg =
+ dynamic_cast<cms::Message*>( msg->cloneDataStructure() );
// Post processing (may result in the message being deleted)
afterMessageIsConsumed( msg, false );
@@ -283,7 +284,7 @@
this->sendPullRequest( -1 );
// Get the next available message, if there is one.
- ActiveMQMessage* msg = dequeue( 0 );
+ decaf::lang::Pointer<commands::Message> msg = dequeue( 0 );
if( msg == NULL ) {
return NULL;
}
@@ -293,7 +294,8 @@
// Need to clone the message because the user is responsible for freeing
// its copy of the message.
- cms::Message* clonedMsg = dynamic_cast<cms::Message*>(msg)->clone();
+ cms::Message* clonedMsg =
+ dynamic_cast<cms::Message*>( msg->cloneDataStructure() );
// Post processing (may result in the message being deleted)
afterMessageIsConsumed( msg, false );
@@ -338,7 +340,7 @@
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::beforeMessageIsConsumed( ActiveMQMessage* message ) {
+void ActiveMQConsumer::beforeMessageIsConsumed( decaf::lang::Pointer<commands::Message>& message ) {
// If the Session is in ClientAcknowledge mode, then we set the
// handler in the message to this object and send it out. Otherwise
@@ -347,7 +349,7 @@
// Register ourself so that we can handle the Message's
// acknowledge method.
- message->setAckHandler( this );
+ dynamic_cast<commands::ActiveMQMessageBase*>( message.get() )->setAckHandler( this );
}
// If the session is transacted then we hand off the message to it to
@@ -373,17 +375,14 @@
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::afterMessageIsConsumed( ActiveMQMessage* message,
+void ActiveMQConsumer::afterMessageIsConsumed( decaf::lang::Pointer<commands::Message>& message,
bool messageExpired AMQCPP_UNUSED ) {
try{
if( session->isAutoAcknowledge() || messageExpired ) {
- this->acknowledge( message, ActiveMQConstants::ACK_TYPE_CONSUMED );
+ this->acknowledge( message.get(), ActiveMQConstants::ACK_TYPE_CONSUMED );
}
-
- // The Message is cleaned up here.
- delete message;
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -391,7 +390,7 @@
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::acknowledgeMessage( const ActiveMQMessage* message )
+void ActiveMQConsumer::acknowledgeMessage( const commands::Message* message )
throw ( cms::CMSException ) {
try{
@@ -407,36 +406,30 @@
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::acknowledge( const ActiveMQMessage* message, int ackType )
+void ActiveMQConsumer::acknowledge( const commands::Message* message, int ackType )
throw ( cms::CMSException ) {
try{
this->checkClosed();
- const commands::Message* amqMessage =
- dynamic_cast<const commands::Message*>( message );
-
- if( amqMessage == NULL ) {
+ if( message == NULL ) {
throw ActiveMQException(
__FILE__, __LINE__,
- "ActiveMQConsumer::acknowledge - "
- "Message was not a commands::Message derivation.");
+ "ActiveMQConsumer::acknowledge - Message passed to Ack was NULL.");
}
commands::MessageAck ack;
ack.setAckType( (int)ackType );
- ack.setConsumerId( consumerInfo->getConsumerId()->cloneDataStructure() );
- ack.setDestination( amqMessage->getDestination()->cloneDataStructure() );
- ack.setFirstMessageId( amqMessage->getMessageId()->cloneDataStructure() );
- ack.setLastMessageId( amqMessage->getMessageId()->cloneDataStructure() );
+ ack.setConsumerId( consumerInfo->getConsumerId() );
+ ack.setDestination( message->getDestination() );
+ ack.setFirstMessageId( message->getMessageId() );
+ ack.setLastMessageId( message->getMessageId() );
ack.setMessageCount( 1 );
if( this->session->getAcknowledgeMode() == cms::Session::SESSION_TRANSACTED ) {
- if( this->transaction == NULL ||
- this->transaction->getTransactionInfo() == NULL ||
- this->transaction->getTransactionInfo()->getTransactionId() == NULL ) {
+ if( this->transaction == NULL ) {
throw ActiveMQException(
__FILE__, __LINE__,
@@ -444,8 +437,7 @@
"Transacted Session, has no Transaction Info.");
}
- ack.setTransactionId(
- this->transaction->getTransactionInfo()->getTransactionId()->cloneDataStructure() );
+ ack.setTransactionId( this->transaction->getTransactionId() );
}
this->session->oneway( &ack );
@@ -460,12 +452,11 @@
try {
- ActiveMQMessage* message = data.getMessage();
+ decaf::lang::Pointer<commands::Message> message = data.getMessage();
// Don't dispatch expired messages, ack it and then destroy it
- if( message->isExpired() ) {
- this->acknowledge( message, ActiveMQConstants::ACK_TYPE_CONSUMED );
- delete message;
+ if( dynamic_cast<commands::ActiveMQMessageBase*>( message.get() )->isExpired() ) {
+ this->acknowledge( message.get(), ActiveMQConstants::ACK_TYPE_CONSUMED );
// stop now, don't queue
return;
@@ -473,13 +464,12 @@
// If we have a listener, send the message.
if( listener != NULL ) {
- ActiveMQMessage* message = data.getMessage();
// Preprocessing.
beforeMessageIsConsumed( message );
// Notify the listener
- listener->onMessage( dynamic_cast<cms::Message*>(message) );
+ listener->onMessage( dynamic_cast<cms::Message*>( message.get() ) );
// Postprocessing
afterMessageIsConsumed( message, false );
@@ -503,11 +493,8 @@
try {
- synchronized( &unconsumedMessages ) {
-
- while( !unconsumedMessages.empty() ) {
- delete unconsumedMessages.pop().getMessage();
- }
+ synchronized( &this->unconsumedMessages ) {
+ this->unconsumedMessages.clear();
}
}
AMQ_CATCH_RETHROW( ActiveMQException )
@@ -531,10 +518,8 @@
if( this->consumerInfo->getPrefetchSize() == 0 ) {
commands::MessagePull messagePull;
- messagePull.setConsumerId(
- this->consumerInfo->getConsumerId()->cloneDataStructure() );
- messagePull.setDestination(
- this->consumerInfo->getDestination()->cloneDataStructure() );
+ messagePull.setConsumerId( this->consumerInfo->getConsumerId() );
+ messagePull.setDestination( this->consumerInfo->getDestination() );
messagePull.setTimeout( timeout );
this->session->oneway( &messagePull );
Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h?rev=741774&r1=741773&r2=741774&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h Fri Feb 6 23:11:28 2009
@@ -25,6 +25,7 @@
#include <activemq/util/Config.h>
#include <activemq/exceptions/ActiveMQException.h>
#include <activemq/commands/ConsumerInfo.h>
+#include <activemq/commands/ActiveMQMessageBase.h>
#include <activemq/core/ActiveMQAckHandler.h>
#include <activemq/core/Dispatcher.h>
@@ -73,7 +74,7 @@
/**
* Queue of consumed messages.
*/
- decaf::util::Queue<cms::Message*> dispatchedMessages;
+ decaf::util::Queue< decaf::lang::Pointer<commands::Message> > dispatchedMessages;
/**
* Boolean that indicates if the consumer has been closed
@@ -154,7 +155,7 @@
* @param message the Message to Acknowledge
* @throw CMSException
*/
- virtual void acknowledgeMessage( const ActiveMQMessage* message )
+ virtual void acknowledgeMessage( const commands::Message* message )
throw ( cms::CMSException );
public: // Dispatcher Methods
@@ -175,7 +176,7 @@
* @param ackType the Type of ack to send, (connector enum)
* @throw CMSException
*/
- virtual void acknowledge( const ActiveMQMessage* message, int ackType )
+ virtual void acknowledge( const commands::Message* message, int ackType )
throw ( cms::CMSException );
/**
@@ -224,20 +225,23 @@
* @throws InvalidStateException if this consumer is closed upon
* entering this method.
*/
- ActiveMQMessage* dequeue( int timeout ) throw ( cms::CMSException );
+ decaf::lang::Pointer<commands::Message> dequeue( int timeout )
+ throw ( cms::CMSException );
/**
* Pre-consume processing
* @param message - the message being consumed.
*/
- void beforeMessageIsConsumed( ActiveMQMessage* message );
+ void beforeMessageIsConsumed(
+ decaf::lang::Pointer<commands::Message>& message );
/**
* Post-consume processing
* @param message - the consumed message
* @param messageExpired - flag indicating if the message has expired.
*/
- void afterMessageIsConsumed( ActiveMQMessage* message, bool messageExpired );
+ void afterMessageIsConsumed(
+ decaf::lang::Pointer<commands::Message>& message, bool messageExpired );
private:
@@ -246,7 +250,7 @@
* for the delivery of a new message. This is used in the case where the
* service provider has been configured with a zero prefectch or is only
* capable of delivering messages on a pull basis. No request is made if
- * there are already messages in the uncomsumed queue since there's no need
+ * there are already messages in the unconsumed queue since there's no need
* for a server round-trip in that instance.
* @param timeout - the time that the client is willing to wait.
*/
Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp?rev=741774&r1=741773&r2=741774&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp Fri Feb 6 23:11:28 2009
@@ -21,7 +21,6 @@
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/core/ActiveMQTransactionContext.h>
#include <activemq/core/ActiveMQConsumer.h>
-#include <activemq/core/ActiveMQMessage.h>
#include <activemq/core/ActiveMQProducer.h>
#include <activemq/core/ActiveMQSessionExecutor.h>
#include <activemq/util/ActiveMQProperties.h>
@@ -263,7 +262,7 @@
// Register this as a message dispatcher for the consumer since we
// could start receiving messages from the broker right away once we
// send the ConsumerInfo command.
- this->connection->addDispatcher( *( consumerInfo->getConsumerId() ), this );
+ this->connection->addDispatcher( consumerInfo->getConsumerId(), this );
// Create the consumer instance.
std::auto_ptr<ActiveMQConsumer> consumer(
@@ -271,7 +270,8 @@
// Add the consumer to the map.
synchronized( &this->consumers ) {
- this->consumers.setValue( consumer->getConsumerId(), consumer.get() );
+ this->consumers.setValue(
+ consumer->getConsumerInfo().getConsumerId(), consumer.get() );
}
return consumer.release();
@@ -305,7 +305,7 @@
// Register this as a message dispatcher for the consumer since we
// could start receiving messages from the broker right away once we
// send the ConsumerInfo command.
- this->connection->addDispatcher( *( consumerInfo->getConsumerId() ), this );
+ this->connection->addDispatcher( consumerInfo->getConsumerId(), this );
// Create the consumer instance.
std::auto_ptr<ActiveMQConsumer> consumer(
@@ -313,7 +313,8 @@
// Add the consumer to the map.
synchronized( &this->consumers ) {
- this->consumers.setValue( consumer->getConsumerId(), consumer.get() );
+ this->consumers.setValue(
+ consumer->getConsumerInfo().getConsumerId(), consumer.get() );
}
return consumer.release();
@@ -332,13 +333,13 @@
this->checkClosed();
- std::auto_ptr<commands::ProducerId> producerId( new commands::ProducerId() );
+ decaf::lang::Pointer<commands::ProducerId> producerId( new commands::ProducerId() );
producerId->setConnectionId( this->sessionInfo->getSessionId()->getConnectionId() );
producerId->setSessionId( this->sessionInfo->getSessionId()->getValue() );
producerId->setValue( this->connection->getNextProducerId() );
std::auto_ptr<commands::ProducerInfo> producerInfo( new commands::ProducerInfo() );
- producerInfo->setProducerId( producerId.release() );
+ producerInfo->setProducerId( producerId );
producerInfo->setWindowSize( this->connection->getProducerWindowSize() );
// Producers are allowed to have NULL destinations. In this case, the
@@ -356,7 +357,9 @@
// Get any options specified in the destination and apply them to the
// ProducerInfo object.
- producerInfo->setDestination( amqDestination->cloneDataStructure() );
+ producerInfo->setDestination(
+ decaf::lang::Pointer<commands::ActiveMQDestination>(
+ amqDestination->cloneDataStructure() ) );
const ActiveMQProperties& options = amqDestination->getOptions();
producerInfo->setDispatchAsync( Boolean::parseBoolean(
options.getProperty( "producer.dispatchAsync", "false" )) );
@@ -370,7 +373,7 @@
synchronized( &this->producers ) {
// Place the Producer into the Map.
- this->producers.setValue( producer->getProducerId(), producer.get() );
+ this->producers.setValue( producerId, producer.get() );
}
// Add to the Connections list
@@ -573,20 +576,18 @@
}
// Clear any old data that might be in the message object
- delete amqMessage->getMessageId();
- delete amqMessage->getProducerId();
- delete amqMessage->getTransactionId();
+ amqMessage->getMessageId().reset( NULL );
+ amqMessage->getProducerId().reset( NULL );
+ amqMessage->getTransactionId().reset( NULL );
// Always assign the message ID, regardless of the disable
// flag. Not adding a message ID will cause an NPE at the broker.
- commands::MessageId* id = new commands::MessageId();
- id->setProducerId(
- producer->getProducerId().cloneDataStructure() );
+ decaf::lang::Pointer<commands::MessageId> id( new commands::MessageId() );
+ id->setProducerId( producer->getProducerInfo().getProducerId() );
id->setProducerSequenceId( this->connection->getNextProducerSequenceId() );
amqMessage->setMessageId( id );
- amqMessage->setProducerId(
- producer->getProducerId().cloneDataStructure() );
+ amqMessage->setProducerId( producer->getProducerInfo().getProducerId() );
if( this->getAcknowledgeMode() == cms::Session::SESSION_TRANSACTED ) {
@@ -597,8 +598,7 @@
"Transacted Session, has no Transaction Info.");
}
- amqMessage->setTransactionId(
- this->transaction->getTransactionId()->cloneDataStructure() );
+ amqMessage->setTransactionId( this->transaction->getTransactionId() );
}
if( this->connection->getSendTimeout() <= 0 &&
@@ -646,8 +646,7 @@
std::auto_ptr<commands::RemoveSubscriptionInfo> rsi(
new commands::RemoveSubscriptionInfo() );
- rsi->setConnectionId(
- this->connection->getConnectionId().cloneDataStructure() );
+ rsi->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
rsi->setSubcriptionName( name );
rsi->setClientId( this->connection->getConnectionInfo().getClientId() );
@@ -720,14 +719,14 @@
this->checkClosed();
std::auto_ptr<commands::ConsumerInfo> consumerInfo( new commands::ConsumerInfo() );
- std::auto_ptr<commands::ConsumerId> consumerId( new commands::ConsumerId() );
+ decaf::lang::Pointer<commands::ConsumerId> consumerId( new commands::ConsumerId() );
consumerId->setConnectionId(
this->connection->getConnectionId().getValue() );
consumerId->setSessionId( this->sessionInfo->getSessionId()->getValue() );
consumerId->setValue( this->connection->getNextSessionId() );
- consumerInfo->setConsumerId( consumerId.release() );
+ consumerInfo->setConsumerId( consumerId );
// Cast the destination to an OpenWire destination, so we can
// get all the goodies.
@@ -739,7 +738,9 @@
"Destination was either NULL or not created by this OpenWireConnector" );
}
- consumerInfo->setDestination( amqDestination->cloneDataStructure() );
+ consumerInfo->setDestination(
+ decaf::lang::Pointer<commands::ActiveMQDestination>(
+ amqDestination->cloneDataStructure() ) );
return consumerInfo.release();
}
@@ -751,7 +752,7 @@
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSession::applyDestinationOptions( commands::ConsumerInfo* info ) {
- const commands::ActiveMQDestination* amqDestination = info->getDestination();
+ decaf::lang::Pointer<commands::ActiveMQDestination> amqDestination = info->getDestination();
// Get any options specified in the destination and apply them to the
// ConsumerInfo object.
@@ -855,9 +856,11 @@
try {
commands::DestinationInfo command;
- command.setConnectionId( this->connection->getConnectionId().cloneDataStructure() );
+ command.setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
command.setOperationType( ActiveMQConstants::DESTINATION_ADD_OPERATION );
- command.setDestination( tempDestination->cloneDataStructure() );
+ command.setDestination(
+ decaf::lang::Pointer<commands::ActiveMQTempDestination>(
+ tempDestination->cloneDataStructure() ) );
// Send the message to the broker.
this->connection->syncRequest( &command );
@@ -879,9 +882,11 @@
commands::DestinationInfo command;
- command.setConnectionId( this->connection->getConnectionId().cloneDataStructure() );
+ command.setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
command.setOperationType( ActiveMQConstants::DESTINATION_REMOVE_OPERATION );
- command.setDestination( tempDestination->cloneDataStructure() );
+ command.setDestination(
+ decaf::lang::Pointer<commands::ActiveMQTempDestination>(
+ tempDestination->cloneDataStructure() ) );
// Send the message to the broker.
this->connection->syncRequest( &command );
@@ -911,7 +916,7 @@
try{
std::auto_ptr<commands::LocalTransactionId> id( new commands::LocalTransactionId() );
- id->setConnectionId( this->connection->getConnectionId().cloneDataStructure() );
+ id->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
id->setValue( this->connection->getNextTransactionId() );
return id.release();
@@ -957,7 +962,7 @@
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::disposeOf( commands::ConsumerId* id )
+void ActiveMQSession::disposeOf( decaf::lang::Pointer<commands::ConsumerId> id )
throw ( activemq::exceptions::ActiveMQException ) {
try{
@@ -966,7 +971,7 @@
synchronized( &this->consumers ) {
- if( this->consumers.containsKey( *id ) ) {
+ if( this->consumers.containsKey( id ) ) {
// If the executor thread is currently running, stop it.
bool wasStarted = isStarted();
@@ -974,11 +979,11 @@
stop();
}
- ActiveMQConsumer* consumer = this->consumers.getValue( *id );
- this->connection->removeDispatcher( consumer->getConsumerId() );
+ // Remove this Id both from the Sessions Map of Consumers and from
+ // the Connection.
+ this->connection->removeDispatcher( id );
this->connection->disposeOf( id );
-
- this->consumers.remove( *id );
+ this->consumers.remove( id );
//TODO
// // Remove this consumer from the Transaction if we are transacted
@@ -990,13 +995,7 @@
if( this->executor.get() != NULL ) {
// Purge any pending messages for this consumer.
- vector<ActiveMQMessage*> messages =
- this->executor->purgeConsumerMessages( consumer );
-
- // Destroy the messages.
- for( unsigned int ix = 0; ix < messages.size(); ++ix ) {
- delete messages[ix];
- }
+ this->executor->purgeConsumerMessages( id );
}
if( wasStarted ) {
@@ -1011,7 +1010,7 @@
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::disposeOf( commands::ProducerId* id )
+void ActiveMQSession::disposeOf( decaf::lang::Pointer<commands::ProducerId> id )
throw ( activemq::exceptions::ActiveMQException ) {
try{
@@ -1020,13 +1019,11 @@
synchronized( &this->producers ) {
- if( this->producers.containsKey( *id ) ) {
+ if( this->producers.containsKey( id ) ) {
- ActiveMQProducer* producer = this->producers.getValue( *id );
- this->connection->removeProducer( producer );
+ this->connection->removeProducer( id );
this->connection->disposeOf( id );
-
- this->producers.remove( *id );
+ this->producers.remove( id );
}
}
}
@@ -1034,3 +1031,14 @@
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQConsumer* ActiveMQSession::getConsumer( const decaf::lang::Pointer<commands::ConsumerId>& id ) {
+
+ synchronized( &this->consumers ) {
+ if( this->consumers.containsKey( id ) ) {
+ return this->consumers.getValue( id );
+ }
+ }
+ return NULL;
+}
Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h?rev=741774&r1=741773&r2=741774&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h Fri Feb 6 23:11:28 2009
@@ -50,8 +50,17 @@
class ActiveMQConsumer;
class ActiveMQSessionExecutor;
- class AMQCPP_API ActiveMQSession : public cms::Session,
- public Dispatcher {
+ class AMQCPP_API ActiveMQSession : public cms::Session, public Dispatcher {
+ private:
+
+ typedef decaf::util::Map< decaf::lang::Pointer<commands::ConsumerId>,
+ ActiveMQConsumer*,
+ commands::ConsumerId::COMPARATOR> ConsumersMap;
+
+ typedef decaf::util::Map< decaf::lang::Pointer<commands::ProducerId>,
+ ActiveMQProducer*,
+ commands::ProducerId::COMPARATOR> ProducersMap;
+
private:
/**
@@ -77,12 +86,12 @@
/**
* Map of consumers.
*/
- decaf::util::Map< commands::ConsumerId, ActiveMQConsumer*> consumers;
+ ConsumersMap consumers;
/**
* Map of producers.
*/
- decaf::util::Map< commands::ProducerId, ActiveMQProducer*> producers;
+ ProducersMap producers;
/**
* Sends incoming messages to the registered consumers.
@@ -103,9 +112,11 @@
virtual ~ActiveMQSession();
- decaf::util::Map< commands::ConsumerId, ActiveMQConsumer*>& getConsumers() {
- return consumers;
- }
+ /**
+ * Looks up a consumer of this Session by a Pointer to its Id.
+ * @param id - a Pointer to a ConsumerId to match in the Map of Consumers.
+ */
+ ActiveMQConsumer* getConsumer( const decaf::lang::Pointer<commands::ConsumerId>& id );
/**
* Redispatches the given set of unconsumed messages to the consumers.
@@ -425,7 +436,7 @@
* and clean up any resources associated with it.
* @param id - the Id of the Consumer to dispose.
*/
- void disposeOf( commands::ConsumerId* id )
+ void disposeOf( decaf::lang::Pointer<commands::ConsumerId> id )
throw ( activemq::exceptions::ActiveMQException );
/**
@@ -433,7 +444,7 @@
* and clean up any resources associated with it.
* @param id - the Id of the Producer to dispose.
*/
- void disposeOf( commands::ProducerId* id )
+ void disposeOf( decaf::lang::Pointer<commands::ProducerId> id )
throw ( activemq::exceptions::ActiveMQException );
private: