You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/06/03 17:27:51 UTC

svn commit: r951033 [1/2] - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/ main/activemq/core/ main/activemq/core/policies/ main/activemq/util/ main/cms/ main/decaf/net/ test/ test/activemq/util/ test/decaf/net/

Author: tabish
Date: Thu Jun  3 15:27:50 2010
New Revision: 951033

URL: http://svn.apache.org/viewvc?rev=951033&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQCPP-293

Move configuration options into the ActiveMQConnectionFactory and delay the send of ConnectionInfo until the Connection is used to allow a user to specify the clientId.  Adds setClientID to cms::Connection.  Add class IdGenerator which creates client and connection ids using the same method as the java and .net clients to make ids consistent across the clients.  Add getLocalHost to class InetAddress.

Added:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/IdGenerator.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/IdGenerator.h   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/util/IdGeneratorTest.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/util/IdGeneratorTest.h   (with props)
Removed:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionSupport.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionSupport.h
Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConstants.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConstants.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/policies/DefaultPrefetchPolicy.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/net/InetAddress.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/net/InetAddress.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/net/InetAddressTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/net/InetAddressTest.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am?rev=951033&r1=951032&r2=951033&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am Thu Jun  3 15:27:50 2010
@@ -87,7 +87,6 @@ cc_sources = \
     activemq/core/ActiveMQConnection.cpp \
     activemq/core/ActiveMQConnectionFactory.cpp \
     activemq/core/ActiveMQConnectionMetaData.cpp \
-    activemq/core/ActiveMQConnectionSupport.cpp \
     activemq/core/ActiveMQConstants.cpp \
     activemq/core/ActiveMQConsumer.cpp \
     activemq/core/ActiveMQProducer.cpp \
@@ -139,6 +138,7 @@ cc_sources = \
     activemq/util/ActiveMQProperties.cpp \
     activemq/util/CMSExceptionSupport.cpp \
     activemq/util/CompositeData.cpp \
+    activemq/util/IdGenerator.cpp \
     activemq/util/LongSequenceGenerator.cpp \
     activemq/util/MarshallingSupport.cpp \
     activemq/util/MemoryUsage.cpp \
@@ -728,7 +728,6 @@ h_sources = \
     activemq/core/ActiveMQConnection.h \
     activemq/core/ActiveMQConnectionFactory.h \
     activemq/core/ActiveMQConnectionMetaData.h \
-    activemq/core/ActiveMQConnectionSupport.h \
     activemq/core/ActiveMQConstants.h \
     activemq/core/ActiveMQConsumer.h \
     activemq/core/ActiveMQProducer.h \
@@ -798,6 +797,7 @@ h_sources = \
     activemq/util/CMSExceptionSupport.h \
     activemq/util/CompositeData.h \
     activemq/util/Config.h \
+    activemq/util/IdGenerator.h \
     activemq/util/LongSequenceGenerator.h \
     activemq/util/MarshallingSupport.h \
     activemq/util/MemoryUsage.h \

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=951033&r1=951032&r2=951033&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Thu Jun  3 15:27:50 2010
@@ -22,16 +22,21 @@
 #include <activemq/core/ActiveMQSession.h>
 #include <activemq/core/ActiveMQProducer.h>
 #include <activemq/core/ActiveMQConstants.h>
+#include <activemq/core/policies/DefaultPrefetchPolicy.h>
+#include <activemq/core/policies/DefaultRedeliveryPolicy.h>
 #include <activemq/exceptions/ActiveMQException.h>
 #include <activemq/exceptions/BrokerException.h>
 #include <activemq/util/CMSExceptionSupport.h>
+#include <activemq/util/IdGenerator.h>
 #include <activemq/transport/failover/FailoverTransport.h>
 
 #include <decaf/lang/Math.h>
 #include <decaf/lang/Boolean.h>
 #include <decaf/util/Iterator.h>
 #include <decaf/util/UUID.h>
+#include <decaf/util/concurrent/Mutex.h>
 #include <decaf/util/concurrent/TimeUnit.h>
+#include <decaf/util/concurrent/CountDownLatch.h>
 
 #include <activemq/commands/Command.h>
 #include <activemq/commands/ActiveMQMessage.h>
@@ -57,6 +62,7 @@ using namespace std;
 using namespace cms;
 using namespace activemq;
 using namespace activemq::core;
+using namespace activemq::core::policies;
 using namespace activemq::commands;
 using namespace activemq::exceptions;
 using namespace activemq::transport;
@@ -69,27 +75,107 @@ using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
+namespace activemq{
+namespace core{
+
+    class ConnectionConfig {
+    public:
+
+        static util::IdGenerator CONNECTION_ID_GENERATOR;
+
+        Pointer<decaf::util::Properties> properties;
+        Pointer<transport::Transport> transport;
+        Pointer<util::IdGenerator> clientIdGenerator;
+
+        util::LongSequenceGenerator sessionIds;
+        util::LongSequenceGenerator tempDestinationIds;
+        util::LongSequenceGenerator localTransactionIds;
+
+        std::string brokerURL;
+
+        bool clientIDSet;
+        bool isConnectionInfoSentToBroker;
+        bool userSpecifiedClientID;
+        decaf::util::concurrent::Mutex ensureConnectionInfoSentMutex;
+
+        bool dispatchAsync;
+        bool alwaysSyncSend;
+        bool useAsyncSend;
+        bool useCompression;
+        unsigned int sendTimeout;
+        unsigned int closeTimeout;
+        unsigned int producerWindowSize;
+
+        cms::ExceptionListener* defaultListener;
+        std::auto_ptr<PrefetchPolicy> defaultPrefetchPolicy;
+        std::auto_ptr<RedeliveryPolicy> defaultRedeliveryPolicy;
+
+        cms::ExceptionListener* exceptionListener;
+
+        Pointer<commands::ConnectionInfo> connectionInfo;
+        Pointer<commands::BrokerInfo> brokerInfo;
+        Pointer<commands::WireFormatInfo> brokerWireFormatInfo;
+        Pointer<CountDownLatch> transportInterruptionProcessingComplete;
+
+        ConnectionConfig() : clientIDSet( false ),
+                             isConnectionInfoSentToBroker( false ),
+                             userSpecifiedClientID( false ),
+                             dispatchAsync( true ),
+                             alwaysSyncSend( false ),
+                             useAsyncSend( false ),
+                             useCompression( false ),
+                             sendTimeout( 0 ),
+                             closeTimeout( 15000 ),
+                             producerWindowSize( 0 ),
+                             defaultListener( NULL ),
+                             defaultPrefetchPolicy( NULL ),
+                             defaultRedeliveryPolicy( NULL ),
+                             exceptionListener( NULL ) {
+
+            this->defaultPrefetchPolicy.reset( new DefaultPrefetchPolicy() );
+            this->defaultRedeliveryPolicy.reset( new DefaultRedeliveryPolicy() );
+            this->clientIdGenerator.reset(new util::IdGenerator );
+            this->connectionInfo.reset( new ConnectionInfo() );
+
+            // Generate a connectionId
+            decaf::lang::Pointer<ConnectionId> connectionId( new ConnectionId() );
+            connectionId->setValue( CONNECTION_ID_GENERATOR.generateId() );
+            this->connectionInfo->setConnectionId( connectionId );
+        }
+
+    };
+
+    // Static init.
+    util::IdGenerator ConnectionConfig::CONNECTION_ID_GENERATOR;
+}}
+
+////////////////////////////////////////////////////////////////////////////////
 ActiveMQConnection::ActiveMQConnection( const Pointer<transport::Transport>& transport,
                                         const Pointer<decaf::util::Properties>& properties ) :
-    ActiveMQConnectionSupport( transport, properties ),
-    connectionMetaData( new ActiveMQConnectionMetaData() ),
-    connectionInfo( new ConnectionInfo() ),
-    exceptionListener( NULL ) {
+    config( NULL ),
+    connectionMetaData( new ActiveMQConnectionMetaData() ) {
+
+    Pointer<ConnectionConfig> configuration( new ConnectionConfig );
 
     // Register for messages and exceptions from the connector.
     transport->setTransportListener( this );
 
-    // Now Start the Transport
-    transport->start();
+    // Set the initial state of the ConnectionInfo
+    configuration->connectionInfo->setManageable( false );
+    configuration->connectionInfo->setFaultTolerant( transport->isFaultTolerant() );
+
+    // Store of the transport and properties, the Connection now owns them.
+    configuration->properties = properties;
+    configuration->transport = transport;
 
-    // Attempt to register with the Broker
-    this->connect();
+    this->config = configuration.release();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 ActiveMQConnection::~ActiveMQConnection() {
     try {
         this->close();
+        delete this->config;
     }
     AMQ_CATCH_NOTHROW( ActiveMQException )
     AMQ_CATCHALL_NOTHROW( )
@@ -137,13 +223,14 @@ cms::Session* ActiveMQConnection::create
 
     try {
 
-        enforceConnected();
+        checkClosed();
+        ensureConnectionInfoSent();
 
         // Create and initialize a new SessionInfo object
         Pointer<SessionInfo> sessionInfo( new SessionInfo() );
         decaf::lang::Pointer<SessionId> sessionId( new SessionId() );
-        sessionId->setConnectionId( connectionInfo->getConnectionId()->getValue() );
-        sessionId->setValue( this->getNextSessionId() );
+        sessionId->setConnectionId( this->config->connectionInfo->getConnectionId()->getValue() );
+        sessionId->setValue( this->config->sessionIds.getNextSequenceId() );
         sessionInfo->setSessionId( sessionId );
         sessionInfo->setAckMode( ackMode );
 
@@ -152,7 +239,7 @@ cms::Session* ActiveMQConnection::create
 
         // Create the session instance.
         ActiveMQSession* session = new ActiveMQSession(
-            sessionInfo, ackMode, this->getProperties(), this );
+            sessionInfo, ackMode, *this->config->properties, this );
 
         // Add the session to the set of active sessions.
         synchronized( &activeSessions ) {
@@ -218,7 +305,37 @@ std::string ActiveMQConnection::getClien
         return "";
     }
 
-    return this->getClientId();
+    return this->config->connectionInfo->getClientId();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setClientID( const std::string& clientID ) {
+
+    if( this->closed.get() ) {
+        throw cms::IllegalStateException( "Connection is already closed", NULL );
+    }
+
+    if( this->config->clientIDSet ) {
+        throw cms::IllegalStateException( "Client ID is already set", NULL );
+    }
+
+    if( this->config->isConnectionInfoSentToBroker ) {
+        throw cms::IllegalStateException( "Cannot set client Id on a Connection already in use.", NULL );
+    }
+
+    if( clientID.empty() ) {
+        throw cms::InvalidClientIdException( "Client ID cannot be an empty string", NULL );
+    }
+
+    this->config->connectionInfo->setClientId( clientID );
+    this->config->userSpecifiedClientID = true;
+    ensureConnectionInfoSent();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setDefaultClientId( const std::string& clientId ) {
+    this->setClientID( clientId );
+    this->config->userSpecifiedClientID = true;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -275,7 +392,8 @@ void ActiveMQConnection::close() throw (
 void ActiveMQConnection::start() throw ( cms::CMSException ) {
     try{
 
-        enforceConnected();
+        checkClosed();
+        ensureConnectionInfoSent();
 
         // This starts or restarts the delivery of all incoming messages
         // messages delivered while this connection is stopped are dropped
@@ -297,7 +415,7 @@ void ActiveMQConnection::stop() throw ( 
 
     try {
 
-        enforceConnected();
+        checkClosed();
 
         // Once current deliveries are done this stops the delivery of any
         // new messages.
@@ -313,46 +431,13 @@ void ActiveMQConnection::stop() throw ( 
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::connect() throw ( activemq::exceptions::ActiveMQException ) {
-
-    try{
-
-        // Start the Transport
-        this->startupTransport();
-
-        // Fill in our connection info.
-        connectionInfo->setUserName( this->getUsername() );
-        connectionInfo->setPassword( this->getPassword() );
-
-        // Get or Create a Client Id
-        string clientId = this->getClientId();
-        if( clientId.length() > 0 ){
-            connectionInfo->setClientId( clientId );
-        } else {
-            connectionInfo->setClientId( UUID::randomUUID().toString() );
-        }
-
-        // Generate a connectionId
-        decaf::lang::Pointer<ConnectionId> connectionId( new ConnectionId() );
-        connectionId->setValue( UUID::randomUUID().toString() );
-        connectionInfo->setConnectionId( connectionId );
-
-        // Now we ping the broker and see if we get an ack / nack
-        syncRequest( connectionInfo );
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::disconnect( long long lastDeliveredSequenceId )
     throw ( activemq::exceptions::ActiveMQException ) {
 
     try{
 
         // Remove our ConnectionId from the Broker
-        Pointer<RemoveInfo> command( this->connectionInfo->createRemoveCommand() );
+        Pointer<RemoveInfo> command( this->config->connectionInfo->createRemoveCommand() );
         command->setLastDeliveredSequenceId( lastDeliveredSequenceId );
         this->syncRequest( command, this->getCloseTimeout() );
 
@@ -361,7 +446,40 @@ void ActiveMQConnection::disconnect( lon
         oneway( shutdown );
 
         // Allow the Support class to shutdown its resources, including the Transport.
-        this->shutdownTransport();
+        bool hasException = false;
+        exceptions::ActiveMQException e;
+
+        if( this->config->transport != NULL ){
+
+            // Clear the listener, we don't care about errors at this point.
+            this->config->transport->setTransportListener( NULL );
+
+            try{
+                this->config->transport->close();
+            }catch( exceptions::ActiveMQException& ex ){
+                if( !hasException ){
+                    hasException = true;
+                    ex.setMark(__FILE__, __LINE__ );
+                    e = ex;
+                }
+            }
+
+            try{
+                this->config->transport.reset( NULL );
+            }catch( exceptions::ActiveMQException& ex ){
+                if( !hasException ){
+                    hasException = true;
+                    ex.setMark(__FILE__, __LINE__ );
+                    e = ex;
+                }
+            }
+        }
+
+        // If we encountered an exception - throw the first one we encountered.
+        // This will preserve the stack trace for logging purposes.
+        if( hasException ){
+            throw e;
+        }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -403,11 +521,12 @@ void ActiveMQConnection::destroyDestinat
                 __FILE__, __LINE__, "Destination passed was NULL" );
         }
 
-        enforceConnected();
+        checkClosed();
+        ensureConnectionInfoSent();
 
         Pointer<DestinationInfo> command( new DestinationInfo() );
 
-        command->setConnectionId( connectionInfo->getConnectionId() );
+        command->setConnectionId( this->config->connectionInfo->getConnectionId() );
         command->setOperationType( ActiveMQConstants::DESTINATION_REMOVE_OPERATION );
         command->setDestination( Pointer<ActiveMQDestination>( destination->cloneDataStructure() ) );
 
@@ -435,7 +554,8 @@ void ActiveMQConnection::destroyDestinat
                 __FILE__, __LINE__, "Destination passed was NULL" );
         }
 
-        enforceConnected();
+        checkClosed();
+        ensureConnectionInfoSent();
 
         const ActiveMQDestination* amqDestination =
             dynamic_cast<const ActiveMQDestination*>( destination );
@@ -499,10 +619,10 @@ void ActiveMQConnection::onCommand( cons
             }
 
         } else if( command->isWireFormatInfo() ) {
-            this->brokerWireFormatInfo =
+            this->config->brokerWireFormatInfo =
                 command.dynamicCast<WireFormatInfo>();
         } else if( command->isBrokerInfo() ) {
-            this->brokerInfo =
+            this->config->brokerInfo =
                 command.dynamicCast<BrokerInfo>();
         } else if( command->isShutdownInfo() ) {
 
@@ -569,7 +689,7 @@ void ActiveMQConnection::onException( co
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::transportInterrupted() {
 
-    transportInterruptionProcessingComplete.reset( new CountDownLatch( (int)dispatchers.size() ) );
+    this->config->transportInterruptionProcessingComplete.reset( new CountDownLatch( (int)dispatchers.size() ) );
 
     synchronized( &activeSessions ) {
         std::auto_ptr< Iterator<ActiveMQSession*> > iter( this->activeSessions.iterator() );
@@ -611,8 +731,8 @@ void ActiveMQConnection::oneway( Pointer
     throw ( ActiveMQException ) {
 
     try {
-        enforceConnected();
-        this->getTransport().oneway( command );
+        checkClosed();
+        this->config->transport->oneway( command );
     }
     AMQ_CATCH_EXCEPTION_CONVERT( IOException, ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( decaf::lang::exceptions::UnsupportedOperationException, ActiveMQException )
@@ -626,14 +746,14 @@ void ActiveMQConnection::syncRequest( Po
 
     try {
 
-        enforceConnected();
+        checkClosed();
 
         Pointer<Response> response;
 
         if( timeout == 0 ) {
-            response = this->getTransport().request( command );
+            response = this->config->transport->request( command );
         } else {
-            response = this->getTransport().request( command, timeout );
+            response = this->config->transport->request( command, timeout );
         }
 
         commands::ExceptionResponse* exceptionResponse =
@@ -656,19 +776,52 @@ void ActiveMQConnection::syncRequest( Po
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::enforceConnected() const throw ( ActiveMQException ) {
+void ActiveMQConnection::checkClosed() const throw ( ActiveMQException ) {
     if( this->isClosed() ) {
         throw ActiveMQException(
             __FILE__, __LINE__,
-            "ActiveMQConnection::enforceConnected - Not Connected!" );
+            "ActiveMQConnection::enforceConnected - Connection has already been closed!" );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::ensureConnectionInfoSent() {
+
+    try{
+
+        // Can we skip sending the ConnectionInfo packet, cheap test
+        if( this->config->isConnectionInfoSentToBroker || closed.get() ) {
+            return;
+        }
+
+        synchronized( &( this->config->ensureConnectionInfoSentMutex ) ) {
+
+            // Can we skip sending the ConnectionInfo packet??
+            if( this->config->isConnectionInfoSentToBroker || closed.get() ) {
+                return;
+            }
+
+            // check for a user specified Id
+            if( !this->config->userSpecifiedClientID ) {
+                this->config->connectionInfo->setClientId( this->config->clientIdGenerator->generateId() );
+            }
+
+            // Now we ping the broker and see if we get an ack / nack
+            syncRequest( this->config->connectionInfo );
+
+            this->config->isConnectionInfoSentToBroker = true;
+        }
     }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::fire( const ActiveMQException& ex ) {
-    if( exceptionListener != NULL ) {
+    if( this->config->exceptionListener != NULL ) {
         try {
-            exceptionListener->onException( ex.convertToCMSException() );
+            this->config->exceptionListener->onException( ex.convertToCMSException() );
         }
         catch(...){}
     }
@@ -678,18 +831,18 @@ void ActiveMQConnection::fire( const Act
 const ConnectionInfo& ActiveMQConnection::getConnectionInfo() const
     throw( ActiveMQException ) {
 
-    enforceConnected();
+    checkClosed();
 
-    return *this->connectionInfo;
+    return *this->config->connectionInfo;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 const ConnectionId& ActiveMQConnection::getConnectionId() const
     throw( ActiveMQException ) {
 
-    enforceConnected();
+    checkClosed();
 
-    return *( this->connectionInfo->getConnectionId() );
+    return *( this->config->connectionInfo->getConnectionId() );
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -722,10 +875,10 @@ void ActiveMQConnection::removeTransport
 void ActiveMQConnection::waitForTransportInterruptionProcessingToComplete()
     throw( decaf::lang::exceptions::InterruptedException ) {
 
-    if( transportInterruptionProcessingComplete != NULL ) {
+    if( this->config->transportInterruptionProcessingComplete != NULL ) {
 
         while( !closed.get() && !transportFailed.get() &&
-               !transportInterruptionProcessingComplete->await( 15, TimeUnit::SECONDS) ) {
+               !this->config->transportInterruptionProcessingComplete->await( 15, TimeUnit::SECONDS) ) {
 
             //LOG.warn( "dispatch paused, waiting for outstanding dispatch interruption processing (" +
             //          transportInterruptionProcessingComplete.getCount() + ") to complete..");
@@ -740,8 +893,8 @@ void ActiveMQConnection::setTransportInt
 
     synchronized( &mutex ) {
 
-        if( transportInterruptionProcessingComplete != NULL ) {
-            transportInterruptionProcessingComplete->countDown();
+        if( this->config->transportInterruptionProcessingComplete != NULL ) {
+            this->config->transportInterruptionProcessingComplete->countDown();
 
             try {
                 signalInterruptionProcessingComplete();
@@ -754,16 +907,16 @@ void ActiveMQConnection::setTransportInt
 void ActiveMQConnection::signalInterruptionProcessingComplete()
     throw( decaf::lang::exceptions::InterruptedException ) {
 
-    if( transportInterruptionProcessingComplete->await( 0, TimeUnit::SECONDS ) ) {
+    if( this->config->transportInterruptionProcessingComplete->await( 0, TimeUnit::SECONDS ) ) {
         synchronized( &mutex ) {
 
-            transportInterruptionProcessingComplete.reset( NULL );
+            this->config->transportInterruptionProcessingComplete.reset( NULL );
             FailoverTransport* failoverTransport =
-                dynamic_cast<FailoverTransport*>( this->getTransport().narrow( typeid( FailoverTransport ) ) );
+                dynamic_cast<FailoverTransport*>( this->config->transport->narrow( typeid( FailoverTransport ) ) );
 
             if( failoverTransport != NULL ) {
                 failoverTransport->setConnectionInterruptProcessingComplete(
-                    this->connectionInfo->getConnectionId() );
+                    this->config->connectionInfo->getConnectionId() );
 
                 //if( LOG.isDebugEnabled() ) {
                 //    LOG.debug("transportInterruptionProcessingComplete for: " + this.getConnectionInfo().getConnectionId());
@@ -772,3 +925,153 @@ void ActiveMQConnection::signalInterrupt
         }
     }
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setUsername( const std::string& username ) {
+    this->config->connectionInfo->setUserName( username );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+const std::string& ActiveMQConnection::getUsername() const {
+    return this->config->connectionInfo->getUserName();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setPassword( const std::string& password ){
+    this->config->connectionInfo->setPassword( password );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+const std::string& ActiveMQConnection::getPassword() const {
+    return this->config->connectionInfo->getPassword();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setBrokerURL( const std::string& brokerURL ){
+    this->config->brokerURL = brokerURL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+const std::string& ActiveMQConnection::getBrokerURL() const {
+    return this->config->brokerURL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setExceptionListener( cms::ExceptionListener* listener ) {
+    this->config->defaultListener = listener;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::ExceptionListener* ActiveMQConnection::getExceptionListener() const {
+    return this->config->defaultListener;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setPrefetchPolicy( PrefetchPolicy* policy ) {
+    this->config->defaultPrefetchPolicy.reset( policy );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+PrefetchPolicy* ActiveMQConnection::getPrefetchPolicy() const {
+    return this->config->defaultPrefetchPolicy.get();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setRedeliveryPolicy( RedeliveryPolicy* policy ) {
+    this->config->defaultRedeliveryPolicy.reset( policy );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+RedeliveryPolicy* ActiveMQConnection::getRedeliveryPolicy() const {
+    return this->config->defaultRedeliveryPolicy.get();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnection::isDispatchAsync() const {
+    return this->config->dispatchAsync;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setDispatchAsync( bool value ) {
+    this->config->dispatchAsync = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnection::isAlwaysSyncSend() const {
+    return this->config->alwaysSyncSend;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setAlwaysSyncSend( bool value ) {
+    this->config->alwaysSyncSend = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnection::isUseAsyncSend() const {
+    return this->config->useAsyncSend;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setUseAsyncSend( bool value ) {
+    this->config->useAsyncSend = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnection::isUseCompression() const {
+    return this->config->useCompression;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setUseCompression( bool value ) {
+    this->config->useCompression = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+unsigned int ActiveMQConnection::getSendTimeout() const {
+    return this->config->sendTimeout;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setSendTimeout( unsigned int timeout ) {
+    this->config->sendTimeout = timeout;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+unsigned int ActiveMQConnection::getCloseTimeout() const {
+    return this->config->closeTimeout;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setCloseTimeout( unsigned int timeout ) {
+    this->config->closeTimeout = timeout;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+unsigned int ActiveMQConnection::getProducerWindowSize() const {
+    return this->config->producerWindowSize;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setProducerWindowSize( unsigned int windowSize ) {
+    this->config->producerWindowSize = windowSize;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long ActiveMQConnection::getNextSessionId() {
+    return this->config->sessionIds.getNextSequenceId();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long ActiveMQConnection::getNextTempDestinationId() {
+    return this->config->tempDestinationIds.getNextSequenceId();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long ActiveMQConnection::getNextLocalTransactionId() {
+    return this->config->localTransactionIds.getNextSequenceId();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+transport::Transport& ActiveMQConnection::getTransport() const {
+    return *( this->config->transport );
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?rev=951033&r1=951032&r2=951033&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Thu Jun  3 15:27:50 2010
@@ -20,22 +20,16 @@
 
 #include <cms/Connection.h>
 #include <activemq/util/Config.h>
-#include <activemq/core/ActiveMQConnectionSupport.h>
 #include <activemq/core/ActiveMQConnectionMetaData.h>
 #include <activemq/core/Dispatcher.h>
 #include <activemq/commands/ActiveMQTempDestination.h>
-#include <activemq/commands/BrokerInfo.h>
 #include <activemq/commands/ConnectionInfo.h>
 #include <activemq/commands/ConsumerInfo.h>
-#include <activemq/commands/ProducerInfo.h>
-#include <activemq/commands/LocalTransactionId.h>
-#include <activemq/commands/WireFormatInfo.h>
 #include <activemq/exceptions/ActiveMQException.h>
 #include <activemq/transport/TransportListener.h>
 #include <decaf/util/Properties.h>
 #include <decaf/util/StlMap.h>
 #include <decaf/util/StlSet.h>
-#include <decaf/util/concurrent/CountDownLatch.h>
 #include <decaf/util/concurrent/atomic/AtomicBoolean.h>
 #include <decaf/lang/exceptions/UnsupportedOperationException.h>
 #include <decaf/lang/exceptions/NullPointerException.h>
@@ -49,18 +43,21 @@ namespace core{
 
     using decaf::lang::Pointer;
     using decaf::util::concurrent::atomic::AtomicBoolean;
-    using decaf::util::concurrent::CountDownLatch;
 
     class ActiveMQSession;
     class ActiveMQProducer;
+    class ConnectionConfig;
+    class PrefetchPolicy;
+    class RedeliveryPolicy;
 
     /**
      * Concrete connection used for all connectors to the
      * ActiveMQ broker.
+     *
+     * @since 2.0
      */
     class AMQCPP_API ActiveMQConnection : public cms::Connection,
-                                          public ActiveMQConnectionSupport
-    {
+                                          public transport::TransportListener {
     private:
 
         typedef decaf::util::StlMap< Pointer<commands::ConsumerId>,
@@ -73,6 +70,8 @@ namespace core{
 
     private:
 
+        ConnectionConfig* config;
+
         /**
          * Sync object.
          */
@@ -84,11 +83,6 @@ namespace core{
         std::auto_ptr<cms::ConnectionMetaData> connectionMetaData;
 
         /**
-         * Connection Information for this connection to the Broker
-         */
-        Pointer<commands::ConnectionInfo> connectionInfo;
-
-        /**
          * Indicates if this Connection is started
          */
         AtomicBoolean started;
@@ -130,27 +124,6 @@ namespace core{
          */
         decaf::util::StlSet<transport::TransportListener*> transportListeners;
 
-        /**
-         * the registered exception listener
-         */
-        cms::ExceptionListener* exceptionListener;
-
-        /**
-         * Command sent from the Broker with its BrokerInfo
-         */
-        Pointer<commands::BrokerInfo> brokerInfo;
-
-        /**
-         * Command sent from the Broker with its WireFormatInfo
-         */
-        Pointer<commands::WireFormatInfo> brokerWireFormatInfo;
-
-        /**
-         * Latch used to track completion of recovery of consumers
-         * after a Connection Interrupted event.
-         */
-        Pointer<CountDownLatch> transportInterruptionProcessingComplete;
-
     private:
 
         ActiveMQConnection( const ActiveMQConnection& );
@@ -314,6 +287,16 @@ namespace core{
         virtual cms::Session* createSession() throw ( cms::CMSException );
 
         /**
+         * {@inheritDoc}
+         */
+        virtual std::string getClientID() const;
+
+        /**
+         * {@inheritDoc}
+         */
+        virtual void setClientID( const std::string& clientID );
+
+        /**
          * Creates a new Session to work for this Connection using the
          * specified acknowledgment mode
          * @param ackMode the Acknowledgment Mode to use.
@@ -323,12 +306,6 @@ namespace core{
             throw ( cms::CMSException );
 
         /**
-         * Get the Client Id for this session
-         * @return string version of Client Id
-         */
-        virtual std::string getClientID() const;
-
-        /**
          * Closes this connection as well as any Sessions
          * created from it (and those Sessions' consumers and
          * producers).
@@ -352,16 +329,210 @@ namespace core{
          * Gets the registered Exception Listener for this connection
          * @return pointer to an exception listener or NULL
          */
-        virtual cms::ExceptionListener* getExceptionListener() const{
-            return exceptionListener; };
+        virtual cms::ExceptionListener* getExceptionListener() const;
 
         /**
          * Sets the registered Exception Listener for this connection
          * @param listener pointer to and <code>ExceptionListener</code>
          */
-        virtual void setExceptionListener( cms::ExceptionListener* listener ){
-            exceptionListener = listener;
-        };
+        virtual void setExceptionListener( cms::ExceptionListener* listener );
+
+    public:   // Configuration Options
+
+        /**
+         * Sets the username that should be used when creating a new connection
+         * @param username string
+         */
+        void setUsername( const std::string& username );
+
+        /**
+         * Gets the username that this factory will use when creating a new
+         * connection instance.
+         * @return username string, "" for default credentials
+         */
+        const std::string& getUsername() const;
+
+        /**
+         * Sets the password that should be used when creating a new connection
+         * @param password string
+         */
+        void setPassword( const std::string& password );
+
+        /**
+         * Gets the password that this factory will use when creating a new
+         * connection instance.
+         * @return password string, "" for default credentials
+         */
+        const std::string& getPassword() const;
+
+        /**
+         * Sets the Client Id.
+         * @param clientId - The new clientId value.
+         */
+        void setDefaultClientId( const std::string& clientId );
+
+        /**
+         * Sets the Broker URL that should be used when creating a new
+         * connection instance
+         * @param brokerURL string
+         */
+        void setBrokerURL( const std::string& brokerURL );
+
+        /**
+         * Gets the Broker URL that this factory will use when creating a new
+         * connection instance.
+         * @return brokerURL string
+         */
+        const std::string& getBrokerURL() const;
+
+        /**
+         * Sets the PrefetchPolicy instance that this factory should use when it creates
+         * new Connection instances.  The PrefetchPolicy passed becomes the property of the
+         * factory and will be deleted when the factory is destroyed.
+         *
+         * @param policy
+         *      The new PrefetchPolicy that the ConnectionFactory should clone for Connections.
+         */
+        void setPrefetchPolicy( PrefetchPolicy* policy );
+
+        /**
+         * Gets the pointer to the current PrefetchPolicy that is in use by this ConnectionFactory.
+         *
+         * @returns a pointer to this objects PrefetchPolicy.
+         */
+        PrefetchPolicy* getPrefetchPolicy() const;
+
+        /**
+         * Sets the RedeliveryPolicy instance that this factory should use when it creates
+         * new Connection instances.  The RedeliveryPolicy passed becomes the property of the
+         * factory and will be deleted when the factory is destroyed.
+         *
+         * @param policy
+         *      The new RedeliveryPolicy that the ConnectionFactory should clone for Connections.
+         */
+        void setRedeliveryPolicy( RedeliveryPolicy* policy );
+
+        /**
+         * Gets the pointer to the current RedeliveryPolicy that is in use by this ConnectionFactory.
+         *
+         * @returns a pointer to this objects RedeliveryPolicy.
+         */
+        RedeliveryPolicy* getRedeliveryPolicy() const;
+
+        /**
+         * @return The value of the dispatch asynchronously option sent to the broker.
+         */
+        bool isDispatchAsync() const;
+
+        /**
+         * Should messages be dispatched synchronously or asynchronously from the producer
+         * thread for non-durable topics in the broker? For fast consumers set this to false.
+         * For slow consumers set it to true so that dispatching will not block fast consumers. .
+         *
+         * @param value
+         *        The value of the dispatch asynchronously option sent to the broker.
+         */
+        void setDispatchAsync( bool value );
+
+        /**
+         * Gets if the Connection should always send things Synchronously.
+         *
+         * @return true if sends should always be Synchronous.
+         */
+        bool isAlwaysSyncSend() const;
+
+        /**
+         * Sets if the Connection should always send things Synchronously.
+         * @param value
+         *        true if sends should always be Synchronous.
+         */
+        void setAlwaysSyncSend( bool value );
+
+        /**
+         * Gets if the useAsyncSend option is set
+         * @returns true if on false if not.
+         */
+        bool isUseAsyncSend() const;
+
+        /**
+         * Sets the useAsyncSend option
+         * @param value - true to activate, false to disable.
+         */
+        void setUseAsyncSend( bool value );
+
+        /**
+         * Gets if the Connection is configured for Message body compression.
+         * @returns if the Message body will be Compressed or not.
+         */
+        bool isUseCompression() const;
+
+        /**
+         * Sets whether Message body compression is enabled.
+         *
+         * @param value
+         *      Boolean indicating if Message body compression is enabled.
+         */
+        void setUseCompression( bool value );
+
+        /**
+         * Gets the assigned send timeout for this Connector
+         * @return the send timeout configured in the connection uri
+         */
+        unsigned int getSendTimeout() const;
+
+        /**
+         * Sets the send timeout to use when sending Message objects, this will
+         * cause all messages to be sent using a Synchronous request is non-zero.
+         * @param timeout - The time to wait for a response.
+         */
+        void setSendTimeout( unsigned int timeout );
+
+        /**
+         * Gets the assigned close timeout for this Connector
+         * @return the close timeout configured in the connection uri
+         */
+        unsigned int getCloseTimeout() const;
+
+        /**
+         * Sets the close timeout to use when sending the disconnect request.
+         * @param timeout - The time to wait for a close message.
+         */
+        void setCloseTimeout( unsigned int timeout );
+
+        /**
+         * Gets the configured producer window size for Producers that are created
+         * from this connector.  This only applies if there is no send timeout and the
+         * producer is able to send asynchronously.
+         * @return size in bytes of messages that this producer can produce before
+         *         it must block and wait for ProducerAck messages to free resources.
+         */
+        unsigned int getProducerWindowSize() const;
+
+        /**
+         * Sets the size in Bytes of messages that a producer can send before it is blocked
+         * to await a ProducerAck from the broker that frees enough memory to allow another
+         * message to be sent.
+         * @param windowSize - The size in bytes of the Producers memory window.
+         */
+        void setProducerWindowSize( unsigned int windowSize );
+
+        /**
+         * Get the Next available Session Id.
+         * @return the next id in the sequence.
+         */
+        long long getNextSessionId();
+
+        /**
+         * Get the Next Temporary Destination Id
+         * @return the next id in the sequence.
+         */
+        long long getNextTempDestinationId();
+
+        /**
+         * Get the Next Temporary Destination Id
+         * @return the next id in the sequence.
+         */
+        long long getNextLocalTransactionId();
 
     public: // TransportListener
 
@@ -428,6 +599,13 @@ namespace core{
             throw( exceptions::ActiveMQException );
 
         /**
+         * Gets a reference to this object's Transport instance.
+         *
+         * @return a reference to the Transport that is in use by this Connection.
+         */
+        transport::Transport& getTransport() const;
+
+        /**
          * Sends a oneway message.
          * @param command The message to send.
          * @throws ConnectorException if not currently connected, or
@@ -461,14 +639,14 @@ namespace core{
 
     private:
 
-        // Sends the connect message to the broker and waits for the response.
-        void connect() throw ( activemq::exceptions::ActiveMQException );
-
         // Sends a oneway disconnect message to the broker.
         void disconnect( long long lastDeliveredSequenceId ) throw ( activemq::exceptions::ActiveMQException );
 
-        // Check for Connected State and Throw an exception if not.
-        void enforceConnected() const throw ( activemq::exceptions::ActiveMQException );
+        // Check for Closed State and Throw an exception if true.
+        void checkClosed() const throw ( activemq::exceptions::ActiveMQException );
+
+        // If its not been sent, then send the ConnectionInfo to the Broker.
+        void ensureConnectionInfoSent();
 
         // Waits for all Consumers to handle the Transport Interrupted event.
         void waitForTransportInterruptionProcessingToComplete()

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp?rev=951033&r1=951032&r2=951033&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp Thu Jun  3 15:27:50 2010
@@ -17,20 +17,24 @@
 #include "ActiveMQConnectionFactory.h"
 
 #include <decaf/net/URI.h>
-#include <decaf/util/UUID.h>
 #include <decaf/util/Properties.h>
+#include <decaf/lang/Boolean.h>
+#include <decaf/lang/Integer.h>
 #include <decaf/lang/Pointer.h>
 #include <decaf/lang/exceptions/NullPointerException.h>
 #include <activemq/exceptions/ExceptionDefines.h>
 #include <activemq/transport/TransportRegistry.h>
 #include <activemq/core/ActiveMQConnection.h>
 #include <activemq/core/ActiveMQConstants.h>
+#include <activemq/core/policies/DefaultPrefetchPolicy.h>
+#include <activemq/core/policies/DefaultRedeliveryPolicy.h>
 #include <activemq/util/URISupport.h>
 #include <memory>
 
 using namespace std;
 using namespace activemq;
 using namespace activemq::core;
+using namespace activemq::core::policies;
 using namespace activemq::exceptions;
 using namespace activemq::transport;
 using namespace decaf;
@@ -40,19 +44,105 @@ using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
+const std::string ActiveMQConnectionFactory::DEFAULT_URI = "failover:(tcp://localhost:61616)";
+
+////////////////////////////////////////////////////////////////////////////////
 namespace activemq{
 namespace core{
 
     class FactorySettings {
     public:
 
+        Pointer<Properties> properties;
+
         std::string username;
         std::string password;
+        std::string clientId;
+
         std::string brokerURL;
 
+        bool dispatchAsync;
+        bool alwaysSyncSend;
+        bool useAsyncSend;
+        bool useCompression;
+        unsigned int sendTimeout;
+        unsigned int closeTimeout;
+        unsigned int producerWindowSize;
+
         cms::ExceptionListener* defaultListener;
+        std::auto_ptr<PrefetchPolicy> defaultPrefetchPolicy;
+        std::auto_ptr<RedeliveryPolicy> defaultRedeliveryPolicy;
 
-        FactorySettings() : brokerURL("failover:(tcp://localhost:61616)"), defaultListener( NULL ) {
+        FactorySettings() : brokerURL( ActiveMQConnectionFactory::DEFAULT_URI ),
+                            dispatchAsync( true ),
+                            alwaysSyncSend( false ),
+                            useAsyncSend( false ),
+                            useCompression( false ),
+                            sendTimeout( 0 ),
+                            closeTimeout( 15000 ),
+                            producerWindowSize( 0 ),
+                            defaultListener( NULL ),
+                            defaultPrefetchPolicy( NULL ),
+                            defaultRedeliveryPolicy( NULL ) {
+
+            this->properties.reset( new Properties() );
+            this->defaultPrefetchPolicy.reset( new DefaultPrefetchPolicy() );
+            this->defaultRedeliveryPolicy.reset( new DefaultRedeliveryPolicy() );
+        }
+
+        void updateConfiguration( const URI& uri ) {
+
+            this->brokerURL = uri.toString();
+            this->properties->clear();
+            activemq::util::URISupport::parseQuery( uri.getQuery(), properties.get() );
+
+            // Check the connection options
+            this->alwaysSyncSend = Boolean::parseBoolean(
+                properties->getProperty(
+                    core::ActiveMQConstants::toString(
+                        core::ActiveMQConstants::CONNECTION_ALWAYSSYNCSEND ), "false" ) );
+
+            this->useAsyncSend = Boolean::parseBoolean(
+                properties->getProperty(
+                    core::ActiveMQConstants::toString(
+                        core::ActiveMQConstants::CONNECTION_USEASYNCSEND ), "false" ) );
+
+            this->useCompression = Boolean::parseBoolean(
+                properties->getProperty(
+                    core::ActiveMQConstants::toString(
+                        core::ActiveMQConstants::CONNECTION_USECOMPRESSION ), "false" ) );
+
+            this->dispatchAsync = Boolean::parseBoolean(
+                properties->getProperty(
+                    core::ActiveMQConstants::toString(
+                        core::ActiveMQConstants::CONNECTION_DISPATCHASYNC ), "true" ) );
+
+            this->producerWindowSize = decaf::lang::Integer::parseInt(
+                properties->getProperty(
+                    core::ActiveMQConstants::toString(
+                        core::ActiveMQConstants::CONNECTION_PRODUCERWINDOWSIZE ), "0" ) );
+
+            this->sendTimeout = decaf::lang::Integer::parseInt(
+                properties->getProperty(
+                    core::ActiveMQConstants::toString(
+                        core::ActiveMQConstants::CONNECTION_SENDTIMEOUT ), "0" ) );
+
+            this->closeTimeout = decaf::lang::Integer::parseInt(
+                properties->getProperty(
+                    core::ActiveMQConstants::toString(
+                        core::ActiveMQConstants::CONNECTION_CLOSETIMEOUT ), "15000" ) );
+
+            this->clientId = properties->getProperty(
+                core::ActiveMQConstants::toString(
+                    core::ActiveMQConstants::PARAM_CLIENTID ), "" );
+
+            this->username = properties->getProperty(
+                core::ActiveMQConstants::toString(
+                    core::ActiveMQConstants::PARAM_USERNAME ), "" );
+
+            this->password = properties->getProperty(
+                core::ActiveMQConstants::toString(
+                    core::ActiveMQConstants::PARAM_PASSWORD ), "" );
         }
 
     };
@@ -93,7 +183,7 @@ ActiveMQConnectionFactory::~ActiveMQConn
 cms::Connection* ActiveMQConnectionFactory::createConnection()
     throw ( cms::CMSException ) {
 
-    return createConnection( settings->brokerURL, settings->username, settings->password, UUID::randomUUID().toString() );
+    return createConnection( settings->brokerURL, settings->username, settings->password, settings->clientId );
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -102,7 +192,7 @@ cms::Connection* ActiveMQConnectionFacto
     const std::string& password )
         throw ( cms::CMSException ) {
 
-    return createConnection( settings->brokerURL, username, password, UUID::randomUUID().toString() );
+    return createConnection( settings->brokerURL, username, password, settings->clientId );
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -123,33 +213,26 @@ cms::Connection* ActiveMQConnectionFacto
     throw ( cms::CMSException ) {
 
     Pointer<Transport> transport;
-    Pointer<Properties> properties( new Properties() );
     auto_ptr<ActiveMQConnection> connection;
-    std::string clientIdLocal = clientId;
 
     try{
 
-        // if no Client Id specified, create one
-        if( clientIdLocal == "" ) {
-            clientIdLocal = UUID::randomUUID().toString();
-        }
-
         // Try to convert the String URL into a valid URI
         URI uri( url );
 
-        // Store login data in the properties
-        properties->setProperty(
-            ActiveMQConstants::toString(
-                ActiveMQConstants::PARAM_USERNAME ), username );
-        properties->setProperty(
-            ActiveMQConstants::toString(
-                ActiveMQConstants::PARAM_PASSWORD ), password );
-        properties->setProperty(
-            ActiveMQConstants::toString(
-                ActiveMQConstants::PARAM_CLIENTID ), clientIdLocal );
+        // Update configuration with new authentication info if any was provided.
+        this->settings->updateConfiguration( uri );
 
-        // Parse out properties so they can be passed to the Connectors.
-        activemq::util::URISupport::parseQuery( uri.getQuery(), properties.get() );
+        // Store login data in the properties
+        if( !username.empty() ) {
+            this->settings->username = username;
+        }
+        if( !password.empty() ) {
+            this->settings->password = password;
+        }
+        if( !clientId.empty() ) {
+            this->settings->clientId = clientId;
+        }
 
         // Use the TransportBuilder to get our Transport
         transport =
@@ -162,9 +245,21 @@ cms::Connection* ActiveMQConnectionFacto
                 "failed creating new Transport" );
         }
 
+        Pointer<Properties> properties( this->settings->properties->clone() );
+
         // Create and Return the new connection object.
         connection.reset( new ActiveMQConnection( transport, properties ) );
 
+        // Set all options parsed from the URI.
+        configureConnection( connection.get() );
+
+        // Now start the connection since all other configuration is done.
+        transport->start();
+
+        if( !this->settings->clientId.empty() ) {
+            connection->setDefaultClientId( this->settings->clientId );
+        }
+
         return connection.release();
     }
     catch( cms::CMSException& ex ){
@@ -198,6 +293,26 @@ cms::Connection* ActiveMQConnectionFacto
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::configureConnection( ActiveMQConnection* connection ) {
+
+    connection->setUsername( this->settings->username );
+    connection->setPassword( this->settings->password );
+    connection->setDispatchAsync( this->settings->dispatchAsync );
+    connection->setAlwaysSyncSend( this->settings->alwaysSyncSend );
+    connection->setUseAsyncSend( this->settings->useAsyncSend );
+    connection->setUseCompression( this->settings->useCompression );
+    connection->setSendTimeout( this->settings->sendTimeout );
+    connection->setCloseTimeout( this->settings->closeTimeout );
+    connection->setProducerWindowSize( this->settings->producerWindowSize );
+    connection->setPrefetchPolicy( this->settings->defaultPrefetchPolicy->clone() );
+    connection->setRedeliveryPolicy( this->settings->defaultRedeliveryPolicy->clone() );
+
+    if( this->settings->defaultListener ) {
+        connection->setExceptionListener( this->settings->defaultListener );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnectionFactory::setUsername( const std::string& username ) {
     settings->username = username;
 }
@@ -218,6 +333,16 @@ const std::string& ActiveMQConnectionFac
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+std::string ActiveMQConnectionFactory::getClientId() const {
+    return this->settings->clientId;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setClientId( const std::string& clientId ) {
+    this->settings->clientId = clientId;
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnectionFactory::setBrokerURL( const std::string& brokerURL ){
     settings->brokerURL = brokerURL;
 }
@@ -236,3 +361,93 @@ void ActiveMQConnectionFactory::setExcep
 cms::ExceptionListener* ActiveMQConnectionFactory::getExceptionListener() const {
     return this->settings->defaultListener;
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setPrefetchPolicy( PrefetchPolicy* policy ) {
+    this->settings->defaultPrefetchPolicy.reset( policy );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+PrefetchPolicy* ActiveMQConnectionFactory::getPrefetchPolicy() const {
+    return this->settings->defaultPrefetchPolicy.get();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setRedeliveryPolicy( RedeliveryPolicy* policy ) {
+    this->settings->defaultRedeliveryPolicy.reset( policy );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+RedeliveryPolicy* ActiveMQConnectionFactory::getRedeliveryPolicy() const {
+    return this->settings->defaultRedeliveryPolicy.get();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnectionFactory::isDispatchAsync() const {
+    return this->settings->dispatchAsync;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setDispatchAsync( bool value ) {
+    this->settings->dispatchAsync = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnectionFactory::isAlwaysSyncSend() const {
+    return this->settings->alwaysSyncSend;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setAlwaysSyncSend( bool value ) {
+    this->settings->alwaysSyncSend = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnectionFactory::isUseAsyncSend() const {
+    return this->settings->useAsyncSend;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setUseAsyncSend( bool value ) {
+    this->settings->useAsyncSend = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnectionFactory::isUseCompression() const {
+    return this->settings->useCompression;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setUseCompression( bool value ) {
+    this->settings->useCompression = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+unsigned int ActiveMQConnectionFactory::getSendTimeout() const {
+    return this->settings->sendTimeout;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setSendTimeout( unsigned int timeout ) {
+    this->settings->sendTimeout = timeout;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+unsigned int ActiveMQConnectionFactory::getCloseTimeout() const {
+    return this->settings->closeTimeout;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setCloseTimeout( unsigned int timeout ) {
+    this->settings->closeTimeout = timeout;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+unsigned int ActiveMQConnectionFactory::getProducerWindowSize() const {
+    return this->settings->producerWindowSize;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setProducerWindowSize( unsigned int windowSize ) {
+    this->settings->producerWindowSize = windowSize;
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h?rev=951033&r1=951032&r2=951033&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h Thu Jun  3 15:27:50 2010
@@ -25,9 +25,17 @@
 namespace activemq{
 namespace core{
 
+    class ActiveMQConnection;
     class FactorySettings;
+    class PrefetchPolicy;
+    class RedeliveryPolicy;
 
     class AMQCPP_API ActiveMQConnectionFactory : public cms::ConnectionFactory {
+    public:
+
+        // Default Broker URI if none specified
+        static const std::string DEFAULT_URI;
+
     private:
 
         // d-Pointer holding pre-configured factory settings
@@ -97,55 +105,69 @@ namespace core{
                                                    const std::string& clientId )
             throw ( cms::CMSException );
 
+    public:   // Configuration Options
+
         /**
          * Sets the username that should be used when creating a new connection
          * @param username string
          */
-        virtual void setUsername( const std::string& username );
+        void setUsername( const std::string& username );
 
         /**
          * Gets the username that this factory will use when creating a new
          * connection instance.
          * @return username string, "" for default credentials
          */
-        virtual const std::string& getUsername() const;
+        const std::string& getUsername() const;
 
         /**
          * Sets the password that should be used when creating a new connection
          * @param password string
          */
-        virtual void setPassword( const std::string& password );
+        void setPassword( const std::string& password );
 
         /**
          * Gets the password that this factory will use when creating a new
          * connection instance.
          * @return password string, "" for default credentials
          */
-        virtual const std::string& getPassword() const;
+        const std::string& getPassword() const;
+
+        /**
+         * Gets the Configured Client Id.
+         * @return the clientId.
+         */
+        std::string getClientId() const;
+
+        /**
+         * Sets the Client Id.
+         * @param clientId - The new clientId value.
+         */
+        void setClientId( const std::string& clientId );
 
         /**
          * Sets the Broker URL that should be used when creating a new
          * connection instance
          * @param brokerURL string
          */
-        virtual void setBrokerURL( const std::string& brokerURL );
+        void setBrokerURL( const std::string& brokerURL );
 
         /**
          * Gets the Broker URL that this factory will use when creating a new
          * connection instance.
          * @return brokerURL string
          */
-        virtual const std::string& getBrokerURL() const;
+        const std::string& getBrokerURL() const;
 
         /**
          * Set an CMS ExceptionListener that will be set on eat connection once it has been
-         * created.  The factory des not take ownership of this pointer, the client must ensure
+         * created.  The factory does not take ownership of this pointer, the client must ensure
          * that its lifetime is scoped to the connection that it is applied to.
          *
          * @param listener
          * 		The listener to set on the connection or NULL for no listener.
          */
-        virtual void setExceptionListener( cms::ExceptionListener* listener );
+        void setExceptionListener( cms::ExceptionListener* listener );
 
         /**
          * Returns the currently set ExceptionListener that will be set on any new Connection
@@ -153,7 +175,138 @@ namespace core{
          *
          * @return a pointer to a CMS ExceptionListener instance or NULL if not set.
          */
-        virtual cms::ExceptionListener* getExceptionListener() const;
+        cms::ExceptionListener* getExceptionListener() const;
+
+        /**
+         * Sets the PrefetchPolicy instance that this factory should use when it creates
+         * new Connection instances.  The PrefetchPolicy passed becomes the property of the
+         * factory and will be deleted when the factory is destroyed.
+         *
+         * @param policy
+         *      The new PrefetchPolicy that the ConnectionFactory should clone for Connections.
+         */
+        void setPrefetchPolicy( PrefetchPolicy* policy );
+
+        /**
+         * Gets the pointer to the current PrefetchPolicy that is in use by this ConnectionFactory.
+         *
+         * @returns a pointer to this objects PrefetchPolicy.
+         */
+        PrefetchPolicy* getPrefetchPolicy() const;
+
+        /**
+         * Sets the RedeliveryPolicy instance that this factory should use when it creates
+         * new Connection instances.  The RedeliveryPolicy passed becomes the property of the
+         * factory and will be deleted when the factory is destroyed.
+         *
+         * @param policy
+         *      The new RedeliveryPolicy that the ConnectionFactory should clone for Connections.
+         */
+        void setRedeliveryPolicy( RedeliveryPolicy* policy );
+
+        /**
+         * Gets the pointer to the current RedeliveryPolicy that is in use by this ConnectionFactory.
+         *
+         * @returns a pointer to this objects RedeliveryPolicy.
+         */
+        RedeliveryPolicy* getRedeliveryPolicy() const;
+
+        /**
+         * @return The value of the dispatch asynchronously option sent to the broker.
+         */
+        bool isDispatchAsync() const;
+
+        /**
+         * Should messages be dispatched synchronously or asynchronously from the producer
+         * thread for non-durable topics in the broker? For fast consumers set this to false.
+         * For slow consumers set it to true so that dispatching will not block fast consumers. .
+         *
+         * @param value
+         *        The value of the dispatch asynchronously option sent to the broker.
+         */
+        void setDispatchAsync( bool value );
+
+        /**
+         * Gets if the Connection should always send things Synchronously.
+         *
+         * @return true if sends should always be Synchronous.
+         */
+        bool isAlwaysSyncSend() const;
+
+        /**
+         * Sets if the Connection should always send things Synchronously.
+         * @param value
+         *        true if sends should always be Synchronous.
+         */
+        void setAlwaysSyncSend( bool value );
+
+        /**
+         * Gets if the useAsyncSend option is set
+         * @returns true if on false if not.
+         */
+        bool isUseAsyncSend() const;
+
+        /**
+         * Sets the useAsyncSend option
+         * @param value - true to activate, false to disable.
+         */
+        void setUseAsyncSend( bool value );
+
+        /**
+         * Gets if the Connection is configured for Message body compression.
+         * @returns if the Message body will be Compressed or not.
+         */
+        bool isUseCompression() const;
+
+        /**
+         * Sets whether Message body compression is enabled.
+         *
+         * @param value
+         *      Boolean indicating if Message body compression is enabled.
+         */
+        void setUseCompression( bool value );
+
+        /**
+         * Gets the assigned send timeout for this Connector
+         * @return the send timeout configured in the connection uri
+         */
+        unsigned int getSendTimeout() const;
+
+        /**
+         * Sets the send timeout to use when sending Message objects, this will
+         * cause all messages to be sent using a Synchronous request is non-zero.
+         * @param timeout - The time to wait for a response.
+         */
+        void setSendTimeout( unsigned int timeout );
+
+        /**
+         * Gets the assigned close timeout for this Connector
+         * @return the close timeout configured in the connection uri
+         */
+        unsigned int getCloseTimeout() const;
+
+        /**
+         * Sets the close timeout to use when sending the disconnect request.
+         * @param timeout - The time to wait for a close message.
+         */
+        void setCloseTimeout( unsigned int timeout );
+
+        /**
+         * Gets the configured producer window size for Producers that are created
+         * from this connector.  This only applies if there is no send timeout and the
+         * producer is able to send asynchronously.
+         * @return size in bytes of messages that this producer can produce before
+         *         it must block and wait for ProducerAck messages to free resources.
+         */
+        unsigned int getProducerWindowSize() const;
+
+        /**
+         * Sets the size in Bytes of messages that a producer can send before it is blocked
+         * to await a ProducerAck from the broker that frees enough memory to allow another
+         * message to be sent.
+         * @param windowSize - The size in bytes of the Producers memory window.
+         */
+        void setProducerWindowSize( unsigned int windowSize );
 
     public:
 
@@ -173,14 +326,16 @@ namespace core{
                                                   const std::string& clientId = "" )
             throw ( cms::CMSException );
 
-    public:
+    private:
 
-        virtual cms::Connection* doCreateConnection( const std::string& url,
-                                                     const std::string& username,
-                                                     const std::string& password,
-                                                     const std::string& clientId )
+        cms::Connection* doCreateConnection( const std::string& url,
+                                             const std::string& username,
+                                             const std::string& password,
+                                             const std::string& clientId )
             throw ( cms::CMSException );
 
+        void configureConnection( ActiveMQConnection* connection );
+
     };
 
 }}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConstants.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConstants.cpp?rev=951033&r1=951032&r2=951033&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConstants.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConstants.cpp Thu Jun  3 15:27:50 2010
@@ -51,6 +51,7 @@ ActiveMQConstants::StaticInitializer::St
     uriParams[CONNECTION_ALWAYSSYNCSEND] = "connection.alwaysSyncSend";
     uriParams[CONNECTION_USEASYNCSEND] = "connection.useAsyncSend";
     uriParams[CONNECTION_USECOMPRESSION] = "connection.useCompression";
+    uriParams[CONNECTION_DISPATCHASYNC] = "connection.dispatchAsync";
     uriParams[PARAM_USERNAME] = "username";
     uriParams[PARAM_PASSWORD] = "password";
     uriParams[PARAM_CLIENTID] = "client-id";

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConstants.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConstants.h?rev=951033&r1=951032&r2=951033&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConstants.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConstants.h Thu Jun  3 15:27:50 2010
@@ -89,6 +89,7 @@ namespace core{
             CONNECTION_ALWAYSSYNCSEND,
             CONNECTION_USEASYNCSEND,
             CONNECTION_USECOMPRESSION,
+            CONNECTION_DISPATCHASYNC,
             PARAM_USERNAME,
             PARAM_PASSWORD,
             PARAM_CLIENTID,

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/policies/DefaultPrefetchPolicy.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/policies/DefaultPrefetchPolicy.cpp?rev=951033&r1=951032&r2=951033&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/policies/DefaultPrefetchPolicy.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/policies/DefaultPrefetchPolicy.cpp Thu Jun  3 15:27:50 2010
@@ -33,7 +33,12 @@ int DefaultPrefetchPolicy::DEFAULT_QUEUE
 int DefaultPrefetchPolicy::DEFAULT_TOPIC_PREFETCH = MAX_PREFETCH_SIZE;
 
 ////////////////////////////////////////////////////////////////////////////////
-DefaultPrefetchPolicy::DefaultPrefetchPolicy() {
+DefaultPrefetchPolicy::DefaultPrefetchPolicy() :
+    durableTopicPrefetch( DEFAULT_DURABLE_TOPIC_PREFETCH ),
+    queuePrefetch( DEFAULT_QUEUE_PREFETCH ),
+    queueBrowserPrefetch( DEFAULT_QUEUE_BROWSER_PREFETCH ),
+    topicPrefetch( DEFAULT_TOPIC_PREFETCH ) {
+
 }
 
 ////////////////////////////////////////////////////////////////////////////////

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/IdGenerator.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/IdGenerator.cpp?rev=951033&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/IdGenerator.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/IdGenerator.cpp Thu Jun  3 15:27:50 2010
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "IdGenerator.h"
+
+#include <decaf/lang/Long.h>
+#include <decaf/lang/System.h>
+#include <decaf/lang/Thread.h>
+#include <decaf/net/InetAddress.h>
+#include <decaf/net/ServerSocket.h>
+
+#include <apr_strings.h>
+
+using namespace activemq;
+using namespace activemq::util;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::net;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+
+////////////////////////////////////////////////////////////////////////////////
+IdGenerator::StaticInit IdGenerator::statics;
+
+////////////////////////////////////////////////////////////////////////////////
+IdGenerator::StaticInit::StaticInit() : UNIQUE_STUB(), instanceCount(0), hostname() {
+
+    std::string stub = "";
+
+    try {
+        hostname = InetAddress::getLocalHost().getHostName();
+        ServerSocket ss( 0 );
+        stub = "-" + Long::toString( ss.getLocalPort() ) + "-" +
+                     Long::toString( System::currentTimeMillis() ) + "-";
+        Thread::sleep( 100 );
+        ss.close();
+    } catch( Exception& ioe ) {
+        hostname = "localhost";
+        stub = "-1-" + Long::toString( System::currentTimeMillis() ) + "-";
+    }
+
+    UNIQUE_STUB = stub;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+IdGenerator::IdGenerator() : seed(), sequence(0) {
+    synchronized( &statics.mutex ) {
+        this->seed = std::string( "ID:" ) + statics.hostname +
+                     statics.UNIQUE_STUB + Long::toString( statics.instanceCount++ ) + ":";
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+IdGenerator::IdGenerator( const std::string& prefix ) : seed(), sequence(0) {
+    synchronized( &statics.mutex ) {
+        this->seed = prefix + statics.UNIQUE_STUB + Long::toString( statics.instanceCount++ ) + ":";
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+IdGenerator::~IdGenerator() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+std::string IdGenerator::generateId() const {
+
+    std::string result;
+
+    synchronized( &statics.mutex ) {
+        result = this->seed + Long::toString( this->sequence++ );
+    }
+
+    return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+std::string IdGenerator::getSeedFromId( const std::string& id ) {
+
+    std::string result = id;
+
+    if( !id.empty() ) {
+        std::size_t index = id.find_last_of( ':' );
+        if( index != std::string::npos && ( index + 1 ) < id.length() ) {
+            result = id.substr( 0, index + 1 );
+        }
+    }
+
+    return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long IdGenerator::getSequenceFromId( const std::string& id ) {
+
+    long long result = -1;
+
+    if( !id.empty() ) {
+        std::size_t index = id.find_last_of( ':' );
+        if( index != std::string::npos && ( index + 1 ) < id.length() ) {
+            std::string numStr = id.substr( index + 1, id.length() );
+            result = Long::parseLong( numStr );
+        }
+    }
+
+    return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int IdGenerator::compare( const std::string& id1, const std::string& id2 ) {
+
+    int result = -1;
+    std::string seed1 = IdGenerator::getSeedFromId( id1 );
+    std::string seed2 = IdGenerator::getSeedFromId( id2 );
+
+    if( !seed1.empty() && !seed2.empty() ) {
+
+        result = apr_strnatcmp( seed1.c_str(), seed2.c_str() );
+
+        if( result == 0 ) {
+            long long count1 = IdGenerator::getSequenceFromId( id1 );
+            long long count2 = IdGenerator::getSequenceFromId( id2 );
+            result = (int)( count1 - count2 );
+        }
+    }
+
+    return result;
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/IdGenerator.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/IdGenerator.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/IdGenerator.h?rev=951033&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/IdGenerator.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/IdGenerator.h Thu Jun  3 15:27:50 2010
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ATIVEMQ_UTIL_IDGENERATOR_H_
+#define _ATIVEMQ_UTIL_IDGENERATOR_H_
+
+#include <activemq/util/Config.h>
+
+#include <decaf/util/concurrent/Mutex.h>
+#include <string>
+
+namespace activemq {
+namespace util {
+
+    class AMQCPP_API IdGenerator {
+    private:
+
+        class StaticInit {
+        public:
+
+            std::string UNIQUE_STUB;
+            int instanceCount;
+            std::string hostname;
+            mutable decaf::util::concurrent::Mutex mutex;
+
+            StaticInit();
+        };
+
+        static StaticInit statics;
+
+        std::string seed;
+        mutable long long sequence;
+
+    public:
+
+        IdGenerator();
+
+        IdGenerator( const std::string& prefix );
+
+        virtual ~IdGenerator();
+
+    public:
+
+        /**
+         * @returns a newly generated unique id.
+         */
+        std::string generateId() const;
+
+    public:
+
+        /**
+         * Since the initialization of this object results in the retrieval of the
+         * machine's host name we can quickly return it here.
+         *
+         * @return the previously retrieved host name.
+         */
+        static std::string getHostname() {
+            return statics.hostname;
+        }
+
+        /**
+         * Gets the seed value from a Generated Id, the count portion is removed.
+         *
+         * @returns the seed portion of the Id, minus the count value.
+         */
+        static std::string getSeedFromId( const std::string& id );
+
+        /**
+         * Gets the count value from a Generated Id, the seed portion is removed.
+         *
+         * @returns the sequence count portion of the id, minus the seed value.
+         */
+        static long long getSequenceFromId( const std::string& id );
+
+        /**
+         * Compares two generated id values.
+         *
+         * @param id1
+         *      The first id to compare, or left hand side.
+         * @param id2
+         *      The second id to compare, or right hand side.
+         *
+         * @returns zero if ids are equal or positove if id1 > id2...
+         */
+        static int compare( const std::string& id1, const std::string& id2 );
+
+    };
+
+}}
+
+#endif /* _ATIVEMQ_UTIL_IDGENERATOR_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/IdGenerator.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h?rev=951033&r1=951032&r2=951033&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h Thu Jun  3 15:27:50 2010
@@ -115,13 +115,36 @@ namespace cms{
             throw ( CMSException ) = 0;
 
         /**
-         * Get the Client Id for this session
+         * Get the Client Id for this session, the client Id is provider specific and is either
+         * assigned by the connection factory or set using the setClientID method.
          *
-         * @return Client Id String
+         * @return Client Id String for this Connection.
+         *
+         * @throws CMSException if the provider fails to return the client id or an internal error occurs.
          */
         virtual std::string getClientID() const = 0;
 
         /**
+         * Sets the client identifier for this connection.
+         *
+         * The preferred way to assign a CMS client's client identifier is for it to be configured in a
+         * client-specific ConnectionFactory object and transparently assigned to the Connection object
+         * it creates.
+         *
+         * If a client sets the client identifier explicitly, it must do so immediately after it creates
+         * the connection and before any other action on the connection is taken. After this point,
+         * setting the client identifier is a programming error that should throw an IllegalStateException.
+         *
+         * @param clientID
+         *      The unique client identifier to assign to the Connection.
+         *
+         * @throws CMSException if the provider fails to set the client id due to some internal error.
+         * @throws InvalidClientIDException if the id given is somehow invalid or is a duplicate.
+         * @throws IllegalStateException if the client tries to set the id after a Connection method has been called.
+         */
+        virtual void setClientID( const std::string& clientID ) = 0;
+
+        /**
          * Gets the registered Exception Listener for this connection
          *
          * @return pointer to an exception listener or NULL

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/net/InetAddress.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/net/InetAddress.cpp?rev=951033&r1=951032&r2=951033&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/net/InetAddress.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/net/InetAddress.cpp Thu Jun  3 15:27:50 2010
@@ -23,10 +23,14 @@
 #include <decaf/net/Inet6Address.h>
 #include <decaf/net/UnknownHostException.h>
 
+#include <apr_network_io.h>
+#include <decaf/internal/AprPool.h>
+
 using namespace decaf;
 using namespace decaf::net;
 using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
+using namespace decaf::internal;
 
 ////////////////////////////////////////////////////////////////////////////////
 const unsigned char InetAddress::loopbackBytes[4] = { 127, 0, 0, 1 };
@@ -101,7 +105,7 @@ std::string InetAddress::getHostAddress(
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-std::string InetAddress::getHostname() const {
+std::string InetAddress::getHostName() const {
 
     if( !this->hostname.empty() ) {
         return this->hostname;
@@ -112,7 +116,7 @@ std::string InetAddress::getHostname() c
 
 ////////////////////////////////////////////////////////////////////////////////
 std::string InetAddress::toString() const {
-    return getHostname() + " / " + getHostAddress();
+    return getHostName() + " / " + getHostAddress();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -143,7 +147,29 @@ InetAddress InetAddress::getByAddress( c
 
 ////////////////////////////////////////////////////////////////////////////////
 InetAddress InetAddress::getLocalHost() {
-    return InetAddress::LOOPBACK;
+
+    char hostname[APRMAXHOSTLEN + 1] = {0};
+
+    AprPool pool;
+    apr_status_t result = apr_gethostname( hostname, APRMAXHOSTLEN+1, pool.getAprPool() );
+
+    if( result != APR_SUCCESS ) {
+        return InetAddress::LOOPBACK;
+    }
+
+    apr_sockaddr_t* address = NULL;
+    result = apr_sockaddr_info_get( &address, hostname, APR_UNSPEC, 0, APR_IPV4_ADDR_OK, pool.getAprPool() );
+
+    if( result != APR_SUCCESS ) {
+        throw UnknownHostException(
+            __FILE__, __LINE__, "Could not resolve the IP Address of this host." );
+    }
+
+    if( address->family == APR_INET6 ) {
+        return Inet6Address( hostname, (const unsigned char*)address->ipaddr_ptr, address->ipaddr_len );
+    } else {
+        return Inet4Address( hostname, (const unsigned char*)address->ipaddr_ptr, address->ipaddr_len );
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////