You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2007/04/12 22:29:43 UTC

svn commit: r528222 [3/5] - in /activemq/activemq-cpp/trunk/activemq-cpp/src: examples/ main/ main/activemq/connector/openwire/ main/activemq/connector/openwire/commands/ main/activemq/connector/stomp/ main/activemq/connector/stomp/commands/ main/activ...

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.h?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.h Thu Apr 12 13:29:39 2007
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- 
+
 #ifndef ACTIVEMQ_TRANSPORT_TRANSPORTFILTER_H_
 #define ACTIVEMQ_TRANSPORT_TRANSPORTFILTER_H_
 
@@ -25,43 +25,43 @@
 
 namespace activemq{
 namespace transport{
-  
+
     /**
      * A filter on the transport layer.  Transport
-     * filters implement the Transport interface and 
+     * filters implement the Transport interface and
      * optionally delegate calls to another Transport object.
      */
-    class TransportFilter 
-    : 
+    class TransportFilter
+    :
         public Transport,
         public CommandListener,
         public TransportExceptionListener
     {
-    protected:        
-        
+    protected:
+
         /**
          * The transport that this filter wraps around.
          */
         Transport* next;
-        
+
         /**
          * Flag to indicate whether this object controls
          * the lifetime of the next transport object.
          */
         bool own;
-        
+
         /**
          * Listener to incoming commands.
          */
         CommandListener* commandlistener;
-        
+
         /**
          * Listener of exceptions from this transport.
          */
         TransportExceptionListener* exceptionListener;
-        
+
     protected:
-    
+
         /**
          * Notify the exception listener
          * @param ex - the exception to send to listeners
@@ -69,55 +69,37 @@
         void fire( const exceptions::ActiveMQException& ex ){
 
             if( exceptionListener != NULL ){
-                
+
                 try{
                     exceptionListener->onTransportException( this, ex );
                 }catch( ... ){}
-            }            
+            }
         }
-        
+
         /**
          * Notify the command listener.
          * @param command - the command to send to the listener
          */
         void fire( Command* command ){
-            
+
             try{
                 if( commandlistener != NULL ){
                     commandlistener->onCommand( command );
                 }
             }catch( ... ){}
         }
-        
+
     public:
-  
+
         /**
          * Constructor.
          * @param next - the next Transport in the chain
          * @param own - true if this filter owns the next and should delete it
          */
-        TransportFilter( Transport* next, const bool own = true ){
-            
-            this->next = next;
-            this->own = own;
-            
-            commandlistener = NULL;
-            exceptionListener = NULL;
-                                    
-            // Observe the nested transport for events.
-            next->setCommandListener( this );
-            next->setTransportExceptionListener( this );
-        }
-        
-        virtual ~TransportFilter(){
-            
-            if( own ){
-                delete next;
-                next = NULL;
-            }
-            
-        }
-        
+        TransportFilter( Transport* next, const bool own = true );
+
+        virtual ~TransportFilter();
+
         /**
          * Event handler for the receipt of a command.
          * @param command - the received command object.
@@ -125,14 +107,14 @@
         virtual void onCommand( Command* command ){
             fire( command );
         }
-        
+
         /**
          * Event handler for an exception from a command transport.
          * @param source The source of the exception
          * @param ex The exception.
          */
         virtual void onTransportException( Transport* source, const exceptions::ActiveMQException& ex );
-        
+
         /**
          * Sends a one-way command.  Does not wait for any response from the
          * broker.
@@ -145,7 +127,7 @@
         virtual void oneway( Command* command ) throw(CommandIOException, exceptions::UnsupportedOperationException){
             next->oneway( command );
         }
-        
+
         /**
          * Sends a request to the next transport in the chain and returns its response.
          * @param command the command that is sent as a request
@@ -154,7 +136,7 @@
         virtual Response* request( Command* command ) throw(CommandIOException, exceptions::UnsupportedOperationException){
             return next->request( command );
         }
-        
+
         /**
          * Assigns the command listener for non-response commands.
          * @param listener the listener.
@@ -162,7 +144,7 @@
         virtual void setCommandListener( CommandListener* listener ){
             this->commandlistener = listener;
         }
-        
+
         /**
          * Sets the command reader.
          * @param reader the object that will be used for reading command objects.
@@ -170,7 +152,7 @@
         virtual void setCommandReader( CommandReader* reader ){
             next->setCommandReader( reader );
         }
-        
+
         /**
          * Sets the command writer.
          * @param writer the object that will be used for writing command objects.
@@ -178,7 +160,7 @@
         virtual void setCommandWriter( CommandWriter* writer ){
             next->setCommandWriter( writer );
         }
-      
+
         /**
          * Sets the observer of asynchronous exceptions from this transport.
          * @param listener the listener of transport exceptions.
@@ -186,7 +168,7 @@
         virtual void setTransportExceptionListener( TransportExceptionListener* listener ){
             this->exceptionListener = listener;
         }
-        
+
         /**
          * Starts this transport object and creates the thread for
          * polling on the input stream for commands.  If this object
@@ -197,21 +179,21 @@
          * has already been closed.
          */
         virtual void start() throw( cms::CMSException ){
-            
+
             if( commandlistener == NULL ){
                 throw exceptions::ActiveMQException( __FILE__, __LINE__,
                     "commandListener is invalid" );
             }
-            
+
             if( exceptionListener == NULL ){
                 throw exceptions::ActiveMQException( __FILE__, __LINE__,
                     "exceptionListener is invalid" );
             }
-            
+
             // Start the delegate transport object.
             next->start();
         }
-        
+
         /**
          * Stops the polling thread and closes the streams.  This can
          * be called explicitly, but is also called in the destructor. Once
@@ -219,12 +201,12 @@
          * @throws CMSException if errors occur.
          */
         virtual void close() throw( cms::CMSException ){
-            
+
             next->close();
         }
-        
+
     };
-    
+
 }}
 
 #endif /*ACTIVEMQ_TRANSPORT_TRANSPORTFILTER_H_*/

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/AsyncSendTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/AsyncSendTransport.cpp?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/AsyncSendTransport.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/AsyncSendTransport.cpp Thu Apr 12 13:29:39 2007
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "AsyncSendTransport.h"
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::transport;
+using namespace activemq::transport::filters;
+using namespace activemq::util;
+using namespace activemq::exceptions;
+using namespace activemq::concurrent;
+
+////////////////////////////////////////////////////////////////////////////////
+AsyncSendTransport::AsyncSendTransport( Transport* next, bool own )
+ :  TransportFilter( next, own )
+{
+    this->closed = true;        // stays closed until start() flips this flag
+    this->asyncThread = NULL;   // no send thread yet; startThread() creates it
+}
+
+////////////////////////////////////////////////////////////////////////////////
+AsyncSendTransport::~AsyncSendTransport()
+{
+    try {
+        close();    // stops the send thread, purges the queue, closes next
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AsyncSendTransport::oneway( Command* command )
+    throw( CommandIOException, exceptions::UnsupportedOperationException ) {
+
+    try{
+
+        // Queue a clone of the command and wake the send thread, which
+        // dispatches it from run().  Cloning means the caller may delete
+        // its own copy before the clone has actually been sent.
+        synchronized( &msgQueue ) {
+            msgQueue.push( command->cloneCommand() );
+            msgQueue.notifyAll();
+        }
+    }
+    AMQ_CATCH_RETHROW( CommandIOException )
+    AMQ_CATCH_RETHROW( UnsupportedOperationException )
+    AMQ_CATCHALL_THROW( CommandIOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AsyncSendTransport::start() throw( cms::CMSException ) {
+
+    try {
+
+        this->closed = false;
+
+        // Start the send thread (no-op if one already exists)
+        this->startThread();
+
+        next->start();
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AsyncSendTransport::close() throw( cms::CMSException ) {
+
+    try {
+
+        this->closed = true;
+
+        // Stop the send thread; stopThread() also purges unsent commands
+        this->stopThread();
+
+        next->close();
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Body of the send thread.  Repeatedly takes one queued command (a clone
+ * made by oneway()) off msgQueue, sends it through the next transport and
+ * deletes it.  Returns once the transport is marked closed.  Any exception
+ * ends the loop and is reported to the exception listener via fire().
+ * NOTE(review): closed is a plain bool written by another thread - confirm
+ * that the msgQueue lock is considered sufficient synchronization.
+ */
+void AsyncSendTransport::run()
+{
+    try{
+
+        while( !closed )
+        {
+            Command* command = NULL;
+
+            synchronized( &msgQueue )
+            {
+                // Sleep until a command is queued or the transport is
+                // closed.  The loop re-checks both conditions after every
+                // wakeup, which guards against spurious wakeups.
+                while( msgQueue.empty() )
+                {
+                    if( closed )
+                    {
+                        break;
+                    }
+                    msgQueue.wait();
+                }
+
+                // don't want to process messages if we are shutting down.
+                if( closed )
+                {
+                    return;
+                }
+
+                // get the data
+                command = msgQueue.pop();
+            }
+
+            // Dispatch the message outside the lock.  We own this clone,
+            // so it has to be destroyed even when the send fails;
+            // previously a throwing oneway() leaked it.
+            try{
+                next->oneway( command );
+            }
+            catch(...)
+            {
+                delete command;
+                throw;
+            }
+
+            // Destroy Our copy of the message
+            delete command;
+        }
+    }
+    catch(...)
+    {
+        this->fire( ActiveMQException(
+            __FILE__, __LINE__,
+            "AsyncSendTransport::run - "
+            "Connector threw an unknown Exception, recovering..." ) );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AsyncSendTransport::startThread() throw ( ActiveMQException ) {
+
+    try
+    {
+        // Start the thread, if it's not already started.
+        if( asyncThread == NULL )
+        {
+            asyncThread = new Thread( this );   // this object's run() is the thread body
+            asyncThread->start();
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AsyncSendTransport::stopThread() throw ( ActiveMQException ) {
+
+    try
+    {
+        // If a send thread exists, wake it so run() can see the closed
+        // flag (set by close() before this call), then wait for it to end.
+        if( asyncThread != NULL )
+        {
+            synchronized( &msgQueue )
+            {
+                // Force a wakeup if run is in a wait.
+                msgQueue.notifyAll();
+            }
+
+            // Wait for it to die and then delete it.
+            asyncThread->join();
+            delete asyncThread;
+            asyncThread = NULL;
+        }
+
+        // Destroy any queued commands that were never sent
+        purgeMessages();
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AsyncSendTransport::purgeMessages() throw ( ActiveMQException )
+{
+    try
+    {
+        synchronized( &msgQueue )
+        {
+            while( !msgQueue.empty() )
+            {
+                // Every queued entry is a clone created by oneway(), so
+                // nobody else holds a pointer to it and it must be
+                // deleted here.
+                delete msgQueue.pop();
+            }
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/AsyncSendTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/AsyncSendTransport.h?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/AsyncSendTransport.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/AsyncSendTransport.h Thu Apr 12 13:29:39 2007
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TRANSPORT_FILTERS_ASYNCSENDTRANSPORT_H_
+#define _ACTIVEMQ_TRANSPORT_FILTERS_ASYNCSENDTRANSPORT_H_
+
+#include <activemq/transport/TransportFilter.h>
+#include <activemq/util/Queue.h>
+#include <activemq/concurrent/Runnable.h>
+
+namespace activemq{
+namespace transport{
+namespace filters{
+
+    class AsyncSendTransport : public TransportFilter,
+                               public concurrent::Runnable {   // run() is the send loop
+    private:
+
+        /**
+         * Thread that sends commands queued by oneway(); NULL when not started.
+         */
+        concurrent::Thread* asyncThread;
+
+        /**
+         * Outgoing queue of cloned commands awaiting dispatch by run().
+         */
+        util::Queue<Command*> msgQueue;
+
+        /**
+         * True while this transport is closed (before start(), after close()).
+         */
+        bool closed;
+
+    public:
+
+        /**
+         * Constructor.
+         * @param next - the next Transport in the chain
+         * @param own - true if this filter owns the next and should delete it
+         */
+        AsyncSendTransport( Transport* next, bool own = true );
+
+        virtual ~AsyncSendTransport();
+
+        /**
+         * Queues a clone of the command for the send thread and returns
+         * without waiting for it to be written to the next transport.
+         * @param command the command to be sent.
+         * @throws CommandIOException if an exception occurs while the
+         * command is being queued.
+         * @throws UnsupportedOperationException if this method is not implemented
+         * by this transport.
+         */
+        virtual void oneway( Command* command )
+            throw( CommandIOException, exceptions::UnsupportedOperationException );
+
+        /**
+         * Marks this transport as open, starts the thread that sends
+         * queued commands, and then starts the next transport in the
+         * chain.
+         * NOTE(review): unlike TransportFilter::start() this override does
+         * not check that the listeners have been set - confirm intended.
+         * @throws CMSException if an error occurs while starting the
+         * thread or the next transport.
+         */
+        virtual void start() throw( cms::CMSException );
+
+        /**
+         * Marks this transport as closed, stops the send thread, destroys
+         * any commands still queued and closes the next transport.  This
+         * can be called explicitly, but is also called in the destructor.
+         * @throws CMSException if errors occur.
+         */
+        virtual void close() throw( cms::CMSException );
+
+    protected:
+
+        /**
+         * Run method that is called from the Thread class when this object
+         * is registered with a Thread and started.  This function takes
+         * commands off the outgoing queue and sends each one through the
+         * next transport until this transport is closed or a send throws.
+         */
+        virtual void run();
+
+        /**
+         * Creates and starts the thread that executes run() to send the
+         * queued commands.  Does nothing if the thread already exists.
+         * Called from start(); the thread is joined and deleted again by
+         * stopThread().
+         */
+        void startThread() throw ( exceptions::ActiveMQException );
+
+        /**
+         * Wakes and joins the send thread if one exists, then purges the queue.
+         */
+        void stopThread() throw ( exceptions::ActiveMQException );
+
+        /**
+         * Deletes every command still waiting in the outgoing queue.
+         * Called by stopThread() once the send thread has finished.
+         */
+        virtual void purgeMessages() throw ( exceptions::ActiveMQException );
+
+    };
+
+}}}
+
+#endif /*_ACTIVEMQ_TRANSPORT_FILTERS_ASYNCSENDTRANSPORT_H_*/

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/AsyncSendTransportFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/AsyncSendTransportFactory.cpp?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/AsyncSendTransportFactory.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/AsyncSendTransportFactory.cpp Thu Apr 12 13:29:39 2007
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "AsyncSendTransportFactory.h"
+
+#include <activemq/transport/filters/AsyncSendTransport.h>
+
+using namespace activemq;
+using namespace activemq::transport;
+using namespace activemq::transport::filters;
+using namespace activemq::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+TransportFactory& AsyncSendTransportFactory::getInstance(void)
+{
+    // One function-local static registrar, built on the first call, is given the new factory
+    static TransportFactoryMapRegistrar registrar(
+        "transport.filters.AsyncSendTransport",
+        new AsyncSendTransportFactory() );
+
+    return registrar.getFactory();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Transport* AsyncSendTransportFactory::createTransport(
+    const activemq::util::Properties& properties AMQCPP_UNUSED,
+    Transport* next,
+    bool own ) throw ( ActiveMQException ) {
+
+    try{
+        return new AsyncSendTransport( next, own );  // properties is not consulted; result is heap-allocated
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/AsyncSendTransportFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/AsyncSendTransportFactory.h?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/AsyncSendTransportFactory.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/AsyncSendTransportFactory.h Thu Apr 12 13:29:39 2007
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TRANSPORT_FILTERS_ASYNCSENDTRANSPORTFACTORY_H_
+#define _ACTIVEMQ_TRANSPORT_FILTERS_ASYNCSENDTRANSPORTFACTORY_H_
+
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/transport/TransportFactoryMapRegistrar.h>
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace transport{
+namespace filters{
+
+    /**
+     * Factory Responsible for creating the AsyncSendTransport.
+     */
+    class AsyncSendTransportFactory : public TransportFactory {
+
+    public:
+
+        virtual ~AsyncSendTransportFactory() {}
+
+        /**
+         * Creates a new AsyncSendTransport that wraps the given transport.
+         * @param properties - transport config values; not read by this factory
+         * @param next - the next transport in the chain, or NULL
+         * @param own - does the new Transport own the next
+         * @throws ActiveMQException if an error occurs.
+         */
+        virtual Transport* createTransport(
+            const activemq::util::Properties& properties,
+            Transport* next,
+            bool own ) throw ( exceptions::ActiveMQException );
+
+        /**
+         * Returns the factory held by a static TransportFactoryMapRegistrar
+         * @returns TransportFactory Reference
+         */
+        static TransportFactory& getInstance();
+
+    };
+
+}}}
+
+#endif /*_ACTIVEMQ_TRANSPORT_FILTERS_ASYNCSENDTRANSPORTFACTORY_H_*/

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/FutureResponse.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/FutureResponse.h?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/FutureResponse.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/FutureResponse.h Thu Apr 12 13:29:39 2007
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_FILTERS_FUTURERESPONSE_H_
+#define ACTIVEMQ_TRANSPORT_FILTERS_FUTURERESPONSE_H_
+
+#include <activemq/concurrent/Mutex.h>
+#include <activemq/concurrent/Concurrent.h>
+#include <activemq/transport/Response.h>
+
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace transport{
+namespace filters{
+
+    /**
+     * A container that holds a response object.  Since this
+     * object is Synchronizable, callers can wait on this object
+     * and when a response comes in, notify can be called to
+     * inform those waiting that the response is now available.
+     */
+    class FutureResponse : public concurrent::Synchronizable{
+    private:
+
+        Response* response;         // NULL until setResponse() stores one
+        concurrent::Mutex mutex;    // every Synchronizable call below forwards to this
+
+    public:
+
+        FutureResponse(){
+            response = NULL;
+        }
+
+        virtual ~FutureResponse(){}    // does not delete the stored response
+
+        /**
+         * Locks the object.
+         * @throws ActiveMQException
+         */
+        virtual void lock() throw( exceptions::ActiveMQException ){
+            mutex.lock();
+        }
+
+        /**
+         * Unlocks the object.
+         * @throws ActiveMQException
+         */
+        virtual void unlock() throw( exceptions::ActiveMQException ){
+            mutex.unlock();
+        }
+
+        /**
+         * Waits on a signal from this object, which is generated
+         * by a call to notify() or notifyAll().  Must have this object
+         * locked before calling.
+         * @throws ActiveMQException
+         */
+        virtual void wait() throw( exceptions::ActiveMQException ){
+            mutex.wait();
+        }
+
+        /**
+         * Waits on a signal from this object, which is generated
+         * by a call to notify() or notifyAll().  Must have this object
+         * locked before calling.  This wait will timeout after the
+         * specified time interval.
+         * @param millisecs time in milliseconds to wait, or WAIT_INFINITE
+         * @throws ActiveMQException
+         */
+        virtual void wait( unsigned long millisecs )
+            throw( exceptions::ActiveMQException )
+        {
+            mutex.wait( millisecs );
+        }
+
+        /**
+         * Signals a waiter on this object that it can now wake
+         * up and continue.  Must have this object locked before
+         * calling.
+         * @throws ActiveMQException
+         */
+        virtual void notify() throw( exceptions::ActiveMQException ){
+            mutex.notify();
+        }
+
+        /**
+         * Signals the waiters on this object that it can now wake
+         * up and continue.  Must have this object locked before
+         * calling.
+         * @throws ActiveMQException
+         */
+        virtual void notifyAll() throw( exceptions::ActiveMQException ){
+            mutex.notifyAll();
+        }
+
+        /**
+         * Getters for the response property.
+         * @return the response object for the request, NULL if none is set
+         */
+        virtual const Response* getResponse() const{
+            return response;
+        }
+        virtual Response* getResponse(){
+            return response;
+        }
+
+        /**
+         * Setter for the response property; does not lock or notify waiters.
+         * @param response the response object for the request.
+         */
+        virtual void setResponse( Response* response ){
+            this->response = response;
+        }
+    };
+
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_FILTERS_FUTURERESPONSE_H_*/

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/LoggingTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/LoggingTransport.cpp?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/LoggingTransport.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/LoggingTransport.cpp Thu Apr 12 13:29:39 2007
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "LoggingTransport.h"
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::transport;
+using namespace activemq::transport::filters;
+
+LOGCMS_INITIALIZE( logger, LoggingTransport, "activemq.transport.filters.LoggingTransport")
+
+////////////////////////////////////////////////////////////////////////////////
+LoggingTransport::LoggingTransport( Transport* next, bool own )
+:
+    TransportFilter( next, own )
+{
+    // No additional state to initialize.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void LoggingTransport::onCommand( Command* command ) {
+
+    // Assemble the complete entry first so it is logged as one message.
+    ostringstream text;
+    text << "*** BEGIN RECEIVED ASYNCHRONOUS COMMAND ***" << endl
+         << command->toString() << endl
+         << "*** END RECEIVED ASYNCHRONOUS COMMAND ***";
+
+    LOGCMS_INFO( logger, text.str() );
+
+    // Hand the command on to TransportFilter::onCommand().
+    TransportFilter::onCommand( command );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void LoggingTransport::oneway( Command* command )
+    throw(CommandIOException, exceptions::UnsupportedOperationException) {
+
+    try {
+
+        // Log the outgoing command before delegating to
+        // TransportFilter::oneway().
+        ostringstream text;
+        text << "*** BEGIN SENDING ONEWAY COMMAND ***" << endl
+             << command->toString() << endl
+             << "*** END SENDING ONEWAY COMMAND ***";
+
+        LOGCMS_INFO( logger, text.str() );
+
+        TransportFilter::oneway( command );
+    }
+    AMQ_CATCH_RETHROW( CommandIOException )
+    AMQ_CATCH_RETHROW( exceptions::UnsupportedOperationException )
+    AMQ_CATCHALL_THROW( CommandIOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Response* LoggingTransport::request( Command* command )
+    throw(CommandIOException, exceptions::UnsupportedOperationException) {
+
+    try {
+
+        // Perform the request through TransportFilter::request() first;
+        // the command and its response are logged together afterwards.
+        Response* reply = TransportFilter::request( command );
+
+        ostringstream text;
+        text << "*** SENDING REQUEST COMMAND ***" << endl
+             << command->toString() << endl
+             << "*** RECEIVED RESPONSE COMMAND ***" << endl;
+
+        // A missing response is logged as the literal text NULL.
+        if( reply == NULL ){
+            text << "NULL";
+        }
+        else{
+            text << reply->toString();
+        }
+
+        LOGCMS_INFO( logger, text.str() );
+
+        return reply;
+    }
+    AMQ_CATCH_RETHROW( CommandIOException )
+    AMQ_CATCH_RETHROW( exceptions::UnsupportedOperationException )
+    AMQ_CATCHALL_THROW( CommandIOException )
+}
+
+
+

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/LoggingTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/LoggingTransport.h?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/LoggingTransport.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/LoggingTransport.h Thu Apr 12 13:29:39 2007
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_FILTERS_LOGGINGTRANSPORT_H_
+#define ACTIVEMQ_TRANSPORT_FILTERS_LOGGINGTRANSPORT_H_
+
+#include <activemq/transport/TransportFilter.h>
+#include <activemq/logger/LoggerDefines.h>
+
+namespace activemq{
+namespace transport{
+namespace filters{
+
+    /**
+     * A transport filter that logs commands as they are sent/received.
+     * Each command is written to the class logger at INFO level and
+     * then passed on to the TransportFilter base class.
+     */
+    class LoggingTransport : public TransportFilter
+    {
+    private:
+
+        LOGCMS_DECLARE( logger )
+
+    public:
+
+        /**
+         * Constructor.
+         * @param next - the next Transport in the chain
+         * @param own - true if this filter owns the next and should delete it
+         */
+        LoggingTransport( Transport* next, bool own = true );
+
+        virtual ~LoggingTransport() {}
+
+        /**
+         * Event handler for the receipt of a command.  Logs the command
+         * and then delegates to TransportFilter::onCommand().
+         * @param command - the received command object.
+         */
+        virtual void onCommand( Command* command );
+
+        /**
+         * Sends a one-way command.  Does not wait for any response from the
+         * broker.  The command is logged before it is delegated to
+         * TransportFilter::oneway().
+         * @param command the command to be sent.
+         * @throws CommandIOException if an exception occurs during writing of
+         * the command.
+         * @throws UnsupportedOperationException if this method is not implemented
+         * by this transport.
+         */
+        virtual void oneway( Command* command ) throw(CommandIOException, exceptions::UnsupportedOperationException);
+
+        /**
+         * Delegates the request to TransportFilter::request() and then logs
+         * the command together with the returned response (a NULL response
+         * is logged as "NULL").
+         * @param command the command that is sent as a request
+         * @return the response returned by the delegated call
+         * @throws CommandIOException if an error occurs with the request.
+         * @throws UnsupportedOperationException propagated from the
+         * delegated call.
+         */
+        virtual Response* request( Command* command ) throw(CommandIOException, exceptions::UnsupportedOperationException);
+
+    };
+
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_FILTERS_LOGGINGTRANSPORT_H_*/

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/LoggingTransportFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/LoggingTransportFactory.cpp?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/LoggingTransportFactory.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/LoggingTransportFactory.cpp Thu Apr 12 13:29:39 2007
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "LoggingTransportFactory.h"
+
+#include <activemq/transport/filters/LoggingTransport.h>
+
+using namespace activemq;
+using namespace activemq::transport;
+using namespace activemq::transport::filters;
+using namespace activemq::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+TransportFactory& LoggingTransportFactory::getInstance()
+{
+    // Function-local static: the registrar, and the factory handed to
+    // it, are constructed once on the first call.
+    static TransportFactoryMapRegistrar registrar(
+        "transport.filters.LoggingTransport", new LoggingTransportFactory() );
+
+    TransportFactory& factory = registrar.getFactory();
+    return factory;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Transport* LoggingTransportFactory::createTransport(
+    const activemq::util::Properties& properties AMQCPP_UNUSED,
+    Transport* next,
+    bool own ) throw ( ActiveMQException ) {
+
+    try {
+
+        // The properties are not consulted; the filter only needs the
+        // transport it wraps and the ownership flag.
+        Transport* filter = new LoggingTransport( next, own );
+        return filter;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/LoggingTransportFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/LoggingTransportFactory.h?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/LoggingTransportFactory.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/LoggingTransportFactory.h Thu Apr 12 13:29:39 2007
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TRANSPORT_FILTERS_LOGGINGTRANSPORTFACTORY_H_
+#define _ACTIVEMQ_TRANSPORT_FILTERS_LOGGINGTRANSPORTFACTORY_H_
+
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/transport/TransportFactoryMapRegistrar.h>
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace transport{
+namespace filters{
+
+    /**
+     * Factory responsible for creating LoggingTransport filters.
+     */
+    class LoggingTransportFactory : public TransportFactory {
+
+    public:
+
+        virtual ~LoggingTransportFactory() {}
+
+        /**
+         * Creates a new LoggingTransport that wraps the given transport.
+         * @param properties - transport config values (not used by this
+         * factory's implementation)
+         * @param next - the next transport in the chain, or NULL
+         * @param own - does the new Transport own the next
+         * @return the newly allocated LoggingTransport
+         * @throws ActiveMQException if an error occurs.
+         */
+        virtual Transport* createTransport(
+            const activemq::util::Properties& properties,
+            Transport* next,
+            bool own ) throw ( exceptions::ActiveMQException );
+
+        /**
+         * Returns the factory held by a static TransportFactoryMapRegistrar
+         * that is created on the first call under the name
+         * "transport.filters.LoggingTransport".
+         * @returns TransportFactory Reference
+         */
+        static TransportFactory& getInstance();
+
+    };
+
+}}}
+
+#endif /*_ACTIVEMQ_TRANSPORT_FILTERS_LOGGINGTRANSPORTFACTORY_H_*/

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/ResponseCorrelator.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/ResponseCorrelator.cpp?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/ResponseCorrelator.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/ResponseCorrelator.cpp Thu Apr 12 13:29:39 2007
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ResponseCorrelator.h"
+
+using namespace activemq;
+using namespace activemq::transport;
+using namespace activemq::transport::filters;
+
+////////////////////////////////////////////////////////////////////////////////
+unsigned int ResponseCorrelator::getNextCommandId()
+    throw ( exceptions::ActiveMQException ){
+
+    try{
+
+        // Stays 0 only if the guarded block below is never entered.
+        unsigned int id = 0;
+
+        // Increment the shared counter under its own mutex.
+        synchronized( &commandIdMutex ){
+            id = ++nextCommandId;
+        }
+
+        return id;
+    }
+    AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+    AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ResponseCorrelator::ResponseCorrelator( Transport* next, bool own )
+:
+    TransportFilter( next, own ),
+    nextCommandId( 0 ),
+    maxResponseWaitTime( 3000 ),  // default max response wait: 3 seconds
+    closed( true )                // start in the closed state
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ResponseCorrelator::~ResponseCorrelator(){
+
+    // Close the transport.  close() is declared to throw cms::CMSException,
+    // and an exception leaving a destructor while another exception is
+    // unwinding the stack terminates the program, so a failure to close
+    // is deliberately swallowed here.
+    try{
+        close();
+    }
+    catch( ... ){}
+
+    // Don't do anything with the future responses -
+    // they should be cleaned up by each requester.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+unsigned long ResponseCorrelator::getMaxResponseWaitTime() const
+{
+    // Value is in milliseconds.
+    return this->maxResponseWaitTime;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ResponseCorrelator::setMaxResponseWaitTime( const unsigned long milliseconds )
+{
+    // Used by request() as the timeout of its wait for a response.
+    this->maxResponseWaitTime = milliseconds;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ResponseCorrelator::oneway( Command* command )
+    throw( CommandIOException, exceptions::UnsupportedOperationException ) {
+
+    try{
+
+        // A one-way command still gets a fresh id, but no reply is asked for.
+        const unsigned int commandId = getNextCommandId();
+        command->setCommandId( commandId );
+        command->setResponseRequired( false );
+
+        // Sending needs an open filter and a transport to send through.
+        const bool usable = !closed && next != NULL;
+        if( !usable ){
+            throw CommandIOException( __FILE__, __LINE__,
+                "transport already closed" );
+        }
+
+        next->oneway( command );
+    }
+    AMQ_CATCH_RETHROW( exceptions::UnsupportedOperationException )
+    AMQ_CATCH_RETHROW( CommandIOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( exceptions::ActiveMQException, CommandIOException )
+    AMQ_CATCHALL_THROW( CommandIOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Sends the command as a request and blocks until the matching response
+ * has been delivered through onCommand() or maxResponseWaitTime
+ * milliseconds have been waited.
+ * @param command the request to send; it is given a new command id and
+ * marked as requiring a response.
+ * @return the response that was correlated with the request.
+ * @throws CommandIOException if the transport is closed, sending fails,
+ * or no response was set by the time the wait ended.
+ * @throws UnsupportedOperationException propagated from the next transport.
+ */
+Response* ResponseCorrelator::request( Command* command )
+    throw( CommandIOException, exceptions::UnsupportedOperationException ) {
+
+    try{
+        command->setCommandId( getNextCommandId() );
+        command->setResponseRequired( true );
+
+        // Same guard as oneway(): refuse to send through a closed or
+        // missing transport instead of dereferencing a NULL next pointer.
+        if( closed || next == NULL ){
+            throw CommandIOException( __FILE__, __LINE__,
+                "transport already closed" );
+        }
+
+        // Add a future response object to the map indexed by this
+        // command id so that onCommand() can find it.
+        FutureResponse* futureResponse = new FutureResponse();
+
+        synchronized( &mapMutex ){
+            requestMap[command->getCommandId()] = futureResponse;
+        }
+
+        // Wait to be notified of the response via the futureResponse
+        // object.
+        Response* response = NULL;
+        try{
+
+            // The futureResponse lock is held across send and wait;
+            // onCommand() takes the same lock before it sets the
+            // response and notifies.
+            synchronized( futureResponse ){
+
+                // Send the request.
+                next->oneway( command );
+
+                // Wait for the response to come in.
+                futureResponse->wait( maxResponseWaitTime );
+
+                // Get the response (still NULL if none was set).
+                response = futureResponse->getResponse();
+            }
+        }
+        catch( ... ){
+
+            // Sending or waiting failed.  Unregister and destroy the
+            // future response so that it neither leaks nor leaves a
+            // stale entry in the map, then let the handlers
+            // below translate the exception.
+            synchronized( &mapMutex ){
+                requestMap.erase( command->getCommandId() );
+                delete futureResponse;
+            }
+            throw;
+        }
+
+        // Perform cleanup on the map.
+        synchronized( &mapMutex ){
+
+            // We've done our waiting - get this thing out
+            // of the map.
+            requestMap.erase( command->getCommandId() );
+
+            // Destroy the futureResponse.  It is safe to
+            // do this now because the other thread only
+            // accesses the futureResponse within a lock on
+            // the map.
+            delete futureResponse;
+            futureResponse = NULL;
+        }
+
+        if( response == NULL ){
+
+            throw CommandIOException( __FILE__, __LINE__,
+                "response from futureResponse was invalid" );
+        }
+
+        return response;
+    }
+    AMQ_CATCH_RETHROW( exceptions::UnsupportedOperationException )
+    AMQ_CATCH_RETHROW( CommandIOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( exceptions::ActiveMQException, CommandIOException )
+    AMQ_CATCHALL_THROW( CommandIOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ResponseCorrelator::onCommand( Command* command ) {
+
+    // Only Response objects take part in correlation.
+    Response* response = dynamic_cast<Response*>( command );
+
+    if( response == NULL ){
+
+        // Anything else goes straight to the listener.
+        fire( command );
+    }
+    else{
+
+        // The map lock is held for the whole hand-over.
+        synchronized( &mapMutex ){
+
+            // Find the pending request by the response's correlation id.
+            std::map<unsigned int, FutureResponse*>::iterator entry =
+                requestMap.find( response->getCorrelationId() );
+
+            if( entry != requestMap.end() ){
+
+                // Entries in the map are never NULL.
+                FutureResponse* pending = entry->second;
+
+                synchronized( pending ){
+
+                    // Publish the response, then wake everyone waiting on it.
+                    pending->setResponse( response );
+                    pending->notifyAll();
+                }
+            }
+            else{
+
+                // Nobody is waiting for this response - just report it.
+                printf("ResponseCorrelator::onCommand() - received unknown response for request: %d\n",
+                    response->getCorrelationId() );
+            }
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ResponseCorrelator::start() throw( cms::CMSException ) {
+
+    // Only a closed filter needs starting; otherwise there is nothing to do.
+    if( closed ){
+
+        // Both listeners and the wrapped transport must be in place.
+        if( commandlistener == NULL ){
+            throw exceptions::ActiveMQException( __FILE__, __LINE__,
+                "commandListener is invalid" );
+        }
+
+        if( exceptionListener == NULL ){
+            throw exceptions::ActiveMQException( __FILE__, __LINE__,
+                "exceptionListener is invalid" );
+        }
+
+        if( next == NULL ){
+            throw exceptions::ActiveMQException( __FILE__, __LINE__,
+                "next transport is NULL" );
+        }
+
+        // Start the wrapped transport; the filter counts as open only
+        // once that has succeeded.
+        next->start();
+        closed = false;
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ResponseCorrelator::close() throw( cms::CMSException ){
+
+    // Already closed, or nothing to close: just record the state.
+    if( closed || next == NULL ){
+        closed = true;
+        return;
+    }
+
+    // Close the wrapped transport first; the flag is only set once
+    // that call has returned.
+    next->close();
+    closed = true;
+}
+

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/ResponseCorrelator.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/ResponseCorrelator.h?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/ResponseCorrelator.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/ResponseCorrelator.h Thu Apr 12 13:29:39 2007
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_FILTERS_RESPONSECORRELATOR_H_
+#define ACTIVEMQ_TRANSPORT_FILTERS_RESPONSECORRELATOR_H_
+
+#include <activemq/transport/TransportFilter.h>
+#include <activemq/transport/filters/FutureResponse.h>
+#include <activemq/transport/Command.h>
+#include <activemq/concurrent/Mutex.h>
+#include <activemq/concurrent/Concurrent.h>
+#include <map>
+#include <stdio.h>
+
+namespace activemq{
+namespace transport{
+namespace filters{
+
+    /**
+     * This type of transport filter is responsible for correlating
+     * asynchronous responses with requests.  Non-response messages
+     * are simply sent directly to the CommandListener.  By default it
+     * owns the next transport in the chain (see the own argument of
+     * the constructor).
+     */
+    class ResponseCorrelator : public TransportFilter
+    {
+    private:
+
+        /**
+         * The next command id for sent commands.
+         */
+        unsigned int nextCommandId;
+
+        /**
+         * Map of request ids to future response objects.
+         */
+        std::map<unsigned int, FutureResponse*> requestMap;
+
+        /**
+         * Maximum amount of time in milliseconds to wait for a response.
+         */
+        unsigned long maxResponseWaitTime;
+
+        /**
+         * Sync object for accessing the next command id variable.
+         */
+        concurrent::Mutex commandIdMutex;
+
+        /**
+         * Sync object for accessing the request map.
+         */
+        concurrent::Mutex mapMutex;
+
+        /**
+         * Flag to indicate the closed state.  The filter is created
+         * closed; start() opens it and close() closes it again.
+         */
+        bool closed;
+
+    private:
+
+        /**
+         * Returns the next available command id.  The id counter is
+         * incremented under commandIdMutex.
+         */
+        unsigned int getNextCommandId() throw ( exceptions::ActiveMQException );
+
+    public:
+
+        /**
+         * Constructor.  The filter starts out closed with a maximum
+         * response wait time of 3000 milliseconds.
+         * @param next the next transport in the chain
+         * @param own indicates if this transport owns the next
+         */
+        ResponseCorrelator( Transport* next, bool own = true );
+
+        virtual ~ResponseCorrelator();
+
+        /**
+         * Gets the maximum wait time for a response in milliseconds.
+         * @return max time that a response can take
+         */
+        virtual unsigned long getMaxResponseWaitTime() const;
+
+        /**
+         * Sets the maximum wait time for a response in milliseconds.
+         * @param milliseconds the max time that a response can take.
+         */
+        virtual void setMaxResponseWaitTime( const unsigned long milliseconds );
+
+        /**
+         * Sends a one-way command.  Does not wait for any response from the
+         * broker.  The command is assigned the next command id and marked
+         * as not requiring a response.
+         * @param command the command to be sent.
+         * @throws CommandIOException if an exception occurs during writing of
+         * the command, or if this filter is closed or has no next transport.
+         * @throws UnsupportedOperationException if this method is not implemented
+         * by this transport.
+         */
+        virtual void oneway( Command* command )
+            throw( CommandIOException, exceptions::UnsupportedOperationException );
+
+        /**
+         * Sends the given request to the server and waits for the response,
+         * using the maximum response wait time as the timeout of the wait.
+         * @param command The request to send.
+         * @return the response from the server.
+         * @throws CommandIOException if an error occurs with the request or
+         * no response was available when the wait ended.
+         */
+        virtual Response* request( Command* command )
+            throw( CommandIOException, exceptions::UnsupportedOperationException );
+
+        /**
+         * This is called in the context of the nested transport's
+         * reading thread.  In the case of a response object,
+         * stores it in the matching future response from the request
+         * map and notifies those waiting on the response.
+         * Non-response messages are just delegated to
+         * the command listener.
+         * @param command the command received from the nested transport.
+         */
+        virtual void onCommand( Command* command );
+
+        /**
+         * Starts this filter by starting the next transport in the
+         * chain and marking the filter as open.  Does nothing if the
+         * filter is already started.  Before calling start, a command
+         * listener and an exception listener must have been set and
+         * the next transport must not be NULL.
+         * @throws CMSException if a listener or the next transport is
+         * missing, or if starting the next transport fails.
+         */
+        virtual void start() throw( cms::CMSException );
+
+        /**
+         * Closes the next transport (when this filter is open and the
+         * next transport is not NULL) and marks this filter as closed.
+         * This can be called explicitly, but is also called in the
+         * destructor.
+         * @throws CMSException if errors occur.
+         */
+        virtual void close() throw( cms::CMSException );
+
+    };
+
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_FILTERS_RESPONSECORRELATOR_H_*/

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/ResponseCorrelatorFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/ResponseCorrelatorFactory.cpp?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/ResponseCorrelatorFactory.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/ResponseCorrelatorFactory.cpp Thu Apr 12 13:29:39 2007
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ResponseCorrelatorFactory.h"
+
+#include <activemq/transport/filters/ResponseCorrelator.h>
+
+using namespace activemq;
+using namespace activemq::transport;
+using namespace activemq::transport::filters;
+using namespace activemq::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+TransportFactory& ResponseCorrelatorFactory::getInstance()
+{
+    // Function-local static: the registrar, and the factory handed to
+    // it, are constructed once on the first call.
+    static TransportFactoryMapRegistrar registrar(
+        "transport.filters.ResponseCorrelator", new ResponseCorrelatorFactory() );
+
+    TransportFactory& factory = registrar.getFactory();
+    return factory;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Transport* ResponseCorrelatorFactory::createTransport(
+    const activemq::util::Properties& properties AMQCPP_UNUSED,
+    Transport* next,
+    bool own ) throw ( ActiveMQException ) {
+
+    try {
+
+        // The properties are not consulted; the filter only needs the
+        // transport it wraps and the ownership flag.
+        Transport* filter = new ResponseCorrelator( next, own );
+        return filter;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/ResponseCorrelatorFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/ResponseCorrelatorFactory.h?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/ResponseCorrelatorFactory.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/ResponseCorrelatorFactory.h Thu Apr 12 13:29:39 2007
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TRANSPORT_FILTERS_RESPONSECORRELATORFACTORY_H_
+#define _ACTIVEMQ_TRANSPORT_FILTERS_RESPONSECORRELATORFACTORY_H_
+
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/transport/TransportFactoryMapRegistrar.h>
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace transport{
+namespace filters{
+
+    /**
+     * Factory responsible for creating ResponseCorrelator filters.
+     */
+    class ResponseCorrelatorFactory : public TransportFactory {
+
+    public:
+
+        virtual ~ResponseCorrelatorFactory() {}
+
+        /**
+         * Creates a new ResponseCorrelator that wraps the given transport.
+         * @param properties - transport config values (not used by this
+         * factory's implementation)
+         * @param next - the next transport in the chain, or NULL
+         * @param own - does the new Transport own the next
+         * @return the newly allocated ResponseCorrelator
+         * @throws ActiveMQException if an error occurs.
+         */
+        virtual Transport* createTransport(
+            const activemq::util::Properties& properties,
+            Transport* next,
+            bool own ) throw ( exceptions::ActiveMQException );
+
+        /**
+         * Returns the factory held by a static TransportFactoryMapRegistrar
+         * that is created on the first call under the name
+         * "transport.filters.ResponseCorrelator".
+         * @returns TransportFactory Reference
+         */
+        static TransportFactory& getInstance();
+
+    };
+
+}}}
+
+#endif /*_ACTIVEMQ_TRANSPORT_FILTERS_RESPONSECORRELATORFACTORY_H_*/

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransport.cpp?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransport.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransport.cpp Thu Apr 12 13:29:39 2007
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TcpTransport.h"
+
+#include <activemq/network/SocketFactory.h>
+#include <activemq/transport/IOTransport.h>
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/transport/TransportFactoryMap.h>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::io;
+using namespace activemq::transport;
+using namespace activemq::transport::filters;
+using namespace activemq::network;
+using namespace activemq::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+TcpTransport::TcpTransport( const activemq::util::Properties& properties,
+                            Transport* next,
+                            const bool own )
+:
+    TransportFilter( next, own ),
+    socket( NULL ),
+    loggingInputStream( NULL ),
+    loggingOutputStream( NULL ),
+    bufferedInputStream( NULL ),
+    bufferedOutputStream( NULL )
+{
+    try
+    {
+        if( !properties.hasProperty( "transport.uri" ) ) {
+            throw ActiveMQException(
+                __FILE__, __LINE__,
+                "TcpTransport::TcpTransport - "
+                "No URI set for this transport to connect to.");
+        }
+
+        // Validate the wrapped transport before acquiring any resource:
+        // a constructor that throws never runs its destructor, so a socket
+        // created ahead of this check would leak when the cast fails.
+        IOTransport* ioTransport = dynamic_cast<IOTransport*>( next );
+        if( ioTransport == NULL ){
+            throw ActiveMQException(
+                __FILE__, __LINE__,
+                "TcpTransport::TcpTransport - "
+                "transport must be of type IOTransport");
+        }
+
+        // Create the IO device we will be communicating over the wire
+        // with.  This may change if we add more socket types, e.g. SSL.
+        socket = SocketFactory::createSocket(
+            properties.getProperty( "transport.uri" ), properties );
+
+        InputStream* inputStream = socket->getInputStream();
+        OutputStream* outputStream = socket->getOutputStream();
+
+        // If tcp tracing was enabled, wrap the iostreams with logging streams
+        if( properties.getProperty( "transport.tcpTracingEnabled", "false" ) == "true" ) {
+            loggingInputStream = new LoggingInputStream( inputStream );
+            loggingOutputStream = new LoggingOutputStream( outputStream );
+
+            inputStream = loggingInputStream;
+            outputStream = loggingOutputStream;
+        }
+
+        // Now wrap the input/output streams with buffered streams
+        bufferedInputStream = new BufferedInputStream(inputStream);
+        bufferedOutputStream = new BufferedOutputStream(outputStream);
+
+        // Give the IOTransport the streams.
+        ioTransport->setInputStream( bufferedInputStream );
+        ioTransport->setOutputStream( bufferedOutputStream );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+TcpTransport::~TcpTransport()
+{
+    try
+    {
+        // Shut the transport down first.  A CMSException raised while
+        // closing is swallowed so it cannot escape the destructor.
+        try{
+            close();
+        } catch( cms::CMSException& ex ){ /* Absorb */ }
+
+        // Release everything the constructor allocated.  delete on a
+        // NULL pointer is a no-op, so members that were never set
+        // (e.g. the logging streams) need no explicit guard.
+        // The socket obtained from the SocketFactory.
+        delete socket;
+        socket = NULL;
+
+        // Logging wrapper on the input side.
+        delete loggingInputStream;
+        loggingInputStream = NULL;
+
+        // Logging wrapper on the output side.
+        delete loggingOutputStream;
+        loggingOutputStream = NULL;
+
+        // Buffered wrapper on the input side.
+        delete bufferedInputStream;
+        bufferedInputStream = NULL;
+
+        // Buffered wrapper on the output side.
+        delete bufferedOutputStream;
+        bufferedOutputStream = NULL;
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void TcpTransport::close() throw( cms::CMSException ) {
+
+    try
+    {
+        // Invoke the parent's close first.
+        TransportFilter::close();
+        // Then close the socket, provided one was actually created.
+        if( socket == NULL ) {
+            return;
+        }
+        socket->close();
+    }
+    AMQ_CATCH_RETHROW( SocketException )
+    AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, SocketException )
+    AMQ_CATCHALL_THROW( SocketException )
+}

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransport.h?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransport.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransport.h Thu Apr 12 13:29:39 2007
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TRANSPORT_FILTERS_TCPTRANSPORT_H_
+#define _ACTIVEMQ_TRANSPORT_FILTERS_TCPTRANSPORT_H_
+
+#include <activemq/transport/TransportFilter.h>
+#include <activemq/network/Socket.h>
+#include <activemq/util/Properties.h>
+#include <activemq/io/LoggingInputStream.h>
+#include <activemq/io/LoggingOutputStream.h>
+#include <activemq/io/BufferedInputStream.h>
+#include <activemq/io/BufferedOutputStream.h>
+
+namespace activemq{
+namespace transport{
+namespace filters{
+
+    /**
+     * Implements a TCP/IP based transport filter; this transport
+     * is meant to wrap an instance of an IOTransport.  The lower
+     * level transport should take care of managing stream reads
+     * and writes.
+     */
+    class TcpTransport : public TransportFilter
+    {
+    private:
+
+        /**
+         * Socket that this Transport communicates with
+         */
+        network::Socket* socket;
+
+        io::LoggingInputStream* loggingInputStream;   // NULL unless tracing
+        io::LoggingOutputStream* loggingOutputStream; // NULL unless tracing
+
+        io::BufferedInputStream* bufferedInputStream;   // given to next
+        io::BufferedOutputStream* bufferedOutputStream; // given to next
+
+    public:
+
+        /**
+         * Creates a socket for "transport.uri" and hands its streams to next.
+         * @param properties the configuration; must contain "transport.uri"
+         * @param next the next transport in the chain; must be an IOTransport
+         * @param own indicates if this transport owns the next.
+         */
+        TcpTransport( const activemq::util::Properties& properties,
+                      Transport* next,
+                      const bool own = true );
+
+        virtual ~TcpTransport(); // closes, then frees the socket and streams
+
+        /**
+         * Delegates to the superclass and then closes the socket.
+         * @throws CMSException if errors occur.
+         */
+        virtual void close() throw( cms::CMSException );
+
+    };
+
+}}}
+
+#endif /*_ACTIVEMQ_TRANSPORT_FILTERS_TCPTRANSPORT_H_*/

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransportFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransportFactory.cpp?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransportFactory.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransportFactory.cpp Thu Apr 12 13:29:39 2007
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TcpTransportFactory.h"
+
+#include <activemq/transport/filters/TcpTransport.h>
+
+using namespace activemq;
+using namespace activemq::transport;
+using namespace activemq::transport::filters;
+using namespace activemq::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+TransportFactory& TcpTransportFactory::getInstance()
+{
+    // The single registrar instance, built with the name "tcp" on first use.
+    static TransportFactoryMapRegistrar registrar(
+        "tcp", new TcpTransportFactory() );
+    TransportFactory& factory = registrar.getFactory();
+    return factory;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Transport* TcpTransportFactory::createTransport(
+    const activemq::util::Properties& properties,
+    Transport* next, bool own ) throw ( ActiveMQException ) {
+    try {
+        // Any failure here is handled by the AMQ_CATCH macros below.
+        Transport* created = new TcpTransport( properties, next, own );
+        return created;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransportFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransportFactory.h?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransportFactory.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransportFactory.h Thu Apr 12 13:29:39 2007
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TRANSPORT_FILTERS_TCPTRANSPORTFACTORY_H_
+#define _ACTIVEMQ_TRANSPORT_FILTERS_TCPTRANSPORTFACTORY_H_
+
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/transport/TransportFactoryMapRegistrar.h>
+#include <activemq/transport/IOTransportFactory.h>
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace transport{
+namespace filters{
+
+    /**
+     * TransportFactory implementation that creates TcpTransport instances.
+     */
+    class TcpTransportFactory : public TransportFactory
+    {
+    public:
+
+        virtual ~TcpTransportFactory() {}
+
+        /**
+         * Creates a new TcpTransport and returns a pointer to it.
+         * @param properties - configuration values for the new transport
+         * @param next - the next transport in the chain, or NULL
+         * @param own - true if the new Transport controls the lifetime of next
+         * @throws ActiveMQException if an error occurs.
+         */
+        virtual Transport* createTransport(
+            const activemq::util::Properties& properties,
+            Transport* next,
+            bool own ) throw ( exceptions::ActiveMQException );
+
+        /**
+         * Returns the factory held by the static registrar built for "tcp".
+         * @returns a reference to a TransportFactory
+         */
+        static TransportFactory& getInstance();
+
+    };
+
+}}}
+
+#endif /*_ACTIVEMQ_TRANSPORT_FILTERS_TCPTRANSPORTFACTORY_H_*/

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireAsyncSenderTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireAsyncSenderTest.cpp?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireAsyncSenderTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireAsyncSenderTest.cpp Thu Apr 12 13:29:39 2007
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- 
+
 #include "OpenwireAsyncSenderTest.h"
 
 #include <integration/IntegrationCommon.h>
@@ -74,7 +74,7 @@
 ////////////////////////////////////////////////////////////////////////////////
 OpenwireAsyncSenderTest::OpenwireAsyncSenderTest()
 :
-    testSupport("tcp://localhost:61616?wireFormat=openwire&useAsyncSend=true")
+    testSupport("tcp://localhost:61616?wireFormat=openwire&transport.useAsyncSend=true")
 {
     testSupport.initialize();
 }
@@ -93,42 +93,42 @@
             cout << "Starting activemqcms test (sending "
                  << IntegrationCommon::defaultMsgCount
                  << " messages per type and sleeping "
-                 << IntegrationCommon::defaultDelay 
+                 << IntegrationCommon::defaultDelay
                  << " milli-seconds) ...\n"
                  << endl;
         }
-        
+
         // Create CMS Object for Comms
         cms::Session* session = testSupport.getSession();
         cms::Topic* topic = testSupport.getSession()->createTopic("mytopic");
-        cms::MessageConsumer* consumer = 
-            session->createConsumer( topic );            
+        cms::MessageConsumer* consumer =
+            session->createConsumer( topic );
         consumer->setMessageListener( &testSupport );
-        cms::MessageProducer* producer = 
+        cms::MessageProducer* producer =
             session->createProducer( topic );
 
         // Send some text messages
-        testSupport.produceTextMessages( 
+        testSupport.produceTextMessages(
             *producer, IntegrationCommon::defaultMsgCount );
-        
+
         // Send some bytes messages.
-        testSupport.produceTextMessages( 
+        testSupport.produceTextMessages(
             *producer, IntegrationCommon::defaultMsgCount );
 
         // Wait for the messages to get here
         testSupport.waitForMessages( IntegrationCommon::defaultMsgCount * 2 );
-        
+
         unsigned int numReceived = testSupport.getNumReceived();
         if( IntegrationCommon::debug ) {
             printf("received: %d\n", numReceived );
         }
-        CPPUNIT_ASSERT( 
+        CPPUNIT_ASSERT(
             numReceived == IntegrationCommon::defaultMsgCount * 2 );
 
         if( IntegrationCommon::debug ) {
             printf("Shutting Down\n" );
         }
-        delete producer;                      
+        delete producer;
         delete consumer;
         delete topic;
     }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/AsyncSenderTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/AsyncSenderTest.cpp?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/AsyncSenderTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/AsyncSenderTest.cpp Thu Apr 12 13:29:39 2007
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- 
+
 #include "AsyncSenderTest.h"
 
 #include <integration/IntegrationCommon.h>
@@ -74,7 +74,7 @@
 ////////////////////////////////////////////////////////////////////////////////
 AsyncSenderTest::AsyncSenderTest()
 :
-    testSupport("tcp://localhost:61613?wireFormat=stomp&useAsyncSend=true")
+    testSupport("tcp://localhost:61613?wireFormat=stomp&transport.useAsyncSend=true")
 {
     testSupport.initialize();
 }
@@ -93,42 +93,42 @@
             cout << "Starting activemqcms test (sending "
                  << IntegrationCommon::defaultMsgCount
                  << " messages per type and sleeping "
-                 << IntegrationCommon::defaultDelay 
+                 << IntegrationCommon::defaultDelay
                  << " milli-seconds) ...\n"
                  << endl;
         }
-        
+
         // Create CMS Object for Comms
         cms::Session* session = testSupport.getSession();
         cms::Topic* topic = testSupport.getSession()->createTopic("mytopic");
-        cms::MessageConsumer* consumer = 
-            session->createConsumer( topic );            
+        cms::MessageConsumer* consumer =
+            session->createConsumer( topic );
         consumer->setMessageListener( &testSupport );
-        cms::MessageProducer* producer = 
+        cms::MessageProducer* producer =
             session->createProducer( topic );
 
         // Send some text messages
-        testSupport.produceTextMessages( 
+        testSupport.produceTextMessages(
             *producer, IntegrationCommon::defaultMsgCount );
-        
+
         // Send some bytes messages.
-        testSupport.produceTextMessages( 
+        testSupport.produceTextMessages(
             *producer, IntegrationCommon::defaultMsgCount );
 
         // Wait for the messages to get here
         testSupport.waitForMessages( IntegrationCommon::defaultMsgCount * 2 );
-        
+
         unsigned int numReceived = testSupport.getNumReceived();
         if( IntegrationCommon::debug ) {
             printf("received: %d\n", numReceived );
         }
-        CPPUNIT_ASSERT( 
+        CPPUNIT_ASSERT(
             numReceived == IntegrationCommon::defaultMsgCount * 2 );
 
         if( IntegrationCommon::debug ) {
             printf("Shutting Down\n" );
         }
-        delete producer;                      
+        delete producer;
         delete consumer;
         delete topic;
     }