You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/02/19 02:06:08 UTC

svn commit: r745701 [3/3] - in /activemq/activemq-cpp/trunk/src: main/ main/activemq/core/ main/activemq/state/ main/activemq/transport/ main/activemq/transport/correlator/ main/activemq/transport/failover/ main/activemq/transport/logging/ main/activem...

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransportFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransportFactory.h?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransportFactory.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransportFactory.h Thu Feb 19 01:06:07 2009
@@ -26,6 +26,8 @@
 namespace transport{
 namespace tcp{
 
+    using decaf::lang::Pointer;
+
     /**
      * Factory Responsible for creating the TcpTransport.
      */
@@ -39,14 +41,17 @@
         /**
          * Creates a slimed down Transport instance which can be used in composite
          * transport instances.
+         *
          * @param location - URI location to connect to.
          * @param wireformat - the assigned WireFormat for the new Transport.
          * @param properties - Properties to apply to the transport.
+         *
+         * @return new Pointer to a TcpTransport.
          * @throws ActiveMQexception if an error occurs
          */
-        virtual Transport* doCreateComposite( const decaf::net::URI& location,
-                                              wireformat::WireFormat* wireFormat,
-                                              const decaf::util::Properties& properties )
+        virtual Pointer<Transport> doCreateComposite( const decaf::net::URI& location,
+                                                      const Pointer<wireformat::WireFormat>& wireFormat,
+                                                      const decaf::util::Properties& properties )
             throw ( exceptions::ActiveMQException );
 
     };

Modified: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/WireFormat.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/wireformat/WireFormat.h?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/wireformat/WireFormat.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/wireformat/WireFormat.h Thu Feb 19 01:06:07 2009
@@ -23,6 +23,7 @@
 #include <decaf/io/DataInputStream.h>
 #include <decaf/io/DataOutputStream.h>
 #include <decaf/io/IOException.h>
+#include <decaf/lang/Pointer.h>
 
 #include <activemq/util/Config.h>
 #include <activemq/commands/Command.h>
@@ -33,6 +34,8 @@
 namespace activemq{
 namespace wireformat{
 
+    using decaf::lang::Pointer;
+
     /**
      * Provides a mechanism to marshal commands into and out of packets
      * or into and out of streams, Channels and Datagrams.
@@ -50,7 +53,8 @@
          * @param out - the output stream to write the command to.
          * @throws IOException
          */
-        virtual void marshal( commands::Command* command, decaf::io::DataOutputStream* out )
+        virtual void marshal( const Pointer<commands::Command>& command,
+                              decaf::io::DataOutputStream* out )
             throw ( decaf::io::IOException ) = 0;
 
         /**
@@ -59,7 +63,7 @@
          * @returns the newly marshaled Command, caller owns the pointer
          * @throws IOException
          */
-        virtual commands::Command* unmarshal( decaf::io::DataInputStream* in )
+        virtual Pointer<commands::Command> unmarshal( decaf::io::DataInputStream* in )
             throw ( decaf::io::IOException ) = 0;
 
         /**
@@ -84,11 +88,12 @@
         /**
          * If the Transport Provides a Negotiator this method will create and return
          * a news instance of the Negotiator.
-         * @returns new instance of a WireFormatNegotiator.
+         * @returns new instance of a WireFormatNegotiator as a Pointer<Transport>.
          * @throws UnsupportedOperationException if the WireFormat doesn't have a Negotiator.
          */
-        virtual WireFormatNegotiator* createNegotiator( transport::Transport* transport )
-            throw( decaf::lang::exceptions::UnsupportedOperationException ) = 0;
+        virtual Pointer<transport::Transport> createNegotiator(
+            const Pointer<transport::Transport>& transport )
+                throw( decaf::lang::exceptions::UnsupportedOperationException ) = 0;
 
     };
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/WireFormatFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/wireformat/WireFormatFactory.h?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/wireformat/WireFormatFactory.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/wireformat/WireFormatFactory.h Thu Feb 19 01:06:07 2009
@@ -21,11 +21,14 @@
 #include <activemq/util/Config.h>
 #include <activemq/wireformat/WireFormat.h>
 #include <decaf/util/Properties.h>
+#include <decaf/lang/Pointer.h>
 #include <decaf/lang/exceptions/IllegalStateException.h>
 
 namespace activemq{
 namespace wireformat{
 
+    using decaf::lang::Pointer;
+
     /**
      * The WireFormatFactory is the interface that all WireFormatFactory
      * classes must extend.  The Factory creates a WireFormat Object based on
@@ -40,9 +43,11 @@
         /**
          * Creates a new WireFormat Object passing it a set of
          * properties from which it can obtain any optional settings
+         *
          * @param properties - the Properties for this WireFormat
+         * @return Pointer to a new instance of a WireFormat object.
          */
-        virtual WireFormat* createWireFormat( const decaf::util::Properties& properties )
+        virtual Pointer<WireFormat> createWireFormat( const decaf::util::Properties& properties )
             throw ( decaf::lang::exceptions::IllegalStateException ) = 0;
 
     };

Modified: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/WireFormatNegotiator.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/wireformat/WireFormatNegotiator.h?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/wireformat/WireFormatNegotiator.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/wireformat/WireFormatNegotiator.h Thu Feb 19 01:06:07 2009
@@ -20,10 +20,13 @@
 
 #include <activemq/util/Config.h>
 #include <activemq/transport/TransportFilter.h>
+#include <decaf/lang/Pointer.h>
 
 namespace activemq {
 namespace wireformat {
 
+    using decaf::lang::Pointer;
+
     /**
      * Defines a WireFormatNegotiator which allows a WireFormat to
      */
@@ -33,10 +36,9 @@
         /**
          * Constructor.
          * @param next - the next Transport in the chain
-         * @param own - true if this filter owns the next and should delete it
          */
-        WireFormatNegotiator( Transport* next, const bool own = true ) :
-            transport::TransportFilter( next, own ) {}
+        WireFormatNegotiator( const Pointer<transport::Transport>& next ) :
+            transport::TransportFilter( next ) {}
 
         virtual ~WireFormatNegotiator() {}
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormat.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormat.cpp?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormat.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormat.cpp Thu Feb 19 01:06:07 2009
@@ -86,11 +86,11 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-WireFormatNegotiator* OpenWireFormat::createNegotiator( transport::Transport* transport )
+Pointer<Transport> OpenWireFormat::createNegotiator( const Pointer<Transport>& transport )
     throw( decaf::lang::exceptions::UnsupportedOperationException ) {
 
     try{
-        return new OpenWireFormatNegotiator( this, transport, true );
+        return Pointer<Transport>( new OpenWireFormatNegotiator( this, transport ) );
     }
     AMQ_CATCH_RETHROW( UnsupportedOperationException )
     AMQ_CATCHALL_THROW( UnsupportedOperationException )
@@ -149,13 +149,13 @@
 
 ////////////////////////////////////////////////////////////////////////////////
 void OpenWireFormat::setPreferedWireFormatInfo(
-    commands::WireFormatInfo* info ) throw ( IllegalStateException ) {
+    const Pointer<commands::WireFormatInfo>& info ) throw ( IllegalStateException ) {
 
-    this->preferedWireFormatInfo.reset( info );
+    this->preferedWireFormatInfo = info;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void OpenWireFormat::marshal( commands::Command* command,
+void OpenWireFormat::marshal( const Pointer<commands::Command>& command,
                               decaf::io::DataOutputStream* dataOut )
     throw ( decaf::io::IOException ) {
 
@@ -166,7 +166,7 @@
         if( command != NULL ) {
 
             DataStructure* dataStructure =
-                dynamic_cast< DataStructure* >( command );
+                dynamic_cast< DataStructure* >( command.get() );
 
             unsigned char type = dataStructure->getDataStructureType();
 
@@ -226,7 +226,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-commands::Command* OpenWireFormat::unmarshal( decaf::io::DataInputStream* dis )
+Pointer<commands::Command> OpenWireFormat::unmarshal( decaf::io::DataInputStream* dis )
     throw ( decaf::io::IOException ) {
 
     try {
@@ -236,7 +236,7 @@
         }
 
         // Get the unmarshalled DataStructure
-        DataStructure* data = doUnmarshal( dis );
+        Pointer<DataStructure> data( doUnmarshal( dis ) );
 
         if( data == NULL ) {
             throw IOException(
@@ -246,19 +246,10 @@
         }
 
         // Now all unmarshals from this level should result in an object
-        // that is a commands::Command type, if its not then we throw an
-        // exception.
-        commands::Command* command =
-            dynamic_cast< commands::Command* >( data );
-
-        if( command == NULL ) {
-            delete data;
-
-            throw IOException(
-                __FILE__, __LINE__,
-                "OpenWireFormat::doUnmarshal - "
-                "Unmarshalled a non Command Type" );
-        }
+        // that is a commands::Command type; if it's not then the cast will
+        // throw a ClassCastException.
+        Pointer<Command> command =
+            data.dynamicCast<Command, Pointer<Command>::CounterType>();
 
         return command;
     }
@@ -513,10 +504,10 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void OpenWireFormat::renegotiateWireFormat( WireFormatInfo* info )
+void OpenWireFormat::renegotiateWireFormat( const WireFormatInfo& info )
     throw ( IllegalStateException ) {
 
-    if( preferedWireFormatInfo.get() == NULL ) {
+    if( preferedWireFormatInfo == NULL ) {
         throw IllegalStateException(
             __FILE__, __LINE__,
             "OpenWireFormat::renegotiateWireFormat - "
@@ -524,15 +515,15 @@
     }
 
     this->setVersion( Math::min( preferedWireFormatInfo->getVersion(),
-                                 info->getVersion() ) );
-    this->stackTraceEnabled = info->isStackTraceEnabled() &&
+                                 info.getVersion() ) );
+    this->stackTraceEnabled = info.isStackTraceEnabled() &&
                               preferedWireFormatInfo->isStackTraceEnabled();
-    this->tcpNoDelayEnabled = info->isTcpNoDelayEnabled() &&
+    this->tcpNoDelayEnabled = info.isTcpNoDelayEnabled() &&
                               preferedWireFormatInfo->isTcpNoDelayEnabled();
-    this->cacheEnabled = info->isCacheEnabled() &&
+    this->cacheEnabled = info.isCacheEnabled() &&
                          preferedWireFormatInfo->isCacheEnabled();
-    this->tightEncodingEnabled = info->isTightEncodingEnabled() &&
+    this->tightEncodingEnabled = info.isTightEncodingEnabled() &&
                                  preferedWireFormatInfo->isTightEncodingEnabled();
-    this->sizePrefixDisabled = info->isSizePrefixDisabled() &&
+    this->sizePrefixDisabled = info.isSizePrefixDisabled() &&
                                preferedWireFormatInfo->isSizePrefixDisabled();
 }

Modified: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormat.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormat.h?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormat.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormat.h Thu Feb 19 01:06:07 2009
@@ -23,6 +23,7 @@
 #include <activemq/commands/DataStructure.h>
 #include <activemq/wireformat/WireFormat.h>
 #include <activemq/wireformat/openwire/utils/BooleanStream.h>
+#include <decaf/lang/Pointer.h>
 #include <decaf/util/Properties.h>
 #include <decaf/lang/exceptions/IllegalStateException.h>
 #include <decaf/lang/exceptions/IllegalArgumentException.h>
@@ -36,7 +37,39 @@
     class DataStreamMarshaller;
 }
 
+    using decaf::lang::Pointer;
+
     class AMQCPP_API OpenWireFormat : public wireformat::WireFormat {
+    protected:
+
+        // Declared here to make life easier.
+        static const unsigned char NULL_TYPE;
+
+        // V1 is the default version we start at.
+        static const int DEFAULT_VERSION = 1;
+
+    private:
+
+        // Configuration parameters
+        decaf::util::Properties properties;
+
+        // Preferred WireFormatInfo
+        Pointer<commands::WireFormatInfo> preferedWireFormatInfo;
+
+        // Marshalers
+        std::vector< marshal::DataStreamMarshaller* > dataMarshallers;
+
+        // Uniquely Generated ID, initialize in the Ctor
+        std::string id;
+
+        // WireFormat Data
+        int version;
+        bool stackTraceEnabled;
+        bool tcpNoDelayEnabled;
+        bool cacheEnabled;
+        bool tightEncodingEnabled;
+        bool sizePrefixDisabled;
+
     public:
 
         /**
@@ -61,8 +94,9 @@
          * a news instance of the Negotiator.
          * @returns new instance of a WireFormatNegotiator.
          */
-        virtual wireformat::WireFormatNegotiator* createNegotiator( transport::Transport* transport )
-            throw( decaf::lang::exceptions::UnsupportedOperationException );
+        virtual Pointer<transport::Transport> createNegotiator(
+            const Pointer<transport::Transport>& transport )
+                throw( decaf::lang::exceptions::UnsupportedOperationException );
 
         /**
          * Allows an external source to add marshalers to this object for
@@ -77,7 +111,7 @@
          * @param out - the output stream to write the command to.
          * @throws IOException
          */
-        virtual void marshal( commands::Command* command,
+        virtual void marshal( const Pointer<commands::Command>& command,
                               decaf::io::DataOutputStream* dataOut )
             throw ( decaf::io::IOException );
 
@@ -87,7 +121,7 @@
          * @returns the newly marshaled Command, caller owns the pointer
          * @throws IOException
          */
-        virtual commands::Command* unmarshal( decaf::io::DataInputStream* dis )
+        virtual Pointer<commands::Command> unmarshal( decaf::io::DataInputStream* dis )
             throw ( decaf::io::IOException );
 
         /**
@@ -159,22 +193,22 @@
          * @param info - The new Wireformat Info settings
          * @throws IllegalStateException is the params can't be negotiated.
          */
-        void renegotiateWireFormat( commands::WireFormatInfo* info )
+        void renegotiateWireFormat( const commands::WireFormatInfo& info )
             throw ( decaf::lang::exceptions::IllegalStateException );
 
         /**
          * Configures this object using the provided WireformatInfo object
          * @param info - a WireFormatInfo object, takes ownership.
          */
-        virtual void setPreferedWireFormatInfo( commands::WireFormatInfo* info )
+        virtual void setPreferedWireFormatInfo( const Pointer<commands::WireFormatInfo>& info )
             throw ( decaf::lang::exceptions::IllegalStateException );
 
         /**
          * Gets the Preferred WireFormatInfo object that this class holds
          * @return pointer to a preferred WireFormatInfo object
          */
-        virtual commands::WireFormatInfo* getPreferedWireFormatInfo() const {
-            return this->preferedWireFormatInfo.get();
+        virtual const Pointer<commands::WireFormatInfo>& getPreferedWireFormatInfo() const {
+            return this->preferedWireFormatInfo;
         }
 
         /**
@@ -292,36 +326,6 @@
          */
         void destroyMarshalers();
 
-    protected:
-
-        // Declared here to make life easier.
-        static const unsigned char NULL_TYPE;
-
-        // V1 if the default version we start at.
-        static const int DEFAULT_VERSION = 1;
-
-    private:
-
-        // This object config data
-        decaf::util::Properties properties;
-
-        // Preferred WireFormatInfo
-        std::auto_ptr<commands::WireFormatInfo> preferedWireFormatInfo;
-
-        // Marshalers
-        std::vector< marshal::DataStreamMarshaller* > dataMarshallers;
-
-        // Uniquely Generated ID, initialize in the Ctor
-        std::string id;
-
-        // WireFormat Data
-        int version;
-        bool stackTraceEnabled;
-        bool tcpNoDelayEnabled;
-        bool cacheEnabled;
-        bool tightEncodingEnabled;
-        bool sizePrefixDisabled;
-
     };
 
 }}}

Modified: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatFactory.cpp?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatFactory.cpp Thu Feb 19 01:06:07 2009
@@ -20,6 +20,7 @@
 
 #include <decaf/lang/Boolean.h>
 #include <decaf/lang/Integer.h>
+#include <decaf/lang/Pointer.h>
 
 using namespace std;
 using namespace activemq;
@@ -33,13 +34,13 @@
 using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
-WireFormat* OpenWireFormatFactory::createWireFormat(
+Pointer<WireFormat> OpenWireFormatFactory::createWireFormat(
     const decaf::util::Properties& properties )
         throw ( decaf::lang::exceptions::IllegalStateException ) {
 
     try{
 
-        WireFormatInfo* info = new WireFormatInfo();
+        Pointer<WireFormatInfo> info( new WireFormatInfo() );
 
         // Configure the version to use
         info->setVersion( Integer::parseInt(
@@ -63,12 +64,12 @@
                                     "false" ) ) );
 
         // Create the Openwire Format Object
-        OpenWireFormat* f = new OpenWireFormat( properties );
+        Pointer<OpenWireFormat> wireFormat( new OpenWireFormat( properties ) );
 
         // give the format object the ownership
-        f->setPreferedWireFormatInfo( info );
+        wireFormat->setPreferedWireFormatInfo( info );
 
-        return f;
+        return wireFormat;
     }
     AMQ_CATCH_RETHROW( IllegalStateException )
     AMQ_CATCHALL_THROW( IllegalStateException )

Modified: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatFactory.h?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatFactory.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatFactory.h Thu Feb 19 01:06:07 2009
@@ -22,12 +22,15 @@
 #include <activemq/wireformat/WireFormatFactory.h>
 #include <activemq/commands/WireFormatInfo.h>
 #include <decaf/lang/exceptions/IllegalStateException.h>
+#include <decaf/lang/Pointer.h>
 #include <decaf/util/Properties.h>
 
 namespace activemq{
 namespace wireformat{
 namespace openwire{
 
+    using decaf::lang::Pointer;
+
     class AMQCPP_API OpenWireFormatFactory : public wireformat::WireFormatFactory {
     public:
 
@@ -54,7 +57,7 @@
          * properties from which it can obtain any optional settings
          * @param properties - the Properties for this WireFormat
          */
-        virtual wireformat::WireFormat* createWireFormat(
+        virtual Pointer<wireformat::WireFormat> createWireFormat(
             const decaf::util::Properties& properties )
                 throw ( decaf::lang::exceptions::IllegalStateException );
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.cpp?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.cpp Thu Feb 19 01:06:07 2009
@@ -23,6 +23,7 @@
 
 using namespace std;
 using namespace activemq;
+using namespace activemq::commands;
 using namespace activemq::exceptions;
 using namespace activemq::wireformat;
 using namespace activemq::wireformat::openwire;
@@ -32,15 +33,14 @@
 using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
-OpenWireFormatNegotiator::OpenWireFormatNegotiator( OpenWireFormat* openWireFormat,
-                                                    Transport* next,
-                                                    bool own ) :
-    WireFormatNegotiator( next, own ),
+OpenWireFormatNegotiator::OpenWireFormatNegotiator( OpenWireFormat* wireFormat,
+                                                    const Pointer<Transport>& next ) :
+    WireFormatNegotiator( next ),
     wireInfoSentDownLatch(1),
     readyCountDownLatch(1)
 {
     this->firstTime.set( true );
-    this->openWireFormat = openWireFormat;
+    this->openWireFormat = wireFormat;
     this->closed = true;
 }
 
@@ -52,7 +52,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void OpenWireFormatNegotiator::oneway( commands::Command* command )
+void OpenWireFormatNegotiator::oneway( const Pointer<Command>& command )
     throw( CommandIOException, UnsupportedOperationException ) {
 
     try{
@@ -81,7 +81,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-commands::Response* OpenWireFormatNegotiator::request( commands::Command* command )
+Pointer<Response> OpenWireFormatNegotiator::request( const Pointer<Command>& command )
     throw( CommandIOException, UnsupportedOperationException ) {
 
     try{
@@ -110,8 +110,9 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-commands::Response* OpenWireFormatNegotiator::request( commands::Command* command, unsigned int timeout )
-    throw( CommandIOException, UnsupportedOperationException ) {
+Pointer<Response> OpenWireFormatNegotiator::request(
+    const Pointer<Command>& command, unsigned int timeout )
+        throw( CommandIOException, UnsupportedOperationException ) {
 
     try{
 
@@ -139,11 +140,11 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void OpenWireFormatNegotiator::onCommand( commands::Command* command ) {
+void OpenWireFormatNegotiator::onCommand( const Pointer<Command>& command ) {
 
     if( command->isWireFormatInfo() ) {
 
-        WireFormatInfo* info = dynamic_cast<WireFormatInfo*>( command );
+        WireFormatInfo* info = dynamic_cast<WireFormatInfo*>( command.get() );
 
         try {
 
@@ -156,7 +157,7 @@
             }
 
             wireInfoSentDownLatch.await( negotiationTimeout );
-            openWireFormat->renegotiateWireFormat( info );
+            openWireFormat->renegotiateWireFormat( *info );
 
             readyCountDownLatch.countDown();
 
@@ -225,7 +226,7 @@
             // the message as it marshaled out to the wire
             Transport* transport = this->next->narrow( typeid( transport::IOTransport ) );
             if( transport == NULL ) {
-                transport = this->next;
+                transport = this->next.get();
             }
 
             // We first send the WireFormat that we'd prefer.

Modified: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.h?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.h Thu Feb 19 01:06:07 2009
@@ -26,11 +26,14 @@
 #include <decaf/util/concurrent/CountDownLatch.h>
 #include <decaf/util/concurrent/Concurrent.h>
 #include <decaf/util/concurrent/atomic/AtomicBoolean.h>
+#include <decaf/lang/Pointer.h>
 
 namespace activemq{
 namespace wireformat{
 namespace openwire{
 
+    using decaf::lang::Pointer;
+
     class AMQCPP_API OpenWireFormatNegotiator : public wireformat::WireFormatNegotiator {
     private:
 
@@ -64,13 +67,12 @@
 
         /**
          * Constructor - Initializes this object around another Transport
-         * @param openWireFormat - The WireFormat object we use to negotiate
+         * @param wireFormat - The WireFormat object we use to negotiate
          * @param next - The next transport in the chain
          * @param own - do we own the Transport pointer.
          */
-        OpenWireFormatNegotiator( OpenWireFormat* openWireFormat,
-                                  transport::Transport* next,
-                                  bool own = true );
+        OpenWireFormatNegotiator( OpenWireFormat* wireFormat,
+                                  const Pointer<transport::Transport>& next );
 
         virtual ~OpenWireFormatNegotiator();
 
@@ -85,7 +87,7 @@
          * @throws UnsupportedOperationException if this method is not implemented
          * by this transport.
          */
-        virtual void oneway( commands::Command* command )
+        virtual void oneway( const Pointer<commands::Command>& command )
             throw( transport::CommandIOException,
                    decaf::lang::exceptions::UnsupportedOperationException );
 
@@ -97,7 +99,7 @@
          * @return the response from the server.
          * @throws CommandIOException if an error occurs with the request.
          */
-        virtual commands::Response* request( commands::Command* command )
+        virtual Pointer<commands::Response> request( const Pointer<commands::Command>& command )
             throw( transport::CommandIOException,
                    decaf::lang::exceptions::UnsupportedOperationException );
 
@@ -110,9 +112,10 @@
          * @return the response from the server.
          * @throws CommandIOException if an error occurs with the request.
          */
-        virtual commands::Response* request( commands::Command* command, unsigned int timeout )
-            throw( transport::CommandIOException,
-                   decaf::lang::exceptions::UnsupportedOperationException );
+        virtual Pointer<commands::Response> request(
+            const Pointer<commands::Command>& command, unsigned int timeout )
+                throw( transport::CommandIOException,
+                       decaf::lang::exceptions::UnsupportedOperationException );
 
         /**
          * This is called in the context of the nested transport's
@@ -122,7 +125,7 @@
          * the command listener.
          * @param command the received from the nested transport.
          */
-        virtual void onCommand( commands::Command* command );
+        virtual void onCommand( const Pointer<commands::Command>& command );
 
         /**
          * Event handler for an exception from a command transport.

Modified: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.cpp?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.cpp Thu Feb 19 01:06:07 2009
@@ -37,58 +37,43 @@
 #include <activemq/commands/WireFormatInfo.h>
 
 using namespace activemq;
+using namespace activemq::commands;
 using namespace activemq::wireformat;
 using namespace activemq::wireformat::openwire;
 using namespace activemq::transport;
 using namespace activemq::transport::mock;
+using namespace decaf;
+using namespace decaf::lang;
 
 ////////////////////////////////////////////////////////////////////////////////
-commands::Response* OpenWireResponseBuilder::buildResponse(
-    const commands::Command* command ){
+Pointer<Response> OpenWireResponseBuilder::buildResponse(
+    const Pointer<Command>& command ){
 
-    if( typeid( *command ) == typeid( commands::ActiveMQBytesMessage ) ||
-        typeid( *command ) == typeid( commands::ActiveMQMapMessage ) ||
-        typeid( *command ) == typeid( commands::ActiveMQMessage ) ||
-        typeid( *command ) == typeid( commands::ActiveMQObjectMessage ) ||
-        typeid( *command ) == typeid( commands::ActiveMQStreamMessage ) ||
-        typeid( *command ) == typeid( commands::ActiveMQTextMessage ) ||
-        typeid( *command ) == typeid( commands::ConnectionInfo ) ||
-        typeid( *command ) == typeid( commands::ConsumerInfo ) ||
-        typeid( *command ) == typeid( commands::DestinationInfo ) ||
-        typeid( *command ) == typeid( commands::ProducerInfo ) ||
-        typeid( *command ) == typeid( commands::RemoveSubscriptionInfo ) ||
-        typeid( *command ) == typeid( commands::RemoveInfo ) ||
-        typeid( *command ) == typeid( commands::SessionInfo ) ) {
+    if( command->isResponseRequired() ) {
 
         // These Commands just require a response that matches their command IDs
-        commands::Response* response = new commands::Response();
+        Pointer<Response> response( new commands::Response() );
         response->setCorrelationId( command->getCommandId() );
         return response;
     }
 
-    // If this command requires a response we don't know what it is
-    // so we throw an exception.
-    if( command->isResponseRequired() ) {
-        throw transport::CommandIOException( __FILE__, __LINE__,
-            "OpenWireResponseBuilder - unrecognized command" );
-    }
-
-    return NULL;
+    return Pointer<Response>();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void OpenWireResponseBuilder::buildIncomingCommands(
-    const commands::Command* command, decaf::util::StlQueue<commands::Command*>& queue ){
+    const Pointer<Command>& command, decaf::util::StlQueue< Pointer<Command> >& queue ){
 
     // Delegate this to buildResponse
     if( command->isResponseRequired() ) {
         queue.push( buildResponse( command ) );
     }
 
-    if( typeid( *command ) == typeid( commands::WireFormatInfo ) ) {
+    if( command->isWireFormatInfo() ) {
 
         // Return a copy of the callers own requested WireFormatInfo
         // so they get exactly the settings they asked for.
-        queue.push( dynamic_cast<commands::Command*>( command->cloneDataStructure() ) );
+        queue.push( Pointer<Command>(
+            dynamic_cast<WireFormatInfo*>( command->cloneDataStructure() ) ) );
     }
 }

Modified: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.h?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.h Thu Feb 19 01:06:07 2009
@@ -21,11 +21,14 @@
 #include <activemq/util/Config.h>
 #include <activemq/transport/mock/MockTransport.h>
 #include <decaf/util/StlQueue.h>
+#include <decaf/lang/Pointer.h>
 
 namespace activemq{
 namespace wireformat{
 namespace openwire{
 
+    using decaf::lang::Pointer;
+
     class AMQCPP_API OpenWireResponseBuilder :
         public transport::mock::MockTransport::ResponseBuilder{
     public:
@@ -33,11 +36,12 @@
         OpenWireResponseBuilder() {}
         virtual ~OpenWireResponseBuilder() {}
 
-        virtual commands::Response* buildResponse( const commands::Command* command );
+        virtual Pointer<commands::Response> buildResponse(
+            const Pointer<commands::Command>& command );
 
         virtual void buildIncomingCommands(
-            const commands::Command* command,
-            decaf::util::StlQueue<commands::Command*>& queue );
+            const Pointer<commands::Command>& command,
+            decaf::util::StlQueue< Pointer<commands::Command> >& queue );
 
     };
 

Modified: activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQConnectionTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQConnectionTest.cpp?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQConnectionTest.cpp (original)
+++ activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQConnectionTest.cpp Thu Feb 19 01:06:07 2009
@@ -21,6 +21,7 @@
 #include <decaf/util/concurrent/Concurrent.h>
 #include <decaf/util/concurrent/Mutex.h>
 #include <decaf/lang/Thread.h>
+#include <decaf/lang/Pointer.h>
 #include <activemq/core/ActiveMQConnectionFactory.h>
 #include <activemq/core/ActiveMQConnection.h>
 #include <activemq/transport/mock/MockTransport.h>
@@ -30,8 +31,10 @@
 
 using namespace activemq;
 using namespace activemq::core;
+using namespace activemq::transport;
 using namespace decaf;
 using namespace decaf::util;
+using namespace decaf::lang;
 
 ////////////////////////////////////////////////////////////////////////////////
 //void ActiveMQConnectionTest::test1WithStomp()
@@ -239,9 +242,8 @@
         MyCommandListener cmdListener;
         MyDispatcher msgListener;
         std::string connectionId = "testConnectionId";
-        decaf::util::Properties* properties =
-            new decaf::util::Properties();
-        transport::Transport* transport = NULL;
+        Pointer<decaf::util::Properties> properties( new decaf::util::Properties() );
+        Pointer<Transport> transport;
 
         // Default to Stomp
         properties->setProperty( "wireFormat", "openwire" );

Modified: activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQSessionTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQSessionTest.cpp?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQSessionTest.cpp (original)
+++ activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQSessionTest.cpp Thu Feb 19 01:06:07 2009
@@ -553,7 +553,7 @@
     // Send the Message
     CPPUNIT_ASSERT( dTransport != NULL );
 
-    MessageDispatch* dispatch = new MessageDispatch();
+    Pointer<MessageDispatch> dispatch( new MessageDispatch() );
     dispatch->setMessage( msg );
     dispatch->setConsumerId( Pointer<ConsumerId>( id.cloneDataStructure() ) );
 

Modified: activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp (original)
+++ activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp Thu Feb 19 01:06:07 2009
@@ -72,10 +72,14 @@
 
     virtual bool hasNegotiator() const { return false; }
 
-    virtual wireformat::WireFormatNegotiator* createNegotiator( transport::Transport* transport )
-        throw( decaf::lang::exceptions::UnsupportedOperationException ) { return NULL; }
+    virtual Pointer<Transport> createNegotiator(
+        const Pointer<transport::Transport>& transport )
+            throw( decaf::lang::exceptions::UnsupportedOperationException ) {
 
-    virtual commands::Command* unmarshal( decaf::io::DataInputStream* inputStream )
+        return Pointer<wireformat::WireFormatNegotiator>();
+    }
+
+    virtual Pointer<commands::Command> unmarshal( decaf::io::DataInputStream* inputStream )
         throw ( CommandIOException ){
 
         try{
@@ -87,7 +91,7 @@
 
             synchronized( inputStream ){
 
-                MyCommand* command = new MyCommand();
+                Pointer<MyCommand> command( new MyCommand() );
                 try{
 
                     // Throw a little uncertainty into the test.
@@ -97,16 +101,9 @@
                     command->c = inputStream->readByte();
 
                 } catch( decaf::lang::Exception& ex ){
-
-                    // Free the memory.
-                    delete command;
-
                     ex.setMark( __FILE__, __LINE__ );
                     throw CommandIOException();
                 } catch( ... ) {
-                    // Free the memory.
-                    delete command;
-
                     throw CommandIOException( __FILE__, __LINE__, "Catch all" );
                 }
 
@@ -114,7 +111,7 @@
             }
 
             CPPUNIT_ASSERT( false );
-            return NULL;
+            return Pointer<Command>();
 
         }catch( decaf::lang::Exception& ex ){
             CommandIOException cx;
@@ -128,7 +125,8 @@
         }
     }
 
-    virtual void marshal( commands::Command* command, decaf::io::DataOutputStream* outputStream )
+    virtual void marshal( const Pointer<commands::Command>& command,
+                          decaf::io::DataOutputStream* outputStream )
         throw (CommandIOException)
     {
         try{
@@ -136,7 +134,7 @@
             synchronized( outputStream ){
 
                 const MyCommand* m =
-                    dynamic_cast<const MyCommand*>(command);
+                    dynamic_cast<const MyCommand*>(command.get());
                 outputStream->write( m->c );
             }
 
@@ -170,10 +168,9 @@
     }
 
     std::string str;
-    virtual void onCommand( commands::Command* command ){
-        const MyCommand* cmd = dynamic_cast<const MyCommand*>(command);
+    virtual void onCommand( const Pointer<commands::Command>& command ){
+        const MyCommand* cmd = dynamic_cast<const MyCommand*>(command.get());
         str += cmd->c;
-        delete command;
         latch.countDown();
     }
 
@@ -208,8 +205,8 @@
     decaf::io::DataInputStream input( &is );
     decaf::io::DataOutputStream output( &os );
     MyTransportListener listener;
-    MyWireFormat wireFormat;
-    IOTransport transport( &wireFormat );
+    Pointer<MyWireFormat> wireFormat( new MyWireFormat() );
+    IOTransport transport( wireFormat );
     transport.setTransportListener( &listener );
     transport.setInputStream( &input );
     transport.setOutputStream( &output );
@@ -230,11 +227,11 @@
     decaf::io::DataOutputStream output( &bos );
 
     for( int i = 0; i < 50; ++i ) {
-        MyWireFormat wireFormat;
+        Pointer<MyWireFormat> wireFormat( new MyWireFormat() );
         MyTransportListener listener;
 
         IOTransport transport;
-        transport.setWireFormat( &wireFormat );
+        transport.setWireFormat( wireFormat );
         transport.setTransportListener( &listener );
         transport.setInputStream( &input );
         transport.setOutputStream( &output );
@@ -261,13 +258,13 @@
     decaf::io::DataInputStream input( &is );
     decaf::io::DataOutputStream output( &os );
 
-    MyWireFormat wireFormat;
+    Pointer<MyWireFormat> wireFormat( new MyWireFormat() );
     MyTransportListener listener(10);
     IOTransport transport;
     transport.setInputStream( &input );
     transport.setOutputStream( &output );
     transport.setTransportListener( &listener );
-    transport.setWireFormat( &wireFormat );
+    transport.setWireFormat( wireFormat );
 
     transport.start();
 
@@ -297,27 +294,27 @@
     decaf::io::DataInputStream input( &is );
     decaf::io::DataOutputStream output( &os );
 
-    MyWireFormat wireFormat;
+    Pointer<MyWireFormat> wireFormat( new MyWireFormat() );
     MyTransportListener listener;
     IOTransport transport;
     transport.setInputStream( &input );
     transport.setOutputStream( &output );
     transport.setTransportListener( &listener );
-    transport.setWireFormat( &wireFormat );
+    transport.setWireFormat( wireFormat );
 
     transport.start();
 
-    MyCommand cmd;
-    cmd.c = '1';
-    transport.oneway( &cmd );
-    cmd.c = '2';
-    transport.oneway( &cmd );
-    cmd.c = '3';
-    transport.oneway( &cmd );
-    cmd.c = '4';
-    transport.oneway( &cmd );
-    cmd.c = '5';
-    transport.oneway( &cmd );
+    Pointer<MyCommand> cmd( new MyCommand() );
+    cmd->c = '1';
+    transport.oneway( cmd );
+    cmd->c = '2';
+    transport.oneway( cmd );
+    cmd->c = '3';
+    transport.oneway( cmd );
+    cmd->c = '4';
+    transport.oneway( cmd );
+    cmd->c = '5';
+    transport.oneway( cmd );
 
     const unsigned char* bytes = os.toByteArray();
     std::size_t size = os.size();
@@ -339,14 +336,14 @@
     decaf::io::DataInputStream input( &is );
     decaf::io::DataOutputStream output( &os );
 
-    MyWireFormat wireFormat;
+    Pointer<MyWireFormat> wireFormat( new MyWireFormat() );
     MyTransportListener listener;
     IOTransport transport;
-    wireFormat.throwException = true;
+    wireFormat->throwException = true;
     transport.setInputStream( &input );
     transport.setOutputStream( &output );
     transport.setTransportListener( &listener );
-    transport.setWireFormat( &wireFormat );
+    transport.setWireFormat( wireFormat );
 
     unsigned char buffer[1] = { '1' };
     try{

Modified: activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp (original)
+++ activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp Thu Feb 19 01:06:07 2009
@@ -17,34 +17,308 @@
 
 #include "ResponseCorrelatorTest.h"
 
+#include <activemq/util/Config.h>
+#include <activemq/commands/BaseCommand.h>
+#include <activemq/transport/DefaultTransportListener.h>
+#include <activemq/transport/correlator/ResponseCorrelator.h>
+#include <decaf/lang/Thread.h>
+#include <decaf/util/concurrent/Concurrent.h>
+#include <decaf/lang/exceptions/UnsupportedOperationException.h>
+#include <queue>
+
 using namespace activemq;
 using namespace activemq::transport;
 using namespace activemq::transport::correlator;
 
 ////////////////////////////////////////////////////////////////////////////////
-void ResponseCorrelatorTest::testBasics(){
+namespace activemq{
+namespace transport{
+namespace correlator{
+
+    // Minimal concrete Command for these tests.  It keeps no state of its own;
+    // the id and response-required flag are reached through inherited accessors.
+    class MyCommand : public commands::BaseCommand{
+    public:
+
+        virtual std::string toString() const{ return ""; }
+
+        virtual unsigned char getDataStructureType() const { return 1; }
+
+        // Visiting is a no-op here: the visitor is ignored and an empty Pointer comes back.
+        virtual decaf::lang::Pointer<commands::Command> visit( activemq::state::CommandVisitor* visitor AMQCPP_UNUSED )
+            throw( exceptions::ActiveMQException ) { return decaf::lang::Pointer<commands::Command>(); }
+
+        // Returns a raw pointer to a new heap copy carrying the same id and response flag.
+        virtual MyCommand* cloneDataStructure() const{
+            MyCommand* command = new MyCommand;
+            command->setCommandId( this->getCommandId() );
+            command->setResponseRequired( this->isResponseRequired() );
+            return command;
+        }
+    };
+
+    class MyTransport : public Transport, public decaf::lang::Runnable{
+    public:
+        // Test transport: oneway() queues commands and run(), on a worker thread, hands each back to the listener.
+        TransportListener* listener;                          // receives commands and exceptions from run()
+        decaf::lang::Thread* thread;                          // worker created by start(), destroyed by close()
+        decaf::util::concurrent::Mutex mutex;                 // guards requests; also used to wake run()
+        decaf::util::concurrent::Mutex startedMutex;          // notified once when run() begins
+        bool done;                                            // set by close() to end the run() loop
+        std::queue< Pointer<commands::Command> > requests;    // commands queued by oneway()
+
+    public:
+
+        MyTransport(){
+            listener = NULL;
+            thread = NULL;
+            done = false;
+        }
+        // Stops the worker thread, if one is running, before the object goes away.
+        virtual ~MyTransport(){
+
+            close();
+        }
+        // Queues the command for the worker thread and wakes it.
+        virtual void oneway( const Pointer<Command>& command )
+            throw(CommandIOException, decaf::lang::exceptions::UnsupportedOperationException)
+        {
+            synchronized( &mutex ){
+                requests.push( command );
+                mutex.notifyAll();
+            }
+        }
+        // Synchronous requests are not supported here; always throws.
+        virtual Pointer<Response> request( const Pointer<Command>& command AMQCPP_UNUSED )
+            throw(CommandIOException, decaf::lang::exceptions::UnsupportedOperationException)
+        {
+            throw decaf::lang::exceptions::UnsupportedOperationException(
+                __FILE__, __LINE__, "stuff" );
+        }
+        // Timed variant; likewise always throws UnsupportedOperationException.
+        virtual Pointer<Response> request( const Pointer<Command>& command AMQCPP_UNUSED,
+                                             unsigned int timeout AMQCPP_UNUSED )
+            throw(CommandIOException, decaf::lang::exceptions::UnsupportedOperationException)
+        {
+            throw decaf::lang::exceptions::UnsupportedOperationException(
+                __FILE__, __LINE__, "stuff" );
+        }
+        // The wire format is ignored by this transport.
+        virtual void setWireFormat(
+            const Pointer<wireformat::WireFormat>& wireFormat AMQCPP_UNUSED ) {}
+
+        virtual void setTransportListener( TransportListener* listener ) {
+            this->listener = listener;
+        }
+        // Stops any previous worker via close(), then launches a new thread that executes run().
+        virtual void start() throw( cms::CMSException ){
+            close();
+
+            done = false;
+            thread = new decaf::lang::Thread( this );
+            thread->start();
+        }
+        // Flags the loop to end, wakes the worker in case it is waiting, then joins and frees it.
+        virtual void close() throw( cms::CMSException ){
+
+            done = true;
+
+            if( thread != NULL ){
+                synchronized( &mutex ){
+                    mutex.notifyAll();
+                }
+                thread->join();
+                delete thread;
+                thread = NULL;
+            }
+        }
+        // Builds a Response correlated to the command's id; the Response itself needs no reply.
+        virtual Pointer<Response> createResponse( const Pointer<Command>& command ){
+
+            Pointer<Response> resp( new commands::Response() );
+            resp->setCorrelationId( command->getCommandId() );
+            resp->setResponseRequired( false );
+            return resp;
+        }
+        // Worker loop: signals startedMutex once, then drains requests until done is set.
+        virtual void run(){
+
+            try{
+
+                synchronized(&startedMutex)
+                {
+                   startedMutex.notifyAll();
+                }
+
+                synchronized( &mutex ){
+
+                    while( !done ){
+
+                        if( requests.empty() ){
+                            mutex.wait();
+                        }else{
+
+                            Pointer<Command> cmd = requests.front();
+                            requests.pop();
+
+                            // Only send a response if one is required.
+                            Pointer<Response> resp;
+                            if( cmd->isResponseRequired() ){
+                                resp = createResponse( cmd );
+                            }
+                            // Release the queue lock by hand so the listener callbacks run without it.
+                            mutex.unlock();
+
+                            // Send both the response and the original
+                            // command back to the correlator.
+                            if( listener != NULL ){
+                                if( resp != NULL ){
+                                    listener->onCommand( resp );
+                                }
+                                listener->onCommand( cmd );
+                            }
+                            // Re-acquire before looping.  NOTE(review): skipped if a callback throws - confirm synchronized copes.
+                            mutex.lock();
+                        }
+                    }
+                }
+            }catch( exceptions::ActiveMQException& ex ){  // forwarded to the listener as-is
+                if( listener ){
+                    listener->onTransportException( this, ex );
+                }
+            }
+            catch( ... ){  // anything else is reported as a generic ActiveMQException
+                if( listener ){
+                    exceptions::ActiveMQException ex( __FILE__, __LINE__, "stuff" );
+                    listener->onTransportException( this, ex );
+                }
+            }
+        }
+        // Returns this only when typeId is exactly the dynamic type of this object, otherwise NULL.
+        virtual Transport* narrow( const std::type_info& typeId ) {
+            if( typeid( *this ) == typeId ) {
+                return this;
+            }
+
+            return NULL;
+        }
+        // Fixed status answers; nothing in this class tracks real connection state.
+        virtual bool isFaultTolerant() const {
+            return true;
+        }
+
+        virtual bool isConnected() const {
+            return false;
+        }
+
+        virtual bool isClosed() const {
+            return false;
+        }
+
+    };
+
+    // MyTransport variant whose createResponse() always throws.
+    class MyBrokenTransport : public MyTransport{
+    public:
+        MyBrokenTransport(){}
+        virtual ~MyBrokenTransport(){}
+        // The signature must match MyTransport::createResponse() exactly so that run()
+        // dispatches here; a different parameter type only adds an unrelated overload.
+        virtual Pointer<Response> createResponse( const Pointer<Command>& command AMQCPP_UNUSED ){
+            throw exceptions::ActiveMQException( __FILE__, __LINE__, "bad stuff" );
+        }
+    };
+
+    // Listener that records the id of every command delivered to it and counts
+    // transport exceptions; both are updated while holding mutex.
+    class MyListener : public DefaultTransportListener {
+    public:
+
+        int exCount;
+        std::set<int> commands;
+        decaf::util::concurrent::Mutex mutex;
+
+    public:
+
+        MyListener() : exCount( 0 ) {}
+        virtual ~MyListener(){}
+
+        // Takes the command as a Pointer so this overrides the callback the transport
+        // invokes; records the command id and wakes one thread waiting on mutex.
+        virtual void onCommand( const Pointer<commands::Command>& command ){
+            synchronized( &mutex ){
+                commands.insert( command->getCommandId() );
+                mutex.notify();
+            }
+        }
+
+        virtual void onTransportException(
+            Transport* source AMQCPP_UNUSED,
+            const decaf::lang::Exception& ex AMQCPP_UNUSED)
+        {
+            synchronized( &mutex ){
+                exCount++;
+            }
+        }
+    };
+
+    // Thread that issues a single request() on the configured transport and
+    // keeps the returned response in resp.
+    class RequestThread : public decaf::lang::Thread{
+    public:
+
+        Transport* transport;      // target of the request; NULL until setTransport()
+        Pointer<MyCommand> cmd;    // command sent by run()
+        Pointer<Response> resp;    // filled in by run()
+
+    public:
+
+        RequestThread() : transport( NULL ), cmd( new MyCommand() ) {}
+
+        // Joins the thread before the object is destroyed.
+        virtual ~RequestThread(){ join(); }
+
+        void setTransport( Transport* target ){
+            transport = target;
+        }
+
+        // Sends cmd through the transport; any exception at all fails the test.
+        void run(){
+            try{
+                resp = transport->request( cmd );
+            }
+            catch( ... ){
+                CPPUNIT_ASSERT( false );
+            }
+        }
+
+    };
+
+}}}
+
+////////////////////////////////////////////////////////////////////////////////
+void ResponseCorrelatorTest::testBasics() {
 
     try{
 
         MyListener listener;
-        MyTransport transport;
-        ResponseCorrelator correlator( &transport, false );
+        Pointer<MyTransport> transport( new MyTransport() );
+        ResponseCorrelator correlator( transport );
         correlator.setTransportListener( &listener );
-        CPPUNIT_ASSERT( transport.listener == &correlator );
+        CPPUNIT_ASSERT( transport->listener == &correlator );
 
         // Give the thread a little time to get up and running.
-        synchronized(&transport.startedMutex)
-        {
+        synchronized( &(transport->startedMutex) ) {
             // Start the transport.
             correlator.start();
-            transport.startedMutex.wait();
+            transport->startedMutex.wait();
         }
 
         // Send one request.
-        MyCommand cmd;
-        commands::Response* resp = correlator.request( &cmd );
+        Pointer<MyCommand> cmd( new MyCommand );
+        Pointer<Response> resp = correlator.request( cmd );
         CPPUNIT_ASSERT( resp != NULL );
-        CPPUNIT_ASSERT( resp->getCorrelationId() == cmd.getCommandId() );
+        CPPUNIT_ASSERT( resp->getCorrelationId() == cmd->getCommandId() );
 
         // Wait to get the message back asynchronously.
         decaf::lang::Thread::sleep( 100 );
@@ -56,9 +330,6 @@
         CPPUNIT_ASSERT( listener.exCount == 0 );
 
         correlator.close();
-
-        // Destroy the response.
-        delete resp;
     }
     AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
     AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
@@ -70,25 +341,25 @@
     try{
 
         MyListener listener;
-        MyTransport transport;
-        ResponseCorrelator correlator( &transport, false );
+        Pointer<MyTransport> transport( new MyTransport() );
+        ResponseCorrelator correlator( transport );
         correlator.setTransportListener( &listener );
-        CPPUNIT_ASSERT( transport.listener == &correlator );
+        CPPUNIT_ASSERT( transport->listener == &correlator );
 
         // Give the thread a little time to get up and running.
-        synchronized(&transport.startedMutex)
-        {
+        synchronized( &(transport->startedMutex) ) {
+
             // Start the transport.
             correlator.start();
 
-            transport.startedMutex.wait();
+            transport->startedMutex.wait();
         }
 
         // Send many oneway request (we'll get them back asynchronously).
         const unsigned int numCommands = 1000;
-        MyCommand commands[numCommands];
-        for( unsigned int ix=0; ix<numCommands; ix++ ){
-            correlator.oneway( &commands[ix] );
+        for( unsigned int ix = 0; ix < numCommands; ++ix ) {
+            Pointer<MyCommand> command( new MyCommand() );
+            correlator.oneway( command );
         }
 
         // Give the thread a little time to get all the messages back.
@@ -110,24 +381,23 @@
     try{
 
         MyListener listener;
-        MyBrokenTransport transport;
-        ResponseCorrelator correlator( &transport, false );
+        Pointer<MyBrokenTransport> transport( new MyBrokenTransport() );
+        ResponseCorrelator correlator( transport );
         correlator.setTransportListener( &listener );
-        CPPUNIT_ASSERT( transport.listener == &correlator );
+        CPPUNIT_ASSERT( transport->listener == &correlator );
 
         // Give the thread a little time to get up and running.
-        synchronized(&transport.startedMutex)
-        {
+        synchronized( &(transport->startedMutex) ) {
             // Start the transport.
             correlator.start();
 
-            transport.startedMutex.wait();
+            transport->startedMutex.wait();
         }
 
         // Send one request.
-        MyCommand cmd;
+        Pointer<MyCommand> cmd( new MyCommand );
         try{
-            correlator.request( &cmd, 500 );
+            correlator.request( cmd, 500 );
             CPPUNIT_ASSERT(false);
         }catch( CommandIOException& ex ){
             // Expected.
@@ -154,21 +424,20 @@
     try{
 
         MyListener listener;
-        MyTransport transport;
-        ResponseCorrelator correlator( &transport, false );
+        Pointer<MyTransport> transport( new MyTransport() );
+        ResponseCorrelator correlator( transport );
         correlator.setTransportListener( &listener );
-        CPPUNIT_ASSERT( transport.listener == &correlator );
+        CPPUNIT_ASSERT( transport->listener == &correlator );
 
         // Start the transport.
         correlator.start();
 
         // Make sure the start command got down to the thread.
-        CPPUNIT_ASSERT( transport.thread != NULL );
+        CPPUNIT_ASSERT( transport->thread != NULL );
 
         // Give the thread a little time to get up and running.
-        synchronized(&transport.startedMutex)
-        {
-            transport.startedMutex.wait(500);
+        synchronized( &(transport->startedMutex) ) {
+            transport->startedMutex.wait(500);
         }
 
         // Start all the requester threads.
@@ -184,7 +453,8 @@
         for( unsigned int ix=0; ix<numRequests; ++ix ){
             requesters[ix].join();
             CPPUNIT_ASSERT( requesters[ix].resp != NULL );
-            CPPUNIT_ASSERT( requesters[ix].cmd.getCommandId() == requesters[ix].resp->getCorrelationId() );
+            CPPUNIT_ASSERT( requesters[ix].cmd->getCommandId() ==
+                            requesters[ix].resp->getCorrelationId() );
         }
 
         decaf::lang::Thread::sleep( 60 );
@@ -219,17 +489,17 @@
 ////////////////////////////////////////////////////////////////////////////////
 void ResponseCorrelatorTest::testNarrow(){
 
-    MyTransport transport;
-    ResponseCorrelator correlator( &transport, false );
+    Pointer<MyTransport> transport( new MyTransport() );
+    ResponseCorrelator correlator( transport );
 
     Transport* narrowed = correlator.narrow( typeid( transport ) );
-    CPPUNIT_ASSERT( narrowed == &transport );
+    CPPUNIT_ASSERT( narrowed == transport );
 
     narrowed = correlator.narrow( typeid( std::string() ) );
     CPPUNIT_ASSERT( narrowed == NULL );
 
     narrowed = correlator.narrow( typeid( MyTransport ) );
-    CPPUNIT_ASSERT( narrowed == &transport );
+    CPPUNIT_ASSERT( narrowed == transport );
 
     narrowed = correlator.narrow( typeid( transport::correlator::ResponseCorrelator ) );
     CPPUNIT_ASSERT( narrowed == &correlator );

Modified: activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.h?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.h (original)
+++ activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.h Thu Feb 19 01:06:07 2009
@@ -21,15 +21,6 @@
 #include <cppunit/TestFixture.h>
 #include <cppunit/extensions/HelperMacros.h>
 
-#include <activemq/util/Config.h>
-#include <activemq/commands/BaseCommand.h>
-#include <activemq/transport/DefaultTransportListener.h>
-#include <activemq/transport/correlator/ResponseCorrelator.h>
-#include <decaf/lang/Thread.h>
-#include <decaf/util/concurrent/Concurrent.h>
-#include <decaf/lang/exceptions/UnsupportedOperationException.h>
-#include <queue>
-
 namespace activemq{
 namespace transport{
 namespace correlator{
@@ -46,294 +37,6 @@
 
     public:
 
-        class MyCommand : public commands::BaseCommand{
-        private:
-
-            unsigned int commandId;
-
-        public:
-
-            virtual std::string toString() const{ return ""; }
-
-            virtual unsigned char getDataStructureType() const { return 1; }
-
-            virtual decaf::lang::Pointer<commands::Command> visit( activemq::state::CommandVisitor* visitor )
-                throw( exceptions::ActiveMQException ) { return decaf::lang::Pointer<commands::Command>(); }
-
-            virtual MyCommand* cloneDataStructure() const{
-                MyCommand* command = new MyCommand;
-                command->setCommandId( this->getCommandId() );
-                command->setResponseRequired( this->isResponseRequired() );
-                return command;
-            }
-        };
-
-        class MyTransport
-        :
-            public Transport,
-            public decaf::lang::Runnable{
-        public:
-            TransportListener* listener;
-            decaf::lang::Thread* thread;
-            decaf::util::concurrent::Mutex mutex;
-            decaf::util::concurrent::Mutex startedMutex;
-            bool done;
-            std::queue<commands::Command*> requests;
-
-        public:
-
-            MyTransport(){
-                listener = NULL;
-                thread = NULL;
-                done = false;
-            }
-
-            virtual ~MyTransport(){
-
-                close();
-            }
-
-            virtual void oneway( commands::Command* command )
-                throw(CommandIOException, decaf::lang::exceptions::UnsupportedOperationException)
-            {
-                synchronized( &mutex ){
-                    requests.push( command );
-                    mutex.notifyAll();
-                }
-            }
-
-            virtual commands::Response* request( commands::Command* command AMQCPP_UNUSED)
-                throw(CommandIOException, decaf::lang::exceptions::UnsupportedOperationException)
-            {
-                throw decaf::lang::exceptions::UnsupportedOperationException(
-                    __FILE__,
-                    __LINE__,
-                    "stuff" );
-            }
-
-            virtual commands::Response* request( commands::Command* command AMQCPP_UNUSED,
-                                                 unsigned int timeout AMQCPP_UNUSED )
-                throw(CommandIOException, decaf::lang::exceptions::UnsupportedOperationException)
-            {
-                throw decaf::lang::exceptions::UnsupportedOperationException(
-                    __FILE__,
-                    __LINE__,
-                    "stuff" );
-            }
-
-            virtual void setWireFormat( wireformat::WireFormat* wireFormat ) {
-
-            }
-
-            virtual void setTransportListener( TransportListener* listener ) {
-                this->listener = listener;
-            }
-
-            virtual void start() throw( cms::CMSException ){
-                close();
-
-                done = false;
-
-                thread = new decaf::lang::Thread( this );
-                thread->start();
-            }
-
-            virtual void close() throw( cms::CMSException ){
-
-                done = true;
-
-                if( thread != NULL ){
-                    synchronized( &mutex ){
-                        mutex.notifyAll();
-                    }
-                    thread->join();
-                    delete thread;
-                    thread = NULL;
-                }
-            }
-
-            virtual commands::Response* createResponse( commands::Command* command ){
-
-                commands::Response* resp = new commands::Response();
-                resp->setCorrelationId( command->getCommandId() );
-                resp->setResponseRequired( false );
-                return resp;
-            }
-
-            virtual void run(){
-
-                try{
-
-                    synchronized(&startedMutex)
-                    {
-                       startedMutex.notifyAll();
-                    }
-
-                    synchronized( &mutex ){
-
-                        while( !done ){
-
-                            if( requests.empty() ){
-                                mutex.wait();
-                            }else{
-
-                                commands::Command* cmd = requests.front();
-                                requests.pop();
-
-                                // Only send a response if one is required.
-                                commands::Response* resp = NULL;
-                                if( cmd->isResponseRequired() ){
-                                    resp = createResponse( cmd );
-                                }
-
-                                mutex.unlock();
-
-                                // Send both the response and the original
-                                // command back to the correlator.
-                                if( listener != NULL ){
-                                    if( resp != NULL ){
-                                        listener->onCommand( resp );
-                                    }
-                                    listener->onCommand( cmd );
-                                }
-
-                                mutex.lock();
-                            }
-                        }
-                    }
-                }catch( exceptions::ActiveMQException& ex ){
-                    if( listener ){
-                        listener->onTransportException( this, ex );
-                    }
-                }
-                catch( ... ){
-                    if( listener ){
-                        exceptions::ActiveMQException ex( __FILE__, __LINE__, "stuff" );
-                        listener->onTransportException( this, ex );
-                    }
-                }
-            }
-
-            virtual Transport* narrow( const std::type_info& typeId ) {
-                if( typeid( *this ) == typeId ) {
-                    return this;
-                }
-
-                return NULL;
-            }
-
-            /**
-             * Is this Transport fault tolerant, meaning that it will reconnect to
-             * a broker on disconnect.
-             *
-             * @returns true if the Transport is fault tolerant.
-             */
-            virtual bool isFaultTolerant() const {
-                return true;
-            }
-
-            /**
-             * Is the Transport Connected to its Broker.
-             *
-             * @returns true if a connection has been made.
-             */
-            virtual bool isConnected() const {
-                return false;
-            }
-
-            /**
-             * Has the Transport been shutdown and no longer usable.
-             *
-             * @returns true if the Transport
-             */
-            virtual bool isClosed() const {
-                return false;
-            }
-
-        };
-
-        class MyBrokenTransport : public MyTransport{
-        public:
-
-            MyBrokenTransport(){}
-            virtual ~MyBrokenTransport(){}
-
-            virtual commands::Response* createResponse(commands:: Command* command AMQCPP_UNUSED){
-                throw exceptions::ActiveMQException( __FILE__, __LINE__,
-                    "bad stuff" );
-            }
-        };
-
-        class MyListener : public DefaultTransportListener {
-        public:
-
-            int exCount;
-            std::set<int> commands;
-            decaf::util::concurrent::Mutex mutex;
-
-        public:
-
-            MyListener(){
-                exCount = 0;
-            }
-            virtual ~MyListener(){}
-            virtual void onCommand( commands::Command* command ){
-
-                synchronized( &mutex ){
-                    commands.insert( command->getCommandId() );
-
-                    mutex.notify();
-                }
-            }
-
-            virtual void onTransportException(
-                Transport* source AMQCPP_UNUSED,
-                const decaf::lang::Exception& ex AMQCPP_UNUSED)
-            {
-                synchronized( &mutex ){
-                    exCount++;
-                }
-            }
-
-        };
-
-        class RequestThread : public decaf::lang::Thread{
-        public:
-
-            Transport* transport;
-            MyCommand cmd;
-            commands::Response* resp;
-        public:
-
-            RequestThread(){
-                transport = NULL;
-                resp = NULL;
-            }
-            virtual ~RequestThread(){
-                join();
-
-                if( resp != NULL ){
-                    delete resp;
-                    resp = NULL;
-                }
-            }
-
-            void setTransport( Transport* transport ){
-                this->transport = transport;
-            }
-
-            void run(){
-
-                try{
-                    resp = transport->request(&cmd);
-                }catch( ... ){
-                    CPPUNIT_ASSERT( false );
-                }
-            }
-        };
-
-    public:
-
         virtual ~ResponseCorrelatorTest(){}
 
         void testBasics();

Modified: activemq/activemq-cpp/trunk/src/test/activemq/transport/mock/MockTransportFactoryTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/transport/mock/MockTransportFactoryTest.cpp?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/transport/mock/MockTransportFactoryTest.cpp (original)
+++ activemq/activemq-cpp/trunk/src/test/activemq/transport/mock/MockTransportFactoryTest.cpp Thu Feb 19 01:06:07 2009
@@ -38,11 +38,11 @@
 
     MockTransportFactory factory;
 
-    auto_ptr<Transport> transport( factory.create( uri ) );
+    Pointer<Transport> transport( factory.create( uri ) );
 
     CPPUNIT_ASSERT( transport.get() != NULL );
 
-    transport.reset( factory.createComposite( uri ) );
+    transport = factory.createComposite( uri );
 
     CPPUNIT_ASSERT( transport.get() != NULL );