You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/03/12 16:27:07 UTC

svn commit: r752895 - in /activemq/activemq-cpp/trunk/src: main/activemq/core/ main/activemq/state/ main/activemq/transport/ main/activemq/transport/correlator/ main/activemq/transport/failover/ main/activemq/transport/mock/ main/activemq/transport/tcp...

Author: tabish
Date: Thu Mar 12 15:27:05 2009
New Revision: 752895

URL: http://svn.apache.org/viewvc?rev=752895&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQCPP-100

Fixes a memory leak and corrects the way in which transports are created when inside the Failover Transport.  Cleans up the Pointer class some more and adds some more test cases.  

Modified:
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/state/ConnectionStateTracker.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/AbstractTransportFactory.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/AbstractTransportFactory.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/correlator/ResponseCorrelator.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportFactory.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportFactory.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportListener.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransportFactory.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransportFactory.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransportFactory.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransportFactory.h
    activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormat.cpp
    activemq/activemq-cpp/trunk/src/main/decaf/lang/Pointer.h
    activemq/activemq-cpp/trunk/src/test/decaf/lang/PointerTest.cpp
    activemq/activemq-cpp/trunk/src/test/decaf/lang/PointerTest.h

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp?rev=752895&r1=752894&r2=752895&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp Thu Mar 12 15:27:05 2009
@@ -488,10 +488,10 @@
 
         } else if( command->isWireFormatInfo() ) {
             this->brokerWireFormatInfo =
-                command.dynamicCast<WireFormatInfo, Pointer<WireFormatInfo>::CounterType>();
+                command.dynamicCast<WireFormatInfo>();
         } else if( command->isBrokerInfo() ) {
             this->brokerInfo =
-                command.dynamicCast<BrokerInfo, Pointer<BrokerInfo>::CounterType>();
+                command.dynamicCast<BrokerInfo>();
         } else if( command->isKeepAliveInfo() ) {
 
             if( command->isResponseRequired() ) {

Modified: activemq/activemq-cpp/trunk/src/main/activemq/state/ConnectionStateTracker.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/state/ConnectionStateTracker.cpp?rev=752895&r1=752894&r2=752895&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/state/ConnectionStateTracker.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/state/ConnectionStateTracker.cpp Thu Mar 12 15:27:05 2009
@@ -85,7 +85,7 @@
         if( result == NULL ) {
             return Pointer<Tracked>();
         } else {
-            return result.dynamicCast<Tracked, Pointer<Tracked>::CounterType >();
+            return result.dynamicCast<Tracked>();
         }
     }
     AMQ_CATCH_RETHROW( IOException )
@@ -99,7 +99,7 @@
     try{
         if( trackMessages && command != NULL && command->isMessage() ) {
             Pointer<Message> message =
-                command.dynamicCast<Message, Pointer<Message>::CounterType>();
+                command.dynamicCast<Message>();
             if( message->getTransactionId() == NULL ) {
                 currentCacheSize = currentCacheSize + message->getSize();
             }

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/AbstractTransportFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/AbstractTransportFactory.cpp?rev=752895&r1=752894&r2=752895&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/AbstractTransportFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/AbstractTransportFactory.cpp Thu Mar 12 15:27:05 2009
@@ -22,13 +22,11 @@
 #include <activemq/wireformat/WireFormat.h>
 #include <activemq/wireformat/WireFormatRegistry.h>
 #include <activemq/util/URISupport.h>
-#include <activemq/transport/correlator/ResponseCorrelator.h>
 #include <activemq/transport/logging/LoggingTransport.h>
 
 using namespace std;
 using namespace activemq;
 using namespace activemq::transport;
-using namespace activemq::transport::correlator;
 using namespace activemq::transport::logging;
 using namespace activemq::wireformat;
 using namespace activemq::exceptions;
@@ -39,60 +37,6 @@
 using namespace decaf::util;
 
 ////////////////////////////////////////////////////////////////////////////////
-Pointer<Transport> AbstractTransportFactory::create( const decaf::net::URI& location )
-    throw ( exceptions::ActiveMQException ) {
-
-    try{
-
-        Properties properties =
-            activemq::util::URISupport::parseQuery( location.getQuery() );
-
-        Pointer<WireFormat> wireFormat = this->createWireFormat( properties );
-
-        // Create the initial Transport, then wrap it in the normal Filters
-        Pointer<Transport> transport( doCreateComposite( location, wireFormat, properties ) );
-
-        // Create the Transport for response correlator
-        transport.reset( new ResponseCorrelator( transport ) );
-
-        // If command tracing was enabled, wrap the transport with a logging transport.
-        if( properties.getProperty( "transport.commandTracingEnabled", "false" ) == "true" ) {
-            // Create the Transport for response correlator
-            transport.reset( new LoggingTransport( transport ) );
-        }
-
-        // If there is a negotiator need then we create and wrap here.
-        if( wireFormat->hasNegotiator() ) {
-            transport = wireFormat->createNegotiator( transport );
-        }
-
-        return transport;
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-Pointer<Transport> AbstractTransportFactory::createComposite( const decaf::net::URI& location )
-    throw ( exceptions::ActiveMQException ) {
-
-    try{
-
-        Properties properties =
-            activemq::util::URISupport::parseQuery( location.getQuery() );
-
-        Pointer<WireFormat> wireFormat = this->createWireFormat( properties );
-
-        // Create the initial Transport, then wrap it in the normal Filters
-        return doCreateComposite( location, wireFormat, properties );
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
 Pointer<WireFormat> AbstractTransportFactory::createWireFormat(
     const decaf::util::Properties& properties )
         throw( decaf::lang::exceptions::NoSuchElementException ) {

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/AbstractTransportFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/AbstractTransportFactory.h?rev=752895&r1=752894&r2=752895&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/AbstractTransportFactory.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/AbstractTransportFactory.h Thu Mar 12 15:27:05 2009
@@ -43,42 +43,9 @@
 
         virtual ~AbstractTransportFactory() {}
 
-        /**
-         * Creates a fully configured Transport instance which could be a chain
-         * of filters and transports.
-         * @param location - URI location to connect to plus any properties to assign.
-         * @throws ActiveMQexception if an error occurs
-         */
-        virtual Pointer<Transport> create( const decaf::net::URI& location )
-            throw ( exceptions::ActiveMQException );
-
-        /**
-         * Creates a slimed down Transport instance which can be used in composite
-         * transport instances.
-         * @param location - URI location to connect to plus any properties to assign.
-         * @throws ActiveMQexception if an error occurs
-         */
-        virtual Pointer<Transport> createComposite( const decaf::net::URI& location )
-            throw ( exceptions::ActiveMQException );
-
     protected:
 
         /**
-         * Creates a slimed down Transport instance which can be used in composite
-         * transport instances or as the basis for a fully wrapped Transport.  This
-         * method must be implemented by the actual TransportFactory that extends this
-         * abstract base class.
-         * @param location - URI location to connect to.
-         * @param wireformat - the assigned WireFormat for the new Transport.
-         * @param properties - Properties to apply to the transport.
-         * @throws ActiveMQexception if an error occurs
-         */
-        virtual Pointer<Transport> doCreateComposite( const decaf::net::URI& location,
-                                                      const Pointer<wireformat::WireFormat>& wireFormat,
-                                                      const decaf::util::Properties& properties )
-            throw ( exceptions::ActiveMQException ) = 0;
-
-        /**
          * Creates the WireFormat that is configured for this Transport and returns it.
          * The default WireFormat is Openwire.
          *

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/correlator/ResponseCorrelator.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/correlator/ResponseCorrelator.cpp?rev=752895&r1=752894&r2=752895&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/correlator/ResponseCorrelator.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/correlator/ResponseCorrelator.cpp Thu Mar 12 15:27:05 2009
@@ -176,7 +176,7 @@
     }
 
     Pointer<Response> response =
-        command.dynamicCast< Response, Pointer<Response>::CounterType >();
+        command.dynamicCast< Response >();
 
     // It is a response - let's correlate ...
     synchronized( &mapMutex ){

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp?rev=752895&r1=752894&r2=752895&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp Thu Mar 12 15:27:05 2009
@@ -40,9 +40,8 @@
 using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
-FailoverTransport::FailoverTransport( const Pointer<wireformat::WireFormat>& wireFormat ) {
+FailoverTransport::FailoverTransport() {
 
-    this->wireFormat = wireFormat;
     this->timeout = -1;
     this->initialReconnectDelay = 10;
     this->maxReconnectDelay = 1000 * 30;
@@ -87,8 +86,7 @@
         }
 
         try{
-            Pointer<RemoveInfo> remove =
-                command.dynamicCast<RemoveInfo, Pointer<RemoveInfo>::CounterType >();
+            Pointer<RemoveInfo> remove = command.dynamicCast<RemoveInfo>();
 
             return true;
         } AMQ_CATCHALL_NOTHROW()
@@ -590,8 +588,7 @@
 
                 Pointer<IOException> ioException;
                 try{
-                    ioException = connectionFailure.dynamicCast<
-                        IOException, Pointer<IOException>::CounterType >();
+                    ioException = connectionFailure.dynamicCast<IOException>();
                 }
                 AMQ_CATCH_NOTHROW( ClassCastException )
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h?rev=752895&r1=752894&r2=752895&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h Thu Mar 12 15:27:05 2009
@@ -84,7 +84,6 @@
         decaf::util::concurrent::Mutex listenerMutex;
         decaf::util::StlMap<int, Pointer<Command> > requestMap;
 
-        Pointer<wireformat::WireFormat> wireFormat;
         Pointer<URI> connectedTransportURI;
         Pointer<URI> failedConnectTransportURI;
         Pointer<Transport> connectedTransport;
@@ -97,8 +96,7 @@
 
     public:
 
-        FailoverTransport( const Pointer<wireformat::WireFormat>& wireFormat );
-
+        FailoverTransport();
         virtual ~FailoverTransport();
 
         /**
@@ -202,7 +200,6 @@
          * @param WireFormat the object used to encode / decode commands.
          */
         virtual void setWireFormat( const Pointer<wireformat::WireFormat>& wireFormat ) {
-            this->wireFormat = wireFormat;
         }
 
         /**

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportFactory.cpp?rev=752895&r1=752894&r2=752895&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportFactory.cpp Thu Mar 12 15:27:05 2009
@@ -18,6 +18,7 @@
 #include "FailoverTransportFactory.h"
 
 #include <activemq/transport/failover/FailoverTransport.h>
+#include <activemq/transport/correlator/ResponseCorrelator.h>
 #include <activemq/util/CompositeData.h>
 #include <activemq/util/URISupport.h>
 
@@ -30,22 +31,61 @@
 using namespace activemq::util;
 using namespace activemq::transport;
 using namespace activemq::transport::failover;
+using namespace activemq::transport::correlator;
 using namespace activemq::exceptions;
 using namespace decaf;
 using namespace decaf::util;
 using namespace decaf::lang;
 
 ////////////////////////////////////////////////////////////////////////////////
+Pointer<Transport> FailoverTransportFactory::create( const decaf::net::URI& location )
+    throw ( exceptions::ActiveMQException ) {
+
+    try{
+
+        Properties properties =
+            activemq::util::URISupport::parseQuery( location.getQuery() );
+
+        // Create the initial Transport, then wrap it in the normal Filters
+        Pointer<Transport> transport( doCreateComposite( location, properties ) );
+
+        // Create the Transport for response correlator
+        transport.reset( new ResponseCorrelator( transport ) );
+
+        return transport;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<Transport> FailoverTransportFactory::createComposite( const decaf::net::URI& location )
+    throw ( exceptions::ActiveMQException ) {
+
+    try{
+
+        Properties properties =
+            activemq::util::URISupport::parseQuery( location.getQuery() );
+
+        // Create the initial Transport, then wrap it in the normal Filters
+        return doCreateComposite( location, properties );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
 Pointer<Transport> FailoverTransportFactory::doCreateComposite(
     const decaf::net::URI& location,
-    const Pointer<wireformat::WireFormat>& wireFormat,
     const decaf::util::Properties& properties )
         throw ( exceptions::ActiveMQException ) {
 
     try {
 
         CompositeData data = URISupport::parseComposite( location );
-        Pointer<FailoverTransport> transport( new FailoverTransport( wireFormat ) );
+        Pointer<FailoverTransport> transport( new FailoverTransport() );
 
         transport->setInitialReconnectDelay(
             Long::parseLong( properties.getProperty( "initialReconnectDelay", "10" ) ) );

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportFactory.h?rev=752895&r1=752894&r2=752895&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportFactory.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportFactory.h Thu Mar 12 15:27:05 2009
@@ -44,18 +44,36 @@
         virtual ~FailoverTransportFactory() {}
 
         /**
+         * Creates a fully configured Transport instance which could be a chain
+         * of filters and transports.
+         * @param location - URI location to connect to plus any properties to assign.
+         * @throws ActiveMQexception if an error occurs
+         */
+        virtual Pointer<Transport> create( const decaf::net::URI& location )
+            throw ( exceptions::ActiveMQException );
+
+        /**
+         * Creates a slimed down Transport instance which can be used in composite
+         * transport instances.
+         * @param location - URI location to connect to plus any properties to assign.
+         * @throws ActiveMQexception if an error occurs
+         */
+        virtual Pointer<Transport> createComposite( const decaf::net::URI& location )
+            throw ( exceptions::ActiveMQException );
+
+    protected:
+
+        /**
          * Creates a slimed down Transport instance which can be used in composite
          * transport instances.
          *
          * @param location - URI location to connect to.
-         * @param wireformat - the assigned WireFormat for the new Transport.
          * @param properties - Properties to apply to the transport.
          *
          * @return Pointer to a new FailoverTransport instance.
          * @throws ActiveMQexception if an error occurs
          */
         virtual Pointer<Transport> doCreateComposite( const decaf::net::URI& location,
-                                                      const Pointer<wireformat::WireFormat>& wireFormat,
                                                       const decaf::util::Properties& properties )
             throw ( exceptions::ActiveMQException );
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportListener.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportListener.cpp?rev=752895&r1=752894&r2=752895&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportListener.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportListener.cpp Thu Mar 12 15:27:05 2009
@@ -55,7 +55,7 @@
     if( command->isResponse() ) {
 
         Pointer<Response> response =
-            command.dynamicCast<Response, Pointer<Response>::CounterType >();
+            command.dynamicCast<Response>();
         Pointer<Command> object;
 
         synchronized( &( parent->requestMap ) ) {
@@ -64,8 +64,7 @@
 
         if( object != NULL ) {
             try{
-                Pointer<Tracked> tracked =
-                    object.dynamicCast<Tracked, Pointer<Tracked>::CounterType >();
+                Pointer<Tracked> tracked = object.dynamicCast<Tracked>();
                 tracked->onResponse();
             }
             AMQ_CATCH_NOTHROW( ClassCastException )
@@ -74,8 +73,7 @@
 
     if( !parent->initialized && command->isBrokerInfo() ) {
 
-        Pointer<BrokerInfo> info =
-            command.dynamicCast<BrokerInfo, Pointer<BrokerInfo>::CounterType >();
+        Pointer<BrokerInfo> info = command.dynamicCast<BrokerInfo>();
         std::vector< Pointer<BrokerInfo> >& peers = info->getPeerBrokerInfos();
         for( std::size_t i = 0; i < peers.size(); ++i ) {
             std::string brokerString = peers[i]->getBrokerURL();

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransportFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransportFactory.cpp?rev=752895&r1=752894&r2=752895&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransportFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransportFactory.cpp Thu Mar 12 15:27:05 2009
@@ -16,20 +16,76 @@
  */
 
 #include <activemq/transport/mock/MockTransportFactory.h>
+#include <activemq/transport/correlator/ResponseCorrelator.h>
+#include <activemq/transport/logging/LoggingTransport.h>
 //#include <activemq/wireformat/stomp/StompResponseBuilder.h>
 #include <activemq/wireformat/openwire/OpenWireResponseBuilder.h>
 #include <activemq/transport/Transport.h>
 #include <activemq/transport/mock/MockTransport.h>
+#include <activemq/util/URISupport.h>
 
 using namespace activemq;
+using namespace activemq::util;
+using namespace activemq::wireformat;
 using namespace activemq::transport;
 using namespace activemq::transport::mock;
+using namespace activemq::transport::correlator;
+using namespace activemq::transport::logging;
 using namespace activemq::exceptions;
 using namespace decaf;
 using namespace decaf::util;
 using namespace decaf::lang;
 
 ////////////////////////////////////////////////////////////////////////////////
+Pointer<Transport> MockTransportFactory::create( const decaf::net::URI& location )
+    throw ( exceptions::ActiveMQException ) {
+
+    try{
+
+        Properties properties =
+            activemq::util::URISupport::parseQuery( location.getQuery() );
+
+        Pointer<WireFormat> wireFormat = this->createWireFormat( properties );
+
+        // Create the initial Transport, then wrap it in the normal Filters
+        Pointer<Transport> transport( doCreateComposite( location, wireFormat, properties ) );
+
+        // Create the Transport for response correlator
+        transport.reset( new ResponseCorrelator( transport ) );
+
+        // If command tracing was enabled, wrap the transport with a logging transport.
+        if( properties.getProperty( "transport.commandTracingEnabled", "false" ) == "true" ) {
+            // Create the Transport for response correlator
+            transport.reset( new LoggingTransport( transport ) );
+        }
+
+        return transport;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<Transport> MockTransportFactory::createComposite( const decaf::net::URI& location )
+    throw ( exceptions::ActiveMQException ) {
+
+    try{
+
+        Properties properties =
+            activemq::util::URISupport::parseQuery( location.getQuery() );
+
+        Pointer<WireFormat> wireFormat = this->createWireFormat( properties );
+
+        // Create the initial Transport, then wrap it in the normal Filters
+        return doCreateComposite( location, wireFormat, properties );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
 Pointer<Transport> MockTransportFactory::doCreateComposite(
     const decaf::net::URI& location AMQCPP_UNUSED,
     const Pointer<wireformat::WireFormat>& wireFormat,

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransportFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransportFactory.h?rev=752895&r1=752894&r2=752895&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransportFactory.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransportFactory.h Thu Mar 12 15:27:05 2009
@@ -36,6 +36,24 @@
 
         virtual ~MockTransportFactory() {}
 
+        /**
+         * Creates a fully configured Transport instance which could be a chain
+         * of filters and transports.
+         * @param location - URI location to connect to plus any properties to assign.
+         * @throws ActiveMQexception if an error occurs
+         */
+        virtual Pointer<Transport> create( const decaf::net::URI& location )
+            throw ( exceptions::ActiveMQException );
+
+        /**
+         * Creates a slimed down Transport instance which can be used in composite
+         * transport instances.
+         * @param location - URI location to connect to plus any properties to assign.
+         * @throws ActiveMQexception if an error occurs
+         */
+        virtual Pointer<Transport> createComposite( const decaf::net::URI& location )
+            throw ( exceptions::ActiveMQException );
+
     protected:
 
         /**

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransportFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransportFactory.cpp?rev=752895&r1=752894&r2=752895&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransportFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransportFactory.cpp Thu Mar 12 15:27:05 2009
@@ -19,24 +19,89 @@
 
 #include <activemq/transport/IOTransport.h>
 #include <activemq/transport/tcp/TcpTransport.h>
+#include <activemq/transport/correlator/ResponseCorrelator.h>
+#include <activemq/transport/logging/LoggingTransport.h>
+#include <activemq/util/URISupport.h>
+#include <activemq/wireformat/WireFormat.h>
+#include <decaf/util/Properties.h>
 
 using namespace activemq;
+using namespace activemq::util;
+using namespace activemq::wireformat;
 using namespace activemq::transport;
 using namespace activemq::transport::tcp;
+using namespace activemq::transport::correlator;
+using namespace activemq::transport::logging;
 using namespace activemq::exceptions;
 using namespace decaf;
 using namespace decaf::lang;
 
 ////////////////////////////////////////////////////////////////////////////////
+Pointer<Transport> TcpTransportFactory::create( const decaf::net::URI& location )
+    throw ( exceptions::ActiveMQException ) {
+
+    try{
+
+        Properties properties =
+            activemq::util::URISupport::parseQuery( location.getQuery() );
+
+        Pointer<WireFormat> wireFormat = this->createWireFormat( properties );
+
+        // Create the initial Transport, then wrap it in the normal Filters
+        Pointer<Transport> transport( doCreateComposite( location, wireFormat, properties ) );
+
+        // Create the Transport for response correlator
+        transport.reset( new ResponseCorrelator( transport ) );
+
+        // If command tracing was enabled, wrap the transport with a logging transport.
+        if( properties.getProperty( "transport.commandTracingEnabled", "false" ) == "true" ) {
+            // Create the Transport for response correlator
+            transport.reset( new LoggingTransport( transport ) );
+        }
+
+        return transport;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<Transport> TcpTransportFactory::createComposite( const decaf::net::URI& location )
+    throw ( exceptions::ActiveMQException ) {
+
+    try{
+
+        Properties properties =
+            activemq::util::URISupport::parseQuery( location.getQuery() );
+
+        Pointer<WireFormat> wireFormat = this->createWireFormat( properties );
+
+        // Create the initial Transport, then wrap it in the normal Filters
+        return doCreateComposite( location, wireFormat, properties );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
 Pointer<Transport> TcpTransportFactory::doCreateComposite( const decaf::net::URI& location,
                                                            const Pointer<wireformat::WireFormat>& wireFormat,
                                                            const decaf::util::Properties& properties )
     throw ( exceptions::ActiveMQException ) {
 
     try {
-        return Pointer<Transport>(
-            new TcpTransport( location, properties,
-                              Pointer<Transport>( new IOTransport( wireFormat ) ) ) );
+
+        Pointer<Transport> transport( new TcpTransport(
+            location, properties, Pointer<Transport>( new IOTransport( wireFormat ) ) ) );
+
+        // If there is a negotiator need then we create and wrap here.
+        if( wireFormat->hasNegotiator() ) {
+            transport = wireFormat->createNegotiator( transport );
+        }
+
+        return transport;
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransportFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransportFactory.h?rev=752895&r1=752894&r2=752895&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransportFactory.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransportFactory.h Thu Mar 12 15:27:05 2009
@@ -36,6 +36,24 @@
 
         virtual ~TcpTransportFactory() {}
 
+        /**
+         * Creates a fully configured Transport instance which could be a chain
+         * of filters and transports.
+         * @param location - URI location to connect to plus any properties to assign.
+         * @throws ActiveMQexception if an error occurs
+         */
+        virtual Pointer<Transport> create( const decaf::net::URI& location )
+            throw ( exceptions::ActiveMQException );
+
+        /**
+         * Creates a slimed down Transport instance which can be used in composite
+         * transport instances.
+         * @param location - URI location to connect to plus any properties to assign.
+         * @throws ActiveMQexception if an error occurs
+         */
+        virtual Pointer<Transport> createComposite( const decaf::net::URI& location )
+            throw ( exceptions::ActiveMQException );
+
     protected:
 
         /**

Modified: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormat.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormat.cpp?rev=752895&r1=752894&r2=752895&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormat.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormat.cpp Thu Mar 12 15:27:05 2009
@@ -248,8 +248,7 @@
         // Now all unmarshals from this level should result in an object
         // that is a commands::Command type, if its not then the cast will
         // throw an ClassCastException.
-        Pointer<Command> command =
-            data.dynamicCast<Command, Pointer<Command>::CounterType>();
+        Pointer<Command> command = data.dynamicCast<Command>();
 
         return command;
     }

Modified: activemq/activemq-cpp/trunk/src/main/decaf/lang/Pointer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/decaf/lang/Pointer.h?rev=752895&r1=752894&r2=752895&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/decaf/lang/Pointer.h (original)
+++ activemq/activemq-cpp/trunk/src/main/decaf/lang/Pointer.h Thu Mar 12 15:27:05 2009
@@ -28,7 +28,6 @@
 namespace decaf {
 namespace lang {
 
-    template <typename T>
     class DECAF_API AtomicRefCounter {
     private:
 
@@ -42,25 +41,17 @@
 
         AtomicRefCounter() :
             counter( new decaf::util::concurrent::atomic::AtomicInteger( 1 ) ) {}
-        AtomicRefCounter( T* value DECAF_UNUSED ) :
-            counter( new decaf::util::concurrent::atomic::AtomicInteger( 1 ) ) {}
         AtomicRefCounter( const AtomicRefCounter& other ) : counter( other.counter ) {
             this->counter->incrementAndGet();
         }
 
-        template< typename U >
-        AtomicRefCounter( const AtomicRefCounter<U>& other ) {
-            this->counter = reinterpret_cast<const AtomicRefCounter<T>& >( other ).counter;
-            this->counter->incrementAndGet();
-        }
-
         /**
          * Swaps this instance's reference counter with the one given, this allows
          * for copy-and-swap semantics of this object.
          *
          * @param the value to swap with this one's
          */
-        void swap( AtomicRefCounter<T>& other ) {
+        void swap( AtomicRefCounter& other ) {
             std::swap( this->counter, other.counter );
         }
 
@@ -80,62 +71,6 @@
         }
     };
 
-    template <typename T>
-    class DECAF_API InvasiveCounter {
-    private:
-
-        T* counter;
-
-    private:
-
-        InvasiveCounter& operator= ( const InvasiveCounter& );
-
-    public:
-
-        InvasiveCounter() : counter( NULL ) {}
-
-        InvasiveCounter( T* value ) : counter( value ) {
-
-            if( value != NULL ) {
-                value->addReference();
-            }
-        }
-
-        InvasiveCounter( const InvasiveCounter& other ) : counter( other.counter ) {
-            this->counter->addReference();
-        }
-
-        template< typename U >
-        InvasiveCounter( const InvasiveCounter<U>& other ) {
-            this->counter = reinterpret_cast< const InvasiveCounter<T>& >( other ).counter;
-            this->counter->addReference();
-        }
-
-        /**
-         * Swaps this instance's reference counter with the one given, this allows
-         * for copy-and-swap semantics of this object.
-         *
-         * @param the value to swap with this one's
-         */
-        void swap( InvasiveCounter<T>& other ) {
-            std::swap( this->counter, other.counter );
-        }
-
-        /**
-         * Removes a reference to the counter and returns if the counter
-         * has reached zero.
-         *
-         * @return true if the count is now zero.
-         */
-        bool release() {
-            if( this->counter != NULL ) {
-                return this->counter->releaseReference();
-            }
-
-            return false;
-        }
-    };
-
     // Used internally in Pointer.
     struct STATIC_CAST_TOKEN {};
     struct DYNAMIC_CAST_TOKEN {};
@@ -154,7 +89,7 @@
      *
      * @since 1.0
      */
-    template< typename T, typename REFCOUNTER = AtomicRefCounter<T> >
+    template< typename T, typename REFCOUNTER = AtomicRefCounter >
     class DECAF_API Pointer : public REFCOUNTER {
     private:
 
@@ -182,7 +117,7 @@
          *
          * @param value - instance of the type we are containing here.
          */
-        explicit Pointer( const PointerType value ) : REFCOUNTER( value ), value( value ) {
+        explicit Pointer( const PointerType value ) : REFCOUNTER(), value( value ) {
         }
 
         /**
@@ -369,14 +304,14 @@
             return !( this->value == right.get() );
         }
 
-        template< typename T1, typename R1 >
-        Pointer<T1, R1> dynamicCast() const {
-            return Pointer<T1, R1>( *this, DYNAMIC_CAST_TOKEN() );
+        template< typename T1 >
+        Pointer<T1, CounterType> dynamicCast() const {
+            return Pointer<T1, CounterType>( *this, DYNAMIC_CAST_TOKEN() );
         }
 
-        template< typename T1, typename R1 >
-        Pointer<T1, R1> staticCast() const {
-            return Pointer<T1, R1>( *this, STATIC_CAST_TOKEN() );
+        template< typename T1 >
+        Pointer<T1, CounterType> staticCast() const {
+            return Pointer<T1, CounterType>( *this, STATIC_CAST_TOKEN() );
         }
     };
 
@@ -416,7 +351,7 @@
      * to be compared based on the comparison of the object itself and not just the value of
      * the pointer.
      */
-    template< typename T, typename R = AtomicRefCounter<T> >
+    template< typename T, typename R = AtomicRefCounter >
     class PointerComparator : public decaf::util::Comparator< Pointer<T,R> > {
     public:
 

Modified: activemq/activemq-cpp/trunk/src/test/decaf/lang/PointerTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/decaf/lang/PointerTest.cpp?rev=752895&r1=752894&r2=752895&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/decaf/lang/PointerTest.cpp (original)
+++ activemq/activemq-cpp/trunk/src/test/decaf/lang/PointerTest.cpp Thu Mar 12 15:27:05 2009
@@ -20,6 +20,7 @@
 #include <decaf/lang/Pointer.h>
 #include <decaf/lang/Thread.h>
 #include <decaf/lang/Runnable.h>
+#include <decaf/lang/exceptions/ClassCastException.h>
 
 #include <map>
 #include <string>
@@ -27,6 +28,7 @@
 using namespace std;
 using namespace decaf;
 using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
 class TestClassBase {
@@ -367,50 +369,50 @@
     CPPUNIT_ASSERT( *( testMap2.rbegin()->first ) == 3 );
 }
 
-////////////////////////////////////////////////////////////////////////////////
-class SelfCounting {
-private:
-
-    int refCount;
-
-public:
-
-    SelfCounting() : refCount( 0 ) {}
-    SelfCounting( const SelfCounting& other ) : refCount( other.refCount ) {}
-
-    void addReference() { this->refCount++; }
-    bool releaseReference() { return !( --this->refCount ); }
-
-    std::string returnHello() { return "Hello"; }
-};
-
-////////////////////////////////////////////////////////////////////////////////
-void PointerTest::testInvasive() {
-
-    Pointer< SelfCounting, InvasiveCounter<SelfCounting> > thePointer( new SelfCounting );
-
-    // Test Null Initialize
-    Pointer< SelfCounting, InvasiveCounter<SelfCounting> > nullPointer;
-    CPPUNIT_ASSERT( nullPointer.get() == NULL );
-
-    // Test Value Constructor
-    Pointer< SelfCounting, InvasiveCounter<SelfCounting> > pointer( thePointer );
-    CPPUNIT_ASSERT( pointer.get() == thePointer );
-
-    // Test Copy Constructor
-    Pointer< SelfCounting, InvasiveCounter<SelfCounting> > ctorCopy( pointer );
-    CPPUNIT_ASSERT( ctorCopy.get() == thePointer );
-
-    // Test Assignment
-    Pointer< SelfCounting, InvasiveCounter<SelfCounting> > copy = pointer;
-    CPPUNIT_ASSERT( copy.get() == thePointer );
-
-    CPPUNIT_ASSERT( ( *pointer ).returnHello() == "Hello" );
-    CPPUNIT_ASSERT( pointer->returnHello() == "Hello" );
-
-    copy.reset( NULL );
-    CPPUNIT_ASSERT( copy.get() == NULL );
-}
+//////////////////////////////////////////////////////////////////////////////////
+//class SelfCounting {
+//private:
+//
+//    int refCount;
+//
+//public:
+//
+//    SelfCounting() : refCount( 0 ) {}
+//    SelfCounting( const SelfCounting& other ) : refCount( other.refCount ) {}
+//
+//    void addReference() { this->refCount++; }
+//    bool releaseReference() { return !( --this->refCount ); }
+//
+//    std::string returnHello() { return "Hello"; }
+//};
+//
+//////////////////////////////////////////////////////////////////////////////////
+//void PointerTest::testInvasive() {
+//
+//    Pointer< SelfCounting, InvasiveCounter<SelfCounting> > thePointer( new SelfCounting );
+//
+//    // Test Null Initialize
+//    Pointer< SelfCounting, InvasiveCounter<SelfCounting> > nullPointer;
+//    CPPUNIT_ASSERT( nullPointer.get() == NULL );
+//
+//    // Test Value Constructor
+//    Pointer< SelfCounting, InvasiveCounter<SelfCounting> > pointer( thePointer );
+//    CPPUNIT_ASSERT( pointer.get() == thePointer );
+//
+//    // Test Copy Constructor
+//    Pointer< SelfCounting, InvasiveCounter<SelfCounting> > ctorCopy( pointer );
+//    CPPUNIT_ASSERT( ctorCopy.get() == thePointer );
+//
+//    // Test Assignment
+//    Pointer< SelfCounting, InvasiveCounter<SelfCounting> > copy = pointer;
+//    CPPUNIT_ASSERT( copy.get() == thePointer );
+//
+//    CPPUNIT_ASSERT( ( *pointer ).returnHello() == "Hello" );
+//    CPPUNIT_ASSERT( pointer->returnHello() == "Hello" );
+//
+//    copy.reset( NULL );
+//    CPPUNIT_ASSERT( copy.get() == NULL );
+//}
 
 ////////////////////////////////////////////////////////////////////////////////
 TestClassBase* methodReturnRawPointer() {
@@ -429,3 +431,33 @@
 
     Pointer<TestClassBase> result = methodReturnPointer();
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void PointerTest::testDynamicCast() {
+
+    Pointer<TestClassBase> pointer1( new TestClassA );
+    Pointer<TestClassBase> pointer2( new TestClassB );
+
+    Pointer<TestClassA> ptrTestClassA;
+    CPPUNIT_ASSERT_NO_THROW(
+        ptrTestClassA = pointer1.dynamicCast<TestClassA>() );
+    CPPUNIT_ASSERT( ptrTestClassA != NULL );
+
+    Pointer<TestClassB> ptrTestClassB;
+    CPPUNIT_ASSERT_NO_THROW(
+        ptrTestClassB = pointer2.dynamicCast<TestClassB>() );
+    CPPUNIT_ASSERT( ptrTestClassB != NULL );
+
+    Pointer<TestClassA> ptrTestClassA2;
+    CPPUNIT_ASSERT_THROW_MESSAGE(
+        "Should Throw a ClassCastException",
+        ptrTestClassA2 = pointer2.dynamicCast<TestClassA>(),
+        ClassCastException );
+
+    Pointer<TestClassBase> nullPointer;
+    CPPUNIT_ASSERT_THROW_MESSAGE(
+        "Should Throw a ClassCastException",
+        ptrTestClassA2 = nullPointer.dynamicCast<TestClassA>(),
+        ClassCastException );
+
+}

Modified: activemq/activemq-cpp/trunk/src/test/decaf/lang/PointerTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/decaf/lang/PointerTest.h?rev=752895&r1=752894&r2=752895&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/decaf/lang/PointerTest.h (original)
+++ activemq/activemq-cpp/trunk/src/test/decaf/lang/PointerTest.h Thu Mar 12 15:27:05 2009
@@ -34,8 +34,8 @@
         CPPUNIT_TEST( testThreaded2 );
         CPPUNIT_TEST( testOperators );
         CPPUNIT_TEST( testSTLContainers );
-        CPPUNIT_TEST( testInvasive );
         CPPUNIT_TEST( testReturnByValue );
+        CPPUNIT_TEST( testDynamicCast );
         CPPUNIT_TEST_SUITE_END();
 
     public:
@@ -50,8 +50,8 @@
         void testThreaded2();
         void testOperators();
         void testSTLContainers();
-        void testInvasive();
         void testReturnByValue();
+        void testDynamicCast();
 
     };